Créé le : 15 février 2025
Répondu en utilisant O3 Mini High par Chat01
OK DeepSeek, I want the ability to do web research in my backend using the DuckDuckGo API. Can you make the change exactly like in this example for web search?:
"""
import datetime
import json
import os
from typing import Dict, List

import requests
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
from tqdm import tqdm
from urllib.parse import urlparse
# Runtime configuration for the CLI search tool.
# SECURITY FIX: the OpenRouter API key was hard-coded (a leaked secret — it must
# be rotated). It is now read from the environment instead.
CONFIG = {
    'openrouter_api_key': os.environ.get('OPENROUTER_API_KEY', ''),
    'max_results': 7,                 # max DuckDuckGo hits to keep per query
    'cache_file': 'search_cache.json',
    'model': 'google/gemini-2.0-pro-exp-02-05:free',
    'timeout': 20,                    # HTTP timeout, in seconds
    'min_quality_score': 0.65         # quality threshold — not referenced in this script; confirm before removing
}
class DataCollector:
    """Collects raw search hits from DuckDuckGo."""

    @staticmethod
    def web_search(query: str) -> List[Dict]:
        """Search DuckDuckGo (fr-fr) and return http(s) hits.

        Each hit is a dict with 'url', 'title' and 'domain' (netloc with a
        leading 'www.' removed).
        """
        with DDGS() as ddgs:
            results = ddgs.text(query, region='fr-fr', max_results=CONFIG['max_results'])
            collected = []
            for r in results:
                parsed = urlparse(r['href'])
                # Skip non-web schemes (ftp:, javascript:, relative links, ...).
                if parsed.scheme not in ('http', 'https'):
                    continue
                collected.append({
                    'url': r['href'],
                    'title': r['title'],
                    'domain': parsed.netloc.replace('www.', ''),
                })
            return collected
class ContentProcessor:
    """Downloads a page and extracts its readable main text."""

    def __init__(self):
        # FIX: the pasted source named this method `init`, so it was never
        # called on construction. Desktop browser UA reduces bot blocking.
        self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}

    def fetch_content(self, url: str) -> str:
        """Fetch *url* and return up to 5000 chars of its main visible text.

        Returns '' on any network/parse error (best-effort by design).
        """
        try:
            response = requests.get(url, headers=self.headers, timeout=CONFIG['timeout'])
            soup = BeautifulSoup(response.text, 'html.parser')
            # Drop non-content elements before text extraction.
            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'meta', 'link']):
                element.decompose()
            main_content = soup.find(['main', 'article']) or soup.body
            return ' '.join(main_content.stripped_strings)[:5000]
        except Exception as e:
            print(f"Erreur sur {url}: {str(e)}")
            return ''
class AICore:
    """Thin wrapper around the OpenRouter chat-completions HTTP API."""

    @staticmethod
    def query_ai(system_prompt: str, user_content: str, temp: float = 0.3) -> str:
        """Send a system+user message pair to OpenRouter and return the reply text.

        On any failure the error is returned as a string rather than raised
        (callers display it directly).
        """
        headers = {
            "Authorization": f"Bearer {CONFIG['openrouter_api_key']}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://github.com/your-repo",
            "X-Title": "CLI Search AI"
        }
        data = {
            "model": CONFIG['model'],
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_content}
            ],
            "temperature": temp,
            "max_tokens": 2000
        }
        try:
            response = requests.post("https://openrouter.ai/api/v1/chat/completions",
                                     headers=headers, json=data, timeout=CONFIG['timeout'])
            return response.json()['choices'][0]['message']['content']
        except Exception as e:
            return f"Erreur API: {str(e)}"
class ConsoleSearch:
    """Interactive console front-end: search, filter, and report via the AI."""

    def __init__(self):
        # FIX: the pasted source named this `init`, so cache/processor were
        # never initialised.
        self.cache = self.load_cache()
        self.processor = ContentProcessor()

    def load_cache(self) -> Dict:
        """Read the query cache from disk; missing or corrupt file yields {}."""
        try:
            with open(CONFIG['cache_file'], 'r') as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            return {}

    def save_cache(self):
        """Persist the in-memory query cache to disk."""
        with open(CONFIG['cache_file'], 'w') as f:
            json.dump(self.cache, f)

    def get_user_input(self):
        """Prompt the user; returns (query, verbose, format, source_count)."""
        print("\n" + "="*40)
        print("🔍 Assistant de Recherche Historique")
        print("="*40)
        return (
            input("\nEntrez votre recherche : ").strip(),
            input("\nMode verbeux ? (o/n) : ").lower() == 'o',
            input("\nFormat de sortie (text/json) [text] : ").lower() or 'text',
            int(input("\nNombre de sources à utiliser [3] : ") or 3)
        )

    def _filter_content(self, content: str, query: str) -> bool:
        """Ask the AI a yes/no relevance question to filter out bad pages."""
        prompt = f"Ce contenu est-il pertinent pour '{query}' ? Répondre uniquement par oui/non"
        response = AICore.query_ai(prompt, content[:2000], temp=0.0)
        return response.lower().startswith('oui')

    def run(self):
        """Main interactive loop body: one search → report → cache save."""
        query, verbose, fmt, sources = self.get_user_input()
        if verbose:
            print(f"\nDémarrage de la recherche pour : {query}")
            print(f"Paramètres : {sources} sources | Format {fmt.upper()}\n")
        results = self._process_search(query, verbose)
        report = self.generate_report(query, results, sources, verbose)
        if fmt == 'json':
            print("\n" + json.dumps(report, indent=2, ensure_ascii=False))
        else:
            self._display_text_report(report)
        self.save_cache()

    def _process_search(self, query: str, verbose: bool) -> List[Dict]:
        """Search, fetch and AI-filter pages; serves and updates the cache."""
        if query in self.cache:
            if verbose:
                print("\nUtilisation du cache...")
            return self.cache[query]
        results = DataCollector.web_search(query)
        valid_results = []
        for result in tqdm(results, desc="Analyse des sources", disable=not verbose):
            content = self.processor.fetch_content(result['url'])
            if content and self._filter_content(content, query):
                result['content'] = content
                valid_results.append(result)
                if len(valid_results) >= CONFIG['max_results']:
                    break
        self.cache[query] = valid_results
        return valid_results

    def generate_report(self, query: str, results: List[Dict], max_sources: int, verbose: bool) -> Dict:
        """Build a ChatGPT-style sourced report from the filtered results."""
        system_prompt = f"""**Rôle:** Expert en histoire
