can you patch this code for good code interpreter ...
Créé le : 19 février 2025
Répondu en utilisant O3 Mini High par Chat01
Créé le : 19 février 2025
Répondu en utilisant O3 Mini High par Chat01
can you patch this code for good code interpreter working greats pls :
"""
from gevent import monkey
monkey.patch_all()
import os
import re
import json
import logging
from typing import Any, Dict, List, Optional, Tuple
import time
import datetime
from tqdm import tqdm
from flask import Flask, request, Response, jsonify, stream_with_context
from flask_cors import CORS
from e2b_code_interpreter import Sandbox
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import requests
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from werkzeug.utils import secure_filename # Ajout pour la gestion sécurisée des noms de fichier
from groq import Groq
import PyPDF2
from PIL import Image
import pytesseract
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)
app = Flask(name)
CORS(app)
app.config['UPLOAD_FOLDER'] = os.path.join(os.path.dirname(os.path.abspath(file)), 'uploads') # Dossier de destination
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
E2B_API_KEY = os.environ.get("E2B_API_KEY")
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY") # Ajout de la clé OpenRouter
PISTON_API_URL = "https://emkc.org/api/v2/piston/execute"
groq_client = Groq(api_key=GROQ_API_KEY)
@retry(stop=stop_after_attempt(5), # Tentatives augmentées à 5
wait=wait_exponential(multiplier=1, min=4, max=20), # Backoff jusqu'à 20 secondes
retry=retry_if_exception_type(Exception))
def initialize_sandbox():
"""Initializes the e2b Sandbox with a higher timeout."""
logger.info("Initializing e2b Sandbox with 300-second timeout...")
return Sandbox(api_key=E2B_API_KEY, timeout=300) # Timeout augmenté à 300 secondes
try:
sandbox = initialize_sandbox()
logger.info("e2b Sandbox initialized successfully.")
except Exception as e:
logger.error(f"Failed to initialize e2b Sandbox after multiple retries: {e}")
# It's crucial to handle the failure of sandbox initialization.
# You might want to exit the application or have a fallback mechanism.
# For now, let's raise the exception to prevent the app from starting improperly.
raise e
VALID_MODELS = [
"google/gemini-2.0-pro-exp-02-05:free",
"deepseek/deepseek-r1:free",
"qwen/qwen2.5-vl-72b-instruct:free",
"meta-llama/llama-3.3-70b-instruct:free"
]
CODE_PATTERN = re.compile(r"python\s*([\s\S]*?)
")
CODE_REQUIRED_MARKER = "[CODE_REQUIRED]"
CANVAS_START_MARKER = "[CANVAS_START]"
CANVAS_END_MARKER = "[CANVAS_END]"
WEB_SEARCH_CONFIG = {
'openrouter_api_key': OPENROUTER_API_KEY,
'max_results': 9, # Augmenté à 9 par défaut
'cache_file': 'search_cache.json', # Le cache est géré, mais pas crucial pour le backend Flask
'model': 'mistralai/mistral-medium', # Modèle OpenRouter pour le filtrage et la synthèse. Peut être changé.
'timeout': 20,
'min_quality_score': 0.65 # Ce paramètre n'est plus utilisé directement, remplacé par un filtrage AI plus précis.
}
class DataCollector:
@staticmethod
def web_search(query: str) -> List[Dict]:
with DDGS() as ddgs:
results = ddgs.text(query, region='fr-fr', max_results=WEB_SEARCH_CONFIG['max_results'])
return [{
'url': r['href'],
'title': r['title'],
'domain': urlparse(r['href']).netloc.replace('www.', '')
} for r in results if urlparse(r['href']).scheme in ['http', 'https']]
class ContentProcessor:
def init(self):
self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
textdef fetch_content(self, url: str) -> str: try: response = requests.get(url, headers=self.headers, timeout=WEB_SEARCH_CONFIG['timeout']) soup = BeautifulSoup(response.text, 'html.parser') for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'meta', 'link']): element.decompose() main_content = soup.find(['main', 'article']) or soup.body return ' '.join(main_content.stripped_strings)[:5000] except Exception as e: logger.error(f"Erreur sur {url}: {str(e)}") return ''
class AICore:
@staticmethod
def query_ai(system_prompt: str, user_content: str, temp: float = 0.1, model:str = WEB_SEARCH_CONFIG['model']) -> str: # Ajout du paramètre model
headers = {
"Authorization": f"Bearer {WEB_SEARCH_CONFIG['openrouter_api_key']}",
"Content-Type": "application/json",
"HTTP-Referer": "https://your-app-url", # Remplacez par l'URL de votre application
"X-Title": "Flask Search AI"
}
textdata = { "model": model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content} ], "temperature": temp, "max_tokens": 2000 } try: response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data, timeout=WEB_SEARCH_CONFIG['timeout']) response.raise_for_status() # On lève une exception si le statut n'est pas 200 return response.json()['choices'][0]['message']['content'] except Exception as e: return f"Erreur API OpenRouter: {str(e)}"
def open_router_chat_completion_stream(messages: List[Dict[str, Any]], model: str, temperature: float = 0.0):
headers = {
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": 2000,
"stream": True
}
response = requests.post("https://openrouter.ai/api/v1/chat/completions",
headers=headers, json=payload, stream=True, timeout=WEB_SEARCH_CONFIG['timeout'])
response.raise_for_status()
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8')
if decoded_line.startswith("data: "):
data_str = decoded_line[len("data: "):]
try:
data = json.loads(data_str)
yield data
except Exception:
continue
def open_router_chat_completion(messages: List[Dict[str, Any]], model: str, temperature: float = 0.0) -> Dict:
headers = {
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": 2000,
"stream": False
}
response = requests.post("https://openrouter.ai/api/v1/chat/completions",
headers=headers, json=payload, timeout=WEB_SEARCH_CONFIG['timeout'])
response.raise_for_status()
return response.json()
def extract_code_blocks(text: str) -> Tuple[Optional[str], Optional[str]]:
"""Extrait les blocs de code Python et gère le marqueur [CODE_REQUIRED]."""
if CODE_REQUIRED_MARKER in text:
text_no_marker = text.replace(CODE_REQUIRED_MARKER, "", 1).strip()
code_match = CODE_PATTERN.search(text_no_marker)
if code_match:
return code_match.group(1).strip(), None
else:
code_match = CODE_PATTERN.search(text)
if code_match:
return None, code_match.group(1).strip() # code à afficher sans exécution
return None, None
def run_code_in_sandbox(code: str) -> str:
start_time = time.time()
try:
logger.info(f"Running code in e2b sandbox using execute_sync...") # Updated log message
execution = sandbox.execute_sync(code) # Using execute_sync instead of run_code_sync
logs = execution.logs
stdout_logs = logs.stdout if hasattr(logs, 'stdout') else []
stderr_logs = logs.stderr if hasattr(logs, 'stderr') else []
stdout = "\n".join(stdout_logs)
stderr = "\n".join(stderr_logs)
if stderr:
return f"Stdout:\n{stdout}\n\nStderr:\n{stderr}"
return stdout
except Exception as e:
duration = time.time() - start_time
error_msg = f"Erreur lors de l'exécution du code dans e2b après {duration:.2f} secondes. Erreur: {e}"
logger.exception(error_msg)
return error_msg
def extract_numeric_result(exec_result: str) -> Optional[str]:
"""Extrait la dernière ligne numérique du résultat d'exécution de code."""
if "Stdout:" in exec_result:
stdout_section = exec_result.split("Stdout:\n", 1)[1].split("\n\nStderr:")[0].strip()
else:
stdout_section = exec_result.strip()
lines = [l.strip() for l in stdout_section.splitlines() if l.strip()]
if not lines:
return None
last_line = lines[-1]
if re.search(r'\d', last_line):
return last_line
return None
@app.route('/models', methods=['GET'])
def get_models() -> Response:
"""Retourne la liste des modèles valides."""
return jsonify({"models": VALID_MODELS})
@app.route('/chat', methods=['POST'])
def chat_endpoint() -> Response:
"""
Point d'entrée principal pour le chat.
Gère le streaming SSE, l'exécution de code, les canvas et la recherche Web.
"""
@stream_with_context
@retry(stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type(Exception))
def stream_response():
data = request.get_json()
if not data:
logger.warning("Requête chat invalide: corps de requête manquant.")
yield format_sse_event({"type": "error", "content": "Requête chat invalide : corps de requête manquant."})
return
textmessages = data.get('messages', []) model: str = data.get('model', 'google/gemini-2.0-pro-exp-02-05:free').strip() execute_code: bool = data.get('execute_code', True) web_search_enabled: bool = data.get('web_search', False) # Récupère l'état de la recherche Web if not messages or not isinstance(messages, list) or not model or model not in VALID_MODELS: logger.warning("Requête chat invalide: messages=%s, model=%s", messages, model) yield format_sse_event({"type": "error", "content": "Requête chat invalide : messages ou modèle incorrects."}) return # --- Intégration de la recherche Web --- web_search_results = [] if web_search_enabled: user_query = messages[-1]['content'] # On utilise le dernier message de l'utilisateur comme requête yield format_sse_event({"type": "web_search_status", "content": "start"}) yield format_sse_event({"type": "web_search", "query": user_query}) # Envoi de la requête pour l'afficher search_processor = ContentProcessor() search_results = DataCollector.web_search(user_query) # Filtrage et extraction du contenu (comme dans le CLI, mais intégré ici) filtered_results = [] for result in search_results: content = search_processor.fetch_content(result['url']) if content: # Filtrage de pertinence avec OpenRouter (comme dans le CLI) filter_prompt = ( "Tu es un expert en analyse de la pertinence des documents. " f"Détermine si le contenu web suivant est pertinent pour la requête utilisateur : '{user_query}'. " "Réponds uniquement par 'oui' ou 'non'." ) is_relevant = AICore.query_ai(filter_prompt, content[:2000], temp=0.0, model="mistralai/mistral-small") # Filtrage plus rapide if is_relevant.lower().startswith('oui'): filtered_results.append({'url': result['url'], 'domain': result['domain'], 'content': content}) if len(filtered_results) >= WEB_SEARCH_CONFIG['max_results']: break web_search_results = filtered_results # Envoi des URLs des sources *après* le filtrage source_urls = [result['url'] for result in web_search_results] yield format_sse_event({"type": "web_sources", "content": source_urls}) yield format_sse_event({"type": "web_search_status", "content": "end"}) # --- Construction du prompt système (adapté pour la recherche Web) --- base_prompt = ( "🎯 **Rôle : Expert en calcul numérique précis, assistant polyvalent avec canvas interactifs et capacités de recherche Web.**\n" "**Mission :** Répondre aux questions de l'utilisateur, effectuer des calculs précis, fournir des canvas interactifs (texte et code) et, si nécessaire, effectuer des recherches sur le Web pour enrichir la réponse.\n\n" "**Si la question demande un calcul numérique :**\n" "1. Annonce que tu vas exécuter un code Python pour réaliser le calcul.\n" "2. Ajoute le marqueur `[CODE_REQUIRED]` suivi du bloc de code minimal en Python.\n" "3. N'envoie rien d'autre et attends l'exécution du code.\n\n" "**Après l'exécution du code**, explique le résultat en détails en te basant sur le résultat numérique obtenu.\n\n" "**Pour la création ou modification de canvas** (texte ou code) :\n" "Utilise les marqueurs `[CANVAS_START]` et `[CANVAS_END]` pour encadrer le contenu du canvas.\n\n" "**Si la recherche Web est activée :**\n" "1. Tu recevras des extraits de pages Web pertinentes.\n" "2. Utilise ces informations pour compléter ta réponse, en citant les sources de manière concise (ex: [nomdedomaine.com]).\n" "3. Si les sources Web ne sont pas pertinentes ou suffisantes, base-toi sur tes connaissances internes.\n\n" "Si aucune de ces conditions n'est remplie, réponds directement et clairement à la question." ) # --- Préparation des messages pour Groq --- system_prompt = {"role": "system", "content": base_prompt} # Ajout des résultats de la recherche Web au contexte, si applicable if web_search_results: # Création d'un résumé des sources pour Groq (plus concis que d'envoyer tout le contenu) sources_summary = "\n\n".join([ f"[{result['domain']}]\nExtrait: {result['content'][:500]}" # Limite la longueur de l'extrait for result in web_search_results ]) web_search_context = {"role": "user", "content": f"Voici des extraits de pages web pertinentes pour la question:\n\n{sources_summary}"} messages_with_system_prompt = [system_prompt, web_search_context] + messages else: messages_with_system_prompt = [system_prompt] + messages full_reply_text = "" code_to_execute = None code_to_display = None is_code_used = False canvas_content = None code_executed = False prompt_tokens = 0 completion_tokens = 0 total_tokens = 0 initial_response_completed = False try: # Remplacez l'appel à groq_client par OpenRouter en mode streaming const_response_stream = open_router_chat_completion_stream(messages_with_system_prompt, model, 0.0) for chunk in const_response_stream: if initial_response_completed: break delta_content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "") or "" delta_content_no_marker = delta_content.replace(CODE_REQUIRED_MARKER, "") full_reply_text += delta_content_no_marker if CANVAS_START_MARKER in full_reply_text and CANVAS_END_MARKER in full_reply_text: start_index = full_reply_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = full_reply_text.find(CANVAS_END_MARKER) yield format_sse_event({"type": "canvas_start", "content": full_reply_text[start_index:end_index].strip()}) continue yield format_sse_event({"type": "text", "content": delta_content_no_marker}) if execute_code: # Condition pour exécuter le code seulement si execute_code est True code_to_execute, code_to_display = extract_code_blocks(full_reply_text) if code_to_execute: yield format_sse_event({"type": "code", "content": code_to_display}) yield format_sse_event({"type": "analysis_status", "content": "start"}) exec_result = run_code_in_sandbox(code_to_execute) numeric_result = extract_numeric_result(exec_result) yield format_sse_event({"type": "code_result", "content": exec_result}) yield format_sse_event({"type": "analysis_status", "content": "end"}) full_reply_text += f"\nRésultat du code:\n```\n{exec_result}\n```\n" # Ajout du résultat à full_reply_text pour le contexte futur initial_response_completed = True # Stop streaming after code execution yield format_sse_event({"type": "final", "prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}) except Exception as e: error_message = f"Erreur serveur IA: {str(e)}" logger.exception(error_message) yield format_sse_event({"type": "error", "content": error_message}) finally: yield format_sse_event({"type": "analysis_status", "content": "end"}) # Assurer que le statut "analysis_end" est toujours envoyé yield format_sse_event({"type": "done", "is_code_execution": code_to_execute is not None}) # Indique si l'exécution de code a eu lieu return Response(stream_with_context(stream_response()), mimetype='text/event-stream')
@app.route('/canvas_modify', methods=['POST'])
def canvas_modify_endpoint() -> Response:
data = request.get_json()
if not data:
logger.warning("Requête canvas_modify sans données.")
return jsonify({"error": "Requête canvas_modify invalide : corps de requête manquant."}), 400
user_message = data.get('message')
canvas_content = data.get('canvas_content')
model = data.get('model', 'google/gemini-2.0-pro-exp-02-05:free').strip()
if not user_message or canvas_content is None:
logger.warning("Requête canvas_modify incomplète: message=%s, canvas_content=%s", user_message, canvas_content)
return jsonify({"error": "Requête canvas_modify invalide : message ou contenu canvas manquant."}), 400
canvas_chat_messages = [
{"role": "system", "content": (
"Tu es dans un chat de canvas. L'utilisateur va te demander de modifier le contenu du canvas suivant. "
"Fournis UNIQUEMENT le NOUVEAU contenu du canvas ENTIÈREMENT MIS À JOUR, encadré par [CANVAS_START]
et [CANVAS_END]
. "
"Ne réponds pas avec du texte en dehors de ces marqueurs, sauf une courte phrase de confirmation après le canvas. "
"Maintiens le contexte des modifications précédentes du canvas, en te basant sur l'historique de ce chat canvas."
)},
{"role": "assistant", "content": f"[CANVAS_START]\n{canvas_content}\n[CANVAS_END] Voici le canvas actuel. Dites-moi comment le modifier."},
{"role": "user", "content": user_message}
]
texttry: const_response = open_router_chat_completion(canvas_chat_messages, model, 0.0) ai_response_text = const_response.get("choices", [{}])[0].get("message", {}).get("content", "") or "" if CANVAS_START_MARKER in ai_response_text and CANVAS_END_MARKER in ai_response_text: start_index = ai_response_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = ai_response_text.find(CANVAS_END_MARKER) updated_canvas_content = ai_response_text[start_index:end_index].strip() response_message = "Canvas mis à jour." logger.info("Canvas modifié avec succès.") return jsonify({"updated_canvas_content": updated_canvas_content, "response_message": response_message}) else: logger.error("Réponse IA canvas_modify invalide: marqueurs CANVAS_START/END manquants.") return jsonify({"error": "Réponse IA invalide pour la modification du canvas", "full_response": ai_response_text}), 500 except Exception as e: error_message = f"Erreur serveur (canvas_modify): {str(e)}" logger.exception(error_message) return jsonify({"error": error_message}), 500
@app.route('/run', methods=['POST'])
def run_code_endpoint() -> Response:
"""
Point d'accès pour exécuter du code via l'API Piston (pour canvas code).
"""
data = request.get_json()
if not data:
logger.warning("Requête run sans données.")
return jsonify({"error": "Requête run invalide : corps de requête manquant."}), 400
language = data.get("language")
code = data.get("code")
stdin = data.get("stdin", "")
if not language or not code:
logger.warning("Requête run incomplète: language=%s, code=%s", language, code)
return jsonify({"error": "Requête run invalide : langage et code requis."}), 400
payload = {
"language": language,
"version": "*",
"files": [{"content": code}],
"stdin": stdin
}
try:
piston_response = requests.post(PISTON_API_URL, json=payload, timeout=3000000)
piston_response.raise_for_status()
logger.info("Code exécuté avec Piston avec succès.")
return jsonify(piston_response.json())
except requests.RequestException as e:
error_message = f"Erreur lors de l'appel à l'API Piston: {str(e)}"
logger.exception(error_message)
piston_text = piston_response.text if 'piston_response' in locals() and piston_response is not None else ""
return jsonify({"error": "Erreur d'exécution du code via l'API Piston.", "piston_response": piston_text, "details": error_message}), 500
except Exception as e:
error_message = f"Erreur inattendue lors de l'exécution du code: {str(e)}"
logger.exception(error_message)
return jsonify({"error": "Erreur inattendue lors de l'exécution du code.", "details": error_message}), 500
@app.route('/run', methods=['OPTIONS'])
def run_code_options() -> Response:
"""Gère les requêtes OPTIONS pour l'endpoint /run (CORS preflight)."""
return jsonify({'message': 'OPTIONS requests allowed'}), 200
@app.route('/upload', methods=['POST'])
def upload_file():
if 'file' not in request.files:
return jsonify({"error": "No file part"}), 400
files = request.files.getlist('file') # Récupérer tous les fichiers envoyés
if not files or all(file.filename == '' for file in files):
return jsonify({"error": "No selected file"}), 400
textuploaded_files = [] upload_folder = app.config['UPLOAD_FOLDER'] if not os.path.exists(upload_folder): os.makedirs(upload_folder) # Extensions autorisées étendues allowed_ext = ['.txt', '.py', '.md', '.js', '.json', '.pdf', '.png', '.jpg', '.jpeg'] for file in files: if file.filename == '': continue filename = secure_filename(file.filename) file_path = os.path.join(upload_folder, filename) try: file.save(file_path) file_ext = os.path.splitext(filename)[1].lower() file_content = "" if file_ext in ['.txt', '.py', '.md', '.js', '.json']: with open(file_path, 'r', encoding='utf-8') as f: file_content = f.read() elif file_ext == '.pdf': try: reader = PyPDF2.PdfReader(file_path) for page in reader.pages: file_content += page.extract_text() + "\n" except Exception as e: logger.exception("PDF extraction failed") file_content = f"Erreur extraction PDF: {str(e)}" elif file_ext in ['.png', '.jpg', '.jpeg']: try: img = Image.open(file_path) file_content = pytesseract.image_to_string(img) except Exception as e: logger.exception("OCR extraction failed") file_content = f"Erreur OCR extraction: {str(e)}" uploaded_files.append({"filename": filename, "file_path": file_path, "content": file_content}) except Exception as e: logger.exception("File upload failed") return jsonify({"error": str(e)}), 500 return jsonify({"message": "Files uploaded successfully", "files": uploaded_files}), 200
def format_sse_event(data: Dict[str, Any]) -> str:
"""Formate les données en événement SSE (Server-Sent Events)."""
return f"data: {json.dumps(data)}\n\n"
if name == 'main':
if os.environ.get('FLASK_ENV') == 'development':
app.run(debug=True, port=5000)
"""
Below is a revised (“patched”) version of your code. In this version, I made several improvements and adjustments:
run_code_in_sandbox
now checks for a valid logs object and uses the new execute_sync
method./chat
endpoint’s streaming generator, we now check that a response “delta” exists before processing and immediately break out of the loop once code execution is complete.You can review the code below:
pythonfrom gevent import monkey monkey.patch_all() import os import re import json import logging from typing import Any, Dict, List, Optional, Tuple import time from tqdm import tqdm from flask import Flask, request, Response, jsonify, stream_with_context from flask_cors import CORS from e2b_code_interpreter import Sandbox from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type import requests from duckduckgo_search import DDGS from bs4 import BeautifulSoup from urllib.parse import urlparse from werkzeug.utils import secure_filename # For secure file naming # from groq import Groq # Unused in this version; commented out import PyPDF2 from PIL import Image import pytesseract # --- Configuration --- logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = Flask(__name__) CORS(app) app.config['UPLOAD_FOLDER'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'uploads') GROQ_API_KEY = os.environ.get("GROQ_API_KEY") E2B_API_KEY = os.environ.get("E2B_API_KEY") OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY") # Added for OpenRouter PISTON_API_URL = "https://emkc.org/api/v2/piston/execute" # groq_client = Groq(api_key=GROQ_API_KEY) # Not used in this patched version # --- Web search configuration --- WEB_SEARCH_CONFIG = { 'openrouter_api_key': OPENROUTER_API_KEY, 'max_results': 9, 'cache_file': 'search_cache.json', 'model': 'mistralai/mistral-medium', # OpenRouter model for filtering/synthesis 'timeout': 20, 'min_quality_score': 0.65 } VALID_MODELS = [ "google/gemini-2.0-pro-exp-02-05:free", "deepseek/deepseek-r1:free", "qwen/qwen2.5-vl-72b-instruct:free", "meta-llama/llama-3.3-70b-instruct:free" ] CODE_PATTERN = re.compile(r"```python\s*([\s\S]*?)```") CODE_REQUIRED_MARKER = "[CODE_REQUIRED]" CANVAS_START_MARKER = "[CANVAS_START]" CANVAS_END_MARKER = "[CANVAS_END]" # --- E2B Sandbox Initialization with Retry --- @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=20), retry=retry_if_exception_type(Exception)) def initialize_sandbox(): logger.info("Initializing e2b Sandbox with 300-second timeout...") return Sandbox(api_key=E2B_API_KEY, timeout=300) try: sandbox = initialize_sandbox() logger.info("e2b Sandbox initialized successfully.") except Exception as e: logger.error(f"Failed to initialize e2b Sandbox after multiple retries: {e}") raise e # --- Web Search and Content Processing Classes --- class DataCollector: @staticmethod def web_search(query: str) -> List[Dict]: with DDGS() as ddgs: results = ddgs.text(query, region='fr-fr', max_results=WEB_SEARCH_CONFIG['max_results']) return [{ 'url': r['href'], 'title': r['title'], 'domain': urlparse(r['href']).netloc.replace('www.', '') } for r in results if urlparse(r['href']).scheme in ['http', 'https']] class ContentProcessor: def __init__(self): self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'} def fetch_content(self, url: str) -> str: try: response = requests.get(url, headers=self.headers, timeout=WEB_SEARCH_CONFIG['timeout']) soup = BeautifulSoup(response.text, 'html.parser') for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'meta', 'link']): element.decompose() main_content = soup.find(['main', 'article']) or soup.body return ' '.join(main_content.stripped_strings)[:5000] except Exception as e: logger.error(f"Erreur sur {url}: {str(e)}") return '' class AICore: @staticmethod def query_ai(system_prompt: str, user_content: str, temp: float = 0.1, model: str = WEB_SEARCH_CONFIG['model']) -> str: headers = { "Authorization": f"Bearer {WEB_SEARCH_CONFIG['openrouter_api_key']}", "Content-Type": "application/json", "HTTP-Referer": "https://your-app-url", # Replace with your app URL "X-Title": "Flask Search AI" } data = { "model": model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content} ], "temperature": temp, "max_tokens": 2000 } try: response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data, timeout=WEB_SEARCH_CONFIG['timeout']) response.raise_for_status() return response.json()['choices'][0]['message']['content'] except Exception as e: return f"Erreur API OpenRouter: {str(e)}" # --- OpenRouter API wrappers --- def open_router_chat_completion_stream(messages: List[Dict[str, Any]], model: str, temperature: float = 0.0): headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", } payload = { "model": model, "messages": messages, "temperature": temperature, "max_tokens": 2000, "stream": True } response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, stream=True, timeout=WEB_SEARCH_CONFIG['timeout']) response.raise_for_status() for line in response.iter_lines(): if line: decoded_line = line.decode('utf-8') if decoded_line.startswith("data: "): data_str = decoded_line[len("data: "):] try: data = json.loads(data_str) yield data except Exception: continue def open_router_chat_completion(messages: List[Dict[str, Any]], model: str, temperature: float = 0.0) -> Dict: headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", } payload = { "model": model, "messages": messages, "temperature": temperature, "max_tokens": 2000, "stream": False } response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, timeout=WEB_SEARCH_CONFIG['timeout']) response.raise_for_status() return response.json() # --- Utility functions --- def extract_code_blocks(text: str) -> Tuple[Optional[str], Optional[str]]: if CODE_REQUIRED_MARKER in text: text_no_marker = text.replace(CODE_REQUIRED_MARKER, "", 1).strip() code_match = CODE_PATTERN.search(text_no_marker) if code_match: return code_match.group(1).strip(), None else: code_match = CODE_PATTERN.search(text) if code_match: return None, code_match.group(1).strip() return None, None # PATCH: Updated to use execute_sync and properly handle logs def run_code_in_sandbox(code: str) -> str: start_time = time.time() try: logger.info("Running code in e2b sandbox using execute_sync...") execution = sandbox.execute_sync(code) logs = execution.logs if hasattr(execution, 'logs') else None stdout_logs = logs.stdout if logs and hasattr(logs, 'stdout') else [] stderr_logs = logs.stderr if logs and hasattr(logs, 'stderr') else [] stdout = "\n".join(stdout_logs) stderr = "\n".join(stderr_logs) if stderr: return f"Stdout:\n{stdout}\n\nStderr:\n{stderr}" return stdout except Exception as e: duration = time.time() - start_time error_msg = f"Erreur lors de l'exécution du code dans e2b après {duration:.2f} secondes. Erreur: {e}" logger.exception(error_msg) return error_msg def extract_numeric_result(exec_result: str) -> Optional[str]: if "Stdout:" in exec_result: stdout_section = exec_result.split("Stdout:\n", 1)[1].split("\n\nStderr:")[0].strip() else: stdout_section = exec_result.strip() lines = [l.strip() for l in stdout_section.splitlines() if l.strip()] if not lines: return None last_line = lines[-1] if re.search(r'\d', last_line): return last_line return None def format_sse_event(data: Dict[str, Any]) -> str: """Formats data as an SSE event.""" return f"data: {json.dumps(data)}\n\n" # --- Flask Routes --- @app.route('/models', methods=['GET']) def get_models() -> Response: """Returns the list of valid models.""" return jsonify({"models": VALID_MODELS}) @app.route('/chat', methods=['POST']) def chat_endpoint() -> Response: """ Main chat endpoint. Handles SSE streaming, code execution, canvas modifications, and web search. """ @stream_with_context @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type(Exception)) def stream_response(): data = request.get_json() if not data: logger.warning("Requête chat invalide: corps de requête manquant.") yield format_sse_event({"type": "error", "content": "Requête chat invalide : corps de requête manquant."}) return messages = data.get('messages', []) model: str = data.get('model', 'google/gemini-2.0-pro-exp-02-05:free').strip() execute_code: bool = data.get('execute_code', True) web_search_enabled: bool = data.get('web_search', False) if not messages or not isinstance(messages, list) or not model or model not in VALID_MODELS: logger.warning("Requête chat invalide: messages=%s, model=%s", messages, model) yield format_sse_event({"type": "error", "content": "Requête chat invalide : messages ou modèle incorrects."}) return # --- Web Search Integration --- web_search_results = [] if web_search_enabled: user_query = messages[-1]['content'] yield format_sse_event({"type": "web_search_status", "content": "start"}) yield format_sse_event({"type": "web_search", "query": user_query}) search_processor = ContentProcessor() search_results = DataCollector.web_search(user_query) filtered_results = [] for result in search_results: content = search_processor.fetch_content(result['url']) if content: filter_prompt = ( "Tu es un expert en analyse de la pertinence des documents. " f"Détermine si le contenu web suivant est pertinent pour la requête utilisateur : '{user_query}'. " "Réponds uniquement par 'oui' ou 'non'." ) is_relevant = AICore.query_ai(filter_prompt, content[:2000], temp=0.0, model="mistralai/mistral-small") if is_relevant.lower().startswith('oui'): filtered_results.append({'url': result['url'], 'domain': result['domain'], 'content': content}) if len(filtered_results) >= WEB_SEARCH_CONFIG['max_results']: break web_search_results = filtered_results source_urls = [result['url'] for result in web_search_results] yield format_sse_event({"type": "web_sources", "content": source_urls}) yield format_sse_event({"type": "web_search_status", "content": "end"}) base_prompt = ( "🎯 **Rôle : Expert en calcul numérique précis, assistant polyvalent avec canvas interactifs et capacités de recherche Web.**\n" "**Mission :** Répondre aux questions de l'utilisateur, effectuer des calculs précis, fournir des canvas interactifs (texte et code) et, si nécessaire, effectuer des recherches sur le Web pour enrichir la réponse.\n\n" "**Si la question demande un calcul numérique :**\n" "1. Annonce que tu vas exécuter un code Python pour réaliser le calcul.\n" "2. Ajoute le marqueur `[CODE_REQUIRED]` suivi du bloc de code minimal en Python.\n" "3. N'envoie rien d'autre et attends l'exécution du code.\n\n" "**Après l'exécution du code**, explique le résultat en détails en te basant sur le résultat numérique obtenu.\n\n" "**Pour la création ou modification de canvas** (texte ou code) :\n" "Utilise les marqueurs `[CANVAS_START]` et `[CANVAS_END]` pour encadrer le contenu du canvas.\n\n" "**Si la recherche Web est activée :**\n" "1. Tu recevras des extraits de pages Web pertinentes.\n" "2. Utilise ces informations pour compléter ta réponse, en citant les sources de manière concise (ex: [nomdedomaine.com]).\n" "3. Si les sources Web ne sont pas pertinentes ou suffisantes, base-toi sur tes connaissances internes.\n\n" "Si aucune de ces conditions n'est remplie, réponds directement et clairement à la question." ) system_prompt = {"role": "system", "content": base_prompt} if web_search_results: sources_summary = "\n\n".join([ f"[{result['domain']}]\nExtrait: {result['content'][:500]}" for result in web_search_results ]) web_search_context = {"role": "user", "content": f"Voici des extraits de pages web pertinentes pour la question:\n\n{sources_summary}"} messages_with_system_prompt = [system_prompt, web_search_context] + messages else: messages_with_system_prompt = [system_prompt] + messages full_reply_text = "" code_to_execute = None code_to_display = None initial_response_completed = False try: router_response_stream = open_router_chat_completion_stream(messages_with_system_prompt, model, 0.0) for chunk in router_response_stream: if initial_response_completed: break delta_content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "") if not delta_content: continue delta_content_no_marker = delta_content.replace(CODE_REQUIRED_MARKER, "") full_reply_text += delta_content_no_marker if CANVAS_START_MARKER in full_reply_text and CANVAS_END_MARKER in full_reply_text: start_index = full_reply_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = full_reply_text.find(CANVAS_END_MARKER) yield format_sse_event({"type": "canvas_start", "content": full_reply_text[start_index:end_index].strip()}) continue yield format_sse_event({"type": "text", "content": delta_content_no_marker}) if execute_code: code_to_execute, code_to_display = extract_code_blocks(full_reply_text) if code_to_execute: yield format_sse_event({"type": "code", "content": code_to_display}) yield format_sse_event({"type": "analysis_status", "content": "start"}) exec_result = run_code_in_sandbox(code_to_execute) numeric_result = extract_numeric_result(exec_result) yield format_sse_event({"type": "code_result", "content": exec_result}) yield format_sse_event({"type": "analysis_status", "content": "end"}) full_reply_text += f"\nRésultat du code:\n```\n{exec_result}\n```\n" initial_response_completed = True break yield format_sse_event({"type": "final", "prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}) except Exception as e: error_message = f"Erreur serveur IA: {str(e)}" logger.exception(error_message) yield format_sse_event({"type": "error", "content": error_message}) finally: yield format_sse_event({"type": "analysis_status", "content": "end"}) yield format_sse_event({"type": "done", "is_code_execution": code_to_execute is not None}) return Response(stream_response(), mimetype='text/event-stream') @app.route('/canvas_modify', methods=['POST']) def canvas_modify_endpoint() -> Response: data = request.get_json() if not data: logger.warning("Requête canvas_modify sans données.") return jsonify({"error": "Requête canvas_modify invalide : corps de requête manquant."}), 400 user_message = data.get('message') canvas_content = data.get('canvas_content') model = data.get('model', 'google/gemini-2.0-pro-exp-02-05:free').strip() if not user_message or canvas_content is None: logger.warning("Requête canvas_modify incomplète: message=%s, canvas_content=%s", user_message, canvas_content) return jsonify({"error": "Requête canvas_modify invalide : message ou contenu canvas manquant."}), 400 canvas_chat_messages = [ {"role": "system", "content": ( "Tu es dans un chat de canvas. L'utilisateur va te demander de modifier le contenu du canvas suivant. " "Fournis UNIQUEMENT le NOUVEAU contenu du canvas ENTIÈREMENT MIS À JOUR, encadré par `[CANVAS_START]` et `[CANVAS_END]`. " "Ne réponds pas avec du texte en dehors de ces marqueurs, sauf une courte phrase de confirmation après le canvas. " "Maintiens le contexte des modifications précédentes du canvas, en te basant sur l'historique de ce chat canvas." )}, {"role": "assistant", "content": f"[CANVAS_START]\n{canvas_content}\n[CANVAS_END] Voici le canvas actuel. Dites-moi comment le modifier."}, {"role": "user", "content": user_message} ] try: const_response = open_router_chat_completion(canvas_chat_messages, model, 0.0) ai_response_text = const_response.get("choices", [{}])[0].get("message", {}).get("content", "") or "" if CANVAS_START_MARKER in ai_response_text and CANVAS_END_MARKER in ai_response_text: start_index = ai_response_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = ai_response_text.find(CANVAS_END_MARKER) updated_canvas_content = ai_response_text[start_index:end_index].strip() response_message = "Canvas mis à jour." logger.info("Canvas modifié avec succès.") return jsonify({"updated_canvas_content": updated_canvas_content, "response_message": response_message}) else: logger.error("Réponse IA canvas_modify invalide: marqueurs CANVAS_START/END manquants.") return jsonify({"error": "Réponse IA invalide pour la modification du canvas", "full_response": ai_response_text}), 500 except Exception as e: error_message = f"Erreur serveur (canvas_modify): {str(e)}" logger.exception(error_message) return jsonify({"error": error_message}), 500 @app.route('/run', methods=['POST']) def run_code_endpoint() -> Response: """ Endpoint for code execution via the Piston API (used for canvas code). """ data = request.get_json() if not data: logger.warning("Requête run sans données.") return jsonify({"error": "Requête run invalide : corps de requête manquant."}), 400 language = data.get("language") code = data.get("code") stdin = data.get("stdin", "") if not language or not code: logger.warning("Requête run incomplète: language=%s, code=%s", language, code) return jsonify({"error": "Requête run invalide : langage et code requis."}), 400 payload = { "language": language, "version": "*", "files": [{"content": code}], "stdin": stdin } try: # Using a more reasonable timeout (e.g., 30 seconds) piston_response = requests.post(PISTON_API_URL, json=payload, timeout=30) piston_response.raise_for_status() logger.info("Code exécuté avec Piston avec succès.") return jsonify(piston_response.json()) except requests.RequestException as e: error_message = f"Erreur lors de l'appel à l'API Piston: {str(e)}" logger.exception(error_message) piston_text = piston_response.text if 'piston_response' in locals() and piston_response is not None else "" return jsonify({"error": "Erreur d'exécution du code via l'API Piston.", "piston_response": piston_text, "details": error_message}), 500 except Exception as e: error_message = f"Erreur inattendue lors de l'exécution du code: {str(e)}" logger.exception(error_message) return jsonify({"error": "Erreur inattendue lors de l'exécution du code.", "details": error_message}), 500 @app.route('/run', methods=['OPTIONS']) def run_code_options() -> Response: """Handles OPTIONS requests for the /run endpoint (CORS preflight).""" return jsonify({'message': 'OPTIONS requests allowed'}), 200 @app.route('/upload', methods=['POST']) def upload_file(): if 'file' not in request.files: return jsonify({"error": "No file part"}), 400 files = request.files.getlist('file') if not files or all(file.filename == '' for file in files): return jsonify({"error": "No selected file"}), 400 uploaded_files = [] upload_folder = app.config['UPLOAD_FOLDER'] if not os.path.exists(upload_folder): os.makedirs(upload_folder) # Extended allowed file extensions allowed_ext = ['.txt', '.py', '.md', '.js', '.json', '.pdf', '.png', '.jpg', '.jpeg'] for file in files: if file.filename == '': continue filename = secure_filename(file.filename) file_path = os.path.join(upload_folder, filename) try: file.save(file_path) file_ext = os.path.splitext(filename)[1].lower() file_content = "" if file_ext in ['.txt', '.py', '.md', '.js', '.json']: with open(file_path, 'r', encoding='utf-8') as f: file_content = f.read() elif file_ext == '.pdf': try: reader = PyPDF2.PdfReader(file_path) for page in reader.pages: file_content += page.extract_text() + "\n" except Exception as e: logger.exception("PDF extraction failed") file_content = f"Erreur extraction PDF: {str(e)}" elif file_ext in ['.png', '.jpg', '.jpeg']: try: img = Image.open(file_path) file_content = pytesseract.image_to_string(img) except Exception as e: logger.exception("OCR extraction failed") file_content = f"Erreur OCR extraction: {str(e)}" uploaded_files.append({"filename": filename, "file_path": file_path, "content": file_content}) except Exception as e: logger.exception("File upload failed") return jsonify({"error": str(e)}), 500 return jsonify({"message": "Files uploaded successfully", "files": uploaded_files}), 200 if __name__ == '__main__': if os.environ.get('FLASK_ENV') == 'development': app.run(debug=True, port=5000) else: app.run(port=5000)
run_code_in_sandbox
function now safely checks if logs are available and uses sandbox.execute_sync
./run
endpoint now uses a 30‑second timeout instead of an extremely high value./chat
endpoint, the response stream checks for valid delta content and immediately breaks out once code execution is complete.Feel free to adjust any parameters (like timeout values or log messages) as needed for your environment.