Your service is live đ 127.0.0.1 - - [18/Feb/2025...
Créé le : 18 février 2025
RĂ©pondu en utilisant O3 Mini High par Chat01
Créé le : 18 février 2025
RĂ©pondu en utilisant O3 Mini High par Chat01
Your service is live đ
127.0.0.1 - - [18/Feb/2025:12:12:06 +0000] "GET / HTTP/1.1" 404 207 "-" "Go-http-client/2.0"
127.0.0.1 - - [18/Feb/2025:12:12:12 +0000] "OPTIONS /chat HTTP/1.1" 200 0 "https://pl1nkochat-1.onrender.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
INFO:app:Envoi événement CODE_BLOCK
INFO:app:Début de l'exécution du code dans le sandbox e2b.
INFO:app:Utilisation de la méthode sandbox.run_code
INFO:e2b.sandbox_sync.main:Request: POST https://49999-iqbmzptqfll8cy9yqyo1a-4818f120.e2b.dev/execute
INFO:e2b.sandbox_sync.main:Response: 200 https://49999-iqbmzptqfll8cy9yqyo1a-4818f120.e2b.dev/execute
INFO:httpx:HTTP Request: POST https://49999-iqbmzptqfll8cy9yqyo1a-4818f120.e2b.dev/execute "HTTP/1.1 200 OK"
INFO:app:Envoi événement EXECUTED_CODE_RESULT
ERROR:app:Erreur serveur IA: 'str' object has no attribute 'get'
Traceback (most recent call last):
File "/opt/render/project/src/app.py", line 425, in stream_response
delta_content = chunk.get("choices", [{}])[0].get("message", {}).get("content", "") or ""
^^^^^^^^^
AttributeError: 'str' object has no attribute 'get'
127.0.0.1 - - [18/Feb/2025:12:12:21 +0000] "POST /chat HTTP/1.1" 200 1483 "https://pl1nkochat-1.onrender.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
127.0.0.1 - - [18/Feb/2025:12:12:32 +0000] "GET /models HTTP/1.1" 200 157 "https://pl1nkochat-1.onrender.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
127.0.0.1 - - [18/Feb/2025:12:12:35 +0000] "OPTIONS /chat HTTP/1.1" 200 0 "https://pl1nkochat-1.onrender.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
INFO:app:Envoi événement CODE_BLOCK
INFO:app:Début de l'exécution du code dans le sandbox e2b.
INFO:app:Utilisation de la méthode sandbox.run_code
INFO:e2b.sandbox_sync.main:Request: POST https://49999-iqbmzptqfll8cy9yqyo1a-4818f120.e2b.dev/execute
INFO:e2b.sandbox_sync.main:Response: 200 https://49999-iqbmzptqfll8cy9yqyo1a-4818f120.e2b.dev/execute
INFO:httpx:HTTP Request: POST https://49999-iqbmzptqfll8cy9yqyo1a-4818f120.e2b.dev/execute "HTTP/1.1 200 OK"
INFO:app:Envoi événement EXECUTED_CODE_RESULT
ERROR:app:Erreur serveur IA: 'str' object has no attribute 'get'
Traceback (most recent call last):
File "/opt/render/project/src/app.py", line 425, in stream_response
delta_content = chunk.get("choices", [{}])[0].get("message", {}).get("content", "") or ""
^^^^^^^^^
AttributeError: 'str' object has no attribute 'get'
127.0.0.1 - - [18/Feb/2025:12:12:41 +0000] "POST /chat HTTP/1.1" 200 1469 "https://pl1nkochat-1.onrender.com/" "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
from gevent import monkey
monkey.patch_all()
import os
import re
import json
import logging
from typing import Any, Dict, List, Optional, Tuple
import time
import datetime
from tqdm import tqdm
from flask import Flask, request, Response, jsonify, stream_with_context
from flask_cors import CORS
from e2b_code_interpreter import Sandbox
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import requests
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from werkzeug.utils import secure_filename # Ajout pour la gestion sécurisée des noms de fichier
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)
app = Flask(name)
CORS(app)
app.config['UPLOAD_FOLDER'] = os.path.join(os.path.dirname(os.path.abspath(file)), 'uploads') # Dossier de destination
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
E2B_API_KEY = os.environ.get("E2B_API_KEY")
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY") # Ajout de la clé OpenRouter
PISTON_API_URL = "https://emkc.org/api/v2/piston/execute"
@retry(stop=stop_after_attempt(5), # Tentatives augmentées à 5
wait=wait_exponential(multiplier=1, min=4, max=20), # Backoff jusqu'Ă 20 secondes
retry=retry_if_exception_type(Exception))
def initialize_sandbox():
"""Initializes the e2b Sandbox with a higher timeout."""
logger.info("Initializing e2b Sandbox with 300-second timeout...")
return Sandbox(api_key=E2B_API_KEY, timeout=300) # Timeout augmenté à 300 secondes
try:
sandbox = initialize_sandbox()
logger.info("e2b Sandbox initialized successfully.")
except Exception as e:
logger.error(f"Failed to initialize e2b Sandbox after multiple retries: {e}")
# It's crucial to handle the failure of sandbox initialization.
# You might want to exit the application or have a fallback mechanism.
# For now, let's raise the exception to prevent the app from starting improperly.
raise e
VALID_MODELS = [
"google/gemini-2.0-pro-exp-02-05:free",
"deepseek/deepseek-r1:free",
"qwen/qwen2.5-vl-72b-instruct:free",
"meta-llama/llama-3.3-70b-instruct:free"
]
CODE_PATTERN = re.compile(r"python\s*([\s\S]*?)
")
CODE_REQUIRED_MARKER = "[CODE_REQUIRED]"
CANVAS_START_MARKER = "[CANVAS_START]"
CANVAS_END_MARKER = "[CANVAS_END]"
WEB_SEARCH_CONFIG = {
'openrouter_api_key': OPENROUTER_API_KEY,
'max_results': 9, # Augmenté à 9 par défaut
'cache_file': 'search_cache.json', # Le cache est géré, mais pas crucial pour le backend Flask
'model': 'mistralai/mistral-medium', # ModĂšle OpenRouter pour le filtrage et la synthĂšse. Peut ĂȘtre changĂ©.
'timeout': 20,
'min_quality_score': 0.65 # Ce paramÚtre n'est plus utilisé directement, remplacé par un filtrage AI plus précis.
}
class DataCollector:
@staticmethod
def web_search(query: str) -> List[Dict]:
with DDGS() as ddgs:
results = ddgs.text(query, region='fr-fr', max_results=WEB_SEARCH_CONFIG['max_results'])
return [{
'url': r['href'],
'title': r['title'],
'domain': urlparse(r['href']).netloc.replace('www.', '')
} for r in results if urlparse(r['href']).scheme in ['http', 'https']]
class ContentProcessor:
def init(self):
self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
textdef fetch_content(self, url: str) -> str: try: response = requests.get(url, headers=self.headers, timeout=WEB_SEARCH_CONFIG['timeout']) soup = BeautifulSoup(response.text, 'html.parser') for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'meta', 'link']): element.decompose() main_content = soup.find(['main', 'article']) or soup.body return ' '.join(main_content.stripped_strings)[:5000] except Exception as e: logger.error(f"Erreur sur {url}: {str(e)}") return ''
class AICore:
@staticmethod
def query_ai(system_prompt: str, user_content: str, temp: float = 0.1, model:str = WEB_SEARCH_CONFIG['model']) -> str: # Ajout du paramĂštre model
headers = {
"Authorization": f"Bearer {WEB_SEARCH_CONFIG['openrouter_api_key']}",
"Content-Type": "application/json",
"HTTP-Referer": "https://your-app-url", # Remplacez par l'URL de votre application
"X-Title": "Flask Search AI"
}
textdata = { "model": model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content} ], "temperature": temp, "max_tokens": 2000 } try: response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data, timeout=WEB_SEARCH_CONFIG['timeout']) response.raise_for_status() # On lĂšve une exception si le statut n'est pas 200 return response.json()['choices'][0]['message']['content'] except Exception as e: return f"Erreur API OpenRouter: {str(e)}"
def open_router_chat_completion_stream(messages: List[Dict[str, Any]], model: str, temperature: float = 0.0):
headers = {
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": 2000,
"stream": True
}
response = requests.post("https://openrouter.ai/api/v1/chat/completions",
headers=headers, json=payload, stream=True, timeout=WEB_SEARCH_CONFIG['timeout'])
response.raise_for_status()
for line in response.iter_lines():
if line:
decoded_line = line.decode('utf-8')
if decoded_line.startswith("data: "):
data_str = decoded_line[len("data: "):]
try:
data = json.loads(data_str)
yield data
except Exception:
continue
def open_router_chat_completion(messages: List[Dict[str, Any]], model: str, temperature: float = 0.0) -> Dict:
headers = {
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": 2000,
"stream": False
}
response = requests.post("https://openrouter.ai/api/v1/chat/completions",
headers=headers, json=payload, timeout=WEB_SEARCH_CONFIG['timeout'])
response.raise_for_status()
return response.json()
def extract_code_blocks(text: str) -> Tuple[Optional[str], Optional[str]]:
"""Extrait les blocs de code Python et gĂšre le marqueur [CODE_REQUIRED]."""
if CODE_REQUIRED_MARKER in text:
text_no_marker = text.replace(CODE_REQUIRED_MARKER, "", 1).strip()
code_match = CODE_PATTERN.search(text_no_marker)
if code_match:
return code_match.group(1).strip(), None
else:
code_match = CODE_PATTERN.search(text)
if code_match:
return None, code_match.group(1).strip() # code à afficher sans exécution
return None, None
def run_code_in_sandbox(code: str) -> str:
"""Exécute le code Python dans le sandbox e2b et retourne le résultat."""
start_time = time.time()
try:
logger.info("Début de l'exécution du code dans le sandbox e2b.")
execution = None
method_name = next((method for method in ["run_code_sync", "execute_sync", "execute_code", "execute", "run_code", "run"] if hasattr(sandbox, method)), None)
if method_name:
logger.info(f"Utilisation de la méthode sandbox.{method_name}")
execution = getattr(sandbox, method_name)(code)
else:
error_msg = "Aucune méthode d'exécution de code disponible dans le sandbox."
logger.error(error_msg)
return error_msg
logs = execution.logs
stdout_logs = logs.stdout if hasattr(logs, 'stdout') else []
stderr_logs = logs.stderr if hasattr(logs, 'stderr') else []
stdout = "\n".join(stdout_logs)
stderr = "\n".join(stderr_logs)
if stderr:
return f"Stdout:\n{stdout}\n\nStderr:\n{stderr}"
return stdout
except Exception as e:
duration = time.time() - start_time
error_msg = f"Erreur lors de l'exécution du code dans e2b aprÚs {duration:.2f} secondes: {str(e)}"
logger.exception(error_msg)
return f"Erreur exécution code: {str(e)}"
def extract_numeric_result(exec_result: str) -> Optional[str]:
"""Extrait la derniÚre ligne numérique du résultat d'exécution de code."""
if "Stdout:" in exec_result:
stdout_section = exec_result.split("Stdout:\n", 1)[1].split("\n\nStderr:")[0].strip()
else:
stdout_section = exec_result.strip()
lines = [l.strip() for l in stdout_section.splitlines() if l.strip()]
if not lines:
return None
last_line = lines[-1]
if re.search(r'\d', last_line):
return last_line
return None
@app.route('/models', methods=['GET'])
def get_models() -> Response:
"""Retourne la liste des modĂšles valides."""
return jsonify({"models": VALID_MODELS})
@app.route('/chat', methods=['POST'])
def chat_endpoint() -> Response:
"""
Point d'entrée principal pour le chat.
GÚre le streaming SSE, l'exécution de code, les canvas et la recherche Web.
"""
@stream_with_context
@retry(stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type(Exception))
def stream_response():
data = request.get_json()
if not data:
logger.warning("RequĂȘte chat invalide: corps de requĂȘte manquant.")
yield format_sse_event({"type": "error", "content": "RequĂȘte chat invalide : corps de requĂȘte manquant."})
return
textmessages = data.get('messages', []) model: str = data.get('model', 'google/gemini-2.0-pro-exp-02-05:free').strip() execute_code: bool = data.get('execute_code', True) web_search_enabled: bool = data.get('web_search', False) # RĂ©cupĂšre l'Ă©tat de la recherche Web if not messages or not isinstance(messages, list) or not model or model not in VALID_MODELS: logger.warning("RequĂȘte chat invalide: messages=%s, model=%s", messages, model) yield format_sse_event({"type": "error", "content": "RequĂȘte chat invalide : messages ou modĂšle incorrects."}) return # --- IntĂ©gration de la recherche Web --- web_search_results = [] if web_search_enabled: user_query = messages[-1]['content'] # On utilise le dernier message de l'utilisateur comme requĂȘte yield format_sse_event({"type": "web_search_status", "content": "start"}) yield format_sse_event({"type": "web_search", "query": user_query}) # Envoi de la requĂȘte pour l'afficher search_processor = ContentProcessor() search_results = DataCollector.web_search(user_query) # Filtrage et extraction du contenu (comme dans le CLI, mais intĂ©grĂ© ici) filtered_results = [] for result in search_results: content = search_processor.fetch_content(result['url']) if content: # Filtrage de pertinence avec OpenRouter (comme dans le CLI) filter_prompt = ( "Tu es un expert en analyse de la pertinence des documents. " f"DĂ©termine si le contenu web suivant est pertinent pour la requĂȘte utilisateur : '{user_query}'. " "RĂ©ponds uniquement par 'oui' ou 'non'." ) is_relevant = AICore.query_ai(filter_prompt, content[:2000], temp=0.0, model="mistralai/mistral-small") # Filtrage plus rapide if is_relevant.lower().startswith('oui'): filtered_results.append({'url': result['url'], 'domain': result['domain'], 'content': content}) if len(filtered_results) >= WEB_SEARCH_CONFIG['max_results']: break web_search_results = filtered_results # Envoi des URLs des sources *aprĂšs* le filtrage source_urls = [result['url'] for result in web_search_results] yield format_sse_event({"type": "web_sources", "content": source_urls}) yield format_sse_event({"type": "web_search_status", "content": "end"}) # --- Construction du prompt systĂšme (adaptĂ© pour la recherche Web) --- base_prompt = ( "đŻ **RĂŽle : Expert en calcul numĂ©rique prĂ©cis, assistant polyvalent avec canvas interactifs et capacitĂ©s de recherche Web.**\n" "**Mission :** RĂ©pondre aux questions de l'utilisateur, effectuer des calculs prĂ©cis, fournir des canvas interactifs (texte et code) et, si nĂ©cessaire, effectuer des recherches sur le Web pour enrichir la rĂ©ponse.\n\n" "**Si la question demande un calcul numĂ©rique :**\n" "1. Annonce que tu vas exĂ©cuter un code Python pour rĂ©aliser le calcul.\n" "2. Ajoute le marqueur `[CODE_REQUIRED]` suivi du bloc de code minimal en Python.\n" "3. N'envoie rien d'autre et attends l'exĂ©cution du code.\n\n" "**AprĂšs l'exĂ©cution du code**, explique le rĂ©sultat en dĂ©tails en te basant sur le rĂ©sultat numĂ©rique obtenu.\n\n" "**Pour la crĂ©ation ou modification de canvas** (texte ou code) :\n" "Utilise les marqueurs `[CANVAS_START]` et `[CANVAS_END]` pour encadrer le contenu du canvas.\n\n" "**Si la recherche Web est activĂ©e :**\n" "1. Tu recevras des extraits de pages Web pertinentes.\n" "2. Utilise ces informations pour complĂ©ter ta rĂ©ponse, en citant les sources de maniĂšre concise (ex: [nomdedomaine.com]).\n" "3. Si les sources Web ne sont pas pertinentes ou suffisantes, base-toi sur tes connaissances internes.\n\n" "Si aucune de ces conditions n'est remplie, rĂ©ponds directement et clairement Ă la question." ) # --- PrĂ©paration des messages pour Groq --- system_prompt = {"role": "system", "content": base_prompt} # Ajout des rĂ©sultats de la recherche Web au contexte, si applicable if web_search_results: # CrĂ©ation d'un rĂ©sumĂ© des sources pour Groq (plus concis que d'envoyer tout le contenu) sources_summary = "\n\n".join([ f"[{result['domain']}]\nExtrait: {result['content'][:500]}" # Limite la longueur de l'extrait for result in web_search_results ]) web_search_context = {"role": "user", "content": f"Voici des extraits de pages web pertinentes pour la question:\n\n{sources_summary}"} messages_with_system_prompt = [system_prompt, web_search_context] + messages else: messages_with_system_prompt = [system_prompt] + messages full_reply_text = "" code_to_execute = None code_to_display = None is_code_used = False canvas_content = None code_executed = False prompt_tokens = 0 completion_tokens = 0 total_tokens = 0 initial_response_completed = False try: response_stream = open_router_chat_completion_stream(messages_with_system_prompt, model, 0.0) for chunk in response_stream: # Si chunk n'est pas un dict, tenter de le parser en JSON et vĂ©rifier son type if not isinstance(chunk, dict): try: chunk_parsed = json.loads(chunk) if not isinstance(chunk_parsed, dict): continue chunk = chunk_parsed except Exception: continue if initial_response_completed: break delta_content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "") or "" delta_content_no_marker = delta_content.replace(CODE_REQUIRED_MARKER, "") full_reply_text += delta_content_no_marker # Extraction du code dans le texte complet code_to_execute, code_to_display = extract_code_blocks(full_reply_text) # Si aucun code destinĂ© Ă l'exĂ©cution n'est extrait mais qu'un bloc est trouvĂ©, utiliser ce dernier if not code_to_execute and code_to_display: code_to_execute = code_to_display # Envoi du canvas s'il est dĂ©tectĂ© if CANVAS_START_MARKER in full_reply_text and CANVAS_END_MARKER in full_reply_text: start_index = full_reply_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = full_reply_text.find(CANVAS_END_MARKER) canvas_content_extracted = full_reply_text[start_index:end_index].strip() canvas_content = canvas_content_extracted delta_content_for_chat = delta_content_no_marker.replace(CANVAS_START_MARKER, "").replace(CANVAS_END_MARKER, "") yield format_sse_event({"type": "text", "content": delta_content_for_chat}) yield format_sse_event({"type": "canvas_start", "content": canvas_content}) continue else: yield format_sse_event({"type": "text", "content": delta_content_no_marker}) if code_to_execute: initial_response_completed = True if execute_code and code_to_execute: is_code_used = True logger.info("Envoi Ă©vĂ©nement CODE_BLOCK") yield format_sse_event({"type": "code_block", "content": code_to_execute}) yield format_sse_event({"type": "code_execution_start"}) execution_result = run_code_in_sandbox(code_to_execute) numeric_result = extract_numeric_result(execution_result) if numeric_result: code_executed = True logger.info("Envoi Ă©vĂ©nement EXECUTED_CODE_RESULT") yield format_sse_event({"type": "executed_code_result", "content": numeric_result}) conclusion_prompt_content = ( f"**[INSTRUCTION SUITE - APRES EXECUTION DU CODE]** Tu as exĂ©cutĂ© le code Python suivant:\n" f"```python\n{code_to_execute}\n```\n" f"Le rĂ©sultat du calcul est: **{numeric_result}**. " "Continue ensuite la conversation en t'appuyant sur ce rĂ©sultat." ) conclusion_prompt = [{"role": "user", "content": conclusion_prompt_content}] conclusion_stream = open_router_chat_completion(conclusion_prompt, model, 0.0) for chunk in conclusion_stream: delta_content = chunk.get("choices", [{}])[0].get("message", {}).get("content", "") or "" yield format_sse_event({"type": "text", "content": delta_content}) yield format_sse_event({"type": "final"}) else: code_executed = True logger.info("Envoi Ă©vĂ©nement EXECUTED_CODE_ERROR") yield format_sse_event({"type": "executed_code_error", "content": execution_result}) yield format_sse_event({"type": "final", "result": "Fin de la rĂ©ponse (erreur ou pas de rĂ©sultat numĂ©rique)."}) prompt_tokens = 0 # Comptage non implĂ©mentĂ© completion_tokens = 0 total_tokens = prompt_tokens + completion_tokens if not code_executed: logger.info("Envoi Ă©vĂ©nement FINAL (sans code exĂ©cutĂ©)") yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}) else: yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}) except Exception as e: error_message = f"Erreur serveur IA: {str(e)}" logger.exception(error_message) yield format_sse_event({"type": "error", "content": error_message}) finally: yield format_sse_event({"type": "analysis_status", "content": "end"}) yield format_sse_event({"type": "done", "is_code_execution": is_code_used}) return Response(stream_with_context(stream_response()), mimetype='text/event-stream')
@app.route('/canvas_modify', methods=['POST'])
def canvas_modify_endpoint() -> Response:
data = request.get_json()
if not data:
logger.warning("RequĂȘte canvas_modify sans donnĂ©es.")
return jsonify({"error": "RequĂȘte canvas_modify invalide : corps de requĂȘte manquant."}), 400
user_message = data.get('message')
canvas_content = data.get('canvas_content')
model = data.get('model', 'google/gemini-2.0-pro-exp-02-05:free').strip()
if not user_message or canvas_content is None:
logger.warning("RequĂȘte canvas_modify incomplĂšte: message=%s, canvas_content=%s", user_message, canvas_content)
return jsonify({"error": "RequĂȘte canvas_modify invalide : message ou contenu canvas manquant."}), 400
canvas_chat_messages = [
{"role": "system", "content": (
"Tu es dans un chat de canvas. L'utilisateur va te demander de modifier le contenu du canvas suivant. "
"Fournis UNIQUEMENT le NOUVEAU contenu du canvas ENTIĂREMENT MIS Ă JOUR, encadrĂ© par [CANVAS_START]
et [CANVAS_END]
. "
"Ne réponds pas avec du texte en dehors de ces marqueurs, sauf une courte phrase de confirmation aprÚs le canvas. "
"Maintiens le contexte des modifications précédentes du canvas, en te basant sur l'historique de ce chat canvas."
)},
{"role": "assistant", "content": f"[CANVAS_START]\n{canvas_content}\n[CANVAS_END] Voici le canvas actuel. Dites-moi comment le modifier."},
{"role": "user", "content": user_message}
]
text@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type(Exception)) def canvas_modify_groq_call(canvas_chat_messages, model): return open_router_chat_completion(canvas_chat_messages, model, 0.0) try: response_data = canvas_modify_groq_call(canvas_chat_messages, model) if isinstance(response_data, dict): ai_response_text = response_data.get("choices", [{}])[0].get("message", {}).get("content", "") or "" else: ai_response_text = response_data # Si c'est une chaßne, l'utiliser directement if CANVAS_START_MARKER in ai_response_text and CANVAS_END_MARKER in ai_response_text: start_index = ai_response_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = ai_response_text.find(CANVAS_END_MARKER) updated_canvas_content = ai_response_text[start_index:end_index].strip() response_message = "Canvas mis à jour." logger.info("Canvas modifié avec succÚs.") return jsonify({"updated_canvas_content": updated_canvas_content, "response_message": response_message}) else: logger.error("Réponse IA canvas_modify invalide: marqueurs CANVAS_START/END manquants.") return jsonify({"error": "Réponse IA invalide pour la modification du canvas", "full_response": ai_response_text}), 500 except Exception as e: error_message = f"Erreur serveur (canvas_modify): {str(e)}" logger.exception(error_message) return jsonify({"error": error_message}), 500
@app.route('/run', methods=['POST'])
def run_code_endpoint() -> Response:
"""
Point d'accÚs pour exécuter du code via l'API Piston (pour canvas code).
"""
data = request.get_json()
if not data:
logger.warning("RequĂȘte run sans donnĂ©es.")
return jsonify({"error": "RequĂȘte run invalide : corps de requĂȘte manquant."}), 400
language = data.get("language")
code = data.get("code")
stdin = data.get("stdin", "")
if not language or not code:
logger.warning("RequĂȘte run incomplĂšte: language=%s, code=%s", language, code)
return jsonify({"error": "RequĂȘte run invalide : langage et code requis."}), 400
payload = {
"language": language,
"version": "*",
"files": [{"content": code}],
"stdin": stdin
}
try:
piston_response = requests.post(PISTON_API_URL, json=payload, timeout=3000000)
piston_response.raise_for_status()
logger.info("Code exécuté avec Piston avec succÚs.")
return jsonify(piston_response.json())
except requests.RequestException as e:
error_message = f"Erreur lors de l'appel Ă l'API Piston: {str(e)}"
logger.exception(error_message)
piston_text = piston_response.text if 'piston_response' in locals() and piston_response is not None else ""
return jsonify({"error": "Erreur d'exécution du code via l'API Piston.", "piston_response": piston_text, "details": error_message}), 500
except Exception as e:
error_message = f"Erreur inattendue lors de l'exécution du code: {str(e)}"
logger.exception(error_message)
return jsonify({"error": "Erreur inattendue lors de l'exécution du code.", "details": error_message}), 500
@app.route('/run', methods=['OPTIONS'])
def run_code_options() -> Response:
"""GĂšre les requĂȘtes OPTIONS pour l'endpoint /run (CORS preflight)."""
return jsonify({'message': 'OPTIONS requests allowed'}), 200
@app.route('/upload', methods=['POST'])
def upload_file():
# Upload de fichier similaire Ă dp.py
if 'file' not in request.files:
return jsonify({"error": "No file part"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "No selected file"}), 400
filename = secure_filename(file.filename)
upload_folder = app.config['UPLOAD_FOLDER']
if not os.path.exists(upload_folder):
os.makedirs(upload_folder)
file_path = os.path.join(upload_folder, filename)
try:
file.save(file_path)
return jsonify({"message": "File uploaded successfully", "file_path": file_path}), 200
except Exception as e:
logger.exception("File upload failed")
return jsonify({"error": str(e)}), 500
def format_sse_event(data: Dict[str, Any]) -> str:
"""Formate les données en événement SSE (Server-Sent Events)."""
return f"data: {json.dumps(data)}\n\n"
if name == 'main':
if os.environ.get('FLASK_ENV') == 'development':
app.run(debug=True, port=5000)
show me the full patched code without this bug pls
Below is the full code with the fixes applied. In summary, the changes are:
pythonfrom gevent import monkey monkey.patch_all() import os import re import json import logging from typing import Any, Dict, List, Optional, Tuple import time import datetime from tqdm import tqdm from flask import Flask, request, Response, jsonify, stream_with_context from flask_cors import CORS from e2b_code_interpreter import Sandbox from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type import requests from duckduckgo_search import DDGS from bs4 import BeautifulSoup from urllib.parse import urlparse from werkzeug.utils import secure_filename # Ajout pour la gestion sĂ©curisĂ©e des noms de fichier # --- Configuration --- logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = Flask(__name__) CORS(app) app.config['UPLOAD_FOLDER'] = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'uploads') # Dossier de destination GROQ_API_KEY = os.environ.get("GROQ_API_KEY") E2B_API_KEY = os.environ.get("E2B_API_KEY") OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY") # Ajout de la clĂ© OpenRouter PISTON_API_URL = "https://emkc.org/api/v2/piston/execute" # --- E2B Sandbox Initialization with Retry --- @retry(stop=stop_after_attempt(5), # Tentatives augmentĂ©es Ă 5 wait=wait_exponential(multiplier=1, min=4, max=20), # Backoff jusqu'Ă 20 secondes retry=retry_if_exception_type(Exception)) def initialize_sandbox(): """Initializes the e2b Sandbox with a higher timeout.""" logger.info("Initializing e2b Sandbox with 300-second timeout...") return Sandbox(api_key=E2B_API_KEY, timeout=300) # Timeout augmentĂ© Ă 300 secondes try: sandbox = initialize_sandbox() logger.info("e2b Sandbox initialized successfully.") except Exception as e: logger.error(f"Failed to initialize e2b Sandbox after multiple retries: {e}") # It's crucial to handle the failure of sandbox initialization. # You might want to exit the application or have a fallback mechanism. # For now, let's raise the exception to prevent the app from starting improperly. raise e VALID_MODELS = [ "google/gemini-2.0-pro-exp-02-05:free", "deepseek/deepseek-r1:free", "qwen/qwen2.5-vl-72b-instruct:free", "meta-llama/llama-3.3-70b-instruct:free" ] CODE_PATTERN = re.compile(r"```python\s*([\s\S]*?)```") CODE_REQUIRED_MARKER = "[CODE_REQUIRED]" CANVAS_START_MARKER = "[CANVAS_START]" CANVAS_END_MARKER = "[CANVAS_END]" # --- Configuration pour la recherche Web (comme dans le script CLI) --- WEB_SEARCH_CONFIG = { 'openrouter_api_key': OPENROUTER_API_KEY, 'max_results': 9, # AugmentĂ© Ă 9 par dĂ©faut 'cache_file': 'search_cache.json', # Le cache est gĂ©rĂ©, mais pas crucial pour le backend Flask 'model': 'mistralai/mistral-medium', # ModĂšle OpenRouter pour le filtrage et la synthĂšse. Peut ĂȘtre changĂ©. 'timeout': 20, 'min_quality_score': 0.65 # Ce paramĂštre n'est plus utilisĂ© directement, remplacĂ© par un filtrage AI plus prĂ©cis. } # --- Classes pour la recherche Web (adaptĂ©es du script CLI) --- class DataCollector: @staticmethod def web_search(query: str) -> List[Dict]: with DDGS() as ddgs: results = ddgs.text(query, region='fr-fr', max_results=WEB_SEARCH_CONFIG['max_results']) return [{ 'url': r['href'], 'title': r['title'], 'domain': urlparse(r['href']).netloc.replace('www.', '') } for r in results if urlparse(r['href']).scheme in ['http', 'https']] class ContentProcessor: def __init__(self): self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'} def fetch_content(self, url: str) -> str: try: response = requests.get(url, headers=self.headers, timeout=WEB_SEARCH_CONFIG['timeout']) soup = BeautifulSoup(response.text, 'html.parser') for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'meta', 'link']): element.decompose() main_content = soup.find(['main', 'article']) or soup.body return ' '.join(main_content.stripped_strings)[:5000] except Exception as e: logger.error(f"Erreur sur {url}: {str(e)}") return '' class AICore: @staticmethod def query_ai(system_prompt: str, user_content: str, temp: float = 0.1, model:str = WEB_SEARCH_CONFIG['model']) -> str: # Ajout du paramĂštre model headers = { "Authorization": f"Bearer {WEB_SEARCH_CONFIG['openrouter_api_key']}", "Content-Type": "application/json", "HTTP-Referer": "https://your-app-url", # Remplacez par l'URL de votre application "X-Title": "Flask Search AI" } data = { "model": model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content} ], "temperature": temp, "max_tokens": 2000 } try: response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data, timeout=WEB_SEARCH_CONFIG['timeout']) response.raise_for_status() # On lĂšve une exception si le statut n'est pas 200 return response.json()['choices'][0]['message']['content'] except Exception as e: return f"Erreur API OpenRouter: {str(e)}" def open_router_chat_completion_stream(messages: List[Dict[str, Any]], model: str, temperature: float = 0.0): headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", } payload = { "model": model, "messages": messages, "temperature": temperature, "max_tokens": 2000, "stream": True } response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, stream=True, timeout=WEB_SEARCH_CONFIG['timeout']) response.raise_for_status() for line in response.iter_lines(): if line: decoded_line = line.decode('utf-8') if decoded_line.startswith("data: "): data_str = decoded_line[len("data: "):] try: data = json.loads(data_str) yield data except Exception: continue def open_router_chat_completion(messages: List[Dict[str, Any]], model: str, temperature: float = 0.0) -> Dict: headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", } payload = { "model": model, "messages": messages, "temperature": temperature, "max_tokens": 2000, "stream": False } response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, timeout=WEB_SEARCH_CONFIG['timeout']) response.raise_for_status() return response.json() # --- Fonctions utilitaires (restent inchangĂ©es) --- def extract_code_blocks(text: str) -> Tuple[Optional[str], Optional[str]]: """Extrait les blocs de code Python et gĂšre le marqueur [CODE_REQUIRED].""" if CODE_REQUIRED_MARKER in text: text_no_marker = text.replace(CODE_REQUIRED_MARKER, "", 1).strip() code_match = CODE_PATTERN.search(text_no_marker) if code_match: return code_match.group(1).strip(), None else: code_match = CODE_PATTERN.search(text) if code_match: return None, code_match.group(1).strip() # code Ă afficher sans exĂ©cution return None, None def run_code_in_sandbox(code: str) -> str: """ExĂ©cute le code Python dans le sandbox e2b et retourne le rĂ©sultat.""" start_time = time.time() try: logger.info("DĂ©but de l'exĂ©cution du code dans le sandbox e2b.") execution = None method_name = next((method for method in ["run_code_sync", "execute_sync", "execute_code", "execute", "run_code", "run"] if hasattr(sandbox, method)), None) if method_name: logger.info(f"Utilisation de la mĂ©thode sandbox.{method_name}") execution = getattr(sandbox, method_name)(code) else: error_msg = "Aucune mĂ©thode d'exĂ©cution de code disponible dans le sandbox." logger.error(error_msg) return error_msg logs = execution.logs stdout_logs = logs.stdout if hasattr(logs, 'stdout') else [] stderr_logs = logs.stderr if hasattr(logs, 'stderr') else [] stdout = "\n".join(stdout_logs) stderr = "\n".join(stderr_logs) if stderr: return f"Stdout:\n{stdout}\n\nStderr:\n{stderr}" return stdout except Exception as e: duration = time.time() - start_time error_msg = f"Erreur lors de l'exĂ©cution du code dans e2b aprĂšs {duration:.2f} secondes: {str(e)}" logger.exception(error_msg) return f"Erreur exĂ©cution code: {str(e)}" def extract_numeric_result(exec_result: str) -> Optional[str]: """Extrait la derniĂšre ligne numĂ©rique du rĂ©sultat d'exĂ©cution de code.""" if "Stdout:" in exec_result: stdout_section = exec_result.split("Stdout:\n", 1)[1].split("\n\nStderr:")[0].strip() else: stdout_section = exec_result.strip() lines = [l.strip() for l in stdout_section.splitlines() if l.strip()] if not lines: return None last_line = lines[-1] if re.search(r'\d', last_line): return last_line return None # --- Routes Flask (lĂ©gĂšres modifications) --- @app.route('/models', methods=['GET']) def get_models() -> Response: """Retourne la liste des modĂšles valides.""" return jsonify({"models": VALID_MODELS}) @app.route('/chat', methods=['POST']) def chat_endpoint() -> Response: """ Point d'entrĂ©e principal pour le chat. GĂšre le streaming SSE, l'exĂ©cution de code, les canvas et la recherche Web. """ @stream_with_context @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type(Exception)) def stream_response(): data = request.get_json() if not data: logger.warning("RequĂȘte chat invalide: corps de requĂȘte manquant.") yield format_sse_event({"type": "error", "content": "RequĂȘte chat invalide : corps de requĂȘte manquant."}) return messages = data.get('messages', []) model: str = data.get('model', 'google/gemini-2.0-pro-exp-02-05:free').strip() execute_code: bool = data.get('execute_code', True) web_search_enabled: bool = data.get('web_search', False) # RĂ©cupĂšre l'Ă©tat de la recherche Web if not messages or not isinstance(messages, list) or not model or model not in VALID_MODELS: logger.warning("RequĂȘte chat invalide: messages=%s, model=%s", messages, model) yield format_sse_event({"type": "error", "content": "RequĂȘte chat invalide : messages ou modĂšle incorrects."}) return # --- IntĂ©gration de la recherche Web --- web_search_results = [] if web_search_enabled: user_query = messages[-1]['content'] # On utilise le dernier message de l'utilisateur comme requĂȘte yield format_sse_event({"type": "web_search_status", "content": "start"}) yield format_sse_event({"type": "web_search", "query": user_query}) # Envoi de la requĂȘte pour l'afficher search_processor = ContentProcessor() search_results = DataCollector.web_search(user_query) # Filtrage et extraction du contenu (comme dans le CLI, mais intĂ©grĂ© ici) filtered_results = [] for result in search_results: content = search_processor.fetch_content(result['url']) if content: # Filtrage de pertinence avec OpenRouter (comme dans le CLI) filter_prompt = ( "Tu es un expert en analyse de la pertinence des documents. " f"DĂ©termine si le contenu web suivant est pertinent pour la requĂȘte utilisateur : '{user_query}'. " "RĂ©ponds uniquement par 'oui' ou 'non'." ) is_relevant = AICore.query_ai(filter_prompt, content[:2000], temp=0.0, model="mistralai/mistral-small") # Filtrage plus rapide if is_relevant.lower().startswith('oui'): filtered_results.append({'url': result['url'], 'domain': result['domain'], 'content': content}) if len(filtered_results) >= WEB_SEARCH_CONFIG['max_results']: break web_search_results = filtered_results # Envoi des URLs des sources *aprĂšs* le filtrage source_urls = [result['url'] for result in web_search_results] yield format_sse_event({"type": "web_sources", "content": source_urls}) yield format_sse_event({"type": "web_search_status", "content": "end"}) # --- Construction du prompt systĂšme (adaptĂ© pour la recherche Web) --- base_prompt = ( "đŻ **RĂŽle : Expert en calcul numĂ©rique prĂ©cis, assistant polyvalent avec canvas interactifs et capacitĂ©s de recherche Web.**\n" "**Mission :** RĂ©pondre aux questions de l'utilisateur, effectuer des calculs prĂ©cis, fournir des canvas interactifs (texte et code) et, si nĂ©cessaire, effectuer des recherches sur le Web pour enrichir la rĂ©ponse.\n\n" "**Si la question demande un calcul numĂ©rique :**\n" "1. Annonce que tu vas exĂ©cuter un code Python pour rĂ©aliser le calcul.\n" "2. Ajoute le marqueur `[CODE_REQUIRED]` suivi du bloc de code minimal en Python.\n" "3. N'envoie rien d'autre et attends l'exĂ©cution du code.\n\n" "**AprĂšs l'exĂ©cution du code**, explique le rĂ©sultat en dĂ©tails en te basant sur le rĂ©sultat numĂ©rique obtenu.\n\n" "**Pour la crĂ©ation ou modification de canvas** (texte ou code) :\n" "Utilise les marqueurs `[CANVAS_START]` et `[CANVAS_END]` pour encadrer le contenu du canvas.\n\n" "**Si la recherche Web est activĂ©e :**\n" "1. Tu recevras des extraits de pages Web pertinentes.\n" "2. Utilise ces informations pour complĂ©ter ta rĂ©ponse, en citant les sources de maniĂšre concise (ex: [nomdedomaine.com]).\n" "3. Si les sources Web ne sont pas pertinentes ou suffisantes, base-toi sur tes connaissances internes.\n\n" "Si aucune de ces conditions n'est remplie, rĂ©ponds directement et clairement Ă la question." ) # --- PrĂ©paration des messages pour Groq --- system_prompt = {"role": "system", "content": base_prompt} # Ajout des rĂ©sultats de la recherche Web au contexte, si applicable if web_search_results: # CrĂ©ation d'un rĂ©sumĂ© des sources pour Groq (plus concis que d'envoyer tout le contenu) sources_summary = "\n\n".join([ f"[{result['domain']}]\nExtrait: {result['content'][:500]}" # Limite la longueur de l'extrait for result in web_search_results ]) web_search_context = {"role": "user", "content": f"Voici des extraits de pages web pertinentes pour la question:\n\n{sources_summary}"} messages_with_system_prompt = [system_prompt, web_search_context] + messages else: messages_with_system_prompt = [system_prompt] + messages full_reply_text = "" code_to_execute = None code_to_display = None is_code_used = False canvas_content = None code_executed = False prompt_tokens = 0 completion_tokens = 0 total_tokens = 0 initial_response_completed = False try: response_stream = open_router_chat_completion_stream(messages_with_system_prompt, model, 0.0) for chunk in response_stream: # If chunk is a string, try to parse it as JSON. if isinstance(chunk, str): try: chunk = json.loads(chunk) except Exception: continue if not isinstance(chunk, dict): continue if initial_response_completed: break delta_content = chunk.get("choices", [{}])[0].get("delta", {}).get("content", "") or "" delta_content_no_marker = delta_content.replace(CODE_REQUIRED_MARKER, "") full_reply_text += delta_content_no_marker # Extraction du code dans le texte complet code_to_execute, code_to_display = extract_code_blocks(full_reply_text) # Si aucun code destinĂ© Ă l'exĂ©cution n'est extrait mais qu'un bloc est trouvĂ©, utiliser ce dernier if not code_to_execute and code_to_display: code_to_execute = code_to_display # Envoi du canvas s'il est dĂ©tectĂ© if CANVAS_START_MARKER in full_reply_text and CANVAS_END_MARKER in full_reply_text: start_index = full_reply_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = full_reply_text.find(CANVAS_END_MARKER) canvas_content_extracted = full_reply_text[start_index:end_index].strip() canvas_content = canvas_content_extracted delta_content_for_chat = delta_content_no_marker.replace(CANVAS_START_MARKER, "").replace(CANVAS_END_MARKER, "") yield format_sse_event({"type": "text", "content": delta_content_for_chat}) yield format_sse_event({"type": "canvas_start", "content": canvas_content}) continue else: yield format_sse_event({"type": "text", "content": delta_content_no_marker}) if code_to_execute: initial_response_completed = True if execute_code and code_to_execute: is_code_used = True logger.info("Envoi Ă©vĂ©nement CODE_BLOCK") yield format_sse_event({"type": "code_block", "content": code_to_execute}) yield format_sse_event({"type": "code_execution_start"}) execution_result = run_code_in_sandbox(code_to_execute) numeric_result = extract_numeric_result(execution_result) if numeric_result: code_executed = True logger.info("Envoi Ă©vĂ©nement EXECUTED_CODE_RESULT") yield format_sse_event({"type": "executed_code_result", "content": numeric_result}) conclusion_prompt_content = ( f"**[INSTRUCTION SUITE - APRES EXECUTION DU CODE]** Tu as exĂ©cutĂ© le code Python suivant:\n" f"```python\n{code_to_execute}\n```\n" f"Le rĂ©sultat du calcul est: **{numeric_result}**. " "Continue ensuite la conversation en t'appuyant sur ce rĂ©sultat." ) conclusion_prompt = [{"role": "user", "content": conclusion_prompt_content}] # Note: open_router_chat_completion returns a dict (non-streaming) conclusion_response = open_router_chat_completion(conclusion_prompt, model, 0.0) delta_content = conclusion_response.get("choices", [{}])[0].get("message", {}).get("content", "") or "" yield format_sse_event({"type": "text", "content": delta_content}) yield format_sse_event({"type": "final"}) else: code_executed = True logger.info("Envoi Ă©vĂ©nement EXECUTED_CODE_ERROR") yield format_sse_event({"type": "executed_code_error", "content": execution_result}) yield format_sse_event({"type": "final", "result": "Fin de la rĂ©ponse (erreur ou pas de rĂ©sultat numĂ©rique)."}) prompt_tokens = 0 # Comptage non implĂ©mentĂ© completion_tokens = 0 total_tokens = prompt_tokens + completion_tokens if not code_executed: logger.info("Envoi Ă©vĂ©nement FINAL (sans code exĂ©cutĂ©)") yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}) else: yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}) except Exception as e: error_message = f"Erreur serveur IA: {str(e)}" logger.exception(error_message) yield format_sse_event({"type": "error", "content": error_message}) finally: yield format_sse_event({"type": "analysis_status", "content": "end"}) yield format_sse_event({"type": "done", "is_code_execution": is_code_used}) return Response(stream_with_context(stream_response()), mimetype='text/event-stream') @app.route('/canvas_modify', methods=['POST']) def canvas_modify_endpoint() -> Response: data = request.get_json() if not data: logger.warning("RequĂȘte canvas_modify sans donnĂ©es.") return jsonify({"error": "RequĂȘte canvas_modify invalide : corps de requĂȘte manquant."}), 400 user_message = data.get('message') canvas_content = data.get('canvas_content') model = data.get('model', 'google/gemini-2.0-pro-exp-02-05:free').strip() if not user_message or canvas_content is None: logger.warning("RequĂȘte canvas_modify incomplĂšte: message=%s, canvas_content=%s", user_message, canvas_content) return jsonify({"error": "RequĂȘte canvas_modify invalide : message ou contenu canvas manquant."}), 400 canvas_chat_messages = [ {"role": "system", "content": ( "Tu es dans un chat de canvas. L'utilisateur va te demander de modifier le contenu du canvas suivant. " "Fournis UNIQUEMENT le NOUVEAU contenu du canvas ENTIĂREMENT MIS Ă JOUR, encadrĂ© par `[CANVAS_START]` et `[CANVAS_END]`. " "Ne rĂ©ponds pas avec du texte en dehors de ces marqueurs, sauf une courte phrase de confirmation aprĂšs le canvas. " "Maintiens le contexte des modifications prĂ©cĂ©dentes du canvas, en te basant sur l'historique de ce chat canvas." )}, {"role": "assistant", "content": f"[CANVAS_START]\n{canvas_content}\n[CANVAS_END] Voici le canvas actuel. Dites-moi comment le modifier."}, {"role": "user", "content": user_message} ] @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type(Exception)) def canvas_modify_groq_call(canvas_chat_messages, model): return open_router_chat_completion(canvas_chat_messages, model, 0.0) try: response_data = canvas_modify_groq_call(canvas_chat_messages, model) if isinstance(response_data, dict): ai_response_text = response_data.get("choices", [{}])[0].get("message", {}).get("content", "") or "" else: ai_response_text = response_data # Si c'est une chaĂźne, l'utiliser directement if CANVAS_START_MARKER in ai_response_text and CANVAS_END_MARKER in ai_response_text: start_index = ai_response_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = ai_response_text.find(CANVAS_END_MARKER) updated_canvas_content = ai_response_text[start_index:end_index].strip() response_message = "Canvas mis Ă jour." logger.info("Canvas modifiĂ© avec succĂšs.") return jsonify({"updated_canvas_content": updated_canvas_content, "response_message": response_message}) else: logger.error("RĂ©ponse IA canvas_modify invalide: marqueurs CANVAS_START/END manquants.") return jsonify({"error": "RĂ©ponse IA invalide pour la modification du canvas", "full_response": ai_response_text}), 500 except Exception as e: error_message = f"Erreur serveur (canvas_modify): {str(e)}" logger.exception(error_message) return jsonify({"error": error_message}), 500 @app.route('/run', methods=['POST']) def run_code_endpoint() -> Response: """ Point d'accĂšs pour exĂ©cuter du code via l'API Piston (pour canvas code). """ data = request.get_json() if not data: logger.warning("RequĂȘte run sans donnĂ©es.") return jsonify({"error": "RequĂȘte run invalide : corps de requĂȘte manquant."}), 400 language = data.get("language") code = data.get("code") stdin = data.get("stdin", "") if not language or not code: logger.warning("RequĂȘte run incomplĂšte: language=%s, code=%s", language, code) return jsonify({"error": "RequĂȘte run invalide : langage et code requis."}), 400 payload = { "language": language, "version": "*", "files": [{"content": code}], "stdin": stdin } try: piston_response = requests.post(PISTON_API_URL, json=payload, timeout=3000000) piston_response.raise_for_status() logger.info("Code exĂ©cutĂ© avec Piston avec succĂšs.") return jsonify(piston_response.json()) except requests.RequestException as e: error_message = f"Erreur lors de l'appel Ă l'API Piston: {str(e)}" logger.exception(error_message) piston_text = piston_response.text if 'piston_response' in locals() and piston_response is not None else "" return jsonify({"error": "Erreur d'exĂ©cution du code via l'API Piston.", "piston_response": piston_text, "details": error_message}), 500 except Exception as e: error_message = f"Erreur inattendue lors de l'exĂ©cution du code: {str(e)}" logger.exception(error_message) return jsonify({"error": "Erreur inattendue lors de l'exĂ©cution du code.", "details": error_message}), 500 @app.route('/run', methods=['OPTIONS']) def run_code_options() -> Response: """GĂšre les requĂȘtes OPTIONS pour l'endpoint /run (CORS preflight).""" return jsonify({'message': 'OPTIONS requests allowed'}), 200 @app.route('/upload', methods=['POST']) def upload_file(): # Upload de fichier similaire Ă dp.py if 'file' not in request.files: return jsonify({"error": "No file part"}), 400 file = request.files['file'] if file.filename == '': return jsonify({"error": "No selected file"}), 400 filename = secure_filename(file.filename) upload_folder = app.config['UPLOAD_FOLDER'] if not os.path.exists(upload_folder): os.makedirs(upload_folder) file_path = os.path.join(upload_folder, filename) try: file.save(file_path) return jsonify({"message": "File uploaded successfully", "file_path": file_path}), 200 except Exception as e: logger.exception("File upload failed") return jsonify({"error": str(e)}), 500 def format_sse_event(data: Dict[str, Any]) -> str: """Formate les donnĂ©es en Ă©vĂ©nement SSE (Server-Sent Events).""" return f"data: {json.dumps(data)}\n\n" if __name__ == '__main__': if os.environ.get('FLASK_ENV') == 'development': app.run(debug=True, port=5000)
This version should no longer trigger the ââstrâ object has no attribute âgetââ error.