Consignes:
Exemple de format:
Point important 1 [Source1]
Détail complémentaire [Source2]
Conclusion synthétique [Source3]"""
        sources_text = "\n\n".join([f"[Source {i+1} - {r['domain']}]\n{r['content'][:1000]}" for i, r in enumerate(results[:max_sources])])
        if verbose:
            print("\nGénération du rapport via IA...")
        report_text = AICore.query_ai(system_prompt, sources_text)
        return {
            'query': query,
            'date': datetime.datetime.now().isoformat(),
            'sources_used': list(set(r['domain'] for r in results[:max_sources])),
            'report': report_text
        }

    def _display_text_report(self, report: Dict):
        """Pretty-print the report, date and source list to the console."""
        print("\n" + "="*40)
        print(f"{report['report']}")
        print("\n" + "-"*40)
        print(f"📅 Date: {datetime.datetime.now().strftime('%d/%m/%Y %H:%M')}")
        print(f"🔗 Sources: {', '.join(report['sources_used'])}")
        print("="*40)
# FIX: the export stripped the dunder underscores (`name` / "main").
if __name__ == "__main__":
    console = ConsoleSearch()
    console.run()
"""
and add this to the following code for the new internet web search in the chat; if web search is active, use the Gemini Pro experimental model, as I do for web search with the DuckDuckGo API:
"""
from gevent import monkey
monkey.patch_all()
import requests
import os
import re
import json
import logging
from typing import Any, Dict, List, Optional, Tuple, Generator
import time
from flask import Flask, request, Response, jsonify, stream_with_context
from flask_cors import CORS
from groq import Groq
from e2b_code_interpreter import Sandbox
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from openai import OpenAI
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from tqdm import tqdm
import datetime
import threading # <-- Pour la gestion du cache
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)  # FIX: was getLogger(name) — a NameError

app = Flask(__name__)  # FIX: was Flask(name)
CORS(app)

# Credentials are read from the environment; never hard-code secrets.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
E2B_API_KEY = os.environ.get("E2B_API_KEY")
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")  # NEW: for OpenRouter models
PISTON_API_URL = "https://emkc.org/api/v2/piston/execute"

groq_client = Groq(api_key=GROQ_API_KEY)
openrouter_client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=OPENROUTER_API_KEY,
)
# NOTE(review): one long-lived sandbox is shared by all requests — concurrent
# executions may interleave; confirm this is intended.
sandbox = Sandbox(api_key=E2B_API_KEY, timeout=3000)
# Models the frontend may select.
VALID_MODELS = [
    "llama3-70b-8192",
    "llama-3.3-70b-versatile",
    "google/gemini-pro",
    "meta-llama/llama-3-70b-instruct",
    "openai/gpt-4-turbo-preview",
]

# Matches a fenced ```python ... ``` block and captures its body.
# FIX: the pasted source had lost the backtick fences, leaving an
# unterminated, unusable pattern.
CODE_PATTERN = re.compile(r"```python\s*([\s\S]*?)```")

# Protocol markers exchanged with the model inside the streamed text.
CODE_REQUIRED_MARKER = "[CODE_REQUIRED]"
CANVAS_START_MARKER = "[CANVAS_START]"
CANVAS_END_MARKER = "[CANVAS_END]"
WEB_SEARCH_MARKER = "[WEB_SEARCH]"

# Web-search configuration (mirrors the CLI tool's CONFIG).
CONFIG = {
    'openrouter_api_key': os.environ.get("OPENROUTER_API_KEY"),
    'max_results': 7,
    'cache_file': 'search_cache.json',
    'model': 'google/gemini-pro',
    'timeout': 20,
    'min_quality_score': 0.65
}

# Serialises concurrent access to the JSON cache file.
CACHE_LOCK = threading.Lock()
def load_cache() -> Dict:
    """Read the search cache from disk; an absent or corrupt file yields {}."""
    try:
        with open(CONFIG['cache_file'], 'r') as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
def save_cache(cache: Dict):
    """Overwrite the on-disk search cache with *cache* (callers hold CACHE_LOCK)."""
    with open(CONFIG['cache_file'], 'w') as f:
        json.dump(cache, f)
class WebSearchHandler:
    """DuckDuckGo search with a disk cache, plus page-content extraction."""

    def __init__(self):
        # Desktop browser User-Agent to reduce bot blocking.
        self.headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}

    @staticmethod
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2))
    def perform_search(query: str, max_results: int = None) -> List[Dict]:
        """Search DuckDuckGo for *query*, serving from / updating the disk cache.

        Retried up to 3 times with exponential backoff on any exception.
        """
        if max_results is None:
            max_results = CONFIG['max_results']
        # Cache check (lock released before the slow network call).
        with CACHE_LOCK:
            cache = load_cache()
            if query in cache:
                logger.info(f"Utilisation du cache pour la requête: {query}")
                return cache[query]
        # Live DuckDuckGo query.
        with DDGS() as ddgs:
            results = ddgs.text(query, region='fr-fr', max_results=max_results)
            search_results = [{
                'url': r['href'],
                'title': r['title'],
                'domain': urlparse(r['href']).netloc.replace('www.', '')
            } for r in results if urlparse(r['href']).scheme in ['http', 'https']]
        # Cache update (re-read to merge with concurrent writers).
        with CACHE_LOCK:
            cache = load_cache()
            cache[query] = search_results
            save_cache(cache)
        return search_results

    def fetch_page_content(self, url: str) -> str:
        """Fetch *url* and return up to 5000 chars of its main visible text.

        Returns "" on any network/parse error (best-effort by design).
        """
        try:
            response = requests.get(url, headers=self.headers, timeout=CONFIG['timeout'])
            soup = BeautifulSoup(response.text, 'html.parser')
            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'button', 'meta', 'link']):
                element.decompose()
            main_content = soup.find(['main', 'article']) or soup.body
            return ' '.join(main_content.stripped_strings)[:5000]
        except Exception as e:
            logger.error(f"Erreur fetch_content {url}: {str(e)}")
            return ""
def generate_search_context(search_results: List[Dict]) -> str:
    """Format search hits into a numbered source-context block for the LLM.

    Each hit contributes "[Source N - domain]" plus up to 1000 chars of its
    fetched content (empty string when content is missing).
    """
    return "\n".join(
        f"[Source {i+1} - {r['domain']}]\n{r.get('content','')[:1000]}"
        for i, r in enumerate(search_results)
    )
def extract_code_blocks(text: str) -> Tuple[Optional[str], Optional[str]]:
    """Split an AI reply into (code_to_execute, code_to_display).

    A [CODE_REQUIRED] marker means the fenced python block should be executed;
    otherwise any fenced block is display-only. Returns (None, None) when no
    fenced block is found.
    """
    if CODE_REQUIRED_MARKER in text:
        text_no_marker = text.replace(CODE_REQUIRED_MARKER, "", 1).strip()
        code_match = CODE_PATTERN.search(text_no_marker)
        if code_match:
            return code_match.group(1).strip(), None
    else:
        code_match = CODE_PATTERN.search(text)
        if code_match:
            return None, code_match.group(1).strip()
    return None, None
def run_code_in_sandbox(code: str) -> str:
    """Execute *code* in the shared e2b sandbox and return its combined output.

    Probes several possible execution-method names because the e2b SDK has
    renamed this API across versions. Errors are returned as strings.
    """
    start_time = time.time()
    try:
        logger.info("Début de l'exécution du code dans le sandbox e2b.")
        method_name = next((method for method in ["run_code_sync", "execute_sync", "execute_code", "execute", "run_code", "run"] if hasattr(sandbox, method)), None)
        if method_name is None:
            error_msg = "Aucune méthode d'exécution de code disponible dans le sandbox."
            logger.error(error_msg)
            return error_msg
        logger.info(f"Utilisation de la méthode sandbox.{method_name}")
        execution = getattr(sandbox, method_name)(code)
        logs = execution.logs
        stdout_logs = logs.stdout if hasattr(logs, 'stdout') else []
        stderr_logs = logs.stderr if hasattr(logs, 'stderr') else []
        stdout = "\n".join(stdout_logs)
        stderr = "\n".join(stderr_logs)
        if stderr:
            # Surface both streams so the caller can distinguish failure output.
            return f"Stdout:\n{stdout}\n\nStderr:\n{stderr}"
        return stdout
    except Exception as e:
        duration = time.time() - start_time
        error_msg = f"Erreur lors de l'exécution du code dans e2b après {duration:.2f} secondes: {str(e)}"
        logger.exception(error_msg)
        return f"Erreur exécution code: {str(e)}"
def extract_numeric_result(exec_result: str) -> Optional[str]:
    """Return the last non-empty stdout line that contains a digit, else None.

    Understands the "Stdout:\\n...\\n\\nStderr:..." format produced by
    run_code_in_sandbox as well as plain stdout text.
    """
    if "Stdout:" in exec_result:
        stdout_section = exec_result.split("Stdout:\n", 1)[1].split("\n\nStderr:")[0].strip()
    else:
        stdout_section = exec_result.strip()
    lines = [l.strip() for l in stdout_section.splitlines() if l.strip()]
    if not lines:
        return None
    last_line = lines[-1]
    if re.search(r'\d', last_line):
        return last_line
    return None
@app.route('/models', methods=['GET'])
def get_models() -> Response:
    """Return the whitelist of selectable model identifiers as JSON."""
    return jsonify({"models": VALID_MODELS})
@app.route('/web_search', methods=['POST'])
def web_search_endpoint():
    """Run a cached DuckDuckGo search for the posted query and return enriched hits.

    Body: {"query": "..."}. Each returned hit gains a 'content' field with the
    fetched page text.
    """
    data = request.get_json()
    if not data or not data.get('query'):
        return jsonify({"error": "Missing query"}), 400
    query = data['query']
    try:
        # Cached search first, then fetch page text for each hit.
        search_results = WebSearchHandler.perform_search(query)
        content_processor = WebSearchHandler()
        for result in tqdm(search_results, desc="Fetching content from sources"):
            result['content'] = content_processor.fetch_page_content(result['url'])
        return jsonify({"results": search_results})
    except Exception as e:
        logger.error(f"Web search error: {e}")
        return jsonify({"error": str(e)}), 500
@app.route('/chat', methods=['POST'])
def chat_endpoint() -> Response:
    """Streaming chat endpoint (SSE).

    Reconstructed from export-mangled source (lines were collapsed and
    syntactically invalid). Handles: optional web-search context injection,
    the [WEB_SEARCH] marker loop, [CANVAS_START]/[CANVAS_END] canvases, and
    sandboxed execution of [CODE_REQUIRED] python blocks.
    NOTE(review): line-wrap joins inside the long prompt strings were inferred
    from the paste — confirm exact wording against the original file.
    """
    @stream_with_context
    @retry(stop=stop_after_attempt(5),
           wait=wait_exponential(multiplier=1, min=4, max=60),
           retry=retry_if_exception_type(Exception))
    def stream_response():
        data = request.get_json()
        if not data:
            logger.warning("Requête chat invalide: corps de requête manquant.")
            yield format_sse_event({"type": "error", "content": "Requête chat invalide : corps de requête manquant."})
            return
        messages = data.get('messages', [])
        model = data.get('model', 'llama3-70b-8192').strip()
        execute_code = data.get('execute_code', True)
        use_openrouter = data.get('use_openrouter', False)
        enable_web_search = data.get('web_search', False)
        web_search_results = data.get('search_results', [])
        if not messages or not isinstance(messages, list) or not model or model not in VALID_MODELS:
            logger.warning("Requête chat invalide: messages=%s, model=%s", messages, model)
            yield format_sse_event({"type": "error", "content": "Requête chat invalide : messages ou modèle incorrects."})
            return
        ai_client = openrouter_client if use_openrouter else groq_client
        system_prompt_additions = []
        if enable_web_search and web_search_results:
            # Pre-fetched results supplied by the client: inject them as context.
            search_context = generate_search_context(web_search_results)
            system_prompt_additions.append(
                "**CONTEXTE DE RECHERCHE WEB :**\n"
                f"{search_context}\n"
                "Utilise les informations ci-dessus pour répondre à la question de l'utilisateur. Cite les sources [source:domaine.com]."
            )
        elif enable_web_search:
            # No results yet: teach the model to emit the [WEB_SEARCH] marker.
            system_prompt_additions.append(
                "Si la question nécessite des informations actualisées ou externes :\n"
                "1. Utilise le marqueur [WEB_SEARCH] suivi de la requête de recherche\n"
                "2. Analyse les résultats et cite les sources avec [source:domaine.com]\n"
                "3. Garde les réponses concises et factuelles"
            )
        base_prompt = (
            "🎯 **Rôle : Expert en calcul numérique précis et assistant polyvalent avec canvas interactifs (texte et code).**\n"
            "**Mission :** Répondre aux questions, effectuer des calculs numériques exacts, et assister l'utilisateur avec des canvas éditables et interactifs pour le **texte** (avec formatage Markdown) et le **code**. "
            "Maintiens le contexte de conversation, en particulier autour du code, des canvas et des résultats précédents.\n\n"
            "**Si la question DEMANDE un CALCUL NUMÉRIQUE :**\n"
            "1. Réponds d'abord avec une courte phrase d'introduction expliquant que tu vas utiliser un code Python pour résoudre le problème. **Sois clair sur le contexte du calcul, en te référant si nécessaire aux étapes précédentes de la conversation et aux données ou résultats déjà obtenus.**\n"
            "2. Sur la ligne suivante, ajoute TOUJOURS le marqueur `[CODE_REQUIRED]` pour indiquer qu'un code va suivre.\n"
            "3. Ensuite, fournis le **BLOC DE CODE PYTHON** minimal, lisible et correct pour effectuer UNIQUEMENT LE CALCUL demandé et afficher UNIQUEMENT LE RÉSULTAT NUMÉRIQUE via `print(resultat_numerique)`.\n"
            "**IMPORTANT :** Ne génère RIEN D'AUTRE que ces 3 éléments, et dans cet ordre précis. Arrête-toi **ICI**. Attends l'exécution du code.\n\n"
            "**[INSTRUCTION SUITE - APRES EXECUTION DU CODE]**\n"
            "Après avoir fourni le résultat du code, CONTINUE la conversation pour expliquer le résultat EN DETAILS, en explicitant clairement et précisément **LE Résultat du code Python** que tu viens d'utiliser pour le calcul. Il faut que l'explication du résultat réponde à la question de l'utilisateur.\n\n"
            "**Si la question DEMANDE la CRÉATION ou MODIFICATION de CODE dans un canvas :**\n"
            "1. Indique clairement que tu vas créer un **canvas de code** interactif pour cela. Précise que l'utilisateur pourra te demander des modifications dans un 'chat de canvas' dédié qui va s'ouvrir.\n"
            "2. Encadre le contenu initial du canvas avec les marqueurs `[CANVAS_START]` et `[CANVAS_END]`.\n"
            " Exemple : 'Très bien, un canvas de code interactif est lancé pour un quick sort en C. Vous pouvez maintenant me demander des modifications directement dans le chat de canvas qui vient de s'ouvrir :\n"
            "[CANVAS_START]\n```c\n// Code Quick Sort en C\n...\n```\n[CANVAS_END]'\n"
            "3. **IMPORTANT :** Ne fournis PAS de code markdown ```python ... ``` après le canvas, sauf si cela est explicitement demandé et pertinent.\n\n"
            "**Si la question DEMANDE la CRÉATION ou MODIFICATION de TEXTE RICHE (avec markdown) dans un canvas :**\n"
            "1. Indique clairement que tu vas créer un **canvas de texte** interactif pour cela. Précise que l'utilisateur pourra te demander des modifications dans un 'chat de canvas' dédié.\n"
            "2. Encadre le contenu initial du canvas de texte avec les marqueurs `[CANVAS_START]` et `[CANVAS_END]`.\n"
            " Exemple : 'Très bien, un canvas de texte interactif est lancé. Le texte supporte le **gras**, l'*italique*, les listes, etc. Vous pouvez maintenant me demander des modifications directement dans le chat de canvas qui vient de s'ouvrir :\n"
            "[CANVAS_START]\n# Titre principal en gras\n\n* Item 1\n* Item 2\n\n[CANVAS_END]'\n"
            "3. **IMPORTANT :** Ne fournis PAS de code markdown ```python ... ``` après le canvas de texte, sauf si cela est explicitement demandé.\n\n"
            "**Si l'utilisateur pose une question dans le 'chat de canvas' :**\n"
            " - Interprète sa demande comme une instruction pour MODIFIER le contenu du canvas actuel.\n"
            " - Fournis le NOUVEAU contenu du canvas ENTIÈREMENT MIS À JOUR encadré par `[CANVAS_START]` et `[CANVAS_END]`.\n"
            " - Ajoute une courte phrase de confirmation indiquant que le canvas a été mis à jour.\n\n"
            "**Si la question NE DEMANDE PAS de CALCUL NUMÉRIQUE NI DE CANVAS :**\n"
            "Réponds directement et clairement à la question en tenant compte du contexte de la conversation.\n"
            "Si tu dois donner un exemple de code, utilise un bloc markdown ```python ... ``` MAIS SANS JAMAIS ajouter `[CODE_REQUIRED]` ni de canvas."
        ) + "\n".join(system_prompt_additions)
        system_prompt = {"role": "system", "content": base_prompt}
        messages_with_system = [system_prompt] + messages
        full_reply_text = ""
        code_to_execute = None
        code_to_display = None
        is_code_used = False
        canvas_content = None
        code_executed = False
        prompt_tokens = 0
        completion_tokens = 0
        total_tokens = 0
        initial_response_completed = False
        web_search_triggered = False
        is_initial_web_search = False
        yield format_sse_event({"type": "analysis_status", "content": "start"})
        if messages and messages[0]['content'] == '[RECHERCHE WEB INITIALE]':
            # Bootstrap message: run a default news search, emit sources, stop.
            is_initial_web_search = True
            search_query = "actualités du jour"
            yield format_sse_event({"type": "web_search", "query": search_query})
            search_results = WebSearchHandler.perform_search(search_query)
            content_processor = WebSearchHandler()
            for result in search_results:
                result['content'] = content_processor.fetch_page_content(result['url'])
            sources = [result['domain'] for result in search_results]
            yield format_sse_event({"type": "web_sources", "content": sources})
            yield format_sse_event({"type": "analysis_status", "content": "end"})
            yield format_sse_event({"type": "final"})
            return
        try:
            response_stream = ai_client.chat.completions.create(
                extra_headers={
                    "HTTP-Referer": "https://pl1nkochat.onrender.com",
                    "X-Title": "Plinko Chat",
                },
                model=model,
                messages=messages_with_system,
                temperature=0.0,
                stream=True
            )
            for chunk in response_stream:
                if initial_response_completed:
                    break
                delta_content = chunk.choices[0].delta.content or ""
                delta_content_no_marker = delta_content.replace(CODE_REQUIRED_MARKER, "")
                full_reply_text += delta_content_no_marker
                if enable_web_search and WEB_SEARCH_MARKER in full_reply_text:
                    # The model asked for a live search: run it, append the
                    # results as a new user turn, and restart the stream.
                    web_search_triggered = True
                    search_query = full_reply_text.split(WEB_SEARCH_MARKER)[-1].strip()
                    yield format_sse_event({"type": "web_search", "query": search_query})
                    logger.info(f"Extracted search query: '{search_query}'")
                    if not search_query:
                        logger.warning("Web search triggered but no search query was extracted from AI response.")
                        yield format_sse_event({"type": "error", "content": "Erreur de recherche web : requête de recherche manquante."})
                        continue
                    search_results = WebSearchHandler.perform_search(search_query)
                    content_processor = WebSearchHandler()
                    for result in search_results:
                        result['content'] = content_processor.fetch_page_content(result['url'])
                    sources = [result['domain'] for result in search_results]
                    yield format_sse_event({"type": "web_sources", "content": sources})
                    search_context = generate_search_context(search_results)
                    messages_with_system = [
                        *messages_with_system,
                        {
                            "role": "user",
                            "content": f"Résultats de recherche pour '{search_query}':\n{search_context}\nRéponds en citant les sources avec [source:domaine]"
                        }]
                    response_stream = ai_client.chat.completions.create(
                        extra_headers={
                            "HTTP-Referer": "https://pl1nkochat.onrender.com",
                            "X-Title": "Plinko Chat",
                        },
                        model=model,
                        messages=messages_with_system,
                        temperature=0.0,
                        stream=True
                    )
                    break
                if CANVAS_START_MARKER in full_reply_text and CANVAS_END_MARKER in full_reply_text:
                    start_index = full_reply_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER)
                    end_index = full_reply_text.find(CANVAS_END_MARKER)
                    canvas_content_extracted = full_reply_text[start_index:end_index].strip()
                    canvas_content = canvas_content_extracted
                    delta_content_for_chat = delta_content_no_marker.replace(CANVAS_START_MARKER, "").replace(CANVAS_END_MARKER, "")
                    yield format_sse_event({"type": "text", "content": delta_content_for_chat})
                    yield format_sse_event({"type": "canvas_start", "content": canvas_content})
                    continue
                else:
                    yield format_sse_event({"type": "text", "content": delta_content_no_marker})
                code_to_execute, code_to_display = extract_code_blocks(full_reply_text)
                if code_to_execute:
                    # Stop streaming; the code-execution phase takes over.
                    initial_response_completed = True
            if web_search_triggered:
                # Relay the follow-up answer produced after the search.
                for chunk in response_stream:
                    delta_content = chunk.choices[0].delta.content or ""
                    yield format_sse_event({"type": "text", "content": delta_content})
            if execute_code and code_to_execute:
                is_code_used = True
                logger.info("Envoi événement CODE_BLOCK")
                yield format_sse_event({"type": "code_block", "content": code_to_execute})
                yield format_sse_event({"type": "code_execution_start"})
                execution_result = run_code_in_sandbox(code_to_execute)
                numeric_result = extract_numeric_result(execution_result)
                if numeric_result:
                    code_executed = True
                    logger.info("Envoi événement EXECUTED_CODE_RESULT")
                    yield format_sse_event({"type": "executed_code_result", "content": numeric_result})
                    conclusion_prompt_content = (
                        f"**[INSTRUCTION SUITE - APRES EXECUTION DU CODE]** Tu as exécuté le code Python suivant:\n"
                        f"```python\n{code_to_execute}\n```\n"
                        f"Le résultat du calcul est: **{numeric_result}**. "
                        "Maintenant, continue pour une conversation logique en te basant sur la question initiale."
                    )
                    conclusion_prompt = [{"role": "user", "content": conclusion_prompt_content}]
                    conclusion_stream = ai_client.chat.completions.create(
                        extra_headers={
                            "HTTP-Referer": "https://pl1nkochat.onrender.com",
                            "X-Title": "Plinko Chat",
                        },
                        model=model,
                        messages=conclusion_prompt,
                        temperature=0.0,
                        stream=True,
                    )
                    for chunk in conclusion_stream:
                        delta_content = chunk.choices[0].delta.content or ""
                        yield format_sse_event({"type": "text", "content": delta_content})
                    yield format_sse_event({"type": "final"})
                else:
                    code_executed = True
                    logger.info("Envoi événement EXECUTED_CODE_ERROR")
                    yield format_sse_event({"type": "executed_code_error", "content": execution_result})
                    yield format_sse_event({"type": "final", "result": "Fin de la réponse (erreur ou pas de résultat numérique)."})
            # Token accounting is not wired up yet; zeros are reported.
            prompt_tokens = 0
            completion_tokens = 0
            total_tokens = prompt_tokens + completion_tokens
            if not code_executed and not is_initial_web_search:
                logger.info("Envoi événement FINAL (sans code exécuté)")
                yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens})
            elif not is_initial_web_search:
                yield format_sse_event({"type": "final", "prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens, "total_tokens": total_tokens})
        except Exception as e:
            error_message = f"Erreur serveur IA: {str(e)}"
            logger.exception(error_message)
            yield format_sse_event({"type": "error", "content": error_message})
        finally:
            # NOTE(review): whether "done" belonged inside this guard was
            # ambiguous in the mangled paste; kept inside to match indentation.
            if not is_initial_web_search:
                yield format_sse_event({"type": "analysis_status", "content": "end"})
                yield format_sse_event({"type": "done", "is_code_execution": is_code_used})
    return Response(stream_with_context(stream_response()), mimetype='text/event-stream')
@app.route('/canvas_modify', methods=['POST'])
def canvas_modify_endpoint() -> Response:
    """Apply a user instruction to a canvas and return the updated content.

    FIX: the system-prompt string literal was broken across lines (stray
    `/.`) in the pasted source; rejoined into one valid sentence.
    """
    data = request.get_json()
    if not data:
        logger.warning("Requête canvas_modify sans données.")
        return jsonify({"error": "Requête canvas_modify invalide : corps de requête manquant."}), 400
    user_message = data.get('message')
    canvas_content = data.get('canvas_content')
    model = data.get('model', 'llama3-70b-8192').strip()
    use_openrouter = data.get('use_openrouter', False)
    if not user_message or canvas_content is None:
        logger.warning("Requête canvas_modify incomplète: message=%s, canvas_content=%s", user_message, canvas_content)
        return jsonify({"error": "Requête canvas_modify invalide : message ou contenu canvas manquant."}), 400
    canvas_chat_messages = [
        {"role": "system", "content": (
            "Tu es dans un chat de canvas. L'utilisateur va te demander de modifier le contenu du canvas suivant. "
            "Fournis UNIQUEMENT le NOUVEAU contenu du canvas ENTIÈREMENT MIS À JOUR, encadré par [CANVAS_START] et [CANVAS_END]. "
            "Ne réponds pas avec du texte en dehors de ces marqueurs, sauf une courte phrase de confirmation après le canvas. "
            "Maintiens le contexte des modifications précédentes du canvas, en te basant sur l'historique de ce chat canvas."
        )},
        {"role": "assistant", "content": f"[CANVAS_START]\n{canvas_content}\n[CANVAS_END] Voici le canvas actuel. Dites-moi comment le modifier."},
        {"role": "user", "content": user_message}
    ]
    ai_client = openrouter_client if use_openrouter else groq_client

    @retry(stop=stop_after_attempt(5),
           wait=wait_exponential(multiplier=1, min=4, max=60),
           retry=retry_if_exception_type(Exception))
    def canvas_modify_groq_call(canvas_chat_messages, model, ai_client):
        # Non-streaming completion for canvas rewrites, with retries.
        return ai_client.chat.completions.create(
            extra_headers={
                "HTTP-Referer": "https://pl1nkochat.onrender.com",
                "X-Title": "Plinko Chat Canvas",
            },
            model=model,
            messages=canvas_chat_messages,
            temperature=0.0,
            stream=False,
        )

    try:
        response_stream = canvas_modify_groq_call(canvas_chat_messages, model, ai_client)
        ai_response_text = response_stream.choices[0].message.content or ""
        if CANVAS_START_MARKER in ai_response_text and CANVAS_END_MARKER in ai_response_text:
            start_index = ai_response_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER)
            end_index = ai_response_text.find(CANVAS_END_MARKER)
            updated_canvas_content = ai_response_text[start_index:end_index].strip()
            response_message = "Canvas mis à jour."
            logger.info("Canvas modifié avec succès.")
            return jsonify({"updated_canvas_content": updated_canvas_content, "response_message": response_message})
        else:
            logger.error("Réponse IA canvas_modify invalide: marqueurs CANVAS_START/END manquants.")
            return jsonify({"error": "Réponse IA invalide pour la modification du canvas", "full_response": ai_response_text}), 500
    except Exception as e:
        error_message = f"Erreur serveur (canvas_modify): {str(e)}"
        logger.exception(error_message)
        return jsonify({"error": error_message}), 500
@app.route('/run', methods=['POST'])
def run_code_endpoint() -> Response:
    """Execute arbitrary code via the public Piston API and relay its JSON result."""
    data = request.get_json()
    if not data:
        logger.warning("Requête run sans données.")
        return jsonify({"error": "Requête run invalide : corps de requête manquant."}), 400
    language = data.get("language")
    code = data.get("code")
    stdin = data.get("stdin", "")
    if not language or not code:
        logger.warning("Requête run incomplète: language=%s, code=%s", language, code)
        return jsonify({"error": "Requête run invalide : langage et code requis."}), 400
    payload = {
        "language": language,
        "version": "*",  # let Piston pick the latest available runtime
        "files": [{"content": code}],
        "stdin": stdin
    }
    piston_response = None  # pre-bind so the except clause needs no locals() probing
    try:
        # FIX: timeout was 3_000_000 (requests timeouts are in seconds ≈ 35
        # days); capped at 5 minutes.
        piston_response = requests.post(PISTON_API_URL, json=payload, timeout=300)
        piston_response.raise_for_status()
        logger.info("Code exécuté avec Piston avec succès.")
        return jsonify(piston_response.json())
    except requests.RequestException as e:
        error_message = f"Erreur lors de l'appel à l'API Piston: {str(e)}"
        logger.exception(error_message)
        piston_text = piston_response.text if piston_response is not None else ""
        return jsonify({"error": "Erreur d'exécution du code via l'API Piston.", "piston_response": piston_text, "details": error_message}), 500
    except Exception as e:
        error_message = f"Erreur inattendue lors de l'exécution du code: {str(e)}"
        logger.exception(error_message)
        return jsonify({"error": "Erreur inattendue lors de l'exécution du code.", "details": error_message}), 500
@app.route('/run', methods=['OPTIONS'])
def run_code_options() -> Response:
    """CORS preflight handler for /run."""
    return jsonify({'message': 'OPTIONS requests allowed'}), 200
def format_sse_event(data: Dict[str, Any]) -> str:
    """Serialize *data* as one Server-Sent-Events `data:` frame (blank-line terminated)."""
    return f"data: {json.dumps(data)}\n\n"
# FIX: the export stripped the dunder underscores (`name` / 'main').
if __name__ == '__main__':
    app.run(port=5000, debug=os.environ.get('FLASK_DEBUG') == '1')
"""
This is my frontend:
"""
import React, { useState, useRef, useEffect } from 'react';
import ReactMarkdown from 'react-markdown';
import { Prism } from 'react-syntax-highlighter/dist/esm';
import { Light as SyntaxHighlighterLight } from 'react-syntax-highlighter/dist/esm';
import { atomDark } from 'react-syntax-highlighter/dist/esm/styles/prism';
import {
python as prismPython,
javascript as prismJavascript,
c,
cpp as prismCpp,
java as prismJava
} from 'react-syntax-highlighter/dist/esm/languages/prism';
// Add this line to import C# from its specific file
import csharp from 'react-syntax-highlighter/dist/esm/languages/prism/csharp';
// Import CodeMirror et ses extensions
import CodeMirror from '@uiw/react-codemirror';
import { python } from '@codemirror/lang-python';
import { javascript } from '@codemirror/lang-javascript';
import { java } from '@codemirror/lang-java';
import { cpp } from '@codemirror/lang-cpp';
import { markdown } from '@codemirror/lang-markdown';
// Helper function to register languages for react-syntax-highlighter
const registerLanguage = (name: string, language: any) => {
if (Prism && Prism.languages && !Prism.languages[name]) {
Prism.registerLanguage(name, language);
} else if (Prism && Prism.languages && Prism.languages[name]) {
console.warn(Language "${name}" is already registered.
);
} else {
console.warn("Warning: Prism.registerLanguage is not available. Syntax highlighting might not work correctly.");
}
};
// Register Prism grammars for syntax highlighting once at module load.
registerLanguage('python', prismPython);
registerLanguage('javascript', prismJavascript);
registerLanguage('c', c);
registerLanguage('cpp', prismCpp);
registerLanguage('java', prismJava);
registerLanguage('csharp', csharp);
registerLanguage('cs', csharp); // Also register common alias 'cs'
registerLanguage('dotnet', csharp); // Optional: for parity with Prism aliases
// One entry in the main chat transcript; optional fields cover the canvas,
// code-execution and web-search features.
interface ChatMessage {
  sender: 'User' | 'AI';
  text: string;
  codeBlock?: string;            // fenced code the AI asked to execute
  isCodeCollapsed?: boolean;
  canvasContent?: string;        // current content of an attached canvas
  hasCanvas?: boolean;
  isTextCanvas?: boolean;        // text (markdown) canvas vs code canvas
  analysisStatus?: string;
  executedCodeError?: string;
  isCodeExecuting?: boolean;
  codeExecutionOutput?: string | null;
  promptTokens?: number; // Token usage info
  completionTokens?: number; // Token usage info
  totalTokens?: number; // Token usage info
  webSources?: string[]; // List of web source domains cited by the AI
}
// One entry in the dedicated canvas-modification chat.
interface CanvasChatMessage {
  sender: 'User' | 'AI';
  text: string;
}
/**
 * Main chat application component.
 *
 * Renders the primary chat UI, streams AI responses over SSE from the backend
 * ('/chat'), and can switch to a "canvas" mode combining a CodeMirror editor,
 * per-language code execution ('/run') and a dedicated canvas chat
 * ('/canvas_modify'). Web search can be toggled per message; sources arrive
 * via 'web_sources' events and are shown in a collapsible popover.
 *
 * NOTE(review): the bare token `text` opening the hooks section below looks
 * like a markdown-fence paste artifact and would raise a ReferenceError at
 * runtime — confirm against the original source; left byte-identical here.
 * NOTE(review): in the 'web_search_status' handler, `eventData.type === 'end'`
 * is presumably meant to be `eventData.content === 'end'` (the branch is
 * unreachable as written). Unchanged in this documentation-only pass.
 * NOTE(review): `setTimeout(..., 30000000)` clears the status after ~8.3 hours
 * — looks like it was meant to be 3000 ms; confirm intent.
 */
const App: React.FC = () => {
const [message, setMessage] = useState('');
const [chat, setChat] = useState<ChatMessage[]>([]);
const chatDisplayRef = useRef<HTMLDivElement>(null);
const [isSending, setIsSending] = useState(false);
const [availableModels, setAvailableModels] = useState<string[]>([]);
const [selectedModel, setSelectedModel] = useState<string>('');
const [isCanvasActive, setIsCanvasActive] = useState(false);
const [currentCanvasContent, setCurrentCanvasContent] = useState('');
const [canvasChatMessages, setCanvasChatMessages] = useState<CanvasChatMessage[]>([]);
const [canvasMessage, setCanvasMessage] = useState('');
const canvasChatDisplayRef = useRef<HTMLDivElement>(null);
const [selectedLanguage, setSelectedLanguage] = useState<'python' | 'c' | 'cpp' | 'java' | 'javascript' | 'csharp'>('python'); // Updated to include C#
const [codeOutput, setCodeOutput] = useState('');
// activeCanvasMessageIndex removed because it was unused
// const [activeCanvasMessageIndex, setActiveCanvasMessageIndex] = useState<number | null>(null);
const [isTextCanvas, setIsTextCanvas] = useState(false);
const [isWebSearchEnabled, setIsWebSearchEnabled] = useState(false);
const [analysisStatus, setAnalysisStatus] = useState<string | null>(null);
const [webSources, setWebSources] = useState<string[]>([]); // State for the web sources - RE-INTRODUCED
const [showSources, setShowSources] = useState(false); // State to show/hide the sources popover
text// Référence pour la console (output) afin d'automatiquement défiler vers le bas const consoleOutputRef = useRef<HTMLDivElement>(null); useEffect(() => { const fetchModels = async () => { try { const response = await fetch('https://pl1nkochat.onrender.com/models'); if (response.ok) { const data = await response.json(); setAvailableModels(data.models); if (data.models.length > 0) { setSelectedModel(data.models[0]); } } else { console.error("Failed to fetch models:", response.status); setChat(prevChat => [...prevChat, { sender: 'AI', text: `Erreur de chargement des modèles: ${response.status}` }]); } } catch (error) { console.error("Error fetching models:", error); setChat(prevChat => [...prevChat, { sender: 'AI', text: "Erreur de communication avec le serveur pour les modèles." }]); } }; fetchModels(); }, []); useEffect(() => { if (!isCanvasActive && chatDisplayRef.current) { chatDisplayRef.current.scrollTop = chatDisplayRef.current.scrollHeight; } if (isCanvasActive && canvasChatDisplayRef.current) { canvasChatDisplayRef.current.scrollTop = canvasChatDisplayRef.current.scrollHeight; } }, [chat, canvasChatMessages, isCanvasActive]); // Auto-scroll de la console de sortie lorsque codeOutput change useEffect(() => { if (consoleOutputRef.current) { consoleOutputRef.current.scrollTop = consoleOutputRef.current.scrollHeight; } }, [codeOutput]); const handleSubmit = async (e: React.FormEvent) => { e.preventDefault(); if (message.trim() === '' || isSending || isCanvasActive) return; setIsSending(true); setAnalysisStatus('Analyse en cours...'); setWebSources([]); // Réinitialiser les sources web à chaque nouvelle requête const userMessage = message; setChat(prevChat => [...prevChat, { sender: 'User', text: userMessage }]); setMessage(''); setAnalysisStatus('Réponse en cours...'); const messagesForApi = [ ...chat.map(msg => ({ role: msg.sender === 'User' ? 
'user' : 'assistant', content: msg.text })), { role: 'user', content: userMessage } ]; setChat(prevChat => [ ...prevChat, { sender: 'AI', text: '', isCodeCollapsed: true, analysisStatus: 'Réponse en cours...', isCodeExecuting: false, codeExecutionOutput: null, webSources: [] } ]); let webSearchToUse = isWebSearchEnabled; // Store the current webSearch state if (isWebSearchEnabled) { setIsWebSearchEnabled(false); // Disable web search immediately after it's used for this message } try { const response = await fetch('https://pl1nkochat.onrender.com/chat', { method: 'POST', headers: { 'Content-Type': 'application/json', 'Accept': 'text/event-stream' }, body: JSON.stringify({ messages: messagesForApi, model: selectedModel, web_search: webSearchToUse }) // Use stored webSearch value }); if (!response.ok) { const errorText = await response.text(); setChat(prevChat => { const lastIndex = prevChat.length - 1; return [ ...prevChat.slice(0, lastIndex), { sender: 'AI', text: `Erreur: ${response.status} - ${errorText}`, analysisStatus: 'Erreur' } ]; }); setIsSending(false); setAnalysisStatus('Erreur lors de la communication avec le serveur.'); return; } const reader = response.body!.getReader(); const decoder = new TextDecoder(); while (true) { const { done, value } = await reader.read(); if (done) break; const chunk = decoder.decode(value); const events = chunk.split('\n\n').filter(line => line.startsWith("data: ")); for (const eventLine of events) { const eventData = JSON.parse(eventLine.substring(6)); console.log("Événement reçu:", eventData); if (eventData.type === 'text') { setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const newText = lastMessage.text + eventData.content; const updatedMessage = { ...lastMessage, text: newText, isCodeExecuting: false }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } else if (eventData.type === 'code_block') { setChat(prevChat => { 
const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, codeBlock: eventData.content, isCodeExecuting: false }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } else if (eventData.type === 'code_execution_start') { setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, isCodeExecuting: true, codeExecutionOutput: 'Execution en cours...' }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } else if (eventData.type === 'executed_code_result') { setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, codeExecutionOutput: eventData.content, isCodeExecuting: false }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } else if (eventData.type === 'canvas_start') { const canvasContent = eventData.content; const isText = canvasContent.trim().startsWith('#') || canvasContent.trim().startsWith('* ') || !canvasContent.includes('```'); setIsTextCanvas(isText); const cleanedCanvasContent = canvasContent.replace(/```(python|javascript|c|cpp|java|csharp)\n/g, '```\n'); // Mise à jour regex pour C# setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, canvasContent: cleanedCanvasContent, hasCanvas: true, isTextCanvas: isText, isCodeExecuting: false }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } else if (eventData.type === 'final') { setAnalysisStatus(null); setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, promptTokens: 
eventData.prompt_tokens, completionTokens: eventData.completion_tokens, totalTokens: eventData.total_tokens }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } else if (eventData.type === 'error') { setChat(prevChat => { const lastIndex = prevChat.length - 1; return [ ...prevChat.slice(0, lastIndex), { sender: 'AI', text: `Erreur IA: ${eventData.content}`, analysisStatus: 'Erreur IA' } ]; }); setIsSending(false); setAnalysisStatus('Erreur de l\'IA.'); return; } else if (eventData.type === 'analysis_status') { setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, analysisStatus: eventData.content, isCodeExecuting: false }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); setAnalysisStatus(eventData.content); } else if (eventData.type === 'executed_code_error') { setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, executedCodeError: eventData.content, isCodeExecuting: false, codeExecutionOutput: eventData.content }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } else if (eventData.type === 'web_search_status') { if (eventData.content === 'start') { setAnalysisStatus('Recherche internet en cours...'); } else if (eventData.type === 'end') { setAnalysisStatus('Réponse en cours...'); // Retour à l'analyse après la recherche } } else if (eventData.type === 'web_search') { // Ajout de la gestion de l'événement web_search setAnalysisStatus(`Recherche web : ${eventData.query}`); setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, text: lastMessage.text + `\n[Recherche web : ${eventData.query}]`, // webSources: [...(lastMessage.webSources || []), 
...sources] // Suppression de l'ajout des sources ici // car sources arrive dans event 'web_sources' }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } else if (eventData.type === 'web_sources') { setWebSources(eventData.content); setChat(prevChat => { const lastIndex = prevChat.length - 1; const lastMessage = prevChat[lastIndex]; if (lastMessage) { const updatedMessage = { ...lastMessage, webSources: eventData.content }; return [...prevChat.slice(0, lastIndex), updatedMessage]; } return prevChat; }); } } } } catch (error) { console.error("Erreur lors de la communication avec le backend:", error); setChat(prevChat => { const lastIndex = prevChat.length - 1; return [ ...prevChat.slice(0, lastIndex), { sender: 'AI', text: "Erreur de communication avec l'IA.", analysisStatus: 'Erreur réseau' } ]; }); setAnalysisStatus('Erreur réseau.'); } finally { setIsSending(false); if (!analysisStatus) { setAnalysisStatus('Terminé'); setTimeout(() => setAnalysisStatus(null), 30000000); } } }; const toggleCodeBlockCollapse = (index: number) => { setChat(prevChat => prevChat.map((msg, i) => i === index ? 
{ ...msg, isCodeCollapsed: !msg.isCodeCollapsed } : msg ) ); }; const handleCanvasChange = (newContent: string) => { const cleanedContent = newContent.replace(/```(python|javascript|c|cpp|java|csharp)\n/g, '```\n'); // Mise à jour regex pour C# setCurrentCanvasContent(cleanedContent); }; const handleCanvasMessageSubmit = async (e: React.FormEvent) => { e.preventDefault(); if (canvasMessage.trim() === '') return; const userCanvasMessage = canvasMessage; setCanvasChatMessages(prevMessages => [...prevMessages, { sender: 'User', text: userCanvasMessage }]); setCanvasMessage(''); try { const response = await fetch('https://pl1nkochat.onrender.com/canvas_modify', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ message: userCanvasMessage, canvas_content: currentCanvasContent, model: selectedModel }) }); if (!response.ok) { console.error("Erreur Canvas Chat:", response.status, await response.text()); setCanvasChatMessages(prevMessages => [...prevMessages, { sender: 'AI', text: `Erreur: ${response.status} - Chat Canvas` }]); return; } const result = await response.json(); if (result && result.updated_canvas_content) { const cleanedContent = result.updated_canvas_content.replace(/```(python|javascript|c|cpp|java|csharp)\n/g, '```\n'); // Mise à jour regex pour C# setCurrentCanvasContent(cleanedContent); setCanvasChatMessages(prevMessages => [...prevMessages, { sender: 'AI', text: result.response_message || "Canvas mis à jour." }]); } else if (result && result.error) { setCanvasChatMessages(prevMessages => [...prevMessages, { sender: 'AI', text: `Erreur IA Canvas: ${result.error}` }]); } } catch (error) { console.error("Erreur Canvas Chat Backend:", error); setCanvasChatMessages(prevMessages => [...prevMessages, { sender: 'AI', text: "Erreur de communication avec l'IA (Canvas Chat)." 
}]); } }; const handleLanguageChange = (e: React.ChangeEvent<HTMLSelectElement>) => { setSelectedLanguage(e.target.value as typeof selectedLanguage); }; const handleCodeExecution = async () => { setCodeOutput('Execution en cours...'); try { const response = await fetch("https://pl1nkochat.onrender.com/run", { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ language: selectedLanguage, code: currentCanvasContent, stdin: "" }) }); const result = await response.json(); setCodeOutput(result.run?.stdout || result.error || "Erreur d'exécution du code!"); } catch (error) { console.error("Erreur Exécution Code:", error); setCodeOutput("Erreur lors de l'exécution du code."); } }; const handleStartNewChat = () => { setIsCanvasActive(false); setCanvasChatMessages([]); setCurrentCanvasContent(''); setCodeOutput(''); // Suppression de setActiveCanvasMessageIndex car non utilisé // setActiveCanvasMessageIndex(null); setIsTextCanvas(false); setChat([]); setWebSources([]); // Reset web sources on new chat - **RE-INTRODUCED** }; const handleOpenCanvas = (index: number, canvasContent: string, isTextCanvasProp: boolean) => { setIsCanvasActive(true); const cleanedContent = canvasContent.replace(/```(python|javascript|c|cpp|java|csharp)\n/g, '```\n'); // Mise à jour regex pour C# setCurrentCanvasContent(cleanedContent); setCanvasChatMessages([]); // Suppression de setActiveCanvasMessageIndex car non utilisé // setActiveCanvasMessageIndex(index); setIsTextCanvas(isTextCanvasProp); }; const handleModelChange = (e: React.ChangeEvent<HTMLSelectElement>) => { setSelectedModel(e.target.value); }; const handleWebSearchToggle = (e: React.ChangeEvent<HTMLInputElement>) => { setIsWebSearchEnabled(e.target.checked); setChat(prevChat => [...prevChat, { sender: 'AI', text: `[Recherche web ${e.target.checked ? 
'activée' : 'désactivée'}]`, analysisStatus: undefined }]); // Correction ici : analysisStatus: undefined }; const toggleSourcesVisibility = () => { setShowSources(!showSources); }; return ( <div style={{ margin: '20px', display: 'flex', flexDirection: 'column', height: '95vh' }}> {!isCanvasActive && ( <center> <h1>PLINKO CHATTE</h1> </center> )} <div style={{ display: 'flex', flexDirection: 'row', width: '100%', height: 'calc(100% - 100px)' }}> {!isCanvasActive && ( <div style={{ flex: 1, height: '100%' }}> <div ref={chatDisplayRef} style={{ flexGrow: 1, overflowY: 'auto', marginBottom: '20px', border: '1px solid #ccc', padding: '10px', height: 'calc(100% - 100px)' }} > {chat.map((msg, index) => ( <div key={index} style={{ marginBottom: '15px', textAlign: msg.sender === 'User' ? 'right' : 'left' }}> <strong>{msg.sender}:</strong> <div> {msg.analysisStatus && <div style={{ fontSize: '0.8em', color: 'grey' }}>{msg.analysisStatus}</div>} <div style={{ whiteSpace: 'pre-wrap', wordWrap: 'break-word', position: 'relative' }}> {msg.sender === 'AI' ? ( <> <ReactMarkdown components={{ code: ({ className, children, ...props }) => { const language = className?.match(/language-(\w+)/)?.[1] || ''; return ( <SyntaxHighlighterLight style={atomDark as any} language={language} PreTag="div" children={String(children).replace(/\n$/, '')} {...props} /> ); } }} > {msg.codeBlock ? msg.text.replace(/```python\s*([\s\S]*?)```/g, '') : msg.text} </ReactMarkdown> {msg.webSources && msg.webSources.length > 0 && ( <div style={{display: 'inline-block', marginLeft: '5px'}}> <button onClick={toggleSourcesVisibility} style={{ background: 'none', border: 'none', color: '#007bff', padding: 0, cursor: 'pointer', fontSize: '0.8em', textDecoration: 'underline' }} > {showSources ? 
'Cacher Sources' : 'Afficher Sources'} </button> {showSources && ( <div style={{ position: 'absolute', zIndex: 10, backgroundColor: '#f9f9f9', border: '1px solid #ccc', padding: '10px', borderRadius: '5px', left: '0', top: '100%', marginTop: '5px', width: '250px', textAlign: 'left', boxShadow: '2px 2px 5px rgba(0,0,0,0.3)' }}> <strong>Sources:</strong> <ul> {msg.webSources.map((source, index) => ( <li key={index} style={{fontSize: '0.8em', wordWrap: 'break-word'}}> <a href={source} target="_blank" rel="noopener noreferrer">{source}</a> </li> ))} </ul> </div> )} </div> )} </> ) : ( msg.text )} </div> {msg.sender === 'AI' && msg.codeBlock && ( <div style={{ marginTop: '10px', border: '1px solid #ccc', borderRadius: '5px', overflow: 'hidden' }}> <div style={{ backgroundColor: '#f0f0f0', padding: '8px', cursor: 'pointer', display: 'flex', justifyContent: 'space-between', alignItems: 'center' }} onClick={() => toggleCodeBlockCollapse(index)} > <strong>Code Python Généré</strong> <span>{msg.isCodeCollapsed ? '▼' : '▲'}</span> </div> <div style={{ padding: '10px', display: msg.isCodeCollapsed ? 
'none' : 'block', backgroundColor: '#f9f9f9' }} > <SyntaxHighlighterLight style={atomDark as any} language="python" PreTag="div" > {msg.codeBlock} </SyntaxHighlighterLight> </div> </div> )} {msg.sender === 'AI' && msg.isCodeExecuting && msg.codeExecutionOutput && ( <div style={{ marginTop: '10px', padding: '10px', backgroundColor: '#e0f7fa', borderRadius: '5px', border: '1px solid #80deea' }}> <strong>Code Execution:</strong> <pre style={{ whiteSpace: 'pre-wrap', wordWrap: 'break-word', marginTop: '5px', fontFamily: 'monospace', fontSize: '0.9em' }}> {msg.codeExecutionOutput} </pre> </div> )} {msg.sender === 'AI' && !msg.isCodeExecuting && msg.codeExecutionOutput && !msg.executedCodeError && ( <div style={{ marginTop: '10px', padding: '10px', backgroundColor: '#c8e6c9', borderRadius: '5px', border: '1px solid #81c784' }}> <strong>Code Output:</strong> <pre style={{ whiteSpace: 'pre-wrap', wordWrap: 'break-word', marginTop: '5px', fontFamily: 'monospace', fontSize: '0.9em' }}> {msg.codeExecutionOutput} </pre> </div> )} {msg.sender === 'AI' && msg.executedCodeError && ( <div style={{ marginTop: '10px', padding: '10px', backgroundColor: '#ffe0b2', borderRadius: '5px', border: '1px solid #ffb74d' }}> <strong>Erreur d'exécution du code:</strong> <pre style={{ whiteSpace: 'pre-wrap', wordWrap: 'break-word', marginTop: '5px', fontFamily: 'monospace', fontSize: '0.9em' }}> {msg.executedCodeError} </pre> </div> )} {msg.sender === 'AI' && msg.hasCanvas && !isCanvasActive && ( <button onClick={() => handleOpenCanvas(index, msg.canvasContent || '', msg.isTextCanvas || false)} style={{ marginTop: '10px', padding: '8px 15px', borderRadius: '5px', backgroundColor: '#007bff', color: 'white', border: 'none', cursor: 'pointer' }} > Ouvrir Canvas </button> )} {msg.sender === 'AI' && msg.totalTokens !== undefined && ( <div style={{ marginTop: '5px', fontSize: '0.8em', color: 'grey' }}> Tokens utilisés: {msg.totalTokens} (Prompt: {msg.promptTokens}, Completion: {msg.completionTokens}) 
</div> )} </div> </div> ))} </div> {analysisStatus && <div style={{ marginBottom: '10px', color: 'grey', textAlign: 'center' }}>{analysisStatus}</div>} <form onSubmit={handleSubmit} style={{ display: 'flex', flexDirection: 'column', marginTop: '10px' }}> <textarea value={message} onChange={e => setMessage(e.target.value)} placeholder="Tapez votre message... (Chat Principal)" rows={3} style={{ padding: '8px', borderRadius: '5px', border: '1px solid #ccc' }} disabled={isSending || isCanvasActive} /> <div style={{ marginBottom: '10px', display: 'flex', alignItems: 'center', justifyContent: 'space-between' }}> <div> <label htmlFor="model-select" style={{ display: 'block', marginBottom: '5px', color: '#ccc' }}> Select Model: </label> <select id="model-select" value={selectedModel} onChange={handleModelChange} style={{ width: '100%', padding: '8px', borderRadius: '5px', backgroundColor: '#333', color: 'white', border: '1px solid #555', }} > {availableModels.map((model) => ( <option key={model} value={model}> {model} </option> ))} </select> </div> <div style={{ display: 'flex', alignItems: 'center' }}> <label htmlFor="web-search-toggle" style={{ color: '#ccc', marginRight: '10px' }}> Recherche Web: </label> <input id="web-search-toggle" type="checkbox" checked={isWebSearchEnabled} onChange={handleWebSearchToggle} style={{ cursor: 'pointer' }} /> </div> </div> <button type="submit" style={{ padding: '10px', borderRadius: '5px', backgroundColor: '#4CAF50', color: 'white', border: 'none', cursor: 'pointer' }} disabled={isSending || isCanvasActive}> {isSending ? "Envoi en cours..." 
: "Envoyer"} </button> </form> <button onClick={handleStartNewChat} style={{ marginTop: '10px', padding: '10px 15px', borderRadius: '8px', backgroundColor: '#607d8b', color: 'white', border: 'none', cursor: 'pointer', fontWeight: 'bold', transition: 'background-color 0.2s ease', }} > Nouveau Chat </button> </div> )} {isCanvasActive && ( <div style={{ display: 'flex', width: '100%', height: '100%', gap: '20px', position: 'fixed', top: 0, left: 0, right: 0, bottom: 0, }}> <div style={{ width: '25%', background: 'linear-gradient(145deg, #3a3d49, #2d2f36)', borderRadius: '8px', padding: '15px', display: 'flex', flexDirection: 'column', boxShadow: '0 4px 12px rgba(0,0,0,0.3)' }}> <h3 style={{ marginBottom: '10px', color: '#ccc' }}>Canvas Chat</h3> <button onClick={handleStartNewChat} style={{ padding: '10px 15px', borderRadius: '8px', backgroundColor: '#0a84ff', color: 'white', border: 'none', cursor: 'pointer', marginBottom: '20px', fontWeight: 'bold', transition: 'background-color 0.2s ease', }} > Retour au Chat Principal </button> <div ref={canvasChatDisplayRef} style={{ flexGrow: 1, overflowY: 'auto', marginBottom: '10px', color: '#ececf1', height: 'calc(100% - 180px)' }} > {canvasChatMessages.map((msg, index) => ( <div key={index} style={{ textAlign: msg.sender === 'User' ? 'right' : 'left', marginBottom: '5px', color: msg.sender === 'User' ? '#90caf9' : '#c5e1a5', }} > <strong>{msg.sender}:</strong> {msg.text} </div> ))} </div> <form onSubmit={handleCanvasMessageSubmit} style={{ display: 'flex', marginTop: '10px' }}> <textarea value={canvasMessage} onChange={(e) => setCanvasMessage(e.target.value)} placeholder="Modifier le canvas ici..." 
rows={2} style={{ flexGrow: 1, padding: '8px', borderRadius: '5px', border: 'none', backgroundColor: '#4d4e5c', color: '#ececf1', marginRight: '8px', resize: 'none', }} /> <button type="submit" style={{ padding: '8px 15px', borderRadius: '5px', backgroundColor: '#0a84ff', color: 'white', border: 'none', cursor: 'pointer', }} > Send </button> </form> </div> <div style={{ flex: 1, display: 'flex', flexDirection: 'column', backgroundColor: '#1e1e1e', borderRadius: '8px', overflow: 'hidden', boxShadow: '0 4px 12px rgba(0,0,0,0.3)' }}> <div style={{ backgroundColor: '#2d2d2d', padding: '8px 10px', display: 'flex', alignItems: 'center', borderBottom: '1px solid #444' }}> <span style={{ color: '#ccc', fontFamily: 'monospace', marginRight: '10px' }}> {isTextCanvas ? 'text_canvas.md' : `main.${selectedLanguage === 'python' ? 'py' : (selectedLanguage === 'javascript' ? 'js' : (selectedLanguage === 'c' ? 'c' : (selectedLanguage === 'cpp' ? 'cpp' : (selectedLanguage === 'java' ? 'java' : (selectedLanguage === 'csharp' ? 
'cs' : 'txt')))))}`} </span> {!isTextCanvas && ( <select id="language-select" value={selectedLanguage} onChange={handleLanguageChange} style={{ padding: '5px 10px', borderRadius: '4px', backgroundColor: '#333', color: 'white', border: '1px solid #555', marginRight: '10px' }} > <option value="python">Python</option> <option value="javascript">JavaScript</option> <option value="c">C</option> <option value="cpp">C++</option> <option value="java">Java</option> <option value="csharp">C#</option> {/* Ajout de l'option C# */} </select> )} {!isTextCanvas && ( <button onClick={handleCodeExecution} style={{ marginLeft: 'auto', padding: '5px 10px', backgroundColor: '#0a84ff', color: '#fff', border: 'none', borderRadius: '4px', cursor: 'pointer' }} > Run </button> )} </div> {/* Zone d'édition du code (CodeMirror) dans un conteneur scrollable */} <div style={{ flexGrow: 1, overflowY: 'auto' }}> <CodeMirror value={currentCanvasContent} height="100%" theme="dark" extensions={ isTextCanvas ? [markdown()] : [ selectedLanguage === 'python' ? python() : selectedLanguage === 'javascript' ? javascript() : selectedLanguage === 'java' ? java() : (selectedLanguage === 'cpp' || selectedLanguage === 'c' || selectedLanguage === 'csharp') ? cpp() : [] ] } onChange={(value) => handleCanvasChange(value)} /> </div> {codeOutput && !isTextCanvas && ( <div ref={consoleOutputRef} style={{ backgroundColor: '#1a1a1a', padding: '10px', borderTop: '1px solid #444', maxHeight: '200px', overflowY: 'auto' }} > <strong style={{ color: '#eee' }}>Output:</strong> <SyntaxHighlighterLight style={atomDark as any} language={selectedLanguage} PreTag="div" > {codeOutput} </SyntaxHighlighterLight> </div> )} </div> </div> )} </div> </div> );
};
// Default export so the application entry point can mount the component.
export default App;
"""
Below is an example of how you can “unify” your backend code so that you have a dedicated web‐search endpoint (using DuckDuckGo with caching and content extraction) as well as a chat endpoint that—when web search is active—uses your Gemini Pro–based OpenRouter API. (Your frontend can remain as provided.) You may adjust environment variables and other parameters as needed.
# --- Backend entry point: gevent monkey-patching must run before any other import ---
from gevent import monkey
monkey.patch_all()

