API gestion et intégration

创建于:2025年1月17日

使用 OpenAI 01 回答 OpenAI01

问题

Ok, j'aimerais rajouter à ce code un moyen pour que l'utilisateur ait un menu lui permettant d'utiliser les API de son choix, qu'il peut rajouter en fournissant la documentation, sa clé API et d'autres informations si besoin. Il faut que le tout soit intuitif. Je sais que ça peut paraître complexe, mais j'aimerais par exemple qu'il y ait un « setting » pour ajouter les API.
Dans ce setting, imaginons que je rajoute l'API de Spotify : dans l'API Spotify, je crée des commandes (comme lire les playlists) que l'utilisateur peut rajouter en fournissant la documentation ; l'IA générera du code pour les exécuter et le testera pour être sûre que celui-ci est valide. Voici le code de base pour donner une idée :
import asyncio
import json
import os
import re
from pathlib import Path
from typing import Dict, Any, List, Optional
from unittest import result
import sys
from groq import Groq
from rich import print as rprint
from rich.panel import Panel
from rich.console import Console
from rich.prompt import Confirm
from rich.markdown import Markdown
from rich.table import Table
from rich.progress import Progress

from .file_manager import FileManager
from .settings_manager import SettingsManager
from .context_manager import ContextManager

import logging
# Raise the threshold of the "httpx" and "groq" loggers so only warnings and
# above are emitted by them.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("groq").setLevel(logging.WARNING)
# Module-level logger; the GroqService methods below report errors through it.
logger = logging.getLogger(__name__)
logger.setLevel(logging.WARNING)

# Shared rich console used for formatted terminal output in this module.
console = Console()


class GroqService:
"""Service d'intégration avec le client Groq pour gérer les interactions IA."""

def __init__(
    self,
    file_manager: FileManager,
    context_manager: ContextManager,
    settings_manager: SettingsManager,
    api_key: Optional[str] = None
):
    """Wire the service to its collaborators and prepare the Groq client.

    Args:
        file_manager: File manager; its ``workspace.db_service`` is kept as
            ``self.db_service``.
        context_manager: Provider of the context text added to prompts.
        settings_manager: Source of the settings read via ``get_settings()``.
        api_key: Groq API key. When falsy, the ``GROQ_API_KEY`` environment
            variable is used instead.
    """
    # Default system prompt used by send_message when the caller gives none.
    editor_prompt = """You are an advanced AI code editor assistant that:
1. Understands project context and file relationships
2. Can create, edit, and manage multiple files
3. Implements proper error handling and testing
4. Follows best coding practices and patterns
5. Provides clear documentation and examples

IMPORTANT FORMATTING RULES:
1. Always wrap code blocks in triple backticks with language and filename:
```python:path/to/file.py
# Your code here
```

2. When showing multiple files, use separate code blocks:
```python:first_file.py
# First file code
```

```python:second_file.py
# Second file code
```

3. When explaining code changes, use format:
```diff:file.py
- removed line
+ added line
```

4. Always include:
- Complete imports at top
- Type hints
- Docstrings
- Error handling
- Example usage in comments"""

    # Prompt describing the JSON shape expected for implementation plans.
    architect_prompt = """You are a senior software architect. For complex tasks:
1. Break down the task into smaller components
2. Identify dependencies between components
3. Create a clear implementation order
4. Consider potential challenges
5. Suggest testing strategies

Format your response as JSON:
{
"project": {
"name": "Project name",
"description": "Brief description",
"estimated_time": "Estimated completion time"
},
"components": [
{
"name": "Component name",
"type": "file/module/function",
"path": "path/to/file.py",
"purpose": "What this component does",
"dependencies": ["other_components"],
"implementation_order": 1
}
],
"steps": [
{
"order": 1,
"action": "create/modify/test",
"target": "component_name",
"description": "What to implement",
"code_example": "Optional code snippet"
}
],
"challenges": [
"Potential challenge 1",
"Potential challenge 2"
],
"testing": {
"strategy": "Testing approach",
"test_cases": ["test case 1", "test case 2"]
}
}"""

    # Collaborators handed in by the caller.
    self.file_manager = file_manager
    self.context_manager = context_manager
    self.settings_manager = settings_manager

    # Explicit key wins; otherwise fall back to the environment.
    self.api_key = api_key if api_key else os.environ.get("GROQ_API_KEY")
    self.db_service = self.file_manager.workspace.db_service

    # Groq client bound to the resolved key.
    self.client = Groq(api_key=self.api_key)

    # Model currently configured as the default in the settings.
    current_settings = self.settings_manager.get_settings()
    self.current_model = current_settings['models']['default']

    # Plan of pending actions, filled in by detect_action.
    self.current_plan = []

    self.system_prompt = editor_prompt
    self.planning_prompt = architect_prompt

async def get_project_context(self) -> Dict[str, Any]:
    """Build a snapshot of the workspace known to the file manager.

    Returns:
        A dict with:
        - ``project_structure``: nested dicts mirroring the path hierarchy;
          leaves are ``{"type": "file"}`` and directories carry
          ``"type": "directory"`` next to their children.
        - ``files``: mapping of file path to its (non-empty) content.
        - ``dependencies``: always an empty list here.
        - ``current_path``: the workspace's current directory.
        If listing the workspace fails, an empty context with
        ``current_path`` set to ``"/"`` is returned instead of raising.
    """
    try:
        entries = await self.file_manager.list_files()

        context = {
            "project_structure": {},
            "files": {},
            "dependencies": [],
            "current_path": self.file_manager.workspace.get_current_dir()
        }

        # First pass: rebuild the directory tree from the flat listing.
        for entry in entries:
            try:
                path = entry["path"].strip("/")
                if not path:  # root / empty path: nothing to place
                    continue

                *parents, leaf = path.split("/")
                node = context["project_structure"]
                for part in parents:
                    node = node.setdefault(part, {})

                if entry["type"] == "file":
                    node[leaf] = {"type": "file"}
                else:
                    # Merge instead of assigning: a directory entry listed
                    # after its children must not wipe the subtree that was
                    # already built for it.
                    node.setdefault(leaf, {})["type"] = "directory"

            except Exception as e:
                # Best effort: one malformed entry must not abort the scan.
                logger.debug(f"Skipping entry in project structure: {str(e)}")
                continue

        # Second pass: attach file contents.
        for entry in entries:
            try:
                if entry["type"] != "file":
                    continue
                content = await self.file_manager.read_file(entry["path"])
                if content:
                    context["files"][entry["path"]] = content
            except Exception as e:
                # Best effort: unreadable or malformed entries are skipped.
                logger.debug(f"Skipping unreadable file entry: {str(e)}")
                continue

        return context

    except Exception as e:
        logger.debug(f"Could not build project context: {str(e)}")
        # Fall back to an empty context so callers can keep going.
        return {
            "project_structure": {},
            "files": {},
            "dependencies": [],
            "current_path": "/"
        }

def _analyze_dependencies(self, content: str, dependencies: set):
"""
Analyse le contenu d'un fichier pour extraire les importations et dépendances.

Args:
content (str): Le contenu du fichier.
dependencies (set): Ensemble pour stocker les dépendances trouvées.
"""
import_lines = [line.strip() for line in content.split('\n')
if line.strip().startswith(('import ', 'from '))]
for line in import_lines:
parts = line.split()
if line.startswith('import') and len(parts) >= 2:
dependencies.add(parts[1].split('.')[0])
elif line.startswith('from') and len(parts) >= 2:
dependencies.add(parts[1].split('.')[0])

def _build_project_tree(self, directory: Path) -> Dict[str, Any]:
"""
Construit une représentation arborescente de la structure du projet.

Args:
directory (Path): Le répertoire racine du projet.

Returns:
Dict[str, Any]: Un dictionnaire représentant la structure du projet.
"""
tree = {}
for item in directory.iterdir():
if item.name.startswith('.'):
continue
if item.is_file():
tree[item.name] = "file"
elif item.is_dir():
tree[item.name] = self._build_project_tree(item)
return tree

async def send_message(
    self,
    prompt: str,
    system_prompt: Optional[str] = None,
    include_context: bool = False
) -> str:
    """Send one prompt to the Groq chat API and return the stripped reply.

    The user configuration is re-read from ``self.db_service`` on every call
    and ``self.current_model`` is refreshed from it before the request.

    Args:
        prompt: User message.
        system_prompt: System message; ``self.system_prompt`` when falsy.
        include_context: When true, the text from
            ``self.context_manager.get_context_for_prompt()`` is added as a
            second system message.

    Returns:
        The content of the first choice, stripped of surrounding whitespace.

    Raises:
        Exception: Any failure is logged and re-raised as a plain
            ``Exception`` prefixed with ``"Error in send_message: "``.
    """
    try:
        user_config = await self.db_service.get_user_config()
        if not user_config["success"]:
            raise Exception("Could not get user settings")

        # Keep the cached model in sync with the stored configuration.
        self.current_model = user_config["config"]["models"]["default"]

        conversation = [
            {"role": "system", "content": system_prompt or self.system_prompt}
        ]
        if include_context:
            conversation.append({
                "role": "system",
                "content": self.context_manager.get_context_for_prompt()
            })
        conversation.append({"role": "user", "content": prompt})

        config = user_config["config"]
        completion = self.client.chat.completions.create(
            messages=conversation,
            model=self.current_model,
            temperature=config["temperature"],
            max_tokens=config["max_tokens"],
            top_p=config["top_p"],
            stream=False
        )

        return completion.choices[0].message.content.strip()

    except Exception as e:
        logger.error(f"Error in send_message: {str(e)}")
        raise Exception(f"Error in send_message: {str(e)}")

