voici mon code complet : """ from gevent import mo...
Créé le : 14 février 2025
Répondu en utilisant O3 Mini High par Chat01
Créé le : 14 février 2025
Répondu en utilisant O3 Mini High par Chat01
voici mon code complet :
"""
from gevent import monkey
monkey.patch_all()
import requests
import os
import re
import json
import logging
from typing import Any, Dict, List, Optional, Tuple, Generator
import time
from flask import Flask, request, Response, jsonify, stream_with_context
from flask_cors import CORS
from groq import Groq
from e2b_code_interpreter import Sandbox
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from openai import OpenAI # MODIFICATION: Import OpenAI client instead of OpenRouter client
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from tqdm import tqdm # Import tqdm
import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)
app = Flask(name)
CORS(app)
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
E2B_API_KEY = os.environ.get("E2B_API_KEY")
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY") # NOUVEAU
PISTON_API_URL = "https://emkc.org/api/v2/piston/execute"
groq_client = Groq(api_key=GROQ_API_KEY)
openrouter_client = OpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=OPENROUTER_API_KEY,
)
sandbox = Sandbox(api_key=E2B_API_KEY, timeout=3000)
VALID_MODELS = [
# Modèles Groq
"llama3-70b-8192", # "llama-3.3-70b-versatile", "llama3-8b-8192", "mixtral-8x7b-32768", <- Removed Groq models except llama3-70b-8192
# NOUVEAU - Modèles OpenRouter
"google/gemini-pro", # "google/gemini-2.0-pro-exp-02-05:free", <- Changed Gemini model to google/gemini-pro; Removed anthropic/claude-3-opus
"meta-llama/llama-3-70b-instruct", "openai/gpt-4-turbo-preview"
]
CODE_PATTERN = re.compile(r"python\s*([\s\S]*?)
")
CODE_REQUIRED_MARKER = "[CODE_REQUIRED]"
CANVAS_START_MARKER = "[CANVAS_START]"
CANVAS_END_MARKER = "[CANVAS_END]"
WEB_SEARCH_MARKER = "[WEB_SEARCH]" # NOUVEAU
CONFIG = {
'openrouter_api_key': os.environ.get("OPENROUTER_API_KEY"), # Use existing API KEY
'max_results': 7, # As per your config
'cache_file': 'search_cache.json', # Caching is not implemented in this patch, but kept for potential future use
'model': 'google/gemini-pro', # Using gemini-pro as requested and in your valid models
'timeout': 20, # As per your config
'min_quality_score': 0.65 # Quality score is not used in this patch
}
class WebSearchHandler:
@staticmethod
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2))
def perform_search(query: str, max_results: int = 5) -> List[Dict]:
with DDGS() as ddgs:
results = ddgs.text(query, region='fr-fr', max_results=CONFIG['max_results']) # Use CONFIG max_results
return [{
'url': r['href'],
'title': r['title'],
'domain': urlparse(r['href']).netloc.replace('www.', ''),
} for r in results]
textdef fetch_page_content(self, url: str) -> str: # Make fetch_page_content an instance method try: headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'} # Use more specific User-Agent response = requests.get(url, headers=self.headers, timeout=CONFIG['timeout']) # Use CONFIG timeout, and self.headers soup = BeautifulSoup(response.text, 'html.parser') # Nettoyage du contenu for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'meta', 'link']): # More comprehensive removal element.decompose() main_content = soup.find(['main', 'article']) or soup.body # Attempt to find main content return ' '.join(main_content.stripped_strings)[:5000] # Extract and limit content except Exception as e: logger.error(f"Erreur fetch_content {url}: {str(e)}") return ""
def generate_search_context(search_results: List[Dict]) -> str:
return "\n".join(
f"[Source {i+1} - {r['domain']}]\n{r['snippet']}\n{r.get('content', '')[:1000]}"
for i, r in enumerate(search_results)
)
def extract_code_blocks(text: str) -> Tuple[Optional[str], Optional[str]]:
"""Extrait les blocs de code Python et gère le marqueur [CODE_REQUIRED]."""
if CODE_REQUIRED_MARKER in text:
text_no_marker = text.replace(CODE_REQUIRED_MARKER, "", 1).strip()
code_match = CODE_PATTERN.search(text_no_marker)
if code_match:
return code_match.group(1).strip(), None
else:
code_match = CODE_PATTERN.search(text)
if code_match:
return None, code_match.group(1).strip() # Code à afficher sans exécution
return None, None
def run_code_in_sandbox(code: str) -> str:
"""Exécute le code Python dans le sandbox e2b et retourne le résultat."""
start_time = time.time()
try:
logger.info("Début de l'exécution du code dans le sandbox e2b.")
execution = None
method_name = next((method for method in ["run_code_sync", "execute_sync", "execute_code", "execute", "run_code", "run"] if hasattr(sandbox, method)), None)
if method_name:
logger.info(f"Utilisation de la méthode sandbox.{method_name}")
execution = getattr(sandbox, method_name)(code)
else:
error_msg = "Aucune méthode d'exécution de code disponible dans le sandbox."
logger.error(error_msg)
return error_msg
logs = execution.logs
stdout_logs = logs.stdout if hasattr(logs, 'stdout') else []
stderr_logs = logs.stderr if hasattr(logs, 'stderr') else []
stdout = "\n".join(stdout_logs)
stderr = "\n".join(stderr_logs)
if stderr:
return f"Stdout:\n{stdout}\n\nStderr:\n{stderr}"
return stdout
except Exception as e:
duration = time.time() - start_time
error_msg = f"Erreur lors de l'exécution du code dans e2b après {duration:.2f} secondes: {str(e)}"
logger.exception(error_msg)
return f"Erreur exécution code: {str(e)}"
def extract_numeric_result(exec_result: str) -> Optional[str]:
"""Extrait la dernière ligne numérique du résultat d'exécution de code."""
if "Stdout:" in exec_result:
stdout_section = exec_result.split("Stdout:\n", 1)[1].split("\n\nStderr:")[0].strip()
else:
stdout_section = exec_result.strip()
lines = [l.strip() for l in stdout_section.splitlines() if l.strip()]
if not lines:
return None
last_line = lines[-1]
if re.search(r'\d', last_line):
return last_line
return None
@app.route('/models', methods=['GET'])
def get_models() -> Response:
"""Retourne la liste des modèles valides."""
return jsonify({"models": VALID_MODELS})
@app.route('/web_search', methods=['POST']) # New endpoint for web search
def web_search_endpoint():
data = request.get_json()
if not data or not data.get('query'):
return jsonify({"error": "Missing query"}), 400
query = data['query']
try:
search_results = WebSearchHandler.perform_search(query)
content_processor = WebSearchHandler() # Instantiate content processor
for result in tqdm(search_results, desc="Fetching content from sources"): # Progress bar for content fetching
result['content'] = content_processor.fetch_page_content(result['url']) # Use instance to fetch content
return jsonify({"results": search_results}) # Return results as JSON
except Exception as e:
logger.error(f"Web search error: {e}")
return jsonify({"error": str(e)}), 500
@app.route('/chat', methods=['POST'])
def chat_endpoint() -> Response:
@stream_with_context
@retry(stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=60),
retry=retry_if_exception_type(Exception))
def stream_response():
data = request.get_json()
if not data:
logger.warning("Requête chat invalide: corps de requête manquant.")
yield format_sse_event({"type": "error", "content": "Requête chat invalide : corps de requête manquant."})
return
textmessages = data.get('messages', []) model = data.get('model', 'llama3-70b-8192').strip() execute_code = data.get('execute_code', True) use_openrouter = data.get('use_openrouter', False) # NOUVEAU enable_web_search = data.get('web_search', False) # NOUVEAU web_search_results = data.get('search_results', []) # Get pre-fetched search results if not messages or not isinstance(messages, list) or not model or model not in VALID_MODELS: logger.warning("Requête chat invalide: messages=%s, model=%s", messages, model) yield format_sse_event({"type": "error", "content": "Requête chat invalide : messages ou modèle incorrects."}) return # NOUVEAU - Sélection du client IA ai_client = openrouter_client if use_openrouter else groq_client # Modification du prompt système - utilise le contexte de recherche SI DISPONIBLE system_prompt_additions = [] if enable_web_search and web_search_results: # Only add search context if web search was performed AND results are available search_context = generate_search_context(web_search_results) system_prompt_additions.append( "**CONTEXTE DE RECHERCHE WEB :**\n" f"{search_context}\n" "Utilise les informations ci-dessus pour répondre à la question de l'utilisateur. Cite les sources [source:domaine.com]." ) elif enable_web_search: # Inform AI to perform web search internally if pre-fetched results are NOT available (fallback) system_prompt_additions.append( "Si la question nécessite des informations actualisées ou externes :\n" "1. Utilise le marqueur [WEB_SEARCH] suivi de la requête de recherche\n" "2. Analyse les résultats et cite les sources avec [source:domaine.com]\n" "3. Garde les réponses concises et factuelles" ) base_prompt = ( "🎯 **Rôle : Expert en calcul numérique précis et assistant polyvalent avec canvas interactifs (texte et code).**\n" "**Mission :** Répondre aux questions, effectuer des calculs numériques exacts, et assister l'utilisateur avec des canvas éditables et interactifs pour le **texte** (avec formatage Markdown) et le **code**. " "Maintiens le contexte de conversation, en particulier autour du code, des canvas et des résultats précédents.\n\n" "**Si la question DEMANDE un CALCUL NUMÉRIQUE :**\n" "1. Réponds d'abord avec une courte phrase d'introduction expliquant que tu vas utiliser un code Python pour résoudre le problème. **Sois clair sur le contexte du calcul, en te référant si nécessaire aux étapes précédentes de la conversation et aux données ou résultats déjà obtenus.**\n" "2. Sur la ligne suivante, ajoute TOUJOURS le marqueur `[CODE_REQUIRED]` pour indiquer qu'un code va suivre.\n" "3. Ensuite, fournis le **BLOC DE CODE PYTHON** minimal, lisible et correct pour effectuer UNIQUEMENT LE CALCUL demandé et afficher UNIQUEMENT LE RÉSULTAT NUMÉRIQUE via `print(resultat_numerique)`.\n" "**IMPORTANT :** Ne génère RIEN D'AUTRE que ces 3 éléments, et dans cet ordre précis. Arrête-toi **ICI**. Attends l'exécution du code.\n\n" "**[INSTRUCTION SUITE - APRES EXECUTION DU CODE]**\n" "Après avoir fourni le résultat du code, CONTINUE la conversation pour expliquer le résultat EN DETAILS, en explicitant clairement et précisément **LE Résultat du code Python** que tu viens d'utiliser pour le calcul. Il faut que l'explication du résultat réponde à la question de l'utilisateur.\n\n" "**Si la question DEMANDE la CRÉATION ou MODIFICATION de CODE dans un canvas :**\n" "1. Indique clairement que tu vas créer un **canvas de code** interactif pour cela. Précise que l'utilisateur pourra te demander des modifications dans un 'chat de canvas' dédié qui va s'ouvrir.\n" "2. Encadre le contenu initial du canvas avec les marqueurs `[CANVAS_START]` et `[CANVAS_END]`.\n" " Exemple : 'Très bien, un canvas de code interactif est lancé pour un quick sort en C. Vous pouvez maintenant me demander des modifications directement dans le chat de canvas qui vient de s'ouvrir :\n" "[CANVAS_START]\n```c\n// Code Quick Sort en C\n...\n```\n[CANVAS_END]'\n" "3. **IMPORTANT :** Ne fournis PAS de code markdown ```python ... ``` après le canvas, sauf si cela est explicitement demandé et pertinent.\n\n" "**Si la question DEMANDE la CRÉATION ou MODIFICATION de TEXTE RICHE (avec markdown) dans un canvas :**\n" "1. Indique clairement que tu vas créer un **canvas de texte** interactif pour cela. Précise que l'utilisateur pourra te demander des modifications dans un 'chat de canvas' dédié.\n" "2. Encadre le contenu initial du canvas de texte avec les marqueurs `[CANVAS_START]` et `[CANVAS_END]`.\n" " Exemple : 'Très bien, un canvas de texte interactif est lancé. Le texte supporte le **gras**, l'*italique*, les listes, etc. Vous pouvez maintenant me demander des modifications directement dans le chat de canvas qui vient de s'ouvrir :\n" "[CANVAS_START]\n# Titre principal en gras\n\n* Item 1\n* Item 2\n\n[CANVAS_END]'\n" "3. **IMPORTANT :** Ne fournis PAS de code markdown ```python ... ``` après le canvas de texte, sauf si cela est explicitement demandé.\n\n" "**Si l'utilisateur pose une question dans le 'chat de canvas' :**\n" " - Interprète sa demande comme une instruction pour MODIFIER le contenu du canvas actuel.\n" " - Fournis le NOUVEAU contenu du canvas ENTIÈREMENT MIS À JOUR encadré par `[CANVAS_START]` et `[CANVAS_END]`.\n" " - Ajoute une courte phrase de confirmation indiquant que le canvas a été mis à jour.\n\n" "**Si la question NE DEMANDE PAS de CALCUL NUMÉRIQUE NI DE CANVAS :**\n" "Réponds directement et clairement à la question en tenant compte du contexte de la conversation.\n" "Si tu dois donner un exemple de code, utilise un bloc markdown ```python ... ``` MAIS SANS JAMAIS ajouter `[CODE_REQUIRED]` ni de canvas." ) + "\n".join(system_prompt_additions) system_prompt = {"role": "system", "content": base_prompt} messages_with_system = [system_prompt] + messages full_reply_text = "" code_to_execute = None code_to_display = None is_code_used = False canvas_content = None code_executed = False prompt_tokens = 0 completion_tokens = 0 total_tokens = 0 initial_response_completed = False web_search_triggered = False # NOUVEAU is_initial_web_search = False # NEW FLAG yield format_sse_event({"type": "analysis_status", "content": "start"}) # Check for initial web search request if messages and messages[0]['content'] == '[RECHERCHE WEB INITIALE]': # Detect initial web search request is_initial_web_search = True search_query = "actualités du jour" # Default query for initial web search yield format_sse_event({"type": "web_search", "query": search_query}) search_results = WebSearchHandler.perform_search(search_query) content_processor = WebSearchHandler() # Instantiate content processor for result in search_results: result['content'] = content_processor.fetch_page_content(result['url']) # Use instance to fetch content sources = [result['domain'] for result in search_results] yield format_sse_event({"type": "web_sources", "content": sources}) search_context = generate_search_context(search_results) # Send a final event to stop the initial loading indicator in frontend yield format_sse_event({"type": "analysis_status", "content": "end"}) yield format_sse_event({"type": "final"}) # End the stream for initial search return # IMPORTANT: Exit stream after initial search try: # NOUVEAU - Gestion multi-fournisseur # MODIFICATION: Use openai_client for OpenRouter API calls response_stream = ai_client.chat.completions.create( extra_headers={ "HTTP-Referer": "https://pl1nkochat.onrender.com", # Replace with your actual site URL if needed "X-Title": "Plinko Chat", # Replace with your site title if needed }, model=model, messages=messages_with_system, temperature=0.0, stream=True ) for chunk in response_stream: if initial_response_completed: break delta_content = chunk.choices[0].delta.content or "" delta_content_no_marker = delta_content.replace(CODE_REQUIRED_MARKER, "") full_reply_text += delta_content_no_marker # NOUVEAU - Détection recherche web if enable_web_search and WEB_SEARCH_MARKER in full_reply_text: web_search_triggered = True search_query = full_reply_text.split(WEB_SEARCH_MARKER)[-1].strip() yield format_sse_event({"type": "web_search", "query": search_query}) logger.info(f"Extracted search query: '{search_query}'") # ADDED LOGGING if not search_query: # CHECK IF SEARCH QUERY IS EMPTY logger.warning("Web search triggered but no search query was extracted from AI response.") yield format_sse_event({"type": "error", "content": "Erreur de recherche web : requête de recherche manquante."}) # Inform frontend continue # Skip the search and continue processing events # Exécution recherche search_results = WebSearchHandler.perform_search(search_query) content_processor = WebSearchHandler() # Instantiate content processor for result in search_results: result['content'] = content_processor.fetch_page_content(result['url']) # Use instance to fetch content # Envoyer les sources au frontend sources = [result['domain'] for result in search_results] yield format_sse_event({"type": "web_sources", "content": sources}) # Génération contexte search_context = generate_search_context(search_results) # correction : generate_search_context avant la modif de messages_with_system # Modification structure messages messages_with_system = [ *messages_with_system, # ajout * ici pour unpack l'ancienne liste { "role": "user", "content": f"Résultats de recherche pour '{search_query}':\n{search_context}\nRéponds en citant les sources avec [source:domaine]" }] # Régénération réponse # MODIFICATION: Use openai_client for OpenRouter API calls here as well response_stream = ai_client.chat.completions.create( extra_headers={ "HTTP-Referer": "https://pl1nkochat.onrender.com", # Replace with your actual site URL if needed "X-Title": "Plinko Chat", # Replace with your site title if needed }, model=model, messages=messages_with_system, temperature=0.0, stream=True ) break # Important: sortir de la première boucle pour laisser la nouvelle response_stream prendre le relai # Vérifier et extraire le canvas s'il est présent if CANVAS_START_MARKER in full_reply_text and CANVAS_END_MARKER in full_reply_text: start_index = full_reply_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = full_reply_text.find(CANVAS_END_MARKER) canvas_content_extracted = full_reply_text[start_index:end_index].strip() canvas_content = canvas_content_extracted delta_content_for_chat = delta_content_no_marker.replace(CANVAS_START_MARKER, "").replace(CANVAS_END_MARKER, "") yield format_sse_event({"type": "text", "content": delta_content_for_chat}) yield format_sse_event({"type": "canvas_start", "content": canvas_content}) continue else: yield format_sse_event({"type": "text", "content": delta_content_no_marker}) code_to_execute, code_to_display = extract_code_blocks(full_reply_text) if code_to_execute: initial_response_completed = True # NOUVEAU - Suite après recherche web if web_search_triggered: for chunk in response_stream: delta_content = chunk.choices[0].delta.content or "" yield format_sse_event({"type": "text", "content": delta_content}) if execute_code and code_to_execute: is_code_used = True logger.info("Envoi événement CODE_BLOCK") yield format_sse_event({"type": "code_block", "content": code_to_execute}) yield format_sse_event({"type": "code_execution_start"}) execution_result = run_code_in_sandbox(code_to_execute) numeric_result = extract_numeric_result(execution_result) if numeric_result: code_executed = True logger.info("Envoi événement EXECUTED_CODE_RESULT") yield format_sse_event({"type": "executed_code_result", "content": numeric_result}) conclusion_prompt_content = ( f"**[INSTRUCTION SUITE - APRES EXECUTION DU CODE]** Tu as exécuté le code Python suivant:\n" f"```python\n{code_to_execute}\n```\n" f"Le résultat du calcul est: **{numeric_result}**. " "Maintenant, continue pour une conversation logique en te basant sur la question initiale." ) conclusion_prompt = [{"role": "user", "content": conclusion_prompt_content}] # MODIFICATION: Use openai_client for OpenRouter API calls in conclusion conclusion_stream = ai_client.chat.completions.create( # Utilisation de ai_client ici aussi extra_headers={ "HTTP-Referer": "https://pl1nkochat.onrender.com", # Replace with your actual site URL if needed "X-Title": "Plinko Chat", # Replace with your site title if needed }, model=model, messages=conclusion_prompt, temperature=0.0, stream=True, ) for chunk in conclusion_stream: delta_content = chunk.choices[0].delta.content or "" yield format_sse_event({"type": "text", "content": delta_content}) yield format_sse_event({"type": "final"}) else: code_executed = True logger.info("Envoi événement EXECUTED_CODE_ERROR") yield format_sse_event({"type": "executed_code_error", "content": execution_result}) yield format_sse_event({"type": "final", "result": "Fin de la réponse (erreur ou pas de résultat numérique)."}) prompt_tokens = 0 # Comptage non implémenté completion_tokens = 0 total_tokens = prompt_tokens + completion_tokens if not code_executed and not is_initial_web_search: # Prevent "Terminé" status after initial web search logger.info("Envoi événement FINAL (sans code exécuté)") yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}) elif not is_initial_web_search: # Still send final for code executions but not initial web search yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}) except Exception as e: error_message = f"Erreur serveur IA: {str(e)}" logger.exception(error_message) yield format_sse_event({"type": "error", "content": error_message}) finally: if not is_initial_web_search: # Only send analysis status end for normal chat flow, not initial web search yield format_sse_event({"type": "analysis_status", "content": "end"}) yield format_sse_event({"type": "done", "is_code_execution": is_code_used}) return Response(stream_with_context(stream_response()), mimetype='text/event-stream')
@app.route('/canvas_modify', methods=['POST'])
def canvas_modify_endpoint() -> Response:
data = request.get_json()
if not data:
logger.warning("Requête canvas_modify sans données.")
return jsonify({"error": "Requête canvas_modify invalide : corps de requête manquant."}), 400
user_message = data.get('message')
canvas_content = data.get('canvas_content')
model = data.get('model', 'llama3-70b-8192').strip()
use_openrouter = data.get('use_openrouter', False) # NOUVEAU
if not user_message or canvas_content is None:
logger.warning("Requête canvas_modify incomplète: message=%s, canvas_content=%s", user_message, canvas_content)
return jsonify({"error": "Requête canvas_modify invalide : message ou contenu canvas manquant."}), 400
canvas_chat_messages = [
{"role": "system", "content": (
"Tu es dans un chat de canvas. L'utilisateur va te demander de modifier le contenu du canvas suivant. "
"Fournis UNIQUEMENT le NOUVEAU contenu du canvas ENTIÈREMENT MIS À JOUR, encadré par [CANVAS_START]
et [CANVAS_END]
/. "
"Ne réponds pas avec du texte en dehors de ces marqueurs, sauf une courte phrase de confirmation après le canvas. "
"Maintiens le contexte des modifications précédentes du canvas, en te basant sur l'historique de ce chat canvas."
)},
{"role": "assistant", "content": f"[CANVAS_START]\n{canvas_content}\n[CANVAS_END] Voici le canvas actuel. Dites-moi comment le modifier."},
{"role": "user", "content": user_message}
]
text# NOUVEAU - Sélection du client IA ai_client = openrouter_client if use_openrouter else groq_client @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type(Exception)) def canvas_modify_groq_call(canvas_chat_messages, model, ai_client): # Modifié - Ajout de ai_client return ai_client.chat.completions.create( # Modifié - Utilisation de ai_client extra_headers={ "HTTP-Referer": "https://pl1nkochat.onrender.com", # Replace with your actual site URL if needed "X-Title": "Plinko Chat Canvas", # Replace with your site title if needed }, model=model, messages=canvas_chat_messages, temperature=0.0, stream=False, # canvas_modify is not streaming ) try: response_stream = canvas_modify_groq_call(canvas_chat_messages, model, ai_client) # Modifié - Passage de ai_client ai_response_text = response_stream.choices[0].message.content or "" if CANVAS_START_MARKER in ai_response_text and CANVAS_END_MARKER in ai_response_text: start_index = ai_response_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = ai_response_text.find(CANVAS_END_MARKER) updated_canvas_content = ai_response_text[start_index:end_index].strip() response_message = "Canvas mis à jour." logger.info("Canvas modifié avec succès.") return jsonify({"updated_canvas_content": updated_canvas_content, "response_message": response_message}) else: logger.error("Réponse IA canvas_modify invalide: marqueurs CANVAS_START/END manquants.") return jsonify({"error": "Réponse IA invalide pour la modification du canvas", "full_response": ai_response_text}), 500 except Exception as e: error_message = f"Erreur serveur (canvas_modify): {str(e)}" logger.exception(error_message) return jsonify({"error": error_message}), 500
@app.route('/run', methods=['POST'])
def run_code_endpoint() -> Response:
"""
Point d'accès pour exécuter du code via l'API Piston (pour canvas code).
"""
data = request.get_json()
if not data:
logger.warning("Requête run sans données.")
return jsonify({"error": "Requête run invalide : corps de requête manquant."}), 400
language = data.get("language")
code = data.get("code")
stdin = data.get("stdin", "")
if not language or not code:
logger.warning("Requête run incomplète: language=%s, code=%s", language, code)
return jsonify({"error": "Requête run invalide : langage et code requis."}), 400
payload = {
"language": language,
"version": "*",
"files": [{"content": code}],
"stdin": stdin
}
try:
piston_response = requests.post(PISTON_API_URL, json=payload, timeout=3000000)
piston_response.raise_for_status()
logger.info("Code exécuté avec Piston avec succès.")
return jsonify(piston_response.json())
except requests.RequestException as e:
error_message = f"Erreur lors de l'appel à l'API Piston: {str(e)}"
logger.exception(error_message)
piston_text = piston_response.text if 'piston_response' in locals() and piston_response is not None else ""
return jsonify({"error": "Erreur d'exécution du code via l'API Piston.", "piston_response": piston_text, "details": error_message}), 500
except Exception as e:
error_message = f"Erreur inattendue lors de l'exécution du code: {str(e)}"
logger.exception(error_message)
return jsonify({"error": "Erreur inattendue lors de l'exécution du code.", "details": error_message}), 500
@app.route('/run', methods=['OPTIONS'])
def run_code_options() -> Response:
"""Gère les requêtes OPTIONS pour l'endpoint /run (CORS preflight)."""
return jsonify({'message': 'OPTIONS requests allowed'}), 200
def format_sse_event(data: Dict[str, Any]) -> str:
"""Formate les données en événement SSE (Server-Sent Events)."""
return f"data: {json.dumps(data)}\n\n"
if name == 'main':
app.run(port=5000, debug=os.environ.get('FLASK_DEBUG') == '1')
"""
app.tsx :
import React, { useState, useRef, useEffect } from 'react';
import ReactMarkdown from 'react-markdown';
import { Prism } from 'react-syntax-highlighter/dist/esm';
import { Light as SyntaxHighlighterLight } from 'react-syntax-highlighter/dist/esm';
import { atomDark } from 'react-syntax-highlighter/dist/esm/styles/prism';
import {
python as prismPython,
javascript as prismJavascript,
c,
cpp as prismCpp,
java as prismJava
} from 'react-syntax-highlighter/dist/esm/languages/prism';
// Add this line to import C# from its specific file
import csharp from 'react-syntax-highlighter/dist/esm/languages/prism/csharp';
// Import CodeMirror et ses extensions
import CodeMirror from '@uiw/react-codemirror';
import { python } from '@codemirror/lang-python';
import { javascript } from '@codemirror/lang-javascript';
import { java } from '@codemirror/lang-java';
import { cpp } from '@codemirror/lang-cpp';
import { markdown } from '@codemirror/lang-markdown';
// Helper function to register languages for react-syntax-highlighter
const registerLanguage = (name: string, language: any) => {
if (Prism && Prism.languages && !Prism.languages[name]) {
Prism.registerLanguage(name, language);
} else if (Prism && Prism.languages && Prism.languages[name]) {
console.warn(Language "${name}" is already registered.
);
} else {
console.warn("Warning: Prism.registerLanguage is not available. Syntax highlighting might not work correctly.");
}
};
// Enregistrement des langages pour la coloration syntaxique
registerLanguage('python', prismPython);
registerLanguage('javascript', prismJavascript);
registerLanguage('c', c);
registerLanguage('cpp', prismCpp);
registerLanguage('java', prismJava);
registerLanguage('csharp', csharp);
registerLanguage('cs', csharp); // Also register common alias 'cs'
registerLanguage('dotnet', csharp); // Optional: for parity with Prism aliases
interface ChatMessage {
sender: 'User' | 'AI';
text: string;
codeBlock?: string;
isCodeCollapsed?: boolean;
canvasContent?: string;
hasCanvas?: boolean;
isTextCanvas?: boolean;
analysisStatus?: string;
executedCodeError?: string;
isCodeExecuting?: boolean;
codeExecutionOutput?: string | null;
promptTokens?: number; // Token usage info
completionTokens?: number; // Token usage info
totalTokens?: number; // Token usage info
webSources?: string[]; // Liste des sources web
}
interface CanvasChatMessage {
sender: 'User' | 'AI';
text: string;
}
const App: React.FC = () => {
const [message, setMessage] = useState('');
const [chat, setChat] = useState<ChatMessage[]>([]);
const chatDisplayRef = useRef<HTMLDivElement>(null);
const [isSending, setIsSending] = useState(false);
const [availableModels, setAvailableModels] = useState<string[]>([]);
const [selectedModel, setSelectedModel] = useState<string>('');
const [isCanvasActive, setIsCanvasActive] = useState(false);
const [currentCanvasContent, setCurrentCanvasContent] = useState('');
const [canvasChatMessages, setCanvasChatMessages] = useState<CanvasChatMessage[]>([]);
const [canvasMessage, setCanvasMessage] = useState('');
const canvasChatDisplayRef = useRef<HTMLDivElement>(null);
const [selectedLanguage, setSelectedLanguage] = useState<'python' | 'c' | 'cpp' | 'java' | 'javascript' | 'csharp'>('python'); // Mise à jour pour inclure C#
const [codeOutput, setCodeOutput] = useState('');
// Suppression de activeCanvasMessageIndex car non utilisé
// const [activeCanvasMessageIndex, setActiveCanvasMessageIndex] = useState<number | null>(null);
const [isTextCanvas, setIsTextCanvas] = useState(false);
const [isWebSearchEnabled, setIsWebSearchEnabled] = useState(false);
const [analysisStatus, setAnalysisStatus] = useState<string | null>(null);
const [webSources, setWebSources] = useState<string[]>([]); // State pour les sources web - RE-INTRODUCED
const [showSources, setShowSources] = useState(false); // State pour afficher/cacher les sources
text// Référence pour la console (output) afin d'automatiquement défiler vers le bas const consoleOutputRef = useRef<HTMLDivElement>(null); useEffect(() => { const fetchModels = async () => { try { const response = await fetch('https://pl1nkochat.onrender.com/models'); if (response.ok) { const data = await response.json(); setAvailableModels(data.models); if (data.models.length > 0) { setSelectedModel(data.models[0]); } } else { console.error("Failed to fetch models:", response.status); setChat(prevChat => [...prevChat, { sender: 'AI', text: `Erreur de chargement des modèles: ${response.status}` }]); } } catch (error) { console.error("Error fetching models:", error); setChat(prevChat => [...prevChat, { sender: 'AI', text: "Erreur de communication avec le serveur pour les modèles." }]); } }; fetchModels(); }, []); useEffect(() => { if (!isCanvasActive && chatDisplayRef.current) { chatDisplayRef.current.scrollTop = chatDisplayRef.current.scrollHeight; } if (isCanvasActive && canvasChatDisplayRef.current) { canvasChatDisplayRef.current.scrollTop = canvasChatDisplayRef.current.scrollHeight; } }, [chat, canvasChatMessages, isCanvasActive]); // Auto-scroll de la console de sortie lorsque codeOutput change useEffect(() => { if (consoleOutputRef.current) { consoleOutputRef.current.scrollTop = consoleOutputRef.current.scrollHeight; } }, [codeOutput]); const handleSubmit = async (e: React.FormEvent) => { e.preventDefault(); if (message.trim() === '' || isSending || isCanvasActive) return; setIsSending(true); setAnalysisStatus('Analyse en cours...'); setWebSources([]); // Réinitialiser les sources web à chaque nouvelle requête const userMessage = message; setChat(prevChat => [...prevChat, { sender: 'User', text: userMessage }]); setMessage(''); setAnalysisStatus('Réponse en cours...'); const messagesForApi = [ ...chat.map(msg => ({ role: msg.sender === 'User' ? 'user' : 'assistant', content: msg.text })), { role: 'user', content: userMessage } ]; setChat(prevChat => [ ...prevChat, { sender: 'AI', text: '', isCodeCollapsed: true, analysisStatus: 'Réponse en cours...', isCodeExecuting: false, codeExecutionOutput: null, webSources: [] } ]); let webSearchToUse = isWebSearchEnabled; // Store the current webSearch state if (isWebSearchEnabled) { setIsWebSearchEnabled(false); // Disable web search immediately after it's used for this message } try { const response = await fetch('https://pl1nkochat.onrender.com/chat', { method: 'POST', headers: { 'Content-Type': 'application/json', 'Accept': 'text/event-stream' }, body: JSON.stringify({ messages: messagesForApi, model: selectedModel, web_search: webSearchToUse }) // Use stored webSearch value }); if (!response.ok) { const errorText = await response.text(); setChat(prevChat => { const lastIndex = prevChat.length - 1; return [ ...prevChat.slice(0, lastIndex), { sender: 'AI', text: `Erreur: ${response.status} - ${errorText}`, analysisStatus: 'Erreur' } ]; }); setIsSending(false); setAnalysisStatus('Erreur lors de la communication avec le serveur.'); return; } const reader = response.body!.getReader(); const decoder = new TextDecoder(); while (true) { const { done, value } = await reader.read(); if (done) break; const chunk = decoder.decode(value); const events = chunk.split('\n\n').filter(line => line.startsWith("data: ")); for (const eventLine of events) { const eventData = JSON.parse(eventLine.substring(6)); console.log("Événement reçu:", eventData); if (eventData.type === 'text') { setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const newText = lastMessage.text + eventData.content; const updatedMessage = { ...lastMessage, text: newText, isCodeExecuting: false }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } else if (eventData.type === 'code_block') { setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, codeBlock: eventData.content, isCodeExecuting: false }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } else if (eventData.type === 'code_execution_start') { setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, isCodeExecuting: true, codeExecutionOutput: 'Execution en cours...' }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } else if (eventData.type === 'executed_code_result') { setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, codeExecutionOutput: eventData.content, isCodeExecuting: false }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } else if (eventData.type === 'canvas_start') { const canvasContent = eventData.content; const isText = canvasContent.trim().startsWith('#') || canvasContent.trim().startsWith('* ') || !canvasContent.includes('```'); setIsTextCanvas(isText); const cleanedCanvasContent = canvasContent.replace(/```(python|javascript|c|cpp|java|csharp)\n/g, '```\n'); // Mise à jour regex pour C# setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, canvasContent: cleanedCanvasContent, hasCanvas: true, isTextCanvas: isText, isCodeExecuting: false }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } else if (eventData.type === 'final') { setAnalysisStatus(null); setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, promptTokens: eventData.prompt_tokens, completionTokens: eventData.completion_tokens, totalTokens: eventData.total_tokens }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } else if (eventData.type === 'error') { setChat(prevChat => { const lastIndex = prevChat.length - 1; return [ ...prevChat.slice(0, lastIndex), { sender: 'AI', text: `Erreur IA: ${eventData.content}`, analysisStatus: 'Erreur IA' } ]; }); setIsSending(false); setAnalysisStatus('Erreur de l\'IA.'); return; } else if (eventData.type === 'analysis_status') { setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, analysisStatus: eventData.content, isCodeExecuting: false }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); setAnalysisStatus(eventData.content); } else if (eventData.type === 'executed_code_error') { setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, executedCodeError: eventData.content, isCodeExecuting: false, codeExecutionOutput: eventData.content }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } else if (eventData.type === 'web_search_status') { if (eventData.content === 'start') { setAnalysisStatus('Recherche internet en cours...'); } else if (eventData.type === 'end') { setAnalysisStatus('Réponse en cours...'); // Retour à l'analyse après la recherche } } else if (eventData.type === 'web_search') { // Ajout de la gestion de l'événement web_search setAnalysisStatus(`Recherche web : ${eventData.query}`); setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, text: lastMessage.text + `\n[Recherche web : ${eventData.query}]`, // webSources: [...(lastMessage.webSources || []), ...sources] // Suppression de l'ajout des sources ici // car sources arrive dans event 'web_sources' }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } else if (eventData.type === 'web_sources') { setWebSources(eventData.content); setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, webSources: eventData.content }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } } } } catch (error) { console.error("Erreur lors de la communication avec le backend:", error); setChat(prevChat => { const lastIndex = prevChat.length - 1; return [ ...prevChat.slice(0, lastIndex), { sender: 'AI', text: "Erreur de communication avec l'IA.", analysisStatus: 'Erreur réseau' } ]; }); setAnalysisStatus('Erreur réseau.'); } finally { setIsSending(false); if (!analysisStatus) { setAnalysisStatus('Terminé'); setTimeout(() => setAnalysisStatus(null), 30000000); } } }; const toggleCodeBlockCollapse = (index: number) => { setChat(prevChat => prevChat.map((msg, i) => i === index ? { ...msg, isCodeCollapsed: !msg.isCodeCollapsed } : msg ) ); }; const handleCanvasChange = (newContent: string) => { const cleanedContent = newContent.replace(/```(python|javascript|c|cpp|java|csharp)\n/g, '```\n'); // Mise à jour regex pour C# setCurrentCanvasContent(cleanedContent); }; const handleCanvasMessageSubmit = async (e: React.FormEvent) => { e.preventDefault(); if (canvasMessage.trim() === '') return; const userCanvasMessage = canvasMessage; setCanvasChatMessages(prevMessages => [...prevMessages, { sender: 'User', text: userCanvasMessage }]); setCanvasMessage(''); try { const response = await fetch('https://pl1nkochat.onrender.com/canvas_modify', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ message: userCanvasMessage, canvas_content: currentCanvasContent, model: selectedModel }) }); if (!response.ok) { console.error("Erreur Canvas Chat:", response.status, await response.text()); setCanvasChatMessages(prevMessages => [...prevMessages, { sender: 'AI', text: `Erreur: ${response.status} - Chat Canvas` }]); return; } const result = await response.json(); if (result && result.updated_canvas_content) { const cleanedContent = result.updated_canvas_content.replace(/```(python|javascript|c|cpp|java|csharp)\n/g, '```\n'); // Mise à jour regex pour C# setCurrentCanvasContent(cleanedContent); setCanvasChatMessages(prevMessages => [...prevMessages, { sender: 'AI', text: result.response_message || "Canvas mis à jour." }]); } else if (result && result.error) { setCanvasChatMessages(prevMessages => [...prevMessages, { sender: 'AI', text: `Erreur IA Canvas: ${result.error}` }]); } } catch (error) { console.error("Erreur Canvas Chat Backend:", error); setCanvasChatMessages(prevMessages => [...prevMessages, { sender: 'AI', text: "Erreur de communication avec l'IA (Canvas Chat)." }]); } }; const handleLanguageChange = (e: React.ChangeEvent<HTMLSelectElement>) => { setSelectedLanguage(e.target.value as typeof selectedLanguage); }; const handleCodeExecution = async () => { setCodeOutput('Execution en cours...'); try { const response = await fetch("https://pl1nkochat.onrender.com/run", { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ language: selectedLanguage, code: currentCanvasContent, stdin: "" }) }); const result = await response.json(); setCodeOutput(result.run?.stdout || result.error || "Erreur d'exécution du code!"); } catch (error) { console.error("Erreur Exécution Code:", error); setCodeOutput("Erreur lors de l'exécution du code."); } }; const handleStartNewChat = () => { setIsCanvasActive(false); setCanvasChatMessages([]); setCurrentCanvasContent(''); setCodeOutput(''); // Suppression de setActiveCanvasMessageIndex car non utilisé // setActiveCanvasMessageIndex(null); setIsTextCanvas(false); setChat([]); setWebSources([]); // Reset web sources on new chat - **RE-INTRODUCED** }; const handleOpenCanvas = (index: number, canvasContent: string, isTextCanvasProp: boolean) => { setIsCanvasActive(true); const cleanedContent = canvasContent.replace(/```(python|javascript|c|cpp|java|csharp)\n/g, '```\n'); // Mise à jour regex pour C# setCurrentCanvasContent(cleanedContent); setCanvasChatMessages([]); // Suppression de setActiveCanvasMessageIndex car non utilisé // setActiveCanvasMessageIndex(index); setIsTextCanvas(isTextCanvasProp); }; const handleModelChange = (e: React.ChangeEvent<HTMLSelectElement>) => { setSelectedModel(e.target.value); }; const handleWebSearchToggle = (e: React.ChangeEvent<HTMLInputElement>) => { setIsWebSearchEnabled(e.target.checked); setChat(prevChat => [...prevChat, { sender: 'AI', text: `[Recherche web ${e.target.checked ? 'activée' : 'désactivée'}]`, analysisStatus: undefined }]); // Correction ici : analysisStatus: undefined }; const toggleSourcesVisibility = () => { setShowSources(!showSources); }; return ( <div style={{ margin: '20px', display: 'flex', flexDirection: 'column', height: '95vh' }}> {!isCanvasActive && ( <center> <h1>PLINKO CHATTE</h1> </center> )} <div style={{ display: 'flex', flexDirection: 'row', width: '100%', height: 'calc(100% - 100px)' }}> {!isCanvasActive && ( <div style={{ flex: 1, height: '100%' }}> <div ref={chatDisplayRef} style={{ flexGrow: 1, overflowY: 'auto', marginBottom: '20px', border: '1px solid #ccc', padding: '10px', height: 'calc(100% - 100px)' }} > {chat.map((msg, index) => ( <div key={index} style={{ marginBottom: '15px', textAlign: msg.sender === 'User' ? 'right' : 'left' }}> <strong>{msg.sender}:</strong> <div> {msg.analysisStatus && <div style={{ fontSize: '0.8em', color: 'grey' }}>{msg.analysisStatus}</div>} <div style={{ whiteSpace: 'pre-wrap', wordWrap: 'break-word', position: 'relative' }}> {msg.sender === 'AI' ? ( <> <ReactMarkdown components={{ code: ({ className, children, ...props }) => { const language = className?.match(/language-(\w+)/)?.[1] || ''; return ( <SyntaxHighlighterLight style={atomDark as any} language={language} PreTag="div" children={String(children).replace(/\n$/, '')} {...props} /> ); } }} > {msg.codeBlock ? msg.text.replace(/```python\s*([\s\S]*?)```/g, '') : msg.text} </ReactMarkdown> {msg.webSources && msg.webSources.length > 0 && ( <div style={{display: 'inline-block', marginLeft: '5px'}}> <button onClick={toggleSourcesVisibility} style={{ background: 'none', border: 'none', color: '#007bff', padding: 0, cursor: 'pointer', fontSize: '0.8em', textDecoration: 'underline' }} > {showSources ? 'Cacher Sources' : 'Afficher Sources'} </button> {showSources && ( <div style={{ position: 'absolute', zIndex: 10, backgroundColor: '#f9f9f9', border: '1px solid #ccc', padding: '10px', borderRadius: '5px', left: '0', top: '100%', marginTop: '5px', width: '250px', textAlign: 'left', boxShadow: '2px 2px 5px rgba(0,0,0,0.3)' }}> <strong>Sources:</strong> <ul> {msg.webSources.map((source, index) => ( <li key={index} style={{fontSize: '0.8em', wordWrap: 'break-word'}}> <a href={source} target="_blank" rel="noopener noreferrer">{source}</a> </li> ))} </ul> </div> )} </div> )} </> ) : ( msg.text )} </div> {msg.sender === 'AI' && msg.codeBlock && ( <div style={{ marginTop: '10px', border: '1px solid #ccc', borderRadius: '5px', overflow: 'hidden' }}> <div style={{ backgroundColor: '#f0f0f0', padding: '8px', cursor: 'pointer', display: 'flex', justifyContent: 'space-between', alignItems: 'center' }} onClick={() => toggleCodeBlockCollapse(index)} > <strong>Code Python Généré</strong> <span>{msg.isCodeCollapsed ? '▼' : '▲'}</span> </div> <div style={{ padding: '10px', display: msg.isCodeCollapsed ? 'none' : 'block', backgroundColor: '#f9f9f9' }} > <SyntaxHighlighterLight style={atomDark as any} language="python" PreTag="div" > {msg.codeBlock} </SyntaxHighlighterLight> </div> </div> )} {msg.sender === 'AI' && msg.isCodeExecuting && msg.codeExecutionOutput && ( <div style={{ marginTop: '10px', padding: '10px', backgroundColor: '#e0f7fa', borderRadius: '5px', border: '1px solid #80deea' }}> <strong>Code Execution:</strong> <pre style={{ whiteSpace: 'pre-wrap', wordWrap: 'break-word', marginTop: '5px', fontFamily: 'monospace', fontSize: '0.9em' }}> {msg.codeExecutionOutput} </pre> </div> )} {msg.sender === 'AI' && !msg.isCodeExecuting && msg.codeExecutionOutput && !msg.executedCodeError && ( <div style={{ marginTop: '10px', padding: '10px', backgroundColor: '#c8e6c9', borderRadius: '5px', border: '1px solid #81c784' }}> <strong>Code Output:</strong> <pre style={{ whiteSpace: 'pre-wrap', wordWrap: 'break-word', marginTop: '5px', fontFamily: 'monospace', fontSize: '0.9em' }}> {msg.codeExecutionOutput} </pre> </div> )} {msg.sender === 'AI' && msg.executedCodeError && ( <div style={{ marginTop: '10px', padding: '10px', backgroundColor: '#ffe0b2', borderRadius: '5px', border: '1px solid #ffb74d' }}> <strong>Erreur d'exécution du code:</strong> <pre style={{ whiteSpace: 'pre-wrap', wordWrap: 'break-word', marginTop: '5px', fontFamily: 'monospace', fontSize: '0.9em' }}> {msg.executedCodeError} </pre> </div> )} {msg.sender === 'AI' && msg.hasCanvas && !isCanvasActive && ( <button onClick={() => handleOpenCanvas(index, msg.canvasContent || '', msg.isTextCanvas || false)} style={{ marginTop: '10px', padding: '8px 15px', borderRadius: '5px', backgroundColor: '#007bff', color: 'white', border: 'none', cursor: 'pointer' }} > Ouvrir Canvas </button> )} {msg.sender === 'AI' && msg.totalTokens !== undefined && ( <div style={{ marginTop: '5px', fontSize: '0.8em', color: 'grey' }}> Tokens utilisés: {msg.totalTokens} (Prompt: {msg.promptTokens}, Completion: {msg.completionTokens}) </div> )} </div> </div> ))} </div> {analysisStatus && <div style={{ marginBottom: '10px', color: 'grey', textAlign: 'center' }}>{analysisStatus}</div>} <form onSubmit={handleSubmit} style={{ display: 'flex', flexDirection: 'column', marginTop: '10px' }}> <textarea value={message} onChange={e => setMessage(e.target.value)} placeholder="Tapez votre message... (Chat Principal)" rows={3} style={{ padding: '8px', borderRadius: '5px', border: '1px solid #ccc' }} disabled={isSending || isCanvasActive} /> <div style={{ marginBottom: '10px', display: 'flex', alignItems: 'center', justifyContent: 'space-between' }}> <div> <label htmlFor="model-select" style={{ display: 'block', marginBottom: '5px', color: '#ccc' }}> Select Model: </label> <select id="model-select" value={selectedModel} onChange={handleModelChange} style={{ width: '100%', padding: '8px', borderRadius: '5px', backgroundColor: '#333', color: 'white', border: '1px solid #555', }} > {availableModels.map((model) => ( <option key={model} value={model}> {model} </option> ))} </select> </div> <div style={{ display: 'flex', alignItems: 'center' }}> <label htmlFor="web-search-toggle" style={{ color: '#ccc', marginRight: '10px' }}> Recherche Web: </label> <input id="web-search-toggle" type="checkbox" checked={isWebSearchEnabled} onChange={handleWebSearchToggle} style={{ cursor: 'pointer' }} /> </div> </div> <button type="submit" style={{ padding: '10px', borderRadius: '5px', backgroundColor: '#4CAF50', color: 'white', border: 'none', cursor: 'pointer' }} disabled={isSending || isCanvasActive}> {isSending ? "Envoi en cours..." : "Envoyer"} </button> </form> <button onClick={handleStartNewChat} style={{ marginTop: '10px', padding: '10px 15px', borderRadius: '8px', backgroundColor: '#607d8b', color: 'white', border: 'none', cursor: 'pointer', fontWeight: 'bold', transition: 'background-color 0.2s ease', }} > Nouveau Chat </button> </div> )} {isCanvasActive && ( <div style={{ display: 'flex', width: '100%', height: '100%', gap: '20px', position: 'fixed', top: 0, left: 0, right: 0, bottom: 0, }}> <div style={{ width: '25%', background: 'linear-gradient(145deg, #3a3d49, #2d2f36)', borderRadius: '8px', padding: '15px', display: 'flex', flexDirection: 'column', boxShadow: '0 4px 12px rgba(0,0,0,0.3)' }}> <h3 style={{ marginBottom: '10px', color: '#ccc' }}>Canvas Chat</h3> <button onClick={handleStartNewChat} style={{ padding: '10px 15px', borderRadius: '8px', backgroundColor: '#0a84ff', color: 'white', border: 'none', cursor: 'pointer', marginBottom: '20px', fontWeight: 'bold', transition: 'background-color 0.2s ease', }} > Retour au Chat Principal </button> <div ref={canvasChatDisplayRef} style={{ flexGrow: 1, overflowY: 'auto', marginBottom: '10px', color: '#ececf1', height: 'calc(100% - 180px)' }} > {canvasChatMessages.map((msg, index) => ( <div key={index} style={{ textAlign: msg.sender === 'User' ? 'right' : 'left', marginBottom: '5px', color: msg.sender === 'User' ? '#90caf9' : '#c5e1a5', }} > <strong>{msg.sender}:</strong> {msg.text} </div> ))} </div> <form onSubmit={handleCanvasMessageSubmit} style={{ display: 'flex', marginTop: '10px' }}> <textarea value={canvasMessage} onChange={(e) => setCanvasMessage(e.target.value)} placeholder="Modifier le canvas ici..." rows={2} style={{ flexGrow: 1, padding: '8px', borderRadius: '5px', border: 'none', backgroundColor: '#4d4e5c', color: '#ececf1', marginRight: '8px', resize: 'none', }} /> <button type="submit" style={{ padding: '8px 15px', borderRadius: '5px', backgroundColor: '#0a84ff', color: 'white', border: 'none', cursor: 'pointer', }} > Send </button> </form> </div> <div style={{ flex: 1, display: 'flex', flexDirection: 'column', backgroundColor: '#1e1e1e', borderRadius: '8px', overflow: 'hidden', boxShadow: '0 4px 12px rgba(0,0,0,0.3)' }}> <div style={{ backgroundColor: '#2d2d2d', padding: '8px 10px', display: 'flex', alignItems: 'center', borderBottom: '1px solid #444' }}> <span style={{ color: '#ccc', fontFamily: 'monospace', marginRight: '10px' }}> {isTextCanvas ? 'text_canvas.md' : `main.${selectedLanguage === 'python' ? 'py' : (selectedLanguage === 'javascript' ? 'js' : (selectedLanguage === 'c' ? 'c' : (selectedLanguage === 'cpp' ? 'cpp' : (selectedLanguage === 'java' ? 'java' : (selectedLanguage === 'csharp' ? 'cs' : 'txt')))))}`} </span> {!isTextCanvas && ( <select id="language-select" value={selectedLanguage} onChange={handleLanguageChange} style={{ padding: '5px 10px', borderRadius: '4px', backgroundColor: '#333', color: 'white', border: '1px solid #555', marginRight: '10px' }} > <option value="python">Python</option> <option value="javascript">JavaScript</option> <option value="c">C</option> <option value="cpp">C++</option> <option value="java">Java</option> <option value="csharp">C#</option> {/* Ajout de l'option C# */} </select> )} {!isTextCanvas && ( <button onClick={handleCodeExecution} style={{ marginLeft: 'auto', padding: '5px 10px', backgroundColor: '#0a84ff', color: '#fff', border: 'none', borderRadius: '4px', cursor: 'pointer' }} > Run </button> )} </div> {/* Zone d'édition du code (CodeMirror) dans un conteneur scrollable */} <div style={{ flexGrow: 1, overflowY: 'auto' }}> <CodeMirror value={currentCanvasContent} height="100%" theme="dark" extensions={ isTextCanvas ? [markdown()] : [ selectedLanguage === 'python' ? python() : selectedLanguage === 'javascript' ? javascript() : selectedLanguage === 'java' ? java() : (selectedLanguage === 'cpp' || selectedLanguage === 'c' || selectedLanguage === 'csharp') ? cpp() : [] ] } onChange={(value) => handleCanvasChange(value)} /> </div> {codeOutput && !isTextCanvas && ( <div ref={consoleOutputRef} style={{ backgroundColor: '#1a1a1a', padding: '10px', borderTop: '1px solid #444', maxHeight: '200px', overflowY: 'auto' }} > <strong style={{ color: '#eee' }}>Output:</strong> <SyntaxHighlighterLight style={atomDark as any} language={selectedLanguage} PreTag="div" > {codeOutput} </SyntaxHighlighterLight> </div> )} </div> </div> )} </div> </div> );
};
export default App;
"""
mon code de recherche web :
import json
import requests
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import datetime
from tqdm import tqdm
from typing import List, Dict
CONFIG = {
'openrouter_api_key': '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',
'max_results': 7,
'cache_file': 'search_cache.json',
'model': 'google/gemini-2.0-pro-exp-02-05:free',
'timeout': 20,
'min_quality_score': 0.65
}
class DataCollector:
@staticmethod
def web_search(query: str) -> List[Dict]:
with DDGS() as ddgs:
results = ddgs.text(query, region='fr-fr', max_results=CONFIG['max_results'])
return [{
'url': r['href'],
'title': r['title'],
'domain': urlparse(r['href']).netloc.replace('www.', '')
} for r in results if urlparse(r['href']).scheme in ['http', 'https']]
class ContentProcessor:
def init(self):
self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
textdef fetch_content(self, url: str) -> str: try: response = requests.get(url, headers=self.headers, timeout=CONFIG['timeout']) soup = BeautifulSoup(response.text, 'html.parser') for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'meta', 'link']): element.decompose() main_content = soup.find(['main', 'article']) or soup.body return ' '.join(main_content.stripped_strings)[:5000] except Exception as e: print(f"Erreur sur {url}: {str(e)}") return ''
class AICore:
@staticmethod
def query_ai(system_prompt: str, user_content: str, temp: float = 0.3) -> str:
headers = {
"Authorization": f"Bearer {CONFIG['openrouter_api_key']}",
"Content-Type": "application/json",
"HTTP-Referer": "https://github.com/your-repo",
"X-Title": "CLI Search AI"
}
textdata = { "model": CONFIG['model'], "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content} ], "temperature": temp, "max_tokens": 2000 } try: response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data, timeout=CONFIG['timeout']) return response.json()['choices'][0]['message']['content'] except Exception as e: return f"Erreur API: {str(e)}"
class ConsoleSearch:
def init(self):
self.cache = self.load_cache()
self.processor = ContentProcessor()
textdef load_cache(self) -> Dict: try: with open(CONFIG['cache_file'], 'r') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return {} def save_cache(self): with open(CONFIG['cache_file'], 'w') as f: json.dump(self.cache, f) def get_user_input(self): print("\n" + "="*40) print("🔍 Assistant de Recherche Historique") print("="*40) return ( input("\nEntrez votre recherche : ").strip(), input("\nMode verbeux ? (o/n) : ").lower() == 'o', input("\nFormat de sortie (text/json) [text] : ").lower() or 'text', int(input("\nNombre de sources à utiliser [3] : ") or 3) ) def _filter_content(self, content: str, query: str) -> bool: """Filtre les contenus non pertinents""" prompt = f"Ce contenu est-il pertinent pour '{query}' ? Répondre uniquement par oui/non" response = AICore.query_ai(prompt, content[:2000], temp=0.0) return response.lower().startswith('oui') def run(self): query, verbose, fmt, sources = self.get_user_input() if verbose: print(f"\nDémarrage de la recherche pour : {query}") print(f"Paramètres : {sources} sources | Format {fmt.upper()}\n") results = self._process_search(query, verbose) report = self.generate_report(query, results, sources, verbose) if fmt == 'json': print("\n" + json.dumps(report, indent=2, ensure_ascii=False)) else: self._display_text_report(report) self.save_cache() def _process_search(self, query: str, verbose: bool) -> List[Dict]: if query in self.cache: if verbose: print("\nUtilisation du cache...") return self.cache[query] results = DataCollector.web_search(query) valid_results = [] for result in tqdm(results, desc="Analyse des sources", disable=not verbose): content = self.processor.fetch_content(result['url']) if content and self._filter_content(content, query): result['content'] = content valid_results.append(result) if len(valid_results) >= CONFIG['max_results']: break self.cache[query] = valid_results return valid_results def generate_report(self, query: str, results: List[Dict], max_sources: int, verbose: bool) -> Dict: """Génère un rapport de style ChatGPT""" system_prompt = f"""**Rôle:** Expert en histoire
Consignes:
Exemple de format:
Point important 1 [Source1]
Détail complémentaire [Source2]
Conclusion synthétique [Source3]"""
textsources_text = "\n\n".join([f"[Source {i+1} - {r['domain']}]\n{r['content'][:1000]}" for i, r in enumerate(results[:max_sources])]) if verbose: print("\nGénération du rapport via IA...") report_text = AICore.query_ai(system_prompt, sources_text) return { 'query': query, 'date': datetime.datetime.now().isoformat(), 'sources_used': list(set(r['domain'] for r in results[:max_sources])), 'report': report_text }
def _display_text_report(self, report: Dict):
print("\n" + "="*40)
print(f"{report['report']}")
print("\n" + "-"*40)
print(f"📅 Date: {datetime.datetime.now().strftime('%d/%m/%Y %H:%M')}")
print(f"🔗 Sources: {', '.join(report['sources_used'])}")
print("="*40)
if name == "main":
console = ConsoleSearch()
console.run()
je veux que tu impement mon code de recherche web dans mon backend pour qu'il marche bien et que quand j'appuyes et que je lance une recherche ça fasse une recherche sur le web en plus pour apporter plus d'information
Voici une version modifiée de votre backend qui intègre la logique de votre code de recherche web (notamment la mise en cache et le filtrage des URL) dans la classe de recherche. Vous remarquerez que nous avons :
• Ajouté des fonctions de chargement et sauvegarde du cache (dans le fichier JSON défini dans CONFIG).
• Modifié la méthode statique perform_search pour vérifier d’abord le cache avant d’effectuer une recherche via DuckDuckGo.
• Filtré les résultats pour ne conserver que les URL en HTTP/HTTPS.
• Conserve ensuite la récupération du contenu via la méthode fetch_page_content.
Vous pouvez ainsi déclencher une recherche web depuis l’endpoint /web_search et obtenir des résultats enrichis.
Voici le code modifié (seules les parties modifiées sont indiquées en commentaire) :
pythonfrom gevent import monkey monkey.patch_all() import requests import os import re import json import logging from typing import Any, Dict, List, Optional, Tuple, Generator import time from flask import Flask, request, Response, jsonify, stream_with_context from flask_cors import CORS from groq import Groq from e2b_code_interpreter import Sandbox from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type # NOUVEAU - Imports pour OpenRouter et recherche web from openai import OpenAI from duckduckgo_search import DDGS from bs4 import BeautifulSoup from urllib.parse import urlparse from tqdm import tqdm import datetime import threading # <-- Pour la gestion du cache # Configuration du logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = Flask(__name__) CORS(app) # Clés API (variables d'environnement ou valeurs par défaut sécurisées) GROQ_API_KEY = os.environ.get("GROQ_API_KEY") E2B_API_KEY = os.environ.get("E2B_API_KEY") OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY") # NOUVEAU PISTON_API_URL = "https://emkc.org/api/v2/piston/execute" # Clients init groq_client = Groq(api_key=GROQ_API_KEY) openrouter_client = OpenAI( base_url="https://openrouter.ai/api/v1", api_key=OPENROUTER_API_KEY, ) sandbox = Sandbox(api_key=E2B_API_KEY, timeout=3000) # Modèles valides VALID_MODELS = [ "llama3-70b-8192", "google/gemini-pro", "meta-llama/llama-3-70b-instruct", "openai/gpt-4-turbo-preview" ] CODE_PATTERN = re.compile(r"```python\s*([\s\S]*?)```") CODE_REQUIRED_MARKER = "[CODE_REQUIRED]" CANVAS_START_MARKER = "[CANVAS_START]" CANVAS_END_MARKER = "[CANVAS_END]" WEB_SEARCH_MARKER = "[WEB_SEARCH]" CONFIG = { 'openrouter_api_key': os.environ.get("OPENROUTER_API_KEY"), 'max_results': 7, 'cache_file': 'search_cache.json', 'model': 'google/gemini-pro', 'timeout': 20, 'min_quality_score': 0.65 } # --- Gestion du cache pour la recherche web --- CACHE_LOCK = threading.Lock() def load_cache() -> Dict: try: with open(CONFIG['cache_file'], 'r') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return {} def save_cache(cache: Dict): with open(CONFIG['cache_file'], 'w') as f: json.dump(cache, f) # --- Classe modifiée pour la recherche web --- class WebSearchHandler: @staticmethod @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2)) def perform_search(query: str, max_results: int = None) -> List[Dict]: if max_results is None: max_results = CONFIG['max_results'] # Vérification du cache with CACHE_LOCK: cache = load_cache() if query in cache: logger.info(f"Utilisation du cache pour la requête: {query}") return cache[query] # Exécution de la recherche via DuckDuckGo with DDGS() as ddgs: results = ddgs.text(query, region='fr-fr', max_results=max_results) search_results = [{ 'url': r['href'], 'title': r['title'], 'domain': urlparse(r['href']).netloc.replace('www.', '') } for r in results if urlparse(r['href']).scheme in ['http', 'https']] # Mise à jour du cache with CACHE_LOCK: cache = load_cache() cache[query] = search_results save_cache(cache) return search_results def __init__(self): # Utilisation d'un User-Agent plus spécifique self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'} def fetch_page_content(self, url: str) -> str: try: response = requests.get(url, headers=self.headers, timeout=CONFIG['timeout']) soup = BeautifulSoup(response.text, 'html.parser') for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'meta', 'link']): element.decompose() main_content = soup.find(['main', 'article']) or soup.body return ' '.join(main_content.stripped_strings)[:5000] except Exception as e: logger.error(f"Erreur fetch_content {url}: {str(e)}") return "" def generate_search_context(search_results: List[Dict]) -> str: return "\n".join( f"[Source {i+1} - {r['domain']}]\n{r.get('content','')[:1000]}" for i, r in enumerate(search_results) ) def extract_code_blocks(text: str) -> Tuple[Optional[str], Optional[str]]: if CODE_REQUIRED_MARKER in text: text_no_marker = text.replace(CODE_REQUIRED_MARKER, "", 1).strip() code_match = CODE_PATTERN.search(text_no_marker) if code_match: return code_match.group(1).strip(), None else: code_match = CODE_PATTERN.search(text) if code_match: return None, code_match.group(1).strip() return None, None def run_code_in_sandbox(code: str) -> str: start_time = time.time() try: logger.info("Début de l'exécution du code dans le sandbox e2b.") execution = None method_name = next((method for method in ["run_code_sync", "execute_sync", "execute_code", "execute", "run_code", "run"] if hasattr(sandbox, method)), None) if method_name: logger.info(f"Utilisation de la méthode sandbox.{method_name}") execution = getattr(sandbox, method_name)(code) else: error_msg = "Aucune méthode d'exécution de code disponible dans le sandbox." logger.error(error_msg) return error_msg logs = execution.logs stdout_logs = logs.stdout if hasattr(logs, 'stdout') else [] stderr_logs = logs.stderr if hasattr(logs, 'stderr') else [] stdout = "\n".join(stdout_logs) stderr = "\n".join(stderr_logs) if stderr: return f"Stdout:\n{stdout}\n\nStderr:\n{stderr}" return stdout except Exception as e: duration = time.time() - start_time error_msg = f"Erreur lors de l'exécution du code dans e2b après {duration:.2f} secondes: {str(e)}" logger.exception(error_msg) return f"Erreur exécution code: {str(e)}" def extract_numeric_result(exec_result: str) -> Optional[str]: if "Stdout:" in exec_result: stdout_section = exec_result.split("Stdout:\n", 1)[1].split("\n\nStderr:")[0].strip() else: stdout_section = exec_result.strip() lines = [l.strip() for l in stdout_section.splitlines() if l.strip()] if not lines: return None last_line = lines[-1] if re.search(r'\d', last_line): return last_line return None @app.route('/models', methods=['GET']) def get_models() -> Response: return jsonify({"models": VALID_MODELS}) @app.route('/web_search', methods=['POST']) def web_search_endpoint(): data = request.get_json() if not data or not data.get('query'): return jsonify({"error": "Missing query"}), 400 query = data['query'] try: # Utilisation de la nouvelle version avec cache et filtrage search_results = WebSearchHandler.perform_search(query) content_processor = WebSearchHandler() for result in tqdm(search_results, desc="Fetching content from sources"): result['content'] = content_processor.fetch_page_content(result['url']) return jsonify({"results": search_results}) except Exception as e: logger.error(f"Web search error: {e}") return jsonify({"error": str(e)}), 500 # --- Reste du code inchangé (chat, canvas_modify, run, etc.) --- @app.route('/chat', methods=['POST']) def chat_endpoint() -> Response: @stream_with_context @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type(Exception)) def stream_response(): data = request.get_json() if not data: logger.warning("Requête chat invalide: corps de requête manquant.") yield format_sse_event({"type": "error", "content": "Requête chat invalide : corps de requête manquant."}) return messages = data.get('messages', []) model = data.get('model', 'llama3-70b-8192').strip() execute_code = data.get('execute_code', True) use_openrouter = data.get('use_openrouter', False) enable_web_search = data.get('web_search', False) web_search_results = data.get('search_results', []) if not messages or not isinstance(messages, list) or not model or model not in VALID_MODELS: logger.warning("Requête chat invalide: messages=%s, model=%s", messages, model) yield format_sse_event({"type": "error", "content": "Requête chat invalide : messages ou modèle incorrects."}) return ai_client = openrouter_client if use_openrouter else groq_client system_prompt_additions = [] if enable_web_search and web_search_results: search_context = generate_search_context(web_search_results) system_prompt_additions.append( "**CONTEXTE DE RECHERCHE WEB :**\n" f"{search_context}\n" "Utilise les informations ci-dessus pour répondre à la question de l'utilisateur. Cite les sources [source:domaine.com]." ) elif enable_web_search: system_prompt_additions.append( "Si la question nécessite des informations actualisées ou externes :\n" "1. Utilise le marqueur [WEB_SEARCH] suivi de la requête de recherche\n" "2. Analyse les résultats et cite les sources avec [source:domaine.com]\n" "3. Garde les réponses concises et factuelles" ) base_prompt = ( "🎯 **Rôle : Expert en calcul numérique précis et assistant polyvalent avec canvas interactifs (texte et code).**\n" "**Mission :** Répondre aux questions, effectuer des calculs numériques exacts, et assister l'utilisateur avec des canvas éditables et interactifs pour le **texte** (avec formatage Markdown) et le **code**. " "Maintiens le contexte de conversation, en particulier autour du code, des canvas et des résultats précédents.\n\n" "**Si la question DEMANDE un CALCUL NUMÉRIQUE :**\n" "1. Réponds d'abord avec une courte phrase d'introduction expliquant que tu vas utiliser un code Python pour résoudre le problème. **Sois clair sur le contexte du calcul, en te référant si nécessaire aux étapes précédentes de la conversation et aux données ou résultats déjà obtenus.**\n" "2. Sur la ligne suivante, ajoute TOUJOURS le marqueur `[CODE_REQUIRED]` pour indiquer qu'un code va suivre.\n" "3. Ensuite, fournis le **BLOC DE CODE PYTHON** minimal, lisible et correct pour effectuer UNIQUEMENT LE CALCUL demandé et afficher UNIQUEMENT LE RÉSULTAT NUMÉRIQUE via `print(resultat_numerique)`.\n" "**IMPORTANT :** Ne génère RIEN D'AUTRE que ces 3 éléments, et dans cet ordre précis. Arrête-toi **ICI**. Attends l'exécution du code.\n\n" "**[INSTRUCTION SUITE - APRES EXECUTION DU CODE]**\n" "Après avoir fourni le résultat du code, CONTINUE la conversation pour expliquer le résultat EN DETAILS, en explicitant clairement et précisément **LE Résultat du code Python** que tu viens d'utiliser pour le calcul. Il faut que l'explication du résultat réponde à la question de l'utilisateur.\n\n" "**Si la question DEMANDE la CRÉATION ou MODIFICATION de CODE dans un canvas :**\n" "1. Indique clairement que tu vas créer un **canvas de code** interactif pour cela. Précise que l'utilisateur pourra te demander des modifications dans un 'chat de canvas' dédié qui va s'ouvrir.\n" "2. Encadre le contenu initial du canvas avec les marqueurs `[CANVAS_START]` et `[CANVAS_END]`.\n" " Exemple : 'Très bien, un canvas de code interactif est lancé pour un quick sort en C. Vous pouvez maintenant me demander des modifications directement dans le chat de canvas qui vient de s'ouvrir :\n" "[CANVAS_START]\n```c\n// Code Quick Sort en C\n...\n```\n[CANVAS_END]'\n" "3. **IMPORTANT :** Ne fournis PAS de code markdown ```python ... ``` après le canvas, sauf si cela est explicitement demandé et pertinent.\n\n" "**Si la question DEMANDE la CRÉATION ou MODIFICATION de TEXTE RICHE (avec markdown) dans un canvas :**\n" "1. Indique clairement que tu vas créer un **canvas de texte** interactif pour cela. Précise que l'utilisateur pourra te demander des modifications dans un 'chat de canvas' dédié.\n" "2. Encadre le contenu initial du canvas de texte avec les marqueurs `[CANVAS_START]` et `[CANVAS_END]`.\n" " Exemple : 'Très bien, un canvas de texte interactif est lancé. Le texte supporte le **gras**, l'*italique*, les listes, etc. Vous pouvez maintenant me demander des modifications directement dans le chat de canvas qui vient de s'ouvrir :\n" "[CANVAS_START]\n# Titre principal en gras\n\n* Item 1\n* Item 2\n\n[CANVAS_END]'\n" "3. **IMPORTANT :** Ne fournis PAS de code markdown ```python ... ``` après le canvas de texte, sauf si cela est explicitement demandé.\n\n" "**Si l'utilisateur pose une question dans le 'chat de canvas' :**\n" " - Interprète sa demande comme une instruction pour MODIFIER le contenu du canvas actuel.\n" " - Fournis le NOUVEAU contenu du canvas ENTIÈREMENT MIS À JOUR encadré par `[CANVAS_START]` et `[CANVAS_END]`.\n" " - Ajoute une courte phrase de confirmation indiquant que le canvas a été mis à jour.\n\n" "**Si la question NE DEMANDE PAS de CALCUL NUMÉRIQUE NI DE CANVAS :**\n" "Réponds directement et clairement à la question en tenant compte du contexte de la conversation.\n" "Si tu dois donner un exemple de code, utilise un bloc markdown ```python ... ``` MAIS SANS JAMAIS ajouter `[CODE_REQUIRED]` ni de canvas." ) + "\n".join(system_prompt_additions) system_prompt = {"role": "system", "content": base_prompt} messages_with_system = [system_prompt] + messages full_reply_text = "" code_to_execute = None code_to_display = None is_code_used = False canvas_content = None code_executed = False prompt_tokens = 0 completion_tokens = 0 total_tokens = 0 initial_response_completed = False web_search_triggered = False is_initial_web_search = False yield format_sse_event({"type": "analysis_status", "content": "start"}) if messages and messages[0]['content'] == '[RECHERCHE WEB INITIALE]': is_initial_web_search = True search_query = "actualités du jour" yield format_sse_event({"type": "web_search", "query": search_query}) search_results = WebSearchHandler.perform_search(search_query) content_processor = WebSearchHandler() for result in search_results: result['content'] = content_processor.fetch_page_content(result['url']) sources = [result['domain'] for result in search_results] yield format_sse_event({"type": "web_sources", "content": sources}) yield format_sse_event({"type": "analysis_status", "content": "end"}) yield format_sse_event({"type": "final"}) return try: response_stream = ai_client.chat.completions.create( extra_headers={ "HTTP-Referer": "https://pl1nkochat.onrender.com", "X-Title": "Plinko Chat", }, model=model, messages=messages_with_system, temperature=0.0, stream=True ) for chunk in response_stream: if initial_response_completed: break delta_content = chunk.choices[0].delta.content or "" delta_content_no_marker = delta_content.replace(CODE_REQUIRED_MARKER, "") full_reply_text += delta_content_no_marker if enable_web_search and WEB_SEARCH_MARKER in full_reply_text: web_search_triggered = True search_query = full_reply_text.split(WEB_SEARCH_MARKER)[-1].strip() yield format_sse_event({"type": "web_search", "query": search_query}) logger.info(f"Extracted search query: '{search_query}'") if not search_query: logger.warning("Web search triggered but no search query was extracted from AI response.") yield format_sse_event({"type": "error", "content": "Erreur de recherche web : requête de recherche manquante."}) continue search_results = WebSearchHandler.perform_search(search_query) content_processor = WebSearchHandler() for result in search_results: result['content'] = content_processor.fetch_page_content(result['url']) sources = [result['domain'] for result in search_results] yield format_sse_event({"type": "web_sources", "content": sources}) search_context = generate_search_context(search_results) messages_with_system = [ *messages_with_system, { "role": "user", "content": f"Résultats de recherche pour '{search_query}':\n{search_context}\nRéponds en citant les sources avec [source:domaine]" }] response_stream = ai_client.chat.completions.create( extra_headers={ "HTTP-Referer": "https://pl1nkochat.onrender.com", "X-Title": "Plinko Chat", }, model=model, messages=messages_with_system, temperature=0.0, stream=True ) break if CANVAS_START_MARKER in full_reply_text and CANVAS_END_MARKER in full_reply_text: start_index = full_reply_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = full_reply_text.find(CANVAS_END_MARKER) canvas_content_extracted = full_reply_text[start_index:end_index].strip() canvas_content = canvas_content_extracted delta_content_for_chat = delta_content_no_marker.replace(CANVAS_START_MARKER, "").replace(CANVAS_END_MARKER, "") yield format_sse_event({"type": "text", "content": delta_content_for_chat}) yield format_sse_event({"type": "canvas_start", "content": canvas_content}) continue else: yield format_sse_event({"type": "text", "content": delta_content_no_marker}) code_to_execute, code_to_display = extract_code_blocks(full_reply_text) if code_to_execute: initial_response_completed = True if web_search_triggered: for chunk in response_stream: delta_content = chunk.choices[0].delta.content or "" yield format_sse_event({"type": "text", "content": delta_content}) if execute_code and code_to_execute: is_code_used = True logger.info("Envoi événement CODE_BLOCK") yield format_sse_event({"type": "code_block", "content": code_to_execute}) yield format_sse_event({"type": "code_execution_start"}) execution_result = run_code_in_sandbox(code_to_execute) numeric_result = extract_numeric_result(execution_result) if numeric_result: code_executed = True logger.info("Envoi événement EXECUTED_CODE_RESULT") yield format_sse_event({"type": "executed_code_result", "content": numeric_result}) conclusion_prompt_content = ( f"**[INSTRUCTION SUITE - APRES EXECUTION DU CODE]** Tu as exécuté le code Python suivant:\n" f"```python\n{code_to_execute}\n```\n" f"Le résultat du calcul est: **{numeric_result}**. " "Maintenant, continue pour une conversation logique en te basant sur la question initiale." ) conclusion_prompt = [{"role": "user", "content": conclusion_prompt_content}] conclusion_stream = ai_client.chat.completions.create( extra_headers={ "HTTP-Referer": "https://pl1nkochat.onrender.com", "X-Title": "Plinko Chat", }, model=model, messages=conclusion_prompt, temperature=0.0, stream=True, ) for chunk in conclusion_stream: delta_content = chunk.choices[0].delta.content or "" yield format_sse_event({"type": "text", "content": delta_content}) yield format_sse_event({"type": "final"}) else: code_executed = True logger.info("Envoi événement EXECUTED_CODE_ERROR") yield format_sse_event({"type": "executed_code_error", "content": execution_result}) yield format_sse_event({"type": "final", "result": "Fin de la réponse (erreur ou pas de résultat numérique)."}) prompt_tokens = 0 completion_tokens = 0 total_tokens = prompt_tokens + completion_tokens if not code_executed and not is_initial_web_search: logger.info("Envoi événement FINAL (sans code exécuté)") yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}) elif not is_initial_web_search: yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens}) except Exception as e: error_message = f"Erreur serveur IA: {str(e)}" logger.exception(error_message) yield format_sse_event({"type": "error", "content": error_message}) finally: if not is_initial_web_search: yield format_sse_event({"type": "analysis_status", "content": "end"}) yield format_sse_event({"type": "done", "is_code_execution": is_code_used}) return Response(stream_with_context(stream_response()), mimetype='text/event-stream') @app.route('/canvas_modify', methods=['POST']) def canvas_modify_endpoint() -> Response: data = request.get_json() if not data: logger.warning("Requête canvas_modify sans données.") return jsonify({"error": "Requête canvas_modify invalide : corps de requête manquant."}), 400 user_message = data.get('message') canvas_content = data.get('canvas_content') model = data.get('model', 'llama3-70b-8192').strip() use_openrouter = data.get('use_openrouter', False) if not user_message or canvas_content is None: logger.warning("Requête canvas_modify incomplète: message=%s, canvas_content=%s", user_message, canvas_content) return jsonify({"error": "Requête canvas_modify invalide : message ou contenu canvas manquant."}), 400 canvas_chat_messages = [ {"role": "system", "content": ( "Tu es dans un chat de canvas. L'utilisateur va te demander de modifier le contenu du canvas suivant. " "Fournis UNIQUEMENT le NOUVEAU contenu du canvas ENTIÈREMENT MIS À JOUR, encadré par `[CANVAS_START]` et `[CANVAS_END]`/. " "Ne réponds pas avec du texte en dehors de ces marqueurs, sauf une courte phrase de confirmation après le canvas. " "Maintiens le contexte des modifications précédentes du canvas, en te basant sur l'historique de ce chat canvas." )}, {"role": "assistant", "content": f"[CANVAS_START]\n{canvas_content}\n[CANVAS_END] Voici le canvas actuel. Dites-moi comment le modifier."}, {"role": "user", "content": user_message} ] ai_client = openrouter_client if use_openrouter else groq_client @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=60), retry=retry_if_exception_type(Exception)) def canvas_modify_groq_call(canvas_chat_messages, model, ai_client): return ai_client.chat.completions.create( extra_headers={ "HTTP-Referer": "https://pl1nkochat.onrender.com", "X-Title": "Plinko Chat Canvas", }, model=model, messages=canvas_chat_messages, temperature=0.0, stream=False, ) try: response_stream = canvas_modify_groq_call(canvas_chat_messages, model, ai_client) ai_response_text = response_stream.choices[0].message.content or "" if CANVAS_START_MARKER in ai_response_text and CANVAS_END_MARKER in ai_response_text: start_index = ai_response_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER) end_index = ai_response_text.find(CANVAS_END_MARKER) updated_canvas_content = ai_response_text[start_index:end_index].strip() response_message = "Canvas mis à jour." logger.info("Canvas modifié avec succès.") return jsonify({"updated_canvas_content": updated_canvas_content, "response_message": response_message}) else: logger.error("Réponse IA canvas_modify invalide: marqueurs CANVAS_START/END manquants.") return jsonify({"error": "Réponse IA invalide pour la modification du canvas", "full_response": ai_response_text}), 500 except Exception as e: error_message = f"Erreur serveur (canvas_modify): {str(e)}" logger.exception(error_message) return jsonify({"error": error_message}), 500 @app.route('/run', methods=['POST']) def run_code_endpoint() -> Response: data = request.get_json() if not data: logger.warning("Requête run sans données.") return jsonify({"error": "Requête run invalide : corps de requête manquant."}), 400 language = data.get("language") code = data.get("code") stdin = data.get("stdin", "") if not language or not code: logger.warning("Requête run incomplète: language=%s, code=%s", language, code) return jsonify({"error": "Requête run invalide : langage et code requis."}), 400 payload = { "language": language, "version": "*", "files": [{"content": code}], "stdin": stdin } try: piston_response = requests.post(PISTON_API_URL, json=payload, timeout=3000000) piston_response.raise_for_status() logger.info("Code exécuté avec Piston avec succès.") return jsonify(piston_response.json()) except requests.RequestException as e: error_message = f"Erreur lors de l'appel à l'API Piston: {str(e)}" logger.exception(error_message) piston_text = piston_response.text if 'piston_response' in locals() and piston_response is not None else "" return jsonify({"error": "Erreur d'exécution du code via l'API Piston.", "piston_response": piston_text, "details": error_message}), 500 except Exception as e: error_message = f"Erreur inattendue lors de l'exécution du code: {str(e)}" logger.exception(error_message) return jsonify({"error": "Erreur inattendue lors de l'exécution du code.", "details": error_message}), 500 @app.route('/run', methods=['OPTIONS']) def run_code_options() -> Response: return jsonify({'message': 'OPTIONS requests allowed'}), 200 def format_sse_event(data: Dict[str, Any]) -> str: return f"data: {json.dumps(data)}\n\n" if __name__ == '__main__': app.run(port=5000, debug=os.environ.get('FLASK_DEBUG') == '1')
Cache intégré dans la recherche web :
– Les fonctions load_cache()
et save_cache()
gèrent la lecture/écriture du fichier search_cache.json
.
– Dans WebSearchHandler.perform_search()
, nous vérifions d’abord si la requête existe déjà dans le cache. Si oui, nous retournons le résultat mis en cache ; sinon, nous effectuons la recherche puis mettons à jour le cache.
Filtrage des URL :
– Seules les URL dont le schéma est 'http'
ou 'https'
sont conservées.
Récupération du contenu :
– La méthode fetch_page_content()
reste similaire, afin d’extraire et nettoyer le contenu des pages visitées.
En lançant une recherche via l’endpoint /web_search
(ou via la fonctionnalité dans votre front-end), le backend effectuera désormais une recherche sur le web en utilisant cette logique enrichie pour vous fournir des résultats avec davantage d’informations.
Vous pouvez bien entendu ajuster ou étendre cette logique (par exemple en ajoutant un filtrage par IA si nécessaire) en fonction de vos besoins.