Your service is live 🎉 127.0.0.1 - - [18/Feb/2025...
تم الإنشاء في: ١٨ فبراير ٢٠٢٥
تمت الإجابة باستخدام O3 Mini High بواسطة Chat01
تم الإنشاء في: ١٨ فبراير ٢٠٢٥
تمت الإجابة باستخدام O3 Mini High بواسطة Chat01
Your service is live 🎉
127.0.0.1 - - [18/Feb/2025:12:12:06 +0000] "GET / HTTP/1.1" 404 207 "-" "Go-http-client/2.0"
127.0.0.1 - - [18/Feb/2025:12:12:12 +0000] "OPTIONS /chat HTTP/1.1" 200 0 "https://pl1nkochat-1.onrender.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
INFO:app:Envoi événement CODE_BLOCK
INFO:app:Début de l'exécution du code dans le sandbox e2b.
INFO:app:Utilisation de la méthode sandbox.run_code
INFO:e2b.sandbox_sync.main:Request: POST https://49999-iqbmzptqfll8cy9yqyo1a-4818f120.e2b.dev/execute
INFO:e2b.sandbox_sync.main:Response: 200 https://49999-iqbmzptqfll8cy9yqyo1a-4818f120.e2b.dev/execute
INFO:httpx:HTTP Request: POST https://49999-iqbmzptqfll8cy9yqyo1a-4818f120.e2b.dev/execute "HTTP/1.1 200 OK"
INFO:app:Envoi événement EXECUTED_CODE_RESULT
ERROR:app:Erreur serveur IA: 'str' object has no attribute 'get'
Traceback (most recent call last):
File "/opt/render/project/src/app.py", line 425, in stream_response
delta_content = chunk.get("choices", [{}])[0].get("message", {}).get("content", "") or ""
^^^^^^^^^
AttributeError: 'str' object has no attribute 'get'
127.0.0.1 - - [18/Feb/2025:12:12:21 +0000] "POST /chat HTTP/1.1" 200 1483 "https://pl1nkochat-1.onrender.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
127.0.0.1 - - [18/Feb/2025:12:12:32 +0000] "GET /models HTTP/1.1" 200 157 "https://pl1nkochat-1.onrender.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
127.0.0.1 - - [18/Feb/2025:12:12:35 +0000] "OPTIONS /chat HTTP/1.1" 200 0 "https://pl1nkochat-1.onrender.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
INFO:app:Envoi événement CODE_BLOCK
INFO:app:Début de l'exécution du code dans le sandbox e2b.
INFO:app:Utilisation de la méthode sandbox.run_code
INFO:e2b.sandbox_sync.main:Request: POST https://49999-iqbmzptqfll8cy9yqyo1a-4818f120.e2b.dev/execute
INFO:e2b.sandbox_sync.main:Response: 200 https://49999-iqbmzptqfll8cy9yqyo1a-4818f120.e2b.dev/execute
INFO:httpx:HTTP Request: POST https://49999-iqbmzptqfll8cy9yqyo1a-4818f120.e2b.dev/execute "HTTP/1.1 200 OK"
INFO:app:Envoi événement EXECUTED_CODE_RESULT
ERROR:app:Erreur serveur IA: 'str' object has no attribute 'get'
Traceback (most recent call last):
File "/opt/render/project/src/app.py", line 425, in stream_response
delta_content = chunk.get("choices", [{}])[0].get("message", {}).get("content", "") or ""
^^^^^^^^^
AttributeError: 'str' object has no attribute 'get'
127.0.0.1 - - [18/Feb/2025:12:12:41 +0000] "POST /chat HTTP/1.1" 200 1469 "https://pl1nkochat-1.onrender.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
from gevent import monkey
monkey.patch_all()
import os
import re
import json
import logging
from typing import Any, Dict, List, Optional, Tuple
import time
import datetime
from tqdm import tqdm
from flask import Flask, request, Response, jsonify, stream_with_context
from flask_cors import CORS
from e2b_code_interpreter import Sandbox
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import requests
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from werkzeug.utils import secure_filename # Ajout pour la gestion sécurisée des noms de fichier
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)
app = Flask(name)
CORS(app)
app.config['UPLOAD_FOLDER'] = os.path.join(os.path.dirname(os.path.abspath(file)), 'uploads') # Dossier de destination
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
E2B_API_KEY = os.environ.get("E2B_API_KEY")
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY") # Ajout de la clé OpenRouter
PISTON_API_URL = "https://emkc.org/api/v2/piston/execute"
@retry(stop=stop_after_attempt(5), # Tentatives augmentées à 5
wait=wait_exponential(multiplier=1, min=4, max=20), # Backoff jusqu'à 20 secondes
retry=retry_if_exception_type(Exception))
def initialize_sandbox():
"""Initializes the e2b Sandbox with a higher timeout."""
logger.info("Initializing e2b Sandbox with 300-second timeout...")
return Sandbox(api_key=E2B_API_KEY, timeout=300) # Timeout augmenté à 300 secondes
try:
sandbox = initialize_sandbox()
logger.info("e2b Sandbox initialized successfully.")
except Exception as e:
logger.error(f"Failed to initialize e2b Sandbox after multiple retries: {e}")
# It's crucial to handle the failure of sandbox initialization.
# You might want to exit the application or have a fallback mechanism.
# For now, let's raise the exception to prevent the app from starting improperly.
raise e
VALID_MODELS = [
"google/gemini-2.0-pro-exp-02-05:free",
"deepseek/deepseek-r1:free",
"qwen/qwen2.5-vl-72b-instruct:free",
"meta-llama/llama-3.3-70b-instruct:free"
]
CODE_PATTERN = re.compile(r"python\s*([\s\S]*?)
")
CODE_REQUIRED_MARKER = "[CODE_REQUIRED]"
CANVAS_START_MARKER = "[CANVAS_START]"
CANVAS_END_MARKER = "[CANVAS_END]"
WEB_SEARCH_CONFIG = {
'openrouter_api_key': OPENROUTER_API_KEY,
'max_results': 9, # Augmenté à 9 par défaut
'cache_file': 'search_cache.json', # Le cache est géré, mais pas crucial pour le backend Flask
'model': 'mistralai/mistral-medium', # Modèle OpenRouter pour le filtrage et la synthèse. Peut être changé.
'timeout': 20,
'min_quality_score': 0.65 # Ce paramètre n'est plus utilisé directement, remplacé par un filtrage AI plus précis.
}
class DataCollector:
@staticmethod
def web_search(query: str) -> List[Dict]:
with DDGS() as ddgs:
results = ddgs.text(query, region='fr-fr', max_results=WEB_SEARCH_CONFIG['max_results'])
return [{
'url': r['href'],
'title': r['title'],
'domain': urlparse(r['href']).netloc.replace('www.', '')
} for r in results if urlparse(r['href']).scheme in ['http', 'https']]
class ContentProcessor:
def init(self):
self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
textdef fetch_content(self, url: str) -> str: try: response = requests.get(url, headers=self.headers, timeout=WEB_SEARCH_CONFIG['timeout']) soup = BeautifulSoup(response.text, 'html.parser') for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'meta', 'link']): element.decompose() main_content = soup.find(['main', 'article']) or soup.body return ' '.join(main_content.stripped_strings)[:5000] except Exception as e: logger.error(f"Erreur sur {url}: {str(e)}") return ''
class AICore:
@staticmethod
def query_ai(system_prompt: str, user_content: str, temp: float = 0.1, model:str = WEB_SEARCH_CONFIG['model']) -> str: # Ajout du paramètre model
headers = {
"Authorization": f"Bearer {WEB_SEARCH_CONFIG['openrouter_api_key']}",
"Content-Type": "application/json",
"HTTP-Referer": "https://your-app-url", # Remplacez par l'URL de votre application
"X-Title": "Flask Search AI"
}
textdata = { "model": model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content} ], "temperature": temp, "max_tokens": 2000 } try: response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data, timeout=WEB_SEARCH_CONFIG['timeout']) response.raise_for_status() # On lève une exception si le statut n'est pas 200 return response.json()['choices'][0]['message']['content'] except Exception as e: return f"Erreur API OpenRouter: {str(e)}"
def open_router_chat_completion_stream(messages: List[Dict[str, Any]], model: str, temperature: float = 0.0):
headers = {
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": 2000,
"stream": True
}
response = requests.post("https://openrouter.ai/api/v1/chat/completions",
headers=headers, json=payload, stream=True, timeout=WEB_SEARCH_CONFIG['timeout'])
response.raise_for_status()
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8')
if decoded_line.startswith("data: "):
data_str = decoded_line[len("data: "):]
try:
data = json.loads(data_str)
yield data
except Exception:
continue
def open_router_chat_completion(messages: List[Dict[str, Any]], model: str, temperature: float = 0.0) -> Dict:
headers = {
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": 2000,
"stream": False
}
response = requests.post("https://openrouter.ai/api/v1/chat/completions",
headers=headers, json=payload, timeout=WEB_SEARCH_CONFIG['timeout'])
response.raise_for_status()
return response.json()
def extract_code_blocks(text: str) -> Tuple[Optional[str], Optional[str]]:
"""Extrait les blocs de code Python et gère le marqueur [CODE_REQUIRED]."""
if CODE_REQUIRED_MARKER in text:
text_no_marker = text.replace(CODE_REQUIRED_MARKER, "", 1).strip()
code_match = CODE_PATTERN.search(text_no_marker)
if code_match:
return code_match.group(1).strip(), None
else:
code_match = CODE_PATTERN.search(text)
if code_match:
return None, code_match.group(1).strip() # code à afficher sans exécution
return None, None
def run_code_in_sandbox(code: str) -> str:
"""Exécute le code Python dans le sandbox e2b et retourne le résultat."""
start_time = time.time()
try:
logger.info("Début de l'exécution du code dans le sandbox e2b.")
execution = None
method_name = next((method for method in ["run_code_sync", "execute_sync", "execute_code", "execute", "run_code", "run"] if hasattr(sandbox, method)), None)
if method_name:
logger.info(f"Utilisation de la méthode sandbox.{method_name}")
execution = getattr(sandbox, method_name)(code)
else:
error_msg = "Aucune méthode d'exécution de code disponible dans le sandbox."
logger.error(error_msg)
return error_msg
logs = execution.logs
stdout_logs = logs.stdout if hasattr(logs, 'stdout') else []
stderr_logs = logs.stderr if hasattr(logs, 'stderr') else []
stdout = "\n".join(stdout_logs)
stderr = "\n".join(stderr_logs)
if stderr:
return f"Stdout:\n{stdout}\n\nStderr:\n{stderr}"
return stdout
except Exception as e:
duration = time.time() - start_time
error_msg = f"Erreur lors de l'exécution du code dans e2b après {duration:.2f} secondes: {str(e)}"
logger.exception(error_msg)
return f"Erreur exécution code: {str(e)}"
def extract_numeric_result(exec_result: str) -> Optional[str]:
"""Extrait la dernière ligne numérique du résultat d'exécution de code."""
if "Stdout:" in exec_result:
stdout_section = exec_result.split("Stdout:\n", 1)[1].split("\n\nStderr:")[0].strip()
else:
stdout_section = exec_result.strip()
lines = [l.strip() for l in stdout_section.splitlines() if l.strip()]
if not lines:
return None
last_line = lines[-1]
if re.search(r'\d', last_line):
return last_line
return None
@app.route('/models', methods=['GET'])
def get_models() -> Response:
"""Retourne la liste des modèles valides."""
return jsonify({"models": VALID_MODELS})
@app.route('/chat', methods=['POST'])
def chat_endpoint() -> Response:
"""
Point d'entrée principal pour le chat.
Gère le streaming SSE, l'exécution de code, les canvas et la recherche Web.
"""
@stream_with_context
@retry(stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type(Exception))
def stream_response():
data = request.get_json()
if not data:
logger.warning("Requête chat invalide: corps de requête manquant.")
yield format_sse_event({"type": "error", "content": "Requête chat invalide : corps de requête manquant."})
return
textmessages = data.get('messages', []) model: str = data.get('model', 'google/gemini-2.0-pro-exp-02-05:free').strip() execute_code: bool = data.get('execute_code', True) web_search_enabled: bool = data.get('web_search', False) # Récupère l'état de la recherche Web if not messages or not isinstance(messages, list) or not model or model not in VALID_MODELS: logger.warning("Requête chat invalide: messages=%s, model=%s", messages, model) yield format_sse_event({"type": "error", "content": "Requête chat invalide : messages ou modèle incorrects."}) return # --- Intégration de la recherche Web --- web_search_results = [] if web_search_enabled: user_query = messages[-1]['content'] # On utilise le dernier message de l'utilisateur comme requête yield format_sse_event({"type": "web_search_status", "content": "start"}) yield format_sse_event({"type": "web_search", "query": user_query}) # Envoi de la requête pour l'afficher search_processor = ContentProcessor() search_results = DataCollector.web_search(user_query) # Filtrage et extraction du contenu (comme dans le CLI, mais intégré ici) filtered_results = [] for result in search_results: content = search_processor.fetch_content(result['url']) if content: # Filtrage de pertinence avec OpenRouter (comme dans le CLI) filter_prompt = ( "Tu es un expert en analyse de la pertinence des documents. " f"Détermine si le contenu web suivant est pertinent pour la requête utilisateur : '{user_query}'. " "Réponds uniquement par 'oui' ou 'non'." ) is_relevant = AICore.query_ai(filter_prompt, content[:2000], temp=0.0, model="mistralai/mistral-small") # Filtrage plus rapide if is_relevant.lower().startswith('oui'): filtered_results.append({'url': result['url'], 'domain': result['domain'], 'content': content}) if len(filtered_results) >= WEB_SEARCH_CONFIG['max_results']: break web_search_results = filtered_results # Envoi des URLs des sources *après* le filtrage source_urls = [result['url'] for result in web_search_results] yield format_sse_event({"type": "web_sources", "content": source_urls}) yield format_sse_event({"type": "web_search_status", "content": "end"}) # --- Construction du prompt système (adapté pour la recherche Web) --- base_prompt = ( "🎯 **Rôle : Expert en calcul numérique précis, assistant polyvalent avec canvas interactifs et capacités de recherche Web.**\n" "**Mission :** Répondre aux questions de l'utilisateur, effectuer des calculs précis, fournir des canvas interactifs (texte et code) et, si nécessaire, effectuer des recherches sur le Web pour enrichir la réponse.\n\n" "**Si la question demande un calcul numérique :**\n" "1. Annonce que tu vas exécuter un code Python pour réaliser le calcul.\n" "2. Ajoute le marqueur `[CODE_REQUIRED]` suivi du bloc de code minimal en Python.\n" "3. N'envoie rien d'autre et attends l'exécution du code.\n\n" "**Après l'exécution du code**, explique le résultat en détails en te basant sur le résultat numérique obtenu.\n\n" "**Pour la création ou modification de canvas** (texte ou code) :\n" "Utilise les marqueurs `[CANVAS_START]` et `[CANVAS_END]` pour encadrer le contenu du canvas.\n\n" "**Si la recherche Web est activée :**\n" "1. Tu recevras des extraits de pages Web pertinentes.\n" "2. Utilise ces informations pour compléter ta réponse, en citant les sources de manière concise (ex: [nomdedomaine.com]).\n" "3. Si les sources Web ne sont pas pertinentes ou suffisantes, base-toi sur tes connaissances internes.\n\n" "Si aucune de ces conditions n'est remplie, réponds directement et clairement à la question." ) # --- Préparation des messages pour Groq --- system_prompt = {"role": "system", "content": base_prompt} # Ajout des résultats de la recherche Web au contexte, si applicable if web_search_results: # Création d'un résumé des sources pour Groq (plus concis que d'envoyer tout le contenu) sources_summary = "\n\n".join([ f"[{result['domain']}]\nExtrait: {result['content'][:500]}" # Limite la longueur de l'extrait for result in web_search_results ]) web_search_context = {"role": "user", "content": f"Voici des extraits de pages web pertinentes pour la question:\n\n{sources_summary}"} messages_with_system_prompt = [system_prompt, web_search_context] + messages else: messages_with_system_prompt = [system_prompt] + messages full_reply_text = "" code_to_execute = None code_to_display = None is_code_used = False canvas_content = None code_executed = False prompt_tokens = 0 completion_tokens = 0 total_tokens = 0 initial_response_completed = False try: response_stream = open_router_chat_completion_stream(messages_with_system_prompt, model, 0.0) for chunk in response_stream: # Si chunk n'est pas un dict, tenter de le parser en JSON et vérifier son type if not isinstance(chunk, dict): try: chunk_parsed = json.loads(chunk) if not isinstance(chunk_parsed, dict): continue chunk = chunk_parsed except Exception: continue if initial_response_completed: break delta_content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "") or "" delta_content_no_marker = delta_content.replace(CODE_REQUIRED_MARKER, "") full_reply_text += delta_content_no_marker # Extraction du code dans le texte complet code_to_execute, code_to_display = extract_code_blocks(full_reply_text) # Si aucun code destiné à l'exécution n'est extrait mais qu'un bloc est trouvé, utiliser ce dernier if not code_to_execute and code_to_display: code_to_execute = code_to_display # Envoi du canvas s'il est détecté if CANVAS_START_MARKER in full_reply_text and CANVAS_END_MARKER in full_reply_text: start_index = full_reply_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = full_reply_text.find(CANVAS_END_MARKER) canvas_content_extracted = full_reply_text[start_index:end_index].strip() canvas_content = canvas_content_extracted delta_content_for_chat = delta_content_no_marker.replace(CANVAS_START_MARKER, "").replace(CANVAS_END_MARKER, "") yield format_sse_event({"type": "text", "content": delta_content_for_chat}) yield format_sse_event({"type": "canvas_start", "content": canvas_content}) continue else: yield format_sse_event({"type": "text", "content": delta_content_no_marker}) if code_to_execute: initial_response_completed = True if execute_code and code_to_execute: is_code_used = True logger.info("Envoi événement CODE_BLOCK") yield format_sse_event({"type": "code_block", "content": code_to_execute}) yield format_sse_event({"type": "code_execution_start"}) execution_result = run_code_in_sandbox(code_to_execute) numeric_result = extract_numeric_result(execution_result) if numeric_result: code_executed = True logger.info("Envoi événement EXECUTED_CODE_RESULT") yield format_sse_event({"type": "executed_code_result", "content": numeric_result}) conclusion_prompt_content = ( f"**[INSTRUCTION SUITE - APRES EXECUTION DU CODE]** Tu as exécuté le code Python suivant:\n" f"```python\n{code_to_execute}\n```\n" f"Le résultat du calcul est: **{numeric_result}**. " "Continue ensuite la conversation en t'appuyant sur ce résultat." ) conclusion_prompt = [{"role": "user", "content": conclusion_prompt_content}] conclusion_stream = open_router_chat_completion(conclusion_prompt, model, 0.0) for chunk in conclusion_stream: delta_content = chunk.get("choices", [{}])[0].get("message", {}).get("content", "") or "" yield format_sse_event({"type": "text", "content": delta_content}) yield format_sse_event({"type": "final"}) else: code_executed = True logger.info("Envoi événement EXECUTED_CODE_ERROR") yield format_sse_event({"type": "executed_code_error", "content": execution_result}) yield format_sse_event({"type": "final", "result": "Fin de la réponse (erreur ou pas de résultat numérique)."}) prompt_tokens = 0 # Comptage non implémenté completion_tokens = 0 total_tokens = prompt_tokens + completion_tokens if not code_executed: logger.info("Envoi événement FINAL (sans code exécuté)") yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}) else: yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}) except Exception as e: error_message = f"Erreur serveur IA: {str(e)}" logger.exception(error_message) yield format_sse_event({"type": "error", "content": error_message}) finally: yield format_sse_event({"type": "analysis_status", "content": "end"}) yield format_sse_event({"type": "done", "is_code_execution": is_code_used}) return Response(stream_with_context(stream_response()), mimetype='text/event-stream')
@app.route('/canvas_modify', methods=['POST'])
def canvas_modify_endpoint() -> Response:
data = request.get_json()
if not data:
logger.warning("Requête canvas_modify sans données.")
return jsonify({"error": "Requête canvas_modify invalide : corps de requête manquant."}), 400
user_message = data.get('message')
canvas_content = data.get('canvas_content')
model = data.get('model', 'google/gemini-2.0-pro-exp-02-05:free').strip()
if not user_message or canvas_content is None:
logger.warning("Requête canvas_modify incomplète: message=%s, canvas_content=%s", user_message, canvas_content)
return jsonify({"error": "Requête canvas_modify invalide : message ou contenu canvas manquant."}), 400
canvas_chat_messages = [
{"role": "system", "content": (
"Tu es dans un chat de canvas. L'utilisateur va te demander de modifier le contenu du canvas suivant. "
"Fournis UNIQUEMENT le NOUVEAU contenu du canvas ENTIÈREMENT MIS À JOUR, encadré par [CANVAS_START]
et [CANVAS_END]
. "
"Ne réponds pas avec du texte en dehors de ces marqueurs, sauf une courte phrase de confirmation après le canvas. "
"Maintiens le contexte des modifications précédentes du canvas, en te basant sur l'historique de ce chat canvas."
)},
{"role": "assistant", "content": f"[CANVAS_START]\n{canvas_content}\n[CANVAS_END] Voici le canvas actuel. Dites-moi comment le modifier."},
{"role": "user", "content": user_message}
]
text@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type(Exception)) def canvas_modify_groq_call(canvas_chat_messages, model): return open_router_chat_completion(canvas_chat_messages, model, 0.0) try: response_data = canvas_modify_groq_call(canvas_chat_messages, model) if isinstance(response_data, dict): ai_response_text = response_data.get("choices", [{}])[0].get("message", {}).get("content", "") or "" else: ai_response_text = response_data # Si c'est une chaîne, l'utiliser directement if CANVAS_START_MARKER in ai_response_text and CANVAS_END_MARKER in ai_response_text: start_index = ai_response_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = ai_response_text.find(CANVAS_END_MARKER) updated_canvas_content = ai_response_text[start_index:end_index].strip() response_message = "Canvas mis à jour." logger.info("Canvas modifié avec succès.") return jsonify({"updated_canvas_content": updated_canvas_content, "response_message": response_message}) else: logger.error("Réponse IA canvas_modify invalide: marqueurs CANVAS_START/END manquants.") return jsonify({"error": "Réponse IA invalide pour la modification du canvas", "full_response": ai_response_text}), 500 except Exception as e: error_message = f"Erreur serveur (canvas_modify): {str(e)}" logger.exception(error_message) return jsonify({"error": error_message}), 500
@app.route('/run', methods=['POST'])
def run_code_endpoint() -> Response:
"""
Point d'accès pour exécuter du code via l'API Piston (pour canvas code).
"""
data = request.get_json()
if not data:
logger.warning("Requête run sans données.")
return jsonify({"error": "Requête run invalide : corps de requête manquant."}), 400
language = data.get("language")
code = data.get("code")
stdin = data.get("stdin", "")
if not language or not code:
logger.warning("Requête run incomplète: language=%s, code=%s", language, code)
return jsonify({"error": "Requête run invalide : langage et code requis."}), 400
payload = {
"language": language,
"version": "*",
"files": [{"content": code}],
"stdin": stdin
}
try:
piston_response = requests.post(PISTON_API_URL, json=payload, timeout=3000000)
piston_response.raise_for_status()
logger.info("Code exécuté avec Piston avec succès.")
return jsonify(piston_response.json())
except requests.RequestException as e:
error_message = f"Erreur lors de l'appel à l'API Piston: {str(e)}"
logger.exception(error_message)
piston_text = piston_response.text if 'piston_response' in locals() and piston_response is not None else ""
return jsonify({"error": "Erreur d'exécution du code via l'API Piston.", "piston_response": piston_text, "details": error_message}), 500
except Exception as e:
error_message = f"Erreur inattendue lors de l'exécution du code: {str(e)}"
logger.exception(error_message)
return jsonify({"error": "Erreur inattendue lors de l'exécution du code.", "details": error_message}), 500
@app.route('/run', methods=['OPTIONS'])
def run_code_options() -> Response:
"""Gère les requêtes OPTIONS pour l'endpoint /run (CORS preflight)."""
return jsonify({'message': 'OPTIONS requests allowed'}), 200
@app.route('/upload', methods=['POST'])
def upload_file():
# Upload de fichier similaire à dp.py
if 'file' not in request.files:
return jsonify({"error": "No file part"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "No selected file"}), 400
filename = secure_filename(file.filename)
upload_folder = app.config['UPLOAD_FOLDER']
if not os.path.exists(upload_folder):
os.makedirs(upload_folder)
file_path = os.path.join(upload_folder, filename)
try:
file.save(file_path)
return jsonify({"message": "File uploaded successfully", "file_path": file_path}), 200
except Exception as e:
logger.exception("File upload failed")
return jsonify({"error": str(e)}), 500
def format_sse_event(data: Dict[str, Any]) -> str:
"""Formate les données en événement SSE (Server-Sent Events)."""
return f"data: {json.dumps(data)}\n\n"
if name == 'main':
if os.environ.get('FLASK_ENV') == 'development':
app.run(debug=True, port=5000)
show me the full patched code without this bug pls
Below is the full code with the fixes applied. In summary, the changes are:
pythonfrom gevent import monkey monkey.patch_all() import os import re import json import logging from typing import Any, Dict, List, Optional, Tuple import time import datetime from tqdm import tqdm from flask import Flask, request, Response, jsonify, stream_with_context from flask_cors import CORS from e2b_code_interpreter import Sandbox from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type import requests from duckduckgo_search import DDGS from bs4 import BeautifulSoup from urllib.parse import urlparse from werkzeug.utils import secure_filename # Ajout pour la gestion sécurisée des noms de fichier # --- Configuration --- logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = Flask(__name__) CORS(app) app.config['UPLOAD_FOLDER'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'uploads') # Dossier de destination GROQ_API_KEY = os.environ.get("GROQ_API_KEY") E2B_API_KEY = os.environ.get("E2B_API_KEY") OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY") # Ajout de la clé OpenRouter PISTON_API_URL = "https://emkc.org/api/v2/piston/execute" # --- E2B Sandbox Initialization with Retry --- @retry(stop=stop_after_attempt(5), # Tentatives augmentées à 5 wait=wait_exponential(multiplier=1, min=4, max=20), # Backoff jusqu'à 20 secondes retry=retry_if_exception_type(Exception)) def initialize_sandbox(): """Initializes the e2b Sandbox with a higher timeout.""" logger.info("Initializing e2b Sandbox with 300-second timeout...") return Sandbox(api_key=E2B_API_KEY, timeout=300) # Timeout augmenté à 300 secondes try: sandbox = initialize_sandbox() logger.info("e2b Sandbox initialized successfully.") except Exception as e: logger.error(f"Failed to initialize e2b Sandbox after multiple retries: {e}") # It's crucial to handle the failure of sandbox initialization. # You might want to exit the application or have a fallback mechanism. # For now, let's raise the exception to prevent the app from starting improperly. raise e VALID_MODELS = [ "google/gemini-2.0-pro-exp-02-05:free", "deepseek/deepseek-r1:free", "qwen/qwen2.5-vl-72b-instruct:free", "meta-llama/llama-3.3-70b-instruct:free" ] CODE_PATTERN = re.compile(r"```python\s*([\s\S]*?)```") CODE_REQUIRED_MARKER = "[CODE_REQUIRED]" CANVAS_START_MARKER = "[CANVAS_START]" CANVAS_END_MARKER = "[CANVAS_END]" # --- Configuration pour la recherche Web (comme dans le script CLI) --- WEB_SEARCH_CONFIG = { 'openrouter_api_key': OPENROUTER_API_KEY, 'max_results': 9, # Augmenté à 9 par défaut 'cache_file': 'search_cache.json', # Le cache est géré, mais pas crucial pour le backend Flask 'model': 'mistralai/mistral-medium', # Modèle OpenRouter pour le filtrage et la synthèse. Peut être changé. 'timeout': 20, 'min_quality_score': 0.65 # Ce paramètre n'est plus utilisé directement, remplacé par un filtrage AI plus précis. } # --- Classes pour la recherche Web (adaptées du script CLI) --- class DataCollector: @staticmethod def web_search(query: str) -> List[Dict]: with DDGS() as ddgs: results = ddgs.text(query, region='fr-fr', max_results=WEB_SEARCH_CONFIG['max_results']) return [{ 'url': r['href'], 'title': r['title'], 'domain': urlparse(r['href']).netloc.replace('www.', '') } for r in results if urlparse(r['href']).scheme in ['http', 'https']] class ContentProcessor: def __init__(self): self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'} def fetch_content(self, url: str) -> str: try: response = requests.get(url, headers=self.headers, timeout=WEB_SEARCH_CONFIG['timeout']) soup = BeautifulSoup(response.text, 'html.parser') for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'meta', 'link']): element.decompose() main_content = soup.find(['main', 'article']) or soup.body return ' '.join(main_content.stripped_strings)[:5000] except Exception as e: logger.error(f"Erreur sur {url}: {str(e)}") return '' class AICore: @staticmethod def query_ai(system_prompt: str, user_content: str, temp: float = 0.1, model:str = WEB_SEARCH_CONFIG['model']) -> str: # Ajout du paramètre model headers = { "Authorization": f"Bearer {WEB_SEARCH_CONFIG['openrouter_api_key']}", "Content-Type": "application/json", "HTTP-Referer": "https://your-app-url", # Remplacez par l'URL de votre application "X-Title": "Flask Search AI" } data = { "model": model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content} ], "temperature": temp, "max_tokens": 2000 } try: response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data, timeout=WEB_SEARCH_CONFIG['timeout']) response.raise_for_status() # On lève une exception si le statut n'est pas 200 return response.json()['choices'][0]['message']['content'] except Exception as e: return f"Erreur API OpenRouter: {str(e)}" def open_router_chat_completion_stream(messages: List[Dict[str, Any]], model: str, temperature: float = 0.0): headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", } payload = { "model": model, "messages": messages, "temperature": temperature, "max_tokens": 2000, "stream": True } response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, stream=True, timeout=WEB_SEARCH_CONFIG['timeout']) response.raise_for_status() for line in response.iter_lines(): if line: decoded_line = line.decode('utf-8') if decoded_line.startswith("data: "): data_str = decoded_line[len("data: "):] try: data = json.loads(data_str) yield data except Exception: continue def open_router_chat_completion(messages: List[Dict[str, Any]], model: str, temperature: float = 0.0) -> Dict: headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", } payload = { "model": model, "messages": messages, "temperature": temperature, "max_tokens": 2000, "stream": False } response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, timeout=WEB_SEARCH_CONFIG['timeout']) response.raise_for_status() return response.json() # --- Fonctions utilitaires (restent inchangées) --- def extract_code_blocks(text: str) -> Tuple[Optional[str], Optional[str]]: """Extrait les blocs de code Python et gère le marqueur [CODE_REQUIRED].""" if CODE_REQUIRED_MARKER in text: text_no_marker = text.replace(CODE_REQUIRED_MARKER, "", 1).strip() code_match = CODE_PATTERN.search(text_no_marker) if code_match: return code_match.group(1).strip(), None else: code_match = CODE_PATTERN.search(text) if code_match: return None, code_match.group(1).strip() # code à afficher sans exécution return None, None def run_code_in_sandbox(code: str) -> str: """Exécute le code Python dans le sandbox e2b et retourne le résultat.""" start_time = time.time() try: logger.info("Début de l'exécution du code dans le sandbox e2b.") execution = None method_name = next((method for method in ["run_code_sync", "execute_sync", "execute_code", "execute", "run_code", "run"] if hasattr(sandbox, method)), None) if method_name: logger.info(f"Utilisation de la méthode sandbox.{method_name}") execution = getattr(sandbox, method_name)(code) else: error_msg = "Aucune méthode d'exécution de code disponible dans le sandbox." logger.error(error_msg) return error_msg logs = execution.logs stdout_logs = logs.stdout if hasattr(logs, 'stdout') else [] stderr_logs = logs.stderr if hasattr(logs, 'stderr') else [] stdout = "\n".join(stdout_logs) stderr = "\n".join(stderr_logs) if stderr: return f"Stdout:\n{stdout}\n\nStderr:\n{stderr}" return stdout except Exception as e: duration = time.time() - start_time error_msg = f"Erreur lors de l'exécution du code dans e2b après {duration:.2f} secondes: {str(e)}" logger.exception(error_msg) return f"Erreur exécution code: {str(e)}" def extract_numeric_result(exec_result: str) -> Optional[str]: """Extrait la dernière ligne numérique du résultat d'exécution de code.""" if "Stdout:" in exec_result: stdout_section = exec_result.split("Stdout:\n", 1)[1].split("\n\nStderr:")[0].strip() else: stdout_section = exec_result.strip() lines = [l.strip() for l in stdout_section.splitlines() if l.strip()] if not lines: return None last_line = lines[-1] if re.search(r'\d', last_line): return last_line return None # --- Routes Flask (légères modifications) --- @app.route('/models', methods=['GET']) def get_models() -> Response: """Retourne la liste des modèles valides.""" return jsonify({"models": VALID_MODELS}) @app.route('/chat', methods=['POST']) def chat_endpoint() -> Response: """ Point d'entrée principal pour le chat. Gère le streaming SSE, l'exécution de code, les canvas et la recherche Web. """ @stream_with_context @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type(Exception)) def stream_response(): data = request.get_json() if not data: logger.warning("Requête chat invalide: corps de requête manquant.") yield format_sse_event({"type": "error", "content": "Requête chat invalide : corps de requête manquant."}) return messages = data.get('messages', []) model: str = data.get('model', 'google/gemini-2.0-pro-exp-02-05:free').strip() execute_code: bool = data.get('execute_code', True) web_search_enabled: bool = data.get('web_search', False) # Récupère l'état de la recherche Web if not messages or not isinstance(messages, list) or not model or model not in VALID_MODELS: logger.warning("Requête chat invalide: messages=%s, model=%s", messages, model) yield format_sse_event({"type": "error", "content": "Requête chat invalide : messages ou modèle incorrects."}) return # --- Intégration de la recherche Web --- web_search_results = [] if web_search_enabled: user_query = messages[-1]['content'] # On utilise le dernier message de l'utilisateur comme requête yield format_sse_event({"type": "web_search_status", "content": "start"}) yield format_sse_event({"type": "web_search", "query": user_query}) # Envoi de la requête pour l'afficher search_processor = ContentProcessor() search_results = DataCollector.web_search(user_query) # Filtrage et extraction du contenu (comme dans le CLI, mais intégré ici) filtered_results = [] for result in search_results: content = search_processor.fetch_content(result['url']) if content: # Filtrage de pertinence avec OpenRouter (comme dans le CLI) filter_prompt = ( "Tu es un expert en analyse de la pertinence des documents. " f"Détermine si le contenu web suivant est pertinent pour la requête utilisateur : '{user_query}'. " "Réponds uniquement par 'oui' ou 'non'." ) is_relevant = AICore.query_ai(filter_prompt, content[:2000], temp=0.0, model="mistralai/mistral-small") # Filtrage plus rapide if is_relevant.lower().startswith('oui'): filtered_results.append({'url': result['url'], 'domain': result['domain'], 'content': content}) if len(filtered_results) >= WEB_SEARCH_CONFIG['max_results']: break web_search_results = filtered_results # Envoi des URLs des sources *après* le filtrage source_urls = [result['url'] for result in web_search_results] yield format_sse_event({"type": "web_sources", "content": source_urls}) yield format_sse_event({"type": "web_search_status", "content": "end"}) # --- Construction du prompt système (adapté pour la recherche Web) --- base_prompt = ( "🎯 **Rôle : Expert en calcul numérique précis, assistant polyvalent avec canvas interactifs et capacités de recherche Web.**\n" "**Mission :** Répondre aux questions de l'utilisateur, effectuer des calculs précis, fournir des canvas interactifs (texte et code) et, si nécessaire, effectuer des recherches sur le Web pour enrichir la réponse.\n\n" "**Si la question demande un calcul numérique :**\n" "1. Annonce que tu vas exécuter un code Python pour réaliser le calcul.\n" "2. Ajoute le marqueur `[CODE_REQUIRED]` suivi du bloc de code minimal en Python.\n" "3. N'envoie rien d'autre et attends l'exécution du code.\n\n" "**Après l'exécution du code**, explique le résultat en détails en te basant sur le résultat numérique obtenu.\n\n" "**Pour la création ou modification de canvas** (texte ou code) :\n" "Utilise les marqueurs `[CANVAS_START]` et `[CANVAS_END]` pour encadrer le contenu du canvas.\n\n" "**Si la recherche Web est activée :**\n" "1. Tu recevras des extraits de pages Web pertinentes.\n" "2. Utilise ces informations pour compléter ta réponse, en citant les sources de manière concise (ex: [nomdedomaine.com]).\n" "3. Si les sources Web ne sont pas pertinentes ou suffisantes, base-toi sur tes connaissances internes.\n\n" "Si aucune de ces conditions n'est remplie, réponds directement et clairement à la question." ) # --- Préparation des messages pour Groq --- system_prompt = {"role": "system", "content": base_prompt} # Ajout des résultats de la recherche Web au contexte, si applicable if web_search_results: # Création d'un résumé des sources pour Groq (plus concis que d'envoyer tout le contenu) sources_summary = "\n\n".join([ f"[{result['domain']}]\nExtrait: {result['content'][:500]}" # Limite la longueur de l'extrait for result in web_search_results ]) web_search_context = {"role": "user", "content": f"Voici des extraits de pages web pertinentes pour la question:\n\n{sources_summary}"} messages_with_system_prompt = [system_prompt, web_search_context] + messages else: messages_with_system_prompt = [system_prompt] + messages full_reply_text = "" code_to_execute = None code_to_display = None is_code_used = False canvas_content = None code_executed = False prompt_tokens = 0 completion_tokens = 0 total_tokens = 0 initial_response_completed = False try: response_stream = open_router_chat_completion_stream(messages_with_system_prompt, model, 0.0) for chunk in response_stream: # If chunk is a string, try to parse it as JSON. if isinstance(chunk, str): try: chunk = json.loads(chunk) except Exception: continue if not isinstance(chunk, dict): continue if initial_response_completed: break delta_content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "") or "" delta_content_no_marker = delta_content.replace(CODE_REQUIRED_MARKER, "") full_reply_text += delta_content_no_marker # Extraction du code dans le texte complet code_to_execute, code_to_display = extract_code_blocks(full_reply_text) # Si aucun code destiné à l'exécution n'est extrait mais qu'un bloc est trouvé, utiliser ce dernier if not code_to_execute and code_to_display: code_to_execute = code_to_display # Envoi du canvas s'il est détecté if CANVAS_START_MARKER in full_reply_text and CANVAS_END_MARKER in full_reply_text: start_index = full_reply_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = full_reply_text.find(CANVAS_END_MARKER) canvas_content_extracted = full_reply_text[start_index:end_index].strip() canvas_content = canvas_content_extracted delta_content_for_chat = delta_content_no_marker.replace(CANVAS_START_MARKER, "").replace(CANVAS_END_MARKER, "") yield format_sse_event({"type": "text", "content": delta_content_for_chat}) yield format_sse_event({"type": "canvas_start", "content": canvas_content}) continue else: yield format_sse_event({"type": "text", "content": delta_content_no_marker}) if code_to_execute: initial_response_completed = True if execute_code and code_to_execute: is_code_used = True logger.info("Envoi événement CODE_BLOCK") yield format_sse_event({"type": "code_block", "content": code_to_execute}) yield format_sse_event({"type": "code_execution_start"}) execution_result = run_code_in_sandbox(code_to_execute) numeric_result = extract_numeric_result(execution_result) if numeric_result: code_executed = True logger.info("Envoi événement EXECUTED_CODE_RESULT") yield format_sse_event({"type": "executed_code_result", "content": numeric_result}) conclusion_prompt_content = ( f"**[INSTRUCTION SUITE - APRES EXECUTION DU CODE]** Tu as exécuté le code Python suivant:\n" f"```python\n{code_to_execute}\n```\n" f"Le résultat du calcul est: **{numeric_result}**. " "Continue ensuite la conversation en t'appuyant sur ce résultat." ) conclusion_prompt = [{"role": "user", "content": conclusion_prompt_content}] # Note: open_router_chat_completion returns a dict (non-streaming) conclusion_response = open_router_chat_completion(conclusion_prompt, model, 0.0) delta_content = conclusion_response.get("choices", [{}])[0].get("message", {}).get("content", "") or "" yield format_sse_event({"type": "text", "content": delta_content}) yield format_sse_event({"type": "final"}) else: code_executed = True logger.info("Envoi événement EXECUTED_CODE_ERROR") yield format_sse_event({"type": "executed_code_error", "content": execution_result}) yield format_sse_event({"type": "final", "result": "Fin de la réponse (erreur ou pas de résultat numérique)."}) prompt_tokens = 0 # Comptage non implémenté completion_tokens = 0 total_tokens = prompt_tokens + completion_tokens if not code_executed: logger.info("Envoi événement FINAL (sans code exécuté)") yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}) else: yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}) except Exception as e: error_message = f"Erreur serveur IA: {str(e)}" logger.exception(error_message) yield format_sse_event({"type": "error", "content": error_message}) finally: yield format_sse_event({"type": "analysis_status", "content": "end"}) yield format_sse_event({"type": "done", "is_code_execution": is_code_used}) return Response(stream_with_context(stream_response()), mimetype='text/event-stream') @app.route('/canvas_modify', methods=['POST']) def canvas_modify_endpoint() -> Response: data = request.get_json() if not data: logger.warning("Requête canvas_modify sans données.") return jsonify({"error": "Requête canvas_modify invalide : corps de requête manquant."}), 400 user_message = data.get('message') canvas_content = data.get('canvas_content') model = data.get('model', 'google/gemini-2.0-pro-exp-02-05:free').strip() if not user_message or canvas_content is None: logger.warning("Requête canvas_modify incomplète: message=%s, canvas_content=%s", user_message, canvas_content) return jsonify({"error": "Requête canvas_modify invalide : message ou contenu canvas manquant."}), 400 canvas_chat_messages = [ {"role": "system", "content": ( "Tu es dans un chat de canvas. L'utilisateur va te demander de modifier le contenu du canvas suivant. " "Fournis UNIQUEMENT le NOUVEAU contenu du canvas ENTIÈREMENT MIS À JOUR, encadré par `[CANVAS_START]` et `[CANVAS_END]`. " "Ne réponds pas avec du texte en dehors de ces marqueurs, sauf une courte phrase de confirmation après le canvas. " "Maintiens le contexte des modifications précédentes du canvas, en te basant sur l'historique de ce chat canvas." )}, {"role": "assistant", "content": f"[CANVAS_START]\n{canvas_content}\n[CANVAS_END] Voici le canvas actuel. Dites-moi comment le modifier."}, {"role": "user", "content": user_message} ] @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type(Exception)) def canvas_modify_groq_call(canvas_chat_messages, model): return open_router_chat_completion(canvas_chat_messages, model, 0.0) try: response_data = canvas_modify_groq_call(canvas_chat_messages, model) if isinstance(response_data, dict): ai_response_text = response_data.get("choices", [{}])[0].get("message", {}).get("content", "") or "" else: ai_response_text = response_data # Si c'est une chaîne, l'utiliser directement if CANVAS_START_MARKER in ai_response_text and CANVAS_END_MARKER in ai_response_text: start_index = ai_response_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = ai_response_text.find(CANVAS_END_MARKER) updated_canvas_content = ai_response_text[start_index:end_index].strip() response_message = "Canvas mis à jour." logger.info("Canvas modifié avec succès.") return jsonify({"updated_canvas_content": updated_canvas_content, "response_message": response_message}) else: logger.error("Réponse IA canvas_modify invalide: marqueurs CANVAS_START/END manquants.") return jsonify({"error": "Réponse IA invalide pour la modification du canvas", "full_response": ai_response_text}), 500 except Exception as e: error_message = f"Erreur serveur (canvas_modify): {str(e)}" logger.exception(error_message) return jsonify({"error": error_message}), 500 @app.route('/run', methods=['POST']) def run_code_endpoint() -> Response: """ Point d'accès pour exécuter du code via l'API Piston (pour canvas code). """ data = request.get_json() if not data: logger.warning("Requête run sans données.") return jsonify({"error": "Requête run invalide : corps de requête manquant."}), 400 language = data.get("language") code = data.get("code") stdin = data.get("stdin", "") if not language or not code: logger.warning("Requête run incomplète: language=%s, code=%s", language, code) return jsonify({"error": "Requête run invalide : langage et code requis."}), 400 payload = { "language": language, "version": "*", "files": [{"content": code}], "stdin": stdin } try: piston_response = requests.post(PISTON_API_URL, json=payload, timeout=3000000) piston_response.raise_for_status() logger.info("Code exécuté avec Piston avec succès.") return jsonify(piston_response.json()) except requests.RequestException as e: error_message = f"Erreur lors de l'appel à l'API Piston: {str(e)}" logger.exception(error_message) piston_text = piston_response.text if 'piston_response' in locals() and piston_response is not None else "" return jsonify({"error": "Erreur d'exécution du code via l'API Piston.", "piston_response": piston_text, "details": error_message}), 500 except Exception as e: error_message = f"Erreur inattendue lors de l'exécution du code: {str(e)}" logger.exception(error_message) return jsonify({"error": "Erreur inattendue lors de l'exécution du code.", "details": error_message}), 500 @app.route('/run', methods=['OPTIONS']) def run_code_options() -> Response: """Gère les requêtes OPTIONS pour l'endpoint /run (CORS preflight).""" return jsonify({'message': 'OPTIONS requests allowed'}), 200 @app.route('/upload', methods=['POST']) def upload_file(): # Upload de fichier similaire à dp.py if 'file' not in request.files: return jsonify({"error": "No file part"}), 400 file = request.files['file'] if file.filename == '': return jsonify({"error": "No selected file"}), 400 filename = secure_filename(file.filename) upload_folder = app.config['UPLOAD_FOLDER'] if not os.path.exists(upload_folder): os.makedirs(upload_folder) file_path = os.path.join(upload_folder, filename) try: file.save(file_path) return jsonify({"message": "File uploaded successfully", "file_path": file_path}), 200 except Exception as e: logger.exception("File upload failed") return jsonify({"error": str(e)}), 500 def format_sse_event(data: Dict[str, Any]) -> str: """Formate les données en événement SSE (Server-Sent Events).""" return f"data: {json.dumps(data)}\n\n" if __name__ == '__main__': if os.environ.get('FLASK_ENV') == 'development': app.run(debug=True, port=5000)
This version should no longer trigger the “‘str’ object has no attribute ‘get’” error.