import os import json import logging import reques...
Créé le : 20 février 2025
Répondu en utilisant O3 Mini High par Chat01
Créé le : 20 février 2025
Répondu en utilisant O3 Mini High par Chat01
import os
import json
import logging
import requests
from flask import Flask, request, Response, jsonify
from flask_cors import CORS
from dotenv import load_dotenv
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from langchain.agents import AgentExecutor, create_react_agent
from langchain_core.prompts import PromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.tools import Tool
from werkzeug.utils import secure_filename
import PyPDF2
from PIL import Image
import pytesseract
import re
load_dotenv()
app = Flask(name)
CORS(app) # Activer CORS pour les requêtes cross-origin
UPLOAD_FOLDER = "uploads"
OPENROUTER_API_KEY="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"
PISTON_API_URL = "https://emkc.org/api/v2/piston/execute"
VALID_MODELS = ["google/gemini-2.0-pro-exp-02-05:free"]
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)
def format_sse_event(data: dict) -> str:
"""Formater un dictionnaire en événement SSE."""
return f"data: {json.dumps(data)}\n\n"
class DataCollector:
@staticmethod
def web_search(query: str) -> list:
"""Effectuer une recherche web avec DuckDuckGo."""
try:
with DDGS() as ddgs:
results = [r for r in ddgs.text(query, region="fr-fr", max_results=5) if isinstance(r, dict)]
return [{"url": r["href"], "title": r["title"], "domain": urlparse(r["href"]).netloc.replace("www.", "")} for r in results]
except Exception as e:
logger.error(f"Erreur de recherche web : {e}")
return []
class ContentProcessor:
def init(self):
self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
textdef fetch_content(self, url: str) -> str: """Récupérer et nettoyer le contenu HTML d'une URL.""" try: response = requests.get(url, headers=self.headers, timeout=20) response.raise_for_status() soup = BeautifulSoup(response.text, "html.parser") for element in soup(["script", "style", "nav", "footer", "header", "aside", "form", "button", "meta", "link"]): element.decompose() main_content = soup.find(["main", "article"]) or soup.body return " ".join(main_content.stripped_strings)[:5000] except Exception as e: logger.error(f"Erreur lors de la récupération de {url} : {e}") return ""
class AICore:
@staticmethod
def query_ai(system_prompt: str, user_content: str, temp: float = 0.1, model: str = VALID_MODELS[0]) -> str:
"""Envoyer une requête à l'API Open Router."""
headers = {
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"Content-Type": "application/json",
}
data = {
"model": model,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_content},
],
"temperature": temp,
"max_tokens": 2000,
}
try:
response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data, timeout=20)
response.raise_for_status()
return response.json()["choices"][0]["message"]["content"]
except Exception as e:
logger.error(f"Erreur API Open Router : {e}")
return f"Erreur lors de la requête Open Router : {str(e)}"
def open_router_chat_completion_stream(messages: list, model: str, temperature: float = 0.0):
"""Streamer les réponses de l'API Open Router."""
headers = {
"Authorization": f"Bearer {OPENROUTER_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": 2000,
"stream": True,
}
try:
response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, stream=True, timeout=20)
response.raise_for_status()
for line in response.iter_lines():
if line:
decoded_line = line.decode("utf-8")
if decoded_line.startswith("data: "):
data_str = decoded_line[len("data: "):]
try:
data = json.loads(data_str)
yield data
except json.JSONDecodeError:
continue
except Exception as e:
logger.error(f"Erreur de streaming : {e}")
yield {"error": str(e)}
def execute_code_with_piston(code: str, language: str) -> str:
"""Exécuter du code avec l'API Piston et retourner la sortie."""
code = clean_code_input(code)
payload = {
"language": language,
"version": "*",
"files": [{"content": code}],
"stdin": ""
}
try:
response = requests.post(PISTON_API_URL, json=payload, timeout=30)
response.raise_for_status()
result = response.json()
run_output = result.get("run", {}).get("stdout", "") or result.get("run", {}).get("stderr", "")
return run_output if run_output else "Aucune sortie."
except Exception as e:
logger.error(f"Erreur API Piston : {e}")
return f"Erreur lors de l'exécution du code : {str(e)}"
def clean_code_input(input_str: str) -> str:
"""Supprimer les marqueurs de bloc de code Markdown."""
match = re.search(r'python\s*(.*?)\s*
', input_str, re.DOTALL)
if match:
return match.group(1)
return input_str
react_prompt = PromptTemplate.from_template("""
Vous êtes un assistant IA utile. Répondez aux questions suivantes du mieux que vous pouvez. Vous avez accès aux outils suivants :
{tools}
Utilisez le format suivant :
Question : la question à répondre
Pensée : vous devez toujours réfléchir à ce qu'il faut faire
Action : l'action à prendre, doit être une parmi [{tool_names}]
Entrée Action : l'entrée pour l'action
Observation : le résultat de l'action
... (cette séquence Pensée/Action/Entrée Action/Observation peut se répéter N fois)
Pensée : je connais maintenant la réponse finale
Réponse Finale : la réponse finale à la question initiale
Question : {input}
{agent_scratchpad}
""")
@app.route("/models", methods=["GET"])
def get_models():
"""Retourner la liste des modèles IA valides."""
return jsonify({"models": VALID_MODELS})
@app.route("/chat", methods=["POST"])
def chat_endpoint():
"""Gérer les requêtes de chat avec streaming, exécution de code et recherche web."""
data = request.get_json()
messages = data.get("messages", [])
model = data.get("model", VALID_MODELS[0])
web_search_enabled = data.get("web_search", False)
file_context = data.get("file_context", "")
textif not messages: return jsonify({"error": "Messages requis"}), 400 if model not in VALID_MODELS: return jsonify({"error": "Modèle invalide"}), 400 # Préparer les messages pour le streaming full_messages = [{"role": "system", "content": "Vous êtes un assistant IA utile."}] if file_context: full_messages.append({"role": "system", "content": f"Contexte fichier : {file_context}"}) full_messages.extend(messages) def generate(): collector = DataCollector() processor = ContentProcessor() ai_core = AICore() # Gérer la recherche web si activée if web_search_enabled: last_message = messages[-1]["content"] yield format_sse_event({"type": "web_search_status", "content": "start"}) yield format_sse_event({"type": "web_search", "query": last_message}) search_results = collector.web_search(last_message) filtered_results = [] for result in search_results: content = processor.fetch_content(result["url"]) if content: filter_prompt = f"Ce contenu est-il pertinent pour la requête : '{last_message}' ? Répondez 'oui' ou 'non'." is_relevant = ai_core.query_ai(filter_prompt, content[:2000], temp=0.0) if is_relevant.lower().strip() == "oui": filtered_results.append({"url": result["url"], "title": result["title"], "content": content}) if len(filtered_results) >= 3: break if filtered_results: web_content = "\n\n".join(f"URL: {r['url']}\nTitre: {r['title']}\nContenu: {r['content'][:1000]}" for r in filtered_results) full_messages.append({"role": "system", "content": f"Résultats de la recherche web :\n{web_content}"}) yield format_sse_event({"type": "web_sources", "content": [r["url"] for r in filtered_results]}) yield format_sse_event({"type": "web_search_status", "content": "end"}) # Définir les outils pour l'agent tools = [ Tool( name="PythonCodeInterpreter", func=lambda code: execute_code_with_piston(clean_code_input(code), "python"), description="Exécute du code Python et retourne la sortie." ) ] if web_search_enabled: tools.append( Tool( name="WebSearch", func=lambda q: json.dumps(collector.web_search(q)), description="Effectue une recherche web et retourne les résultats en JSON." ) ) # Configurer l'agent LangChain llm = ChatOpenAI( openai_api_base="https://openrouter.ai/api/v1", openai_api_key=OPENROUTER_API_KEY, model_name=model, temperature=0.0 ) agent = create_react_agent(llm, tools, react_prompt) agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True) # Streamer la réponse de l'agent chat_history = "\n".join([f"{m['role']}: {m['content']}" for m in messages[:-1]]) input_text = messages[-1]["content"] for output in agent_executor.stream({"input": input_text, "chat_history": chat_history}): if "steps" in output: for step in output["steps"]: yield format_sse_event({"type": "intermediate", "content": f"Action: {step['action']}, Entrée: {step['action_input']}, Observation: {step['observation']}"}) if "output" in output: yield format_sse_event({"type": "text", "content": output["output"]}) yield format_sse_event({"type": "final", "prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}) return Response(generate(), mimetype="text/event-stream")
@app.route("/canvas_modify", methods=["POST"])
def canvas_modify_endpoint():
"""Modifier le contenu du canvas selon les instructions de l'utilisateur."""
data = request.get_json()
instruction = data.get("message")
canvas_content = data.get("canvas_content", "")
model = data.get("model", VALID_MODELS[0])
textif not instruction: return jsonify({"error": "Instruction requise"}), 400 if model not in VALID_MODELS: return jsonify({"error": "Modèle invalide"}), 400 messages = [ {"role": "system", "content": "Modifiez le contenu du canvas selon l'instruction de l'utilisateur. Entourez le contenu mis à jour avec [CANVAS_START] et [CANVAS_END]."}, {"role": "user", "content": f"Instruction : {instruction}\nContenu actuel du canvas : {canvas_content}"} ] response = "" for chunk in open_router_chat_completion_stream(messages, model): if "choices" in chunk and chunk["choices"]: delta = chunk["choices"][0].get("delta", {}).get("content", "") response += delta if "[CANVAS_START]" in response and "[CANVAS_END]" in response: start_idx = response.index("[CANVAS_START]") + len("[CANVAS_START]") end_idx = response.index("[CANVAS_END]") updated_content = response[start_idx:end_idx].strip() return jsonify({"updated_canvas_content": updated_content, "response_message": "Canvas mis à jour."}) else: return jsonify({"error": "Échec de l'extraction du contenu mis à jour du canvas"}), 500
@app.route("/run", methods=["POST"])
def run_code_endpoint():
"""Exécuter du code dans le langage spécifié et retourner la sortie."""
data = request.get_json()
language = data.get("language", "python")
code = data.get("code")
if not language or not code:
return jsonify({"error": "Langage et code requis"}), 400
output = execute_code_with_piston(code, language)
return jsonify({"run": {"stdout": output}})
@app.route("/upload", methods=["POST"])
def upload_file():
"""Gérer les uploads de fichiers et extraire leur contenu."""
if "file" not in request.files:
return jsonify({"error": "Aucun fichier fourni"}), 400
textfiles = request.files.getlist("file") uploaded_files = [] for file in files: if file.filename == "": continue filename = secure_filename(file.filename) file_path = os.path.join(UPLOAD_FOLDER, filename) try: file.save(file_path) file_content = "" if filename.endswith((".txt", ".py", ".js", ".c", ".cpp", ".java", ".cs")): with open(file_path, "r", encoding="utf-8") as f: file_content = f.read() elif filename.endswith(".pdf"): with open(file_path, "rb") as f: pdf = PyPDF2.PdfReader(f) file_content = " ".join(page.extract_text() for page in pdf.pages if page.extract_text()) elif filename.endswith((".png", ".jpg", ".jpeg")): file_content = pytesseract.image_to_string(Image.open(file_path)) else: return jsonify({"error": f"Type de fichier non supporté : {filename}"}), 400 uploaded_files.append({"filename": filename, "content": file_content}) except Exception as e: logger.error(f"Échec de l'upload pour {filename} : {e}") return jsonify({"error": f"Échec de l'upload : {e}"}), 500 finally: if os.path.exists(file_path): os.remove(file_path) # Nettoyer return jsonify({"files": uploaded_files})
if name == "main":
app.run(debug=True, host="0.0.0.0", port=5000)
Requirement already satisfied: multidict<7.0,>=4.5 in c:\users\ixelo\miniconda3\lib\site-packages (from aiohttp<4.0.0,>=3.8.3->langchain) (6.1.0)
Requirement already satisfied: propcache>=0.2.0 in c:\users\ixelo\miniconda3\lib\site-packages (from aiohttp<4.0.0,>=3.8.3->langchain) (0.2.1)
Requirement already satisfied: yarl<2.0,>=1.17.0 in c:\users\ixelo\miniconda3\lib\site-packages (from aiohttp<4.0.0,>=3.8.3->langchain) (1.18.3)
Requirement already satisfied: colorama in c:\users\ixelo\miniconda3\lib\site-packages (from click>=8.1.3->flask) (0.4.6)
Requirement already satisfied: jsonpatch<2.0,>=1.33 in c:\users\ixelo\miniconda3\lib\site-packages (from langchain-core<1.0.0,>=0.3.35->langchain) (1.33)
Requirement already satisfied: typing-extensions>=4.7 in c:\users\ixelo\miniconda3\lib\site-packages (from langchain-core<1.0.0,>=0.3.35->langchain) (4.12.2)
Requirement already satisfied: httpx<1,>=0.23.0 in c:\users\ixelo\miniconda3\lib\site-packages (from langsmith<0.4,>=0.1.17->langchain) (0.28.1)
Requirement already satisfied: orjson<4.0.0,>=3.9.14 in c:\users\ixelo\miniconda3\lib\site-packages (from langsmith<0.4,>=0.1.17->langchain) (3.10.12)
Requirement already satisfied: requests-toolbelt<2.0.0,>=1.0.0 in c:\users\ixelo\miniconda3\lib\site-packages (from langsmith<0.4,>=0.1.17->langchain) (1.0.0)
Requirement already satisfied: anyio<5,>=3.5.0 in c:\users\ixelo\miniconda3\lib\site-packages (from openai<2.0.0,>=1.58.1->langchain-openai) (3.7.1)
Requirement already satisfied: distro<2,>=1.7.0 in c:\users\ixelo\miniconda3\lib\site-packages (from openai<2.0.0,>=1.58.1->langchain-openai) (1.9.0)
Requirement already satisfied: jiter<1,>=0.4.0 in c:\users\ixelo\miniconda3\lib\site-packages (from openai<2.0.0,>=1.58.1->langchain-openai) (0.8.2)
Requirement already satisfied: sniffio in c:\users\ixelo\miniconda3\lib\site-packages (from openai<2.0.0,>=1.58.1->langchain-openai) (1.3.1)
Requirement already satisfied: tqdm>4 in c:\users\ixelo\miniconda3\lib\site-packages (from openai<2.0.0,>=1.58.1->langchain-openai) (4.66.5)
Requirement already satisfied: annotated-types>=0.6.0 in c:\users\ixelo\miniconda3\lib\site-packages (from pydantic<3.0.0,>=2.7.4->langchain) (0.7.0)
Requirement already satisfied: pydantic-core==2.27.2 in c:\users\ixelo\miniconda3\lib\site-packages (from pydantic<3.0.0,>=2.7.4->langchain) (2.27.2)
Requirement already satisfied: greenlet!=0.4.17 in c:\users\ixelo\miniconda3\lib\site-packages (from SQLAlchemy<3,>=1.4->langchain) (3.1.1)
Requirement already satisfied: regex>=2022.1.18 in c:\users\ixelo\miniconda3\lib\site-packages (from tiktoken<1,>=0.7->langchain-openai) (2024.11.6)
Requirement already satisfied: httpcore==1.* in c:\users\ixelo\miniconda3\lib\site-packages (from httpx<1,>=0.23.0->langsmith<0.4,>=0.1.17->langchain) (1.0.7)
Requirement already satisfied: h11<0.15,>=0.13 in c:\users\ixelo\miniconda3\lib\site-packages (from httpcore==1.*->httpx<1,>=0.23.0->langsmith<0.4,>=0.1.17->langchain) (0.14.0)
Requirement already satisfied: jsonpointer>=1.9 in c:\users\ixelo\miniconda3\lib\site-packages (from jsonpatch<2.0,>=1.33->langchain-core<1.0.0,>=0.3.35->langchain) (2.1)
C:\Users\ixelo\Desktop\Pl1nkochat>python app.py
Entering new None chain...
INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"
Pensée : L'utilisateur a simplement écrit "sa". Il s'agit probablement d'une abréviation ou d'une faute de frappe. Je ne peux pas répondre à la question telle quelle. Je vais demander des précisions.
Action : Aucune action nécessaire, je vais reformuler la question.
Réponse Finale : Pourriez-vous préciser votre question, s'il vous plaît ? "sa" est très court et je ne suis pas sûr de comprendre ce que vous voulez dire.
que vous voulez dire.Debugging middleware caught exception in streamed response at a point where response headers were already sent.
Traceback (most recent call last):
File "C:\Users\ixelo\miniconda3\Lib\site-packages\werkzeug\wsgi.py", line 256, in next
return self._next()
^^^^^^^^^^^^
File "C:\Users\ixelo\miniconda3\Lib\site-packages\werkzeug\wrappers\response.py", line 32, in _iter_encoded
for item in iterable:
^^^^^^^^^
File "C:\Users\ixelo\Desktop\Pl1nkochat\app.py", line 271, in generate
yield format_sse_event({"type": "intermediate", "content": f"Action: {step['action']}, Entrée: {step['action_input']}, Observation: {step['observation']}"})
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'AgentStep' object is not subscriptable
INFO:werkzeug:127.0.0.1 - - [20/Feb/2025 14:41:59] "POST /chat HTTP/1.1" 200 -
INFO:werkzeug: * Detected change in 'C:\Users\ixelo\Desktop\Pl1nkochat\app.py', reloading
INFO:werkzeug: * Detected change in 'C:\Users\ixelo\Desktop\Pl1nkochat\app.py', reloading
Invalid Format: Missing 'Action Input:' after 'Action:'INFO:werkzeug: * Restarting with watchdog (windowsapi)
WARNING:werkzeug: * Debugger is active!
INFO:werkzeug: * Debugger PIN: 250-744-203
INFO:werkzeug: * Detected change in 'C:\Users\ixelo\Desktop\Pl1nkochat\app.py', reloading
INFO:werkzeug: * Detected change in 'C:\Users\ixelo\Desktop\Pl1nkochat\app.py', reloading
INFO:werkzeug: * Detected change in 'C:\Users\ixelo\Desktop\Pl1nkochat\app.py', reloading
INFO:werkzeug: * Detected change in 'C:\Users\ixelo\Desktop\Pl1nkochat\app.py', reloading
INFO:werkzeug: * Detected change in 'C:\Users\ixelo\Desktop\Pl1nkochat\app.py', reloading
INFO:werkzeug: * Detected change in 'C:\Users\ixelo\Desktop\Pl1nkochat\app.py', reloading
INFO:werkzeug: * Detected change in 'C:\Users\ixelo\Desktop\Pl1nkochat\app.py', reloading
INFO:werkzeug: * Detected change in 'C:\Users\ixelo\Desktop\Pl1nkochat\app.py', reloading
INFO:werkzeug: * Detected change in 'C:\Users\ixelo\Desktop\Pl1nkochat\app.py', reloading
INFO:werkzeug: * Restarting with watchdog (windowsapi)
WARNING:werkzeug: * Debugger is active!
INFO:werkzeug: * Debugger PIN: 250-744-203
INFO:werkzeug:127.0.0.1 - - [20/Feb/2025 14:42:44] "OPTIONS /chat HTTP/1.1" 200 -
Entering new None chain...
INFO:httpx:HTTP Request: POST https://openrouter.ai/api/v1/chat/completions "HTTP/1.1 200 OK"
Pensée : L'utilisateur a simplement écrit "sa". Il s'agit probablement d'une abréviation ou d'une faute de frappe. Il est difficile de répondre à une question aussi courte et ambiguë sans plus de contexte. Je pourrais essayer de deviner ce que l'utilisateur voulait dire, mais il y a trop de possibilités. Je vais lui demander de clarifier sa question.
Réponse Finale : Pourriez-vous préciser votre question, s'il vous plaît ? "sa" est très court et je ne suis pas sûr de comprendre ce que vous voulez dire.
? "sa" est très court et je ne suis pas sûr de comprendre ce que vous voulez dire.Debugging middleware caught exception in streamed response at a point where response headers were already sent.
Traceback (most recent call last):
File "C:\Users\ixelo\miniconda3\Lib\site-packages\werkzeug\wsgi.py", line 256, in next
return self._next()
^^^^^^^^^^^^
File "C:\Users\ixelo\miniconda3\Lib\site-packages\werkzeug\wrappers\response.py", line 32, in _iter_encoded
for item in iterable:
^^^^^^^^^
File "C:\Users\ixelo\Desktop\Pl1nkochat\app.py", line 271, in generate
yield format_sse_event({"type": "intermediate", "content": f"Action: {step['action']}, Entrée: {step['action_input']}, Observation: {step['observation']}"})
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'AgentStep' object is not subscriptable
INFO:werkzeug:127.0.0.1 - - [20/Feb/2025 14:42:49] "POST /chat HTTP/1.1" 200 -
after patched the code show me the full patched code so i can copy past this is the things i want :
"""
ok this is my code i want you to make the feature of chat gpt :
code interpreter
canva(schearch on google for what is it)(i want canva in all language pls)
web schearch
streaming answer
-(not a tools) but language addapation
what i want to use :
for AI model i want to use "google/gemini-2.0-pro-exp-02-05:free" on open router
for code exectuion i want to use piston for running code in canva and for running code itnerpreter i want piston/e2b is depend what is the best
for web schearch i want to use duck duck go
remember is a chat so we stream and we get answer instantly code interpreter is the same as Chat gpt so this is my code create a full plan for make something really cool and coherent :
"""
Voici ci-dessous le code complet mis à jour selon vos demandes. Les modifications principales sont :
Vous pouvez copier-coller ce code intégralement :
pythonimport os import json import logging import requests from flask import Flask, request, Response, jsonify from flask_cors import CORS from dotenv import load_dotenv from duckduckgo_search import DDGS from bs4 import BeautifulSoup from urllib.parse import urlparse from langchain.agents import AgentExecutor, create_react_agent from langchain_core.prompts import PromptTemplate from langchain_openai import ChatOpenAI from langchain_core.tools import Tool from werkzeug.utils import secure_filename import PyPDF2 from PIL import Image import pytesseract import re # Charger les variables d'environnement load_dotenv() # Initialiser l'application Flask app = Flask(__name__) CORS(app) # Activer CORS pour les requêtes cross-origin # Configuration UPLOAD_FOLDER = "uploads" OPENROUTER_API_KEY = "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" PISTON_API_URL = "https://emkc.org/api/v2/piston/execute" VALID_MODELS = ["google/gemini-2.0-pro-exp-02-05:free"] # Créer le dossier d'upload s'il n'existe pas os.makedirs(UPLOAD_FOLDER, exist_ok=True) # Configurer le logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Fonction utilitaire pour le format SSE def format_sse_event(data: dict) -> str: """Formater un dictionnaire en événement SSE.""" return f"data: {json.dumps(data)}\n\n" # Classe pour la recherche web avec DuckDuckGo class DataCollector: @staticmethod def web_search(query: str) -> list: """Effectuer une recherche web avec DuckDuckGo.""" try: with DDGS() as ddgs: results = [r for r in ddgs.text(query, region="fr-fr", max_results=5) if isinstance(r, dict)] return [{"url": r["href"], "title": r["title"], "domain": urlparse(r["href"]).netloc.replace("www.", "")} for r in results] except Exception as e: logger.error(f"Erreur de recherche web : {e}") return [] # Classe pour le traitement du contenu HTML class ContentProcessor: def __init__(self): self.headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"} def fetch_content(self, url: str) -> str: """Récupérer et nettoyer le contenu HTML d'une URL.""" try: response = requests.get(url, headers=self.headers, timeout=20) response.raise_for_status() soup = BeautifulSoup(response.text, "html.parser") for element in soup(["script", "style", "nav", "footer", "header", "aside", "form", "button", "meta", "link"]): element.decompose() main_content = soup.find(["main", "article"]) or soup.body return " ".join(main_content.stripped_strings)[:5000] except Exception as e: logger.error(f"Erreur lors de la récupération de {url} : {e}") return "" # Classe pour interagir avec l'IA via Open Router class AICore: @staticmethod def query_ai(system_prompt: str, user_content: str, temp: float = 0.1, model: str = VALID_MODELS[0]) -> str: """Envoyer une requête à l'API Open Router.""" headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", } data = { "model": model, "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_content}, ], "temperature": temp, "max_tokens": 2000, } try: response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=data, timeout=20) response.raise_for_status() return response.json()["choices"][0]["message"]["content"] except Exception as e: logger.error(f"Erreur API Open Router : {e}") return f"Erreur lors de la requête Open Router : {str(e)}" # Fonction de streaming pour l'API Open Router def open_router_chat_completion_stream(messages: list, model: str, temperature: float = 0.0): """Streamer les réponses de l'API Open Router.""" headers = { "Authorization": f"Bearer {OPENROUTER_API_KEY}", "Content-Type": "application/json", } payload = { "model": model, "messages": messages, "temperature": temperature, "max_tokens": 2000, "stream": True, } try: response = requests.post("https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, stream=True, timeout=20) response.raise_for_status() for line in response.iter_lines(): if line: decoded_line = line.decode("utf-8") if decoded_line.startswith("data: "): data_str = decoded_line[len("data: "):] try: data = json.loads(data_str) yield data except json.JSONDecodeError: continue except Exception as e: logger.error(f"Erreur de streaming : {e}") yield {"error": str(e)} # Exécution de code avec Piston def execute_code_with_piston(code: str, language: str) -> str: """Exécuter du code avec l'API Piston et retourner la sortie.""" code = clean_code_input(code) payload = { "language": language, "version": "*", "files": [{"content": code}], "stdin": "" } try: response = requests.post(PISTON_API_URL, json=payload, timeout=30) response.raise_for_status() result = response.json() run_output = result.get("run", {}).get("stdout", "") or result.get("run", {}).get("stderr", "") return run_output if run_output else "Aucune sortie." except Exception as e: logger.error(f"Erreur API Piston : {e}") return f"Erreur lors de l'exécution du code : {str(e)}" # Nettoyer l'entrée de code pour enlever les blocs Markdown def clean_code_input(input_str: str) -> str: """Supprimer les marqueurs de bloc de code Markdown.""" match = re.search(r'```python\s*(.*?)\s*```', input_str, re.DOTALL) if match: return match.group(1) return input_str # Template de prompt ReAct pour l'agent, incluant l'adaptation de langue react_prompt = PromptTemplate.from_template(""" Vous êtes un assistant IA utile. Répondez aux questions suivantes du mieux que vous pouvez et adaptez votre langue à celle de l'utilisateur. Vous avez accès aux outils suivants : {tools} Utilisez le format suivant : Question : la question à répondre Pensée : vous devez toujours réfléchir à ce qu'il faut faire Action : l'action à prendre, doit être une parmi [{tool_names}] Entrée Action : l'entrée pour l'action Observation : le résultat de l'action ... (cette séquence Pensée/Action/Entrée Action/Observation peut se répéter plusieurs fois) Pensée : je connais maintenant la réponse finale Réponse Finale : la réponse finale à la question initiale Question : {input} {agent_scratchpad} """) # Endpoint pour obtenir la liste des modèles IA valides @app.route("/models", methods=["GET"]) def get_models(): return jsonify({"models": VALID_MODELS}) # Endpoint de chat avec streaming, code interpreter, web search et adaptation de langue @app.route("/chat", methods=["POST"]) def chat_endpoint(): data = request.get_json() messages = data.get("messages", []) model = data.get("model", VALID_MODELS[0]) web_search_enabled = data.get("web_search", False) file_context = data.get("file_context", "") if not messages: return jsonify({"error": "Messages requis"}), 400 if model not in VALID_MODELS: return jsonify({"error": "Modèle invalide"}), 400 # Préparer les messages pour le streaming full_messages = [{"role": "system", "content": "Vous êtes un assistant IA utile et adaptez votre langue à celle de l'utilisateur."}] if file_context: full_messages.append({"role": "system", "content": f"Contexte fichier : {file_context}"}) full_messages.extend(messages) def generate(): collector = DataCollector() processor = ContentProcessor() ai_core = AICore() # Gérer la recherche web si activée if web_search_enabled: last_message = messages[-1]["content"] yield format_sse_event({"type": "web_search_status", "content": "start"}) yield format_sse_event({"type": "web_search", "query": last_message}) search_results = collector.web_search(last_message) filtered_results = [] for result in search_results: content = processor.fetch_content(result["url"]) if content: filter_prompt = f"Ce contenu est-il pertinent pour la requête : '{last_message}' ? Répondez 'oui' ou 'non'." is_relevant = ai_core.query_ai(filter_prompt, content[:2000], temp=0.0) if is_relevant.lower().strip() == "oui": filtered_results.append({"url": result["url"], "title": result["title"], "content": content}) if len(filtered_results) >= 3: break if filtered_results: web_content = "\n\n".join(f"URL: {r['url']}\nTitre: {r['title']}\nContenu: {r['content'][:1000]}" for r in filtered_results) full_messages.append({"role": "system", "content": f"Résultats de la recherche web :\n{web_content}"}) yield format_sse_event({"type": "web_sources", "content": [r["url"] for r in filtered_results]}) yield format_sse_event({"type": "web_search_status", "content": "end"}) # Définir les outils pour l'agent tools = [ Tool( name="PythonCodeInterpreter", func=lambda code: execute_code_with_piston(clean_code_input(code), "python"), description="Exécute du code Python et retourne la sortie." ) ] if web_search_enabled: tools.append( Tool( name="WebSearch", func=lambda q: json.dumps(collector.web_search(q)), description="Effectue une recherche web et retourne les résultats en JSON." ) ) # Configurer l'agent LangChain llm = ChatOpenAI( openai_api_base="https://openrouter.ai/api/v1", openai_api_key=OPENROUTER_API_KEY, model_name=model, temperature=0.0 ) agent = create_react_agent(llm, tools, react_prompt) agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True) # Streamer la réponse de l'agent chat_history = "\n".join([f"{m['role']}: {m['content']}" for m in messages[:-1]]) input_text = messages[-1]["content"] for output in agent_executor.stream({"input": input_text, "chat_history": chat_history}): if "steps" in output: for step in output["steps"]: # Accès par attribut (pas par index) pour AgentStep yield format_sse_event({ "type": "intermediate", "content": f"Action: {step.action}, Entrée: {step.action_input}, Observation: {step.observation}" }) if "output" in output: yield format_sse_event({"type": "text", "content": output["output"]}) yield format_sse_event({"type": "final", "prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}) return Response(generate(), mimetype="text/event-stream") # Endpoint pour modifier le contenu du canvas (adapté à toutes les langues) @app.route("/canvas_modify", methods=["POST"]) def canvas_modify_endpoint(): data = request.get_json() instruction = data.get("message") canvas_content = data.get("canvas_content", "") model = data.get("model", VALID_MODELS[0]) if not instruction: return jsonify({"error": "Instruction requise"}), 400 if model not in VALID_MODELS: return jsonify({"error": "Modèle invalide"}), 400 messages = [ {"role": "system", "content": "Modifiez le contenu du canvas selon l'instruction de l'utilisateur. Veuillez répondre dans la langue de l'instruction. Entourez le contenu mis à jour avec [CANVAS_START] et [CANVAS_END]."}, {"role": "user", "content": f"Instruction : {instruction}\nContenu actuel du canvas : {canvas_content}"} ] response = "" for chunk in open_router_chat_completion_stream(messages, model): if "choices" in chunk and chunk["choices"]: delta = chunk["choices"][0].get("delta", {}).get("content", "") response += delta if "[CANVAS_START]" in response and "[CANVAS_END]" in response: start_idx = response.index("[CANVAS_START]") + len("[CANVAS_START]") end_idx = response.index("[CANVAS_END]") updated_content = response[start_idx:end_idx].strip() return jsonify({"updated_canvas_content": updated_content, "response_message": "Canvas mis à jour."}) else: return jsonify({"error": "Échec de l'extraction du contenu mis à jour du canvas"}), 500 # Endpoint pour exécuter du code dans le langage spécifié @app.route("/run", methods=["POST"]) def run_code_endpoint(): data = request.get_json() language = data.get("language", "python") code = data.get("code") if not language or not code: return jsonify({"error": "Langage et code requis"}), 400 output = execute_code_with_piston(code, language) return jsonify({"run": {"stdout": output}}) # Endpoint pour gérer les uploads de fichiers et extraire leur contenu @app.route("/upload", methods=["POST"]) def upload_file(): if "file" not in request.files: return jsonify({"error": "Aucun fichier fourni"}), 400 files = request.files.getlist("file") uploaded_files = [] for file in files: if file.filename == "": continue filename = secure_filename(file.filename) file_path = os.path.join(UPLOAD_FOLDER, filename) try: file.save(file_path) file_content = "" if filename.endswith((".txt", ".py", ".js", ".c", ".cpp", ".java", ".cs")): with open(file_path, "r", encoding="utf-8") as f: file_content = f.read() elif filename.endswith(".pdf"): with open(file_path, "rb") as f: pdf = PyPDF2.PdfReader(f) file_content = " ".join(page.extract_text() for page in pdf.pages if page.extract_text()) elif filename.endswith((".png", ".jpg", ".jpeg")): file_content = pytesseract.image_to_string(Image.open(file_path)) else: return jsonify({"error": f"Type de fichier non supporté : {filename}"}), 400 uploaded_files.append({"filename": filename, "content": file_content}) except Exception as e: logger.error(f"Échec de l'upload pour {filename} : {e}") return jsonify({"error": f"Échec de l'upload : {e}"}), 500 finally: if os.path.exists(file_path): os.remove(file_path) # Nettoyer return jsonify({"files": uploaded_files}) if __name__ == "__main__": app.run(debug=True, host="0.0.0.0", port=5000)
Ce plan et ce code répondent à vos besoins :
/canvas_modify
modifie le contenu en fonction de l’instruction et adapte la langue de la réponse.Vous pouvez maintenant copier-coller ce code et l’adapter si nécessaire.