can you patch this code for good code interpreter ...

Créé le : 19 février 2025

Répondu en utilisant O3 Mini High par Chat01

Question

can you patch this code for good code interpreter working greats pls :
"""
from gevent import monkey
monkey.patch_all()

import os
import re
import json
import logging
from typing import Any, Dict, List, Optional, Tuple
import time
import datetime
from tqdm import tqdm

from flask import Flask, request, Response, jsonify, stream_with_context
from flask_cors import CORS

from e2b_code_interpreter import Sandbox

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

import requests
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from werkzeug.utils import secure_filename # Ajout pour la gestion sécurisée des noms de fichier
from groq import Groq
import PyPDF2
from PIL import Image
import pytesseract

--- Configuration ---

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)

app = Flask(name)
CORS(app)
app.config['UPLOAD_FOLDER'] = os.path.join(os.path.dirname(os.path.abspath(file)), 'uploads') # Dossier de destination

GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
E2B_API_KEY = os.environ.get("E2B_API_KEY")
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY") # Ajout de la clé OpenRouter

PISTON_API_URL = "https://emkc.org/api/v2/piston/execute"

groq_client = Groq(api_key=GROQ_API_KEY)

--- E2B Sandbox Initialization with Retry ---

@retry(stop=stop_after_attempt(5), # Tentatives augmentées à 5
wait=wait_exponential(multiplier=1, min=4, max=20), # Backoff jusqu'à 20 secondes
retry=retry_if_exception_type(Exception))
def initialize_sandbox():
"""Initializes the e2b Sandbox with a higher timeout."""
logger.info("Initializing e2b Sandbox with 300-second timeout...")
return Sandbox(api_key=E2B_API_KEY, timeout=300) # Timeout augmenté à 300 secondes

try:
sandbox = initialize_sandbox()
logger.info("e2b Sandbox initialized successfully.")
except Exception as e:
logger.error(f"Failed to initialize e2b Sandbox after multiple retries: {e}")
# It's crucial to handle the failure of sandbox initialization.
# You might want to exit the application or have a fallback mechanism.
# For now, let's raise the exception to prevent the app from starting improperly.
raise e

VALID_MODELS = [
"google/gemini-2.0-pro-exp-02-05:free",
"deepseek/deepseek-r1:free",
"qwen/qwen2.5-vl-72b-instruct:free",
"meta-llama/llama-3.3-70b-instruct:free"
]

CODE_PATTERN = re.compile(r"python\s*([\s\S]*?)")
CODE_REQUIRED_MARKER = "[CODE_REQUIRED]"
CANVAS_START_MARKER = "[CANVAS_START]"
CANVAS_END_MARKER = "[CANVAS_END]"

--- Configuration pour la recherche Web (comme dans le script CLI) ---

WEB_SEARCH_CONFIG = {
'openrouter_api_key': OPENROUTER_API_KEY,
'max_results': 9, # Augmenté à 9 par défaut
'cache_file': 'search_cache.json', # Le cache est géré, mais pas crucial pour le backend Flask
'model': 'mistralai/mistral-medium', # Modèle OpenRouter pour le filtrage et la synthèse. Peut être changé.
'timeout': 20,
'min_quality_score': 0.65 # Ce paramètre n'est plus utilisé directement, remplacé par un filtrage AI plus précis.
}

--- Classes pour la recherche Web (adaptées du script CLI) ---

class DataCollector:
@staticmethod
def web_search(query: str) -> List[Dict]:
with DDGS() as ddgs:
results = ddgs.text(query, region='fr-fr', max_results=WEB_SEARCH_CONFIG['max_results'])
return [{
'url': r['href'],
'title': r['title'],
'domain': urlparse(r['href']).netloc.replace('www.', '')
} for r in results if urlparse(r['href']).scheme in ['http', 'https']]

class ContentProcessor:
def init(self):
self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}

text
def fetch_content(self, url: str) -> str: try: response = requests.get(url, headers=self.headers, timeout=WEB_SEARCH_CONFIG['timeout']) soup = BeautifulSoup(response.text, 'html.parser') for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'meta', 'link']): element.decompose() main_content = soup.find(['main', 'article']) or soup.body return ' '.join(main_content.stripped_strings)[:5000] except Exception as e: logger.error(f"Erreur sur {url}: {str(e)}") return ''

class AICore:
@staticmethod
def query_ai(system_prompt: str, user_content: str, temp: float = 0.1, model:str = WEB_SEARCH_CONFIG['model']) -> str: # Ajout du paramètre model
headers = {
"Authorization": f"Bearer {WEB_SEARCH_CONFIG['openrouter_api_key']}",
"Content-Type": "application/json",
"HTTP-Referer": "https://your-app-url", # Remplacez par l'URL de votre application
"X-Title": "Flask Search AI"
}

text
data = { "model": model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content} ], "temperature": temp, "max_tokens": 2000 } try: response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data, timeout=WEB_SEARCH_CONFIG['timeout']) response.raise_for_status() # On lève une exception si le statut n'est pas 200 return response.json()['choices'][0]['message']['content'] except Exception as e: return f"Erreur API OpenRouter: {str(e)}"

def open_router_chat_completion_stream(messages: List[Dict[str, Any]], model: str, temperature: float = 0.0):
headers = {
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": 2000,
"stream": True
}
response = requests.post("https://openrouter.ai/api/v1/chat/completions",
headers=headers, json=payload, stream=True, timeout=WEB_SEARCH_CONFIG['timeout'])
response.raise_for_status()
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8')
if decoded_line.startswith("data: "):
data_str = decoded_line[len("data: "):]
try:
data = json.loads(data_str)
yield data
except Exception:
continue

def open_router_chat_completion(messages: List[Dict[str, Any]], model: str, temperature: float = 0.0) -> Dict:
headers = {
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": 2000,
"stream": False
}
response = requests.post("https://openrouter.ai/api/v1/chat/completions",
headers=headers, json=payload, timeout=WEB_SEARCH_CONFIG['timeout'])
response.raise_for_status()
return response.json()

--- Fonctions utilitaires (restent inchangées) ---

def extract_code_blocks(text: str) -> Tuple[Optional[str], Optional[str]]:
"""Extrait les blocs de code Python et gère le marqueur [CODE_REQUIRED]."""
if CODE_REQUIRED_MARKER in text:
text_no_marker = text.replace(CODE_REQUIRED_MARKER, "", 1).strip()
code_match = CODE_PATTERN.search(text_no_marker)
if code_match:
return code_match.group(1).strip(), None
else:
code_match = CODE_PATTERN.search(text)
if code_match:
return None, code_match.group(1).strip() # code à afficher sans exécution
return None, None

PATCH: Mise à jour de la fonction run_code_in_sandbox pour utiliser execute_sync

def run_code_in_sandbox(code: str) -> str:
start_time = time.time()
try:
logger.info(f"Running code in e2b sandbox using execute_sync...") # Updated log message
execution = sandbox.execute_sync(code) # Using execute_sync instead of run_code_sync
logs = execution.logs
stdout_logs = logs.stdout if hasattr(logs, 'stdout') else []
stderr_logs = logs.stderr if hasattr(logs, 'stderr') else []
stdout = "\n".join(stdout_logs)
stderr = "\n".join(stderr_logs)
if stderr:
return f"Stdout:\n{stdout}\n\nStderr:\n{stderr}"
return stdout
except Exception as e:
duration = time.time() - start_time
error_msg = f"Erreur lors de l'exécution du code dans e2b après {duration:.2f} secondes. Erreur: {e}"
logger.exception(error_msg)
return error_msg

def extract_numeric_result(exec_result: str) -> Optional[str]:
"""Extrait la dernière ligne numérique du résultat d'exécution de code."""
if "Stdout:" in exec_result:
stdout_section = exec_result.split("Stdout:\n", 1)[1].split("\n\nStderr:")[0].strip()
else:
stdout_section = exec_result.strip()
lines = [l.strip() for l in stdout_section.splitlines() if l.strip()]
if not lines:
return None
last_line = lines[-1]
if re.search(r'\d', last_line):
return last_line
return None