async def handle_action(
    self,
    message: str,
    action: str,
    target: Optional[str] = None,
    purpose: Optional[str] = None
) -> Dict[str, Any]:
    """Dispatch one action name to its handler.

    Args:
        message: Original user message.
        action: ``"create_file"``, ``"edit_file"`` or ``"chat_stream"``.
        target: File the action applies to, if any.
        purpose: Short description of why the action is performed.

    Returns:
        The handler's result dict. Unknown actions and any exception raised
        by a handler yield ``{"success": False, "message": ...}``.
    """
    try:
        # NOTE: the project context is deliberately not fetched here. It was
        # previously loaded (reading every workspace file) and never used.
        if action == "create_file":
            return await self._handle_create_file(message, target, purpose)

        if action == "edit_file":
            return await self._handle_edit_file(message, target, purpose)

        if action == "chat_stream":
            return await self._handle_chat_stream(message)

        return {
            "success": False,
            "message": f"Unknown action: {action}"
        }

    except Exception as e:
        logger.error(f"Error in handle_action: {str(e)}")
        return {
            "success": False,
            "message": f"Error: {str(e)}"
        }

async def _handle_create_file(
self,
message: str,
target: Optional[str] = None,
purpose: Optional[str] = None
) -> Dict[str, Any]:
"""Handle file creation with optimized performance."""
try:
file_info = self._get_file_extension(message, target)
if not file_info['success']:
return file_info

results = []
for file_type, filepath in file_info['files'].items():
# Generate content first
result = await self.generate_file_content(filepath, message)
if not result["success"]:
continue

# Create file directly
create_result = await self.file_manager.create_file(
filename=filepath,
content=result["file_info"]["content"]
)

if create_result["success"]:
results.append({
"file": filepath,
"success": True
})

if not results:
return {
"success": False,
"message": "Failed to create files"
}

return {
"success": True,
"message": f"Created {len(results)} files",
"files": results
}

except Exception as e:
return {
"success": False,
"message": f"Error: {str(e)}"
}

async def _handle_edit_file(
self,
message: str,
target: Optional[str] = None,
purpose: Optional[str] = None
) -> Dict[str, Any]:
"""Handle file editing."""
try:
# Use target from plan if provided, otherwise extract from message
filename = target or next(
(word for word in message.split()
if word.endswith('.py') or word.endswith('.txt')),
None
)

if not filename:
return {
"success": False,
"message": "No file specified to edit"
}

if not self.file_manager.file_exists(filename):
return {
"success": False,
"message": f"File {filename} does not exist"
}

current_content = self.file_manager.read_file(filename)

# Generate edit instructions
prompt = f"""Edit file {filename} to:
Purpose: {purpose or 'Implement requested changes'}

Current content:
```python
{current_content}
```

Original request: {message}

Return the complete updated code in a code block:
```python:{filename}
[Your code here]
```"""

response = await self.send_message(prompt, include_context=True)

# Extract new content
new_content = self._extract_code_from_markdown(response, filename)
if not new_content:
code_blocks = re.findall(r'```(?:python)?\n(.*?)```', response, re.DOTALL)
if code_blocks:
new_content = code_blocks[0].strip()
else:
new_content = response

# Save changes
result = self.file_manager.save_file(filename, new_content)
return {
"success": result["success"],
"message": f"Updated {filename} for {purpose or 'requested changes'}"
}

except Exception as e:
return {
"success": False,
"message": f"Error editing {target}: {str(e)}"
}

async def detect_action(self, message: str) -> List[str]:
    """Classify a user message and prepare the matching action plan.

    A lightweight model is asked to return a JSON description of the request
    (action type, target file, operation, extra args). Depending on the type:

    - ``show`` / ``run``: the file is shown / run immediately; ``[]`` is
      returned on failure, otherwise the method falls through to chat.
    - ``create``: ``self.current_plan`` is filled with one ``create_file``
      step per planned file, each confirmed interactively by the user.
    - ``edit``: ``self.current_plan`` holds a single ``edit_file`` step.

    Args:
        message: The user's input message.

    Returns:
        The list of action names to execute (``"create_file"``,
        ``"edit_file"`` or ``"chat_stream"``), or ``[]`` when a show/run
        request failed. Any error falls back to ``_fallback_detection`` or
        to ``["chat_stream"]``.
    """
    try:
        message_lower = message.lower()

        # System prompt for the classifier model (sent verbatim).
        action_prompt = {
            "role": "system",
            "content": """Analyze the user command and detect the action and a logic filename if needed.(not all information are usefull in the user message)
Return ONLY a JSON object in this format:
{
"action_type": "create|edit|show|run|chat",
"target_file": "return only the name of the file we talk about ?",
"operation": "detailed operation type",
"args": ["any", "additional", "arguments"]



WARNING BE SURE YOU RETURN LOGIC FILE NOT STUPIND THINGS IF YOU RETURN STUPID THINGS YOUR CREATOR CAN LOST LOT OF MONEY
LIKE YOU NEED TO BE LOGIC SOMETIME THE USER CAN SAID pls show me mega.py THE NAME OF THE FILE IS "MEGA.PY" IN ZERO CASE "me mega.py"
IF I GOT THIS KIND OF PROBLEM HE WILL BE SAD FOR YOU
}"""
        }

        # Small, fast model: classification does not need the main model.
        detection_model = "llama-3.1-8b-instant"

        try:
            response = self.client.chat.completions.create(
                messages=[
                    action_prompt,
                    {"role": "user", "content": message}
                ],
                model=detection_model,
                temperature=0.4,  # low temperature for focused answers
                max_tokens=200,   # the JSON answer is short
                top_p=0.9,
                stream=False
            )

            try:
                action_data = json.loads(response.choices[0].message.content)
                action_type = action_data.get("action_type", "chat")
                target_file = action_data.get("target_file")
                operation = action_data.get("operation")
                args = action_data.get("args", [])
            except json.JSONDecodeError:
                # The model did not answer with valid JSON.
                return await self._fallback_detection(message_lower)

            if action_type == "show":
                if not target_file:
                    print("Error: No file specified to show")
                    return []

                result = await self.show_file_content(target_file)
                if not result["success"]:
                    print(f"Error: {result['message']}")
                    return []

            elif action_type == "run":
                if not target_file:
                    print("Error: No file specified to run")
                    return []

                result = await self._handle_run_file(target_file)
                if not result["success"]:
                    print(f"Error: {result['message']}")
                    return []

            elif action_type == "create":
                file_info = self._get_file_extension(message, target_file)
                if not file_info["success"]:
                    return ["chat_stream"]

                # Diagnostic output goes to the logger, not to the user's
                # terminal.
                logger.debug(f"Planned files: {file_info}")

                proposed_plan = []
                for file_type, filepath in file_info["files"].items():
                    proposed_plan.append({
                        "type": "create_file",
                        "target": filepath,
                        "purpose": f"Create {file_type} file",
                        "order": len(proposed_plan) + 1,
                        "args": args  # extra arguments from the detection
                    })

                console.print("\n[bold green]📋 Required Actions:[/bold green]")
                # Build the confirmed plan as a new list. Removing rejected
                # steps from the list being iterated skipped the step that
                # followed each rejection, so it was never confirmed.
                confirmed_plan = []
                for planned in proposed_plan:
                    if Confirm.ask(
                        f"[yellow]{planned['purpose']}\n"
                        f"Should I create {planned['target']}?[/yellow]"
                    ):
                        console.print(f"[green]✓[/green] Added: {planned['purpose']}")
                        confirmed_plan.append(planned)
                self.current_plan = confirmed_plan

                if self.current_plan:
                    return ["create_file"] * len(self.current_plan)

                return ["chat_stream"]

            elif action_type == "edit":
                if not target_file:
                    return ["chat_stream"]

                self.current_plan = [{
                    "type": "edit_file",
                    "target": target_file,
                    "purpose": operation,
                    "args": args
                }]
                return ["edit_file"]

            # Anything else (including a successful show/run) is a chat.
            return ["chat_stream"]

        except Exception as e:
            logger.error(f"Error in AI action detection: {str(e)}")
            return await self._fallback_detection(message_lower)

    except Exception as e:
        logger.error(f"Error in detect_action: {str(e)}")
        return ["chat_stream"]

async def _fallback_detection(self, message_lower: str) -> List[str]:
"""Fallback detection method when AI detection fails."""
try:

