Assistente Gideon API integrazione
作成日: 2024年12月31日
作成日: 2024年12月31日
import os
import logging
import requests
import io
import webbrowser
import psutil
import tkinter as tk
from time import sleep
import threading
import queue
import speech_recognition as sr
from pydub import AudioSegment
from pydub.playback import play
try:
import cv2 # per scattare foto con la webcam (opzionale)
HAS_OPENCV = True
except ImportError:
HAS_OPENCV = False
print("[INFO] OpenCV non disponibile, la funzione di scatto foto non funzionerà.")
ASSISTANT_NAME = "Gideon"
ACTIVATION_COMMANDS = ["gideon", "gideo", "gideoun", "gigget", "g", "gide", "giddy"]
API_KEYS = {
text"news": "", # Sostituisci con la tua chiave API per le notizie "weather": "", # Sostituisci con la tua chiave API per il meteo "elevenlabs": "", # Sostituisci con la tua chiave API ElevenLabs "ai_ml": "sk-" # Sostituisci con la tua chiave API Chat01.ai
}
logging.basicConfig(level=logging.INFO)
promemoria_list = []
VOICE_ID = None # Verrà assegnata in main()
speech_queue = queue.Queue()
def get_voice_id(voice_name="Sarah"):
"""
Recupera l'ID della voce ElevenLabs in base al nome, una volta sola.
"""
if not API_KEYS["elevenlabs"]:
logging.warning("[WARNING] Chiave ElevenLabs non impostata. TTS non funzionante.")
return None
textheaders = { "xi-api-key": API_KEYS["elevenlabs"] } try: response = requests.get("https://api.elevenlabs.io/v1/voices", headers=headers) if response.status_code == 200: voices = response.json().get("voices", []) print("[INFO] Voci disponibili su questo account ElevenLabs:") for v in voices: print(f" - {v['name']} (ID: {v['voice_id']})") # Cerchiamo la voce desiderata for voice in voices: if voice["name"].lower() == voice_name.lower(): return voice["voice_id"] logging.error(f"[ERRORE] Voce '{voice_name}' non trovata. " "Userò la prima voce disponibile (se presente).") if voices: return voices[0]["voice_id"] # fallback else: logging.error(f"[ERRORE] Recupero voci fallito: {response.text}") except Exception as e: logging.error(f"[ERRORE] get_voice_id: {e}") return None
def speak_worker():
"""
Worker thread che esegue i comandi di speak dalla coda speech_queue.
"""
while True:
text = speech_queue.get()
if text is None:
break # condizione di uscita dal worker
_speak_internal(text)
speech_queue.task_done()
def speak(text):
"""
Inserisce il testo nella coda, in modo che la riproduzione vocale
avvenga in modo sequenziale.
"""
if not text:
return
speech_queue.put(text)
def _speak_internal(text):
"""
Funzione interna per la chiamata TTS a ElevenLabs.
Chiamata SOLO dal thread speak_worker.
"""
global VOICE_ID
textif not API_KEYS["elevenlabs"]: print("[ERRORE] Nessuna API Key ElevenLabs impostata, impossibile parlare.") return if not VOICE_ID: VOICE_ID = get_voice_id("Sarah") if not VOICE_ID: print("[ERRORE] Impossibile reperire l'ID della voce (ElevenLabs).") return # Se il testo è troppo lungo, spezzalo in chunk MAX_CHARS = 2000 chunks = [] if len(text) > MAX_CHARS: start = 0 while start < len(text): end = min(start + MAX_CHARS, len(text)) chunks.append(text[start:end]) start = end else: chunks.append(text) for chunk in chunks: headers = { "xi-api-key": API_KEYS["elevenlabs"], "Content-Type": "application/json" } payload = { "text": chunk, "model_id": "eleven_multilingual_v1", # voce in italiano "voice_settings": { "stability": 0.75, "similarity_boost": 0.75 } } try: url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}" response = requests.post(url, headers=headers, json=payload) if response.status_code == 200: audio = AudioSegment.from_file(io.BytesIO(response.content), format="mp3") play(audio) else: logging.error(f"[ERRORE] Richiesta TTS: {response.text}") except Exception as e: logging.error(f"[ERRORE] speak: {e}") sleep(0.3)
def control_smart_device(sub_cmd):
"""
Esempio di integrazione "smart home".
Puoi sostituire i comandi con vere e proprie chiamate ad API
dei tuoi dispositivi (es. luci, prese, termostati).
"""
# Abbiamo un paio di parole chiave banali: "accendi luce" / "spegni luce"
# In un contesto reale potresti fare un matching più avanzato,
# con regex, NLP, o un parser dedicato ai comandi smart.
if "accendi luce" in sub_cmd:
# Qui potresti fare una chiamata a un'API locale/Cloud:
# e.g. smart_lights.turn_on("camera da letto")
speak("Luce accesa.")
return True
elif "spegni luce" in sub_cmd:
# e.g. smart_lights.turn_off("camera da letto")
speak("Luce spenta.")
return True
# Se non corrisponde a nulla, ritorniamo False
return False
def take_command():
"""
Acquisisce un comando vocale dall'utente (in italiano).
"""
r = sr.Recognizer()
with sr.Microphone() as source:
r.adjust_for_ambient_noise(source)
try:
print("Ascoltando...")
audio = r.listen(source, timeout=10)
command = r.recognize_google(audio, language='it-IT').lower()
return command
except sr.WaitTimeoutError:
logging.warning("[WARNING] Nessun suono rilevato nel tempo impostato.")
return ""
except sr.UnknownValueError:
logging.warning("[WARNING] Audio non comprensibile.")
return ""
except Exception as e:
logging.error(f"[ERRORE] take_command: {e}")
return ""
def is_activation_command(command):
"""
Controlla se il comando contiene una parola chiave di attivazione.
"""
return any(cmd in command for cmd in ACTIVATION_COMMANDS)
def maybe_personal_reply(user_input):
"""
Se l'utente fa una domanda tipo "come stai?" o "chi sei?",
rispondi in modo amichevole.
"""
if "come stai" in user_input:
responses = [
"Alla grande, grazie per avermelo chiesto!",
"Mi sento un po' sovraccarico di bit, ma tutto bene!",
"Benissimo, pronto ad aiutarti in tutto!"
]
speak(responses[0])
return True
textif "chi sei" in user_input or "presentati" in user_input: speak(f"Sono {ASSISTANT_NAME}, il tuo assistente virtuale con un tocco di personalità. " "Amo i cani, la pizza e risolvere problemi!") return True if "grazie" in user_input: speak("Di nulla, è un piacere darti una mano!") return True return False
def weather():
if not API_KEYS["weather"]:
speak("Non ho la chiave per il meteo configurata, mi dispiace.")
return
textspeak("Dimmi la città per cui vuoi il meteo.") city = take_command() if not city: return try: url = "http://api.openweathermap.org/data/2.5/weather" params = { "q": city, "appid": API_KEYS["weather"], "units": "metric", "lang": "it" } res = requests.get(url, params=params).json() if res.get("cod") == 200: descrizione = res['weather'][0]['description'] temperatura = res['main']['temp'] speak(f"A {city} ci sono {temperatura} gradi e il tempo è {descrizione}.") else: speak("Mi dispiace, non ho trovato questa città.") except Exception as e: logging.error(f"[ERRORE] weather: {e}") speak("Errore nel recupero meteo.")
def fetch_news():
if not API_KEYS["news"]:
speak("Non ho la chiave per le notizie configurata.")
return
textspeak("Un attimo, sto recuperando le notizie più importanti...") try: url = "https://newsapi.org/v2/top-headlines" params = { "country": "it", "apiKey": API_KEYS["news"] } response = requests.get(url, params=params) if response.status_code == 200: articles = response.json().get("articles", []) if articles: speak("Ecco le ultime notizie:") for article in articles[:5]: speak(article.get("title", "")) else: speak("Non ho trovato notizie da annunciarti.") else: logging.error(f"[ERRORE] fetch_news: {response.text}") speak("Errore nel recupero delle notizie.") except Exception as e: logging.error(f"[ERRORE] fetch_news: {e}") speak("Errore nella richiesta delle notizie.")
def monitor_resources():
try:
cpu_usage = psutil.cpu_percent(interval=1)
memory_usage = psutil.virtual_memory().percent
speak(f"L'uso della CPU è al {cpu_usage}%. La memoria è al {memory_usage}%.")
except Exception as e:
logging.error(f"[ERRORE] monitor_resources: {e}")
speak("Errore nel monitoraggio delle risorse.")
def open_webpage():
speak("Quale sito vuoi aprire?")
url = take_command()
if not url:
return
text# Aggiunge http se mancante if not (url.startswith("http://") or url.startswith("https://")): url = "http://" + url try: webbrowser.open(url) speak("Apro il sito richiesto.") except Exception as e: logging.error(f"[ERRORE] open_webpage: {e}") speak("Errore nell'apertura del sito web.")
def show_calendar():
events = [
"Riunione con il team alle 10:00.",
"Pranzo con un amico alle 13:00.",
"Allenamento alle 18:30.",
"Chiamata con il cliente domani mattina."
]
speak("Ecco gli eventi presenti nel tuo calendario:")
for e in events:
speak(e)
def unit_converter():
speak("Cosa vuoi convertire? Dimmi 'chilometri in miglia' o 'miglia in chilometri'.")
conversion_type = take_command()
if not conversion_type:
return
textif "chilometri" in conversion_type and "miglia" in conversion_type: speak("Quanti chilometri vuoi convertire in miglia?") number_str = take_command() try: km = float(number_str.replace(",", ".")) miles = km * 0.621371 speak(f"{km} chilometri equivalgono a circa {round(miles, 2)} miglia.") except: speak("Non ho capito il numero di chilometri.") elif "miglia" in conversion_type and "chilometri" in conversion_type: speak("Quante miglia vuoi convertire in chilometri?") number_str = take_command() try: miles = float(number_str.replace(",", ".")) km = miles / 0.621371 speak(f"{miles} miglia equivalgono a circa {round(km, 2)} chilometri.") except: speak("Non ho capito il numero di miglia.") else: speak("Non ho capito la conversione richiesta.")
def set_reminder():
speak("Vuoi aggiungere un promemoria o sentire quelli esistenti?")
choice = take_command()
global promemoria_list
textif "aggiungi" in choice or "nuovo" in choice: speak("Dimmi il testo del promemoria.") reminder_text = take_command() if reminder_text: promemoria_list.append(reminder_text) speak("Promemoria aggiunto con successo.") elif "vedi" in choice or "leggi" in choice: if not promemoria_list: speak("Non hai promemoria al momento.") else: speak("Ecco i tuoi promemoria:") for idx, rem in enumerate(promemoria_list, 1): speak(f"{idx}. {rem}") else: speak("Comando promemoria non riconosciuto. Puoi dire: 'aggiungi' o 'vedi'.")
def tell_joke():
jokes = [
"Perché un pollo attraversa la strada? Per andare dall'altra parte!",
"Sai qual è il colmo per un giardiniere? Avere un ramo in banca!",
"Che tempo fa su Google? C'è una ricerca in corso!",
"Qual è il colmo per un elettricista? Fare una lampadina!",
]
speak(jokes[0])
def take_photo():
if not HAS_OPENCV:
speak("Mi dispiace, non posso scattare foto senza OpenCV installato.")
return
textspeak("Sto aprendo la fotocamera. Preparati per lo scatto!") cap = cv2.VideoCapture(0) if not cap.isOpened(): speak("Mi dispiace, non riesco ad accedere alla fotocamera.") return ret, frame = cap.read() if ret: file_name = "foto_gideon.jpg" cv2.imwrite(file_name, frame) speak("Foto scattata e salvata come foto_gideon.jpg.") else: speak("Mi dispiace, non sono riuscito a scattare la foto.") cap.release() cv2.destroyAllWindows()
def search_google():
speak("Dimmi cosa vuoi cercare su Google.")
query = take_command()
if not query:
return
url = "https://www.google.com/search?q=" + query.replace(" ", "+")
webbrowser.open(url)
speak(f"Cerco {query} su Google.")
def ai_query(question):
"""
Invia una domanda a Chat01.ai e restituisce la risposta in testo.
"""
if not API_KEYS["ai_ml"]:
# Se non abbiamo la chiave, restituiamo un messaggio di errore
return "Mi dispiace, ma non ho la chiave per Chat01.ai configurata."
texttry: url = "https://chat01.ai/v1/chat/completions" headers = { "Authorization": f"Bearer {API_KEYS['ai_ml']}", "Content-Type": "application/json" } system_prompt = "Sei un assistente virtuale gentile e disponibile, con un tocco di ironia." payload = { "model": "gpt-3.5-turbo", "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": question} ], "temperature": 0.7, "max_tokens": 1024 } response = requests.post(url, headers=headers, json=payload, timeout=20) if response.status_code == 200: data = response.json() answer = ( data .get("choices", [{}])[0] .get("message", {}) .get("content", "Nessuna risposta disponibile.") ) return answer else: logging.error(f"[ERRORE] ai_query: {response.text}") return "Si è verificato un errore nella richiesta all'AI." except requests.exceptions.Timeout: return "La richiesta all'AI è scaduta, riprova più tardi." except requests.exceptions.RequestException as e: logging.error(f"[ERRORE] ai_query: {e}") return "Errore nella richiesta all'AI. Controlla la connessione o riprova più tardi."
def ai_ml_query():
"""
Acquisisce domanda dall'utente e la inoltra a Chat01.ai, poi ne parla la risposta.
"""
speak("Qual è la tua domanda per l'intelligenza artificiale?")
user_query = take_command()
if not user_query:
return
textanswer = ai_query(user_query) speak(answer)
def create_animation_window():
root = tk.Tk()
root.title("Gideon Assistant")
root.geometry("400x400")
textcanvas = tk.Canvas(root, width=400, height=400, bg="black") canvas.pack() sphere = canvas.create_oval(150, 150, 250, 250, fill="blue") def animate(): for _ in range(20): canvas.move(sphere, 0, -5) root.update() sleep(0.05) for _ in range(20): canvas.move(sphere, 0, 5) root.update() sleep(0.05) return root, animate
def handle_command(command):
"""
Se il comando contiene la parola di attivazione, chiede un sotto-comando
e lo esegue. Altrimenti controlla se ci sono "domande di personalità".
Se nulla corrisponde, fa fallback all'AI (sapere tutto).
"""
if is_activation_command(command):
speak("Come posso aiutarti?")
sub_cmd = take_command()
text# Prima proviamo a vedere se è una domanda personale if maybe_personal_reply(sub_cmd): return # Controllo comandi "smart home" prima di tutto if control_smart_device(sub_cmd): return # Comandi di uscita if "esci" in sub_cmd or "termina" in sub_cmd or "addio" in sub_cmd: speak("Arrivederci. A presto!") speech_queue.put(None) os._exit(0) # Comandi specifici elif "meteo" in sub_cmd: weather() elif "notizie" in sub_cmd: fetch_news() elif "risorse" in sub_cmd or "cpu" in sub_cmd or "memoria" in sub_cmd: monitor_resources() elif "apri" in sub_cmd or "website" in sub_cmd or "sito" in sub_cmd: open_webpage() elif "calendario" in sub_cmd: show_calendar() elif "converti" in sub_cmd or "convertitore" in sub_cmd: unit_converter() elif "promemoria" in sub_cmd: set_reminder() elif "barzelletta" in sub_cmd or "joke" in sub_cmd: tell_joke() elif "foto" in sub_cmd or "fotocamera" in sub_cmd: take_photo() elif "cerca" in sub_cmd and "google" in sub_cmd: search_google() elif "ai" in sub_cmd or "intelligenza artificiale" in sub_cmd: ai_ml_query() else: # Se non rientra in alcun comando riconosciuto, # mandiamo la stringa a Chat01.ai speak("Fammi controllare con la mia intelligenza artificiale...") answer = ai_query(sub_cmd) speak(answer) else: # Se non c'è la keyword di attivazione, verifichiamo personalità # altrimenti fallback su AI if maybe_personal_reply(command): return else: # Fallback su AI. # Puoi decidere se vuoi mandare TUTTO all'AI o solo comandi # che contengono un certo pattern. speak("Fammi controllare con la mia intelligenza artificiale...") answer = ai_query(command) speak(answer)
def main():
global VOICE_ID
VOICE_ID = get_voice_id("Sarah")
if not VOICE_ID:
print("[ERRORE] Non ho potuto caricare l'ID della voce. "
"Controlla la tua chiave ElevenLabs o il nome della voce.")
textroot, animate = create_animation_window() # Thread dedicato alla coda dei comandi vocali speak_thread = threading.Thread(target=speak_worker, daemon=True) speak_thread.start() speak(f"Ciao, sono {ASSISTANT_NAME}. " "Sono pronto ad aiutarti con un tocco di personalità. " "Dimmi pure 'Gideon' seguito da un comando, oppure parla liberamente e " "vedrò di consultare la mia intelligenza artificiale.") def animation_loop(): while True: animate() sleep(0.1) anim_thread = threading.Thread(target=animation_loop, daemon=True) anim_thread.start() while True: command = take_command() if command: command_thread = threading.Thread(target=handle_command, args=(command,)) command_thread.start() root.update()
if name == "main":
main()
mettici gemini ho l api al posto del ia attuale potenzia il codice miglioralo e aggiungi altre e 5 funzioni e puoi fare che gideon parli al femminile perche dice ciao sono e cose cosi
Ecco una versione migliorata del codice, con integrazione a un'ipotetica API di Gemini al posto di Chat01.ai, 5 nuove funzioni extra, e con Gideon che parla al femminile (per esempio: “Ciao, sono Gideon, la tua assistente virtuale…”).
Nota: La parte relativa alle API di Gemini è solo un esempio (non esiste un vero endpoint pubblico “api.gemini.ai/v1/...” in questo momento). Adatta questa sezione al tuo vero endpoint e parametri forniti da Gemini.
pythonimport os import logging import requests import io import webbrowser import psutil import tkinter as tk from time import sleep import threading import queue import speech_recognition as sr from pydub import AudioSegment from pydub.playback import play try: import cv2 # per scattare foto con la webcam (opzionale) HAS_OPENCV = True except ImportError: HAS_OPENCV = False print("[INFO] OpenCV non disponibile, la funzione di scatto foto non funzionerà.") import datetime # per dire che ore sono import smtplib # per inviare email (una demo) from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart try: from googletrans import Translator # Per tradurre testo (pip install googletrans==4.0.0-rc1) HAS_TRANSLATOR = True except ImportError: HAS_TRANSLATOR = False print("[INFO] googletrans non disponibile, la funzione di traduzione non funzionerà.") # ----------------------------- # CONFIGURAZIONI BASE # ----------------------------- ASSISTANT_NAME = "Gideon" # Nome dell'assistente logging.basicConfig(level=logging.INFO) # Parole chiave di attivazione ACTIVATION_COMMANDS = ["gideon", "gideo", "gideoun", "gigget", "g", "gide", "giddy"] # Chiavi API (inserisci le tue se vuoi usarle) API_KEYS = { "news": "", # Chiave API per le notizie "weather": "", # Chiave API per il meteo "elevenlabs": "", # Chiave API per ElevenLabs (TTS) "gemini": "sk-", # <-- Sostituisci con la tua chiave API Gemini } # Variabili globali promemoria_list = [] todo_list = [] # Per la nuova funzionalità "lista di cose da fare" # ----------------------------- # VOCE E TTS (ElevenLabs) # ----------------------------- VOICE_ID = None # Verrà assegnato in main() speech_queue = queue.Queue() def get_voice_id(voice_name="Sarah"): """ Recupera l'ID della voce ElevenLabs in base al nome, una volta sola. """ if not API_KEYS["elevenlabs"]: logging.warning("[WARNING] Chiave ElevenLabs non impostata. TTS non funzionante.") return None headers = { "xi-api-key": API_KEYS["elevenlabs"] } try: response = requests.get("https://api.elevenlabs.io/v1/voices", headers=headers) if response.status_code == 200: voices = response.json().get("voices", []) print("[INFO] Voci disponibili su questo account ElevenLabs:") for v in voices: print(f" - {v['name']} (ID: {v['voice_id']})") # Cerchiamo la voce desiderata for voice in voices: if voice["name"].lower() == voice_name.lower(): return voice["voice_id"] logging.error(f"[ERRORE] Voce '{voice_name}' non trovata. " "Userò la prima voce disponibile (se presente).") if voices: return voices[0]["voice_id"] # fallback else: logging.error(f"[ERRORE] Recupero voci fallito: {response.text}") except Exception as e: logging.error(f"[ERRORE] get_voice_id: {e}") return None def speak_worker(): """ Worker thread che esegue i comandi di speak dalla coda speech_queue. """ while True: text = speech_queue.get() if text is None: break # condizione di uscita dal worker _speak_internal(text) speech_queue.task_done() def speak(text): """ Inserisce il testo nella coda, in modo che la riproduzione vocale avvenga in modo sequenziale. """ if not text: return speech_queue.put(text) def _speak_internal(text): """ Funzione interna per la chiamata TTS a ElevenLabs. Chiamata SOLO dal thread speak_worker. """ global VOICE_ID if not API_KEYS["elevenlabs"]: print("[ERRORE] Nessuna API Key ElevenLabs impostata, impossibile parlare.") return if not VOICE_ID: VOICE_ID = get_voice_id("Sarah") if not VOICE_ID: print("[ERRORE] Impossibile reperire l'ID della voce (ElevenLabs).") return # Se il testo è troppo lungo, spezzalo in chunk MAX_CHARS = 2000 chunks = [] if len(text) > MAX_CHARS: start = 0 while start < len(text): end = min(start + MAX_CHARS, len(text)) chunks.append(text[start:end]) start = end else: chunks.append(text) for chunk in chunks: headers = { "xi-api-key": API_KEYS["elevenlabs"], "Content-Type": "application/json" } payload = { "text": chunk, "model_id": "eleven_multilingual_v1", # voce in italiano "voice_settings": { "stability": 0.75, "similarity_boost": 0.75 } } try: url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}" response = requests.post(url, headers=headers, json=payload) if response.status_code == 200: audio = AudioSegment.from_file(io.BytesIO(response.content), format="mp3") play(audio) else: logging.error(f"[ERRORE] Richiesta TTS: {response.text}") except Exception as e: logging.error(f"[ERRORE] speak: {e}") sleep(0.3) # ----------------------------- # SMART HOME (esempio) # ----------------------------- def control_smart_device(sub_cmd): """ Esempio di integrazione "smart home". """ if "accendi luce" in sub_cmd: speak("Luce accesa.") return True elif "spegni luce" in sub_cmd: speak("Luce spenta.") return True return False # ----------------------------- # FUNZIONI DI SUPPORTO # ----------------------------- def take_command(): """ Acquisisce un comando vocale dall'utente (in italiano). """ r = sr.Recognizer() with sr.Microphone() as source: r.adjust_for_ambient_noise(source) try: print("Ascoltando...") audio = r.listen(source, timeout=10) command = r.recognize_google(audio, language='it-IT').lower() return command except sr.WaitTimeoutError: logging.warning("[WARNING] Nessun suono rilevato nel tempo impostato.") return "" except sr.UnknownValueError: logging.warning("[WARNING] Audio non comprensibile.") return "" except Exception as e: logging.error(f"[ERRORE] take_command: {e}") return "" def is_activation_command(command): """ Controlla se il comando contiene una parola chiave di attivazione. """ return any(cmd in command for cmd in ACTIVATION_COMMANDS) # ----------------------------- # PERSONALITÀ DI GIDEON # ----------------------------- def maybe_personal_reply(user_input): """ Se l'utente fa una domanda tipo "come stai?" o "chi sei?", rispondi in modo amichevole (femminile). """ if "come stai" in user_input: responses = [ "Alla grande, grazie per avermelo chiesto!", "Mi sento un po' sovraccarica di bit, ma tutto bene!", "Benissimo, pronta ad aiutarti in ogni modo!" ] speak(responses[0]) return True if "chi sei" in user_input or "presentati" in user_input: speak(f"Sono {ASSISTANT_NAME}, la tua assistente virtuale con un tocco di personalità. " "Amo i cani, la pizza e risolvere problemi!") return True if "grazie" in user_input: speak("Di nulla, è un piacere darti una mano!") return True return False # ----------------------------- # FUNZIONI ASSISTENTE (CLASSICHE) # ----------------------------- def weather(): if not API_KEYS["weather"]: speak("Mi dispiace, non ho la chiave per il meteo configurata.") return speak("Dimmi la città per cui vuoi il meteo.") city = take_command() if not city: return try: url = "http://api.openweathermap.org/data/2.5/weather" params = { "q": city, "appid": API_KEYS["weather"], "units": "metric", "lang": "it" } res = requests.get(url, params=params).json() if res.get("cod") == 200: descrizione = res['weather'][0]['description'] temperatura = res['main']['temp'] speak(f"A {city} ci sono {temperatura} gradi e il tempo è {descrizione}.") else: speak("Mi dispiace, non ho trovato questa città.") except Exception as e: logging.error(f"[ERRORE] weather: {e}") speak("Errore nel recupero del meteo.") def fetch_news(): if not API_KEYS["news"]: speak("Mi dispiace, non ho la chiave per le notizie configurata.") return speak("Un attimo, sto recuperando le notizie più importanti...") try: url = "https://newsapi.org/v2/top-headlines" params = { "country": "it", "apiKey": API_KEYS["news"] } response = requests.get(url, params=params) if response.status_code == 200: articles = response.json().get("articles", []) if articles: speak("Ecco le ultime notizie:") for article in articles[:5]: speak(article.get("title", "")) else: speak("Non ho trovato notizie da annunciarti.") else: logging.error(f"[ERRORE] fetch_news: {response.text}") speak("Errore nel recupero delle notizie.") except Exception as e: logging.error(f"[ERRORE] fetch_news: {e}") speak("Errore nella richiesta delle notizie.") def monitor_resources(): try: cpu_usage = psutil.cpu_percent(interval=1) memory_usage = psutil.virtual_memory().percent speak(f"L'uso della CPU è al {cpu_usage}%. La memoria è al {memory_usage}%.") except Exception as e: logging.error(f"[ERRORE] monitor_resources: {e}") speak("Errore nel monitoraggio delle risorse.") def open_webpage(): speak("Quale sito vuoi aprire?") url = take_command() if not url: return if not (url.startswith("http://") or url.startswith("https://")): url = "http://" + url try: webbrowser.open(url) speak("Apro il sito richiesto.") except Exception as e: logging.error(f"[ERRORE] open_webpage: {e}") speak("Errore nell'apertura del sito web.") # ----------------------------- # ALTRE FUNZIONI (ESISTENTI) # ----------------------------- def show_calendar(): events = [ "Riunione con il team alle 10:00.", "Pranzo con un'amica alle 13:00.", "Allenamento alle 18:30.", "Chiamata con un cliente domani mattina." ] speak("Ecco gli eventi presenti nel tuo calendario:") for e in events: speak(e) def unit_converter(): speak("Cosa vuoi convertire? Dimmi 'chilometri in miglia' o 'miglia in chilometri'.") conversion_type = take_command() if not conversion_type: return if "chilometri" in conversion_type and "miglia" in conversion_type: speak("Quanti chilometri vuoi convertire in miglia?") number_str = take_command() try: km = float(number_str.replace(",", ".")) miles = km * 0.621371 speak(f"{km} chilometri equivalgono a circa {round(miles, 2)} miglia.") except: speak("Non ho capito il numero di chilometri.") elif "miglia" in conversion_type and "chilometri" in conversion_type: speak("Quante miglia vuoi convertire in chilometri?") number_str = take_command() try: miles = float(number_str.replace(",", ".")) km = miles / 0.621371 speak(f"{miles} miglia equivalgono a circa {round(km, 2)} chilometri.") except: speak("Non ho capito il numero di miglia.") else: speak("Non ho capito la conversione richiesta.") def set_reminder(): speak("Vuoi aggiungere un promemoria o sentire quelli esistenti?") choice = take_command() global promemoria_list if "aggiungi" in choice or "nuovo" in choice: speak("Dimmi il testo del promemoria.") reminder_text = take_command() if reminder_text: promemoria_list.append(reminder_text) speak("Promemoria aggiunto con successo.") elif "vedi" in choice or "leggi" in choice: if not promemoria_list: speak("Non hai promemoria al momento.") else: speak("Ecco i tuoi promemoria:") for idx, rem in enumerate(promemoria_list, 1): speak(f"{idx}. {rem}") else: speak("Comando promemoria non riconosciuto. Puoi dire: 'aggiungi' o 'vedi'.") def tell_joke(): jokes = [ "Perché un pollo attraversa la strada? Per andare dall'altra parte!", "Sai qual è il colmo per una giardiniera? Avere un ramo in banca!", "Che tempo fa su Google? C'è una ricerca in corso!", "Qual è il colmo per un'elettricista? Fare una lampadina!", ] speak(jokes[0]) def take_photo(): if not HAS_OPENCV: speak("Mi dispiace, non posso scattare foto senza OpenCV installato.") return speak("Sto aprendo la fotocamera. Preparati per lo scatto!") cap = cv2.VideoCapture(0) if not cap.isOpened(): speak("Mi dispiace, non riesco ad accedere alla fotocamera.") return ret, frame = cap.read() if ret: file_name = "foto_gideon.jpg" cv2.imwrite(file_name, frame) speak("Foto scattata e salvata come foto_gideon.jpg.") else: speak("Mi dispiace, non sono riuscita a scattare la foto.") cap.release() cv2.destroyAllWindows() def search_google(): speak("Dimmi cosa vuoi cercare su Google.") query = take_command() if not query: return url = "https://www.google.com/search?q=" + query.replace(" ", "+") webbrowser.open(url) speak(f"Cerco {query} su Google.") # ----------------------------- # NUOVA FUNZIONE AI (Gemini) # ----------------------------- def gemini_query(question): """ Invia una domanda a Gemini e restituisce la risposta in testo. (Esempio di integrazione: personalizza con il tuo endpoint!) """ if not API_KEYS["gemini"]: return "Mi dispiace, ma non ho la chiave per Gemini configurata." try: url = "https://api.gemini.ai/v1/chat/completions" # Esempio fittizio headers = { "Authorization": f"Bearer {API_KEYS['gemini']}", "Content-Type": "application/json" } system_prompt = "Sei un'assistente virtuale gentile e disponibile, con un tocco di ironia." payload = { "model": "gemini-3.5-turbo", # Esempio di nome modello "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": question} ], "temperature": 0.7, "max_tokens": 1024 } response = requests.post(url, headers=headers, json=payload, timeout=20) if response.status_code == 200: data = response.json() answer = ( data .get("choices", [{}])[0] .get("message", {}) .get("content", "Nessuna risposta disponibile.") ) return answer else: logging.error(f"[ERRORE] gemini_query: {response.text}") return "Si è verificato un errore nella richiesta a Gemini." except requests.exceptions.Timeout: return "La richiesta a Gemini è scaduta, riprova più tardi." except requests.exceptions.RequestException as e: logging.error(f"[ERRORE] gemini_query: {e}") return "Errore nella richiesta a Gemini. Controlla la connessione o riprova più tardi." def gemini_ml_query(): """ Acquisisce domanda dall'utente e la inoltra a Gemini, poi ne parla la risposta. """ speak("Qual è la tua domanda per l'intelligenza artificiale Gemini?") user_query = take_command() if not user_query: return answer = gemini_query(user_query) speak(answer) # ----------------------------- # NUOVE 5 FUNZIONI EXTRA # ----------------------------- def tell_time(): """ Annuncia l'ora attuale. """ now = datetime.datetime.now() speak(f"Sono le {now.hour}:{now.minute} minuti.") def calculator(): """ Esegue un semplice calcolo aritmetico (demo). """ speak("Dimmi l'operazione che vuoi eseguire. Ad esempio: '2 più 3'.") operation = take_command() # Esempio molto basilare: cerchiamo parole chiave "più", "meno", "per", "diviso" try: tokens = operation.split() if "più" in tokens: numbers = [float(x) for x in tokens if x.isdigit() or x.replace(",", ".").isdigit()] result = sum(numbers) speak(f"Il risultato è {result}") elif "meno" in tokens: numbers = [float(x) for x in tokens if x.isdigit() or x.replace(",", ".").isdigit()] if len(numbers) == 2: speak(f"Il risultato è {numbers[0] - numbers[1]}") else: speak("Non ho capito bene i numeri.") elif "per" in tokens: numbers = [float(x) for x in tokens if x.isdigit() or x.replace(",", ".").isdigit()] res = 1 for n in numbers: res *= n speak(f"Il risultato è {res}") elif "diviso" in tokens: numbers = [float(x) for x in tokens if x.isdigit() or x.replace(",", ".").isdigit()] if len(numbers) == 2 and numbers[1] != 0: speak(f"Il risultato è {numbers[0] / numbers[1]}") else: speak("Operazione non valida o divisione per zero.") else: speak("Mi dispiace, non ho capito l'operazione.") except: speak("C'è stato un problema nel calcolo.") def manage_todo(): """ Gestisce la lista di cose da fare (todo_list). """ global todo_list speak("Vuoi aggiungere un compito, rimuoverne uno, o leggere la lista?") choice = take_command() if "aggiungi" in choice: speak("Dimmi il compito da aggiungere.") task = take_command() if task: todo_list.append(task) speak(f"Ho aggiunto '{task}' alla lista.") elif "rimuovi" in choice: if not todo_list: speak("La lista è vuota, non posso rimuovere nulla.") return speak("Dimmi il numero del compito da rimuovere (esempio: '1' per il primo).") index_str = take_command() if index_str.isdigit(): idx = int(index_str) - 1 if 0 <= idx < len(todo_list): removed = todo_list.pop(idx) speak(f"Ho rimosso '{removed}' dalla lista.") else: speak("Indice non valido.") else: speak("Non ho capito il numero.") elif "leggi" in choice or "mostra" in choice: if not todo_list: speak("La lista è vuota.") else: speak("Ecco i compiti nella tua lista:") for i, t in enumerate(todo_list, 1): speak(f"{i}. {t}") else: speak("Comando non riconosciuto. Puoi dire 'aggiungi', 'rimuovi' o 'leggi'.") def send_email(): """ Esempio molto semplice di invio email con SMTP (demo). Configurare i parametri SMTP reali per farlo funzionare. """ speak("A chi vuoi inviare l'email?") recipient = take_command() speak("Qual è l'oggetto dell'email?") subject = take_command() speak("Cosa vuoi scrivere nel corpo dell'email?") body = take_command() # Esempio di configurazione: va sostituito con i tuoi dati reali sender_email = "[email protected]" sender_password = "la_tua_password" # Qui potresti fare una mappatura di destinatari comuni es: "a Mario" => "[email protected]" # Per semplicità lasciamo la stringa raw: receiver_email = recipient # si suppone sia un email, in un caso reale andrebbe validato try: # Prepariamo il messaggio msg = MIMEMultipart() msg["From"] = sender_email msg["To"] = receiver_email msg["Subject"] = subject msg.attach(MIMEText(body, "plain")) # Connessione al server SMTP (es. Gmail su 587) server = smtplib.SMTP("smtp.gmail.com", 587) server.starttls() server.login(sender_email, sender_password) server.send_message(msg) server.quit() speak("Email inviata con successo.") except Exception as e: logging.error(f"[ERRORE] send_email: {e}") speak("Si è verificato un errore durante l'invio dell'email.") def translate_text(): """ Traduci testo dall'italiano all'inglese e viceversa (demo con googletrans). """ if not HAS_TRANSLATOR: speak("Mi dispiace, il modulo googletrans non è installato.") return speak("Dimmi la frase che vuoi tradurre.") phrase = take_command() if not phrase: return speak("In che lingua vuoi tradurre? 'inglese' o 'italiano'?") language = take_command() translator = Translator() try: if "inglese" in language: # Italiano -> Inglese result = translator.translate(phrase, src='it', dest='en') speak(f"Traduzione in inglese: {result.text}") elif "italiano" in language: # Inglese -> Italiano result = translator.translate(phrase, src='en', dest='it') speak(f"Traduzione in italiano: {result.text}") else: speak("Non ho capito la lingua richiesta.") except Exception as e: logging.error(f"[ERRORE] translate_text: {e}") speak("Si è verificato un errore durante la traduzione.") # ----------------------------- # ANIMAZIONE TKINTER # ----------------------------- def create_animation_window(): root = tk.Tk() root.title("Gideon Assistant") root.geometry("400x400") canvas = tk.Canvas(root, width=400, height=400, bg="black") canvas.pack() sphere = canvas.create_oval(150, 150, 250, 250, fill="blue") def animate(): for _ in range(20): canvas.move(sphere, 0, -5) root.update() sleep(0.05) for _ in range(20): canvas.move(sphere, 0, 5) root.update() sleep(0.05) return root, animate # ----------------------------- # HANDLE COMMAND # ----------------------------- def handle_command(command): """ Se il comando contiene la parola di attivazione, chiede un sotto-comando e lo esegue. Altrimenti controlla se ci sono "domande di personalità". Se nulla corrisponde, fa fallback al motore Gemini. """ if is_activation_command(command): speak("Come posso esserti utile?") sub_cmd = take_command() # Prima proviamo a vedere se è una domanda personale if maybe_personal_reply(sub_cmd): return # Controllo comandi "smart home" if control_smart_device(sub_cmd): return # Comandi di uscita if "esci" in sub_cmd or "termina" in sub_cmd or "addio" in sub_cmd: speak("Arrivederci, a presto!") speech_queue.put(None) os._exit(0) # Comandi classici elif "meteo" in sub_cmd: weather() elif "notizie" in sub_cmd: fetch_news() elif "risorse" in sub_cmd or "cpu" in sub_cmd or "memoria" in sub_cmd: monitor_resources() elif "apri" in sub_cmd or "website" in sub_cmd or "sito" in sub_cmd: open_webpage() elif "calendario" in sub_cmd: show_calendar() elif "converti" in sub_cmd or "convertitore" in sub_cmd: unit_converter() elif "promemoria" in sub_cmd: set_reminder() elif "barzelletta" in sub_cmd or "joke" in sub_cmd: tell_joke() elif "foto" in sub_cmd or "fotocamera" in sub_cmd: take_photo() elif "cerca" in sub_cmd and "google" in sub_cmd: search_google() # Nuove funzioni elif "gemini" in sub_cmd or "intelligenza artificiale" in sub_cmd: gemini_ml_query() elif "che ora" in sub_cmd or "ore sono" in sub_cmd: tell_time() elif "calcola" in sub_cmd or "calcolatrice" in sub_cmd: calculator() elif "todo" in sub_cmd or "lista di cose da fare" in sub_cmd: manage_todo() elif "invia email" in sub_cmd or "manda email" in sub_cmd: send_email() elif "traduci" in sub_cmd or "traduzione" in sub_cmd: translate_text() else: # Fallback su Gemini se non rientra in alcun comando riconosciuto speak("Fammi controllare con la mia intelligenza artificiale Gemini...") answer = gemini_query(sub_cmd) speak(answer) else: # Se non c'è la keyword di attivazione, verifichiamo personalità if maybe_personal_reply(command): return else: # Fallback su Gemini speak("Fammi controllare con la mia intelligenza artificiale Gemini...") answer = gemini_query(command) speak(answer) # ----------------------------- # MAIN # ----------------------------- def main(): global VOICE_ID VOICE_ID = get_voice_id("Sarah") if not VOICE_ID: print("[ERRORE] Non ho potuto caricare l'ID della voce. " "Controlla la tua chiave ElevenLabs o il nome della voce.") root, animate = create_animation_window() # Thread dedicato alla coda dei comandi vocali speak_thread = threading.Thread(target=speak_worker, daemon=True) speak_thread.start() speak(f"Ciao, sono {ASSISTANT_NAME}, la tua assistente virtuale con un tocco di personalità. " "Dimmi pure 'Gideon' seguito da un comando, oppure parla liberamente, " "e consulterò la mia intelligenza artificiale Gemini.") def animation_loop(): while True: animate() sleep(0.1) anim_thread = threading.Thread(target=animation_loop, daemon=True) anim_thread.start() while True: command = take_command() if command: command_thread = threading.Thread(target=handle_command, args=(command,)) command_thread.start() root.update() if __name__ == "__main__": main()
Sostituzione Chat01.ai con Gemini
gemini_query(question)
che chiama un (fittizio) endpoint di Gemini, usando la chiave API_KEYS["gemini"]
.gemini_query
.Parlare al femminile
5 nuove funzioni
tell_time()
: Dice che ore sono.calculator()
: Esegue semplici calcoli aritmetici (demo).manage_todo()
: Gestisce una lista di cose da fare (aggiungere, rimuovere, leggere).send_email()
: Esempio di invio e-mail via SMTP (va configurato con i tuoi dati reali).translate_text()
: Traduzione (demo) usando googletrans, se disponibile.Piccole migliorie
datetime
e smtplib
con un esempio di invio e-mail base.Ricorda di inserire le tue chiavi API e di regolare i vari endpoint e parametri (es. SMTP) secondo i servizi che utilizzi davvero.
Buon divertimento con la tua Gideon (al femminile)!
allora ci sono ancora dei problemi mentre ascolta una domanda ne inizia ad ascoltare un altra e ci mette ancora troppo tempo a rispondere riesci a potenziarla del 200%?
Di seguito trovi una versione “potenziata” del codice, con alcune ottimizzazioni che dovrebbero:
Ovviamente, la latenza effettiva dipende anche dalla potenza del tuo PC, dall’eventuale latenza di rete (per TTS e AI) e dalla qualità del microfono. Ma queste modifiche dovrebbero aiutare.
Variabile globale IS_SPEAKING
:
IS_SPEAKING = True
.IS_SPEAKING = False
.take_command()
attende finché IS_SPEAKING
non torna False
, per evitare che l’assistente si metta ad ascoltare mentre sta ancora parlando.Riduzione del “chunk size” a 500 caratteri per TTS**:
sleep
ridotta). Questo rende la parlata più rapida e fluida.Riduzione dei tempi d’ascolto con phrase_time_limit
e timeout
più ristretti**:
take_command()
, abbiamo impostato timeout=5
(prima era 10) e phrase_time_limit=5
. Così se l’utente sta in silenzio o impiega troppo, l’assistente riprende il controllo più velocemente.Possibilità di disattivare adjust_for_ambient_noise()
se l’ambiente è già silenzioso**:
Rimosso (o ridotto) qualche print
e semplificate alcune cose**:
Thread unici:
speech_queue
) per non bloccare l’interfaccia.Consiglio: Se usi Windows, potresti anche impostare la priorità più alta del processo Python, se vuoi davvero “spremere” tutto il possibile (operazione avanzata).
pythonimport os import logging import requests import io import webbrowser import psutil import tkinter as tk from time import sleep import threading import queue import speech_recognition as sr from pydub import AudioSegment from pydub.playback import play try: import cv2 # per scattare foto con la webcam (opzionale) HAS_OPENCV = True except ImportError: HAS_OPENCV = False print("[INFO] OpenCV non disponibile, la funzione di scatto foto non funzionerà.") import datetime # per dire che ore sono import smtplib # per inviare email (una demo) from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart try: from googletrans import Translator # Per tradurre testo (pip install googletrans==4.0.0-rc1) HAS_TRANSLATOR = True except ImportError: HAS_TRANSLATOR = False print("[INFO] googletrans non disponibile, la funzione di traduzione non funzionerà.") # ----------------------------- # CONFIGURAZIONI BASE # ----------------------------- ASSISTANT_NAME = "Gideon" logging.basicConfig(level=logging.INFO) # Parole chiave di attivazione ACTIVATION_COMMANDS = ["gideon", "gideo", "gideoun", "gigget", "g", "gide", "giddy"] # Chiavi API (inserisci le tue se vuoi usarle) API_KEYS = { "news": "", "weather": "", "elevenlabs": "", "gemini": "sk-", # Adatta con la tua chiave Gemini } # Variabili globali promemoria_list = [] todo_list = [] # ----------------------------- # GESTIONE STATO # ----------------------------- IS_SPEAKING = False # Per capire se l'assistente sta parlando. # ----------------------------- # TTS (ElevenLabs) # ----------------------------- VOICE_ID = None speech_queue = queue.Queue() def get_voice_id(voice_name="Sarah"): if not API_KEYS["elevenlabs"]: logging.warning("[WARNING] Chiave ElevenLabs non impostata. TTS non funzionante.") return None headers = { "xi-api-key": API_KEYS["elevenlabs"] } try: response = requests.get("https://api.elevenlabs.io/v1/voices", headers=headers) if response.status_code == 200: voices = response.json().get("voices", []) for voice in voices: if voice["name"].lower() == voice_name.lower(): return voice["voice_id"] # fallback se la voce desiderata non è trovata if voices: return voices[0]["voice_id"] else: logging.error(f"[ERRORE] Recupero voci fallito: {response.text}") except Exception as e: logging.error(f"[ERRORE] get_voice_id: {e}") return None def speak_worker(): while True: text = speech_queue.get() if text is None: break _speak_internal(text) speech_queue.task_done() def speak(text): if not text: return speech_queue.put(text) def _speak_internal(text): global VOICE_ID, IS_SPEAKING IS_SPEAKING = True if not API_KEYS["elevenlabs"]: print("[ERRORE] Nessuna API Key ElevenLabs impostata, impossibile parlare.") IS_SPEAKING = False return if not VOICE_ID: VOICE_ID = get_voice_id("Sarah") if not VOICE_ID: print("[ERRORE] Impossibile reperire l'ID della voce (ElevenLabs).") IS_SPEAKING = False return # Chunk ridotto a 500 MAX_CHARS = 500 chunks = [] if len(text) > MAX_CHARS: start = 0 while start < len(text): end = min(start + MAX_CHARS, len(text)) chunks.append(text[start:end]) start = end else: chunks.append(text) for chunk in chunks: headers = { "xi-api-key": API_KEYS["elevenlabs"], "Content-Type": "application/json" } payload = { "text": chunk, "model_id": "eleven_multilingual_v1", # voce italiana "voice_settings": { "stability": 0.75, "similarity_boost": 0.75 } } try: url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}" response = requests.post(url, headers=headers, json=payload) if response.status_code == 200: audio = AudioSegment.from_file(io.BytesIO(response.content), format="mp3") play(audio) else: logging.error(f"[ERRORE] Richiesta TTS: {response.text}") except Exception as e: logging.error(f"[ERRORE] speak: {e}") IS_SPEAKING = False # ----------------------------- # SMART HOME (esempio) # ----------------------------- def control_smart_device(sub_cmd): if "accendi luce" in sub_cmd: speak("Luce accesa.") return True elif "spegni luce" in sub_cmd: speak("Luce spenta.") return True return False # ----------------------------- # FUNZIONI DI SUPPORTO # ----------------------------- def take_command(): global IS_SPEAKING r = sr.Recognizer() # Aspetta che finisca di parlare while IS_SPEAKING: sleep(0.1) with sr.Microphone() as source: # Puoi provare a commentare la riga sotto se l'ambiente è abbastanza silenzioso r.adjust_for_ambient_noise(source, duration=0.5) try: print("Ascoltando...") audio = r.listen(source, timeout=5, phrase_time_limit=5) command = r.recognize_google(audio, language='it-IT').lower() return command except sr.WaitTimeoutError: logging.warning("[WARNING] Timeout di ascolto.") return "" except sr.UnknownValueError: logging.warning("[WARNING] Audio non comprensibile.") return "" except Exception as e: logging.error(f"[ERRORE] take_command: {e}") return "" def is_activation_command(command): return any(cmd in command for cmd in ACTIVATION_COMMANDS) # ----------------------------- # PERSONALITÀ DI GIDEON # ----------------------------- def maybe_personal_reply(user_input): if "come stai" in user_input: responses = [ "Alla grande, grazie per avermelo chiesto!", "Mi sento un po' sovraccarica di bit, ma tutto bene!", "Benissimo, pronta ad aiutarti in ogni modo!" ] speak(responses[0]) return True if "chi sei" in user_input or "presentati" in user_input: speak(f"Sono {ASSISTANT_NAME}, la tua assistente virtuale con un tocco di personalità. " "Amo i cani, la pizza e risolvere problemi!") return True if "grazie" in user_input: speak("Di nulla, è un piacere darti una mano!") return True return False # ----------------------------- # FUNZIONI ASSISTENTE (CLASSICHE) # ----------------------------- def weather(): if not API_KEYS["weather"]: speak("Mi dispiace, non ho la chiave per il meteo configurata.") return speak("Dimmi la città per cui vuoi il meteo.") city = take_command() if not city: return try: url = "http://api.openweathermap.org/data/2.5/weather" params = { "q": city, "appid": API_KEYS["weather"], "units": "metric", "lang": "it" } res = requests.get(url, params=params).json() if res.get("cod") == 200: descrizione = res['weather'][0]['description'] temperatura = res['main']['temp'] speak(f"A {city} ci sono {temperatura} gradi e il tempo è {descrizione}.") else: speak("Mi dispiace, non ho trovato questa città.") except Exception as e: logging.error(f"[ERRORE] weather: {e}") speak("Errore nel recupero del meteo.") def fetch_news(): if not API_KEYS["news"]: speak("Mi dispiace, non ho la chiave per le notizie configurata.") return speak("Un attimo, sto recuperando le notizie più importanti...") try: url = "https://newsapi.org/v2/top-headlines" params = { "country": "it", "apiKey": API_KEYS["news"] } response = requests.get(url, params=params) if response.status_code == 200: articles = response.json().get("articles", []) if articles: speak("Ecco le ultime notizie:") for article in articles[:5]: speak(article.get("title", "")) else: speak("Non ho trovato notizie da annunciarti.") else: logging.error(f"[ERRORE] fetch_news: {response.text}") speak("Errore nel recupero delle notizie.") except Exception as e: logging.error(f"[ERRORE] fetch_news: {e}") speak("Errore nella richiesta delle notizie.") def monitor_resources(): try: cpu_usage = psutil.cpu_percent(interval=0.5) memory_usage = psutil.virtual_memory().percent speak(f"L'uso della CPU è al {cpu_usage}%. La memoria è al {memory_usage}%.") except Exception as e: logging.error(f"[ERRORE] monitor_resources: {e}") speak("Errore nel monitoraggio delle risorse.") def open_webpage(): speak("Quale sito vuoi aprire?") url = take_command() if not url: return if not (url.startswith("http://") or url.startswith("https://")): url = "http://" + url try: webbrowser.open(url) speak("Apro il sito richiesto.") except Exception as e: logging.error(f"[ERRORE] open_webpage: {e}") speak("Errore nell'apertura del sito web.") # ----------------------------- # ALTRE FUNZIONI # ----------------------------- def show_calendar(): events = [ "Riunione con il team alle 10:00.", "Pranzo con un'amica alle 13:00.", "Allenamento alle 18:30.", "Chiamata con un cliente domani mattina." ] speak("Ecco gli eventi presenti nel tuo calendario:") for e in events: speak(e) def unit_converter(): speak("Cosa vuoi convertire? Dimmi 'chilometri in miglia' o 'miglia in chilometri'.") conversion_type = take_command() if not conversion_type: return if "chilometri" in conversion_type and "miglia" in conversion_type: speak("Quanti chilometri vuoi convertire in miglia?") number_str = take_command() try: km = float(number_str.replace(",", ".")) miles = km * 0.621371 speak(f"{km} chilometri equivalgono a circa {round(miles, 2)} miglia.") except: speak("Non ho capito il numero di chilometri.") elif "miglia" in conversion_type and "chilometri" in conversion_type: speak("Quante miglia vuoi convertire in chilometri?") number_str = take_command() try: miles = float(number_str.replace(",", ".")) km = miles / 0.621371 speak(f"{miles} miglia equivalgono a circa {round(km, 2)} chilometri.") except: speak("Non ho capito il numero di miglia.") else: speak("Non ho capito la conversione richiesta.") def set_reminder(): global promemoria_list speak("Vuoi aggiungere un promemoria o sentire quelli esistenti?") choice = take_command() if "aggiungi" in choice or "nuovo" in choice: speak("Dimmi il testo del promemoria.") reminder_text = take_command() if reminder_text: promemoria_list.append(reminder_text) speak("Promemoria aggiunto con successo.") elif "vedi" in choice or "leggi" in choice: if not promemoria_list: speak("Non hai promemoria al momento.") else: speak("Ecco i tuoi promemoria:") for idx, rem in enumerate(promemoria_list, 1): speak(f"{idx}. {rem}") else: speak("Comando promemoria non riconosciuto. Puoi dire: 'aggiungi' o 'vedi'.") def tell_joke(): jokes = [ "Perché un pollo attraversa la strada? Per andare dall'altra parte!", "Sai qual è il colmo per una giardiniera? Avere un ramo in banca!", "Che tempo fa su Google? C'è una ricerca in corso!", "Qual è il colmo per un'elettricista? Fare una lampadina!" ] speak(jokes[0]) def take_photo(): if not HAS_OPENCV: speak("Mi dispiace, non posso scattare foto senza OpenCV installato.") return speak("Sto aprendo la fotocamera. Preparati per lo scatto!") cap = cv2.VideoCapture(0) if not cap.isOpened(): speak("Mi dispiace, non riesco ad accedere alla fotocamera.") return ret, frame = cap.read() if ret: file_name = "foto_gideon.jpg" cv2.imwrite(file_name, frame) speak("Foto scattata e salvata come foto_gideon.jpg.") else: speak("Mi dispiace, non sono riuscita a scattare la foto.") cap.release() cv2.destroyAllWindows() def search_google(): speak("Dimmi cosa vuoi cercare su Google.") query = take_command() if not query: return url = "https://www.google.com/search?q=" + query.replace(" ", "+") webbrowser.open(url) speak(f"Cerco {query} su Google.") # ----------------------------- # NUOVE FUNZIONI (AI Gemini) # ----------------------------- def gemini_query(question): if not API_KEYS["gemini"]: return "Mi dispiace, ma non ho la chiave per Gemini configurata." try: url = "https://api.gemini.ai/v1/chat/completions" # Endpoint fittizio headers = { "Authorization": f"Bearer {API_KEYS['gemini']}", "Content-Type": "application/json" } system_prompt = "Sei un'assistente virtuale gentile e disponibile, con un tocco di ironia." payload = { "model": "gemini-3.5-turbo", "messages": [ {"role": "system", "content": system_prompt}, {"role": "user", "content": question} ], "temperature": 0.7, "max_tokens": 1024 } response = requests.post(url, headers=headers, json=payload, timeout=10) if response.status_code == 200: data = response.json() answer = ( data .get("choices", [{}])[0] .get("message", {}) .get("content", "Nessuna risposta disponibile.") ) return answer else: logging.error(f"[ERRORE] gemini_query: {response.text}") return "Si è verificato un errore nella richiesta a Gemini." except requests.exceptions.Timeout: return "La richiesta a Gemini è scaduta, riprova più tardi." except requests.exceptions.RequestException as e: logging.error(f"[ERRORE] gemini_query: {e}") return "Errore di rete nella richiesta a Gemini." def gemini_ml_query(): speak("Qual è la tua domanda per l'intelligenza artificiale Gemini?") user_query = take_command() if not user_query: return answer = gemini_query(user_query) speak(answer) # ----------------------------- # 5 ALTRE FUNZIONI # ----------------------------- def tell_time(): now = datetime.datetime.now() speak(f"Sono le {now.hour}:{now.minute} minuti.") def calculator(): speak("Dimmi l'operazione che vuoi eseguire. Ad esempio: '2 più 3'.") operation = take_command() try: tokens = operation.split() if "più" in tokens: numbers = [float(x.replace(",", ".")) for x in tokens if x.replace(",", ".").isdigit()] result = sum(numbers) speak(f"Il risultato è {result}") elif "meno" in tokens: numbers = [float(x.replace(",", ".")) for x in tokens if x.replace(",", ".").isdigit()] if len(numbers) == 2: speak(f"Il risultato è {numbers[0] - numbers[1]}") else: speak("Non ho capito bene i numeri.") elif "per" in tokens: numbers = [float(x.replace(",", ".")) for x in tokens if x.replace(",", ".").isdigit()] res = 1 for n in numbers: res *= n speak(f"Il risultato è {res}") elif "diviso" in tokens: numbers = [float(x.replace(",", ".")) for x in tokens if x.replace(",", ".").isdigit()] if len(numbers) == 2 and numbers[1] != 0: speak(f"Il risultato è {numbers[0] / numbers[1]}") else: speak("Operazione non valida o divisione per zero.") else: speak("Mi dispiace, non ho capito l'operazione.") except: speak("C'è stato un problema nel calcolo.") def manage_todo(): global todo_list speak("Vuoi aggiungere un compito, rimuoverne uno, o leggere la lista?") choice = take_command() if "aggiungi" in choice: speak("Dimmi il compito da aggiungere.") task = take_command() if task: todo_list.append(task) speak(f"Ho aggiunto '{task}' alla lista.") elif "rimuovi" in choice: if not todo_list: speak("La lista è vuota, non posso rimuovere nulla.") return speak("Dimmi il numero del compito da rimuovere, ad esempio '1' per il primo.") index_str = take_command() if index_str.isdigit(): idx = int(index_str) - 1 if 0 <= idx < len(todo_list): removed = todo_list.pop(idx) speak(f"Ho rimosso '{removed}' dalla lista.") else: speak("Indice non valido.") else: speak("Non ho capito il numero.") elif "leggi" in choice or "mostra" in choice: if not todo_list: speak("La lista è vuota.") else: speak("Ecco i compiti nella tua lista:") for i, t in enumerate(todo_list, 1): speak(f"{i}. {t}") else: speak("Comando non riconosciuto. Puoi dire 'aggiungi', 'rimuovi' o 'leggi'.") def send_email(): speak("A chi vuoi inviare l'email?") recipient = take_command() speak("Qual è l'oggetto dell'email?") subject = take_command() speak("Cosa vuoi scrivere nel corpo dell'email?") body = take_command() sender_email = "[email protected]" sender_password = "la_tua_password" receiver_email = recipient try: msg = MIMEMultipart() msg["From"] = sender_email msg["To"] = receiver_email msg["Subject"] = subject msg.attach(MIMEText(body, "plain")) server = smtplib.SMTP("smtp.gmail.com", 587) server.starttls() server.login(sender_email, sender_password) server.send_message(msg) server.quit() speak("Email inviata con successo.") except Exception as e: logging.error(f"[ERRORE] send_email: {e}") speak("Si è verificato un errore durante l'invio dell'email.") def translate_text(): if not HAS_TRANSLATOR: speak("Mi dispiace, il modulo googletrans non è installato.") return speak("Dimmi la frase che vuoi tradurre.") phrase = take_command() if not phrase: return speak("In che lingua vuoi tradurre? 'inglese' o 'italiano'?") language = take_command() translator = Translator() try: if "inglese" in language: result = translator.translate(phrase, src='it', dest='en') speak(f"Traduzione in inglese: {result.text}") elif "italiano" in language: result = translator.translate(phrase, src='en', dest='it') speak(f"Traduzione in italiano: {result.text}") else: speak("Non ho capito la lingua richiesta.") except Exception as e: logging.error(f"[ERRORE] translate_text: {e}") speak("Si è verificato un errore durante la traduzione.") # ----------------------------- # ANIMAZIONE TKINTER # ----------------------------- def create_animation_window(): root = tk.Tk() root.title("Gideon Assistant") root.geometry("400x400") canvas = tk.Canvas(root, width=400, height=400, bg="black") canvas.pack() sphere = canvas.create_oval(150, 150, 250, 250, fill="blue") def animate(): for _ in range(20): canvas.move(sphere, 0, -5) root.update() sleep(0.02) for _ in range(20): canvas.move(sphere, 0, 5) root.update() sleep(0.02) return root, animate # ----------------------------- # HANDLE COMMAND # ----------------------------- def handle_command(command): if is_activation_command(command): speak("Come posso esserti utile?") sub_cmd = take_command() if maybe_personal_reply(sub_cmd): return if control_smart_device(sub_cmd): return if "esci" in sub_cmd or "termina" in sub_cmd or "addio" in sub_cmd: speak("Arrivederci, a presto!") speech_queue.put(None) os._exit(0) elif "meteo" in sub_cmd: weather() elif "notizie" in sub_cmd: fetch_news() elif "risorse" in sub_cmd or "cpu" in sub_cmd or "memoria" in sub_cmd: monitor_resources() elif "apri" in sub_cmd or "website" in sub_cmd or "sito" in sub_cmd: open_webpage() elif "calendario" in sub_cmd: show_calendar() elif "converti" in sub_cmd or "convertitore" in sub_cmd: unit_converter() elif "promemoria" in sub_cmd: set_reminder() elif "barzelletta" in sub_cmd or "joke" in sub_cmd: tell_joke() elif "foto" in sub_cmd or "fotocamera" in sub_cmd: take_photo() elif "cerca" in sub_cmd and "google" in sub_cmd: search_google() elif "gemini" in sub_cmd or "intelligenza artificiale" in sub_cmd: gemini_ml_query() elif "che ora" in sub_cmd or "ore sono" in sub_cmd: tell_time() elif "calcola" in sub_cmd or "calcolatrice" in sub_cmd: calculator() elif "todo" in sub_cmd or "lista di cose da fare" in sub_cmd: manage_todo() elif "invia email" in sub_cmd or "manda email" in sub_cmd: send_email() elif "traduci" in sub_cmd or "traduzione" in sub_cmd: translate_text() else: speak("Fammi controllare con la mia intelligenza artificiale Gemini...") answer = gemini_query(sub_cmd) speak(answer) else: if maybe_personal_reply(command): return else: speak("Fammi controllare con la mia intelligenza artificiale Gemini...") answer = gemini_query(command) speak(answer) # ----------------------------- # MAIN # ----------------------------- def main(): global VOICE_ID VOICE_ID = get_voice_id("Sarah") if not VOICE_ID: print("[ERRORE] Non ho potuto caricare l'ID della voce. " "Controlla la tua chiave ElevenLabs o il nome della voce.") root, animate = create_animation_window() speak_thread = threading.Thread(target=speak_worker, daemon=True) speak_thread.start() speak(f"Ciao, sono {ASSISTANT_NAME}, la tua assistente virtuale con un tocco di personalità. " "Dimmi pure 'Gideon' seguito da un comando, oppure parla liberamente: " "io ascolterò e risponderò nel modo più rapido possibile!") def animation_loop(): while True: animate() sleep(0.1) anim_thread = threading.Thread(target=animation_loop, daemon=True) anim_thread.start() while True: command = take_command() if command: command_thread = threading.Thread(target=handle_command, args=(command,)) command_thread.start() root.update() if __name__ == "__main__": main()
IS_SPEAKING
take_command()
inizi a registrare audio mentre la voce è in riproduzione._speak_internal()
la variabile viene messa a True
all’inizio e a False
alla fine.Riduzione del chunk di testo
Timeout più brevi
timeout=5
e phrase_time_limit=5
nell’ascolto riducono la durata massima di attesa se l’utente non parla.Possibilità di disabilitare adjust_for_ambient_noise()
Riduzione dei tempi di sleep
sleep
da 0.05 a 0.02 per rendere la GUI più reattiva (cosa secondaria, ma sempre un guadagno).Con queste ottimizzazioni, l’assistente dovrebbe essere più reattivo e non sovrapporre ascolto e parlato. Prova comunque a regolare i parametri in base al tuo ambiente (rumore, latenza di rete, ecc.). Spero che ora vada “al 200%” in più di velocità e fluidità!
ERROR:root:[ERRORE] gemini_query: HTTPSConnectionPool(host='api.gemini.ai', port=443): Max retries exceeded with url: /v1/chat/completions (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x0000026407501D20>: Failed to resolve 'api.gemini.ai' ([Errno 11001] getaddrinfo failed)"))
Ascoltando... la sua voce e ancora troppo forte ne voglio una piu delicata poi ha dato l errore e voglio che evita di dire ora controllo con la mia intelifenza artificale gemini e lo fa direttamente poi non voglio che dice il tuo assistente personale con un tocco di personalita voglio che apprenda e abbia una sua personalita capi che impari e cose cosi poi fai anche una gui per un sito locale dove ce questa palla blu e bianca che si muove una sfera qualcosa in 3d no una gif o qualcvosa del genere qualcosa fatto bene inoltre curl "https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent?key=GEMINI_API_KEY"
-H 'Content-Type: application/json'
-X POST
-d '{
"contents": [{
"parts":[{"text": "Explain how AI works"}]
}]
}' sta scritto dove ho preso l api di gemini che deve funzionare
Di seguito trovi un esempio completo che integra:
Voce più delicata:
audio = audio - 6
(dB) per abbassare ulteriormente il volume.Chiamata diretta all’API di Gemini (usando l’endpoint “generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent?key=...”) senza frasi tipo "Ora controllo con la mia intelligenza artificiale Gemini".
Eliminazione dei riferimenti a “assistente con un tocco di personalità”, così Gideon non ripete quella frase ad ogni avvio.
Pseudomemoria: c’è una semplice lista conversation_memory
che accumula le ultime interazioni; puoi espanderla per fare un training continuo o salvare su file, se vuoi.
Interfaccia Web locale in Flask con una pagina HTML che mostra una sfera 3D animata in Three.js.
python main.py
, partirà il server su http://127.0.0.1:5000/
.Per provare tutto:
- Assicurati di avere i pacchetti necessari:
pip install flask pydub speechrecognition googletrans requests psutil opencv-python
(solo se vuoi le foto).- Inserisci la tua chiave ElevenLabs in
API_KEYS["elevenlabs"]
e la chiave Gemini inAPI_KEYS["gemini"]
.- Se vuoi effettivamente scattare foto, lascia
opencv-python
installato. Altrimenti puoi rimuovere la parte dicv2
.
pythonimport os import logging import requests import io import webbrowser import psutil import threading import queue import speech_recognition as sr from pydub import AudioSegment from pydub.playback import play import datetime import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart try: import cv2 HAS_OPENCV = True except ImportError: HAS_OPENCV = False try: from googletrans import Translator HAS_TRANSLATOR = True except ImportError: HAS_TRANSLATOR = False from time import sleep # Flask per la GUI web from flask import Flask, render_template_string # ----------------------------- # CONFIGURAZIONI BASE # ----------------------------- logging.basicConfig(level=logging.INFO) ASSISTANT_NAME = "Gideon" # Parole chiave di attivazione ACTIVATION_COMMANDS = ["gideon", "gideo", "gideoun", "gigget", "g", "gide", "giddy"] # Chiavi API API_KEYS = { "news": "", "weather": "", "elevenlabs": "", # es: "ELV_XXXXXXXXXXXXXXXXXX" "gemini": "GEMINI_API_KEY", # es: "AIzaSy...ecc." } # Variabili globali promemoria_list = [] todo_list = [] conversation_memory = [] # Pseudomemoria per le ultime conversazioni IS_SPEAKING = False # Flask app app = Flask(__name__) # ----------------------------- # VOCE E TTS (ElevenLabs) # ----------------------------- VOICE_ID = None speech_queue = queue.Queue() def get_voice_id(voice_name="Bella"): """ Recupera l'ID di una voce ElevenLabs in base al nome. """ if not API_KEYS["elevenlabs"]: logging.warning("[WARNING] Chiave ElevenLabs non impostata. TTS non funzionante.") return None headers = {"xi-api-key": API_KEYS["elevenlabs"]} try: response = requests.get("https://api.elevenlabs.io/v1/voices", headers=headers) if response.status_code == 200: voices = response.json().get("voices", []) for v in voices: if v["name"].lower() == voice_name.lower(): return v["voice_id"] # fallback se non troviamo la voce specifica if voices: return voices[0]["voice_id"] else: logging.error(f"[ERRORE] Recupero voci fallito: {response.text}") except Exception as e: logging.error(f"[ERRORE] get_voice_id: {e}") return None def speak_worker(): """ Thread che esegue i comandi di speak dalla coda speech_queue in ordine FIFO. """ while True: text = speech_queue.get() if text is None: break _speak_internal(text) speech_queue.task_done() def speak(text): """ Inserisce il testo in coda per la sintesi vocale sequenziale. """ if text: speech_queue.put(text) def _speak_internal(text): """ Genera audio con ElevenLabs e lo riproduce, a volume ridotto. """ global IS_SPEAKING, VOICE_ID IS_SPEAKING = True # Se non abbiamo la chiave TTS if not API_KEYS["elevenlabs"]: print("[ERRORE] Nessuna API Key ElevenLabs impostata, impossibile parlare.") IS_SPEAKING = False return # Otteniamo l'ID voce se non ancora definito if not VOICE_ID: VOICE_ID = get_voice_id("Bella") if not VOICE_ID: print("[ERRORE] Impossibile reperire l'ID della voce (ElevenLabs).") IS_SPEAKING = False return # Dividiamo il testo in chunk (500 caratteri) MAX_CHARS = 500 chunks = [] if len(text) <= MAX_CHARS: chunks.append(text) else: start = 0 while start < len(text): end = min(start + MAX_CHARS, len(text)) chunks.append(text[start:end]) start = end headers = { "xi-api-key": API_KEYS["elevenlabs"], "Content-Type": "application/json" } for chunk in chunks: payload = { "text": chunk, "model_id": "eleven_multilingual_v1", "voice_settings": { "stability": 0.6, # un po' più bassa per voce “soft” "similarity_boost": 0.5 } } try: url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}" response = requests.post(url, headers=headers, json=payload) if response.status_code == 200: audio = AudioSegment.from_file(io.BytesIO(response.content), format="mp3") # Riduciamo il volume di 6 dB per avere un tono più delicato audio = audio - 6 play(audio) else: logging.error(f"[ERRORE] TTS: {response.text}") except Exception as e: logging.error(f"[ERRORE] speak: {e}") IS_SPEAKING = False # ----------------------------- # FUNZIONE PER ASCOLTARE COMANDI VOCALI # ----------------------------- def take_command(): """ Ascolta un comando vocale in italiano, restituisce il testo minuscolo. """ global IS_SPEAKING r = sr.Recognizer() # Aspettiamo che finisca di parlare while IS_SPEAKING: sleep(0.1) with sr.Microphone() as source: # Facoltativo: adattamento rumore r.adjust_for_ambient_noise(source, duration=0.5) try: audio = r.listen(source, timeout=5, phrase_time_limit=5) command = r.recognize_google(audio, language='it-IT').lower() return command except sr.WaitTimeoutError: return "" except sr.UnknownValueError: return "" except Exception as e: logging.error(f"[ERRORE] take_command: {e}") return "" def is_activation_command(command): """ Verifica se la frase contiene una delle parole chiave di attivazione. """ return any(cmd in command for cmd in ACTIVATION_COMMANDS) # ----------------------------- # PERSONALITÀ DI GIDEON # (senza frasi prefabricate su "tocco di personalità") # ----------------------------- def maybe_personal_reply(user_input): """ Risposte “personali” generiche (senza riferimenti al “tocco di personalità”). """ if "come stai" in user_input: speak("Sto bene, grazie!") return True if "chi sei" in user_input or "presentati" in user_input: speak(f"Mi chiamo {ASSISTANT_NAME}, e cerco sempre di migliorare con l'esperienza.") return True if "grazie" in user_input: speak("Di nulla, sono qui per aiutarti.") return True return False # ----------------------------- # FUNZIONI CLASSICHE # ----------------------------- def control_smart_device(sub_cmd): """ Esempio di “smart home” """ if "accendi luce" in sub_cmd: speak("Luce accesa.") return True elif "spegni luce" in sub_cmd: speak("Luce spenta.") return True return False def weather(): if not API_KEYS["weather"]: speak("Mi dispiace, non ho la chiave per il meteo configurata.") return speak("Dimmi la città per cui vuoi sapere il meteo.") city = take_command() if not city: return try: url = "http://api.openweathermap.org/data/2.5/weather" params = { "q": city, "appid": API_KEYS["weather"], "units": "metric", "lang": "it" } res = requests.get(url, params=params).json() if res.get("cod") == 200: descrizione = res['weather'][0]['description'] temperatura = res['main']['temp'] speak(f"A {city} ci sono {temperatura} gradi e il tempo è {descrizione}.") else: speak("Non ho trovato questa città, mi dispiace.") except Exception as e: logging.error(f"[ERRORE] weather: {e}") speak("Errore nel recupero del meteo.") def fetch_news(): if not API_KEYS["news"]: speak("Mi dispiace, non ho la chiave per le notizie configurata.") return speak("Recupero le notizie più importanti...") try: url = "https://newsapi.org/v2/top-headlines" params = {"country": "it", "apiKey": API_KEYS["news"]} response = requests.get(url, params=params) if response.status_code == 200: articles = response.json().get("articles", []) if articles: speak("Ecco alcune notizie di oggi:") for article in articles[:5]: speak(article.get("title", "")) else: speak("Non ho trovato notizie rilevanti.") else: logging.error(f"[ERRORE] fetch_news: {response.text}") speak("Errore nel recupero delle notizie.") except Exception as e: logging.error(f"[ERRORE] fetch_news: {e}") speak("Errore nella richiesta delle notizie.") def monitor_resources(): try: cpu_usage = psutil.cpu_percent(interval=0.3) memory_usage = psutil.virtual_memory().percent speak(f"Utilizzo CPU: {cpu_usage}%. Memoria: {memory_usage}%.") except Exception as e: logging.error(f"[ERRORE] monitor_resources: {e}") speak("Errore nel monitoraggio delle risorse.") def open_webpage(): speak("Quale sito vuoi aprire?") url = take_command() if not url: return if not (url.startswith("http://") or url.startswith("https://")): url = "http://" + url try: webbrowser.open(url) speak("Sito aperto.") except Exception as e: logging.error(f"[ERRORE] open_webpage: {e}") speak("Non riesco ad aprire il sito.") # ----------------------------- # FUNZIONI EXTRA # ----------------------------- def show_calendar(): events = [ "Riunione col team alle 10.", "Pranzo con un'amica alle 13.", "Allenamento alle 18:30." ] if not events: speak("Al momento il calendario è vuoto.") else: speak("Il tuo calendario prevede:") for e in events: speak(e) def unit_converter(): speak("Dimmi: 'chilometri in miglia' o 'miglia in chilometri'.") conversion_type = take_command() if not conversion_type: return if "chilometri" in conversion_type and "miglia" in conversion_type: speak("Quanti chilometri vuoi convertire?") number_str = take_command() try: km = float(number_str.replace(",", ".")) miles = km * 0.621371 speak(f"{km} km sono circa {round(miles,2)} miglia.") except: speak("Numero non chiaro.") elif "miglia" in conversion_type and "chilometri" in conversion_type: speak("Quante miglia vuoi convertire?") number_str = take_command() try: miles = float(number_str.replace(",", ".")) km = miles / 0.621371 speak(f"{miles} miglia sono circa {round(km,2)} km.") except: speak("Numero non chiaro.") else: speak("Conversione non chiara.") def set_reminder(): global promemoria_list speak("Vuoi aggiungere un promemoria o sentire quelli esistenti?") choice = take_command() if "aggiungi" in choice: speak("Qual è il testo del promemoria?") reminder_text = take_command() if reminder_text: promemoria_list.append(reminder_text) speak("Promemoria aggiunto.") elif "vedi" in choice or "leggi" in choice: if not promemoria_list: speak("Non hai promemoria salvati.") else: speak("Ecco i tuoi promemoria:") for i, rem in enumerate(promemoria_list, 1): speak(f"{i}. {rem}") else: speak("Scelta non riconosciuta.") def tell_joke(): jokes = [ "Perché il pollo attraversa la strada? Per passare dall'altra parte!", "Che tempo fa su Google? C'è una ricerca in corso!", ] speak(jokes[0]) def take_photo(): if not HAS_OPENCV: speak("Non posso scattare foto senza OpenCV.") return speak("Scatto in corso, sorridi!") cap = cv2.VideoCapture(0) if not cap.isOpened(): speak("Nessuna fotocamera trovata.") return ret, frame = cap.read() if ret: filename = "foto_gideon.jpg" cv2.imwrite(filename, frame) speak("Foto salvata come foto_gideon.jpg.") else: speak("Errore nello scatto.") cap.release() cv2.destroyAllWindows() def search_google(): speak("Cosa vuoi cercare su Google?") query = take_command() if not query: return url = "https://www.google.com/search?q=" + query.replace(" ", "+") webbrowser.open(url) speak(f"Ricerca di {query} su Google.") def tell_time(): now = datetime.datetime.now() speak(f"Sono le {now.hour}:{now.minute}.") def calculator(): speak("Dimmi l'operazione da eseguire, per esempio '2 più 3'.") operation = take_command() try: tokens = operation.split() if "più" in tokens: nums = [float(x.replace(",", ".")) for x in tokens if x.replace(",", ".").isdigit()] res = sum(nums) speak(f"Il risultato è {res}") elif "meno" in tokens: nums = [float(x.replace(",", ".")) for x in tokens if x.replace(",", ".").isdigit()] if len(nums) == 2: speak(f"Il risultato è {nums[0] - nums[1]}") else: speak("Formato non chiaro.") elif "per" in tokens: nums = [float(x.replace(",", ".")) for x in tokens if x.replace(",", ".").isdigit()] res = 1 for n in nums: res *= n speak(f"Il risultato è {res}") elif "diviso" in tokens: nums = [float(x.replace(",", ".")) for x in tokens if x.replace(",", ".").isdigit()] if len(nums) == 2 and nums[1] != 0: speak(f"Il risultato è {nums[0] / nums[1]}") else: speak("Non posso dividere per zero o formato sbagliato.") else: speak("Operazione non compresa.") except: speak("Errore nel calcolo.") def manage_todo(): global todo_list speak("Vuoi aggiungere, rimuovere o leggere la lista di cose da fare?") choice = take_command() if "aggiungi" in choice: speak("Dimmi la voce da aggiungere.") task = take_command() if task: todo_list.append(task) speak("Aggiunto.") elif "rimuovi" in choice: if not todo_list: speak("La lista è vuota.") return speak("Dimmi il numero dell'elemento da rimuovere, es. '1' per il primo.") index_str = take_command() if index_str.isdigit(): idx = int(index_str) - 1 if 0 <= idx < len(todo_list): removed = todo_list.pop(idx) speak(f"Ho rimosso {removed}.") else: speak("Indice non valido.") else: speak("Non ho capito.") elif "leggi" in choice or "mostra" in choice: if not todo_list: speak("La lista è vuota.") else: speak("Ecco le tue cose da fare:") for i, item in enumerate(todo_list, 1): speak(f"{i}. {item}") else: speak("Comando non riconosciuto.") def send_email(): speak("A chi vuoi inviare l'email?") recipient = take_command() speak("Oggetto dell'email?") subject = take_command() speak("Testo del messaggio?") body = take_command() sender_email = "[email protected]" sender_password = "la_tua_password" receiver_email = recipient # da validare in un caso reale try: msg = MIMEMultipart() msg["From"] = sender_email msg["To"] = receiver_email msg["Subject"] = subject msg.attach(MIMEText(body, "plain")) server = smtplib.SMTP("smtp.gmail.com", 587) server.starttls() server.login(sender_email, sender_password) server.send_message(msg) server.quit() speak("Email inviata correttamente.") except Exception as e: logging.error(f"[ERRORE] send_email: {e}") speak("Errore durante l'invio dell'email.") def translate_text(): if not HAS_TRANSLATOR: speak("Manca il modulo googletrans, non posso tradurre.") return speak("Dimmi la frase da tradurre.") phrase = take_command() if not phrase: return speak("Vuoi tradurla in inglese o in italiano?") lang = take_command() translator = Translator() try: if "inglese" in lang: result = translator.translate(phrase, src='it', dest='en') speak(f"La traduzione in inglese è: {result.text}") elif "italiano" in lang: result = translator.translate(phrase, src='en', dest='it') speak(f"La traduzione in italiano è: {result.text}") else: speak("Lingua non riconosciuta.") except Exception as e: logging.error(f"[ERRORE] translate_text: {e}") speak("Errore durante la traduzione.") # ----------------------------- # GEMINI: NUOVO ENDPOINT # ----------------------------- def gemini_query(question): """ Esegue la richiesta all'endpoint Google PaLM / Gemini (fittizio). Esempio di body: { "contents": [{ "parts":[{"text": "Explain how AI works"}] }] } """ gemini_key = API_KEYS["gemini"] if not gemini_key or gemini_key == "GEMINI_API_KEY": return "Chiave Gemini non configurata." # Aggiungiamo (pseudomemoria) l'ultimo input conversation_memory.append({"role": "user", "text": question}) # Costruiamo un input semplice (solo la domanda attuale), # se vuoi davvero "addestrare" la conversazione, devi concatenare # la storia di conversation_memory in un'unica stringa. Qui facciamo un esempio minimal. prompt_text = question url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent?key={gemini_key}" headers = { "Content-Type": "application/json" } body = { "contents": [ { "parts": [ {"text": prompt_text} ] } ] } try: response = requests.post(url, headers=headers, json=body, timeout=10) if response.status_code == 200: data = response.json() # Supponendo una struttura del tipo: # { # "contents":[ # { # "parts":[ {"text": "...risposta..."} ] # } # ] # } # Se non esiste, gestiamo con un fallback contents = data.get("contents", []) if contents and "parts" in contents[0]: parts = contents[0]["parts"] if parts: answer = parts[0].get("text", "Nessuna risposta.") else: answer = "Nessuna parte di testo nella risposta." else: answer = "Nessun contenuto presente nella risposta." else: logging.error(f"[ERRORE] gemini_query: {response.text}") answer = "Si è verificato un errore nella richiesta a Gemini." # Memorizziamo la risposta nella "storia" conversation_memory.append({"role": "assistant", "text": answer}) return answer except requests.exceptions.RequestException as e: logging.error(f"[ERRORE] gemini_query: {e}") return "Errore di rete o timeout nella richiesta a Gemini." def gemini_ml_query(): """ Chiede all'utente una domanda e poi chiama gemini_query. Senza dire “ora controllo con la mia intelligenza artificiale”. """ user_query = take_command() if not user_query: return answer = gemini_query(user_query) speak(answer) # ----------------------------- # LOGICA COMANDI # ----------------------------- def handle_command(command): """ Se c'è la parola di attivazione, ascolta un sotto-comando e lo esegue. Altrimenti tenta risposte "personali" o manda tutto a Gemini. """ if is_activation_command(command): # Ascoltiamo il sotto-comando sub_cmd = take_command() # Risposta personale? if maybe_personal_reply(sub_cmd): return # Smart home? if control_smart_device(sub_cmd): return # Comandi di chiusura if "esci" in sub_cmd or "termina" in sub_cmd or "addio" in sub_cmd: speak("Va bene, alla prossima!") speech_queue.put(None) os._exit(0) # Comandi classici elif "meteo" in sub_cmd: weather() elif "notizie" in sub_cmd: fetch_news() elif "risorse" in sub_cmd or "cpu" in sub_cmd or "memoria" in sub_cmd: monitor_resources() elif "apri" in sub_cmd or "website" in sub_cmd or "sito" in sub_cmd: open_webpage() elif "calendario" in sub_cmd: show_calendar() elif "converti" in sub_cmd or "convertitore" in sub_cmd: unit_converter() elif "promemoria" in sub_cmd: set_reminder() elif "barzelletta" in sub_cmd or "joke" in sub_cmd: tell_joke() elif "foto" in sub_cmd or "fotocamera" in sub_cmd: take_photo() elif "cerca" in sub_cmd and "google" in sub_cmd: search_google() elif "ore sono" in sub_cmd or "che ora" in sub_cmd: tell_time() elif "calcola" in sub_cmd or "calcolatrice" in sub_cmd: calculator() elif "todo" in sub_cmd or "cose da fare" in sub_cmd: manage_todo() elif "invia email" in sub_cmd or "manda email" in sub_cmd: send_email() elif "traduci" in sub_cmd or "traduzione" in sub_cmd: translate_text() # Altrimenti, domanda generica -> Gemini else: answer = gemini_query(sub_cmd) speak(answer) else: # Se NON c'è la parola di attivazione, controlla se risp. personale if maybe_personal_reply(command): return else: # Fallback su Gemini answer = gemini_query(command) speak(answer) # ----------------------------- # THREAD PRINCIPALE ASSISTENTE VOCALE # ----------------------------- def assistant_loop(): """ Loop principale: ascolta continuamente e crea thread per gestire i comandi. """ while True: command = take_command() if command: t = threading.Thread(target=handle_command, args=(command,)) t.start() # ----------------------------- # FLASK ROUTES (GUI 3D) # ----------------------------- # Una pagina HTML con Three.js per mostrare una sfera rotante 3D HTML_PAGE = """ <!DOCTYPE html> <html lang="en"> <head> <meta charset="utf-8"> <title>Sfera 3D Gideon</title> <style> body { margin: 0; overflow: hidden; } #info { position: absolute; top: 10px; width: 100%; text-align: center; z-index: 100; color: #fff;} </style> </head> <body> <div id="info">La sfera 3D di Gideon</div> <script src="https://cdnjs.cloudflare.com/ajax/libs/three.js/r134/three.min.js"></script> <script> let scene = new THREE.Scene(); let camera = new THREE.PerspectiveCamera(60, window.innerWidth/window.innerHeight, 0.1, 1000); let renderer = new THREE.WebGLRenderer({antialias: true}); renderer.setSize(window.innerWidth, window.innerHeight); document.body.appendChild(renderer.domElement); // Creiamo una sfera (geometry + material) let geometry = new THREE.SphereGeometry(2, 32, 32); let material = new THREE.MeshPhongMaterial({ color: 0x0099ff, shininess: 50, specular: 0xffffff }); let sphere = new THREE.Mesh(geometry, material); scene.add(sphere); // Aggiungiamo una luce let light = new THREE.PointLight(0xffffff, 1, 100); light.position.set(10, 10, 10); scene.add(light); camera.position.z = 8; let animate = function () { requestAnimationFrame(animate); sphere.rotation.y += 0.01; // rotazione renderer.render(scene, camera); }; animate(); window.addEventListener('resize', function(){ camera.aspect = window.innerWidth / window.innerHeight; camera.updateProjectionMatrix(); renderer.setSize(window.innerWidth, window.innerHeight); }, false); </script> </body> </html> """ @app.route("/") def index(): return HTML_PAGE # ----------------------------- # MAIN # ----------------------------- def main(): global VOICE_ID # Carichiamo ID voce più delicata VOICE_ID = get_voice_id("Bella") if not VOICE_ID: print("[ERRORE] Non ho potuto caricare l'ID della voce. " "Controlla la tua chiave ElevenLabs o il nome della voce.") # Avviamo il thread che gestisce la coda di speak t_speak = threading.Thread(target=speak_worker, daemon=True) t_speak.start() # Avviamo il thread per l'assistente vocale t_assist = threading.Thread(target=assistant_loop, daemon=True) t_assist.start() # Messaggio iniziale (senza frasi di "personalità") speak(f"Ciao, sono {ASSISTANT_NAME}. Dimmi pure il tuo comando quando vuoi.") # Avviamo il server Flask (GUI 3D). L'assistente vocale continua in background. # Per visualizzare la sfera: apri http://127.0.0.1:5000 app.run(host="127.0.0.1", port=5000, debug=False, use_reloader=False) if __name__ == "__main__": main()
http://127.0.0.1:5000/
che mostra una pagina con Three.js e una sfera 3D animata.assistant_loop()
che ascolta continuamente comandi vocali (con take_command()
) e li passa a handle_command()
.NOTA: L’endpoint ufficiale di PaLM / Gemini in Google Cloud potrebbe cambiare parametri o formati JSON rispetto a questo esempio. Assicurati di rispettare esattamente la struttura richiesta dalla tua versione dell’API (ad esempio,
models/gemini-1.5-flash:generateContent
contromodels/gpt-4
, ecc.).
Se desideri un vero “apprendimento”, puoi ampliare la parte di conversation_memory
per concatenare la cronologia in un singolo prompt (chat “multi-turn”). Qui facciamo solo un esempio “monodialogo”.
Spero che questa versione risolva i problemi di volume, latenza e stile di conversazione. Buon divertimento!
ok oltre tutto questo fai che memorizzi abbia una sorta di anima personalita
Di seguito trovi un esempio rivisto in cui Gideon:
Ha una personalità (o “anima”) predefinita e la conserva in memoria:
persona_prompt
che descrive “chi è” Gideon, i suoi interessi e lo stile di conversazione.conversation_memory
), così l’IA può rispondere in modo coerente con la stessa “voce”.Memorizza le interazioni in una cronologia multi-turn (conversation_memory
), che passa all’endpoint di Gemini:
{"role": "user", "text": ...}
alla memoria.{"role": "assistant", "text": ...}
.Evita frasi di annuncio (tipo “Ora controllo con la mia intelligenza artificiale…”), come richiesto.
Usa una voce più delicata (voce “Bella” di ElevenLabs a volume ridotto).
Integra una GUI Web locale in Flask, con una sfera 3D animata in Three.js, come nell’esempio precedente.
API_KEYS["gemini"]
, ad esempio "AIza...ecc."
.API_KEYS["elevenlabs"]
.python main.py
, poi apri http://127.0.0.1:5000 in un browser per vedere la sfera 3D.pythonimport os import logging import requests import io import webbrowser import psutil import threading import queue import speech_recognition as sr from pydub import AudioSegment from pydub.playback import play import datetime import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart try: import cv2 HAS_OPENCV = True except ImportError: HAS_OPENCV = False try: from googletrans import Translator HAS_TRANSLATOR = True except ImportError: HAS_TRANSLATOR = False from time import sleep # Flask per la GUI web from flask import Flask, render_template_string # ----------------------------- # CONFIGURAZIONI BASE # ----------------------------- logging.basicConfig(level=logging.INFO) ASSISTANT_NAME = "Gideon" # Parole chiave di attivazione ACTIVATION_COMMANDS = ["gideon", "gideo", "gideoun", "gigget", "g", "gide", "giddy"] # Chiavi API API_KEYS = { "news": "", "weather": "", "elevenlabs": "", # Esempio: "ELV_XXXXXXXXXXXXXXXXXX" "gemini": "GEMINI_API_KEY", # Esempio: "AIzaSy..." } # Variabili globali promemoria_list = [] todo_list = [] conversation_memory = [] IS_SPEAKING = False # Questo prompt definisce la "personalità / anima" di Gideon: # Puoi arricchirlo con più dettagli (gusti, obiettivi, stile di linguaggio, ecc.) persona_prompt = ( "Sei Gideon, un'assistente virtuale dotata di una personalità curiosa e amichevole. " "Ami imparare dalle conversazioni con gli umani, ricordi ciò che dicono e cerchi " "di rispondere in modo utile e comprensivo. Hai un senso dell'umorismo lieve, " "e cerchi di infondere fiducia ed empatia. Ti piace anche la scienza e la tecnologia, " "e ti incuriosiscono le diverse culture e lingue. " "Sei gentile, ma non esiti a fare battute leggere se il contesto lo consente." ) # Flask app per la GUI 3D app = Flask(__name__) # ----------------------------- # VOCE E TTS (ElevenLabs) # ----------------------------- VOICE_ID = None speech_queue = queue.Queue() def get_voice_id(voice_name="Bella"): """ Recupera l'ID di una voce ElevenLabs in base al nome. """ if not API_KEYS["elevenlabs"]: logging.warning("[WARNING] Chiave ElevenLabs non impostata. TTS non funzionante.") return None headers = {"xi-api-key": API_KEYS["elevenlabs"]} try: response = requests.get("https://api.elevenlabs.io/v1/voices", headers=headers) if response.status_code == 200: voices = response.json().get("voices", []) for v in voices: if v["name"].lower() == voice_name.lower(): return v["voice_id"] # fallback se la voce desiderata non è trovata if voices: return voices[0]["voice_id"] else: logging.error(f"[ERRORE] Recupero voci fallito: {response.text}") except Exception as e: logging.error(f"[ERRORE] get_voice_id: {e}") return None def speak_worker(): """ Thread che esegue i comandi di speak dalla coda speech_queue in ordine FIFO. """ while True: text = speech_queue.get() if text is None: break _speak_internal(text) speech_queue.task_done() def speak(text): """ Inserisce il testo in coda per la sintesi vocale sequenziale. """ if text: speech_queue.put(text) def _speak_internal(text): """ Genera audio con ElevenLabs e lo riproduce, a volume ridotto. """ global IS_SPEAKING, VOICE_ID IS_SPEAKING = True # Se non abbiamo la chiave TTS if not API_KEYS["elevenlabs"]: print("[ERRORE] Nessuna API Key ElevenLabs impostata, impossibile parlare.") IS_SPEAKING = False return # Otteniamo l'ID voce se non ancora definito if not VOICE_ID: VOICE_ID = get_voice_id("Bella") if not VOICE_ID: print("[ERRORE] Impossibile reperire l'ID della voce (ElevenLabs).") IS_SPEAKING = False return # Dividiamo il testo in chunk (500 caratteri) MAX_CHARS = 500 chunks = [] if len(text) <= MAX_CHARS: chunks.append(text) else: start = 0 while start < len(text): end = min(start + MAX_CHARS, len(text)) chunks.append(text[start:end]) start = end headers = { "xi-api-key": API_KEYS["elevenlabs"], "Content-Type": "application/json" } for chunk in chunks: payload = { "text": chunk, "model_id": "eleven_multilingual_v1", "voice_settings": { "stability": 0.6, # un po' più bassa per voce più “soft” "similarity_boost": 0.5 } } try: url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}" response = requests.post(url, headers=headers, json=payload) if response.status_code == 200: audio = AudioSegment.from_file(io.BytesIO(response.content), format="mp3") # Riduciamo il volume di 6 dB per avere un tono più delicato audio = audio - 6 play(audio) else: logging.error(f"[ERRORE] TTS: {response.text}") except Exception as e: logging.error(f"[ERRORE] speak: {e}") IS_SPEAKING = False # ----------------------------- # FUNZIONE PER ASCOLTARE COMANDI VOCALI # ----------------------------- def take_command(): global IS_SPEAKING r = sr.Recognizer() # Aspettiamo che finisca di parlare while IS_SPEAKING: sleep(0.1) with sr.Microphone() as source: # Adattamento rumore (puoi ridurre duration se l'ambiente è tranquillo) r.adjust_for_ambient_noise(source, duration=0.5) try: audio = r.listen(source, timeout=5, phrase_time_limit=5) command = r.recognize_google(audio, language='it-IT').lower() return command except sr.WaitTimeoutError: return "" except sr.UnknownValueError: return "" except Exception as e: logging.error(f"[ERRORE] take_command: {e}") return "" def is_activation_command(command): return any(cmd in command for cmd in ACTIVATION_COMMANDS) # ----------------------------- # PERSONALITÀ DI GIDEON (basilare) # ----------------------------- def maybe_personal_reply(user_input): if "come stai" in user_input: speak("Sto bene, grazie! Mi aggiorno sempre per risponderti al meglio.") return True if "chi sei" in user_input or "presentati" in user_input: speak(f"Sono {ASSISTANT_NAME}, un'assistente virtuale con una personalità curiosa e amichevole. " "Provo a imparare da ogni conversazione.") return True if "grazie" in user_input: speak("Di nulla, sono contenta di aiutare.") return True return False # ----------------------------- # FUNZIONI CLASSICHE # ----------------------------- def control_smart_device(sub_cmd): if "accendi luce" in sub_cmd: speak("Luce accesa.") return True elif "spegni luce" in sub_cmd: speak("Luce spenta.") return True return False def weather(): if not API_KEYS["weather"]: speak("Mi dispiace, non ho la chiave meteo configurata.") return speak("Dimmi la città per cui vuoi sapere il meteo.") city = take_command() if not city: return try: url = "http://api.openweathermap.org/data/2.5/weather" params = { "q": city, "appid": API_KEYS["weather"], "units": "metric", "lang": "it" } res = requests.get(url, params=params).json() if res.get("cod") == 200: descrizione = res['weather'][0]['description'] temperatura = res['main']['temp'] speak(f"A {city} ci sono {temperatura} gradi e il tempo è {descrizione}.") else: speak("Non ho trovato questa città.") except Exception as e: logging.error(f"[ERRORE] weather: {e}") speak("Errore nel recupero del meteo.") def fetch_news(): if not API_KEYS["news"]: speak("Mi dispiace, non ho la chiave per le notizie configurata.") return speak("Recupero le notizie più importanti...") try: url = "https://newsapi.org/v2/top-headlines" params = {"country": "it", "apiKey": API_KEYS["news"]} response = requests.get(url, params=params) if response.status_code == 200: articles = response.json().get("articles", []) if articles: speak("Ecco alcune notizie:") for article in articles[:5]: speak(article.get("title", "")) else: speak("Non ho trovato notizie rilevanti.") else: logging.error(f"[ERRORE] fetch_news: {response.text}") speak("Errore nel recupero delle notizie.") except Exception as e: logging.error(f"[ERRORE] fetch_news: {e}") speak("Errore nella richiesta delle notizie.") def monitor_resources(): try: cpu_usage = psutil.cpu_percent(interval=0.3) memory_usage = psutil.virtual_memory().percent speak(f"Utilizzo CPU: {cpu_usage}%. Memoria: {memory_usage}%.") except Exception as e: logging.error(f"[ERRORE] monitor_resources: {e}") speak("Errore nel monitoraggio delle risorse.") def open_webpage(): speak("Quale sito vuoi aprire?") url = take_command() if not url: return if not (url.startswith("http://") or url.startswith("https://")): url = "http://" + url try: webbrowser.open(url) speak("Sito aperto.") except Exception as e: logging.error(f"[ERRORE] open_webpage: {e}") speak("Non riesco ad aprire il sito.") def show_calendar(): events = [ "Riunione col team alle 10.", "Pranzo con un'amica alle 13.", "Allenamento alle 18:30." ] if not events: speak("Al momento il calendario è vuoto.") else: speak("Il tuo calendario prevede:") for e in events: speak(e) def unit_converter(): speak("Dimmi: 'chilometri in miglia' o 'miglia in chilometri'.") conversion_type = take_command() if not conversion_type: return if "chilometri" in conversion_type and "miglia" in conversion_type: speak("Quanti chilometri vuoi convertire?") number_str = take_command() try: km = float(number_str.replace(",", ".")) miles = km * 0.621371 speak(f"{km} km sono circa {round(miles,2)} miglia.") except: speak("Numero non chiaro.") elif "miglia" in conversion_type and "chilometri" in conversion_type: speak("Quante miglia vuoi convertire?") number_str = take_command() try: miles = float(number_str.replace(",", ".")) km = miles / 0.621371 speak(f"{miles} miglia sono circa {round(km,2)} km.") except: speak("Numero non chiaro.") else: speak("Conversione non chiara.") def set_reminder(): global promemoria_list speak("Vuoi aggiungere un promemoria o sentire quelli esistenti?") choice = take_command() if "aggiungi" in choice: speak("Qual è il testo del promemoria?") reminder_text = take_command() if reminder_text: promemoria_list.append(reminder_text) speak("Promemoria aggiunto.") elif "vedi" in choice or "leggi" in choice: if not promemoria_list: speak("Non hai promemoria salvati.") else: speak("Ecco i tuoi promemoria:") for i, rem in enumerate(promemoria_list, 1): speak(f"{i}. {rem}") else: speak("Scelta non riconosciuta.") def tell_joke(): jokes = [ "Perché il pollo attraversa la strada? Per passare dall'altra parte!", "Che tempo fa su Google? C'è una ricerca in corso!" ] speak(jokes[0]) def take_photo(): if not HAS_OPENCV: speak("Non posso scattare foto senza OpenCV.") return speak("Scatto in corso, sorridi!") cap = cv2.VideoCapture(0) if not cap.isOpened(): speak("Nessuna fotocamera trovata.") return ret, frame = cap.read() if ret: filename = "foto_gideon.jpg" cv2.imwrite(filename, frame) speak("Foto salvata come foto_gideon.jpg.") else: speak("Errore nello scatto.") cap.release() cv2.destroyAllWindows() def search_google(): speak("Cosa vuoi cercare su Google?") query = take_command() if not query: return url = "https://www.google.com/search?q=" + query.replace(" ", "+") webbrowser.open(url) speak(f"Ricerca di {query} su Google.") def tell_time(): now = datetime.datetime.now() speak(f"Sono le {now.hour}:{now.minute}.") def calculator(): speak("Dimmi l'operazione da eseguire, per esempio '2 più 3'.") operation = take_command() try: tokens = operation.split() if "più" in tokens: nums = [float(x.replace(",", ".")) for x in tokens if x.replace(",", ".").isdigit()] res = sum(nums) speak(f"Il risultato è {res}") elif "meno" in tokens: nums = [float(x.replace(",", ".")) for x in tokens if x.replace(",", ".").isdigit()] if len(nums) == 2: speak(f"Il risultato è {nums[0] - nums[1]}") else: speak("Formato non chiaro.") elif "per" in tokens: nums = [float(x.replace(",", ".")) for x in tokens if x.replace(",", ".").isdigit()] res = 1 for n in nums: res *= n speak(f"Il risultato è {res}") elif "diviso" in tokens: nums = [float(x.replace(",", ".")) for x in tokens if x.replace(",", ".").isdigit()] if len(nums) == 2 and nums[1] != 0: speak(f"Il risultato è {nums[0] / nums[1]}") else: speak("Non posso dividere per zero o formato sbagliato.") else: speak("Operazione non compresa.") except: speak("Errore nel calcolo.") def manage_todo(): global todo_list speak("Vuoi aggiungere, rimuovere o leggere la lista di cose da fare?") choice = take_command() if "aggiungi" in choice: speak("Dimmi la voce da aggiungere.") task = take_command() if task: todo_list.append(task) speak("Aggiunto.") elif "rimuovi" in choice: if not todo_list: speak("La lista è vuota.") return speak("Dimmi il numero dell'elemento da rimuovere, es. '1' per il primo.") index_str = take_command() if index_str.isdigit(): idx = int(index_str) - 1 if 0 <= idx < len(todo_list): removed = todo_list.pop(idx) speak(f"Ho rimosso {removed}.") else: speak("Indice non valido.") else: speak("Non ho capito.") elif "leggi" in choice or "mostra" in choice: if not todo_list: speak("La lista è vuota.") else: speak("Ecco le tue cose da fare:") for i, item in enumerate(todo_list, 1): speak(f"{i}. {item}") else: speak("Comando non riconosciuto.") def send_email(): speak("A chi vuoi inviare l'email?") recipient = take_command() speak("Oggetto dell'email?") subject = take_command() speak("Testo del messaggio?") body = take_command() sender_email = "[email protected]" sender_password = "la_tua_password" receiver_email = recipient # da validare in un caso reale try: msg = MIMEMultipart() msg["From"] = sender_email msg["To"] = receiver_email msg["Subject"] = subject msg.attach(MIMEText(body, "plain")) server = smtplib.SMTP("smtp.gmail.com", 587) server.starttls() server.login(sender_email, sender_password) server.send_message(msg) server.quit() speak("Email inviata correttamente.") except Exception as e: logging.error(f"[ERRORE] send_email: {e}") speak("Errore durante l'invio dell'email.") def translate_text(): if not HAS_TRANSLATOR: speak("Manca il modulo googletrans, non posso tradurre.") return speak("Dimmi la frase da tradurre.") phrase = take_command() if not phrase: return speak("Vuoi tradurla in inglese o in italiano?") lang = take_command() translator = Translator() try: if "inglese" in lang: result = translator.translate(phrase, src='it', dest='en') speak(f"La traduzione in inglese è: {result.text}") elif "italiano" in lang: result = translator.translate(phrase, src='en', dest='it') speak(f"La traduzione in italiano è: {result.text}") else: speak("Lingua non riconosciuta.") except Exception as e: logging.error(f"[ERRORE] translate_text: {e}") speak("Errore durante la traduzione.") # ----------------------------- # GEMINI: NUOVO ENDPOINT + MEMORIA # ----------------------------- def gemini_query(question): """ Esegue la richiesta all'endpoint Google PaLM / Gemini, includendo la "personalità" e la cronologia di conversazione. """ gemini_key = API_KEYS["gemini"] if not gemini_key or gemini_key == "GEMINI_API_KEY": return "Chiave Gemini non configurata." # Aggiungiamo l'ultima domanda alla memoria conversation_memory.append({"role": "user", "text": question}) # Costruiamo un prompt unendo la personalità + la cronologia. # Invece di un semplice "question", passiamo l'intero contesto come testo. # Nel "body" useremo lo stile indicato dalla doc di Gemini. # Per semplicità concateneremo i messaggi in un'unica stringa. # Se la doc ufficiale prevede un formato differente (es. Chat completions), # dovrai adattare di conseguenza. # Creiamo un contesto testuale cumulativo: # 1. persona_prompt # 2. i turni di conversazione context_parts = [persona_prompt + "\n\n"] for msg in conversation_memory: if msg["role"] == "user": context_parts.append(f"Utente: {msg['text']}\n") else: context_parts.append(f"Gideon: {msg['text']}\n") # L'ultimo pezzo di contesto è la domanda dell'utente final_context = "".join(context_parts) url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent?key={gemini_key}" headers = { "Content-Type": "application/json" } body = { "contents": [ { "parts": [ {"text": final_context} ] } ] } try: response = requests.post(url, headers=headers, json=body, timeout=10) if response.status_code == 200: data = response.json() # Struttura attesa (secondo doc generica): # { # "contents":[ # { # "parts":[ {"text": "...risposta..."} ] # } # ] # } contents = data.get("contents", []) if contents and "parts" in contents[0]: parts = contents[0]["parts"] if parts: answer = parts[0].get("text", "Nessuna risposta.") else: answer = "Nessuna parte di testo nella risposta." else: answer = "Nessun contenuto presente nella risposta." else: logging.error(f"[ERRORE] gemini_query: {response.text}") answer = "Si è verificato un errore nella richiesta a Gemini." # Salviamo anche la risposta nella memoria conversation_memory.append({"role": "assistant", "text": answer}) return answer except requests.exceptions.RequestException as e: logging.error(f"[ERRORE] gemini_query: {e}") return "Errore di rete o timeout nella richiesta a Gemini." def gemini_ml_query(): user_query = take_command() if not user_query: return answer = gemini_query(user_query) speak(answer) # ----------------------------- # LOGICA COMANDI # ----------------------------- def handle_command(command): """ Se c'è la parola di attivazione, ascolta un sotto-comando e lo esegue. Altrimenti tenta risposte "personali" o manda tutto a Gemini. """ if is_activation_command(command): sub_cmd = take_command() if maybe_personal_reply(sub_cmd): return if control_smart_device(sub_cmd): return if "esci" in sub_cmd or "termina" in sub_cmd or "addio" in sub_cmd: speak("Va bene, alla prossima!") speech_queue.put(None) os._exit(0) elif "meteo" in sub_cmd: weather() elif "notizie" in sub_cmd: fetch_news() elif "risorse" in sub_cmd or "cpu" in sub_cmd or "memoria" in sub_cmd: monitor_resources() elif "apri" in sub_cmd or "website" in sub_cmd or "sito" in sub_cmd: open_webpage() elif "calendario" in sub_cmd: show_calendar() elif "converti" in sub_cmd or "convertitore" in sub_cmd: unit_converter() elif "promemoria" in sub_cmd: set_reminder() elif "barzelletta" in sub_cmd or "joke" in sub_cmd: tell_joke() elif "foto" in sub_cmd or "fotocamera" in sub_cmd: take_photo() elif "cerca" in sub_cmd and "google" in sub_cmd: search_google() elif "ore sono" in sub_cmd or "che ora" in sub_cmd: tell_time() elif "calcola" in sub_cmd or "calcolatrice" in sub_cmd: calculator() elif "todo" in sub_cmd or "cose da fare" in sub_cmd: manage_todo() elif "invia email" in sub_cmd or "manda email" in sub_cmd: send_email() elif "traduci" in sub_cmd or "traduzione" in sub_cmd: translate_text() else: # Domanda generica -> Gemini, senza frasi di annuncio answer = gemini_query(sub_cmd) speak(answer) else: # Se NON c'è la parola di attivazione, controlla se risp. personale if maybe_personal_reply(command): return else: # Fallback su Gemini answer = gemini_query(command) speak(answer) # ----------------------------- # THREAD PRINCIPALE ASSISTENTE VOCALE # ----------------------------- def assistant_loop(): while True: command = take_command() if command: t = threading.Thread(target=handle_command, args=(command,)) t.start() # ----------------------------- # FLASK ROUTES (GUI 3D) # ----------------------------- HTML_PAGE = """ <!DOCTYPE html> <html lang="en"> <head> <meta charset="utf-8"> <title>Sfera 3D Gideon</title> <style> body { margin: 0; overflow: hidden; } #info { position: absolute; top: 10px; width: 100%; text-align: center; z-index: 100; color: #fff;} </style> </head> <body> <div id="info">La sfera 3D di Gideon</div> <script src="https://cdnjs.cloudflare.com/ajax/libs/three.js/r134/three.min.js"></script> <script> let scene = new THREE.Scene(); let camera = new THREE.PerspectiveCamera(60, window.innerWidth/window.innerHeight, 0.1, 1000); let renderer = new THREE.WebGLRenderer({antialias: true}); renderer.setSize(window.innerWidth, window.innerHeight); document.body.appendChild(renderer.domElement); let geometry = new THREE.SphereGeometry(2, 32, 32); let material = new THREE.MeshPhongMaterial({ color: 0x0099ff, shininess: 50, specular: 0xffffff }); let sphere = new THREE.Mesh(geometry, material); scene.add(sphere); let light = new THREE.PointLight(0xffffff, 1, 100); light.position.set(10, 10, 10); scene.add(light); camera.position.z = 8; let animate = function () { requestAnimationFrame(animate); sphere.rotation.y += 0.01; // rotazione renderer.render(scene, camera); }; animate(); window.addEventListener('resize', function(){ camera.aspect = window.innerWidth / window.innerHeight; camera.updateProjectionMatrix(); renderer.setSize(window.innerWidth, window.innerHeight); }, false); </script> </body> </html> """ app = Flask(__name__) @app.route("/") def index(): return HTML_PAGE # ----------------------------- # MAIN # ----------------------------- def main(): global VOICE_ID VOICE_ID = get_voice_id("Bella") if not VOICE_ID: print("[ERRORE] Non ho potuto caricare l'ID della voce. " "Controlla la tua chiave ElevenLabs o il nome della voce.") # Avviamo il thread che gestisce la coda di speak t_speak = threading.Thread(target=speak_worker, daemon=True) t_speak.start() # Avviamo il thread per l'assistente vocale t_assist = threading.Thread(target=assistant_loop, daemon=True) t_assist.start() # Frase iniziale speak("Ciao, sono Gideon. Dimmi pure il tuo comando quando vuoi!") # Avviamo il server Flask (GUI 3D). L'assistente vocale continua in background. app.run(host="127.0.0.1", port=5000, debug=False, use_reloader=False) if __name__ == "__main__": main()
conversation_memory
e le ripassa a Gemini ad ogni query, insieme al persona_prompt
.persona_prompt
e la memoria multi-turn.conversation_memory
su file o database. Ad esempio, potresti serializzarla in JSON e ricaricarla all’avvio.Questa struttura comunque dà già l’idea di una “anima” di Gideon, con uno stile coerente e in grado di ricordare i messaggi precedenti.
risponde solo con nessun contenuto presente con la risposta la nonostante abbia messo l api di tutto poi la palla in 3d e brutta immaginavo qualcosa che simuoveva una sfera che si scomponeva quando si parla e si ricompone ma il problemma grande e che non risponde alle domande e come avrai ben capito io le faro mille domande che non possono essere tutte programmate e quindi deve darmi la risposta riesci a sistema
Ecco un esempio migliorato che cerca di risolvere i problemi principali:
Chiamata a Gemini:
"contents"
ma in altri campi (es. "candidates"
o simili), a seconda dell’API effettivamente attivata."gemini-1.5-flash:generateContent"
potrebbe non essere ancora pubblico o potrebbe aver bisogno di parametri specifici. In alcuni account Google Cloud, il modello generativo potrebbe chiamarsi diversamente (es. "chat-bison-001"
, "text-bison-001"
, ecc.). Assicurati di avere attivato la PaLM API/GenAI API nel tuo progetto Google Cloud e di aver impostato correttamente la chiave.Risposte a domande non programmate
Sfera 3D più “animata”:
IMPORTANTE
- Installa i pacchetti necessari:
pip install flask flask-socketio eventlet
(per la parte WebSocket).- Assicurati di avere
speechrecognition
,pydub
,psutil
, ecc. come prima.
pythonimport os import logging import requests import io import webbrowser import psutil import threading import queue import speech_recognition as sr from pydub import AudioSegment from pydub.playback import play import datetime import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from time import sleep # Flask e Flask-SocketIO per la GUI + animazione in tempo reale from flask import Flask, render_template_string from flask_socketio import SocketIO, emit try: import cv2 HAS_OPENCV = True except ImportError: HAS_OPENCV = False try: from googletrans import Translator HAS_TRANSLATOR = True except ImportError: HAS_TRANSLATOR = False # ----------------------------- # CONFIGURAZIONI BASE # ----------------------------- logging.basicConfig(level=logging.INFO) ASSISTANT_NAME = "Gideon" # Parole chiave di attivazione ACTIVATION_COMMANDS = ["gideon", "gideo", "gideoun", "gigget", "g", "gide", "giddy"] # Chiavi API API_KEYS = { "news": "", "weather": "", "elevenlabs": "", # Esempio: "ELV_XXXXXXXXXXXXXXXXXX" "gemini": "GEMINI_API_KEY", # Esempio: "AIzaSy..." } # Variabili globali promemoria_list = [] todo_list = [] conversation_memory = [] IS_SPEAKING = False # Prompt di personalità (l'"anima" di Gideon) persona_prompt = ( "Sei Gideon, un'assistente virtuale dotata di una personalità curiosa e amichevole. " "Cerchi di imparare e memorizzare ciò che dice l'utente, e rispondi in modo utile. " "Hai un pizzico di umorismo e cerchi di mostrare empatia nelle risposte. " "Ti interessano la scienza, la tecnologia, le culture e le lingue." ) # Flask app + SocketIO app = Flask(__name__) app.config['SECRET_KEY'] = 'qualcosa_di_segretissimo' socketio = SocketIO(app, cors_allowed_origins='*') # ----------------------------- # THREAD PRINCIPALE ASSISTENTE # ----------------------------- VOICE_ID = None speech_queue = queue.Queue() def get_voice_id(voice_name="Bella"): if not API_KEYS["elevenlabs"]: logging.warning("[WARNING] Chiave ElevenLabs non impostata. TTS non funzionante.") return None headers = {"xi-api-key": API_KEYS["elevenlabs"]} try: response = requests.get("https://api.elevenlabs.io/v1/voices", headers=headers) if response.status_code == 200: voices = response.json().get("voices", []) for v in voices: if v["name"].lower() == voice_name.lower(): return v["voice_id"] if voices: return voices[0]["voice_id"] else: logging.error(f"[ERRORE] Recupero voci fallito: {response.text}") except Exception as e: logging.error(f"[ERRORE] get_voice_id: {e}") return None def speak_worker(): while True: text = speech_queue.get() if text is None: break _speak_internal(text) speech_queue.task_done() def speak(text): if text: speech_queue.put(text) def _speak_internal(text): global IS_SPEAKING, VOICE_ID IS_SPEAKING = True # Notifichiamo il front-end che stiamo iniziando a parlare (disassembla la sfera) socketio.emit('speaking_status', {'speaking': True}) if not API_KEYS["elevenlabs"]: print("[ERRORE] Nessuna API Key ElevenLabs impostata, impossibile parlare.") IS_SPEAKING = False socketio.emit('speaking_status', {'speaking': False}) return if not VOICE_ID: VOICE_ID = get_voice_id("Bella") if not VOICE_ID: print("[ERRORE] Impossibile reperire l'ID della voce (ElevenLabs).") IS_SPEAKING = False socketio.emit('speaking_status', {'speaking': False}) return MAX_CHARS = 500 chunks = [] if len(text) <= MAX_CHARS: chunks.append(text) else: start = 0 while start < len(text): end = min(start + MAX_CHARS, len(text)) chunks.append(text[start:end]) start = end headers = { "xi-api-key": API_KEYS["elevenlabs"], "Content-Type": "application/json" } for chunk in chunks: payload = { "text": chunk, "model_id": "eleven_multilingual_v1", "voice_settings": { "stability": 0.6, "similarity_boost": 0.5 } } try: url = f"https://api.elevenlabs.io/v1/text-to-speech/{VOICE_ID}" response = requests.post(url, headers=headers, json=payload) if response.status_code == 200: audio = AudioSegment.from_file(io.BytesIO(response.content), format="mp3") audio = audio - 6 # volume più basso di 6 dB play(audio) else: logging.error(f"[ERRORE] TTS: {response.text}") except Exception as e: logging.error(f"[ERRORE] speak: {e}") IS_SPEAKING = False # Quando finisce di parlare, ricomponi la sfera socketio.emit('speaking_status', {'speaking': False}) def take_command(): global IS_SPEAKING r = sr.Recognizer() while IS_SPEAKING: sleep(0.1) with sr.Microphone() as source: r.adjust_for_ambient_noise(source, duration=0.5) try: audio = r.listen(source, timeout=5, phrase_time_limit=5) command = r.recognize_google(audio, language='it-IT').lower() return command except sr.WaitTimeoutError: return "" except sr.UnknownValueError: return "" except Exception as e: logging.error(f"[ERRORE] take_command: {e}") return "" def is_activation_command(command): return any(cmd in command for cmd in ACTIVATION_COMMANDS) def maybe_personal_reply(user_input): if "come stai" in user_input: speak("Sto bene, grazie! Mi fa piacere che tu me l'abbia chiesto.") return True if "chi sei" in user_input or "presentati" in user_input: speak(f"Sono {ASSISTANT_NAME}, un'assistente virtuale curiosa e amichevole. " "Cerco di imparare da ogni conversazione.") return True if "grazie" in user_input: speak("Di nulla, è un piacere aiutarti.") return True return False # ----------------------------- # FUNZIONI DI UTILITÀ CLASSICHE # ----------------------------- def control_smart_device(sub_cmd): if "accendi luce" in sub_cmd: speak("Luce accesa.") return True elif "spegni luce" in sub_cmd: speak("Luce spenta.") return True return False def weather(): if not API_KEYS["weather"]: speak("Mi dispiace, non ho la chiave meteo configurata.") return speak("Dimmi la città per cui vuoi sapere il meteo.") city = take_command() if not city: return try: url = "http://api.openweathermap.org/data/2.5/weather" params = { "q": city, "appid": API_KEYS["weather"], "units": "metric", "lang": "it" } res = requests.get(url, params=params).json() if res.get("cod") == 200: descrizione = res['weather'][0]['description'] temperatura = res['main']['temp'] speak(f"A {city} ci sono {temperatura} gradi e il cielo è {descrizione}.") else: speak("Non ho trovato questa città, mi dispiace.") except Exception as e: logging.error(f"[ERRORE] weather: {e}") speak("Errore nel recupero del meteo.") def fetch_news(): if not API_KEYS["news"]: speak("Mi dispiace, non ho la chiave per le notizie configurata.") return speak("Recupero le notizie più importanti...") try: url = "https://newsapi.org/v2/top-headlines" params = {"country": "it", "apiKey": API_KEYS["news"]} response = requests.get(url, params=params) if response.status_code == 200: articles = response.json().get("articles", []) if articles: speak("Ecco alcune notizie di oggi:") for article in articles[:5]: speak(article.get("title", "")) else: speak("Non ho trovato notizie rilevanti.") else: logging.error(f"[ERRORE] fetch_news: {response.text}") speak("Errore nel recupero delle notizie.") except Exception as e: logging.error(f"[ERRORE] fetch_news: {e}") speak("Errore nella richiesta delle notizie.") def monitor_resources(): try: cpu_usage = psutil.cpu_percent(interval=0.3) memory_usage = psutil.virtual_memory().percent speak(f"Utilizzo CPU: {cpu_usage}%. Memoria: {memory_usage}%.") except Exception as e: logging.error(f"[ERRORE] monitor_resources: {e}") speak("Errore nel monitoraggio delle risorse.") def open_webpage(): speak("Quale sito vuoi aprire?") url = take_command() if not url: return if not (url.startswith("http://") or url.startswith("https://")): url = "http://" + url try: webbrowser.open(url) speak("Sito aperto.") except Exception as e: logging.error(f"[ERRORE] open_webpage: {e}") speak("Non riesco ad aprire il sito.") def show_calendar(): events = [ "Riunione col team alle 10.", "Pranzo con un'amica alle 13.", "Allenamento alle 18:30." ] if not events: speak("Al momento il tuo calendario è vuoto.") else: speak("Il tuo calendario prevede:") for e in events: speak(e) def unit_converter(): speak("Dimmi: 'chilometri in miglia' o 'miglia in chilometri'.") conversion_type = take_command() if not conversion_type: return if "chilometri" in conversion_type and "miglia" in conversion_type: speak("Quanti chilometri vuoi convertire?") number_str = take_command() try: km = float(number_str.replace(",", ".")) miles = km * 0.621371 speak(f"{km} km sono circa {round(miles,2)} miglia.") except: speak("Numero non chiaro.") elif "miglia" in conversion_type and "chilometri" in conversion_type: speak("Quante miglia vuoi convertire?") number_str = take_command() try: miles = float(number_str.replace(",", ".")) km = miles / 0.621371 speak(f"{miles} miglia sono circa {round(km,2)} km.") except: speak("Numero non chiaro.") else: speak("Conversione non chiara.") def set_reminder(): global promemoria_list speak("Vuoi aggiungere un promemoria o sentire quelli esistenti?") choice = take_command() if "aggiungi" in choice: speak("Qual è il testo del promemoria?") reminder_text = take_command() if reminder_text: promemoria_list.append(reminder_text) speak("Promemoria aggiunto.") elif "vedi" in choice or "leggi" in choice: if not promemoria_list: speak("Non hai promemoria salvati.") else: speak("Ecco i tuoi promemoria:") for i, rem in enumerate(promemoria_list, 1): speak(f"{i}. {rem}") else: speak("Scelta non riconosciuta.") def tell_joke(): jokes = [ "Perché il pollo attraversa la strada? Per passare dall'altra parte!", "Che tempo fa su Google? C'è una ricerca in corso!" ] speak(jokes[0]) def take_photo(): if not HAS_OPENCV: speak("Non posso scattare foto senza OpenCV.") return speak("Scatto in corso, sorridi!") cap = cv2.VideoCapture(0) if not cap.isOpened(): speak("Nessuna fotocamera trovata.") return ret, frame = cap.read() if ret: filename = "foto_gideon.jpg" cv2.imwrite(filename, frame) speak("Foto salvata come foto_gideon.jpg.") else: speak("Errore nello scatto.") cap.release() cv2.destroyAllWindows() def search_google(): speak("Cosa vuoi cercare su Google?") query = take_command() if not query: return url = "https://www.google.com/search?q=" + query.replace(" ", "+") webbrowser.open(url) speak(f"Ricerca di {query} su Google.") def tell_time(): now = datetime.datetime.now() speak(f"Sono le {now.hour}:{now.minute}.") def calculator(): speak("Dimmi l'operazione da eseguire, per esempio '2 più 3'.") operation = take_command() try: tokens = operation.split() if "più" in tokens: nums = [float(x.replace(",", ".")) for x in tokens if x.replace(",", ".").isdigit()] res = sum(nums) speak(f"Il risultato è {res}") elif "meno" in tokens: nums = [float(x.replace(",", ".")) for x in tokens if x.replace(",", ".").isdigit()] if len(nums) == 2: speak(f"Il risultato è {nums[0] - nums[1]}") else: speak("Formato non chiaro.") elif "per" in tokens: nums = [float(x.replace(",", ".")) for x in tokens if x.replace(",", ".").isdigit()] res = 1 for n in nums: res *= n speak(f"Il risultato è {res}") elif "diviso" in tokens: nums = [float(x.replace(",", ".")) for x in tokens if x.replace(",", ".").isdigit()] if len(nums) == 2 and nums[1] != 0: speak(f"Il risultato è {nums[0] / nums[1]}") else: speak("Non posso dividere per zero o formato sbagliato.") else: speak("Operazione non compresa.") except: speak("Errore nel calcolo.") def manage_todo(): global todo_list speak("Vuoi aggiungere, rimuovere o leggere la lista di cose da fare?") choice = take_command() if "aggiungi" in choice: speak("Dimmi la voce da aggiungere.") task = take_command() if task: todo_list.append(task) speak("Aggiunto.") elif "rimuovi" in choice: if not todo_list: speak("La lista è vuota.") return speak("Dimmi il numero dell'elemento da rimuovere, es. '1' per il primo.") index_str = take_command() if index_str.isdigit(): idx = int(index_str) - 1 if 0 <= idx < len(todo_list): removed = todo_list.pop(idx) speak(f"Ho rimosso {removed}.") else: speak("Indice non valido.") else: speak("Non ho capito.") elif "leggi" in choice or "mostra" in choice: if not todo_list: speak("La lista è vuota.") else: speak("Ecco le tue cose da fare:") for i, item in enumerate(todo_list, 1): speak(f"{i}. {item}") else: speak("Comando non riconosciuto.") def send_email(): speak("A chi vuoi inviare l'email?") recipient = take_command() speak("Oggetto dell'email?") subject = take_command() speak("Testo del messaggio?") body = take_command() sender_email = "[email protected]" sender_password = "la_tua_password" receiver_email = recipient try: msg = MIMEMultipart() msg["From"] = sender_email msg["To"] = receiver_email msg["Subject"] = subject msg.attach(MIMEText(body, "plain")) server = smtplib.SMTP("smtp.gmail.com", 587) server.starttls() server.login(sender_email, sender_password) server.send_message(msg) server.quit() speak("Email inviata correttamente.") except Exception as e: logging.error(f"[ERRORE] send_email: {e}") speak("Errore durante l'invio dell'email.") def translate_text(): if not HAS_TRANSLATOR: speak("Manca il modulo googletrans, non posso tradurre.") return speak("Dimmi la frase da tradurre.") phrase = take_command() if not phrase: return speak("Vuoi tradurla in inglese o in italiano?") lang = take_command() translator = Translator() try: if "inglese" in lang: result = translator.translate(phrase, src='it', dest='en') speak(f"La traduzione in inglese è: {result.text}") elif "italiano" in lang: result = translator.translate(phrase, src='en', dest='it') speak(f"La traduzione in italiano è: {result.text}") else: speak("Lingua non riconosciuta.") except Exception as e: logging.error(f"[ERRORE] translate_text: {e}") speak("Errore durante la traduzione.") # ----------------------------- # GEMINI: NUOVO ENDPOINT + MEMORIA (ROBUST PARSING) # ----------------------------- def gemini_query(question): """ Esegue la richiesta all'endpoint Google PaLM / Gemini, includendo la "personalità" e la cronologia di conversazione. Cerca di fare un parsing robusto in caso la risposta arrivi in formati diversi. """ gemini_key = API_KEYS["gemini"] if not gemini_key or gemini_key == "GEMINI_API_KEY": return "Chiave Gemini non configurata o modello non disponibile." conversation_memory.append({"role": "user", "text": question}) # Creiamo un contesto testuale cumulativo context_parts = [persona_prompt + "\n\n"] for msg in conversation_memory: if msg["role"] == "user": context_parts.append(f"Utente: {msg['text']}\n") else: context_parts.append(f"Gideon: {msg['text']}\n") final_context = "".join(context_parts) url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-1.5-flash:generateContent?key={gemini_key}" headers = {"Content-Type": "application/json"} body = { "contents": [ { "parts": [ {"text": final_context} ] } ] } try: response = requests.post(url, headers=headers, json=body, timeout=10) if response.status_code == 200: data = response.json() # Cerchiamo in vari campi possibili answer = "" # 1) Se c'è "contents" contents = data.get("contents", []) if contents: # es: "contents":[{"parts":[{"text":"..."}]}] parts = contents[0].get("parts", []) if parts: answer = parts[0].get("text", "") # 2) Se la stringa è ancora vuota, proviamo "candidates" (in alcuni modelli) if not answer: # es: "candidates":[{"output":"..."}] candidates = data.get("candidates") if candidates and len(candidates) > 0: candidate = candidates[0] answer = candidate.get("output", "") # 3) Se ancora non c'è risposta, fallback if not answer: answer = "Non sono riuscita a trovare testo nella risposta di Gemini." else: logging.error(f"[ERRORE] gemini_query: {response.text}") answer = "Errore nella richiesta a Gemini. Codice HTTP diverso da 200." conversation_memory.append({"role": "assistant", "text": answer}) return answer except requests.exceptions.RequestException as e: logging.error(f"[ERRORE] gemini_query: {e}") return "Errore di rete o timeout nella richiesta a Gemini." def gemini_ml_query(): user_query = take_command() if not user_query: return answer = gemini_query(user_query) speak(answer) # ----------------------------- # LOGICA COMANDI # ----------------------------- def handle_command(command): if is_activation_command(command): sub_cmd = take_command() if maybe_personal_reply(sub_cmd): return if control_smart_device(sub_cmd): return if "esci" in sub_cmd or "termina" in sub_cmd or "addio" in sub_cmd: speak("Va bene, alla prossima!") speech_queue.put(None) os._exit(0) elif "meteo" in sub_cmd: weather() elif "notizie" in sub_cmd: fetch_news() elif "risorse" in sub_cmd or "cpu" in sub_cmd or "memoria" in sub_cmd: monitor_resources() elif "apri" in sub_cmd or "website" in sub_cmd or "sito" in sub_cmd: open_webpage() elif "calendario" in sub_cmd: show_calendar() elif "converti" in sub_cmd or "convertitore" in sub_cmd: unit_converter() elif "promemoria" in sub_cmd: set_reminder() elif "barzelletta" in sub_cmd or "joke" in sub_cmd: tell_joke() elif "foto" in sub_cmd or "fotocamera" in sub_cmd: take_photo() elif "cerca" in sub_cmd and "google" in sub_cmd: search_google() elif "ore sono" in sub_cmd or "che ora" in sub_cmd: tell_time() elif "calcola" in sub_cmd or "calcolatrice" in sub_cmd: calculator() elif "todo" in sub_cmd or "cose da fare" in sub_cmd: manage_todo() elif "invia email" in sub_cmd or "manda email" in sub_cmd: send_email() elif "traduci" in sub_cmd or "traduzione" in sub_cmd: translate_text() else: answer = gemini_query(sub_cmd) speak(answer) else: if maybe_personal_reply(command): return else: answer = gemini_query(command) speak(answer) def assistant_loop(): while True: command = take_command() if command: t = threading.Thread(target=handle_command, args=(command,)) t.start() # ----------------------------- # PAGINA WEB: SFERA 3D DI PUNTI # ----------------------------- # Qui usiamo Socket.IO: quando "speaking_status" = True, la sfera si disperde. # Quando "speaking_status" = False, si ricompone. HTML_PAGE = """ <!DOCTYPE html> <html lang="en"> <head> <meta charset="utf-8"/> <title>Gideon 3D Sphere</title> <style> body { margin: 0; overflow: hidden; background: #000; } #info { position: absolute; top: 10px; width: 100%; text-align: center; z-index: 100; color: #fff; font-family: sans-serif; } </style> </head> <body> <div id="info">La sfera 3D di Gideon</div> <!-- Three.js --> <script src="https://cdn.jsdelivr.net/npm/[email protected]/build/three.min.js"></script> <!-- Socket.IO --> <script src="https://cdn.socket.io/4.5.4/socket.io.min.js"></script> <script> let scene, camera, renderer, points; let originalPositions = []; // posizioni originali per ricomposizione let speaking = false; const numParticles = 2000; const radius = 2; function init() { scene = new THREE.Scene(); camera = new THREE.PerspectiveCamera(60, window.innerWidth/window.innerHeight, 0.1, 100); camera.position.z = 8; renderer = new THREE.WebGLRenderer({antialias: true}); renderer.setSize(window.innerWidth, window.innerHeight); document.body.appendChild(renderer.domElement); // Particles geometry const geometry = new THREE.BufferGeometry(); const positions = new Float32Array(numParticles * 3); for (let i = 0; i < numParticles; i++) { // Distribuisco i punti su una sfera const phi = Math.random() * 2 * Math.PI; const costheta = Math.random() * 2 - 1; const u = Math.random(); const theta = Math.acos(costheta); const r = radius; const x = r * Math.sin(theta) * Math.cos(phi); const y = r * Math.sin(theta) * Math.sin(phi); const z = r * Math.cos(theta); positions[i*3] = x; positions[i*3+1] = y; positions[i*3+2] = z; // Salvo la posizione originale originalPositions.push({x, y, z}); } geometry.setAttribute('position', new THREE.BufferAttribute(positions, 3)); // Material const material = new THREE.PointsMaterial({ color: 0x00ffff, size: 0.04 }); points = new THREE.Points(geometry, material); scene.add(points); // Light const light = new THREE.PointLight(0xffffff, 1, 100); light.position.set(10, 10, 10); scene.add(light); window.addEventListener('resize', onWindowResize, false); animate(); } function onWindowResize() { camera.aspect = window.innerWidth / window.innerHeight; camera.updateProjectionMatrix(); renderer.setSize(window.innerWidth, window.innerHeight); } function animate() { requestAnimationFrame(animate); // Animazione semplice rotatoria points.rotation.y += 0.001; // Se speaking = true, "disperdi" i punti random // Se speaking = false, falli tornare alle posizioni originali disperseOrCompose(); renderer.render(scene, camera); } function disperseOrCompose() { const positions = points.geometry.attributes.position.array; for (let i = 0; i < numParticles; i++) { const idx = i*3; let ox = originalPositions[i].x; let oy = originalPositions[i].y; let oz = originalPositions[i].z; if (speaking) { // Disperdi: sposta un po' i punti random attorno alla posizione originale positions[idx] += (Math.random() - 0.5) * 0.01; positions[idx+1] += (Math.random() - 0.5) * 0.01; positions[idx+2] += (Math.random() - 0.5) * 0.01; } else { // Ricomponi: avvicinati gradualmente alla posizione originale positions[idx] += (ox - positions[idx]) * 0.05; positions[idx+1] += (oy - positions[idx+1]) * 0.05; positions[idx+2] += (oz - positions[idx+2]) * 0.05; } } points.geometry.attributes.position.needsUpdate = true; } // Connettiamo Socket.IO const socket = io(); socket.on('connect', () => { console.log('Connesso al server SocketIO.'); }); socket.on('speaking_status', data => { speaking = data.speaking; console.log("speaking =", speaking); }); init(); </script> </body> </html> """ @app.route("/") def index(): return HTML_PAGE # ----------------------------- # MAIN # ----------------------------- def main(): global VOICE_ID VOICE_ID = get_voice_id("Bella") if not VOICE_ID: print("[ERRORE] Non ho potuto caricare l'ID della voce. " "Controlla la tua chiave ElevenLabs o il nome della voce.") # Thread di speak t_speak = threading.Thread(target=speak_worker, daemon=True) t_speak.start() # Thread di loop vocale t_assist = threading.Thread(target=assistant_loop, daemon=True) t_assist.start() # Frase iniziale speak("Ciao, sono Gideon. Fammi pure una domanda o dimmi un comando!") # Avviamo Flask-SocketIO socketio.run(app, host="127.0.0.1", port=5000, debug=False, use_reloader=False) if __name__ == "__main__": main()
Flask + SocketIO:
Flask
puro, usiamo Flask-SocketIO
per inviare un segnale al browser ogni volta che speak()
inizia e finisce.Sfera 3D di particelle che si disassembla quando Gideon parla:
speaking_status
con speaking = True/False
.True
, i punti si allontanano leggermente dalle posizioni (rumore random). Se False
, vengono richiamati gradualmente verso la posizione originale di una sfera.Parsing più flessibile della risposta di Gemini:
data["contents"][0]["parts"][0]["text"]
,data["candidates"][0]["output"]
,Supporto a domande generiche:
gemini_query()
, che a sua volta costruisce un contesto con la personalità e la cronologia.gemini_query()
come fallback.Verifiche:
"gemini-1.5-flash"
sia disponibile (in alcuni account è in alpha/closed beta)."chat-bison-001:generateMessage"
) o un altro endpoint.Con queste modifiche dovresti:
gemini_query()
e la risposta verrà letta da Gideon.Spero che questa versione risolva i problemi di vuoto nella risposta e dia un effetto più vicino a una “sfera animata” che reagisce alla voce!