import os
import re
import json
import logging
import time
import datetime
import threading
from typing import Any, Dict, List, Optional, Tuple

import requests
from flask import Flask, request, Response, jsonify, stream_with_context
from flask_cors import CORS
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

# --- NEW: Imports for OpenRouter, web search, and content extraction ---
from openai import OpenAI
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from tqdm import tqdm

# --- Configuration ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# API keys (set via environment or use safe defaults)
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")
E2B_API_KEY = os.environ.get("E2B_API_KEY")
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY")
PISTON_API_URL = "https://emkc.org/api/v2/piston/execute"

# Initialize clients (assume groq_client is defined elsewhere if needed)
# For this example, we assume groq_client is a fallback chat client.
# openrouter_client will use the Gemini Pro model when required.
groq_client = None  # Replace with your groq client initialization if available
openrouter_client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=OPENROUTER_API_KEY,
)

# Valid models (Gemini Pro used for web search–assisted chats)
VALID_MODELS = [
    "llama3-70b-8192",
    "llama-3.3-70b-versatile",
    "google/gemini-pro",  # Gemini Pro
    "meta-llama/llama-3-70b-instruct",
    "openai/gpt-4-turbo-preview"
]

# Runtime tunables for the web-search pipeline.
CONFIG = {
    'openrouter_api_key': OPENROUTER_API_KEY,
    'max_results': 7,                        # max DuckDuckGo hits per query
    'cache_file': 'search_cache.json',       # on-disk query -> results cache
    'model': 'google/gemini-pro',            # Use Gemini Pro model for web search responses
    'timeout': 20,                           # per-page fetch timeout (seconds)
    'min_quality_score': 0.65                # NOTE(review): unused in this file — confirm intent
}