return ["chat_stream"]

except Exception as e:
logger.error(f"Error in fallback detection: {str(e)}")
return ["chat_stream"]

async def handle_actions(self, message: str, actions: List[str]) -> Dict[str, Any]:
    """Run a list of actions against the matching steps of the current plan.

    ``actions[i]`` is executed with the target and purpose stored in
    ``self.current_plan[i]``. Processing stops at the first failing action,
    whose result is returned unchanged.

    Args:
        message: Original user message, forwarded to every action.
        actions: Action names as returned by ``detect_action``. The exact
            list ``["chat_stream"]`` is routed straight to the chat handler.

    Returns:
        On success, a dict whose message lists one ``Created <target>`` line
        per processed action; otherwise the failing result or an error dict.
    """
    try:
        if actions == ["chat_stream"]:
            return await self._handle_chat_stream(message)

        processed = []
        for index, action in enumerate(actions):
            step = self.current_plan[index]
            target = step.get('target', 'file')
            print(f"Processing file: {target}")

            result = await self.handle_action(
                message=message,
                action=action,
                target=target,
                purpose=step.get('purpose', None)
            )

            if not result["success"]:
                return result

            print(f"End file created: {target}")
            # Handlers do not return a 'file' key, so fall back to the plan's
            # target; otherwise the summary would only ever say "file".
            processed.append(result.get('file', target))

        return {
            "success": True,
            "message": "\n".join(f"Created {name}" for name in processed)
        }

    except Exception as e:
        return {
            "success": False,
            "message": f"Error: {str(e)}"
        }

async def generate_file_content(self, filename: str, message: str) -> Dict[str, Any]:
    """Ask the model for the content of ``filename`` and extract it.

    The prompt depends on the (case-insensitive) file extension. Python has a
    dedicated prompt; HTML, CSS, JavaScript, C# and C share a short template;
    every other extension uses a generic ``text`` prompt. This method only
    generates content; it does not write the file.

    Args:
        filename: Target file name; its suffix selects the prompt.
        message: User request describing what the file should contain.

    Returns:
        ``{"success": True, "file_info": {"path", "content"}}`` on success,
        ``{"success": False, "error": str}`` when the request fails or the
        reply holds no usable code block.
    """
    # suffix -> (fence tag, label with article, placeholder comment).
    # Keys are lower-case because the suffix is lower-cased before lookup;
    # upper-case keys could never match. ".cs" is the C# extension and
    # ".c#" is accepted as an alias.
    simple_templates = {
        '.html': ('html', 'an HTML', '<!-- Your HTML here -->'),
        '.css': ('css', 'a CSS', '/* Your CSS here */'),
        '.js': ('javascript', 'a JavaScript', '// Your JavaScript here'),
        '.cs': ('C#', 'a C#', '// Your C# here'),
        '.c#': ('C#', 'a C#', '// Your C# here'),
        '.c': ('C', 'a C', '// Your C here'),
    }

    try:
        file_type = Path(filename).suffix.lower()

        if file_type == '.py':
            prompt = f"""Create a Python file for: {message}
Filename: {filename}

Requirements:
1. All necessary imports
2. Complete type hints
3. Docstrings
4. Error handling
5. Example usage in comments

Return ONLY the code block in this format:
```python:{filename}
# Your code here
```"""

        elif file_type in simple_templates:
            fence, label, placeholder = simple_templates[file_type]
            prompt = (
                f"Create {label} file for: {message}\n"
                f"Filename: {filename}\n"
                "\n"
                "Return ONLY the code block:\n"
                f"```{fence}:{filename}\n"
                f"{placeholder}\n"
                "```"
            )

        else:
            prompt = f"""Create content for file: {filename}
Purpose: {message}

Return ONLY the code block:
```text:{filename}
// Your content here
```"""

        response = await self.send_message(prompt, include_context=True)

        # Accept any language tag: the model may answer with "c", "csharp",
        # "js", ... and a fixed list of tags would reject those replies.
        # The tag must be non-empty so a closing fence is never taken for
        # an opening one.
        fence_tag = r"[\w#+.-]+"
        code_match = re.search(
            "```" + fence_tag + ":" + re.escape(filename) + r"\n(.*?)```",
            response,
            re.DOTALL
        )

        if not code_match:
            # Retry without the ":filename" suffix on the fence.
            code_match = re.search(
                "```" + fence_tag + r"\n(.*?)```",
                response,
                re.DOTALL
            )

        if not code_match:
            raise ValueError(f"No valid code block found for {filename}")

        return {
            "success": True,
            "file_info": {
                "path": filename,
                "content": code_match.group(1).strip()
            }
        }

    except Exception as e:
        logger.error(f"Error generating content for {filename}: {str(e)}")
        return {
            "success": False,
            "error": str(e)
        }

def _extract_code_from_markdown(self, response: str, filename: str) -> Optional[str]:
"""
Extrait le code des blocs de code Markdown.

Args:
response (str): La réponse de l'IA.
filename (str): Le nom du fichier attendu.

Returns:
Optional[str]: Le contenu du code extrait ou None si non trouvé.
"""
try:
pattern = rf"```python:{re.escape(filename)}\n(.*?)```"
matches = re.findall(pattern, response, re.DOTALL)
if matches:
return matches[0].strip()

# Fallback: chercher n'importe quel bloc python
pattern = r"```python\n(.*?)```"
matches = re.findall(pattern, response, re.DOTALL)
if matches:
return matches[0].strip()

return None
except Exception as e:
logger.error(f"Error extracting code from markdown: {str(e)}")
return None

def _extract_code_patterns(self, files: Dict[str, str]) -> str:
"""
Extrait les modèles de code communs à partir des fichiers existants.

Args:
files (Dict[str, str]): Dictionnaire des fichiers avec leurs contenus.

Returns:
str: Chaîne contenant les modèles de code extraits.
"""
patterns = []

for filename, content in files.items():
if filename.endswith('.py'):
# Modèles de classes
class_matches = re.findall(r'class\s+\w+[^\n]*:', content)
if class_matches:
patterns.append(f"Class pattern found in {filename}:")
patterns.extend(f" {match}" for match in class_matches[:2])

# Modèles de fonctions
func_matches = re.findall(r'def\s+\w+[^\n]*:', content)
if func_matches:
patterns.append(f"Function pattern found in {filename}:")
patterns.extend(f" {match}" for match in func_matches[:2])

# Modèles de gestion des erreurs
if 'try:' in content:
patterns.append("Error handling pattern found")

return "\n".join(patterns)

async def handle_file_creation(self, filename: str, message: str) -> Dict[str, Any]:
    """Generate content for a new file and create it through the file manager.

    Args:
        filename: Name of the file to create; creation is refused when the
            file manager reports that it already exists.
        message: User request describing the file, forwarded to
            ``generate_file_content``.

    Returns:
        ``{"success": bool, "message": str}``; failures never raise.
    """
    # Local import: only this method needs it.
    import inspect

    async def _settle(value):
        # create_file is awaited in _handle_create_file, so the file manager
        # may be asynchronous. Await only when needed so both sync and async
        # implementations work.
        if inspect.isawaitable(value):
            return await value
        return value

    try:
        if await _settle(self.file_manager.file_exists(filename)):
            return {
                "success": False,
                "message": f"File {filename} already exists"
            }

        result = await self.generate_file_content(filename, message)

        if not result["success"]:
            return {
                "success": False,
                "message": f"Failed to generate content: {result.get('error', 'Unknown error')}"
            }

        file_info = result["file_info"]

        create_result = await _settle(self.file_manager.create_file(
            filename=file_info["path"],
            content=file_info["content"]
        ))

        if create_result.get("success", False):
            return {
                "success": True,
                "message": f"Created {filename}: {file_info.get('description', 'File created successfully')}"
            }

        return {
            "success": False,
            "message": f"Failed to create {filename}: {create_result.get('message', 'Unknown error')}"
        }

    except Exception as e:
        logger.error(f"Error creating {filename}: {str(e)}")
        return {
            "success": False,
            "message": f"Error creating {filename}: {str(e)}"
        }

