j'ai créé une queqête dans le CLI peux tu l'ajoute...
Créé le : 16 février 2025
Répondu en utilisant O3 Mini High par Chat01
Créé le : 16 février 2025
Répondu en utilisant O3 Mini High par Chat01
j'ai créé une queqête dans le CLI peux tu l'ajouter dans mon backend car je n'aime pas la recherche internect actuel stp change app.py pour une amélioration et des recherche plus peritnant exactement comme cella qui sont configuer dans le chat :
"""
import json
import requests
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import datetime
from tqdm import tqdm
from typing import List, Dict
CONFIG = {
'openrouter_api_key': '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',
'max_results': 7,
'cache_file': 'search_cache.json',
'model': 'google/gemini-2.0-pro-exp-02-05:free',
'timeout': 20,
'min_quality_score': 0.65
}
class DataCollector:
@staticmethod
def web_search(query: str) -> List[Dict]:
with DDGS() as ddgs:
results = ddgs.text(query, region='fr-fr', max_results=CONFIG['max_results'])
return [{
'url': r['href'],
'title': r['title'],
'domain': urlparse(r['href']).netloc.replace('www.', '')
} for r in results if urlparse(r['href']).scheme in ['http', 'https']]
class ContentProcessor:
def init(self):
self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
textdef fetch_content(self, url: str) -> str: try: response = requests.get(url, headers=self.headers, timeout=CONFIG['timeout']) soup = BeautifulSoup(response.text, 'html.parser') for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'meta', 'link']): element.decompose() main_content = soup.find(['main', 'article']) or soup.body return ' '.join(main_content.stripped_strings)[:5000] except Exception as e: print(f"Erreur sur {url}: {str(e)}") return ''
class AICore:
@staticmethod
def query_ai(system_prompt: str, user_content: str, temp: float = 0.3) -> str:
headers = {
"Authorization": f"Bearer {CONFIG['openrouter_api_key']}",
"Content-Type": "application/json",
"HTTP-Referer": "https://github.com/your-repo",
"X-Title": "CLI Search AI"
}
textdata = { "model": CONFIG['model'], "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content} ], "temperature": temp, "max_tokens": 2000 } try: response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data, timeout=CONFIG['timeout']) return response.json()['choices'][0]['message']['content'] except Exception as e: return f"Erreur API: {str(e)}"
class ConsoleSearch:
def init(self):
self.cache = self.load_cache()
self.processor = ContentProcessor()
textdef load_cache(self) -> Dict: try: with open(CONFIG['cache_file'], 'r') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return {} def save_cache(self): with open(CONFIG['cache_file'], 'w') as f: json.dump(self.cache, f) def get_user_input(self): print("\n" + "="*40) print("🔍 Assistant de Recherche Historique") print("="*40) return ( input("\nEntrez votre recherche : ").strip(), input("\nMode verbeux ? (o/n) : ").lower() == 'o', input("\nFormat de sortie (text/json) [text] : ").lower() or 'text', int(input("\nNombre de sources à utiliser [3] : ") or 3) ) def _filter_content(self, content: str, query: str) -> bool: """Filtre les contenus non pertinents""" prompt = f"Ce contenu est-il pertinent pour '{query}' ? Répondre uniquement par oui/non" response = AICore.query_ai(prompt, content[:2000], temp=0.0) return response.lower().startswith('oui') def run(self): query, verbose, fmt, sources = self.get_user_input() if verbose: print(f"\nDémarrage de la recherche pour : {query}") print(f"Paramètres : {sources} sources | Format {fmt.upper()}\n") results = self._process_search(query, verbose) report = self.generate_report(query, results, sources, verbose) if fmt == 'json': print("\n" + json.dumps(report, indent=2, ensure_ascii=False)) else: self._display_text_report(report) self.save_cache() def _process_search(self, query: str, verbose: bool) -> List[Dict]: if query in self.cache: if verbose: print("\nUtilisation du cache...") return self.cache[query] results = DataCollector.web_search(query) valid_results = [] for result in tqdm(results, desc="Analyse des sources", disable=not verbose): content = self.processor.fetch_content(result['url']) if content and self._filter_content(content, query): result['content'] = content valid_results.append(result) if len(valid_results) >= CONFIG['max_results']: break self.cache[query] = valid_results return valid_results def generate_report(self, query: str, results: List[Dict], max_sources: int, verbose: bool) -> Dict: """Génère un rapport de style ChatGPT""" system_prompt = f"""**Rôle:** Expert en histoire
Consignes:
Exemple de format:
Point important 1 [Source1]
Détail complémentaire [Source2]
Conclusion synthétique [Source3]"""
textsources_text = "\n\n".join([f"[Source {i+1} - {r['domain']}]\n{r['content'][:1000]}" for i, r in enumerate(results[:max_sources])]) if verbose: print("\nGénération du rapport via IA...") report_text = AICore.query_ai(system_prompt, sources_text) return { 'query': query, 'date': datetime.datetime.now().isoformat(), 'sources_used': list(set(r['domain'] for r in results[:max_sources])), 'report': report_text }
def _display_text_report(self, report: Dict):
print("\n" + "="*40)
print(f"{report['report']}")
print("\n" + "-"*40)
print(f"📅 Date: {datetime.datetime.now().strftime('%d/%m/%Y %H:%M')}")
print(f"🔗 Sources: {', '.join(report['sources_used'])}")
print("="*40)
if name == "main":
console = ConsoleSearch()
console.run()
"""
je veux que ce sois le meme tu utilise la meme methode de scrapping le meme modele et tu fait cohabiter groq et open router pour que quand je fasse une reponse avec la recherche activé le resultat du chat sois le meme que le resultat de la recherche dans le cli en therme d'equivalence stp
from gevent import monkey
monkey.patch_all()
import nltk
import os
import re
import json
import logging
from typing import Any, Dict, List, Optional, Tuple, Generator
import time
from flask import Flask, request, Response, jsonify, stream_with_context
from flask_cors import CORS
from groq import Groq
from e2b_code_interpreter import Sandbox
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import requests
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import datetime
from tqdm import tqdm
from typing import List, Dict
CONFIG = {
'openrouter_api_key': os.environ.get("OPENROUTER_API_KEY"), # From environment variable
'max_results': 7, # Increased to 7 to match original code
'cache_file': 'search_cache.json', # Not used in backend, can be removed or repurposed
'model': 'google/gemini-2.0-pro-exp-02-05:free', # Same model as CLI version
'timeout': 20, # Increased to 20 to match original code
'min_quality_score': 0.65 # Placeholder, can be used for content filtering later
}
class DataCollector:
@staticmethod
def web_search(query: str) -> List[Dict]:
with DDGS() as ddgs:
results = ddgs.text(query, region='fr-fr', max_results=CONFIG['max_results'])
return [{
'url': r['href'],
'title': r['title'],
'domain': urlparse(r['href']).netloc.replace('www.', '')
} for r in results if urlparse(r['href']).scheme in ['http', 'https']]
class ContentProcessor:
def init(self):
self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
textdef fetch_content(self, url: str) -> str: try: response = requests.get(url, headers=self.headers, timeout=CONFIG['timeout']) soup = BeautifulSoup(response.text, 'html.parser') for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'meta', 'link']): element.decompose() main_content = soup.find(['main', 'article']) or soup.body return ' '.join(main_content.stripped_strings)[:5000] except Exception as e: print(f"Erreur sur {url}: {str(e)}") return ''
class AICore:
@staticmethod
def query_ai(system_prompt: str, user_content: str, temp: float = 0.0) -> str:
# Essayer d'abord avec Groq
try:
response = groq_client.chat.completions.create(
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_content}
],
model="mixtral-8x7b-32768",
temperature=temp,
max_tokens=1000
)
return response.choices[0].message.content
except Exception as e:
# Fallback sur OpenRouter si Groq échoue
try:
headers = {
"Authorization": f"Bearer {os.environ.get('OPENROUTER_API_KEY')}",
"Content-Type": "application/json",
"HTTP-Referer": "https://pl1nkochat.com",
"X-Title": "Pl1nkochat"
}
textdata = { "model": "google/gemini-2.0-pro-exp-02-05:free", "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content} ], "temperature": temp, "max_tokens": 1000 } response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data, timeout=CONFIG['timeout']) return response.json()['choices'][0]['message']['content'] except Exception as e: return f"Erreur API: {str(e)}"
class ContentFilter: # Class for content filtering logic
@staticmethod
def _filter_content(content: str, query: str) -> bool:
"""Filtre les contenus non pertinents"""
prompt = f"Ce contenu est-il pertinent pour '{query}' ? Répondre uniquement par oui/non"
response = AICore.query_ai(prompt, content[:2000], temp=0.0) # Reusing AICore for filtering
return response.lower().startswith('oui')
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)
app = Flask(name)
CORS(app)
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
E2B_API_KEY = os.environ.get("E2B_API_KEY")
PISTON_API_URL = "https://emkc.org/api/v2/piston/execute"
groq_client = Groq(api_key=GROQ_API_KEY)
sandbox = Sandbox(api_key=E2B_API_KEY, timeout=3000)
VALID_MODELS = [
"llama-3.3-70b-versatile", "distil-whisper-large-v3-en", "gemma2-9b-it", "llama-3.1-8b-instant",
"llama-guard-3-8b", "llama3-70b-8192", "llama3-8b-8192", "mixtral-8x7b-32768", "whisper-large-v3",
"whisper-large-v3-turbo", "deepseek-r1-distill-llama-70b-specdec", "deepseek-r1-distill-llama-70b",
"llama-3.3-specdec", "llama-3.2-1b-preview", "llama-3.2-3b-preview", "llama-3.2-11b-vision-preview",
"llama-3.2-90b-vision-preview", "llama3-70b-8192"
]
CODE_PATTERN = re.compile(r"python\s*([\s\S]*?)
")
CODE_REQUIRED_MARKER = "[CODE_REQUIRED]"
CANVAS_START_MARKER = "[CANVAS_START]"
CANVAS_END_MARKER = "[CANVAS_END]"
def extract_code_blocks(text: str) -> Tuple[Optional[str], Optional[str]]:
"""Extrait les blocs de code Python et gère le marqueur [CODE_REQUIRED]."""
if CODE_REQUIRED_MARKER in text:
text_no_marker = text.replace(CODE_REQUIRED_MARKER, "", 1).strip()
code_match = CODE_PATTERN.search(text_no_marker)
if code_match:
return code_match.group(1).strip(), None
else:
code_match = CODE_PATTERN.search(text)
if code_match:
return None, code_match.group(1).strip() # Code à afficher sans exécution
return None, None
def run_code_in_sandbox(code: str) -> str:
"""Exécute le code Python dans le sandbox e2b et retourne le résultat."""
start_time = time.time()
try:
logger.info("Début de l'exécution du code dans le sandbox e2b.")
execution = None
method_name = next((method for method in ["run_code_sync", "execute_sync", "execute_code", "execute", "run_code", "run"] if hasattr(sandbox, method)), None)
if method_name:
logger.info(f"Utilisation de la méthode sandbox.{method_name}")
execution = getattr(sandbox, method_name)(code)
else:
error_msg = "Aucune méthode d'exécution de code disponible dans le sandbox."
logger.error(error_msg)
return error_msg
logs = execution.logs
stdout_logs = logs.stdout if hasattr(logs, 'stdout') else []
stderr_logs = logs.stderr if hasattr(logs, 'stderr') else []
stdout = "\n".join(stdout_logs)
stderr = "\n".join(stderr_logs)
if stderr:
return f"Stdout:\n{stdout}\n\nStderr:\n{stderr}"
return stdout
except Exception as e:
duration = time.time() - start_time
error_msg = f"Erreur lors de l'exécution du code dans e2b après {duration:.2f} secondes: {str(e)}"
logger.exception(error_msg)
return f"Erreur exécution code: {str(e)}"
def extract_numeric_result(exec_result: str) -> Optional[str]:
"""Extrait la dernière ligne numérique du résultat d'exécution de code."""
if "Stdout:" in exec_result:
stdout_section = exec_result.split("Stdout:\n", 1)[1].split("\n\nStderr:")[0].strip()
else:
stdout_section = exec_result.strip()
lines = [l.strip() for l in stdout_section.splitlines() if l.strip()]
if not lines:
return None
last_line = lines[-1]
if re.search(r'\d', last_line):
return last_line
return None
@app.route('/models', methods=['GET'])
def get_models() -> Response:
"""Retourne la liste des modèles valides."""
return jsonify({"models": VALID_MODELS})
@app.route('/chat', methods=['POST'])
def chat_endpoint() -> Response:
"""
Point d'entrée principal pour le chat.
Gère le streaming SSE, l'exécution de code et les canvas, et maintenant la recherche internet.
"""
@stream_with_context
@retry(stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type(Exception))
def stream_response():
data = request.get_json()
if not data:
logger.warning("Requête chat invalide: corps de requête manquant.")
yield format_sse_event({"type": "error", "content": "Requête chat invalide : corps de requête manquant."})
return
textmessages = data.get('messages', []) model: str = data.get('model', 'llama3-70b-8192').strip() execute_code: bool = data.get('execute_code', True) enable_web_search: bool = data.get('web_search', False) # Get web_search parameter if not messages or not isinstance(messages, list) or not model or model not in VALID_MODELS: logger.warning("Requête chat invalide: messages=%s, model=%s", messages, model) yield format_sse_event({"type": "error", "content": "Requête chat invalide : messages ou modèle incorrects."}) return user_query = messages[-1]['content'] if messages else "" # Extract user query for web search # Constructing the system prompt base_prompt = ( "🎯 **Rôle : Expert en calcul numérique précis et assistant polyvalent avec canvas interactifs (texte et code) et recherche internet.**\n" "**Mission :** Répondre aux questions, effectuer des calculs numériques exacts, et assister l'utilisateur avec des canvas éditables et interactifs pour le **texte** (avec formatage Markdown) et le **code**. " "Maintiens le contexte de conversation, en particulier autour du code, des canvas et des résultats précédents.\n\n" "**Si la question DEMANDE une recherche internet (indiqué par l'utilisateur) :**\n" "1. Effectue une recherche internet discrète en utilisant des mots-clés pertinents de la question.\n" "2. Utilise les informations trouvées pour enrichir ta réponse et fournir des informations à jour ou contextuelles.\n" "3. Cite les sources web utilisées sous forme de liens à la fin de ta réponse.\n\n" # Instructions for web search "**Si la question DEMANDE un CALCUL NUMÉRIQUE :**\n" "1. Réponds d'abord avec une courte phrase d'introduction expliquant que tu vas utiliser un code Python pour résoudre le problème. **Sois clair sur le contexte du calcul, en te référant si nécessaire aux étapes précédentes de la conversation et aux données ou résultats déjà obtenus.**\n" "2. Sur la ligne suivante, ajoute TOUJOURS le marqueur `[CODE_REQUIRED]` pour indiquer qu'un code va suivre.\n" "3. Ensuite, fournis le **BLOC DE CODE PYTHON** minimal, lisible et correct pour effectuer UNIQUEMENT LE CALCUL demandé et afficher UNIQUEMENT LE RÉSULTAT NUMÉRIQUE via `print(resultat_numerique)`.\n" "**IMPORTANT :** Ne génère RIEN D'AUTRE que ces 3 éléments, et dans cet ordre précis. Arrête-toi **ICI**. Attends l'exécution du code.\n\n" "**[INSTRUCTION SUITE - APRES EXECUTION DU CODE]**\n" "Après avoir fourni le résultat du code, CONTINUE la conversation pour expliquer le résultat EN DETAILS, en explicitant clairement et précisément **LE Résultat du code Python** que tu viens d'utiliser pour le calcul. Il faut que l'explication du résultat réponde à la question de l'utilisateur.\n\n" "**Si la question DEMANDE la CRÉATION ou MODIFICATION de CODE dans un canvas :**\n" "1. Indique clairement que tu vas créer un **canvas de code** interactif pour cela. Précise que l'utilisateur pourra te demander des modifications dans un 'chat de canvas' dédié qui va s'ouvrir.\n" "2. Encadre le contenu initial du canvas avec les marqueurs `[CANVAS_START]` et `[CANVAS_END]`.\n" " Exemple : 'Très bien, un canvas de code interactif est lancé pour un quick sort en C. Vous pouvez maintenant me demander des modifications directement dans le chat de canvas qui vient de s'ouvrir :\n" "[CANVAS_START]\n```c\n// Code Quick Sort en C\n...\n```\n[CANVAS_END]'\n" "3. **IMPORTANT :** Ne fournis PAS de code markdown ```python ... ``` après le canvas, sauf si cela est explicitement demandé et pertinent.\n\n" "**Si la question DEMANDE la CRÉATION ou MODIFICATION de TEXTE RICHE (avec markdown) dans un canvas :**\n" "1. Indique clairement que tu vas créer un **canvas de texte** interactif pour cela. Précise que l'utilisateur pourra te demander des modifications dans un 'chat de canvas' dédié.\n" "2. Encadre le contenu initial du canvas de texte avec les marqueurs `[CANVAS_START]` et `[CANVAS_END]`.\n" " Exemple : 'Très bien, un canvas de texte interactif est lancé. Le texte supporte le **gras**, l'*italique*, les listes, etc. Vous pouvez maintenant me demander des modifications directement dans le chat de canvas qui vient de s'ouvrir :\n" "[CANVAS_START]\n# Titre principal en gras\n\n* Item 1\n* Item 2\n\n[CANVAS_END]'\n" "3. **IMPORTANT :** Ne fournis PAS de code markdown ```python ... ``` après le canvas de texte, sauf si cela est explicitement demandé.\n\n" "**Si l'utilisateur pose une question dans le 'chat de canvas' :**\n" " - Interprète sa demande comme une instruction pour MODIFIER le contenu du canvas actuel.\n" " - Fournis le NOUVEAU contenu du canvas ENTIÈREMENT MIS À JOUR encadré par `[CANVAS_START]` et `[CANVAS_END]`.\n" " - Ajoute une courte phrase de confirmation indiquant que le canvas a été mis à jour.\n\n" "**Si la question NE DEMANDE PAS de CALCUL NUMÉRIQUE NI DE CANVAS NI DE RECHERCHE WEB :**\n" # Added web search condition "Réponds directement et clairement à la question en tenant compte du contexte de la conversation.\n" "Si tu dois donner un exemple de code, utilise un bloc markdown ```python ... ``` MAIS SANS JAMAIS ajouter `[CODE_REQUIRED]` ni de canvas." ) system_prompt = {"role": "system", "content": base_prompt} messages_with_system_prompt = [system_prompt] + messages full_reply_text = "" code_to_execute = None code_to_display = None is_code_used = False canvas_content = None code_executed = False prompt_tokens = 0 completion_tokens = 0 total_tokens = 0 initial_response_completed = False web_search_results = [] # To store web search results yield format_sse_event({"type": "analysis_status", "content": "start"}) # Perform web search if enabled if enable_web_search: yield format_sse_event({"type": "web_search_status", "content": "start"}) yield format_sse_event({"type": "web_search", "content": {"query": user_query}}) # Send query to frontend for display search_results_raw = DataCollector.web_search(user_query) valid_search_results = [] for result in tqdm(search_results_raw, desc="Fetching Content", disable=True): # disable=True for backend content = ContentProcessor().fetch_content(result['url']) if content and ContentFilter._filter_content(content, user_query): # Using ContentFilter for filtering result['content'] = content valid_search_results.append(result) if len(valid_search_results) >= CONFIG['max_results']: break web_search_results = valid_search_results web_sources_urls = [res['url'] for res in web_search_results] # Extract URLs for sources event yield format_sse_event({"type": "web_sources", "content": web_sources_urls}) # Send sources URLs to frontend yield format_sse_event({"type": "web_search_status", "content": "end"}) # Generate AI report from web search results like CLI version system_prompt_report = f"""**Rôle:** Expert en recherche d'informations
Consignes:
Structure claire avec titres ##
Utilise exclusivement le titre: "{user_query}"
Liste à puces avec informations clés
Sources entre [crochets]
Maximum {len(web_search_results)} sources
Style naturel et fluide"""
textsources_text = "\n\n".join([f"[Source {i+1} - {r['domain']}]\n{r['content'][:1000]}" for i, r in enumerate(web_search_results)]) yield format_sse_event({"type": "web_analysis_status", "content": "start"}) report_text = AICore.query_ai(system_prompt_report, sources_text) yield format_sse_event({"type": "web_report", "content": report_text}) yield format_sse_event({"type": "web_analysis_status", "content": "end"}) # Add sources to final response sources_domains = list(set(r['domain'] for r in web_search_results)) yield format_sse_event({"type": "web_sources", "content": sources_domains}) try: response_stream = groq_client.chat.completions.create( model=model, messages=messages_with_system_prompt, temperature=0.0, stream=True, ) for chunk in response_stream: if initial_response_completed: break delta_content = chunk.choices[0].delta.content or "" delta_content_no_marker = delta_content.replace(CODE_REQUIRED_MARKER, "") full_reply_text += delta_content_no_marker # Vérifier et extraire le canvas s'il est présent if CANVAS_START_MARKER in full_reply_text and CANVAS_END_MARKER in full_reply_text: start_index = full_reply_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = full_reply_text.find(CANVAS_END_MARKER) canvas_content_extracted = full_reply_text[start_index:end_index].strip() canvas_content = canvas_content_extracted delta_content_for_chat = delta_content_no_marker.replace(CANVAS_START_MARKER, "").replace(CANVAS_END_MARKER, "") yield format_sse_event({"type": "text", "content": delta_content_for_chat}) yield format_sse_event({"type": "canvas_start", "content": canvas_content}) continue else: yield format_sse_event({"type": "text", "content": delta_content_no_marker}) code_to_execute, code_to_display = extract_code_blocks(full_reply_text) if code_to_execute: initial_response_completed = True if execute_code and code_to_execute: is_code_used = True logger.info("Envoi événement CODE_BLOCK") yield format_sse_event({"type": "code_block", "content": code_to_execute}) yield format_sse_event({"type": "code_execution_start"}) execution_result = run_code_in_sandbox(code_to_execute) numeric_result = extract_numeric_result(execution_result) if numeric_result: code_executed = True logger.info("Envoi événement EXECUTED_CODE_RESULT") yield format_sse_event({"type": "executed_code_result", "content": numeric_result}) conclusion_prompt_content = ( f"**[INSTRUCTION SUITE - APRES EXECUTION DU CODE]** Tu as exécuté le code Python suivant:\n" f"```python\n{code_to_execute}\n```\n" f"Le résultat du calcul est: **{numeric_result}**. " "Maintenant, continue pour une conversation logique en te basant sur la question initiale." ) conclusion_prompt = [{"role": "user", "content": conclusion_prompt_content}] conclusion_stream = groq_client.chat.completions.create( model=model, messages=conclusion_prompt, temperature=0.0, stream=True, ) for chunk in conclusion_stream: delta_content = chunk.choices[0].delta.content or "" yield format_sse_event({"type": "text", "content": delta_content}) yield format_sse_event({"type": "final"}) else: code_executed = True logger.info("Envoi événement EXECUTED_CODE_ERROR") yield format_sse_event({"type": "executed_code_error", "content": execution_result}) yield format_sse_event({"type": "final", "result": "Fin de la réponse (erreur ou pas de résultat numérique)."}) prompt_tokens = 0 # Comptage non implémenté completion_tokens = 0 total_tokens = prompt_tokens + completion_tokens if not code_executed: logger.info("Envoi événement FINAL (sans code exécuté)") yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}) else: yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}) except Exception as e: error_message = f"Erreur serveur IA: {str(e)}" logger.exception(error_message) yield format_sse_event({"type": "error", "content": error_message}) finally: yield format_sse_event({"type": "analysis_status", "content": "end"}) yield format_sse_event({"type": "done", "is_code_execution": is_code_used})
return Response(stream_with_context(stream_response()), mimetype='text/event-stream')
@app.route('/canvas_modify', methods=['POST'])
def canvas_modify_endpoint() -> Response:
data = request.get_json()
if not data:
logger.warning("Requête canvas_modify sans données.")
return jsonify({"error": "Requête canvas_modify invalide : corps de requête manquant."}), 400
user_message = data.get('message')
canvas_content = data.get('canvas_content')
model = data.get('model', 'llama3-70b-8192').strip()
if not user_message or canvas_content is None:
logger.warning("Requête canvas_modify incomplète: message=%s, canvas_content=%s", user_message, canvas_content)
return jsonify({"error": "Requête canvas_modify invalide : message ou contenu canvas manquant."}), 400
canvas_chat_messages = [
{"role": "system", "content": (
"Tu es dans un chat de canvas. L'utilisateur va te demander de modifier le contenu du canvas suivant. "
"Fournis UNIQUEMENT le NOUVEAU contenu du canvas ENTIÈREMENT MIS À JOUR, encadré par [CANVAS_START]
et [CANVAS_END]
. "
"Ne réponds pas avec du texte en dehors de ces marqueurs, sauf une courte phrase de confirmation après le canvas. "
"Maintiens le contexte des modifications précédentes du canvas, en te basant sur l'historique de ce chat canvas."
)},
{"role": "assistant", "content": f"[CANVAS_START]\n{canvas_content}\n[CANVAS_END] Voici le canvas actuel. Dites-moi comment le modifier."},
{"role": "user", "content": user_message}
]
text@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type(Exception)) def canvas_modify_groq_call(canvas_chat_messages, model): return groq_client.chat.completions.create( model=model, messages=canvas_chat_messages, temperature=0.0, stream=False, ) try: response_stream = canvas_modify_groq_call(canvas_chat_messages, model) ai_response_text = response_stream.choices[0].message.content or "" if CANVAS_START_MARKER in ai_response_text and CANVAS_END_MARKER in ai_response_text: start_index = ai_response_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = ai_response_text.find(CANVAS_END_MARKER) updated_canvas_content = ai_response_text[start_index:end_index].strip() response_message = "Canvas mis à jour." logger.info("Canvas modifié avec succès.") return jsonify({"updated_canvas_content": updated_canvas_content, "response_message": response_message}) else: logger.error("Réponse IA canvas_modify invalide: marqueurs CANVAS_START/END manquants.") return jsonify({"error": "Réponse IA invalide pour la modification du canvas", "full_response": ai_response_text}), 500 except Exception as e: error_message = f"Erreur serveur (canvas_modify): {str(e)}" logger.exception(error_message) return jsonify({"error": error_message}), 500
@app.route('/run', methods=['POST'])
def run_code_endpoint() -> Response:
"""
Point d'accès pour exécuter du code via l'API Piston (pour canvas code).
"""
data = request.get_json()
if not data:
logger.warning("Requête run sans données.")
return jsonify({"error": "Requête run invalide : corps de requête manquant."}), 400
language = data.get("language")
code = data.get("code")
stdin = data.get("stdin", "")
if not language or not code:
logger.warning("Requête run incomplète: language=%s, code=%s", language, code)
return jsonify({"error": "Requête run invalide : langage et code requis."}), 400
payload = {
"language": language,
"version": "*",
"files": [{"content": code}],
"stdin": stdin
}
try:
piston_response = requests.post(PISTON_API_URL, json=payload, timeout=3000000)
piston_response.raise_for_status()
logger.info("Code exécuté avec Piston avec succès.")
return jsonify(piston_response.json())
except requests.RequestException as e:
error_message = f"Erreur lors de l'appel à l'API Piston: {str(e)}"
logger.exception(error_message)
piston_text = piston_response.text if 'piston_response' in locals() and piston_response is not None else ""
return jsonify({"error": "Erreur d'exécution du code via l'API Piston.", "piston_response": piston_text, "details": error_message}), 500
except Exception as e:
error_message = f"Erreur inattendue lors de l'exécution du code: {str(e)}"
logger.exception(error_message)
return jsonify({"error": "Erreur inattendue lors de l'exécution du code.", "details": error_message}), 500
@app.route('/run', methods=['OPTIONS'])
def run_code_options() -> Response:
"""Gère les requêtes OPTIONS pour l'endpoint /run (CORS preflight)."""
return jsonify({'message': 'OPTIONS requests allowed'}), 200
def format_sse_event(data: Dict[str, Any]) -> str:
"""Formate les données en événement SSE (Server-Sent Events)."""
return f"data: {json.dumps(data)}\n\n"
if name == 'main':
try:
nltk.data.find('tokenizers/punkt_tab') # Check if it's already downloaded
print("NLTK punkt_tab tokenizer already downloaded.")
except LookupError:
print("Downloading NLTK punkt_tab tokenizer...")
nltk.download('punkt_tab', download_dir='/opt/render/nltk_data') # Specify a writable download directory if needed, or remove download_dir if default works
print("NLTK punkt_tab tokenizer downloaded.")
except Exception as e:
print(f"Error during NLTK punkt_tab download: {e}")
textif os.environ.get('FLASK_ENV') == 'development': app.run(debug=True, port=5000)
Voici ci-dessous une version mise à jour de votre fichier app.py qui intègre la même logique de recherche (scrapping, filtrage et génération de rapport) que votre CLI, tout en faisant cohabiter Groq et OpenRouter pour l'appel à l'API. Vous retrouverez notamment :
/chat
afin d’obtenir un résultat équivalent à celui de votre CLI.Vous pouvez copier ce code dans votre fichier app.py :
pythonfrom gevent import monkey monkey.patch_all() import nltk import os import re import json import logging from typing import Any, Dict, List, Optional, Tuple import time import datetime from tqdm import tqdm from flask import Flask, request, Response, jsonify, stream_with_context from flask_cors import CORS from groq import Groq from e2b_code_interpreter import Sandbox from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type # --- Intégration de la fonctionnalité de recherche --- import requests from duckduckgo_search import DDGS from bs4 import BeautifulSoup from urllib.parse import urlparse # Configuration pour la recherche (mêmes paramètres que la CLI) CONFIG = { 'openrouter_api_key': os.environ.get("OPENROUTER_API_KEY"), # clé API depuis les variables d'environnement 'max_results': 7, # nombre max de résultats pour la recherche 'cache_file': 'search_cache.json', # non utilisé dans le backend, réservé à une éventuelle extension 'model': 'google/gemini-2.0-pro-exp-02-05:free', # modèle pour OpenRouter 'timeout': 20, 'min_quality_score': 0.65 # critère de filtrage, à personnaliser si besoin } class DataCollector: @staticmethod def web_search(query: str) -> List[Dict]: with DDGS() as ddgs: results = ddgs.text(query, region='fr-fr', max_results=CONFIG['max_results']) return [{ 'url': r['href'], 'title': r['title'], 'domain': urlparse(r['href']).netloc.replace('www.', '') } for r in results if urlparse(r['href']).scheme in ['http', 'https']] class ContentProcessor: def __init__(self): self.headers = { 'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) ' 'AppleWebKit/537.36 (KHTML, like Gecko) ' 'Chrome/91.0.4472.124 Safari/537.36') } def fetch_content(self, url: str) -> str: try: response = requests.get(url, headers=self.headers, timeout=CONFIG['timeout']) soup = BeautifulSoup(response.text, 'html.parser') # Supprime les balises non pertinentes for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'meta', 'link']): element.decompose() main_content = soup.find(['main', 'article']) or soup.body return ' '.join(main_content.stripped_strings)[:5000] except Exception as e: print(f"Erreur sur {url}: {str(e)}") return '' class AICore: @staticmethod def query_ai(system_prompt: str, user_content: str, temp: float = 0.0) -> str: # Tentative avec Groq en priorité try: response = groq_client.chat.completions.create( messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content} ], model="mixtral-8x7b-32768", temperature=temp, max_tokens=1000 ) return response.choices[0].message.content except Exception as e: # Fallback sur OpenRouter try: headers = { "Authorization": f"Bearer {os.environ.get('OPENROUTER_API_KEY')}", "Content-Type": "application/json", "HTTP-Referer": "https://pl1nkochat.com", "X-Title": "Pl1nkochat" } data = { "model": CONFIG['model'], "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content} ], "temperature": temp, "max_tokens": 1000 } response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data, timeout=CONFIG['timeout']) return response.json()['choices'][0]['message']['content'] except Exception as e: return f"Erreur API: {str(e)}" class ContentFilter: @staticmethod def _filter_content(content: str, query: str) -> bool: """Filtre les contenus non pertinents en s'appuyant sur l'IA.""" prompt = f"Ce contenu est-il pertinent pour '{query}' ? Répondre uniquement par oui/non" response = AICore.query_ai(prompt, content[:2000], temp=0.0) return response.lower().startswith('oui') # --- Fin de l'intégration de la recherche --- # Configuration du logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = Flask(__name__) CORS(app) # Clés API (variables d'environnement ou valeurs par défaut sécurisées) GROQ_API_KEY = os.environ.get("GROQ_API_KEY") E2B_API_KEY = os.environ.get("E2B_API_KEY") PISTON_API_URL = "https://emkc.org/api/v2/piston/execute" groq_client = Groq(api_key=GROQ_API_KEY) sandbox = Sandbox(api_key=E2B_API_KEY, timeout=3000) VALID_MODELS = [ "llama-3.3-70b-versatile", "distil-whisper-large-v3-en", "gemma2-9b-it", "llama-3.1-8b-instant", "llama-guard-3-8b", "llama3-70b-8192", "llama3-8b-8192", "mixtral-8x7b-32768", "whisper-large-v3", "whisper-large-v3-turbo", "deepseek-r1-distill-llama-70b-specdec", "deepseek-r1-distill-llama-70b", "llama-3.3-specdec", "llama-3.2-1b-preview", "llama-3.2-3b-preview", "llama-3.2-11b-vision-preview", "llama-3.2-90b-vision-preview", "llama3-70b-8192" ] CODE_PATTERN = re.compile(r"```python\s*([\s\S]*?)```") CODE_REQUIRED_MARKER = "[CODE_REQUIRED]" CANVAS_START_MARKER = "[CANVAS_START]" CANVAS_END_MARKER = "[CANVAS_END]" def extract_code_blocks(text: str) -> Tuple[Optional[str], Optional[str]]: """Extrait les blocs de code Python et gère le marqueur [CODE_REQUIRED].""" if CODE_REQUIRED_MARKER in text: text_no_marker = text.replace(CODE_REQUIRED_MARKER, "", 1).strip() code_match = CODE_PATTERN.search(text_no_marker) if code_match: return code_match.group(1).strip(), None else: code_match = CODE_PATTERN.search(text) if code_match: return None, code_match.group(1).strip() # code à afficher sans exécution return None, None def run_code_in_sandbox(code: str) -> str: """Exécute le code Python dans le sandbox e2b et retourne le résultat.""" start_time = time.time() try: logger.info("Début de l'exécution du code dans le sandbox e2b.") execution = None method_name = next((method for method in ["run_code_sync", "execute_sync", "execute_code", "execute", "run_code", "run"] if hasattr(sandbox, method)), None) if method_name: logger.info(f"Utilisation de la méthode sandbox.{method_name}") execution = getattr(sandbox, method_name)(code) else: error_msg = "Aucune méthode d'exécution de code disponible dans le sandbox." logger.error(error_msg) return error_msg logs = execution.logs stdout_logs = logs.stdout if hasattr(logs, 'stdout') else [] stderr_logs = logs.stderr if hasattr(logs, 'stderr') else [] stdout = "\n".join(stdout_logs) stderr = "\n".join(stderr_logs) if stderr: return f"Stdout:\n{stdout}\n\nStderr:\n{stderr}" return stdout except Exception as e: duration = time.time() - start_time error_msg = f"Erreur lors de l'exécution du code dans e2b après {duration:.2f} secondes: {str(e)}" logger.exception(error_msg) return f"Erreur exécution code: {str(e)}" def extract_numeric_result(exec_result: str) -> Optional[str]: """Extrait la dernière ligne numérique du résultat d'exécution de code.""" if "Stdout:" in exec_result: stdout_section = exec_result.split("Stdout:\n", 1)[1].split("\n\nStderr:")[0].strip() else: stdout_section = exec_result.strip() lines = [l.strip() for l in stdout_section.splitlines() if l.strip()] if not lines: return None last_line = lines[-1] if re.search(r'\d', last_line): return last_line return None @app.route('/models', methods=['GET']) def get_models() -> Response: """Retourne la liste des modèles valides.""" return jsonify({"models": VALID_MODELS}) @app.route('/chat', methods=['POST']) def chat_endpoint() -> Response: """ Point d'entrée principal pour le chat. Gère le streaming SSE, l'exécution de code, les canvas, et désormais la recherche internet. """ @stream_with_context @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type(Exception)) def stream_response(): data = request.get_json() if not data: logger.warning("Requête chat invalide: corps de requête manquant.") yield format_sse_event({"type": "error", "content": "Requête chat invalide : corps de requête manquant."}) return messages = data.get('messages', []) model: str = data.get('model', 'llama3-70b-8192').strip() execute_code: bool = data.get('execute_code', True) enable_web_search: bool = data.get('web_search', False) # activation de la recherche web if not messages or not isinstance(messages, list) or not model or model not in VALID_MODELS: logger.warning("Requête chat invalide: messages=%s, model=%s", messages, model) yield format_sse_event({"type": "error", "content": "Requête chat invalide : messages ou modèle incorrects."}) return user_query = messages[-1]['content'] if messages else "" # Construction du prompt système de base base_prompt = ( "🎯 **Rôle : Expert en calcul numérique précis et assistant polyvalent avec canvas interactifs et recherche internet.**\n" "**Mission :** Répondre aux questions de l'utilisateur, effectuer des calculs précis, fournir des canvas interactifs (texte et code) et, " "si demandé, enrichir la réponse avec une recherche internet pertinente. " "Cite toujours les sources web utilisées sous forme de liens à la fin de ta réponse.\n\n" "**Si une recherche internet est demandée :**\n" "1. Effectue discrètement une recherche sur internet avec des mots-clés pertinents.\n" "2. Utilise ces informations pour enrichir ta réponse.\n" "3. Présente un rapport synthétique avec les sources [entre crochets].\n\n" "**Si la question demande un calcul numérique :**\n" "1. Annonce que tu vas exécuter un code Python pour réaliser le calcul.\n" "2. Ajoute le marqueur `[CODE_REQUIRED]` suivi du bloc de code minimal en Python.\n" "3. N'envoie rien d'autre et attends l'exécution du code.\n\n" "**Après l'exécution du code**, explique le résultat en détails en te basant sur le résultat numérique obtenu.\n\n" "**Pour la création ou modification de canvas** (texte ou code) :\n" "Utilise les marqueurs `[CANVAS_START]` et `[CANVAS_END]` pour encadrer le contenu du canvas.\n\n" "Si aucune de ces conditions n'est remplie, réponds directement et clairement à la question." ) system_prompt = {"role": "system", "content": base_prompt} messages_with_system_prompt = [system_prompt] + messages full_reply_text = "" code_to_execute = None code_to_display = None is_code_used = False canvas_content = None code_executed = False prompt_tokens = 0 completion_tokens = 0 total_tokens = 0 initial_response_completed = False web_search_results = [] # Pour stocker les résultats de la recherche web yield format_sse_event({"type": "analysis_status", "content": "start"}) # Si la recherche web est activée, exécute la recherche et génère le rapport if enable_web_search: yield format_sse_event({"type": "web_search_status", "content": "start"}) yield format_sse_event({"type": "web_search", "content": {"query": user_query}}) search_results_raw = DataCollector.web_search(user_query) valid_search_results = [] processor = ContentProcessor() for result in tqdm(search_results_raw, desc="Fetching Content", disable=True): content = processor.fetch_content(result['url']) if content and ContentFilter._filter_content(content, user_query): result['content'] = content valid_search_results.append(result) if len(valid_search_results) >= CONFIG['max_results']: break web_search_results = valid_search_results # Envoi des URLs des sources pour affichage côté frontend web_sources_urls = [res['url'] for res in web_search_results] yield format_sse_event({"type": "web_sources", "content": web_sources_urls}) yield format_sse_event({"type": "web_search_status", "content": "end"}) # Génération du rapport via IA, similaire à la CLI system_prompt_report = f"""**Rôle:** Expert en recherche d'informations **Consignes:** 1. Structure claire avec titres ## 2. Utilise exclusivement le titre: "{user_query}" 3. Liste à puces avec informations clés 4. Sources entre [crochets] 5. Maximum {len(web_search_results)} sources 6. Style naturel et fluide""" sources_text = "\n\n".join([ f"[Source {i+1} - {r['domain']}]\n{r['content'][:1000]}" for i, r in enumerate(web_search_results) ]) yield format_sse_event({"type": "web_analysis_status", "content": "start"}) report_text = AICore.query_ai(system_prompt_report, sources_text) yield format_sse_event({"type": "web_report", "content": report_text}) yield format_sse_event({"type": "web_analysis_status", "content": "end"}) # Envoi des domaines sources dans la réponse finale sources_domains = list(set(r['domain'] for r in web_search_results)) yield format_sse_event({"type": "web_sources", "content": sources_domains}) try: response_stream = groq_client.chat.completions.create( model=model, messages=messages_with_system_prompt, temperature=0.0, stream=True, ) for chunk in response_stream: if initial_response_completed: break delta_content = chunk.choices[0].delta.content or "" delta_content_no_marker = delta_content.replace(CODE_REQUIRED_MARKER, "") full_reply_text += delta_content_no_marker # Extraction et envoi du canvas si présent if CANVAS_START_MARKER in full_reply_text and CANVAS_END_MARKER in full_reply_text: start_index = full_reply_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = full_reply_text.find(CANVAS_END_MARKER) canvas_content_extracted = full_reply_text[start_index:end_index].strip() canvas_content = canvas_content_extracted delta_content_for_chat = delta_content_no_marker.replace(CANVAS_START_MARKER, "").replace(CANVAS_END_MARKER, "") yield format_sse_event({"type": "text", "content": delta_content_for_chat}) yield format_sse_event({"type": "canvas_start", "content": canvas_content}) continue else: yield format_sse_event({"type": "text", "content": delta_content_no_marker}) code_to_execute, code_to_display = extract_code_blocks(full_reply_text) if code_to_execute: initial_response_completed = True if execute_code and code_to_execute: is_code_used = True logger.info("Envoi événement CODE_BLOCK") yield format_sse_event({"type": "code_block", "content": code_to_execute}) yield format_sse_event({"type": "code_execution_start"}) execution_result = run_code_in_sandbox(code_to_execute) numeric_result = extract_numeric_result(execution_result) if numeric_result: code_executed = True logger.info("Envoi événement EXECUTED_CODE_RESULT") yield format_sse_event({"type": "executed_code_result", "content": numeric_result}) conclusion_prompt_content = ( f"**[INSTRUCTION SUITE - APRES EXECUTION DU CODE]** Tu as exécuté le code Python suivant:\n" f"```python\n{code_to_execute}\n```\n" f"Le résultat du calcul est: **{numeric_result}**. " "Continue ensuite la conversation en t'appuyant sur ce résultat." ) conclusion_prompt = [{"role": "user", "content": conclusion_prompt_content}] conclusion_stream = groq_client.chat.completions.create( model=model, messages=conclusion_prompt, temperature=0.0, stream=True, ) for chunk in conclusion_stream: delta_content = chunk.choices[0].delta.content or "" yield format_sse_event({"type": "text", "content": delta_content}) yield format_sse_event({"type": "final"}) else: code_executed = True logger.info("Envoi événement EXECUTED_CODE_ERROR") yield format_sse_event({"type": "executed_code_error", "content": execution_result}) yield format_sse_event({"type": "final", "result": "Fin de la réponse (erreur ou pas de résultat numérique)."}) prompt_tokens = 0 # Comptage non implémenté completion_tokens = 0 total_tokens = prompt_tokens + completion_tokens if not code_executed: logger.info("Envoi événement FINAL (sans code exécuté)") yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}) else: yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}) except Exception as e: error_message = f"Erreur serveur IA: {str(e)}" logger.exception(error_message) yield format_sse_event({"type": "error", "content": error_message}) finally: yield format_sse_event({"type": "analysis_status", "content": "end"}) yield format_sse_event({"type": "done", "is_code_execution": is_code_used}) return Response(stream_with_context(stream_response()), mimetype='text/event-stream') @app.route('/canvas_modify', methods=['POST']) def canvas_modify_endpoint() -> Response: data = request.get_json() if not data: logger.warning("Requête canvas_modify sans données.") return jsonify({"error": "Requête canvas_modify invalide : corps de requête manquant."}), 400 user_message = data.get('message') canvas_content = data.get('canvas_content') model = data.get('model', 'llama3-70b-8192').strip() if not user_message or canvas_content is None: logger.warning("Requête canvas_modify incomplète: message=%s, canvas_content=%s", user_message, canvas_content) return jsonify({"error": "Requête canvas_modify invalide : message ou contenu canvas manquant."}), 400 canvas_chat_messages = [ {"role": "system", "content": ( "Tu es dans un chat de canvas. L'utilisateur va te demander de modifier le contenu du canvas suivant. " "Fournis UNIQUEMENT le NOUVEAU contenu du canvas ENTIÈREMENT MIS À JOUR, encadré par `[CANVAS_START]` et `[CANVAS_END]`. " "Ne réponds pas avec du texte en dehors de ces marqueurs, sauf une courte phrase de confirmation après le canvas. " "Maintiens le contexte des modifications précédentes du canvas, en te basant sur l'historique de ce chat canvas." )}, {"role": "assistant", "content": f"[CANVAS_START]\n{canvas_content}\n[CANVAS_END] Voici le canvas actuel. Dites-moi comment le modifier."}, {"role": "user", "content": user_message} ] @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type(Exception)) def canvas_modify_groq_call(canvas_chat_messages, model): return groq_client.chat.completions.create( model=model, messages=canvas_chat_messages, temperature=0.0, stream=False, ) try: response_stream = canvas_modify_groq_call(canvas_chat_messages, model) ai_response_text = response_stream.choices[0].message.content or "" if CANVAS_START_MARKER in ai_response_text and CANVAS_END_MARKER in ai_response_text: start_index = ai_response_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = ai_response_text.find(CANVAS_END_MARKER) updated_canvas_content = ai_response_text[start_index:end_index].strip() response_message = "Canvas mis à jour." logger.info("Canvas modifié avec succès.") return jsonify({"updated_canvas_content": updated_canvas_content, "response_message": response_message}) else: logger.error("Réponse IA canvas_modify invalide: marqueurs CANVAS_START/END manquants.") return jsonify({"error": "Réponse IA invalide pour la modification du canvas", "full_response": ai_response_text}), 500 except Exception as e: error_message = f"Erreur serveur (canvas_modify): {str(e)}" logger.exception(error_message) return jsonify({"error": error_message}), 500 @app.route('/run', methods=['POST']) def run_code_endpoint() -> Response: """ Point d'accès pour exécuter du code via l'API Piston (pour canvas code). """ data = request.get_json() if not data: logger.warning("Requête run sans données.") return jsonify({"error": "Requête run invalide : corps de requête manquant."}), 400 language = data.get("language") code = data.get("code") stdin = data.get("stdin", "") if not language or not code: logger.warning("Requête run incomplète: language=%s, code=%s", language, code) return jsonify({"error": "Requête run invalide : langage et code requis."}), 400 payload = { "language": language, "version": "*", "files": [{"content": code}], "stdin": stdin } try: piston_response = requests.post(PISTON_API_URL, json=payload, timeout=3000000) piston_response.raise_for_status() logger.info("Code exécuté avec Piston avec succès.") return jsonify(piston_response.json()) except requests.RequestException as e: error_message = f"Erreur lors de l'appel à l'API Piston: {str(e)}" logger.exception(error_message) piston_text = piston_response.text if 'piston_response' in locals() and piston_response is not None else "" return jsonify({"error": "Erreur d'exécution du code via l'API Piston.", "piston_response": piston_text, "details": error_message}), 500 except Exception as e: error_message = f"Erreur inattendue lors de l'exécution du code: {str(e)}" logger.exception(error_message) return jsonify({"error": "Erreur inattendue lors de l'exécution du code.", "details": error_message}), 500 @app.route('/run', methods=['OPTIONS']) def run_code_options() -> Response: """Gère les requêtes OPTIONS pour l'endpoint /run (CORS preflight).""" return jsonify({'message': 'OPTIONS requests allowed'}), 200 def format_sse_event(data: Dict[str, Any]) -> str: """Formate les données en événement SSE (Server-Sent Events).""" return f"data: {json.dumps(data)}\n\n" if __name__ == '__main__': try: nltk.data.find('tokenizers/punkt_tab') print("NLTK punkt_tab tokenizer déjà téléchargé.") except LookupError: print("Téléchargement du tokenizer NLTK punkt_tab...") nltk.download('punkt_tab', download_dir='/opt/render/nltk_data') print("NLTK punkt_tab tokenizer téléchargé.") except Exception as e: print(f"Erreur lors du téléchargement du tokenizer NLTK punkt_tab: {e}") if os.environ.get('FLASK_ENV') == 'development': app.run(debug=True, port=5000)
Ce fichier app.py met ainsi en œuvre :
N’hésitez pas à me signaler si vous souhaitez d’autres ajustements ou des précisions supplémentaires.