--- Routes Flask (légères modifications) ---

@app.route('/models', methods=['GET'])
def get_models() -> Response:
"""Retourne la liste des modèles valides."""
return jsonify({"models": VALID_MODELS})

@app.route('/chat', methods=['POST'])
def chat_endpoint() -> Response:
"""
Point d'entrée principal pour le chat.
Gère le streaming SSE, l'exécution de code, les canvas et la recherche Web.
"""
@stream_with_context
@retry(stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type(Exception))
def stream_response():
data = request.get_json()
if not data:
logger.warning("Requête chat invalide: corps de requête manquant.")
yield format_sse_event({"type": "error", "content": "Requête chat invalide : corps de requête manquant."})
return

text
messages = data.get('messages', []) model: str = data.get('model', 'google/gemini-2.0-pro-exp-02-05:free').strip() execute_code: bool = data.get('execute_code', True) web_search_enabled: bool = data.get('web_search', False) # Récupère l'état de la recherche Web if not messages or not isinstance(messages, list) or not model or model not in VALID_MODELS: logger.warning("Requête chat invalide: messages=%s, model=%s", messages, model) yield format_sse_event({"type": "error", "content": "Requête chat invalide : messages ou modèle incorrects."}) return # --- Intégration de la recherche Web --- web_search_results = [] if web_search_enabled: user_query = messages[-1]['content'] # On utilise le dernier message de l'utilisateur comme requête yield format_sse_event({"type": "web_search_status", "content": "start"}) yield format_sse_event({"type": "web_search", "query": user_query}) # Envoi de la requête pour l'afficher search_processor = ContentProcessor() search_results = DataCollector.web_search(user_query) # Filtrage et extraction du contenu (comme dans le CLI, mais intégré ici) filtered_results = [] for result in search_results: content = search_processor.fetch_content(result['url']) if content: # Filtrage de pertinence avec OpenRouter (comme dans le CLI) filter_prompt = ( "Tu es un expert en analyse de la pertinence des documents. " f"Détermine si le contenu web suivant est pertinent pour la requête utilisateur : '{user_query}'. " "Réponds uniquement par 'oui' ou 'non'." ) is_relevant = AICore.query_ai(filter_prompt, content[:2000], temp=0.0, model="mistralai/mistral-small") # Filtrage plus rapide if is_relevant.lower().startswith('oui'): filtered_results.append({'url': result['url'], 'domain': result['domain'], 'content': content}) if len(filtered_results) >= WEB_SEARCH_CONFIG['max_results']: break web_search_results = filtered_results # Envoi des URLs des sources *après* le filtrage source_urls = [result['url'] for result in web_search_results] yield format_sse_event({"type": "web_sources", "content": source_urls}) yield format_sse_event({"type": "web_search_status", "content": "end"}) # --- Construction du prompt système (adapté pour la recherche Web) --- base_prompt = ( "🎯 **Rôle : Expert en calcul numérique précis, assistant polyvalent avec canvas interactifs et capacités de recherche Web.**\n" "**Mission :** Répondre aux questions de l'utilisateur, effectuer des calculs précis, fournir des canvas interactifs (texte et code) et, si nécessaire, effectuer des recherches sur le Web pour enrichir la réponse.\n\n" "**Si la question demande un calcul numérique :**\n" "1. Annonce que tu vas exécuter un code Python pour réaliser le calcul.\n" "2. Ajoute le marqueur `[CODE_REQUIRED]` suivi du bloc de code minimal en Python.\n" "3. N'envoie rien d'autre et attends l'exécution du code.\n\n" "**Après l'exécution du code**, explique le résultat en détails en te basant sur le résultat numérique obtenu.\n\n" "**Pour la création ou modification de canvas** (texte ou code) :\n" "Utilise les marqueurs `[CANVAS_START]` et `[CANVAS_END]` pour encadrer le contenu du canvas.\n\n" "**Si la recherche Web est activée :**\n" "1. Tu recevras des extraits de pages Web pertinentes.\n" "2. Utilise ces informations pour compléter ta réponse, en citant les sources de manière concise (ex: [nomdedomaine.com]).\n" "3. Si les sources Web ne sont pas pertinentes ou suffisantes, base-toi sur tes connaissances internes.\n\n" "Si aucune de ces conditions n'est remplie, réponds directement et clairement à la question." ) # --- Préparation des messages pour Groq --- system_prompt = {"role": "system", "content": base_prompt} # Ajout des résultats de la recherche Web au contexte, si applicable if web_search_results: # Création d'un résumé des sources pour Groq (plus concis que d'envoyer tout le contenu) sources_summary = "\n\n".join([ f"[{result['domain']}]\nExtrait: {result['content'][:500]}" # Limite la longueur de l'extrait for result in web_search_results ]) web_search_context = {"role": "user", "content": f"Voici des extraits de pages web pertinentes pour la question:\n\n{sources_summary}"} messages_with_system_prompt = [system_prompt, web_search_context] + messages else: messages_with_system_prompt = [system_prompt] + messages full_reply_text = "" code_to_execute = None code_to_display = None is_code_used = False canvas_content = None code_executed = False prompt_tokens = 0 completion_tokens = 0 total_tokens = 0 initial_response_completed = False try: # Remplacez l'appel à groq_client par OpenRouter en mode streaming const_response_stream = open_router_chat_completion_stream(messages_with_system_prompt, model, 0.0) for chunk in const_response_stream: if initial_response_completed: break delta_content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "") or "" delta_content_no_marker = delta_content.replace(CODE_REQUIRED_MARKER, "") full_reply_text += delta_content_no_marker if CANVAS_START_MARKER in full_reply_text and CANVAS_END_MARKER in full_reply_text: start_index = full_reply_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = full_reply_text.find(CANVAS_END_MARKER) yield format_sse_event({"type": "canvas_start", "content": full_reply_text[start_index:end_index].strip()}) continue yield format_sse_event({"type": "text", "content": delta_content_no_marker}) if execute_code: # Condition pour exécuter le code seulement si execute_code est True code_to_execute, code_to_display = extract_code_blocks(full_reply_text) if code_to_execute: yield format_sse_event({"type": "code", "content": code_to_display}) yield format_sse_event({"type": "analysis_status", "content": "start"}) exec_result = run_code_in_sandbox(code_to_execute) numeric_result = extract_numeric_result(exec_result) yield format_sse_event({"type": "code_result", "content": exec_result}) yield format_sse_event({"type": "analysis_status", "content": "end"}) full_reply_text += f"\nRésultat du code:\n```\n{exec_result}\n```\n" # Ajout du résultat à full_reply_text pour le contexte futur initial_response_completed = True # Stop streaming after code execution yield format_sse_event({"type": "final", "prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}) except Exception as e: error_message = f"Erreur serveur IA: {str(e)}" logger.exception(error_message) yield format_sse_event({"type": "error", "content": error_message}) finally: yield format_sse_event({"type": "analysis_status", "content": "end"}) # Assurer que le statut "analysis_end" est toujours envoyé yield format_sse_event({"type": "done", "is_code_execution": code_to_execute is not None}) # Indique si l'exécution de code a eu lieu return Response(stream_with_context(stream_response()), mimetype='text/event-stream')