# --- Cache management for web search ---
CACHE_LOCK = threading.Lock()


def load_cache() -> Dict:
    """Load the on-disk search cache; return an empty dict on any read failure."""
    try:
        with open(CONFIG['cache_file'], 'r') as f:
            return json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}


def save_cache(cache: Dict) -> None:
    """Persist the full cache dict to CONFIG['cache_file'] as JSON."""
    with open(CONFIG['cache_file'], 'w') as f:
        json.dump(cache, f)


# --- WebSearchHandler: Uses DuckDuckGo and extracts page content ---
class WebSearchHandler:
    """DuckDuckGo search (cached, retried) plus per-URL content extraction."""

    @staticmethod
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2))
    def perform_search(query: str, max_results: Optional[int] = None) -> List[Dict]:
        """Return [{'url', 'title', 'domain'}, ...] for `query`, serving from the
        file cache when possible and writing fresh results back.

        NOTE(review): the lock is released between the cache read and the later
        read-modify-write, so two concurrent misses for the same query both hit
        DuckDuckGo; last writer wins. Harmless for a cache, but confirm.
        """
        if max_results is None:
            max_results = CONFIG['max_results']
        # Check cache
        with CACHE_LOCK:
            cache = load_cache()
            if query in cache:
                logger.info(f"Using cache for query: {query}")
                return cache[query]
        # Use DuckDuckGo API
        with DDGS() as ddgs:
            results = ddgs.text(query, region='fr-fr', max_results=max_results)
            # Keep only http(s) hits; strip the leading "www." for display.
            search_results = [{
                'url': r['href'],
                'title': r['title'],
                'domain': urlparse(r['href']).netloc.replace('www.', '')
            } for r in results if urlparse(r['href']).scheme in ['http', 'https']]
        # Update cache
        with CACHE_LOCK:
            cache = load_cache()
            cache[query] = search_results
            save_cache(cache)
        return search_results

    def __init__(self):
        # Browser-like User-Agent so content fetches are less likely to be blocked.
        self.headers = {
            'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                           'AppleWebKit/537.36 (KHTML, like Gecko) '
                           'Chrome/91.0.4472.124 Safari/537.36')
        }

    def fetch_page_content(self, url: str) -> str:
        """Fetch `url` and return up to 5000 chars of visible main-content text;
        returns "" on any failure (network error, parse error, missing body)."""
        try:
            response = requests.get(url, headers=self.headers, timeout=CONFIG['timeout'])
            soup = BeautifulSoup(response.text, 'html.parser')
            # Remove unnecessary elements
            for element in soup(['script', 'style', 'nav', 'footer', 'header',
                                 'aside', 'form', 'button', 'meta', 'link']):
                element.decompose()
            # Prefer semantic containers; fall back to the whole <body>.
            main_content = soup.find(['main', 'article']) or soup.body
            return ' '.join(main_content.stripped_strings)[:5000]
        except Exception as e:
            logger.error(f"Error fetching content from {url}: {str(e)}")
            return ""


