Overcoming Session Duration Limit
Created on: December 22, 2024
In the Gemini 2.0 Flash documentation there is this limit:
Session duration is limited to up to 15 minutes for audio or up to 2 minutes of audio and video. When the session duration exceeds the limit, the connection is terminated.
So how can we modify the following code to overcome this limit?
```python
import tempfile
import asyncio
import base64
import json
import os
import wave
from websockets.asyncio.client import connect
import websockets
import pyaudio
from dotenv import load_dotenv
import sys
from pydub import AudioSegment
from tqdm import tqdm  # Import tqdm for the progress bar

load_dotenv()
GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')
VOICE_A = os.getenv('VOICE_A', 'Puck')
VOICE_B = os.getenv('VOICE_B', 'Kore')

if sys.version_info < (3, 11):
    import taskgroup, exceptiongroup
    asyncio.TaskGroup = taskgroup.TaskGroup
    asyncio.ExceptionGroup = exceptiongroup.ExceptionGroup


class AudioGenerator:
    def __init__(self, progress_bar=None):
        self.audio_in_queue = asyncio.Queue()
        self.ws = None
        self.ws_semaphore = asyncio.Semaphore(1)
        self.progress_bar = progress_bar  # Initialize the progress bar

        # Audio configuration
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 2
        self.SAMPLE_RATE = 24000
        self.CHUNK_SIZE = 512

        # WebSocket configuration
        self.ws_options = {
            'ping_interval': 20,
            'ping_timeout': 10,
            'close_timeout': 5
        }

        # API configuration
        self.host = 'generativelanguage.googleapis.com'
        self.model = "gemini-2.0-flash-exp"
        self.uri = f"wss://{self.host}/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent?key={GOOGLE_API_KEY}"

        # Store complete audio data
        self.complete_audio = bytearray()

    async def startup(self, voice):
        async with self.ws_semaphore:
            setup_msg = {
                "setup": {
                    "model": f"models/{self.model}",
                    "generation_config": {
                        "speech_config": {
                            "voice_config": {
                                "prebuilt_voice_config": {
                                    "voice_name": voice
                                }
                            }
                        }
                    }
                }
            }
            await self.ws.send(json.dumps(setup_msg))
            response = await self.ws.recv()

    async def send_text(self, text, voice):
        async with self.ws_semaphore:
            msg = {
                "client_content": {
                    "turn_complete": True,
                    "turns": [
                        {"role": "user", "parts": [{"text": text}]}
                    ]
                }
            }
            await self.ws.send(json.dumps(msg))

    async def receive_audio(self, output_file):
        async with self.ws_semaphore:
            self.complete_audio.clear()
            await asyncio.sleep(0.1)
            try:
                async for raw_response in self.ws:
                    response = json.loads(raw_response)

                    # Process audio data
                    try:
                        parts = response["serverContent"]["modelTurn"]["parts"]
                        for part in parts:
                            if "inlineData" in part:
                                b64data = part["inlineData"]["data"]
                                pcm_data = base64.b64decode(b64data)
                                self.complete_audio.extend(pcm_data)
                                self.audio_in_queue.put_nowait(pcm_data)
                    except KeyError:
                        pass

                    # Check for completion
                    try:
                        if response["serverContent"].get("turnComplete", False):
                            self.save_wav_file(output_file)
                            while not self.audio_in_queue.empty():
                                self.audio_in_queue.get_nowait()
                            break
                    except KeyError:
                        pass
            except websockets.exceptions.ConnectionClosedError as e:
                print(f"Connection closed: {e}")
                raise

    def save_wav_file(self, filename):
        with wave.open(filename, 'wb') as wav_file:
            wav_file.setnchannels(self.CHANNELS)
            wav_file.setsampwidth(2)
            wav_file.setframerate(self.SAMPLE_RATE)

            # Convert mono to stereo by duplicating the audio data
            stereo_data = bytearray()
            for i in range(0, len(self.complete_audio), 2):
                sample = self.complete_audio[i:i+2]
                stereo_data.extend(sample)  # Left channel
                stereo_data.extend(sample)  # Right channel

            wav_file.writeframes(stereo_data)

        # Update the progress bar instead of printing
        if self.progress_bar:
            self.progress_bar.update(1)
        # Optionally, you can remove the following line if you don't want any output
        # print(f"Audio saved to {filename}")

    async def run(self, dialogues, output_files, voices, max_retries=3):
        last_exception = None
        for attempt in range(max_retries):
            try:
                async with await connect(self.uri, **self.ws_options) as ws:
                    self.ws = ws
                    await self.startup(voices[0])

                    # Process dialogues sequentially
                    for i in range(len(dialogues)):
                        await self.send_text(dialogues[i], voices[i])
                        await self.receive_audio(output_files[i])
                    return
            except websockets.exceptions.ConnectionClosedError as e:
                last_exception = e
                if attempt < max_retries - 1:
                    print(f"Connection lost. Retrying in 5 seconds... (Attempt {attempt + 1}/{max_retries})")
                    await asyncio.sleep(5)
                else:
                    print("Max retries reached. Unable to reconnect.")
                    raise last_exception


def parse_conversation(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()

    lines = content.strip().split('\n')
    speaker_a_lines = []
    speaker_b_lines = []

    for line in lines:
        if line.strip():
            if line.startswith("Speaker A:"):
                speaker_a_lines.append(line.replace("Speaker A:", "").strip())
            elif line.startswith("Speaker B:"):
                speaker_b_lines.append(line.replace("Speaker B:", "").strip())

    return speaker_a_lines, speaker_b_lines


def combine_audio_files(file_list, output_file):
    combined = AudioSegment.empty()
    for file in file_list:
        audio = AudioSegment.from_wav(file)
        # Ensure stereo
        if audio.channels == 1:
            audio = audio.set_channels(2)
        combined += audio
    combined.export(output_file, format="wav")


def read_file_content(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        return file.read()


async def setup_environment():
    if not os.getenv('GOOGLE_API_KEY'):
        raise EnvironmentError("GOOGLE_API_KEY not found in environment variables")
    script_dir = os.path.dirname(os.path.abspath(__file__))
    return script_dir


def read_and_parse_inputs():
    system_instructions = read_file_content('system_instructions_audio.txt')
    full_script = read_file_content('podcast_script.txt')
    speaker_a_lines, speaker_b_lines = parse_conversation('podcast_script.txt')
    return system_instructions, full_script, speaker_a_lines, speaker_b_lines


def prepare_speaker_dialogues(system_instructions, full_script, speaker_lines, voice, temp_dir):
    dialogues = [system_instructions + "\n\n" + full_script]
    voices = [voice]
    output_files = [os.path.join(temp_dir, f"speaker_{voice}_initial.wav")]

    for i, line in enumerate(speaker_lines):
        dialogues.append(line)
        voices.append(voice)
        output_files.append(os.path.join(temp_dir, f"speaker_{voice}_{i}.wav"))

    return dialogues, voices, output_files


async def process_speaker(generator, dialogues, output_files, voices):
    await generator.run(dialogues, output_files, voices)


def interleave_output_files(speaker_a_files, speaker_b_files):
    all_output_files = []
    min_length = min(len(speaker_a_files), len(speaker_b_files))

    for i in range(min_length):
        all_output_files.extend([speaker_a_files[i], speaker_b_files[i]])

    all_output_files.extend(speaker_a_files[min_length:])
    all_output_files.extend(speaker_b_files[min_length:])
    return all_output_files


async def main():
    script_dir = await setup_environment()

    with tempfile.TemporaryDirectory(dir=script_dir) as temp_dir:
        system_instructions, full_script, speaker_a_lines, speaker_b_lines = read_and_parse_inputs()

        dialogues_a, voices_a, output_files_a = prepare_speaker_dialogues(
            system_instructions, full_script, speaker_a_lines, VOICE_A, temp_dir)
        dialogues_b, voices_b, output_files_b = prepare_speaker_dialogues(
            system_instructions, full_script, speaker_b_lines, VOICE_B, temp_dir)

        # Calculate total number of audio files to generate
        total_audio_files = len(output_files_a) + len(output_files_b)

        # Initialize the tqdm progress bar
        with tqdm(total=total_audio_files, desc="Generating audio files", unit="file") as progress_bar:
            # Initialize the AudioGenerator with the progress bar
            generator = AudioGenerator(progress_bar=progress_bar)

            # Process both speakers
            await process_speaker(generator, dialogues_a, output_files_a, voices_a)
            await process_speaker(generator, dialogues_b, output_files_b, voices_b)

        # No need to manually close the progress bar as it's handled by the context manager
        all_output_files = interleave_output_files(output_files_a[1:], output_files_b[1:])

        final_output = "final_podcast.wav"
        combine_audio_files(all_output_files, final_output)
        print(f"\nFinal podcast audio created: {final_output}")
        print("Temporary files cleaned up")


if __name__ == "__main__":
    asyncio.run(main())
```
To overcome the session duration limits imposed by Google's Gemini 2 Flash API (15 minutes for audio or 2 minutes for audio and video), you need to ensure that each WebSocket connection does not exceed these time constraints. One effective way to achieve this is by establishing a new WebSocket connection for each dialogue instead of maintaining a single long-lived connection. This approach ensures that each interaction stays within the allowed session duration, thereby preventing unexpected connection terminations.
Below, I'll guide you through modifying your existing code to implement this strategy. The key changes involve refactoring the AudioGenerator class to handle individual connections per dialogue and updating the run method accordingly.
Establishing Connections Per Dialogue: Open a fresh WebSocket connection for each dialogue and close it once that dialogue's audio has been received, so no single session approaches the duration limit.

Refactoring the run Method: Change the run method to iterate over each dialogue, establish a connection, send the text, receive the audio, and then close the connection.

Handling Retries Individually: Apply the retry logic per dialogue, so a transient failure only repeats the affected dialogue rather than the whole run.

Optimizing Progress Tracking: Keep the shared tqdm progress bar and update it as each audio file is saved.
Below is the refactored version of your code with the necessary modifications to handle the session duration limits effectively.
```python
import tempfile
import asyncio
import base64
import json
import os
import wave
from websockets.asyncio.client import connect
import websockets
import pyaudio
from dotenv import load_dotenv
import sys
from pydub import AudioSegment
from tqdm import tqdm  # Import tqdm for the progress bar

load_dotenv()
GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')
VOICE_A = os.getenv('VOICE_A', 'Puck')
VOICE_B = os.getenv('VOICE_B', 'Kore')

if sys.version_info < (3, 11):
    import taskgroup, exceptiongroup
    asyncio.TaskGroup = taskgroup.TaskGroup
    asyncio.ExceptionGroup = exceptiongroup.ExceptionGroup


class AudioGenerator:
    def __init__(self, progress_bar=None):
        self.progress_bar = progress_bar  # Initialize the progress bar

        # Audio configuration
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 2
        self.SAMPLE_RATE = 24000
        self.CHUNK_SIZE = 512

        # API configuration
        self.host = 'generativelanguage.googleapis.com'
        self.model = "gemini-2.0-flash-exp"
        self.base_uri = f"wss://{self.host}/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent?key={GOOGLE_API_KEY}"

        # WebSocket configuration
        self.ws_options = {
            'ping_interval': 20,
            'ping_timeout': 10,
            'close_timeout': 5
        }

    async def process_dialogue(self, dialogue, voice, output_file, max_retries=3):
        attempt = 0
        while attempt < max_retries:
            try:
                async with connect(self.base_uri, **self.ws_options) as ws:
                    # Setup message
                    setup_msg = {
                        "setup": {
                            "model": f"models/{self.model}",
                            "generation_config": {
                                "speech_config": {
                                    "voice_config": {
                                        "prebuilt_voice_config": {
                                            "voice_name": voice
                                        }
                                    }
                                }
                            }
                        }
                    }
                    await ws.send(json.dumps(setup_msg))
                    setup_response = await ws.recv()
                    # Optionally, handle setup_response if needed

                    # Send dialogue text
                    msg = {
                        "client_content": {
                            "turn_complete": True,
                            "turns": [
                                {"role": "user", "parts": [{"text": dialogue}]}
                            ]
                        }
                    }
                    await ws.send(json.dumps(msg))

                    # Receive audio data
                    complete_audio = bytearray()
                    async for raw_response in ws:
                        response = json.loads(raw_response)

                        # Process audio data
                        try:
                            parts = response["serverContent"]["modelTurn"]["parts"]
                            for part in parts:
                                if "inlineData" in part:
                                    b64data = part["inlineData"]["data"]
                                    pcm_data = base64.b64decode(b64data)
                                    complete_audio.extend(pcm_data)
                        except KeyError:
                            pass

                        # Check for completion
                        try:
                            if response["serverContent"].get("turnComplete", False):
                                self.save_wav_file(complete_audio, output_file)
                                break
                        except KeyError:
                            pass
                    return  # Successfully processed the dialogue
            except websockets.exceptions.ConnectionClosedError as e:
                attempt += 1
                if attempt < max_retries:
                    print(f"Connection closed: {e}. Retrying in 5 seconds... (Attempt {attempt}/{max_retries})")
                    await asyncio.sleep(5)
                else:
                    print(f"Max retries reached for dialogue: {dialogue}. Skipping.")
                    raise e
            except Exception as e:
                attempt += 1
                if attempt < max_retries:
                    print(f"Error: {e}. Retrying in 5 seconds... (Attempt {attempt}/{max_retries})")
                    await asyncio.sleep(5)
                else:
                    print(f"Max retries reached for dialogue: {dialogue}. Skipping.")
                    raise e

    def save_wav_file(self, audio_data, filename):
        with wave.open(filename, 'wb') as wav_file:
            wav_file.setnchannels(self.CHANNELS)
            wav_file.setsampwidth(2)
            wav_file.setframerate(self.SAMPLE_RATE)

            # Convert mono to stereo by duplicating the audio data
            stereo_data = bytearray()
            for i in range(0, len(audio_data), 2):
                sample = audio_data[i:i+2]
                stereo_data.extend(sample)  # Left channel
                stereo_data.extend(sample)  # Right channel

            wav_file.writeframes(stereo_data)

        # Update the progress bar instead of printing
        if self.progress_bar:
            self.progress_bar.update(1)
        # Optionally, you can remove the following line if you don't want any output
        # print(f"Audio saved to {filename}")

    async def run(self, dialogues, voices, output_files):
        tasks = []
        for dialogue, voice, output_file in zip(dialogues, voices, output_files):
            tasks.append(self.process_dialogue(dialogue, voice, output_file))
        await asyncio.gather(*tasks)


def parse_conversation(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()

    lines = content.strip().split('\n')
    speaker_a_lines = []
    speaker_b_lines = []

    for line in lines:
        if line.strip():
            if line.startswith("Speaker A:"):
                speaker_a_lines.append(line.replace("Speaker A:", "").strip())
            elif line.startswith("Speaker B:"):
                speaker_b_lines.append(line.replace("Speaker B:", "").strip())

    return speaker_a_lines, speaker_b_lines


def combine_audio_files(file_list, output_file):
    combined = AudioSegment.empty()
    for file in file_list:
        audio = AudioSegment.from_wav(file)
        # Ensure stereo
        if audio.channels == 1:
            audio = audio.set_channels(2)
        combined += audio
    combined.export(output_file, format="wav")


def read_file_content(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        return file.read()


async def setup_environment():
    if not os.getenv('GOOGLE_API_KEY'):
        raise EnvironmentError("GOOGLE_API_KEY not found in environment variables")
    script_dir = os.path.dirname(os.path.abspath(__file__))
    return script_dir


def read_and_parse_inputs():
    system_instructions = read_file_content('system_instructions_audio.txt')
    full_script = read_file_content('podcast_script.txt')
    speaker_a_lines, speaker_b_lines = parse_conversation('podcast_script.txt')
    return system_instructions, full_script, speaker_a_lines, speaker_b_lines


def prepare_speaker_dialogues(system_instructions, full_script, speaker_lines, voice, temp_dir):
    dialogues = [system_instructions + "\n\n" + full_script]
    voices = [voice]
    output_files = [os.path.join(temp_dir, f"speaker_{voice}_initial.wav")]

    for i, line in enumerate(speaker_lines):
        dialogues.append(line)
        voices.append(voice)
        output_files.append(os.path.join(temp_dir, f"speaker_{voice}_{i}.wav"))

    return dialogues, voices, output_files


async def process_speaker(generator, dialogues, output_files, voices):
    await generator.run(dialogues, voices, output_files)


def interleave_output_files(speaker_a_files, speaker_b_files):
    all_output_files = []
    min_length = min(len(speaker_a_files), len(speaker_b_files))

    for i in range(min_length):
        all_output_files.extend([speaker_a_files[i], speaker_b_files[i]])

    all_output_files.extend(speaker_a_files[min_length:])
    all_output_files.extend(speaker_b_files[min_length:])
    return all_output_files


async def main():
    script_dir = await setup_environment()

    with tempfile.TemporaryDirectory(dir=script_dir) as temp_dir:
        system_instructions, full_script, speaker_a_lines, speaker_b_lines = read_and_parse_inputs()

        dialogues_a, voices_a, output_files_a = prepare_speaker_dialogues(
            system_instructions, full_script, speaker_a_lines, VOICE_A, temp_dir)
        dialogues_b, voices_b, output_files_b = prepare_speaker_dialogues(
            system_instructions, full_script, speaker_b_lines, VOICE_B, temp_dir)

        # Calculate total number of audio files to generate
        total_audio_files = len(output_files_a) + len(output_files_b)

        # Initialize the tqdm progress bar
        with tqdm(total=total_audio_files, desc="Generating audio files", unit="file") as progress_bar:
            # Initialize the AudioGenerator with the progress bar
            generator = AudioGenerator(progress_bar=progress_bar)

            # Process both speakers concurrently
            await asyncio.gather(
                process_speaker(generator, dialogues_a, output_files_a, voices_a),
                process_speaker(generator, dialogues_b, output_files_b, voices_b)
            )

        # No need to manually close the progress bar as it's handled by the context manager
        all_output_files = interleave_output_files(output_files_a[1:], output_files_b[1:])

        final_output = "final_podcast.wav"
        combine_audio_files(all_output_files, final_output)
        print(f"\nFinal podcast audio created: {final_output}")
        print("Temporary files cleaned up")


if __name__ == "__main__":
    asyncio.run(main())
```
Processing Dialogues Individually: A new method process_dialogue is introduced within the AudioGenerator class. It handles the entire lifecycle of a single dialogue: establishing a connection, sending the setup and dialogue messages, receiving the audio, and saving the audio file. Each connection attempt is retried up to max_retries times before skipping the problematic dialogue.

Refactored run Method: The run method now iterates over each dialogue, voice, and output file, creating a list of tasks that are executed concurrently using asyncio.gather. This ensures that each dialogue is processed in its own connection without exceeding session limits.

Progress Bar Integration: The progress bar is updated in the save_wav_file method after each audio file is successfully saved. This provides real-time feedback on the progress of audio generation.

Handling Multiple Speakers: Dialogues for Speaker A and Speaker B are prepared separately, each with its own voice and list of output files.

Combining Audio Files: Once all clips are generated, they are interleaved and merged into the final podcast by the combine_audio_files function.

Compliance with Session Limits: By establishing a new WebSocket connection for each dialogue, you ensure that no single connection exceeds the 15-minute (or 2-minute) session duration limit.
Robustness: The retry mechanism ensures that transient network issues or temporary API outages do not halt the entire audio generation process.
Scalability: Processing dialogues concurrently (while still respecting session limits) can significantly speed up the overall execution time, especially for large scripts.
Maintainability: The refactored code is modular, making it easier to understand, maintain, and extend in the future.
Rate Limiting: Depending on the API's rate limits, you might need to implement rate limiting to avoid exceeding allowed request rates. You can use tools like asyncio.Semaphore to control the number of concurrent connections.
Logging: Implementing a more sophisticated logging mechanism can help in debugging and monitoring the process, especially when dealing with a large number of dialogues.
Error Handling Enhancements: Consider handling specific exceptions more gracefully and providing more informative error messages to aid in troubleshooting.
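For instance, the retry logic could wait much longer when the close reason indicates a quota problem than for an ordinary network drop. A minimal sketch (the 'Quota exceeded' substring check is an assumption about the close-reason text, not a documented API contract):

```python
def retry_delay_for(exc: Exception) -> float:
    """Pick a retry delay based on why the connection closed (heuristic).

    Matching on the 'Quota exceeded' substring is an assumption about the
    close-reason text, not a documented API contract.
    """
    if "Quota exceeded" in str(exc):
        return 60.0  # quota errors rarely clear within seconds
    return 5.0       # transient network drop: a short retry is fine
```

Inside process_dialogue's except block, `await asyncio.sleep(retry_delay_for(e))` would then replace the fixed five-second sleep.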
Configuration Management: Externalize parameters like max_retries, retry_delay, and others to configuration files or environment variables for easier adjustments without modifying the code.
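A minimal sketch of such externalization, following the script's existing dotenv pattern (the MAX_RETRIES and RETRY_DELAY variable names are illustrative, not part of the original script):

```python
import os
from dotenv import load_dotenv

load_dotenv()

# Illustrative names -- any consistent scheme works
MAX_RETRIES = int(os.getenv('MAX_RETRIES', '3'))
RETRY_DELAY = float(os.getenv('RETRY_DELAY', '5'))

# Then pass them through instead of hard-coding, e.g.:
# await generator.process_dialogue(dialogue, voice, output_file, max_retries=MAX_RETRIES)
```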
By implementing these changes, your application will be better equipped to handle the session duration limits imposed by Google's Gemini 2 Flash API, ensuring a smooth and uninterrupted audio generation process.
that's not working at all:
Connection closed: received 1011 (internal error) Request trace id: 28b49dfaa440f090, Quota exceeded for quota metric 'Bidi generate content active sessions' and limit 'Bidi; then sent 1011 (internal error) Request trace id: 28b49dfaa440f090, Quota exceeded for quota metric 'Bidi generate content active sessions' and limit 'Bidi. Retrying in 5 seconds... (Attempt 1/3)
I apologize for the inconvenience you're experiencing. The error message:
Connection closed: received 1011 (internal error) Request trace id: 28b49dfaa440f090, Quota exceeded for quota metric 'Bidi generate content active sessions' and limit 'Bidi. Retrying in 5 seconds... (Attempt 1/3)
indicates that your application is exceeding the quota for active bidirectional (Bidi) generate content sessions. This typically happens when too many WebSocket connections are opened simultaneously, surpassing the allowed limit set by the Google API.
To resolve this issue, you need to limit the number of concurrent WebSocket connections. The most straightforward way to achieve this is by processing dialogues sequentially rather than concurrently. This ensures that only one active session is maintained at any given time, adhering to the quota restrictions.
Below, I'll provide a revised version of your code that processes each dialogue one by one, ensuring compliance with the session limits. Additionally, I'll include some best practices and enhancements to improve the overall robustness of your application.
```python
import tempfile
import asyncio
import base64
import json
import os
import wave
from websockets.asyncio.client import connect
import websockets
import pyaudio
from dotenv import load_dotenv
import sys
from pydub import AudioSegment
from tqdm import tqdm  # Import tqdm for the progress bar

load_dotenv()
GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')
VOICE_A = os.getenv('VOICE_A', 'Puck')
VOICE_B = os.getenv('VOICE_B', 'Kore')

if sys.version_info < (3, 11):
    import taskgroup, exceptiongroup
    asyncio.TaskGroup = taskgroup.TaskGroup
    asyncio.ExceptionGroup = exceptiongroup.ExceptionGroup


class AudioGenerator:
    def __init__(self, progress_bar=None):
        self.progress_bar = progress_bar  # Initialize the progress bar

        # Audio configuration
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 2
        self.SAMPLE_RATE = 24000
        self.CHUNK_SIZE = 512

        # API configuration
        self.host = 'generativelanguage.googleapis.com'
        self.model = "gemini-2.0-flash-exp"
        self.base_uri = f"wss://{self.host}/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent?key={GOOGLE_API_KEY}"

        # WebSocket configuration
        self.ws_options = {
            'ping_interval': 20,
            'ping_timeout': 10,
            'close_timeout': 5
        }

    async def process_dialogue(self, dialogue, voice, output_file, max_retries=3):
        attempt = 0
        while attempt < max_retries:
            try:
                async with connect(self.base_uri, **self.ws_options) as ws:
                    # Setup message
                    setup_msg = {
                        "setup": {
                            "model": f"models/{self.model}",
                            "generation_config": {
                                "speech_config": {
                                    "voice_config": {
                                        "prebuilt_voice_config": {
                                            "voice_name": voice
                                        }
                                    }
                                }
                            }
                        }
                    }
                    await ws.send(json.dumps(setup_msg))
                    setup_response = await ws.recv()
                    # Optionally, handle setup_response if needed

                    # Send dialogue text
                    msg = {
                        "client_content": {
                            "turn_complete": True,
                            "turns": [
                                {"role": "user", "parts": [{"text": dialogue}]}
                            ]
                        }
                    }
                    await ws.send(json.dumps(msg))

                    # Receive audio data
                    complete_audio = bytearray()
                    async for raw_response in ws:
                        response = json.loads(raw_response)

                        # Process audio data
                        try:
                            parts = response["serverContent"]["modelTurn"]["parts"]
                            for part in parts:
                                if "inlineData" in part:
                                    b64data = part["inlineData"]["data"]
                                    pcm_data = base64.b64decode(b64data)
                                    complete_audio.extend(pcm_data)
                        except KeyError:
                            pass

                        # Check for completion
                        try:
                            if response["serverContent"].get("turnComplete", False):
                                self.save_wav_file(complete_audio, output_file)
                                break
                        except KeyError:
                            pass
                    return  # Successfully processed the dialogue
            except websockets.exceptions.ConnectionClosedError as e:
                attempt += 1
                if attempt < max_retries:
                    print(f"Connection closed: {e}. Retrying in 5 seconds... (Attempt {attempt}/{max_retries})")
                    await asyncio.sleep(5)
                else:
                    print(f"Max retries reached for dialogue: {dialogue}. Skipping.")
                    raise e
            except Exception as e:
                attempt += 1
                if attempt < max_retries:
                    print(f"Error: {e}. Retrying in 5 seconds... (Attempt {attempt}/{max_retries})")
                    await asyncio.sleep(5)
                else:
                    print(f"Max retries reached for dialogue: {dialogue}. Skipping.")
                    raise e

    def save_wav_file(self, audio_data, filename):
        with wave.open(filename, 'wb') as wav_file:
            wav_file.setnchannels(self.CHANNELS)
            wav_file.setsampwidth(2)
            wav_file.setframerate(self.SAMPLE_RATE)

            # Convert mono to stereo by duplicating the audio data
            stereo_data = bytearray()
            for i in range(0, len(audio_data), 2):
                sample = audio_data[i:i+2]
                stereo_data.extend(sample)  # Left channel
                stereo_data.extend(sample)  # Right channel

            wav_file.writeframes(stereo_data)

        # Update the progress bar instead of printing
        if self.progress_bar:
            self.progress_bar.update(1)
        # Optionally, you can remove the following line if you don't want any output
        # print(f"Audio saved to {filename}")

    async def run(self, dialogues, voices, output_files):
        for dialogue, voice, output_file in zip(dialogues, voices, output_files):
            await self.process_dialogue(dialogue, voice, output_file)


def parse_conversation(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()

    lines = content.strip().split('\n')
    speaker_a_lines = []
    speaker_b_lines = []

    for line in lines:
        if line.strip():
            if line.startswith("Speaker A:"):
                speaker_a_lines.append(line.replace("Speaker A:", "").strip())
            elif line.startswith("Speaker B:"):
                speaker_b_lines.append(line.replace("Speaker B:", "").strip())

    return speaker_a_lines, speaker_b_lines


def combine_audio_files(file_list, output_file):
    combined = AudioSegment.empty()
    for file in file_list:
        audio = AudioSegment.from_wav(file)
        # Ensure stereo
        if audio.channels == 1:
            audio = audio.set_channels(2)
        combined += audio
    combined.export(output_file, format="wav")


def read_file_content(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        return file.read()


async def setup_environment():
    if not os.getenv('GOOGLE_API_KEY'):
        raise EnvironmentError("GOOGLE_API_KEY not found in environment variables")
    script_dir = os.path.dirname(os.path.abspath(__file__))
    return script_dir


def read_and_parse_inputs():
    system_instructions = read_file_content('system_instructions_audio.txt')
    full_script = read_file_content('podcast_script.txt')
    speaker_a_lines, speaker_b_lines = parse_conversation('podcast_script.txt')
    return system_instructions, full_script, speaker_a_lines, speaker_b_lines


def prepare_speaker_dialogues(system_instructions, full_script, speaker_lines, voice, temp_dir):
    dialogues = [system_instructions + "\n\n" + full_script]
    voices = [voice]
    output_files = [os.path.join(temp_dir, f"speaker_{voice}_initial.wav")]

    for i, line in enumerate(speaker_lines):
        dialogues.append(line)
        voices.append(voice)
        output_files.append(os.path.join(temp_dir, f"speaker_{voice}_{i}.wav"))

    return dialogues, voices, output_files


async def process_speaker(generator, dialogues, output_files, voices):
    await generator.run(dialogues, voices, output_files)


def interleave_output_files(speaker_a_files, speaker_b_files):
    all_output_files = []
    min_length = min(len(speaker_a_files), len(speaker_b_files))

    for i in range(min_length):
        all_output_files.extend([speaker_a_files[i], speaker_b_files[i]])

    all_output_files.extend(speaker_a_files[min_length:])
    all_output_files.extend(speaker_b_files[min_length:])
    return all_output_files


async def main():
    script_dir = await setup_environment()

    with tempfile.TemporaryDirectory(dir=script_dir) as temp_dir:
        system_instructions, full_script, speaker_a_lines, speaker_b_lines = read_and_parse_inputs()

        dialogues_a, voices_a, output_files_a = prepare_speaker_dialogues(
            system_instructions, full_script, speaker_a_lines, VOICE_A, temp_dir)
        dialogues_b, voices_b, output_files_b = prepare_speaker_dialogues(
            system_instructions, full_script, speaker_b_lines, VOICE_B, temp_dir)

        # Calculate total number of audio files to generate
        total_audio_files = len(output_files_a) + len(output_files_b)

        # Initialize the tqdm progress bar
        with tqdm(total=total_audio_files, desc="Generating audio files", unit="file") as progress_bar:
            # Initialize the AudioGenerator with the progress bar
            generator = AudioGenerator(progress_bar=progress_bar)

            # Process Speaker A
            await process_speaker(generator, dialogues_a, output_files_a, voices_a)

            # Process Speaker B
            await process_speaker(generator, dialogues_b, output_files_b, voices_b)

        # No need to manually close the progress bar as it's handled by the context manager
        all_output_files = interleave_output_files(output_files_a[1:], output_files_b[1:])

        final_output = "final_podcast.wav"
        combine_audio_files(all_output_files, final_output)
        print(f"\nFinal podcast audio created: {final_output}")
        print("Temporary files cleaned up")


if __name__ == "__main__":
    asyncio.run(main())
```
Sequential Processing of Dialogues:
Previous Approach: The original modification processed both speakers concurrently using asyncio.gather, leading to multiple active WebSocket connections simultaneously. This exceeded the quota for active sessions.
Revised Approach: The updated code processes each dialogue sequentially, iterating over each dialogue in the run method without using asyncio.gather. This ensures that only one WebSocket connection is active at any given time, adhering to the quota limits.
```python
async def run(self, dialogues, voices, output_files):
    for dialogue, voice, output_file in zip(dialogues, voices, output_files):
        await self.process_dialogue(dialogue, voice, output_file)
```
Processing Speakers Sequentially:
Previous Approach: Both Speaker A and Speaker B were processed concurrently, which likely resulted in multiple active sessions and quota exhaustion.
Revised Approach: Speakers are now processed one after the other. First, all dialogues for Speaker A are processed, followed by Speaker B. This further ensures that only one active session exists at any point.
```python
# Process Speaker A
await process_speaker(generator, dialogues_a, output_files_a, voices_a)

# Process Speaker B
await process_speaker(generator, dialogues_b, output_files_b, voices_b)
```
Removed Concurrent Task Execution: The use of asyncio.gather for processing multiple tasks concurrently has been removed to prevent exceeding the quota.

Enhanced Error Handling: The retry mechanism in process_dialogue has been retained to retry connections up to max_retries times in case of transient errors. If retries are exhausted, the dialogue is skipped, and the error is raised.

Progress Bar Integration: The progress bar (tqdm) is updated after each audio file is successfully saved, providing real-time feedback on the progress of audio generation.

Resource Cleanup: Using with tempfile.TemporaryDirectory ensures that temporary files are cleaned up automatically after processing.

Monitor and Respect Quotas:
Understand Quotas: Review your Google Cloud Console to understand the exact quotas for your API usage. This includes the number of active sessions, requests per minute, etc.
Adjust Accordingly: If your application requirements exceed the current quotas, consider requesting an increase from Google. However, ensure that your application's design aligns with the quota limits to prevent interruptions.
Implement Rate Limiting:
If you later reintroduce concurrency, use asyncio.Semaphore to control the number of concurrent sessions without exceeding the quota:

```python
from asyncio import Semaphore

class AudioGenerator:
    def __init__(self, progress_bar=None, max_concurrent_sessions=1):
        self.progress_bar = progress_bar
        self.semaphore = Semaphore(max_concurrent_sessions)
        # ... rest of the initialization

    async def process_dialogue(self, dialogue, voice, output_file, max_retries=3):
        async with self.semaphore:
            # existing process_dialogue code
            ...
```

Adjust max_concurrent_sessions based on your quota limits.

Implement Exponential Backoff:
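For example, the fixed five-second sleep in process_dialogue could be replaced with a delay that doubles on each failed attempt. A minimal, self-contained sketch (with_backoff and base_delay are illustrative names, not part of the original script):

```python
import asyncio
import websockets

async def with_backoff(attempt_once, max_retries=3, base_delay=1.0):
    """Run attempt_once() with exponential back-off between failed attempts.

    attempt_once is a zero-argument callable returning a coroutine, e.g. the
    connect/send/receive body of process_dialogue with its retry loop removed
    (a hypothetical refactor).
    """
    for attempt in range(max_retries):
        try:
            return await attempt_once()
        except websockets.exceptions.ConnectionClosedError as exc:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt)  # 1 s, 2 s, 4 s, ...
            print(f"Connection closed: {exc}. Retrying in {delay:.0f} s "
                  f"(attempt {attempt + 1}/{max_retries})")
            await asyncio.sleep(delay)
```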
Logging: Replace the ad-hoc print statements with the standard logging module so retries, skips, and failures are recorded with timestamps and severity levels; a minimal setup is sketched below.
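A minimal sketch using only the standard library (the logger name is illustrative):

```python
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
logger = logging.getLogger("podcast_generator")

# e.g. in the retry handler:
# logger.warning("Connection closed: %s; retrying (%d/%d)", e, attempt, max_retries)
```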
Optimize Dialogue Batching: If individual lines are very short, consider batching a few of them into one session (while staying well under the session duration limit) to reduce per-connection overhead and the number of sessions you open.

Graceful Shutdown: Handle interruption (for example, Ctrl+C) so that the temporary directory is still cleaned up and no half-written files linger; see the sketch after this item.
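A minimal sketch of catching interruption at the entry point, under the assumption that main() is unchanged:

```python
import asyncio

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # asyncio.run() cancels pending tasks on KeyboardInterrupt, and the
        # TemporaryDirectory context manager in main() unwinds as the stack
        # is torn down, so partial WAV files are still removed in most cases.
        print("\nInterrupted by user; exiting.")
```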
By modifying your application to process dialogues sequentially, you ensure that only one active WebSocket connection exists at any given time. This approach aligns with the quota restrictions imposed by the Google Gemini 2 Flash API, preventing quota exceedance errors. Additionally, adhering to best practices like robust error handling, rate limiting, and effective resource management will enhance the reliability and maintainability of your application.
If you continue to face quota-related issues even after these adjustments, consider reaching out to Google Cloud support to discuss your usage patterns and explore potential quota increases.