@app.route('/canvas_modify', methods=['POST'])
def canvas_modify_endpoint() -> Response:
data = request.get_json()
if not data:
logger.warning("Requête canvas_modify sans données.")
return jsonify({"error": "Requête canvas_modify invalide : corps de requête manquant."}), 400
user_message = data.get('message')
canvas_content = data.get('canvas_content')
model = data.get('model', 'google/gemini-2.0-pro-exp-02-05:free').strip()
if not user_message or canvas_content is None:
logger.warning("Requête canvas_modify incomplète: message=%s, canvas_content=%s", user_message, canvas_content)
return jsonify({"error": "Requête canvas_modify invalide : message ou contenu canvas manquant."}), 400
canvas_chat_messages = [
{"role": "system", "content": (
"Tu es dans un chat de canvas. L'utilisateur va te demander de modifier le contenu du canvas suivant. "
"Fournis UNIQUEMENT le NOUVEAU contenu du canvas ENTIÈREMENT MIS À JOUR, encadré par [CANVAS_START] et [CANVAS_END]. "
"Ne réponds pas avec du texte en dehors de ces marqueurs, sauf une courte phrase de confirmation après le canvas. "
"Maintiens le contexte des modifications précédentes du canvas, en te basant sur l'historique de ce chat canvas."
)},
{"role": "assistant", "content": f"[CANVAS_START]\n{canvas_content}\n[CANVAS_END] Voici le canvas actuel. Dites-moi comment le modifier."},
{"role": "user", "content": user_message}
]