def generate_search_context(search_results: List[Dict]) -> str:
    """Format fetched results as numbered '[Source i - domain]' blocks
    (content truncated to 1000 chars each) for injection into the prompt."""
    return "\n".join(
        f"[Source {i+1} - {r['domain']}]\n{r.get('content', '')[:1000]}"
        for i, r in enumerate(search_results)
    )


# --- Helpers for code extraction and sandbox execution (if needed) ---
CODE_PATTERN = re.compile(r"```python\s*([\s\S]*?)```")
CODE_REQUIRED_MARKER = "[CODE_REQUIRED]"   # model asks for code to be executed
CANVAS_START_MARKER = "[CANVAS_START]"     # delimits canvas payload in replies
CANVAS_END_MARKER = "[CANVAS_END]"
WEB_SEARCH_MARKER = "[WEB_SEARCH]"         # model asks for a live web search


def extract_code_blocks(text: str) -> Tuple[Optional[str], Optional[str]]:
    """Return (code_to_execute, code_to_display).

    If CODE_REQUIRED_MARKER is present, the first fenced python block is meant
    to be executed (first slot); otherwise it is display-only (second slot).
    """
    if CODE_REQUIRED_MARKER in text:
        text_no_marker = text.replace(CODE_REQUIRED_MARKER, "", 1).strip()
        code_match = CODE_PATTERN.search(text_no_marker)
        if code_match:
            return code_match.group(1).strip(), None
    else:
        code_match = CODE_PATTERN.search(text)
        if code_match:
            return None, code_match.group(1).strip()
    return None, None