async def edit_file(self, filename: str, message: str) -> Dict[str, Any]:
    """Apply a user-requested change to an existing file via the model.

    The model is asked to return a JSON object holding the complete updated
    content. The content is syntax-checked when the file is a Python file
    and then saved back to ``filename``.

    Args:
        filename: Name of the file to edit.
        message: The user's change request.

    Returns:
        ``{"success": bool, "message": str}``; failures never raise.
    """
    # Local import: only this method needs it.
    import inspect

    async def _settle(value):
        # Other methods of this class await file-manager calls, so the file
        # manager may be asynchronous. Await only when needed.
        if inspect.isawaitable(value):
            return await value
        return value

    try:
        file_info = await _settle(self.file_manager.get_file_info(filename))
        if not file_info.get("exists", False):
            return {
                "success": False,
                "message": f"File {filename} does not exist"
            }

        current_content = file_info.get("content", "")

        edit_prompt = f"""Given this existing file:

Filename: {filename}
Current content:
```python
{current_content}
```

User request: {message}

Provide the complete updated file content that:
1. Implements the requested changes
2. Maintains existing structure where appropriate
3. Includes proper error handling
4. Has complete documentation
5. Follows the project's style

Return ONLY a JSON object in this exact format (no additional text, no code blocks):
{{
"edit": {{
"path": "{filename}",
"content": "<complete updated file content>",
"description": "<description of changes made>",
"changes": ["<list>", "<of>", "<specific>", "<changes>"]
}}
}}

IMPORTANT:
- The content field should contain the complete Python code as a string
- Do not use markdown code blocks in the JSON
- Escape any special characters in the content
"""

        rprint(f"[blue]Generating edits for {filename}...[/blue]")
        response = await self.send_message(edit_prompt, include_context=True)

        try:
            clean_response = self._clean_json_response(response)

            logger.info("Attempting to parse JSON response for file edit.")
            data = json.loads(clean_response)
            edit_info = data['edit']

            if not edit_info.get('content'):
                raise ValueError("Generated content is empty")

            if edit_info.get('changes'):
                rprint("\n[yellow]Changes to be made:[/yellow]")
                for change in edit_info['changes']:
                    rprint(f" • {change}")

            # Syntax-check Python sources only: compiling any other file
            # type as Python would reject perfectly valid content.
            if filename.lower().endswith('.py'):
                try:
                    compile(edit_info['content'], filename, 'exec')
                except SyntaxError as e:
                    rprint(f"[red]Error: Generated code has syntax errors: {str(e)}[/red]")
                    rprint("\n[yellow]Generated content:[/yellow]")
                    rprint(edit_info['content'])
                    raise

            # Always write to the file that was requested and checked above;
            # the "path" echoed back by the model is not trusted as a write
            # target.
            save_result = await _settle(self.file_manager.save_file(
                filename=filename,
                content=edit_info["content"]
            ))

            if save_result.get("success", False):
                return {
                    "success": True,
                    "message": f"Updated {filename}: {edit_info.get('description', 'File updated successfully')}"
                }

            return {
                "success": False,
                "message": f"Failed to save {filename}: {save_result.get('message', 'Unknown error')}"
            }

        except (json.JSONDecodeError, KeyError) as e:
            logger.error(f"Error parsing response for file edit: {str(e)}")
            rprint(f"[red]Error parsing response: {str(e)}[/red]")
            rprint("\n[yellow]Raw response:[/yellow]")
            rprint(response)
            raise

    except Exception as e:
        logger.error(f"Error editing file: {str(e)}")
        return {
            "success": False,
            "message": f"Error editing file: {str(e)}"
        }

def _clean_json_response(self, response: str) -> str:
"""
Nettoie la réponse de l'IA pour extraire un JSON valide.

Args:
response (str): La réponse brute de l'IA.

Returns:
str: La chaîne JSON nettoyée.

Raises:
ValueError: Si aucun JSON valide n'est trouvé.
"""
try:
response = response.strip()

# Chercher le début et la fin de l'objet JSON
start = response.find('{')
end = response.rfind('}') + 1

if start >= 0 and end > start:
json_str = response[start:end]
# Valider que c'est un JSON valide
json.loads(json_str)
return json_str

raise ValueError("No valid JSON object found in response")

except Exception as e:
logger.error(f"Error cleaning JSON response: {str(e)}")
rprint(f"[red]Error cleaning JSON response: {str(e)}[/red]")
rprint("\n[yellow]Original response:[/yellow]")
rprint(response)
raise

def _extract_code_from_markdown(self, response: str, filename: str) -> Optional[str]:
"""
Extrait le code des blocs de code Markdown.

Args:
response (str): La réponse de l'IA.
filename (str): Le nom du fichier attendu.

Returns:
Optional[str]: Le contenu du code extrait ou None si non trouvé.
"""
try:
pattern = rf"```python:{re.escape(filename)}\n(.*?)```"
matches = re.findall(pattern, response, re.DOTALL)
if matches:
return matches[0].strip()

# Fallback: chercher n'importe quel bloc python
pattern = r"```python\n(.*?)```"
matches = re.findall(pattern, response, re.DOTALL)
if matches:
return matches[0].strip()

return None
except Exception as e:
logger.error(f"Error extracting code from markdown: {str(e)}")
return None

def _extract_code_patterns(self, files: Dict[str, str]) -> str:
"""
Extrait les modèles de code communs à partir des fichiers existants.

Args:
files (Dict[str, str]): Dictionnaire des fichiers avec leurs contenus.

Returns:
str: Chaîne contenant les modèles de code extraits.
"""
patterns = []

for filename, content in files.items():
if filename.endswith('.py'):
# Modèles de classes
class_matches = re.findall(r'class\s+\w+[^\n]*:', content)
if class_matches:
patterns.append(f"Class pattern found in {filename}:")
patterns.extend(f" {match}" for match in class_matches[:2])

# Modèles de fonctions
func_matches = re.findall(r'def\s+\w+[^\n]*:', content)
if func_matches:
patterns.append(f"Function pattern found in {filename}:")
patterns.extend(f" {match}" for match in func_matches[:2])

# Modèles de gestion des erreurs
if 'try:' in content:
patterns.append("Error handling pattern found")

return "\n".join(patterns)

async def generate_template_content(
    self,
    filename: str,
    imports: Optional[List[str]] = None,
    classes: Optional[List[str]] = None,
    functions: Optional[List[str]] = None
) -> str:
    """Build the text of a skeleton Python file.

    The skeleton has a two-line header comment, the requested imports (or a
    default ``typing`` import pair), one stub per class and per function, a
    ``main()`` function and the ``__main__`` guard.

    Args:
        filename: File name written into the header comment.
        imports: Module names, each emitted as ``import <name>``.
        classes: Class names to stub out.
        functions: Function names to stub out.

    Returns:
        The generated source text.
    """
    # Nested bodies are indented one level deeper than their header;
    # without that the generated file is not valid Python.
    indent = " " * 4
    lines = [f"# {filename}", "# Auto-generated template", ""]

    # Imports
    if imports:
        lines.extend(f"import {imp}" for imp in imports)
        lines.append("")
    else:
        lines.extend([
            "import typing",
            "from typing import List, Dict, Any, Optional",
            "",
        ])

    # Class stubs
    for class_name in classes or []:
        lines.extend([
            f"class {class_name}:",
            f'{indent}"""',
            f"{indent}{class_name} class.",
            f'{indent}"""',
            f"{indent}def __init__(self):",
            f"{indent * 2}pass",
            "",
        ])

    # Function stubs
    for func_name in functions or []:
        lines.extend([
            f"def {func_name}() -> None:",
            f'{indent}"""',
            f"{indent}{func_name} function.",
            f'{indent}"""',
            f"{indent}pass",
            "",
        ])

    # main() and the script entry guard
    lines.extend([
        "def main():",
        f'{indent}"""Main function."""',
        f"{indent}try:",
        f"{indent * 2}# Your code here",
        f"{indent * 2}pass",
        f"{indent}except Exception as e:",
        f'{indent * 2}print(f"Error: {{str(e)}}")',
        "",
        'if __name__ == "__main__":',
        f"{indent}main()",
    ])

    return "\n".join(lines)

async def _validate_generated_code(
self,
response: str,
filename: str,
context: Dict[str, Any]
) -> Dict[str, Any]:
"""
Valide le code généré pour sa complétude et sa compatibilité.

Args:
response (str): La réponse de l'IA contenant le code.
filename (str): Le nom du fichier.
context (Dict[str, Any]): Contexte du projet.

Returns:
Dict[str, Any]: Résultat de la validation avec succès ou message d'erreur.
"""
try:
data = json.loads(self._clean_json_response(response))
file_info = data.get('file', {})

if not file_info.get('content'):
return {
"success": False,
"error": "Generated content is empty"
}

# Valider la syntaxe
try:
compile(file_info['content'], filename, 'exec')
except SyntaxError as e:
return {
"success": False,
"error": f"Syntax error in generated code: {str(e)}"
}

# Vérifier les composants requis
content = file_info['content']
checks = {
"imports": "import" in content,
"error_handling": "try:" in content,
"documentation": '"""' in content,
"type_hints": ": " in content and " -> " in content,
}

missing = [k for k, v in checks.items() if not v]
if missing:
return {
"success": False,
"error": f"Missing required components: {', '.join(missing)}"
}

return {
"success": True,
"file_info": file_info
}

except Exception as e:
return {
"success": False,
"error": f"Validation error: {str(e)}"
}

def _extract_filename_from_response(self, response: str) -> Optional[str]:
"""
Extrait le nom de fichier de la réponse si possible.

Args:
response (str): La réponse de l'IA.

Returns:
Optional[str]: Le nom du fichier extrait ou None.
"""
py_files = re.findall(r'\b\w+\.py\b', response)
if py_files:
return py_files[0]
return None

