API gestion et intégration
创建于:2025年1月17日
创建于:2025年1月17日
Ok j'aimerais rajouté un ce code un moyen que l'utilisateur est un menu pour utilisé des API's de sont choix qu'il peux rajouter en ajoutant la documentation ça clé api et d'autre chose si besoins il faut que le tout soit instinctif je sais que ça peux paraitre complexe mais j'aimerais que par exemple il y est un setting pour ajouté les API
dans ce setting imaginons que je rajoute l'api de spotify dans l'api spotify je créé des commandes comme lire les playlistes que l'utilisateur peut rajouté en ajoutant la documentation l'ia générera du code pour l'executer et le testera pour être sur que ceci est valide voicii le code de base pour une idée :
import asyncio
import json
import os
import re
from pathlib import Path
from typing import Dict, Any, List, Optional
from unittest import result
import sys
from groq import Groq
from rich import print as rprint
from rich.panel import Panel
from rich.console import Console
from rich.prompt import Confirm
from rich.markdown import Markdown
from rich.table import Table
from rich.progress import Progress
from .file_manager import FileManager
from .settings_manager import SettingsManager
from .context_manager import ContextManager
import logging
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("groq").setLevel(logging.WARNING)
logger = logging.getLogger(name)
logger.setLevel(logging.WARNING)
console = Console()
class GroqService:
"""Service d'intégration avec le client Groq pour gérer les interactions IA."""
textdef __init__( self, file_manager: FileManager, context_manager: ContextManager, settings_manager: SettingsManager, api_key: Optional[str] = None ): """Initialize the GroqService.""" self.file_manager = file_manager self.context_manager = context_manager self.settings_manager = settings_manager self.api_key = api_key or os.getenv("GROQ_API_KEY") self.db_service = self.file_manager.workspace.db_service # Initialize client self.client = Groq(api_key=self.api_key) # Get current model from settings settings = self.settings_manager.get_settings() self.current_model = settings['models']['default'] # Initialize current_plan self.current_plan = [] self.system_prompt = """You are an advanced AI code editor assistant that:
IMPORTANT FORMATTING RULES:
python# Your code here
python# First file code
python# Second file code
diff- removed line + added line
Complete imports at top
Type hints
Docstrings
Error handling
Example usage in comments"""
self.planning_prompt = """You are a senior software architect. For complex tasks:
Format your response as JSON:
{
"project": {
"name": "Project name",
"description": "Brief description",
"estimated_time": "Estimated completion time"
},
"components": [
{
"name": "Component name",
"type": "file/module/function",
"path": "path/to/file.py",
"purpose": "What this component does",
"dependencies": ["other_components"],
"implementation_order": 1
}
],
"steps": [
{
"order": 1,
"action": "create/modify/test",
"target": "component_name",
"description": "What to implement",
"code_example": "Optional code snippet"
}
],
"challenges": [
"Potential challenge 1",
"Potential challenge 2"
],
"testing": {
"strategy": "Testing approach",
"test_cases": ["test case 1", "test case 2"]
}
}"""
textasync def get_project_context(self) -> Dict[str, Any]: """Get current project context from cloud storage.""" try: # Get current workspace contents contents = await self.file_manager.list_files() # Initialize context with empty structures context = { "project_structure": {}, "files": {}, "dependencies": [], "current_path": self.file_manager.workspace.get_current_dir() } # Build directory structure first for item in contents: try: path = item["path"].strip("/") if not path: # Skip empty paths continue parts = path.split("/") current = context["project_structure"] # Create nested directories for part in parts[:-1]: if part not in current: current[part] = {} current = current[part] # Add the file/directory last_part = parts[-1] if item["type"] == "file": current[last_part] = {"type": "file"} else: current[last_part] = {"type": "directory"} except Exception as e: continue # Skip problematic items # Add file contents separately for item in contents: if item["type"] == "file": try: content = await self.file_manager.read_file(item["path"]) if content: context["files"][item["path"]] = content except Exception as e: continue # Skip problematic files return context except Exception as e: # Return empty context on error return { "project_structure": {}, "files": {}, "dependencies": [], "current_path": "/" } def _analyze_dependencies(self, content: str, dependencies: set): """ Analyse le contenu d'un fichier pour extraire les importations et dépendances. Args: content (str): Le contenu du fichier. dependencies (set): Ensemble pour stocker les dépendances trouvées. """ import_lines = [line.strip() for line in content.split('\n') if line.strip().startswith(('import ', 'from '))] for line in import_lines: parts = line.split() if line.startswith('import') and len(parts) >= 2: dependencies.add(parts[1].split('.')[0]) elif line.startswith('from') and len(parts) >= 2: dependencies.add(parts[1].split('.')[0]) def _build_project_tree(self, directory: Path) -> Dict[str, Any]: """ Construit une représentation arborescente de la structure du projet. Args: directory (Path): Le répertoire racine du projet. Returns: Dict[str, Any]: Un dictionnaire représentant la structure du projet. """ tree = {} for item in directory.iterdir(): if item.name.startswith('.'): continue if item.is_file(): tree[item.name] = "file" elif item.is_dir(): tree[item.name] = self._build_project_tree(item) return tree async def send_message( self, prompt: str, system_prompt: Optional[str] = None, include_context: bool = False ) -> str: """Send a message to Groq and get the response.""" try: # Get fresh settings each time settings = await self.db_service.get_user_config() if not settings["success"]: raise Exception("Could not get user settings") # Update current model if needed self.current_model = settings["config"]["models"]["default"] messages = [] messages.append({ "role": "system", "content": system_prompt or self.system_prompt }) if include_context: context_prompt = self.context_manager.get_context_for_prompt() messages.append({ "role": "system", "content": context_prompt }) messages.append({ "role": "user", "content": prompt }) # Use settings from database with current model chat_completion = self.client.chat.completions.create( messages=messages, model=self.current_model, # Use current_model here temperature=settings["config"]["temperature"], max_tokens=settings["config"]["max_tokens"], top_p=settings["config"]["top_p"], stream=False ) response = chat_completion.choices[0].message.content return response.strip() except Exception as e: logger.error(f"Error in send_message: {str(e)}") raise Exception(f"Error in send_message: {str(e)}") async def handle_action( self, message: str, action: str, target: Optional[str] = None, purpose: Optional[str] = None ) -> Dict[str, Any]: """Handle a single action with improved error handling.""" try: # Get current context first context = await self.get_project_context() if action == "create_file": result = await self._handle_create_file(message, target, purpose) return result elif action == "edit_file": result = await self._handle_edit_file(message, target, purpose) return result elif action == "chat_stream": result = await self._handle_chat_stream(message) return result else: return { "success": False, "message": f"Unknown action: {action}" } except Exception as e: logger.error(f"Error in handle_action: {str(e)}") return { "success": False, "message": f"Error: {str(e)}" } async def _handle_create_file( self, message: str, target: Optional[str] = None, purpose: Optional[str] = None ) -> Dict[str, Any]: """Handle file creation with optimized performance.""" try: file_info = self._get_file_extension(message, target) if not file_info['success']: return file_info results = [] for file_type, filepath in file_info['files'].items(): # Generate content first result = await self.generate_file_content(filepath, message) if not result["success"]: continue # Create file directly create_result = await self.file_manager.create_file( filename=filepath, content=result["file_info"]["content"] ) if create_result["success"]: results.append({ "file": filepath, "success": True }) if not results: return { "success": False, "message": "Failed to create files" } return { "success": True, "message": f"Created {len(results)} files", "files": results } except Exception as e: return { "success": False, "message": f"Error: {str(e)}" } async def _handle_edit_file( self, message: str, target: Optional[str] = None, purpose: Optional[str] = None ) -> Dict[str, Any]: """Handle file editing.""" try: # Use target from plan if provided, otherwise extract from message filename = target or next( (word for word in message.split() if word.endswith('.py') or word.endswith('.txt')), None ) if not filename: return { "success": False, "message": "No file specified to edit" } if not self.file_manager.file_exists(filename): return { "success": False, "message": f"File {filename} does not exist" } current_content = self.file_manager.read_file(filename) # Generate edit instructions prompt = f"""Edit file {filename} to: Purpose: {purpose or 'Implement requested changes'} Current content: ```python {current_content} ``` Original request: {message} Return the complete updated code in a code block: ```python:{filename} [Your code here] ```""" response = await self.send_message(prompt, include_context=True) # Extract new content new_content = self._extract_code_from_markdown(response, filename) if not new_content: code_blocks = re.findall(r'```(?:python)?\n(.*?)```', response, re.DOTALL) if code_blocks: new_content = code_blocks[0].strip() else: new_content = response # Save changes result = self.file_manager.save_file(filename, new_content) return { "success": result["success"], "message": f"Updated {filename} for {purpose or 'requested changes'}" } except Exception as e: return { "success": False, "message": f"Error editing {target}: {str(e)}" } async def detect_action(self, message: str) -> List[str]: """ Detect and plan actions needed for the request using AI. Args: message (str): The user's input message Returns: List[str]: List of detected actions to perform """ try: message_lower = message.lower() # AI prompt for action detection action_prompt = { "role": "system", "content": """Analyze the user command and detect the action and a logic filename if needed.(not all information are usefull in the user message) Return ONLY a JSON object in this format: { "action_type": "create|edit|show|run|chat", "target_file": "return only the name of the file we talk about ?", "operation": "detailed operation type", "args": ["any", "additional", "arguments"] WARNING BE SURE YOU RETURN LOGIC FILE NOT STUPIND THINGS IF YOU RETURN STUPID THINGS YOUR CREATOR CAN LOST LOT OF MONEY LIKE YOU NEED TO BE LOGIC SOMETIME THE USER CAN SAID pls show me mega.py THE NAME OF THE FILE IS "MEGA.PY" IN ZERO CASE "me mega.py" IF I GOT THIS KIND OF PROBLEM HE WILL BE SAD FOR YOU }""" } # Use a lightweight model for quick detection detection_model = "llama-3.1-8b-instant" # Lightweight model option # Send message for AI analysis try: response = self.client.chat.completions.create( messages=[ action_prompt, {"role": "user", "content": message} ], model=detection_model, temperature=0.4, # Lower temperature for more focused responses max_tokens=200, # Limit response size top_p=0.9, stream=False ) # Parse AI response try: action_data = json.loads(response.choices[0].message.content) action_type = action_data.get("action_type", "chat") target_file = action_data.get("target_file") operation = action_data.get("operation") args = action_data.get("args", []) except json.JSONDecodeError: # Fallback to basic detection if AI response isn't valid JSON return await self._fallback_detection(message_lower) # Handle different action types if action_type == "show": if not target_file: print("Error: No file specified to show") return [] result = await self.show_file_content(target_file) if not result["success"]: print(f"Error: {result['message']}") return [] elif action_type == "run": if not target_file: print("Error: No file specified to run") return [] result = await self._handle_run_file(target_file) if not result["success"]: print(f"Error: {result['message']}") return [] elif action_type == "create": # Get project type and structure file_info = self._get_file_extension(message, target_file) if not file_info["success"]: return ["chat_stream"] # Set current plan print(file_info) print("==>") self.current_plan = [] for file_type, filepath in file_info["files"].items(): self.current_plan.append({ "type": "create_file", "target": filepath, "purpose": f"Create {file_type} file", "order": len(self.current_plan) + 1, "args": args # Add any additional arguments from AI detection }) # Show plan with rich formatting console.print("\n[bold green]📋 Required Actions:[/bold green]") for action in self.current_plan: if Confirm.ask( f"[yellow]{action['purpose']}\n" f"Should I create {action['target']}?[/yellow]" ): console.print(f"[green]✓[/green] Added: {action['purpose']}") else: self.current_plan.remove(action) if self.current_plan: return ["create_file"] * len(self.current_plan) return ["chat_stream"] elif action_type == "edit": if not target_file: return ["chat_stream"] self.current_plan = [{ "type": "edit_file", "target": target_file, "purpose": operation, "args": args }] return ["edit_file"] # Default to chat stream return ["chat_stream"] except Exception as e: logger.error(f"Error in AI action detection: {str(e)}") return await self._fallback_detection(message_lower) except Exception as e: logger.error(f"Error in detect_action: {str(e)}") return ["chat_stream"] async def _fallback_detection(self, message_lower: str) -> List[str]: """Fallback detection method when AI detection fails.""" try: return ["chat_stream"] except Exception as e: logger.error(f"Error in fallback detection: {str(e)}") return ["chat_stream"] async def handle_actions(self, message: str, actions: List[str]) -> Dict[str, Any]: """Handle multiple actions with minimal output.""" try: if actions == ["chat_stream"]: return await self._handle_chat_stream(message) results = [] for i, action in enumerate(actions, 1): target = self.current_plan[i-1].get('target', 'file') print(f"Processing file: {target}") # Simple start message result = await self.handle_action( message=message, action=action, target=target, purpose=self.current_plan[i-1].get('purpose', None) ) results.append(result) if result["success"]: print(f"End file created: {target}") # Simple completion message else: return result return { "success": True, "message": "\n".join(f"Created {r.get('file', 'file')}" for r in results) } except Exception as e: return { "success": False, "message": f"Error: {str(e)}" } async def generate_file_content(self, filename: str, message: str) -> Dict[str, Any]: """Generate and save file content to cloud storage.""" try: # Get file type from extension file_type = Path(filename).suffix.lower() # Different prompts for different file types if file_type == '.py': prompt = f"""Create a Python file for: {message} Filename: {filename} Requirements: 1. All necessary imports 2. Complete type hints 3. Docstrings 4. Error handling 5. Example usage in comments Return ONLY the code block in this format: ```python:{filename} # Your code here ```""" elif file_type == '.html': prompt = f"""Create an HTML file for: {message} Filename: {filename} Return ONLY the code block: ```html:{filename} <!-- Your HTML here --> ```""" elif file_type == '.css': prompt = f"""Create a CSS file for: {message} Filename: {filename} Return ONLY the code block: ```css:{filename} /* Your CSS here */ ```""" elif file_type == '.js': prompt = f"""Create a JavaScript file for: {message} Filename: {filename} Return ONLY the code block: ```javascript:{filename} // Your JavaScript here ```""" elif file_type == '.C#': prompt = f"""Create a C# file for: {message} Filename: {filename} Return ONLY the code block: ```C#:{filename} // Your C# here ```""" elif file_type == '.C': prompt = f"""Create a C file for: {message} Filename: {filename} Return ONLY the code block: ```C:{filename} // Your C here ```""" else: prompt = f"""Create content for file: {filename} Purpose: {message} Return ONLY the code block: ```text:{filename} // Your content here ```""" response = await self.send_message(prompt, include_context=True) # Extract code from response with proper file type pattern = f"```(?:python|html|css|javascript|text):{re.escape(filename)}\n(.*?)```" code_match = re.search(pattern, response, re.DOTALL) if not code_match: # Try without filename pattern = f"```(?:python|html|css|javascript|text)\n(.*?)```" code_match = re.search(pattern, response, re.DOTALL) if not code_match: raise ValueError(f"No valid code block found for {filename}") code_content = code_match.group(1).strip() return { "success": True, "file_info": { "path": filename, "content": code_content } } except Exception as e: logger.error(f"Error generating content for {filename}: {str(e)}") return { "success": False, "error": str(e) } def _extract_code_from_markdown(self, response: str, filename: str) -> Optional[str]: """ Extrait le code des blocs de code Markdown. Args: response (str): La réponse de l'IA. filename (str): Le nom du fichier attendu. Returns: Optional[str]: Le contenu du code extrait ou None si non trouvé. """ try: pattern = rf"```python:{re.escape(filename)}\n(.*?)```" matches = re.findall(pattern, response, re.DOTALL) if matches: return matches[0].strip() # Fallback: chercher n'importe quel bloc python pattern = r"```python\n(.*?)```" matches = re.findall(pattern, response, re.DOTALL) if matches: return matches[0].strip() return None except Exception as e: logger.error(f"Error extracting code from markdown: {str(e)}") return None def _extract_code_patterns(self, files: Dict[str, str]) -> str: """ Extrait les modèles de code communs à partir des fichiers existants. Args: files (Dict[str, str]): Dictionnaire des fichiers avec leurs contenus. Returns: str: Chaîne contenant les modèles de code extraits. """ patterns = [] for filename, content in files.items(): if filename.endswith('.py'): # Modèles de classes class_matches = re.findall(r'class\s+\w+[^\n]*:', content) if class_matches: patterns.append(f"Class pattern found in {filename}:") patterns.extend(f" {match}" for match in class_matches[:2]) # Modèles de fonctions func_matches = re.findall(r'def\s+\w+[^\n]*:', content) if func_matches: patterns.append(f"Function pattern found in {filename}:") patterns.extend(f" {match}" for match in func_matches[:2]) # Modèles de gestion des erreurs if 'try:' in content: patterns.append("Error handling pattern found") return "\n".join(patterns) async def handle_file_creation(self, filename: str, message: str) -> Dict[str, Any]: """ Gère la création d'un fichier avec une gestion des erreurs améliorée et des retours d'information. Args: filename (str): Le nom du fichier à créer. message (str): Le message de l'utilisateur. Returns: Dict[str, Any]: Résultat de la création du fichier avec succès ou message d'erreur. """ try: if self.file_manager.file_exists(filename): return { "success": False, "message": f"File {filename} already exists" } result = await self.generate_file_content(filename, message) if not result["success"]: return { "success": False, "message": f"Failed to generate content: {result.get('error', 'Unknown error')}" } file_info = result["file_info"] create_result = self.file_manager.create_file( filename=file_info["path"], content=file_info["content"] ) if create_result.get("success", False): return { "success": True, "message": f"Created {filename}: {file_info.get('description', 'File created successfully')}" } else: return { "success": False, "message": f"Failed to create {filename}: {create_result.get('message', 'Unknown error')}" } except Exception as e: logger.error(f"Error creating {filename}: {str(e)}") return { "success": False, "message": f"Error creating {filename}: {str(e)}" } async def edit_file(self, filename: str, message: str) -> Dict[str, Any]: """ Gère l'édition d'un fichier avec une gestion des erreurs améliorée et des retours d'information. Args: filename (str): Le nom du fichier à éditer. message (str): Le message de l'utilisateur. Returns: Dict[str, Any]: Résultat de l'édition du fichier avec succès ou message d'erreur. """ try: file_info = self.file_manager.get_file_info(filename) if not file_info.get("exists", False): return { "success": False, "message": f"File {filename} does not exist" } current_content = file_info.get("content", "") edit_prompt = f"""Given this existing file:
Filename: {filename}
Current content:
python{current_content}
User request: {message}
Provide the complete updated file content that:
Return ONLY a JSON object in this exact format (no additional text, no code blocks):
{{
"edit": {{
"path": "{filename}",
"content": "<complete updated file content>",
"description": "<description of changes made>",
"changes": ["<list>", "<of>", "<specific>", "<changes>"]
}}
}}
IMPORTANT:
The content field should contain the complete Python code as a string
Do not use markdown code blocks in the JSON
Escape any special characters in the content
"""
textrprint(f"[blue]Generating edits for {filename}...[/blue]") response = await self.send_message(edit_prompt, include_context=True) try: clean_response = self._clean_json_response(response) logger.info("Attempting to parse JSON response for file edit.") data = json.loads(clean_response) edit_info = data['edit'] if not edit_info.get('content'): raise ValueError("Generated content is empty") if edit_info.get('changes'): rprint("\n[yellow]Changes to be made:[/yellow]") for change in edit_info['changes']: rprint(f" • {change}") # Validation syntaxique du code généré try: compile(edit_info['content'], filename, 'exec') except SyntaxError as e: rprint(f"[red]Error: Generated code has syntax errors: {str(e)}[/red]") rprint("\n[yellow]Generated content:[/yellow]") rprint(edit_info['content']) raise # Sauvegarder les modifications save_result = self.file_manager.save_file( filename=edit_info["path"], content=edit_info["content"] ) if save_result.get("success", False): return { "success": True, "message": f"Updated {filename}: {edit_info.get('description', 'File updated successfully')}" } else: return { "success": False, "message": f"Failed to save {filename}: {save_result.get('message', 'Unknown error')}" } except (json.JSONDecodeError, KeyError) as e: logger.error(f"Error parsing response for file edit: {str(e)}") rprint(f"[red]Error parsing response: {str(e)}[/red]") rprint("\n[yellow]Raw response:[/yellow]") rprint(response) raise except Exception as e: logger.error(f"Error editing file: {str(e)}") return { "success": False, "message": f"Error editing file: {str(e)}" }
def _clean_json_response(self, response: str) -> str:
"""
Nettoie la réponse de l'IA pour extraire un JSON valide.
textArgs: response (str): La réponse brute de l'IA. Returns: str: La chaîne JSON nettoyée. Raises: ValueError: Si aucun JSON valide n'est trouvé. """ try: response = response.strip() # Chercher le début et la fin de l'objet JSON start = response.find('{') end = response.rfind('}') + 1 if start >= 0 and end > start: json_str = response[start:end] # Valider que c'est un JSON valide json.loads(json_str) return json_str raise ValueError("No valid JSON object found in response") except Exception as e: logger.error(f"Error cleaning JSON response: {str(e)}") rprint(f"[red]Error cleaning JSON response: {str(e)}[/red]") rprint("\n[yellow]Original response:[/yellow]") rprint(response) raise
def _extract_code_from_markdown(self, response: str, filename: str) -> Optional[str]:
"""
Extrait le code des blocs de code Markdown.
textArgs: response (str): La réponse de l'IA. filename (str): Le nom du fichier attendu. Returns: Optional[str]: Le contenu du code extrait ou None si non trouvé. """ try: pattern = rf"```python:{re.escape(filename)}\n(.*?)```" matches = re.findall(pattern, response, re.DOTALL) if matches: return matches[0].strip() # Fallback: chercher n'importe quel bloc python pattern = r"```python\n(.*?)```" matches = re.findall(pattern, response, re.DOTALL) if matches: return matches[0].strip() return None except Exception as e: logger.error(f"Error extracting code from markdown: {str(e)}") return None
def _extract_code_patterns(self, files: Dict[str, str]) -> str:
"""
Extrait les modèles de code communs à partir des fichiers existants.
textArgs: files (Dict[str, str]): Dictionnaire des fichiers avec leurs contenus. Returns: str: Chaîne contenant les modèles de code extraits. """ patterns = [] for filename, content in files.items(): if filename.endswith('.py'): # Modèles de classes class_matches = re.findall(r'class\s+\w+[^\n]*:', content) if class_matches: patterns.append(f"Class pattern found in {filename}:") patterns.extend(f" {match}" for match in class_matches[:2]) # Modèles de fonctions func_matches = re.findall(r'def\s+\w+[^\n]*:', content) if func_matches: patterns.append(f"Function pattern found in {filename}:") patterns.extend(f" {match}" for match in func_matches[:2]) # Modèles de gestion des erreurs if 'try:' in content: patterns.append("Error handling pattern found") return "\n".join(patterns)
async def generate_template_content(
self,
filename: str,
imports: Optional[List[str]] = None,
classes: Optional[List[str]] = None,
functions: Optional[List[str]] = None
) -> str:
"""
Génère un contenu de fichier basé sur un template.
textArgs: filename (str): Le nom du fichier. imports (List[str], optional): Liste des imports. classes (List[str], optional): Liste des classes. functions (List[str], optional): Liste des fonctions. Returns: str: Le contenu généré du fichier. """ content = [f"# {filename}", "# Auto-generated template", ""] # Ajouter les imports if imports: for imp in imports: content.append(f"import {imp}") content.append("") else: content.append("import typing") content.append("from typing import List, Dict, Any, Optional") content.append("") # Ajouter les classes if classes: for class_name in classes: content.extend([ f"class {class_name}:", f' """', f" {class_name} class.", f' """', " def __init__(self):", " pass", "" ]) # Ajouter les fonctions if functions: for func_name in functions: content.extend([ f"def {func_name}() -> None:", f' """', f" {func_name} function.", f' """', " pass", "" ]) # Ajouter la fonction main content.extend([ "def main():", ' """Main function."""', " try:", " # Your code here", " pass", " except Exception as e:", ' print(f"Error: {str(e)}")', "", 'if __name__ == "__main__":', " main()" ]) return "\n".join(content)
async def _validate_generated_code(
self,
response: str,
filename: str,
context: Dict[str, Any]
) -> Dict[str, Any]:
"""
Valide le code généré pour sa complétude et sa compatibilité.
textArgs: response (str): La réponse de l'IA contenant le code. filename (str): Le nom du fichier. context (Dict[str, Any]): Contexte du projet. Returns: Dict[str, Any]: Résultat de la validation avec succès ou message d'erreur. """ try: data = json.loads(self._clean_json_response(response)) file_info = data.get('file', {}) if not file_info.get('content'): return { "success": False, "error": "Generated content is empty" } # Valider la syntaxe try: compile(file_info['content'], filename, 'exec') except SyntaxError as e: return { "success": False, "error": f"Syntax error in generated code: {str(e)}" } # Vérifier les composants requis content = file_info['content'] checks = { "imports": "import" in content, "error_handling": "try:" in content, "documentation": '"""' in content, "type_hints": ": " in content and " -> " in content, } missing = [k for k, v in checks.items() if not v] if missing: return { "success": False, "error": f"Missing required components: {', '.join(missing)}" } return { "success": True, "file_info": file_info } except Exception as e: return { "success": False, "error": f"Validation error: {str(e)}" }
def _extract_filename_from_response(self, response: str) -> Optional[str]:
"""
Extrait le nom de fichier de la réponse si possible.
textArgs: response (str): La réponse de l'IA. Returns: Optional[str]: Le nom du fichier extrait ou None. """ py_files = re.findall(r'\b\w+\.py\b', response) if py_files: return py_files[0] return None
async def _handle_complex_task(self, message: str) -> Dict[str, Any]:
"""Handle complex tasks that require planning."""
try:
# Get project context
context = await self.get_project_context()
text# Generate plan using AI with more structured prompt console.print("[bold blue]🤔 Analyzing task and creating plan...[/bold blue]") response = await self.send_message( f"""Create a detailed implementation plan for: {message} Return your response in this exact JSON format: {{ "project": {{ "name": "Name of the project", "description": "Brief description", "estimated_time": "Estimated completion time" }}, "components": [ {{ "name": "Name of component", "type": "file", "path": "src/component/file.py", "purpose": "Purpose of this component", "dependencies": [], "implementation_order": 1, "content": "# Full Python code here\\n..." }} ], "steps": [ {{ "order": 1, "action": "create_file", "target": "src/component/file.py", "description": "Step description" }} ], "challenges": [ "Challenge 1", "Challenge 2" ], "testing": {{ "strategy": "Testing approach", "test_cases": [ "Test case 1", "Test case 2" ] }} }} Current project context: {json.dumps(context, indent=2)}""", system_prompt="""You are a senior software architect. Your task is to create implementation plans. IMPORTANT: Return ONLY valid JSON matching the exact format shown in the prompt. Do not include any other text or explanations outside the JSON structure.""" ) try: # Try to parse JSON, handling common issues response = response.strip() # Remove any markdown code block markers if present response = re.sub(r'^```json\s*|\s*```$', '', response, flags=re.MULTILINE) # Try to find JSON object if there's other text json_match = re.search(r'\{[\s\S]*\}', response) if json_match: response = json_match.group(0) plan = json.loads(response) # Validate required fields required_fields = ['project', 'components', 'steps'] if not all(field in plan for field in required_fields): raise ValueError(f"Missing required fields: {[f for f in required_fields if f not in plan]}") # Display plan console.print("\n[bold]📋 Implementation Plan[/bold]") console.print(f"\n[cyan]Project:[/cyan] {plan['project']['name']}") console.print(f"[cyan]Description:[/cyan] {plan['project']['description']}") console.print(f"[cyan]Estimated time:[/cyan] {plan['project']['estimated_time']}") # Show components table table = Table(title="Components") table.add_column("Order", justify="right", style="cyan") table.add_column("Component", style="green") table.add_column("Purpose", style="yellow") for comp in sorted(plan['components'], key=lambda x: x['implementation_order']): table.add_row( str(comp['implementation_order']), comp['path'], comp['purpose'] ) console.print(table) # Show implementation steps console.print("\n[bold]Implementation Steps:[/bold]") for step in plan['steps']: console.print(f"{step['order']}. {step['description']}") # Show challenges if plan.get('challenges'): console.print("\n[bold red]Potential Challenges:[/bold red]") for challenge in plan['challenges']: console.print(f"• {challenge}") # Show testing strategy if plan.get('testing'): console.print("\n[bold green]Testing Strategy:[/bold green]") console.print(f"Strategy: {plan['testing']['strategy']}") console.print("Test Cases:") for test in plan['testing']['test_cases']: console.print(f"• {test}") # Ask for confirmation to proceed if Confirm.ask("\n[bold]Would you like me to implement this plan?[/bold]"): return await self._execute_plan(plan) return { "success": True, "message": "Plan created but implementation was cancelled" } except json.JSONDecodeError as e: # Show the problematic response for debugging console.print("[red]Error parsing AI response as JSON[/red]") console.print("\n[yellow]Response received:[/yellow]") console.print(response) console.print(f"\n[red]JSON Error: {str(e)}[/red]") return { "success": False, "message": "Failed to parse implementation plan" } except Exception as e: logger.error(f"Error handling complex task: {str(e)}") return { "success": False, "message": f"Error: {str(e)}" }
async def _handle_chat_stream(self, message: str) -> Dict[str, Any]:
"""Handle chat with streaming response."""
try:
# Get fresh settings each time
settings = await self.db_service.get_user_config()
if not settings["success"]:
raise Exception("Could not get user settings")
textmessages = [ { "role": "system", "content": """You are a helpful AI assistant. Keep responses: 1. Natural and conversational 2. Brief and to the point (2-3 sentences max) 3. Focused on answering the specific question""" }, { "role": "user", "content": message } ] # Use current model from settings response = self.client.chat.completions.create( messages=messages, model=settings["config"]["models"]["default"], # Use model from settings temperature=settings["config"]["temperature"], max_tokens=settings["config"]["max_tokens"], top_p=settings["config"]["top_p"], stream=False ) content = response.choices[0].message.content console.print("\n[bold green]🤖[/bold green] ", end="") console.print(content) console.print() return { "success": True, "message": content } except Exception as e: logger.error(f"Error in chat: {str(e)}") return { "success": False, "message": f"Error: {str(e)}" }
def _get_file_extension(self, message: str, target: Optional[str] = None) -> Dict[str, Any]:
"""Generate appropriate filename and extension based on message context."""
try:
message_lower = message.lower()
words = message_lower.split()
text# Project type patterns PROJECT_PATTERNS = { 'web': { 'keywords': ['web', 'website', 'webpage', 'frontend', 'html'], 'structure': { 'main': '/web_app/app.py', 'template': '/web_app/templates/index.html', 'static': '/web_app/static/styles.css', 'js': '/web_app/static/script.js' } }, 'scraper': { 'keywords': ['scraper', 'crawler', 'spider', 'scraping'], 'structure': { 'main': lambda site: f'/scrapers/{site}_scraper.py', 'utils': '/scrapers/utils.py', 'config': '/scrapers/config.yml' } }, 'api': { 'keywords': ['api', 'rest', 'endpoint', 'backend'], 'structure': { 'main': '/api/main.py', 'routes': '/api/routes.py', 'models': '/api/models.py' } }, 'downloader': { 'keywords': ['downloader', 'download', 'youtube', 'video'], 'structure': { 'main': '/downloader/main.py', 'ui': '/downloader/templates/index.html', 'static': '/downloader/static/styles.css', 'js': '/downloader/static/script.js' } } } # Check for project type first for project_type, config in PROJECT_PATTERNS.items(): if any(keyword in message_lower for keyword in config['keywords']): structure = config['structure'] # For scrapers, handle dynamic site names if project_type == 'scraper': site_name = next( (word for word in words if word not in config['keywords'] + ['create', 'make', 'for', 'a', 'an']), 'generic' ) return { 'success': True, 'files': { 'main': structure['main'](site_name), 'utils': structure['utils'], 'config': structure['config'] } } return { 'success': True, 'files': structure } # File type patterns FILE_PATTERNS = { 'python': { 'extensions': ['.py'], 'keywords': ['python', 'script', 'module'], 'default': 'main.py' }, 'web': { 'extensions': ['.html', '.css', '.js'], 'keywords': ['html', 'css', 'javascript', 'webpage'], 'default': 'index.html' }, 'data': { 'extensions': ['.json', '.csv', '.yaml', '.yml'], 'keywords': ['data', 'config', 'settings'], 'default': 'data.json' }, 'requirements': { 'extensions': ['.txt', '.toml'], 'keywords': ['requirements', 'dependencies'], 'default': 'requirements.txt' } } # If target is provided, use it if target: return { 'success': True, 'files': {'main': target} } # Check for specific file types for file_type, config in FILE_PATTERNS.items(): if any(keyword in message_lower for keyword in config['keywords']): return { 'success': True, 'files': {'main': f"/{config['default']}"} } # Default to Python file if no match return { 'success': True, 'files': {'main': '/script.py'} } except Exception as e: return { 'success': False, 'error': str(e) }
async def verify_model(self) -> bool:
"""Verify current model is available and sync with database."""
try:
# Get current settings from user_configs table
settings_result = await self.db_service.get_user_config()
if not settings_result["success"]:
return False
textsettings = settings_result["config"] current_model = settings['models']['default'] # Fetch available models quietly available_models = self.fetch_available_models() model_ids = [model['id'] for model in available_models] # If current model not available, switch to first available if current_model not in model_ids and model_ids: settings['models']['default'] = model_ids[0] settings['models']['available'] = model_ids await self.db_service.update_user_config(settings) return True # Update available models list if different if set(settings['models'].get('available', [])) != set(model_ids): settings['models']['available'] = model_ids await self.db_service.update_user_config(settings) # Update client model self.client = Groq( api_key=self.api_key, default_model=settings['models']['default'] ) return True except Exception as e: logger.error(f"Error verifying model: {str(e)}") return False
async def show_file_content(self, filename: str) -> Dict[str, Any]:
"""Show the contents of a file with AI explanation."""
try:
# Get file content from workspace_contents table
if not filename.startswith('/'):
filename = f"/{filename}"
text# Get current workspace workspace = await self.file_manager.workspace.db_service.get_user_workspace() if not workspace["success"]: return { "success": False, "message": "No active workspace" } # Get file content from database result = await self.file_manager.workspace.db_service.get_file_content(filename) if not result: return { "success": False, "message": f"File {filename} not found or empty" } # Ask AI to explain the code prompt = f"""Here is the content of {filename}: ```python {result} ``` Please explain what this code does briefly.""" explanation = await self.send_message(prompt) # Format output print(f"\nFile: {filename}") print("-" * 40) print(result) print("-" * 40) print("Explanation:") print(explanation) return { "success": True, "content": result, "explanation": explanation } except Exception as e: return { "success": False, "message": f"Error showing file: {str(e)}" }
async def handle_command(self, command: str):
"""Handle CLI commands."""
try:
if command.startswith('/run'):
# Extract filename from command
parts = command.split()
if len(parts) < 2:
return {
"success": False,
"message": "Please specify a file to run"
}
textfilename = parts[1] return await self._handle_run_file(filename) # If not a run command, return None or handle other commands return None except Exception as e: return { "success": False, "message": f"Error handling command: {str(e)}" }
async def _handle_run_file(self, filename: str) -> Dict[str, Any]:
"""Handle running different types of files."""
try:
# Normalize filename
if not filename.startswith('/'):
filename = f"/{filename}"
text# Get file content from database result = await self.file_manager.workspace.db_service.get_file_content(filename) if not result: # Try common paths in database common_paths = [ filename, f"/web_app/templates{filename}", f"/web_app{filename}", f"/web_app/static{filename}", ] for path in common_paths: result = await self.file_manager.workspace.db_service.get_file_content(path) if result: filename = path # Update filename to matched path break if not result: return { "success": False, "message": f"File not found in any location: {filename}" } file_ext = filename.split('.')[-1].lower() if file_ext == 'py': import subprocess import tempfile import re # Extract required packages from imports import_pattern = r'^(?:from|import)\s+([a-zA-Z0-9_]+)' required_packages = set() for line in result.split('\n'): match = re.match(import_pattern, line.strip()) if match: package = match.group(1) # Skip standard library modules if package not in ['os', 'sys', 'time', 'datetime', 'json', 'math', 're', 'random', 'pathlib', 'typing']: required_packages.add(package) # Install required packages if required_packages: print(f"Installing required packages: {', '.join(required_packages)}") for package in required_packages: try: subprocess.run( ['pip', 'install', package], capture_output=True, text=True, check=True ) print(f"✓ Installed {package}") except subprocess.CalledProcessError as e: return { "success": False, "message": f"Failed to install {package}: {e.stderr}" } # Run the Python file with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f: f.write(result) temp_path = f.name try: process = subprocess.run( ['python', temp_path], capture_output=True, text=True ) if process.returncode == 0: return { "success": True, "message": f"Output:\n{process.stdout}" } else: return { "success": False, "message": f"Error:\n{process.stderr}" } finally: import os os.unlink(temp_path) elif file_ext in ['html', 'htm']: import http.server import socketserver import threading import webbrowser import tempfile from pathlib import Path import time # Define Handler class outside class CustomHandler(http.server.SimpleHTTPRequestHandler): def __init__(self, *args, **kwargs): self.directory = kwargs.pop('directory', None) super().__init__(*args, **kwargs) def log_message(self, format, *args): if args[1] != '200': print(f"Error serving {args[0]}: {args[1]}") def translate_path(self, path): # Remove /web_app/templates/ from URL if file exists directly if path.startswith('/web_app/templates/'): direct_path = super().translate_path(path.replace('/web_app/templates/', '/')) if Path(direct_path).exists(): return direct_path return super().translate_path(path) class WebServer: def __init__(self, directory, port=8080): self.directory = directory self.port = port self.httpd = None def start(self): """Start the web server.""" handler = lambda *args: CustomHandler(*args, directory=str(self.directory)) self.httpd = socketserver.TCPServer(("", self.port), handler) print(f"\nServing files from: {self.directory}") print(f"Server running at: http://localhost:{self.port}") threading.Thread(target=self.httpd.serve_forever, daemon=True).start() # Wait a bit for server to start time.sleep(0.5) def stop(self): """Stop the web server.""" if self.httpd: self.httpd.shutdown() self.httpd.server_close() # Create temporary directory and file with tempfile.TemporaryDirectory() as temp_dir: temp_dir = Path(temp_dir) # Create web_app structure web_dir = temp_dir / 'web_app' templates_dir = web_dir / 'templates' static_dir = web_dir / 'static' templates_dir.mkdir(parents=True, exist_ok=True) static_dir.mkdir(parents=True, exist_ok=True) # Write main HTML file if '/web_app/templates/' in filename: html_file = templates_dir / Path(filename).name url_path = f'web_app/templates/{Path(filename).name}' else: html_file = temp_dir / Path(filename).name url_path = Path(filename).name html_file.write_text(result) print(f"\nCreated HTML file: {html_file}") # Always try to get and write related files related_files = { '/web_app/static/styles.css': static_dir / 'styles.css', '/web_app/static/script.js': static_dir / 'script.js', } for db_path, file_path in related_files.items(): content = await self.file_manager.workspace.db_service.get_file_content(db_path) if content: file_path.write_text(content) print(f"Added related file: {file_path}") # Start the web server server = WebServer(temp_dir) server.start() # Open browser with correct path url = f"http://localhost:8080/{url_path}" print(f"\nOpening: {url}") webbrowser.open(url) # Keep the server running try: while True: time.sleep(1) except KeyboardInterrupt: server.stop() return { "success": True, "message": "Server stopped" } return { "success": True, "message": f"Preview available at: {url}\nPress Ctrl+C to stop server" } else: return { "success": False, "message": f"Unsupported file type: {file_ext}" } except Exception as e: return { "success": False, "message": f"Error running file: {str(e)}" }
Voici un exemple d’implémentation montrant comment vous pourriez ajouter un menu pour configurer et gérer des APIs de façon plus intuitive.
L’idée globale est :
api_manager.py
) qui centralise la logique de gestion des APIs (ajout, stockage des clés, parsing de documentation, etc.)SettingsManager
pour stocker dynamiquement les infos relatives aux APIs (clés, URL, endpoints importants, etc.)api_manager.py
Ce fichier s’occupe de :
Créez un nouveau fichier api_manager.py
:
pythonimport json import re import subprocess import tempfile import logging from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) logger.setLevel(logging.WARNING) class APIManager: """ Gère la configuration et l'utilisation des APIs externes. Permet d'ajouter des APIs, de stocker leurs clés, de générer du code pour tester un appel, etc. """ def __init__(self, settings_manager: Any, groq_service: Any): """ Args: settings_manager (SettingsManager): Le gestionnaire de paramètres qui stockera nos infos d'API. groq_service (GroqService): Pour générer et tester le code en s'appuyant sur l'IA. """ self.settings_manager = settings_manager self.groq_service = groq_service def list_apis(self) -> List[str]: """ Retourne la liste des noms d'API configurées dans les settings. """ settings = self.settings_manager.get_settings() apis = settings.get("external_apis", {}) return list(apis.keys()) def get_api_info(self, api_name: str) -> Dict[str, Any]: """ Retourne les infos (clé, endpoints, doc) d'une API donnée. """ settings = self.settings_manager.get_settings() apis = settings.get("external_apis", {}) return apis.get(api_name, {}) def add_api( self, api_name: str, api_key: str, doc_reference: str = "", default_endpoints: Optional[List[str]] = None ) -> bool: """ Ajoute une API dans les settings. Ex: add_api("spotify", "ABC123", doc_reference="https://developer.spotify.com", default_endpoints=["GET /playlists", ...]) """ settings = self.settings_manager.get_settings() if "external_apis" not in settings: settings["external_apis"] = {} settings["external_apis"][api_name] = { "api_key": api_key, "doc_reference": doc_reference, "endpoints": default_endpoints or [] } self.settings_manager.save_settings(settings) return True def remove_api(self, api_name: str) -> bool: """ Supprime une API du settings. """ settings = self.settings_manager.get_settings() if "external_apis" not in settings or api_name not in settings["external_apis"]: return False del settings["external_apis"][api_name] self.settings_manager.save_settings(settings) return True async def generate_api_call_code( self, api_name: str, endpoint: str, user_instructions: str ) -> str: """ Génère du code Python (avec requêtes requests ou httpx) pour appeler un endpoint particulier en se basant sur les instructions de l'utilisateur (ex: 'Lire la playlist Spotify'). On s'appuie sur GroqService pour générer le code. Le code sera ensuite testé via test_api_code. """ api_info = self.get_api_info(api_name) if not api_info: return f"Erreur: L'API {api_name} n'est pas configurée." prompt = f""" L'utilisateur souhaite exécuter l'action: {user_instructions} sur l'API: {api_name}, endpoint: {endpoint} Voici les informations d'auth (clé ou token): {api_info['api_key']} Documentation de référence (si dispo): {api_info['doc_reference']} Génére-moi UNIQUEMENT un bloc de code Python complet (avec `import requests` ou `import httpx`), capable d'effectuer l'appel à l'endpoint indiqué, insérer la clé d'API, et afficher (print) le résultat pour test. Respecte la structure: ```python:call_api.py # Code complet ici ``` """ return await self.groq_service.send_message(prompt=prompt, include_context=False) async def test_api_code(self, code_content: str) -> Dict[str, Any]: """ Exécute le code généré dans un environnement isolé et retourne stdout/stderr pour valider que l'appel fonctionne. """ try: with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f: f.write(code_content) temp_path = f.name process = subprocess.run( ['python', temp_path], capture_output=True, text=True ) if process.returncode == 0: return { "success": True, "stdout": process.stdout, "stderr": process.stderr } else: return { "success": False, "stdout": process.stdout, "stderr": process.stderr } except Exception as e: logger.error(f"Erreur test_api_code: {str(e)}") return { "success": False, "stderr": str(e) }
SettingsManager
pour stocker les infos d’APISi votre SettingsManager
ne gère pas encore de champs dynamiques, vous pouvez ajouter un champ "external_apis": {}
dans le settings.json
(ou autre format) afin d’y ranger les clés/données :
diffclass SettingsManager: def __init__(self, path: str): self.settings_path = path self._settings = {} self.load_settings() def load_settings(self) -> None: try: with open(self.settings_path, "r", encoding="utf-8") as f: self._settings = json.load(f) - except FileNotFoundError: + except FileNotFoundError as e: self._settings = { "models": { "default": "gpt-4", "available": [] }, + # On s’assure d’initialiser ce champ si absent + "external_apis": {} } + self.save_settings(self._settings) except json.JSONDecodeError: self._settings = {} def get_settings(self) -> dict: return self._settings def save_settings(self, new_settings: dict) -> bool: self._settings = new_settings try: with open(self.settings_path, "w", encoding="utf-8") as f: json.dump(self._settings, f, indent=2) return True except Exception as e: return False
Vous pourriez avoir un menu (en CLI ou autre) permettant à l’utilisateur de :
api_manager.add_api(...)
api_manager.list_apis()
GET /playlists
)generate_api_call_code(...)
test_api_code(...)
Exemple de menu minimal (dans groq_service.py
ou un nouveau menu_manager.py
) :
pythonfrom rich.console import Console from .api_manager import APIManager class MenuManager: def __init__(self, api_manager: APIManager): self.api_manager = api_manager self.console = Console() async def show_api_menu(self): while True: self.console.print("\n[bold cyan]=== Menu API ===[/bold cyan]") self.console.print("1. Ajouter une API") self.console.print("2. Lister les APIs") self.console.print("3. Générer et tester un appel d'API") self.console.print("4. Supprimer une API") self.console.print("5. Quitter\n") choice = self.console.input("[bold green]Votre choix[/bold green]: ").strip() if choice == "1": await self._handle_add_api() elif choice == "2": self._handle_list_apis() elif choice == "3": await self._handle_generate_and_test_code() elif choice == "4": self._handle_remove_api() elif choice == "5": break else: self.console.print("[red]Choix invalide[/red]") async def _handle_add_api(self): name = self.console.input("Nom de l'API: ").strip() key = self.console.input("Clé (token) de l'API: ").strip() doc = self.console.input("Lien de doc (optionnel): ").strip() endpoints_raw = self.console.input("Endpoints courants (séparés par des virgules, ex GET /playlists): ") endpoints = [e.strip() for e in endpoints_raw.split(",") if e.strip()] ok = self.api_manager.add_api(api_name=name, api_key=key, doc_reference=doc, default_endpoints=endpoints) if ok: self.console.print(f"[green]API {name} ajoutée[/green]") else: self.console.print(f"[red]Impossible d'ajouter {name}[/red]") def _handle_list_apis(self): apis = self.api_manager.list_apis() if not apis: self.console.print("[yellow]Aucune API configurée[/yellow]") else: self.console.print("[bold]APIs configurées:[/bold]") for a in apis: self.console.print(f" - {a}") async def _handle_generate_and_test_code(self): apis = self.api_manager.list_apis() if not apis: self.console.print("[yellow]Pas d'API dispo. Veuillez en ajouter une d'abord.[/yellow]") return # Choisir l'API self.console.print("\n[bold]API disponibles[/bold]") for idx, api_name in enumerate(apis, start=1): self.console.print(f"{idx}. {api_name}") choice_idx = int(self.console.input("Votre choix: ").strip()) api_name = apis[choice_idx - 1] # Choisir l’endpoint api_info = self.api_manager.get_api_info(api_name) endpoints = api_info.get("endpoints", []) if not endpoints: self.console.print("[yellow]Aucun endpoint connu pour cette API. Vous pouvez en rajouter via 'Ajouter une API'.[/yellow]") return self.console.print("\n[bold]Endpoints disponibles[/bold]") for idx, ep in enumerate(endpoints, start=1): self.console.print(f"{idx}. {ep}") ep_idx = int(self.console.input("Votre choix d'endpoint: ").strip()) endpoint = endpoints[ep_idx - 1] # Description / instructions instructions = self.console.input("Décrivez ce que vous voulez faire sur cet endpoint: ") # Génère le code self.console.print("[blue]Génération du code...[/blue]") code_response = await self.api_manager.generate_api_call_code(api_name, endpoint, instructions) # Extraire le code Python depuis la réponse IA code_content = self._extract_code_block(code_response, "call_api.py") if not code_content: self.console.print("[red]Impossible de trouver le code dans la réponse IA.[/red]") self.console.print(f"Réponse brute:\n{code_response}") return # Test du code généré self.console.print("[blue]Test du code généré...[/blue]") test_result = await self.api_manager.test_api_code(code_content) if test_result["success"]: self.console.print("[green]Test réussi![/green]") if test_result["stdout"]: self.console.print(f"Sortie:\n{test_result['stdout']}") else: self.console.print("[red]Test échoué ou erreur durant l'exécution.[/red]") self.console.print(f"stdout:\n{test_result.get('stdout','')}") self.console.print(f"stderr:\n{test_result.get('stderr','')}") def _handle_remove_api(self): apis = self.api_manager.list_apis() if not apis: self.console.print("[yellow]Pas d'API à supprimer[/yellow]") return self.console.print("\n[bold]API disponibles[/bold]") for idx, api_name in enumerate(apis, start=1): self.console.print(f"{idx}. {api_name}") choice_idx = int(self.console.input("Votre choix: ").strip()) api_name = apis[choice_idx - 1] ok = self.api_manager.remove_api(api_name) if ok: self.console.print(f"[green]API {api_name} supprimée[/green]") else: self.console.print("[red]Echec de la suppression[/red]") def _extract_code_block(self, response: str, filename: str) -> str: """ Extrait le contenu d'un bloc de code au format: ```python:call_api.py # code ici ``` """ pattern = rf"```python:{re.escape(filename)}\n(.*?)```" match = re.search(pattern, response, re.DOTALL) if match: return match.group(1).strip() # fallback: on tente sans :filename pattern = r"```python\n(.*?)```" match = re.search(pattern, response, re.DOTALL) if match: return match.group(1).strip() return ""
Dans votre code principal (là où vous exécutez déjà certaines commandes), vous pourriez simplement :
diff+ from .api_manager import APIManager + from .menu_manager import MenuManager class GroqService: def __init__( self, file_manager: FileManager, context_manager: ContextManager, settings_manager: SettingsManager, api_key: Optional[str] = None ): ... self.api_key = api_key or os.getenv("GROQ_API_KEY") self.client = Groq(api_key=self.api_key) ... + # Instanciation du APIManager + self.api_manager = APIManager(self.settings_manager, self) + async def open_api_menu(self): + menu_manager = MenuManager(self.api_manager) + await menu_manager.show_api_menu() ...
À partir de là, vous pouvez appeler await groq_service.open_api_menu()
depuis votre CLI principale, un bouton d’interface, etc.
external_apis
dans le settings.json
pour mémoriser toutes les APIs.Avec cette structure, l’utilisateur peut :
Vous pourrez enrichir ce squelette selon vos besoins (par ex. stocker des scopes OAuth, gérer des refresh tokens, etc.).