# (Optional) Code execution in sandbox – adjust as needed
def run_code_in_sandbox(code: str) -> str:
    """Placeholder sandbox runner: returns a simulated result string.

    Replace the body with a real sandbox (e.g. e2b) before production use.
    """
    # Example: using a sandbox (e.g. via e2b or other service)
    start_time = time.time()
    try:
        logger.info("Executing code in sandbox...")
        # This is a placeholder – replace with your sandbox code execution logic.
        execution_result = f"Simulated execution result of:\n{code}"
        return execution_result
    except Exception as e:
        duration = time.time() - start_time
        error_msg = f"Error executing code after {duration:.2f} seconds: {str(e)}"
        logger.exception(error_msg)
        return error_msg


def extract_numeric_result(exec_result: str) -> Optional[str]:
    """Return the last non-empty stdout line if it contains a digit, else None.

    Understands the 'Stdout:\\n...\\n\\nStderr:' layout some sandboxes emit;
    otherwise treats the whole string as stdout.
    """
    if "Stdout:" in exec_result:
        stdout_section = exec_result.split("Stdout:\n", 1)[1].split("\n\nStderr:")[0].strip()
    else:
        stdout_section = exec_result.strip()
    lines = [l.strip() for l in stdout_section.splitlines() if l.strip()]
    if not lines:
        return None
    last_line = lines[-1]
    if re.search(r'\d', last_line):
        return last_line
    return None