async def _handle_complex_task(self, message: str) -> Dict[str, Any]:
    """Plan a multi-step task with the model and optionally execute it.

    The model is asked for a JSON implementation plan. The plan is shown on
    the console (project summary, components table, steps, challenges and
    testing strategy) and, if the user confirms, handed to
    ``self._execute_plan``.

    Args:
        message: The user's task description.

    Returns:
        The result of ``_execute_plan`` when confirmed; a success dict when
        the user declines; a failure dict when the plan cannot be parsed or
        any other error occurs.
    """
    try:
        # Current workspace snapshot, embedded in the prompt below.
        context = await self.get_project_context()

        console.print("[bold blue]🤔 Analyzing task and creating plan...[/bold blue]")

        plan_request = f"""Create a detailed implementation plan for: {message}

Return your response in this exact JSON format:
{{
"project": {{
"name": "Name of the project",
"description": "Brief description",
"estimated_time": "Estimated completion time"
}},
"components": [
{{
"name": "Name of component",
"type": "file",
"path": "src/component/file.py",
"purpose": "Purpose of this component",
"dependencies": [],
"implementation_order": 1,
"content": "# Full Python code here\\n..."
}}
],
"steps": [
{{
"order": 1,
"action": "create_file",
"target": "src/component/file.py",
"description": "Step description"
}}
],
"challenges": [
"Challenge 1",
"Challenge 2"
],
"testing": {{
"strategy": "Testing approach",
"test_cases": [
"Test case 1",
"Test case 2"
]
}}
}}

Current project context:
{json.dumps(context, indent=2)}"""

        architect_role = """You are a senior software architect. Your task is to create implementation plans.
IMPORTANT: Return ONLY valid JSON matching the exact format shown in the prompt.
Do not include any other text or explanations outside the JSON structure."""

        response = await self.send_message(plan_request, system_prompt=architect_role)

        try:
            # Normalise the reply: trim it, drop ```json fences, then keep
            # only the outermost {...} span if there is one.
            response = response.strip()
            response = re.sub(r'^```json\s*|\s*```$', '', response, flags=re.MULTILINE)

            embedded = re.search(r'\{[\s\S]*\}', response)
            if embedded:
                response = embedded.group(0)

            plan = json.loads(response)

            # The plan is unusable without these sections.
            required_fields = ['project', 'components', 'steps']
            absent = [f for f in required_fields if f not in plan]
            if absent:
                raise ValueError(f"Missing required fields: {absent}")

            # Project summary
            project = plan['project']
            console.print("\n[bold]📋 Implementation Plan[/bold]")
            console.print(f"\n[cyan]Project:[/cyan] {project['name']}")
            console.print(f"[cyan]Description:[/cyan] {project['description']}")
            console.print(f"[cyan]Estimated time:[/cyan] {project['estimated_time']}")

            # Components, in implementation order
            table = Table(title="Components")
            table.add_column("Order", justify="right", style="cyan")
            table.add_column("Component", style="green")
            table.add_column("Purpose", style="yellow")

            ordered_components = sorted(
                plan['components'],
                key=lambda component: component['implementation_order']
            )
            for component in ordered_components:
                table.add_row(
                    str(component['implementation_order']),
                    component['path'],
                    component['purpose']
                )

            console.print(table)

            # Steps
            console.print("\n[bold]Implementation Steps:[/bold]")
            for step in plan['steps']:
                console.print(f"{step['order']}. {step['description']}")

            # Challenges (optional section)
            if plan.get('challenges'):
                console.print("\n[bold red]Potential Challenges:[/bold red]")
                for challenge in plan['challenges']:
                    console.print(f"• {challenge}")

            # Testing strategy (optional section)
            if plan.get('testing'):
                testing = plan['testing']
                console.print("\n[bold green]Testing Strategy:[/bold green]")
                console.print(f"Strategy: {testing['strategy']}")
                console.print("Test Cases:")
                for test in testing['test_cases']:
                    console.print(f"• {test}")

            # Nothing is executed without explicit confirmation.
            if not Confirm.ask("\n[bold]Would you like me to implement this plan?[/bold]"):
                return {
                    "success": True,
                    "message": "Plan created but implementation was cancelled"
                }

            return await self._execute_plan(plan)

        except json.JSONDecodeError as e:
            # Show what was received to help debugging the model output.
            console.print("[red]Error parsing AI response as JSON[/red]")
            console.print("\n[yellow]Response received:[/yellow]")
            console.print(response)
            console.print(f"\n[red]JSON Error: {str(e)}[/red]")
            return {
                "success": False,
                "message": "Failed to parse implementation plan"
            }

    except Exception as e:
        logger.error(f"Error handling complex task: {str(e)}")
        return {
            "success": False,
            "message": f"Error: {str(e)}"
        }

async def _handle_chat_stream(self, message: str) -> Dict[str, Any]:
    """Send one user message to the model and print its reply.

    Despite the method name, the completion is requested with
    ``stream=False``: the reply is printed once, after it has been fully
    received.

    Args:
        message: The user's text. It is sent together with a fixed system
            prompt asking for short, conversational answers.

    Returns:
        ``{"success": True, "message": <reply text>}`` on success, or
        ``{"success": False, "message": "Error: ..."}`` when the user
        settings cannot be loaded or the request fails.
    """
    try:
        # Settings are re-read on every call so that a model change made
        # elsewhere is picked up immediately.
        settings = await self.db_service.get_user_config()
        if not settings["success"]:
            raise Exception("Could not get user settings")

        config = settings["config"]
        messages = [
            {
                "role": "system",
                "content": """You are a helpful AI assistant. Keep responses:
1. Natural and conversational
2. Brief and to the point (2-3 sentences max)
3. Focused on answering the specific question"""
            },
            {
                "role": "user",
                "content": message
            }
        ]

        response = self.client.chat.completions.create(
            messages=messages,
            model=config["models"]["default"],
            temperature=config["temperature"],
            max_tokens=config["max_tokens"],
            top_p=config["top_p"],
            stream=False
        )

        # Normalise a missing reply to an empty string so callers always
        # receive text and the console never prints the word "None".
        content = response.choices[0].message.content or ""

        console.print("\n[bold green]🤖[/bold green] ", end="")
        # The reply is arbitrary model output: print it verbatim. With
        # markup enabled, bracketed text such as "[/code]" would be parsed
        # as Rich markup and could be swallowed or raise a MarkupError.
        console.print(content, markup=False)
        console.print()

        return {
            "success": True,
            "message": content
        }

    except Exception as e:
        logger.error(f"Error in chat: {str(e)}")
        return {
            "success": False,
            "message": f"Error: {str(e)}"
        }

def _get_file_extension(self, message: str, target: Optional[str] = None) -> Dict[str, Any]:
"""Generate appropriate filename and extension based on message context."""
try:
message_lower = message.lower()
words = message_lower.split()

# Project type patterns
PROJECT_PATTERNS = {
'web': {
'keywords': ['web', 'website', 'webpage', 'frontend', 'html'],
'structure': {
'main': '/web_app/app.py',
'template': '/web_app/templates/index.html',
'static': '/web_app/static/styles.css',
'js': '/web_app/static/script.js'
}
},
'scraper': {
'keywords': ['scraper', 'crawler', 'spider', 'scraping'],
'structure': {
'main': lambda site: f'/scrapers/{site}_scraper.py',
'utils': '/scrapers/utils.py',
'config': '/scrapers/config.yml'
}
},
'api': {
'keywords': ['api', 'rest', 'endpoint', 'backend'],
'structure': {
'main': '/api/main.py',
'routes': '/api/routes.py',
'models': '/api/models.py'
}
},
'downloader': {
'keywords': ['downloader', 'download', 'youtube', 'video'],
'structure': {
'main': '/downloader/main.py',
'ui': '/downloader/templates/index.html',
'static': '/downloader/static/styles.css',
'js': '/downloader/static/script.js'
}
}
}

# Check for project type first
for project_type, config in PROJECT_PATTERNS.items():
if any(keyword in message_lower for keyword in config['keywords']):
structure = config['structure']

# For scrapers, handle dynamic site names
if project_type == 'scraper':
site_name = next(
(word for word in words
if word not in config['keywords'] + ['create', 'make', 'for', 'a', 'an']),
'generic'
)
return {
'success': True,
'files': {
'main': structure['main'](site_name),
'utils': structure['utils'],
'config': structure['config']
}
}

return {
'success': True,
'files': structure
}

# File type patterns
FILE_PATTERNS = {
'python': {
'extensions': ['.py'],
'keywords': ['python', 'script', 'module'],
'default': 'main.py'
},
'web': {
'extensions': ['.html', '.css', '.js'],
'keywords': ['html', 'css', 'javascript', 'webpage'],
'default': 'index.html'
},
'data': {
'extensions': ['.json', '.csv', '.yaml', '.yml'],
'keywords': ['data', 'config', 'settings'],
'default': 'data.json'
},
'requirements': {
'extensions': ['.txt', '.toml'],
'keywords': ['requirements', 'dependencies'],
'default': 'requirements.txt'
}
}

# If target is provided, use it
if target:
return {
'success': True,
'files': {'main': target}
}