text
try: const_response = open_router_chat_completion(canvas_chat_messages, model, 0.0) ai_response_text = const_response.get("choices", [{}])[0].get("message", {}).get("content", "") or "" if CANVAS_START_MARKER in ai_response_text and CANVAS_END_MARKER in ai_response_text: start_index = ai_response_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = ai_response_text.find(CANVAS_END_MARKER) updated_canvas_content = ai_response_text[start_index:end_index].strip() response_message = "Canvas mis à jour." logger.info("Canvas modifié avec succès.") return jsonify({"updated_canvas_content": updated_canvas_content, "response_message": response_message}) else: logger.error("Réponse IA canvas_modify invalide: marqueurs CANVAS_START/END manquants.") return jsonify({"error": "Réponse IA invalide pour la modification du canvas", "full_response": ai_response_text}), 500 except Exception as e: error_message = f"Erreur serveur (canvas_modify): {str(e)}" logger.exception(error_message) return jsonify({"error": error_message}), 500

@app.route('/run', methods=['POST'])
def run_code_endpoint() -> Response:
"""
Point d'accès pour exécuter du code via l'API Piston (pour canvas code).
"""
data = request.get_json()
if not data:
logger.warning("Requête run sans données.")
return jsonify({"error": "Requête run invalide : corps de requête manquant."}), 400
language = data.get("language")
code = data.get("code")
stdin = data.get("stdin", "")
if not language or not code:
logger.warning("Requête run incomplète: language=%s, code=%s", language, code)
return jsonify({"error": "Requête run invalide : langage et code requis."}), 400
payload = {
"language": language,
"version": "*",
"files": [{"content": code}],
"stdin": stdin
}
try:
piston_response = requests.post(PISTON_API_URL, json=payload, timeout=3000000)
piston_response.raise_for_status()
logger.info("Code exécuté avec Piston avec succès.")
return jsonify(piston_response.json())
except requests.RequestException as e:
error_message = f"Erreur lors de l'appel à l'API Piston: {str(e)}"
logger.exception(error_message)
piston_text = piston_response.text if 'piston_response' in locals() and piston_response is not None else ""
return jsonify({"error": "Erreur d'exécution du code via l'API Piston.", "piston_response": piston_text, "details": error_message}), 500
except Exception as e:
error_message = f"Erreur inattendue lors de l'exécution du code: {str(e)}"
logger.exception(error_message)
return jsonify({"error": "Erreur inattendue lors de l'exécution du code.", "details": error_message}), 500

@app.route('/run', methods=['OPTIONS'])
def run_code_options() -> Response:
"""Gère les requêtes OPTIONS pour l'endpoint /run (CORS preflight)."""
return jsonify({'message': 'OPTIONS requests allowed'}), 200