# --- Flask App and Endpoints ---
app = Flask(__name__)
CORS(app)


@app.route('/models', methods=['GET'])
def get_models() -> Response:
    """GET /models — list model identifiers the /chat endpoint accepts."""
    return jsonify({"models": VALID_MODELS})


@app.route('/web_search', methods=['POST'])
def web_search_endpoint():
    """POST /web_search {query} — search DuckDuckGo, fetch each hit's content,
    and return {"results": [{url, title, domain, content}, ...]}."""
    data = request.get_json()
    if not data or not data.get('query'):
        return jsonify({"error": "Missing query"}), 400
    query = data['query']
    try:
        search_results = WebSearchHandler.perform_search(query)
        content_processor = WebSearchHandler()
        for result in tqdm(search_results, desc="Fetching content"):
            result['content'] = content_processor.fetch_page_content(result['url'])
        return jsonify({"results": search_results})
    except Exception as e:
        logger.error(f"Web search error: {e}")
        return jsonify({"error": str(e)}), 500


@app.route('/chat', methods=['POST'])
def chat_endpoint() -> Response:
    """POST /chat — Server-Sent-Events chat stream.

    Body: messages, model, execute_code, use_openrouter, web_search,
    search_results. Streams events of type: analysis_status, text,
    web_search, web_sources, canvas_start, code_block,
    code_execution_start, executed_code_result/executed_code_error,
    error, final, done.
    """
    @stream_with_context
    # NOTE(review): tenacity's @retry wraps the *call* to a generator function,
    # which returns a generator without raising — exceptions surface during
    # iteration, so this retry likely never fires. Confirm and consider
    # retrying individual API calls instead.
    @retry(stop=stop_after_attempt(5),
           wait=wait_exponential(multiplier=1, min=4, max=60),
           retry=retry_if_exception_type(Exception))
    def stream_response():
        data = request.get_json()
        if not data:
            logger.warning("Invalid chat request: missing body.")
            yield format_sse_event({"type": "error", "content": "Missing request body."})
            return
        messages = data.get('messages', [])
        model = data.get('model', 'llama3-70b-8192').strip()
        execute_code = data.get('execute_code', True)
        use_openrouter = data.get('use_openrouter', False)
        enable_web_search = data.get('web_search', False)
        web_search_results = data.get('search_results', [])
        if not messages or not isinstance(messages, list) or not model or model not in VALID_MODELS:
            logger.warning("Invalid chat request: messages or model incorrect.")
            yield format_sse_event({"type": "error", "content": "Invalid messages or model."})
            return
        # Choose the API client – if web search is enabled, use OpenRouter (Gemini Pro)
        ai_client = openrouter_client if use_openrouter or enable_web_search else groq_client
        if ai_client is None:
            # Fallback in case groq_client isn't set
            ai_client = openrouter_client
        # Build the system prompt: either inject pre-fetched search context, or
        # teach the model the [WEB_SEARCH] marker protocol.
        system_prompt_additions = []
        if enable_web_search and web_search_results:
            search_context = generate_search_context(web_search_results)
            system_prompt_additions.append(
                "**WEB SEARCH CONTEXT:**\n" +
                f"{search_context}\n" +
                "Use the above information to answer the user’s question and cite sources as [source:domain]."
            )
        elif enable_web_search:
            system_prompt_additions.append(
                "If the question requires up-to-date external information, "
                "use the marker [WEB_SEARCH] followed by the search query, analyze the results, and cite the sources as [source:domain]."
            )
        base_prompt = (
            "🎯 **Role: Expert assistant capable of both precise numerical calculations and interactive canvas work.**\n"
            "Answer questions clearly and concisely while maintaining context.\n\n"
        ) + "\n".join(system_prompt_additions)
        system_prompt = {"role": "system", "content": base_prompt}
        messages_with_system = [system_prompt] + messages
        full_reply_text = ""          # accumulated assistant text, marker-scanned
        code_to_execute = None
        code_to_display = None        # NOTE(review): assigned but never sent to the client — confirm intent
        is_code_used = False
        is_initial_web_search = False  # NOTE(review): set below but never read afterwards
        yield format_sse_event({"type": "analysis_status", "content": "start"})
        # (Optional) If an initial web search is triggered by a special marker:
        if messages and messages[0]['content'] == '[RECHERCHE WEB INITIALE]':
            is_initial_web_search = True
            search_query = "actualités du jour"
            yield format_sse_event({"type": "web_search", "query": search_query})
            search_results = WebSearchHandler.perform_search(search_query)
            content_processor = WebSearchHandler()
            for result in search_results:
                result['content'] = content_processor.fetch_page_content(result['url'])
            sources = [result['domain'] for result in search_results]
            yield format_sse_event({"type": "web_sources", "content": sources})
            yield format_sse_event({"type": "analysis_status", "content": "end"})
            yield format_sse_event({"type": "final"})
            return
        try:
            response_stream = ai_client.chat.completions.create(
                extra_headers={
                    "HTTP-Referer": "https://yourdomain.example.com",
                    "X-Title": "Your Chat Title",
                },
                model=model,
                messages=messages_with_system,
                temperature=0.0,
                stream=True
            )
            for chunk in response_stream:
                delta_content = chunk.choices[0].delta.content or ""
                # Remove any special code markers for display
                delta_content_no_marker = delta_content.replace(CODE_REQUIRED_MARKER, "")
                full_reply_text += delta_content_no_marker
                if enable_web_search and WEB_SEARCH_MARKER in full_reply_text:
                    # Extract the search query after the marker and trigger a search
                    search_query = full_reply_text.split(WEB_SEARCH_MARKER)[-1].strip()
                    yield format_sse_event({"type": "web_search", "query": search_query})
                    logger.info(f"Extracted search query: '{search_query}'")
                    if not search_query:
                        yield format_sse_event({"type": "error", "content": "Missing search query."})
                        continue
                    search_results = WebSearchHandler.perform_search(search_query)
                    content_processor = WebSearchHandler()
                    for result in search_results:
                        result['content'] = content_processor.fetch_page_content(result['url'])
                    sources = [result['domain'] for result in search_results]
                    yield format_sse_event({"type": "web_sources", "content": sources})
                    search_context = generate_search_context(search_results)
                    messages_with_system.append({
                        "role": "user",
                        "content": f"Search results for '{search_query}':\n{search_context}\nPlease answer citing sources as [source:domain]."
                    })
                    # NOTE(review): this new stream is assigned and then the loop
                    # is broken out of, so its chunks are never consumed — the
                    # follow-up answer is dropped. Confirm whether a second
                    # streaming loop was intended here.
                    response_stream = ai_client.chat.completions.create(
                        extra_headers={
                            "HTTP-Referer": "https://yourdomain.example.com",
                            "X-Title": "Your Chat Title",
                        },
                        model=model,
                        messages=messages_with_system,
                        temperature=0.0,
                        stream=True
                    )
                    break
                if CANVAS_START_MARKER in full_reply_text and CANVAS_END_MARKER in full_reply_text:
                    # NOTE(review): once both markers are in the accumulated text
                    # this branch re-emits canvas_start on every later chunk
                    # (markers are never cleared). Confirm the client dedupes.
                    start_index = full_reply_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER)
                    end_index = full_reply_text.find(CANVAS_END_MARKER)
                    canvas_content = full_reply_text[start_index:end_index].strip()
                    yield format_sse_event({"type": "canvas_start", "content": canvas_content})
                    continue
                else:
                    yield format_sse_event({"type": "text", "content": delta_content_no_marker})
                # Check for code block markers
                code_to_execute, code_to_display = extract_code_blocks(full_reply_text)
                if code_to_execute:
                    # Once code is found, stop further streaming (if that is your intended logic)
                    break
            # If execute_code is enabled and a code block was extracted, execute it
            if execute_code and code_to_execute:
                is_code_used = True
                yield format_sse_event({"type": "code_block", "content": code_to_execute})
                yield format_sse_event({"type": "code_execution_start"})
                execution_result = run_code_in_sandbox(code_to_execute)
                numeric_result = extract_numeric_result(execution_result)
                if numeric_result:
                    yield format_sse_event({"type": "executed_code_result", "content": numeric_result})
                    # Ask the model to continue its answer using the execution result.
                    conclusion_prompt = [{
                        "role": "user",
                        "content": (
                            f"**[POST-CODE EXECUTION]** The Python code:\n```python\n{code_to_execute}\n``` "
                            f"produced the result: **{numeric_result}**. Now please continue your answer."
                        )
                    }]
                    conclusion_stream = ai_client.chat.completions.create(
                        extra_headers={
                            "HTTP-Referer": "https://yourdomain.example.com",
                            "X-Title": "Your Chat Title",
                        },
                        model=model,
                        messages=conclusion_prompt,
                        temperature=0.0,
                        stream=True,
                    )
                    for chunk in conclusion_stream:
                        delta_content = chunk.choices[0].delta.content or ""
                        yield format_sse_event({"type": "text", "content": delta_content})
                    yield format_sse_event({"type": "final"})
                else:
                    yield format_sse_event({"type": "executed_code_error", "content": execution_result})
                    yield format_sse_event({"type": "final", "result": "Finished (code execution error or no numeric result)."})
            else:
                yield format_sse_event({"type": "final", "prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0})
        except Exception as e:
            error_message = f"Server error: {str(e)}"
            logger.exception(error_message)
            yield format_sse_event({"type": "error", "content": error_message})
        finally:
            # Always close the analysis phase and signal end-of-stream.
            yield format_sse_event({"type": "analysis_status", "content": "end"})
            yield format_sse_event({"type": "done", "is_code_execution": is_code_used})
    return Response(stream_with_context(stream_response()), mimetype='text/event-stream')


@app.route('/canvas_modify', methods=['POST'])
def canvas_modify_endpoint() -> Response:
    """POST /canvas_modify {message, canvas_content, model, use_openrouter} —
    ask the model for an updated canvas delimited by [CANVAS_START]/[CANVAS_END]
    and return {"updated_canvas_content", "response_message"}."""
    data = request.get_json()
    if not data:
        logger.warning("Missing canvas_modify data.")
        return jsonify({"error": "Missing request body."}), 400
    user_message = data.get('message')
    canvas_content = data.get('canvas_content')
    model = data.get('model', 'llama3-70b-8192').strip()
    use_openrouter = data.get('use_openrouter', False)
    if not user_message or canvas_content is None:
        logger.warning("Incomplete canvas_modify request.")
        return jsonify({"error": "Missing message or canvas content."}), 400
    canvas_chat_messages = [
        {"role": "system", "content": (
            "You are in a canvas chat. The user will instruct you to modify the canvas content below. "
            "Provide ONLY the UPDATED canvas content enclosed by [CANVAS_START] and [CANVAS_END]."
        )},
        {"role": "assistant", "content": f"[CANVAS_START]\n{canvas_content}\n[CANVAS_END] Current canvas. How would you like to modify it?"},
        {"role": "user", "content": user_message}
    ]
    ai_client = openrouter_client if use_openrouter else groq_client
    if ai_client is None:
        ai_client = openrouter_client

    # Retried non-streaming completion call (network hiccups, rate limits).
    @retry(stop=stop_after_attempt(5),
           wait=wait_exponential(multiplier=1, min=4, max=60),
           retry=retry_if_exception_type(Exception))
    def canvas_modify_groq_call(messages, model, ai_client):
        return ai_client.chat.completions.create(
            extra_headers={
                "HTTP-Referer": "https://yourdomain.example.com",
                "X-Title": "Chat Canvas",
            },
            model=model,
            messages=messages,
            temperature=0.0,
            stream=False,
        )

    try:
        response_stream = canvas_modify_groq_call(canvas_chat_messages, model, ai_client)
        ai_response_text = response_stream.choices[0].message.content or ""
        if CANVAS_START_MARKER in ai_response_text and CANVAS_END_MARKER in ai_response_text:
            start_index = ai_response_text.find(CANVAS_START_MARKER) + len(CANVAS_START_MARKER)
            end_index = ai_response_text.find(CANVAS_END_MARKER)
            updated_canvas_content = ai_response_text[start_index:end_index].strip()
            logger.info("Canvas updated successfully.")
            return jsonify({"updated_canvas_content": updated_canvas_content, "response_message": "Canvas updated."})
        else:
            logger.error("Invalid canvas_modify response: missing markers.")
            return jsonify({"error": "Invalid response from AI for canvas modification", "full_response": ai_response_text}), 500
    except Exception as e:
        error_message = f"Canvas modify error: {str(e)}"
        logger.exception(error_message)
        return jsonify({"error": error_message}), 500


@app.route('/run', methods=['POST'])
def run_code_endpoint() -> Response:
    """POST /run {language, code, stdin?} — execute code via the public Piston
    API and relay its JSON response."""
    data = request.get_json()
    if not data:
        logger.warning("Missing run request data.")
        return jsonify({"error": "Missing request body."}), 400
    language = data.get("language")
    code = data.get("code")
    stdin = data.get("stdin", "")
    if not language or not code:
        logger.warning("Incomplete run request.")
        return jsonify({"error": "Language and code are required."}), 400
    payload = {
        "language": language,
        "version": "*",           # let Piston pick the latest runtime
        "files": [{"content": code}],
        "stdin": stdin
    }
    try:
        piston_response = requests.post(PISTON_API_URL, json=payload, timeout=300)
        piston_response.raise_for_status()
        logger.info("Code executed successfully with Piston.")
        return jsonify(piston_response.json())
    except requests.RequestException as e:
        error_message = f"Piston API error: {str(e)}"
        logger.exception(error_message)
        # piston_response only exists if the POST itself succeeded.
        piston_text = piston_response.text if 'piston_response' in locals() else ""
        return jsonify({"error": "Error executing code via Piston.", "piston_response": piston_text, "details": error_message}), 500
    except Exception as e:
        error_message = f"Unexpected error: {str(e)}"
        logger.exception(error_message)
        return jsonify({"error": "Unexpected error executing code.", "details": error_message}), 500


@app.route('/run', methods=['OPTIONS'])
def run_code_options() -> Response:
    """CORS preflight handler for /run."""
    return jsonify({'message': 'OPTIONS requests allowed'}), 200


def format_sse_event(data: Dict[str, Any]) -> str:
    """Serialize a dict as a single Server-Sent-Events 'data:' frame."""
    return f"data: {json.dumps(data)}\n\n"


if __name__ == '__main__':
    port = int(os.environ.get("PORT", 5000))
    debug_mode = os.environ.get('FLASK_DEBUG') == '1'
    app.run(port=port, debug=debug_mode)
**Web Search Endpoint (`/web_search`):**
Uses the DuckDuckGo API via the `DDGS` library and then extracts content with BeautifulSoup. Results are cached (using a file and a thread lock) to avoid redundant searches.
**Chat Endpoint (`/chat`):**
If web search is enabled (or a special marker is used), it gathers web search results, appends a “search context” to the conversation, and uses the Gemini Pro (OpenRouter) model for generating the answer. It also supports code extraction and (optionally) sandboxed execution.
Canvas & Code Execution Endpoints:
These endpoints remain similar to your previous implementation, allowing interactive canvas modifications and code execution (with Piston).
Frontend:
Your provided React frontend will call these endpoints (e.g. `/chat` and `/web_search`) and display the responses accordingly.
Feel free to further customize the prompts, error handling, or client initialization (e.g. setting up your fallback groq_client) as needed for your application.