# Check for specific file types
for file_type, config in FILE_PATTERNS.items():
if any(keyword in message_lower for keyword in config['keywords']):
return {
'success': True,
'files': {'main': f"/{config['default']}"}
}

# Default to Python file if no match
return {
'success': True,
'files': {'main': '/script.py'}
}

except Exception as e:
return {
'success': False,
'error': str(e)
}

async def verify_model(self) -> bool:
    """Check the configured default model against the models currently offered.

    Reads the user configuration, fetches the available models and:
      * switches the default model to the first available one when the
        configured default is not offered (and at least one model is);
      * otherwise refreshes the stored ``available`` list when it differs
        from the fetched one.
    Changes are saved through ``db_service.update_user_config``.

    Returns:
        True when the check completed, False when the user configuration
        could not be read or any step raised (the error is logged).
    """
    try:
        settings_result = await self.db_service.get_user_config()
        if not settings_result["success"]:
            return False

        settings = settings_result["config"]
        models = settings['models']

        available_models = self.fetch_available_models()
        model_ids = [model['id'] for model in available_models]

        if models['default'] not in model_ids and model_ids:
            # The configured default is gone: fall back to the first model offered.
            models['default'] = model_ids[0]
            models['available'] = model_ids
            await self.db_service.update_user_config(settings)
        elif set(models.get('available', [])) != set(model_ids):
            models['available'] = model_ids
            await self.db_service.update_user_config(settings)

        # Recreate the client with the current key. The model is not a
        # client option: the previous ``default_model=`` keyword is not a
        # parameter of the Groq constructor, so it raised a TypeError that
        # the handler below turned into a False result on every normal
        # run. The model is supplied per request instead.
        self.client = Groq(api_key=self.api_key)

        return True

    except Exception as e:
        logger.error(f"Error verifying model: {str(e)}")
        return False

async def show_file_content(self, filename: str) -> Dict[str, Any]:
    """Print a workspace file followed by an AI-written explanation of it.

    Args:
        filename: Workspace path of the file; a leading "/" is added when
            missing.

    Returns:
        ``{"success": True, "content": ..., "explanation": ...}`` when the
        file was found, otherwise ``{"success": False, "message": ...}``
        (no active workspace, file missing or empty, or an exception).
    """
    try:
        path = filename if filename.startswith('/') else f"/{filename}"
        storage = self.file_manager.workspace.db_service

        # A workspace must be active before any file can be looked up.
        active = await storage.get_user_workspace()
        if not active["success"]:
            return {
                "success": False,
                "message": "No active workspace"
            }

        body = await storage.get_file_content(path)
        if not body:
            return {
                "success": False,
                "message": f"File {path} not found or empty"
            }

        # The file is handed to the model inside a fenced block.
        prompt = f"""Here is the content of {path}:
```python
{body}
```
Please explain what this code does briefly."""

        explanation = await self.send_message(prompt)

        rule = "-" * 40
        for line in (f"\nFile: {path}", rule, body, rule, "Explanation:", explanation):
            print(line)

        return {
            "success": True,
            "content": body,
            "explanation": explanation
        }

    except Exception as e:
        return {
            "success": False,
            "message": f"Error showing file: {str(e)}"
        }

async def handle_command(self, command: str) -> Optional[Dict[str, Any]]:
    """Dispatch a CLI command.

    Only ``/run <file>`` is handled here. The command word must be exactly
    ``/run``: a prefix test would also accept ``/runner`` or ``/running``
    and try to execute their first argument as a file.

    Args:
        command: The raw command line typed by the user.

    Returns:
        The result dict of ``_handle_run_file`` for ``/run <file>``, a
        failure dict when the file name is missing or an exception occurs,
        and None for any other command.
    """
    try:
        parts = command.split()
        if not parts or parts[0] != '/run':
            # Not a run command: nothing to do here.
            return None

        if len(parts) < 2:
            return {
                "success": False,
                "message": "Please specify a file to run"
            }

        return await self._handle_run_file(parts[1])

    except Exception as e:
        return {
            "success": False,
            "message": f"Error handling command: {str(e)}"
        }

async def _handle_run_file(self, filename: str) -> Dict[str, Any]:
"""Handle running different types of files."""
try:
# Normalize filename
if not filename.startswith('/'):
filename = f"/{filename}"

# Get file content from database
result = await self.file_manager.workspace.db_service.get_file_content(filename)
if not result:
# Try common paths in database
common_paths = [
filename,
f"/web_app/templates{filename}",
f"/web_app{filename}",
f"/web_app/static{filename}",
]

for path in common_paths:
result = await self.file_manager.workspace.db_service.get_file_content(path)
if result:
filename = path # Update filename to matched path
break

if not result:
return {
"success": False,
"message": f"File not found in any location: {filename}"
}

file_ext = filename.split('.')[-1].lower()

if file_ext == 'py':
import subprocess
import tempfile
import re

# Extract required packages from imports
import_pattern = r'^(?:from|import)\s+([a-zA-Z0-9_]+)'
required_packages = set()

for line in result.split('\n'):
match = re.match(import_pattern, line.strip())
if match:
package = match.group(1)
# Skip standard library modules
if package not in ['os', 'sys', 'time', 'datetime', 'json',
'math', 're', 'random', 'pathlib', 'typing']:
required_packages.add(package)

# Install required packages
if required_packages:
print(f"Installing required packages: {', '.join(required_packages)}")
for package in required_packages:
try:
subprocess.run(
['pip', 'install', package],
capture_output=True,
text=True,
check=True
)
print(f"✓ Installed {package}")
except subprocess.CalledProcessError as e:
return {
"success": False,
"message": f"Failed to install {package}: {e.stderr}"
}

# Run the Python file
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False) as f:
f.write(result)
temp_path = f.name

try:
process = subprocess.run(
['python', temp_path],
capture_output=True,
text=True
)

if process.returncode == 0:
return {
"success": True,
"message": f"Output:\n{process.stdout}"
}
else:
return {
"success": False,
"message": f"Error:\n{process.stderr}"
}
finally:
import os
os.unlink(temp_path)

elif file_ext in ['html', 'htm']:
import http.server
import socketserver
import threading
import webbrowser
import tempfile
from pathlib import Path
import time

# Define Handler class outside
class CustomHandler(http.server.SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
self.directory = kwargs.pop('directory', None)
super().__init__(*args, **kwargs)

def log_message(self, format, *args):
if args[1] != '200':
print(f"Error serving {args[0]}: {args[1]}")

def translate_path(self, path):
# Remove /web_app/templates/ from URL if file exists directly
if path.startswith('/web_app/templates/'):
direct_path = super().translate_path(path.replace('/web_app/templates/', '/'))
if Path(direct_path).exists():
return direct_path
return super().translate_path(path)

class WebServer:
def __init__(self, directory, port=8080):
self.directory = directory
self.port = port
self.httpd = None

def start(self):
"""Start the web server."""
handler = lambda *args: CustomHandler(*args, directory=str(self.directory))
self.httpd = socketserver.TCPServer(("", self.port), handler)
print(f"\nServing files from: {self.directory}")
print(f"Server running at: http://localhost:{self.port}")
threading.Thread(target=self.httpd.serve_forever, daemon=True).start()
# Wait a bit for server to start
time.sleep(0.5)

def stop(self):
"""Stop the web server."""
if self.httpd:
self.httpd.shutdown()
self.httpd.server_close()

# Create temporary directory and file
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir = Path(temp_dir)

# Create web_app structure
web_dir = temp_dir / 'web_app'
templates_dir = web_dir / 'templates'
static_dir = web_dir / 'static'

templates_dir.mkdir(parents=True, exist_ok=True)
static_dir.mkdir(parents=True, exist_ok=True)

# Write main HTML file
if '/web_app/templates/' in filename:
html_file = templates_dir / Path(filename).name
url_path = f'web_app/templates/{Path(filename).name}'
else:
html_file = temp_dir / Path(filename).name
url_path = Path(filename).name

html_file.write_text(result)
print(f"\nCreated HTML file: {html_file}")

# Always try to get and write related files
related_files = {
'/web_app/static/styles.css': static_dir / 'styles.css',
'/web_app/static/script.js': static_dir / 'script.js',
}

for db_path, file_path in related_files.items():
content = await self.file_manager.workspace.db_service.get_file_content(db_path)
if content:
file_path.write_text(content)
print(f"Added related file: {file_path}")

# Start the web server
server = WebServer(temp_dir)
server.start()

# Open browser with correct path
url = f"http://localhost:8080/{url_path}"
print(f"\nOpening: {url}")
webbrowser.open(url)

# Keep the server running
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
server.stop()
return {
"success": True,
"message": "Server stopped"
}

return {
"success": True,
"message": f"Preview available at: {url}\nPress Ctrl+C to stop server"
}

else:
return {
"success": False,
"message": f"Unsupported file type: {file_ext}"
}

except Exception as e:
return {
"success": False,
"message": f"Error running file: {str(e)}"
}

思考

Proposer des fonctionnalités intuitives

Je suis en train de créer un menu pour ajouter des APIs, avec des commandes facilitant l'intégration et la génération de code pour les tester.

Ajout des APIs externes

Je suis en train de concevoir un module pour intégrer des APIs externes, générer et tester leur code pour s'assurer de leur validité, en stockant les clés API et en définissant les commandes nécessaires.

Intégration des APIs externes

Je travaille sur une fonctionnalité pour ajouter des APIs externes, analyser la documentation, générer et tester le code, tout en intégrant des clés API et en ajoutant des fichiers "api_manager.py" et "MenuManager".

Ajouter des fonctionnalités

Je suis en train d'ajouter les méthodes "add_api", "generate_api_code" et "test_api_code" dans "groq_service.py", tout en créant de nouveaux fichiers comme "api_manager.py".

Thought about API integration features pendant 8 secondes

回答

Voici un exemple d’implémentation montrant comment vous pourriez ajouter un menu pour configurer et gérer des APIs de façon plus intuitive.
L’idée globale est :

  1. Créer un nouveau module (ex. api_manager.py) qui centralise la logique de gestion des APIs (ajout, stockage des clés, parsing de documentation, etc.)
  2. Étendre votre SettingsManager pour stocker dynamiquement les infos relatives aux APIs (clés, URL, endpoints importants, etc.)
  3. Ajouter un menu/commandes pour que l’utilisateur puisse :
    • Ajouter une nouvelle API (ex. Spotify)
    • Lister les APIs déjà configurées
    • Générer du code pour effectuer une requête vers l’API (en se basant sur la doc transmise par l’utilisateur)
    • Tester automatiquement le code pour valider la requête

1) Nouveau module api_manager.py