@app.route('/upload', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return jsonify({"error": "No file part"}), 400
files = request.files.getlist('file') # Récupérer tous les fichiers envoyés
if not files or all(file.filename == '' for file in files):
return jsonify({"error": "No selected file"}), 400

text
uploaded_files = [] upload_folder = app.config['UPLOAD_FOLDER'] if not os.path.exists(upload_folder): os.makedirs(upload_folder) # Extensions autorisées étendues allowed_ext = ['.txt', '.py', '.md', '.js', '.json', '.pdf', '.png', '.jpg', '.jpeg'] for file in files: if file.filename == '': continue filename = secure_filename(file.filename) file_path = os.path.join(upload_folder, filename) try: file.save(file_path) file_ext = os.path.splitext(filename)[1].lower() file_content = "" if file_ext in ['.txt', '.py', '.md', '.js', '.json']: with open(file_path, 'r', encoding='utf-8') as f: file_content = f.read() elif file_ext == '.pdf': try: reader = PyPDF2.PdfReader(file_path) for page in reader.pages: file_content += page.extract_text() + "\n" except Exception as e: logger.exception("PDF extraction failed") file_content = f"Erreur extraction PDF: {str(e)}" elif file_ext in ['.png', '.jpg', '.jpeg']: try: img = Image.open(file_path) file_content = pytesseract.image_to_string(img) except Exception as e: logger.exception("OCR extraction failed") file_content = f"Erreur OCR extraction: {str(e)}" uploaded_files.append({"filename": filename, "file_path": file_path, "content": file_content}) except Exception as e: logger.exception("File upload failed") return jsonify({"error": str(e)}), 500 return jsonify({"message": "Files uploaded successfully", "files": uploaded_files}), 200

def format_sse_event(data: Dict[str, Any]) -> str:
"""Formate les données en événement SSE (Server-Sent Events)."""
return f"data: {json.dumps(data)}\n\n"

if name == 'main':
if os.environ.get('FLASK_ENV') == 'development':
app.run(debug=True, port=5000)
"""

Réflexion

Analyzing code for patching

I see that the user wants me to patch the code and make sure it’s working well. They mentioned a "good code interpreter"—I think that’s the goal here. It’s possible the code is missing something or has issues, and I need to find those problems and fix them. The request is a little ambiguous, though, so I’ll need to dig through the code to determine what needs patching. Let's go through it carefully and try to make improvements.

Reviewing and refining the code

The code seems like it’s a Flask backend with multiple endpoints like /models and /chat. There are several pieces to look at.

Some ideas for improvement include:

  • Avoid using stream_with_context twice in some places.
  • Consider adding error handling.
  • The timeout=3000000 in the /run endpoint seems too high; we might need to patch that.
  • There may be some warnings from the use of tenacity's @retry, but it’s already in place.
  • The gevent.monkey.patch_all() could potentially be reconsidered if it’s blocking things.

I’ll review the code in detail to see which adjustments are needed.

Identifying code improvements

I see that the user wants to patch the code to improve the code interpreter’s functionality. A few areas stand out:

  • It looks like the code logic in /chat for breaking the loop works okay, but we might want to check if it's handling errors properly, especially for execution failures.
  • There's a potential patch around using sandbox.execute_sync — maybe the return values need attention.
  • The timeout in the /run endpoint could be too large, so reducing it might help.
  • Error handling improvements could be useful, like ensuring the sandbox properly initializes or handling None values in execution logs.

I'll review these and implement thoughtful patches.

Patching code for improvements

The code seems fine overall, but there are some areas to optimize:

  • In run_code_endpoint, we can lower the timeout to more reasonable values, something like 30 seconds.
  • We'll clean up unused variables like groq_client.
  • Ensure better error handling around piston_response, and tidy the exception handling.
  • In stream_response, break the loop after sending code execution results.
  • Check that we are correctly accessing dictionary keys like "text" and "delta" in SSE events.
  • A quick cleanup of unnecessary imports like datetime could also help.

I'll head into producing a well-patched version now!

None

Reasoned for 50 seconds

Réponse

Below is a revised (“patched”) version of your code. In this version, I made several improvements and adjustments:

  • Sandbox execution: The function run_code_in_sandbox now checks for a valid logs object and uses the new execute_sync method.
  • Piston endpoint timeout: The timeout for calling the Piston API has been reduced to a more reasonable 30 seconds.
  • Streaming loop improvements: In the /chat endpoint’s streaming generator, we now check that a response “delta” exists before processing and immediately break out of the loop once code execution is complete.
  • Minor cleanup: Unused imports (such as the Groq client) are commented out and extra comments have been added to explain the changes.

You can review the code below:


python
from gevent import monkey monkey.patch_all() import os import re import json import logging from typing import Any, Dict, List, Optional, Tuple import time from tqdm import tqdm from flask import Flask, request, Response, jsonify, stream_with_context from flask_cors import CORS from e2b_code_interpreter import Sandbox from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type import requests from duckduckgo_search import DDGS from bs4 import BeautifulSoup from urllib.parse import urlparse from werkzeug.utils import secure_filename # For secure file naming # from groq import Groq # Unused in this version; commented out import PyPDF2 from PIL import Image import pytesseract # --- Configuration --- logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = Flask(__name__) CORS(app) app.config['UPLOAD_FOLDER'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'uploads') GROQ_API_KEY = os.environ.get("GROQ_API_KEY") E2B_API_KEY = os.environ.get("E2B_API_KEY") OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY") # Added for OpenRouter PISTON_API_URL = "https://emkc.org/api/v2/piston/execute" # groq_client = Groq(api_key=GROQ_API_KEY) # Not used in this patched version # --- Web search configuration --- WEB_SEARCH_CONFIG = { 'openrouter_api_key': OPENROUTER_API_KEY, 'max_results': 9, 'cache_file': 'search_cache.json', 'model': 'mistralai/mistral-medium', # OpenRouter model for filtering/synthesis 'timeout': 20, 'min_quality_score': 0.65 } VALID_MODELS = [ "google/gemini-2.0-pro-exp-02-05:free", "deepseek/deepseek-r1:free", "qwen/qwen2.5-vl-72b-instruct:free", "meta-llama/llama-3.3-70b-instruct:free" ] CODE_PATTERN = re.compile(r"```python\s*([\s\S]*?)```") CODE_REQUIRED_MARKER = "[CODE_REQUIRED]" CANVAS_START_MARKER = "[CANVAS_START]" CANVAS_END_MARKER = "[CANVAS_END]" # --- E2B Sandbox Initialization with Retry --- @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=20), retry=retry_if_exception_type(Exception)) def initialize_sandbox(): logger.info("Initializing e2b Sandbox with 300-second timeout...") return Sandbox(api_key=E2B_API_KEY, timeout=300) try: sandbox = initialize_sandbox() logger.info("e2b Sandbox initialized successfully.") except Exception as e: logger.error(f"Failed to initialize e2b Sandbox after multiple retries: {e}") raise e # --- Web Search and Content Processing Classes --- class DataCollector: @staticmethod def web_search(query: str) -> List[Dict]: with DDGS() as ddgs: results = ddgs.text(query, region='fr-fr', max_results=WEB_SEARCH_CONFIG['max_results']) return [{ 'url': r['href'], 'title': r['title'], 'domain': urlparse(r['href']).netloc.replace('www.', '') } for r in results if urlparse(r['href']).scheme in ['http', 'https']] class ContentProcessor: def __init__(self): self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'} def fetch_content(self, url: str) -> str: try: response = requests.get(url, headers=self.headers, timeout=WEB_SEARCH_CONFIG['timeout']) soup = BeautifulSoup(response.text, 'html.parser') for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'meta', 'link']): element.decompose() main_content = soup.find(['main', 'article']) or soup.body return ' '.join(main_content.stripped_strings)[:5000] except Exception as e: logger.error(f"Erreur sur {url}: {str(e)}") return '' class AICore: @staticmethod def query_ai(system_prompt: str, user_content: str, temp: float = 0.1, model: str = WEB_SEARCH_CONFIG['model']) -> str: headers = { "Authorization": f"Bearer {WEB_SEARCH_CONFIG['openrouter_api_key']}", "Content-Type": "application/json", "HTTP-Referer": "https://your-app-url", # Replace with your app URL "X-Title": "Flask Search AI" } data = { "model": model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content} ], "temperature": temp, "max_tokens": 2000 } try: response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data, timeout=WEB_SEARCH_CONFIG['timeout']) response.raise_for_status() return response.json()['choices'][0]['message']['content'] except Exception as e: return f"Erreur API OpenRouter: {str(e)}" # --- OpenRouter API wrappers --- def open_router_chat_completion_stream(messages: List[Dict[str, Any]], model: str, temperature: float = 0.0): headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", } payload = { "model": model, "messages": messages, "temperature": temperature, "max_tokens": 2000, "stream": True } response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, stream=True, timeout=WEB_SEARCH_CONFIG['timeout']) response.raise_for_status() for line in response.iter_lines(): if line: decoded_line = line.decode('utf-8') if decoded_line.startswith("data: "): data_str = decoded_line[len("data: "):] try: data = json.loads(data_str) yield data except Exception: continue def open_router_chat_completion(messages: List[Dict[str, Any]], model: str, temperature: float = 0.0) -> Dict: headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", } payload = { "model": model, "messages": messages, "temperature": temperature, "max_tokens": 2000, "stream": False } response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, timeout=WEB_SEARCH_CONFIG['timeout']) response.raise_for_status() return response.json() # --- Utility functions --- def extract_code_blocks(text: str) -> Tuple[Optional[str], Optional[str]]: if CODE_REQUIRED_MARKER in text: text_no_marker = text.replace(CODE_REQUIRED_MARKER, "", 1).strip() code_match = CODE_PATTERN.search(text_no_marker) if code_match: return code_match.group(1).strip(), None else: code_match = CODE_PATTERN.search(text) if code_match: return None, code_match.group(1).strip() return None, None # PATCH: Updated to use execute_sync and properly handle logs def run_code_in_sandbox(code: str) -> str: start_time = time.time() try: logger.info("Running code in e2b sandbox using execute_sync...") execution = sandbox.execute_sync(code) logs = execution.logs if hasattr(execution, 'logs') else None stdout_logs = logs.stdout if logs and hasattr(logs, 'stdout') else [] stderr_logs = logs.stderr if logs and hasattr(logs, 'stderr') else [] stdout = "\n".join(stdout_logs) stderr = "\n".join(stderr_logs) if stderr: return f"Stdout:\n{stdout}\n\nStderr:\n{stderr}" return stdout except Exception as e: duration = time.time() - start_time error_msg = f"Erreur lors de l'exécution du code dans e2b après {duration:.2f} secondes. Erreur: {e}" logger.exception(error_msg) return error_msg def extract_numeric_result(exec_result: str) -> Optional[str]: if "Stdout:" in exec_result: stdout_section = exec_result.split("Stdout:\n", 1)[1].split("\n\nStderr:")[0].strip() else: stdout_section = exec_result.strip() lines = [l.strip() for l in stdout_section.splitlines() if l.strip()] if not lines: return None last_line = lines[-1] if re.search(r'\d', last_line): return last_line return None def format_sse_event(data: Dict[str, Any]) -> str: """Formats data as an SSE event.""" return f"data: {json.dumps(data)}\n\n" # --- Flask Routes --- @app.route('/models', methods=['GET']) def get_models() -> Response: """Returns the list of valid models.""" return jsonify({"models": VALID_MODELS}) @app.route('/chat', methods=['POST']) def chat_endpoint() -> Response: """ Main chat endpoint. Handles SSE streaming, code execution, canvas modifications, and web search. """ @stream_with_context @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type(Exception)) def stream_response(): data = request.get_json() if not data: logger.warning("Requête chat invalide: corps de requête manquant.") yield format_sse_event({"type": "error", "content": "Requête chat invalide : corps de requête manquant."}) return messages = data.get('messages', []) model: str = data.get('model', 'google/gemini-2.0-pro-exp-02-05:free').strip() execute_code: bool = data.get('execute_code', True) web_search_enabled: bool = data.get('web_search', False) if not messages or not isinstance(messages, list) or not model or model not in VALID_MODELS: logger.warning("Requête chat invalide: messages=%s, model=%s", messages, model) yield format_sse_event({"type": "error", "content": "Requête chat invalide : messages ou modèle incorrects."}) return # --- Web Search Integration --- web_search_results = [] if web_search_enabled: user_query = messages[-1]['content'] yield format_sse_event({"type": "web_search_status", "content": "start"}) yield format_sse_event({"type": "web_search", "query": user_query}) search_processor = ContentProcessor() search_results = DataCollector.web_search(user_query) filtered_results = [] for result in search_results: content = search_processor.fetch_content(result['url']) if content: filter_prompt = ( "Tu es un expert en analyse de la pertinence des documents. " f"Détermine si le contenu web suivant est pertinent pour la requête utilisateur : '{user_query}'. " "Réponds uniquement par 'oui' ou 'non'." ) is_relevant = AICore.query_ai(filter_prompt, content[:2000], temp=0.0, model="mistralai/mistral-small") if is_relevant.lower().startswith('oui'): filtered_results.append({'url': result['url'], 'domain': result['domain'], 'content': content}) if len(filtered_results) >= WEB_SEARCH_CONFIG['max_results']: break web_search_results = filtered_results source_urls = [result['url'] for result in web_search_results] yield format_sse_event({"type": "web_sources", "content": source_urls}) yield format_sse_event({"type": "web_search_status", "content": "end"}) base_prompt = ( "🎯 **Rôle : Expert en calcul numérique précis, assistant polyvalent avec canvas interactifs et capacités de recherche Web.**\n" "**Mission :** Répondre aux questions de l'utilisateur, effectuer des calculs précis, fournir des canvas interactifs (texte et code) et, si nécessaire, effectuer des recherches sur le Web pour enrichir la réponse.\n\n" "**Si la question demande un calcul numérique :**\n" "1. Annonce que tu vas exécuter un code Python pour réaliser le calcul.\n" "2. Ajoute le marqueur `[CODE_REQUIRED]` suivi du bloc de code minimal en Python.\n" "3. N'envoie rien d'autre et attends l'exécution du code.\n\n" "**Après l'exécution du code**, explique le résultat en détails en te basant sur le résultat numérique obtenu.\n\n" "**Pour la création ou modification de canvas** (texte ou code) :\n" "Utilise les marqueurs `[CANVAS_START]` et `[CANVAS_END]` pour encadrer le contenu du canvas.\n\n" "**Si la recherche Web est activée :**\n" "1. Tu recevras des extraits de pages Web pertinentes.\n" "2. Utilise ces informations pour compléter ta réponse, en citant les sources de manière concise (ex: [nomdedomaine.com]).\n" "3. Si les sources Web ne sont pas pertinentes ou suffisantes, base-toi sur tes connaissances internes.\n\n" "Si aucune de ces conditions n'est remplie, réponds directement et clairement à la question." ) system_prompt = {"role": "system", "content": base_prompt} if web_search_results: sources_summary = "\n\n".join([ f"[{result['domain']}]\nExtrait: {result['content'][:500]}" for result in web_search_results ]) web_search_context = {"role": "user", "content": f"Voici des extraits de pages web pertinentes pour la question:\n\n{sources_summary}"} messages_with_system_prompt = [system_prompt, web_search_context] + messages else: messages_with_system_prompt = [system_prompt] + messages full_reply_text = "" code_to_execute = None code_to_display = None initial_response_completed = False try: router_response_stream = open_router_chat_completion_stream(messages_with_system_prompt, model, 0.0) for chunk in router_response_stream: if initial_response_completed: break delta_content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "") if not delta_content: continue delta_content_no_marker = delta_content.replace(CODE_REQUIRED_MARKER, "") full_reply_text += delta_content_no_marker if CANVAS_START_MARKER in full_reply_text and CANVAS_END_MARKER in full_reply_text: start_index = full_reply_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = full_reply_text.find(CANVAS_END_MARKER) yield format_sse_event({"type": "canvas_start", "content": full_reply_text[start_index:end_index].strip()}) continue yield format_sse_event({"type": "text", "content": delta_content_no_marker}) if execute_code: code_to_execute, code_to_display = extract_code_blocks(full_reply_text) if code_to_execute: yield format_sse_event({"type": "code", "content": code_to_display}) yield format_sse_event({"type": "analysis_status", "content": "start"}) exec_result = run_code_in_sandbox(code_to_execute) numeric_result = extract_numeric_result(exec_result) yield format_sse_event({"type": "code_result", "content": exec_result}) yield format_sse_event({"type": "analysis_status", "content": "end"}) full_reply_text += f"\nRésultat du code:\n```\n{exec_result}\n```\n" initial_response_completed = True break yield format_sse_event({"type": "final", "prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}) except Exception as e: error_message = f"Erreur serveur IA: {str(e)}" logger.exception(error_message) yield format_sse_event({"type": "error", "content": error_message}) finally: yield format_sse_event({"type": "analysis_status", "content": "end"}) yield format_sse_event({"type": "done", "is_code_execution": code_to_execute is not None}) return Response(stream_response(), mimetype='text/event-stream') @app.route('/canvas_modify', methods=['POST']) def canvas_modify_endpoint() -> Response: data = request.get_json() if not data: logger.warning("Requête canvas_modify sans données.") return jsonify({"error": "Requête canvas_modify invalide : corps de requête manquant."}), 400 user_message = data.get('message') canvas_content = data.get('canvas_content') model = data.get('model', 'google/gemini-2.0-pro-exp-02-05:free').strip() if not user_message or canvas_content is None: logger.warning("Requête canvas_modify incomplète: message=%s, canvas_content=%s", user_message, canvas_content) return jsonify({"error": "Requête canvas_modify invalide : message ou contenu canvas manquant."}), 400 canvas_chat_messages = [ {"role": "system", "content": ( "Tu es dans un chat de canvas. L'utilisateur va te demander de modifier le contenu du canvas suivant. " "Fournis UNIQUEMENT le NOUVEAU contenu du canvas ENTIÈREMENT MIS À JOUR, encadré par `[CANVAS_START]` et `[CANVAS_END]`. " "Ne réponds pas avec du texte en dehors de ces marqueurs, sauf une courte phrase de confirmation après le canvas. " "Maintiens le contexte des modifications précédentes du canvas, en te basant sur l'historique de ce chat canvas." )}, {"role": "assistant", "content": f"[CANVAS_START]\n{canvas_content}\n[CANVAS_END] Voici le canvas actuel. Dites-moi comment le modifier."}, {"role": "user", "content": user_message} ] try: const_response = open_router_chat_completion(canvas_chat_messages, model, 0.0) ai_response_text = const_response.get("choices", [{}])[0].get("message", {}).get("content", "") or "" if CANVAS_START_MARKER in ai_response_text and CANVAS_END_MARKER in ai_response_text: start_index = ai_response_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = ai_response_text.find(CANVAS_END_MARKER) updated_canvas_content = ai_response_text[start_index:end_index].strip() response_message = "Canvas mis à jour." logger.info("Canvas modifié avec succès.") return jsonify({"updated_canvas_content": updated_canvas_content, "response_message": response_message}) else: logger.error("Réponse IA canvas_modify invalide: marqueurs CANVAS_START/END manquants.") return jsonify({"error": "Réponse IA invalide pour la modification du canvas", "full_response": ai_response_text}), 500 except Exception as e: error_message = f"Erreur serveur (canvas_modify): {str(e)}" logger.exception(error_message) return jsonify({"error": error_message}), 500 @app.route('/run', methods=['POST']) def run_code_endpoint() -> Response: """ Endpoint for code execution via the Piston API (used for canvas code). """ data = request.get_json() if not data: logger.warning("Requête run sans données.") return jsonify({"error": "Requête run invalide : corps de requête manquant."}), 400 language = data.get("language") code = data.get("code") stdin = data.get("stdin", "") if not language or not code: logger.warning("Requête run incomplète: language=%s, code=%s", language, code) return jsonify({"error": "Requête run invalide : langage et code requis."}), 400 payload = { "language": language, "version": "*", "files": [{"content": code}], "stdin": stdin } try: # Using a more reasonable timeout (e.g., 30 seconds) piston_response = requests.post(PISTON_API_URL, json=payload, timeout=30) piston_response.raise_for_status() logger.info("Code exécuté avec Piston avec succès.") return jsonify(piston_response.json()) except requests.RequestException as e: error_message = f"Erreur lors de l'appel à l'API Piston: {str(e)}" logger.exception(error_message) piston_text = piston_response.text if 'piston_response' in locals() and piston_response is not None else "" return jsonify({"error": "Erreur d'exécution du code via l'API Piston.", "piston_response": piston_text, "details": error_message}), 500 except Exception as e: error_message = f"Erreur inattendue lors de l'exécution du code: {str(e)}" logger.exception(error_message) return jsonify({"error": "Erreur inattendue lors de l'exécution du code.", "details": error_message}), 500 @app.route('/run', methods=['OPTIONS']) def run_code_options() -> Response: """Handles OPTIONS requests for the /run endpoint (CORS preflight).""" return jsonify({'message': 'OPTIONS requests allowed'}), 200 @app.route('/upload', methods=['POST']) def upload_file(): if 'file' not in request.files: return jsonify({"error": "No file part"}), 400 files = request.files.getlist('file') if not files or all(file.filename == '' for file in files): return jsonify({"error": "No selected file"}), 400 uploaded_files = [] upload_folder = app.config['UPLOAD_FOLDER'] if not os.path.exists(upload_folder): os.makedirs(upload_folder) # Extended allowed file extensions allowed_ext = ['.txt', '.py', '.md', '.js', '.json', '.pdf', '.png', '.jpg', '.jpeg'] for file in files: if file.filename == '': continue filename = secure_filename(file.filename) file_path = os.path.join(upload_folder, filename) try: file.save(file_path) file_ext = os.path.splitext(filename)[1].lower() file_content = "" if file_ext in ['.txt', '.py', '.md', '.js', '.json']: with open(file_path, 'r', encoding='utf-8') as f: file_content = f.read() elif file_ext == '.pdf': try: reader = PyPDF2.PdfReader(file_path) for page in reader.pages: file_content += page.extract_text() + "\n" except Exception as e: logger.exception("PDF extraction failed") file_content = f"Erreur extraction PDF: {str(e)}" elif file_ext in ['.png', '.jpg', '.jpeg']: try: img = Image.open(file_path) file_content = pytesseract.image_to_string(img) except Exception as e: logger.exception("OCR extraction failed") file_content = f"Erreur OCR extraction: {str(e)}" uploaded_files.append({"filename": filename, "file_path": file_path, "content": file_content}) except Exception as e: logger.exception("File upload failed") return jsonify({"error": str(e)}), 500 return jsonify({"message": "Files uploaded successfully", "files": uploaded_files}), 200 if __name__ == '__main__': if os.environ.get('FLASK_ENV') == 'development': app.run(debug=True, port=5000) else: app.run(port=5000)

Summary of the patches:

  1. Sandbox Code Execution:
    – The run_code_in_sandbox function now safely checks if logs are available and uses sandbox.execute_sync.
  2. Piston Timeout:
    – The /run endpoint now uses a 30‑second timeout instead of an extremely high value.
  3. Streaming Adjustments:
    – In the /chat endpoint, the response stream checks for valid delta content and immediately breaks out once code execution is complete.
  4. Minor Cleanup:
    – Unused imports have been commented out, and comments have been added for clarity.

Feel free to adjust any parameters (like timeout values or log messages) as needed for your environment.

Partager cette Q&R