Ce fichier s’occupe de :

  • Gérer la logique d’ajout/liste/destruction d’une API.
  • Générer le squelette de code pour appeler une API (par exemple, lecture d’une playlist Spotify).
  • Tester l’appel en exécutant ce code dans un environnement isolé (similaire à ce qui est fait pour _handle_run_file).

Créez un nouveau fichier api_manager.py :

import json
import logging
import os
import re
import subprocess
import sys
import tempfile
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)
logger.setLevel(logging.WARNING)


class APIManager:
    """Manage the configuration and use of external APIs.

    APIs are stored under the ``"external_apis"`` key of the settings
    returned by the settings manager. For each API the manager keeps its
    key, a documentation reference and a list of known endpoints, and it
    can ask the AI service for code that calls an endpoint and run that
    code to check it.
    """

    # Marker the model is asked to use instead of the real credential. It
    # is replaced locally, so the key itself is never sent to the model.
    API_KEY_PLACEHOLDER = "__API_KEY__"

    def __init__(self, settings_manager: Any, groq_service: Any):
        """
        Args:
            settings_manager: Object exposing ``get_settings()`` and
                ``save_settings(settings)``; it stores the API entries.
            groq_service: Object exposing an awaitable
                ``send_message(prompt=..., include_context=...)``; it
                generates the code.
        """
        self.settings_manager = settings_manager
        self.groq_service = groq_service

    def list_apis(self) -> List[str]:
        """Return the names of the APIs configured in the settings."""
        settings = self.settings_manager.get_settings()
        return list(settings.get("external_apis", {}))

    def get_api_info(self, api_name: str) -> Dict[str, Any]:
        """Return the stored entry (key, doc reference, endpoints) of an API.

        Returns an empty dict when the API is not configured.
        """
        settings = self.settings_manager.get_settings()
        return settings.get("external_apis", {}).get(api_name, {})

    def add_api(
        self,
        api_name: str,
        api_key: str,
        doc_reference: str = "",
        default_endpoints: Optional[List[str]] = None
    ) -> bool:
        """Add (or overwrite) an API entry in the settings.

        Example:
            add_api("spotify", "ABC123",
                    doc_reference="https://developer.spotify.com",
                    default_endpoints=["GET /playlists"])

        Returns:
            True once the entry has been stored, False when ``api_name``
            is empty or blank.
        """
        if not api_name or not api_name.strip():
            return False

        settings = self.settings_manager.get_settings()
        settings.setdefault("external_apis", {})[api_name] = {
            "api_key": api_key,
            "doc_reference": doc_reference,
            "endpoints": default_endpoints or []
        }
        self.settings_manager.save_settings(settings)
        return True

    def remove_api(self, api_name: str) -> bool:
        """Remove an API from the settings.

        Returns:
            True when the API existed and was removed, False otherwise.
        """
        settings = self.settings_manager.get_settings()
        if api_name not in settings.get("external_apis", {}):
            return False

        del settings["external_apis"][api_name]
        self.settings_manager.save_settings(settings)
        return True

    async def generate_api_call_code(
        self,
        api_name: str,
        endpoint: str,
        user_instructions: str
    ) -> str:
        """Ask the AI service for Python code that calls one endpoint.

        The prompt contains the user's instructions, the API name, the
        endpoint and the documentation reference. The credential is NOT
        sent: the model is told to write ``API_KEY_PLACEHOLDER``, which is
        replaced with the stored key in the returned text.

        Returns:
            The AI response with the key inserted, or an error message
            when the API is not configured.
        """
        api_info = self.get_api_info(api_name)
        if not api_info:
            return f"Erreur: L'API {api_name} n'est pas configurée."

        placeholder = self.API_KEY_PLACEHOLDER
        prompt = f"""
L'utilisateur souhaite exécuter l'action: {user_instructions}
sur l'API: {api_name}, endpoint: {endpoint}

Pour l'authentification, écris littéralement le marqueur {placeholder} à la place de la clé ou du token
(il sera remplacé automatiquement avant l'exécution).
Documentation de référence (si dispo): {api_info.get('doc_reference', '')}

Génére-moi UNIQUEMENT un bloc de code Python complet (avec `import requests` ou `import httpx`),
capable d'effectuer l'appel à l'endpoint indiqué, insérer la clé d'API, et afficher (print) le résultat pour test.

Respecte la structure:
```python:call_api.py
# Code complet ici
```
"""
        response = await self.groq_service.send_message(prompt=prompt, include_context=False)
        if isinstance(response, str):
            response = response.replace(placeholder, str(api_info.get("api_key", "")))
        return response

    async def test_api_code(
        self,
        code_content: str,
        timeout: Optional[float] = 60.0
    ) -> Dict[str, Any]:
        """Run generated code in a subprocess and report its output.

        Args:
            code_content: Python source to execute.
            timeout: Seconds after which the run is aborted (None disables
                the limit), so a hanging request cannot block forever.

        Returns:
            ``{"success": bool, "stdout": str, "stderr": str}``; success is
            True when the process exits with code 0.
        """
        temp_path = None
        try:
            with tempfile.NamedTemporaryFile(
                mode='w', suffix='.py', delete=False, encoding='utf-8'
            ) as f:
                f.write(code_content)
                temp_path = f.name

            # Run with the interpreter of this process so the script sees
            # the same installed packages.
            process = subprocess.run(
                [sys.executable or 'python', temp_path],
                capture_output=True,
                text=True,
                timeout=timeout
            )
            return {
                "success": process.returncode == 0,
                "stdout": process.stdout,
                "stderr": process.stderr
            }
        except Exception as e:
            logger.error(f"Erreur test_api_code: {str(e)}")
            return {
                "success": False,
                "stdout": "",
                "stderr": str(e)
            }
        finally:
            # The script may contain the API key: never leave it on disk.
            if temp_path is not None:
                try:
                    os.unlink(temp_path)
                except OSError:
                    pass

2) Mettre à jour le SettingsManager pour stocker les infos d’API

Si votre SettingsManager ne gère pas encore de champs dynamiques, vous pouvez ajouter un champ "external_apis": {} dans le settings.json (ou autre format) afin d’y ranger les clés/données :

 class SettingsManager:
     def __init__(self, path: str):
         self.settings_path = path
         self._settings = {}
         self.load_settings()

     def load_settings(self) -> None:
         try:
             with open(self.settings_path, "r", encoding="utf-8") as f:
                 self._settings = json.load(f)
-        except FileNotFoundError:
+        except FileNotFoundError as e:
             self._settings = {
                 "models": {
                     "default": "gpt-4",
                     "available": []
                 },
+                # Make sure this field is initialised when it is absent
+                "external_apis": {}
             }
+            self.save_settings(self._settings)
         except json.JSONDecodeError:
             self._settings = {}

     def get_settings(self) -> dict:
         return self._settings

     def save_settings(self, new_settings: dict) -> bool:
         self._settings = new_settings
         try:
             with open(self.settings_path, "w", encoding="utf-8") as f:
                 json.dump(self._settings, f, indent=2)
             return True
         except Exception as e:
             return False

3) Exemple d’intégration dans votre menu / flux utilisateur

Vous pourriez avoir un menu (en CLI ou autre) permettant à l’utilisateur de :

  1. Ajouter une API : on appelle api_manager.add_api(...)
  2. Lister les APIs : on appelle api_manager.list_apis()
  3. Ajouter une commande (ex: “Lire playlist Spotify”) :
    • L’utilisateur choisit l’API (ex: Spotify)
    • L’utilisateur choisit l’endpoint (ex: GET /playlists)
    • L’utilisateur décrit son besoin (ex: “Récupère toutes mes playlists et affiche le titre”)
    • L’IA génère le code avec generate_api_call_code(...)
    • On teste le code avec test_api_code(...)
    • On affiche le résultat.

Exemple de menu minimal (dans groq_service.py ou un nouveau menu_manager.py) :

import re
from typing import List, Optional

from rich.console import Console

from .api_manager import APIManager


class MenuManager:
    """Interactive console menu for adding, listing, testing and removing APIs."""

    def __init__(self, api_manager: APIManager):
        self.api_manager = api_manager
        self.console = Console()

    async def show_api_menu(self):
        """Show the main menu in a loop until the user chooses "5" (quit)."""
        while True:
            self.console.print("\n[bold cyan]=== Menu API ===[/bold cyan]")
            self.console.print("1. Ajouter une API")
            self.console.print("2. Lister les APIs")
            self.console.print("3. Générer et tester un appel d'API")
            self.console.print("4. Supprimer une API")
            self.console.print("5. Quitter\n")

            choice = self.console.input("[bold green]Votre choix[/bold green]: ").strip()

            if choice == "1":
                await self._handle_add_api()
            elif choice == "2":
                self._handle_list_apis()
            elif choice == "3":
                await self._handle_generate_and_test_code()
            elif choice == "4":
                self._handle_remove_api()
            elif choice == "5":
                break
            else:
                self.console.print("[red]Choix invalide[/red]")

    def _prompt_choice(self, prompt: str, options: List[str]) -> Optional[str]:
        """Read a 1-based index and return the matching option.

        Non-numeric or out-of-range input (including 0, which would
        otherwise select the last item through index -1) prints an error
        and returns None instead of raising.
        """
        raw = self.console.input(prompt).strip()
        try:
            index = int(raw)
        except ValueError:
            index = 0

        if not 1 <= index <= len(options):
            self.console.print("[red]Choix invalide[/red]")
            return None
        return options[index - 1]

    async def _handle_add_api(self):
        """Ask for an API's name, key, doc link and endpoints, then store it."""
        name = self.console.input("Nom de l'API: ").strip()
        key = self.console.input("Clé (token) de l'API: ").strip()
        doc = self.console.input("Lien de doc (optionnel): ").strip()
        endpoints_raw = self.console.input(
            "Endpoints courants (séparés par des virgules, ex GET /playlists): "
        )
        endpoints = [e.strip() for e in endpoints_raw.split(",") if e.strip()]

        ok = self.api_manager.add_api(
            api_name=name,
            api_key=key,
            doc_reference=doc,
            default_endpoints=endpoints
        )
        if ok:
            self.console.print(f"[green]API {name} ajoutée[/green]")
        else:
            self.console.print(f"[red]Impossible d'ajouter {name}[/red]")

    def _handle_list_apis(self):
        """Print the names of the configured APIs."""
        apis = self.api_manager.list_apis()
        if not apis:
            self.console.print("[yellow]Aucune API configurée[/yellow]")
            return

        self.console.print("[bold]APIs configurées:[/bold]")
        for a in apis:
            self.console.print(f" - {a}")

    async def _handle_generate_and_test_code(self):
        """Let the user pick an API and endpoint, then generate and run the call."""
        apis = self.api_manager.list_apis()
        if not apis:
            self.console.print(
                "[yellow]Pas d'API dispo. Veuillez en ajouter une d'abord.[/yellow]"
            )
            return

        # Choose the API.
        self.console.print("\n[bold]API disponibles[/bold]")
        for idx, api_name in enumerate(apis, start=1):
            self.console.print(f"{idx}. {api_name}")
        api_name = self._prompt_choice("Votre choix: ", apis)
        if api_name is None:
            return

        # Choose the endpoint.
        api_info = self.api_manager.get_api_info(api_name)
        endpoints = api_info.get("endpoints", [])
        if not endpoints:
            self.console.print(
                "[yellow]Aucun endpoint connu pour cette API. "
                "Vous pouvez en rajouter via 'Ajouter une API'.[/yellow]"
            )
            return

        self.console.print("\n[bold]Endpoints disponibles[/bold]")
        for idx, ep in enumerate(endpoints, start=1):
            self.console.print(f"{idx}. {ep}")
        endpoint = self._prompt_choice("Votre choix d'endpoint: ", endpoints)
        if endpoint is None:
            return

        instructions = self.console.input(
            "Décrivez ce que vous voulez faire sur cet endpoint: "
        )

        self.console.print("[blue]Génération du code...[/blue]")
        code_response = await self.api_manager.generate_api_call_code(
            api_name, endpoint, instructions
        )

        code_content = self._extract_code_block(code_response, "call_api.py")
        if not code_content:
            self.console.print("[red]Impossible de trouver le code dans la réponse IA.[/red]")
            # Raw AI text: print verbatim, it is not Rich markup.
            self.console.print(f"Réponse brute:\n{code_response}", markup=False)
            return

        self.console.print("[blue]Test du code généré...[/blue]")
        test_result = await self.api_manager.test_api_code(code_content)

        # Program output is printed verbatim (markup=False) so that
        # bracketed text in API responses is not parsed as styling.
        if test_result["success"]:
            self.console.print("[green]Test réussi![/green]")
            if test_result["stdout"]:
                self.console.print(f"Sortie:\n{test_result['stdout']}", markup=False)
        else:
            self.console.print("[red]Test échoué ou erreur durant l'exécution.[/red]")
            self.console.print(f"stdout:\n{test_result.get('stdout', '')}", markup=False)
            self.console.print(f"stderr:\n{test_result.get('stderr', '')}", markup=False)

    def _handle_remove_api(self):
        """Let the user pick a configured API and remove it."""
        apis = self.api_manager.list_apis()
        if not apis:
            self.console.print("[yellow]Pas d'API à supprimer[/yellow]")
            return

        self.console.print("\n[bold]API disponibles[/bold]")
        for idx, api_name in enumerate(apis, start=1):
            self.console.print(f"{idx}. {api_name}")
        api_name = self._prompt_choice("Votre choix: ", apis)
        if api_name is None:
            return

        ok = self.api_manager.remove_api(api_name)
        if ok:
            self.console.print(f"[green]API {api_name} supprimée[/green]")
        else:
            self.console.print("[red]Echec de la suppression[/red]")

    def _extract_code_block(self, response: str, filename: str) -> str:
        """Extract the body of a fenced code block from an AI response.

        A fence tagged ``python:<filename>`` is preferred; a plain
        ``python`` fence is the fallback. Returns an empty string when
        neither is found.
        """
        pattern = rf"```python:{re.escape(filename)}\n(.*?)```"
        match = re.search(pattern, response, re.DOTALL)
        if match:
            return match.group(1).strip()

        # Fallback: a python fence without the ":filename" suffix.
        pattern = r"```python\n(.*?)```"
        match = re.search(pattern, response, re.DOTALL)
        if match:
            return match.group(1).strip()

        return ""

4) Comment utiliser ce nouveau menu

Dans votre code principal (là où vous exécutez déjà certaines commandes), vous pourriez simplement :

+from .api_manager import APIManager
+from .menu_manager import MenuManager

 class GroqService:
     def __init__(
         self,
         file_manager: FileManager,
         context_manager: ContextManager,
         settings_manager: SettingsManager,
         api_key: Optional[str] = None
     ):
         ...
         self.api_key = api_key or os.getenv("GROQ_API_KEY")
         self.client = Groq(api_key=self.api_key)
         ...
+        # Instantiate the APIManager
+        self.api_manager = APIManager(self.settings_manager, self)

+    async def open_api_menu(self):
+        menu_manager = MenuManager(self.api_manager)
+        await menu_manager.show_api_menu()
     ...

À partir de là, vous pouvez appeler await groq_service.open_api_menu() depuis votre CLI principale, un bouton d’interface, etc.


Points clés à retenir

  • Stockage dynamique : on ajoute un champ external_apis dans le settings.json pour mémoriser toutes les APIs.
  • APIManager : gère l’ajout, la suppression, et la génération de code basé sur la doc fournie.
  • MenuManager : un simple menu CLI pour guider l’utilisateur. Vous pouvez l’adapter en interface graphique, ou en commande terminal, etc.
  • Test de code : on réutilise le même principe que pour _handle_run_file, c’est-à-dire un script temporaire + exécution dans un sous-processus.

Avec cette structure, l’utilisateur peut :

  1. Ajouter une API (ex. Spotify, ChatGPT, etc.) + doc + endpoints.
  2. Demander à l’IA de générer du code d’appel.
  3. Tester immédiatement si ça fonctionne.

Vous pourrez enrichir ce squelette selon vos besoins (par ex. stocker des scopes OAuth, gérer des refresh tokens, etc.).

分享这个问答