Overcoming Session Duration Limit

Created on: December 22, 2024

Answered using O1 by Chat01

Question

In the Gemini 2.0 Flash documentation there is this limit:

Session duration is limited to up to 15 minutes for audio or up to 2 minutes of audio and video. When the session duration exceeds the limit, the connection is terminated.

So how can we modify the following code to overcome this limit?

```python
import tempfile
import asyncio
import base64
import json
import os
import wave
from websockets.asyncio.client import connect
import websockets
import pyaudio
from dotenv import load_dotenv
import sys
from pydub import AudioSegment
from tqdm import tqdm  # Import tqdm for the progress bar

load_dotenv()

GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')
VOICE_A = os.getenv('VOICE_A', 'Puck')
VOICE_B = os.getenv('VOICE_B', 'Kore')

if sys.version_info < (3, 11):
    import taskgroup, exceptiongroup
    asyncio.TaskGroup = taskgroup.TaskGroup
    asyncio.ExceptionGroup = exceptiongroup.ExceptionGroup


class AudioGenerator:
    def __init__(self, progress_bar=None):
        self.audio_in_queue = asyncio.Queue()
        self.ws = None
        self.ws_semaphore = asyncio.Semaphore(1)
        self.progress_bar = progress_bar  # Initialize the progress bar

        # Audio configuration
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 2
        self.SAMPLE_RATE = 24000
        self.CHUNK_SIZE = 512

        # WebSocket configuration
        self.ws_options = {
            'ping_interval': 20,
            'ping_timeout': 10,
            'close_timeout': 5
        }

        # API configuration
        self.host = 'generativelanguage.googleapis.com'
        self.model = "gemini-2.0-flash-exp"
        self.uri = f"wss://{self.host}/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent?key={GOOGLE_API_KEY}"

        # Store complete audio data
        self.complete_audio = bytearray()

    async def startup(self, voice):
        async with self.ws_semaphore:
            setup_msg = {
                "setup": {
                    "model": f"models/{self.model}",
                    "generation_config": {
                        "speech_config": {
                            "voice_config": {
                                "prebuilt_voice_config": {
                                    "voice_name": voice
                                }
                            }
                        }
                    }
                }
            }
            await self.ws.send(json.dumps(setup_msg))
            response = await self.ws.recv()

    async def send_text(self, text, voice):
        async with self.ws_semaphore:
            msg = {
                "client_content": {
                    "turn_complete": True,
                    "turns": [
                        {"role": "user", "parts": [{"text": text}]}
                    ]
                }
            }
            await self.ws.send(json.dumps(msg))

    async def receive_audio(self, output_file):
        async with self.ws_semaphore:
            self.complete_audio.clear()
            await asyncio.sleep(0.1)
            try:
                async for raw_response in self.ws:
                    response = json.loads(raw_response)

                    # Process audio data
                    try:
                        parts = response["serverContent"]["modelTurn"]["parts"]
                        for part in parts:
                            if "inlineData" in part:
                                b64data = part["inlineData"]["data"]
                                pcm_data = base64.b64decode(b64data)
                                self.complete_audio.extend(pcm_data)
                                self.audio_in_queue.put_nowait(pcm_data)
                    except KeyError:
                        pass

                    # Check for completion
                    try:
                        if response["serverContent"].get("turnComplete", False):
                            self.save_wav_file(output_file)
                            while not self.audio_in_queue.empty():
                                self.audio_in_queue.get_nowait()
                            break
                    except KeyError:
                        pass
            except websockets.exceptions.ConnectionClosedError as e:
                print(f"Connection closed: {e}")
                raise

    def save_wav_file(self, filename):
        with wave.open(filename, 'wb') as wav_file:
            wav_file.setnchannels(self.CHANNELS)
            wav_file.setsampwidth(2)
            wav_file.setframerate(self.SAMPLE_RATE)

            # Convert mono to stereo by duplicating the audio data
            stereo_data = bytearray()
            for i in range(0, len(self.complete_audio), 2):
                sample = self.complete_audio[i:i+2]
                stereo_data.extend(sample)  # Left channel
                stereo_data.extend(sample)  # Right channel

            wav_file.writeframes(stereo_data)

        # Update the progress bar instead of printing
        if self.progress_bar:
            self.progress_bar.update(1)
        # Optionally, you can remove the following line if you don't want any output
        # print(f"Audio saved to {filename}")

    async def run(self, dialogues, output_files, voices, max_retries=3):
        last_exception = None
        for attempt in range(max_retries):
            try:
                async with await connect(self.uri, **self.ws_options) as ws:
                    self.ws = ws
                    await self.startup(voices[0])

                    # Process dialogues sequentially
                    for i in range(len(dialogues)):
                        await self.send_text(dialogues[i], voices[i])
                        await self.receive_audio(output_files[i])
                    return
            except websockets.exceptions.ConnectionClosedError as e:
                last_exception = e
                if attempt < max_retries - 1:
                    print(f"Connection lost. Retrying in 5 seconds... (Attempt {attempt + 1}/{max_retries})")
                    await asyncio.sleep(5)
                else:
                    print("Max retries reached. Unable to reconnect.")
                    raise last_exception


def parse_conversation(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()

    lines = content.strip().split('\n')
    speaker_a_lines = []
    speaker_b_lines = []

    for line in lines:
        if line.strip():
            if line.startswith("Speaker A:"):
                speaker_a_lines.append(line.replace("Speaker A:", "").strip())
            elif line.startswith("Speaker B:"):
                speaker_b_lines.append(line.replace("Speaker B:", "").strip())

    return speaker_a_lines, speaker_b_lines


def combine_audio_files(file_list, output_file):
    combined = AudioSegment.empty()
    for file in file_list:
        audio = AudioSegment.from_wav(file)
        # Ensure stereo
        if audio.channels == 1:
            audio = audio.set_channels(2)
        combined += audio
    combined.export(output_file, format="wav")


def read_file_content(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        return file.read()


async def setup_environment():
    if not os.getenv('GOOGLE_API_KEY'):
        raise EnvironmentError("GOOGLE_API_KEY not found in environment variables")
    script_dir = os.path.dirname(os.path.abspath(__file__))
    return script_dir


def read_and_parse_inputs():
    system_instructions = read_file_content('system_instructions_audio.txt')
    full_script = read_file_content('podcast_script.txt')
    speaker_a_lines, speaker_b_lines = parse_conversation('podcast_script.txt')
    return system_instructions, full_script, speaker_a_lines, speaker_b_lines


def prepare_speaker_dialogues(system_instructions, full_script, speaker_lines, voice, temp_dir):
    dialogues = [system_instructions + "\n\n" + full_script]
    voices = [voice]
    output_files = [os.path.join(temp_dir, f"speaker_{voice}_initial.wav")]

    for i, line in enumerate(speaker_lines):
        dialogues.append(line)
        voices.append(voice)
        output_files.append(os.path.join(temp_dir, f"speaker_{voice}_{i}.wav"))

    return dialogues, voices, output_files


async def process_speaker(generator, dialogues, output_files, voices):
    await generator.run(dialogues, output_files, voices)


def interleave_output_files(speaker_a_files, speaker_b_files):
    all_output_files = []
    min_length = min(len(speaker_a_files), len(speaker_b_files))

    for i in range(min_length):
        all_output_files.extend([speaker_a_files[i], speaker_b_files[i]])
    all_output_files.extend(speaker_a_files[min_length:])
    all_output_files.extend(speaker_b_files[min_length:])

    return all_output_files


async def main():
    script_dir = await setup_environment()

    with tempfile.TemporaryDirectory(dir=script_dir) as temp_dir:
        system_instructions, full_script, speaker_a_lines, speaker_b_lines = read_and_parse_inputs()

        dialogues_a, voices_a, output_files_a = prepare_speaker_dialogues(
            system_instructions, full_script, speaker_a_lines, VOICE_A, temp_dir)
        dialogues_b, voices_b, output_files_b = prepare_speaker_dialogues(
            system_instructions, full_script, speaker_b_lines, VOICE_B, temp_dir)

        # Calculate total number of audio files to generate
        total_audio_files = len(output_files_a) + len(output_files_b)

        # Initialize the tqdm progress bar
        with tqdm(total=total_audio_files, desc="Generating audio files", unit="file") as progress_bar:
            # Initialize the AudioGenerator with the progress bar
            generator = AudioGenerator(progress_bar=progress_bar)

            # Process both speakers
            await process_speaker(generator, dialogues_a, output_files_a, voices_a)
            await process_speaker(generator, dialogues_b, output_files_b, voices_b)
        # No need to manually close the progress bar as it's handled by the context manager

        all_output_files = interleave_output_files(output_files_a[1:], output_files_b[1:])
        final_output = "final_podcast.wav"
        combine_audio_files(all_output_files, final_output)
        print(f"\nFinal podcast audio created: {final_output}")
        print("Temporary files cleaned up")


if __name__ == "__main__":
    asyncio.run(main())
```

Thought

Tackling session limits

The user is working with Google's Gemini 2.0 Flash API, facing session time limits of 15 minutes for audio and 2 minutes for audio and video. They want to modify the code to handle reconnections automatically and maintain a seamless session flow.

Taking a closer look

Modifying the run method to re-establish the WebSocket connection when a session nears its limit. This ensures timely processing of dialogues and avoids failures due to session expiration.

Reworking connection strategy

I'm exploring the idea of establishing a WebSocket connection per dialogue to keep within session limits, ensuring each processing session stays within time constraints.

Batch processing for dialogue management

I’m concluding that segmenting dialogues into smaller batches, each under the session time limit, and employing WebSocket connections per batch, offers a balanced solution.

Deciding connections per dialogue

To manage session limits, I’m thinking the "run" method should open a WebSocket for each dialogue. This method ensures the connection fits within the session time parameters.

Opting for dialogue-based connections

I’m mapping out a simpler approach by focusing on individual dialogues, establishing separate WebSocket connections for each. This ensures the processing aligns with session time constraints efficiently.

Crafting dialogue connections

I’m tweaking the AudioGenerator class by making the run method handle each dialogue separately. This involves setting up, sending, receiving, and shutting down WebSocket connections.

Thought for 7 seconds

Answer

To overcome the session duration limits imposed by Google's Gemini 2.0 Flash API (15 minutes for audio or 2 minutes for audio and video), you need to ensure that each WebSocket connection does not exceed these time constraints. One effective way to achieve this is by establishing a new WebSocket connection for each dialogue instead of maintaining a single long-lived connection. This approach ensures that each interaction stays within the allowed session duration, thereby preventing unexpected connection terminations.

Below, I'll guide you through modifying your existing code to implement this strategy. The key changes involve refactoring the AudioGenerator class to handle individual connections per dialogue and updating the run method accordingly.

Key Modifications:

  1. Establishing Connections Per Dialogue:

    • Instead of opening a single WebSocket connection for all dialogues, open a new connection for each dialogue.
  2. Refactoring the run Method:

    • Modify the run method to iterate over each dialogue, establish a connection, send the text, receive the audio, and then close the connection.
  3. Handling Retries Individually:

    • Implement retry logic for each dialogue to handle transient connection issues without affecting other dialogues.
  4. Optimizing Progress Tracking:

    • Update the progress bar after each dialogue is processed to provide real-time feedback.

Updated Code:

Below is the refactored version of your code with the necessary modifications to handle the session duration limits effectively.

```python
import tempfile
import asyncio
import base64
import json
import os
import wave
from websockets.asyncio.client import connect
import websockets
import pyaudio
from dotenv import load_dotenv
import sys
from pydub import AudioSegment
from tqdm import tqdm  # Import tqdm for the progress bar

load_dotenv()

GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')
VOICE_A = os.getenv('VOICE_A', 'Puck')
VOICE_B = os.getenv('VOICE_B', 'Kore')

if sys.version_info < (3, 11):
    import taskgroup, exceptiongroup
    asyncio.TaskGroup = taskgroup.TaskGroup
    asyncio.ExceptionGroup = exceptiongroup.ExceptionGroup


class AudioGenerator:
    def __init__(self, progress_bar=None):
        self.progress_bar = progress_bar  # Initialize the progress bar

        # Audio configuration
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 2
        self.SAMPLE_RATE = 24000
        self.CHUNK_SIZE = 512

        # API configuration
        self.host = 'generativelanguage.googleapis.com'
        self.model = "gemini-2.0-flash-exp"
        self.base_uri = f"wss://{self.host}/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent?key={GOOGLE_API_KEY}"

        # WebSocket configuration
        self.ws_options = {
            'ping_interval': 20,
            'ping_timeout': 10,
            'close_timeout': 5
        }

    async def process_dialogue(self, dialogue, voice, output_file, max_retries=3):
        attempt = 0
        while attempt < max_retries:
            try:
                async with connect(self.base_uri, **self.ws_options) as ws:
                    # Setup message
                    setup_msg = {
                        "setup": {
                            "model": f"models/{self.model}",
                            "generation_config": {
                                "speech_config": {
                                    "voice_config": {
                                        "prebuilt_voice_config": {
                                            "voice_name": voice
                                        }
                                    }
                                }
                            }
                        }
                    }
                    await ws.send(json.dumps(setup_msg))
                    setup_response = await ws.recv()
                    # Optionally, handle setup_response if needed

                    # Send dialogue text
                    msg = {
                        "client_content": {
                            "turn_complete": True,
                            "turns": [
                                {"role": "user", "parts": [{"text": dialogue}]}
                            ]
                        }
                    }
                    await ws.send(json.dumps(msg))

                    # Receive audio data
                    complete_audio = bytearray()
                    async for raw_response in ws:
                        response = json.loads(raw_response)

                        # Process audio data
                        try:
                            parts = response["serverContent"]["modelTurn"]["parts"]
                            for part in parts:
                                if "inlineData" in part:
                                    b64data = part["inlineData"]["data"]
                                    pcm_data = base64.b64decode(b64data)
                                    complete_audio.extend(pcm_data)
                        except KeyError:
                            pass

                        # Check for completion
                        try:
                            if response["serverContent"].get("turnComplete", False):
                                self.save_wav_file(complete_audio, output_file)
                                break
                        except KeyError:
                            pass

                    return  # Successfully processed the dialogue
            except websockets.exceptions.ConnectionClosedError as e:
                attempt += 1
                if attempt < max_retries:
                    print(f"Connection closed: {e}. Retrying in 5 seconds... (Attempt {attempt}/{max_retries})")
                    await asyncio.sleep(5)
                else:
                    print(f"Max retries reached for dialogue: {dialogue}. Skipping.")
                    raise e
            except Exception as e:
                attempt += 1
                if attempt < max_retries:
                    print(f"Error: {e}. Retrying in 5 seconds... (Attempt {attempt}/{max_retries})")
                    await asyncio.sleep(5)
                else:
                    print(f"Max retries reached for dialogue: {dialogue}. Skipping.")
                    raise e

    def save_wav_file(self, audio_data, filename):
        with wave.open(filename, 'wb') as wav_file:
            wav_file.setnchannels(self.CHANNELS)
            wav_file.setsampwidth(2)
            wav_file.setframerate(self.SAMPLE_RATE)

            # Convert mono to stereo by duplicating the audio data
            stereo_data = bytearray()
            for i in range(0, len(audio_data), 2):
                sample = audio_data[i:i+2]
                stereo_data.extend(sample)  # Left channel
                stereo_data.extend(sample)  # Right channel

            wav_file.writeframes(stereo_data)

        # Update the progress bar instead of printing
        if self.progress_bar:
            self.progress_bar.update(1)
        # Optionally, you can remove the following line if you don't want any output
        # print(f"Audio saved to {filename}")

    async def run(self, dialogues, voices, output_files):
        tasks = []
        for dialogue, voice, output_file in zip(dialogues, voices, output_files):
            tasks.append(self.process_dialogue(dialogue, voice, output_file))
        await asyncio.gather(*tasks)


def parse_conversation(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()

    lines = content.strip().split('\n')
    speaker_a_lines = []
    speaker_b_lines = []

    for line in lines:
        if line.strip():
            if line.startswith("Speaker A:"):
                speaker_a_lines.append(line.replace("Speaker A:", "").strip())
            elif line.startswith("Speaker B:"):
                speaker_b_lines.append(line.replace("Speaker B:", "").strip())

    return speaker_a_lines, speaker_b_lines


def combine_audio_files(file_list, output_file):
    combined = AudioSegment.empty()
    for file in file_list:
        audio = AudioSegment.from_wav(file)
        # Ensure stereo
        if audio.channels == 1:
            audio = audio.set_channels(2)
        combined += audio
    combined.export(output_file, format="wav")


def read_file_content(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        return file.read()


async def setup_environment():
    if not os.getenv('GOOGLE_API_KEY'):
        raise EnvironmentError("GOOGLE_API_KEY not found in environment variables")
    script_dir = os.path.dirname(os.path.abspath(__file__))
    return script_dir


def read_and_parse_inputs():
    system_instructions = read_file_content('system_instructions_audio.txt')
    full_script = read_file_content('podcast_script.txt')
    speaker_a_lines, speaker_b_lines = parse_conversation('podcast_script.txt')
    return system_instructions, full_script, speaker_a_lines, speaker_b_lines


def prepare_speaker_dialogues(system_instructions, full_script, speaker_lines, voice, temp_dir):
    dialogues = [system_instructions + "\n\n" + full_script]
    voices = [voice]
    output_files = [os.path.join(temp_dir, f"speaker_{voice}_initial.wav")]

    for i, line in enumerate(speaker_lines):
        dialogues.append(line)
        voices.append(voice)
        output_files.append(os.path.join(temp_dir, f"speaker_{voice}_{i}.wav"))

    return dialogues, voices, output_files


async def process_speaker(generator, dialogues, output_files, voices):
    await generator.run(dialogues, voices, output_files)


def interleave_output_files(speaker_a_files, speaker_b_files):
    all_output_files = []
    min_length = min(len(speaker_a_files), len(speaker_b_files))

    for i in range(min_length):
        all_output_files.extend([speaker_a_files[i], speaker_b_files[i]])
    all_output_files.extend(speaker_a_files[min_length:])
    all_output_files.extend(speaker_b_files[min_length:])

    return all_output_files


async def main():
    script_dir = await setup_environment()

    with tempfile.TemporaryDirectory(dir=script_dir) as temp_dir:
        system_instructions, full_script, speaker_a_lines, speaker_b_lines = read_and_parse_inputs()

        dialogues_a, voices_a, output_files_a = prepare_speaker_dialogues(
            system_instructions, full_script, speaker_a_lines, VOICE_A, temp_dir)
        dialogues_b, voices_b, output_files_b = prepare_speaker_dialogues(
            system_instructions, full_script, speaker_b_lines, VOICE_B, temp_dir)

        # Calculate total number of audio files to generate
        total_audio_files = len(output_files_a) + len(output_files_b)

        # Initialize the tqdm progress bar
        with tqdm(total=total_audio_files, desc="Generating audio files", unit="file") as progress_bar:
            # Initialize the AudioGenerator with the progress bar
            generator = AudioGenerator(progress_bar=progress_bar)

            # Process both speakers concurrently
            await asyncio.gather(
                process_speaker(generator, dialogues_a, output_files_a, voices_a),
                process_speaker(generator, dialogues_b, output_files_b, voices_b)
            )
        # No need to manually close the progress bar as it's handled by the context manager

        all_output_files = interleave_output_files(output_files_a[1:], output_files_b[1:])
        final_output = "final_podcast.wav"
        combine_audio_files(all_output_files, final_output)
        print(f"\nFinal podcast audio created: {final_output}")
        print("Temporary files cleaned up")


if __name__ == "__main__":
    asyncio.run(main())
```

Detailed Explanation of Changes:

  1. Processing Dialogues Individually:

    • process_dialogue Method: A new method process_dialogue is introduced within the AudioGenerator class. This method handles the entire lifecycle of a single dialogue: establishing a connection, sending the setup and dialogue messages, receiving the audio, and saving the audio file.
    • Retry Logic: The method includes retry logic to handle transient errors. If a connection is closed unexpectedly, it retries up to max_retries times before skipping the problematic dialogue.
  2. Refactored run Method:

    • The run method now iterates over each dialogue, voice, and output file, creating a list of tasks that are executed concurrently using asyncio.gather. This ensures that each dialogue is processed in its own connection without exceeding session limits.
  3. Progress Bar Integration:

    • The progress bar is updated within the save_wav_file method after each audio file is successfully saved. This provides real-time feedback on the progress of audio generation.
  4. Handling Multiple Speakers:

    • The main function processes both Speaker A and Speaker B concurrently by creating separate lists of dialogues, voices, and output files for each speaker. This parallel processing optimizes the overall execution time while maintaining session limits.
  5. Combining Audio Files:

    • After all dialogues are processed, the audio files are interleaved and combined into a final podcast audio file using the combine_audio_files function.

Benefits of This Approach:

  • Compliance with Session Limits: By establishing a new WebSocket connection for each dialogue, you ensure that no single connection exceeds the 15-minute (or 2-minute) session duration limit.

  • Robustness: The retry mechanism ensures that transient network issues or temporary API outages do not halt the entire audio generation process.

  • Scalability: Processing dialogues concurrently (while still respecting session limits) can significantly speed up the overall execution time, especially for large scripts.

  • Maintainability: The refactored code is modular, making it easier to understand, maintain, and extend in the future.

Additional Recommendations:

  1. Rate Limiting: Depending on the API's rate limits, you might need to implement rate limiting to avoid exceeding allowed request rates. You can use tools like asyncio.Semaphore to control the number of concurrent connections (see the sketch after this list).

  2. Logging: Implementing a more sophisticated logging mechanism can help in debugging and monitoring the process, especially when dealing with a large number of dialogues.

  3. Error Handling Enhancements: Consider handling specific exceptions more gracefully and providing more informative error messages to aid in troubleshooting.

  4. Configuration Management: Externalize parameters like max_retries, retry_delay, and others to configuration files or environment variables for easier adjustments without modifying the code. A small sketch follows this list.
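
As a concrete illustration of recommendations 1 and 4, here is a minimal sketch of externalized configuration feeding a session-limiting semaphore. The `MAX_RETRIES`, `RETRY_DELAY`, and `MAX_CONCURRENT_SESSIONS` environment variable names are assumptions for this example, not anything defined by the API:

```python
import os
import asyncio

# Hypothetical environment variable names; pick whatever fits your deployment.
MAX_RETRIES = int(os.getenv('MAX_RETRIES', '3'))
RETRY_DELAY = float(os.getenv('RETRY_DELAY', '5'))
MAX_CONCURRENT_SESSIONS = int(os.getenv('MAX_CONCURRENT_SESSIONS', '1'))

# A single semaphore caps how many WebSocket sessions are open at once.
session_slots = asyncio.Semaphore(MAX_CONCURRENT_SESSIONS)

# These values could then replace the hard-coded literals, e.g.:
# async with session_slots:
#     await generator.process_dialogue(dialogue, voice, output_file,
#                                      max_retries=MAX_RETRIES)
```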

By implementing these changes, your application will be better equipped to handle the session duration limits imposed by Google's Gemini 2.0 Flash API, ensuring a smooth and uninterrupted audio generation process.

Question

that's not working at all:

Connection closed: received 1011 (internal error) Request trace id: 28b49dfaa440f090, Quota exceeded for quota metric 'Bidi generate content active sessions' and limit 'Bidi; then sent 1011 (internal error) Request trace id: 28b49dfaa440f090, Quota exceeded for quota metric 'Bidi generate content active sessions' and limit 'Bidi. Retrying in 5 seconds... (Attempt 1/3)

Thought

Addressing quota constraint

Wondering how to handle multiple WebSocket sessions? The issue might be hitting the 'Bidi generate content active sessions' quota. Limiting to one concurrent session could be a neat solution.

Evaluating strategies

I'm considering methods to tackle the WebSocket quota issue. Options include processing dialogues individually rather than in parallel, adding a semaphore to manage concurrent tasks, or re-evaluating connection handling.

Adjusting connections

I'm working through strategies to tackle the 'Bidi generate content active sessions' quota. Limiting concurrent connections, processing dialogues one at a time, or requesting a higher API quota could work.

Streamlining dialogue handling

I’m digging into the run method, noting its use of asyncio.gather for both speakers. To better manage active session quotas, processing dialogues sequentially or limiting concurrent sessions appears crucial.

Adjusting session handling

I'm reevaluating session configurations to balance multiple dialogues with quota limits. The aim is to streamline connections per session while staying within the session duration constraints.

Optimizing dialogue handling

I've realized that processing dialogues sequentially, managing only one active session at a time, is crucial for avoiding quota issues and achieving the best results.

Mapping the process

I’m piecing together a sequential dialogue handling method to avoid quota issues. This involves examining the run method and tweaking connections to ensure they close properly after each dialogue.

Embracing serial processing

I'm thinking about tweaking the code to handle dialogues one by one, ensuring only one active connection. Consistent review of quota limits remains a key focus.

Finalizing the method

I’m tweaking the run method to process dialogues one by one, maintaining a single active session. This avoids exceeding quota limits and ensures every connection is closed correctly after each dialogue.

Thought for 11 seconds

Answer

I apologize for the inconvenience you're experiencing. The error message:

Connection closed: received 1011 (internal error) Request trace id: 28b49dfaa440f090, Quota exceeded for quota metric 'Bidi generate content active sessions' and limit 'Bidi. Retrying in 5 seconds... (Attempt 1/3)

indicates that your application is exceeding the quota for active bidirectional (Bidi) generate content sessions. This typically happens when too many WebSocket connections are opened simultaneously, surpassing the allowed limit set by the Google API.

To resolve this issue, you need to limit the number of concurrent WebSocket connections. The most straightforward way to achieve this is by processing dialogues sequentially rather than concurrently. This ensures that only one active session is maintained at any given time, adhering to the quota restrictions.

Below, I'll provide a revised version of your code that processes each dialogue one by one, ensuring compliance with the session limits. Additionally, I'll include some best practices and enhancements to improve the overall robustness of your application.

Revised Code: Sequential Processing of Dialogues

```python
import tempfile
import asyncio
import base64
import json
import os
import wave
from websockets.asyncio.client import connect
import websockets
import pyaudio
from dotenv import load_dotenv
import sys
from pydub import AudioSegment
from tqdm import tqdm  # Import tqdm for the progress bar

load_dotenv()

GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')
VOICE_A = os.getenv('VOICE_A', 'Puck')
VOICE_B = os.getenv('VOICE_B', 'Kore')

if sys.version_info < (3, 11):
    import taskgroup, exceptiongroup
    asyncio.TaskGroup = taskgroup.TaskGroup
    asyncio.ExceptionGroup = exceptiongroup.ExceptionGroup


class AudioGenerator:
    def __init__(self, progress_bar=None):
        self.progress_bar = progress_bar  # Initialize the progress bar

        # Audio configuration
        self.FORMAT = pyaudio.paInt16
        self.CHANNELS = 2
        self.SAMPLE_RATE = 24000
        self.CHUNK_SIZE = 512

        # API configuration
        self.host = 'generativelanguage.googleapis.com'
        self.model = "gemini-2.0-flash-exp"
        self.base_uri = f"wss://{self.host}/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent?key={GOOGLE_API_KEY}"

        # WebSocket configuration
        self.ws_options = {
            'ping_interval': 20,
            'ping_timeout': 10,
            'close_timeout': 5
        }

    async def process_dialogue(self, dialogue, voice, output_file, max_retries=3):
        attempt = 0
        while attempt < max_retries:
            try:
                async with connect(self.base_uri, **self.ws_options) as ws:
                    # Setup message
                    setup_msg = {
                        "setup": {
                            "model": f"models/{self.model}",
                            "generation_config": {
                                "speech_config": {
                                    "voice_config": {
                                        "prebuilt_voice_config": {
                                            "voice_name": voice
                                        }
                                    }
                                }
                            }
                        }
                    }
                    await ws.send(json.dumps(setup_msg))
                    setup_response = await ws.recv()
                    # Optionally, handle setup_response if needed

                    # Send dialogue text
                    msg = {
                        "client_content": {
                            "turn_complete": True,
                            "turns": [
                                {"role": "user", "parts": [{"text": dialogue}]}
                            ]
                        }
                    }
                    await ws.send(json.dumps(msg))

                    # Receive audio data
                    complete_audio = bytearray()
                    async for raw_response in ws:
                        response = json.loads(raw_response)

                        # Process audio data
                        try:
                            parts = response["serverContent"]["modelTurn"]["parts"]
                            for part in parts:
                                if "inlineData" in part:
                                    b64data = part["inlineData"]["data"]
                                    pcm_data = base64.b64decode(b64data)
                                    complete_audio.extend(pcm_data)
                        except KeyError:
                            pass

                        # Check for completion
                        try:
                            if response["serverContent"].get("turnComplete", False):
                                self.save_wav_file(complete_audio, output_file)
                                break
                        except KeyError:
                            pass

                    return  # Successfully processed the dialogue
            except websockets.exceptions.ConnectionClosedError as e:
                attempt += 1
                if attempt < max_retries:
                    print(f"Connection closed: {e}. Retrying in 5 seconds... (Attempt {attempt}/{max_retries})")
                    await asyncio.sleep(5)
                else:
                    print(f"Max retries reached for dialogue: {dialogue}. Skipping.")
                    raise e
            except Exception as e:
                attempt += 1
                if attempt < max_retries:
                    print(f"Error: {e}. Retrying in 5 seconds... (Attempt {attempt}/{max_retries})")
                    await asyncio.sleep(5)
                else:
                    print(f"Max retries reached for dialogue: {dialogue}. Skipping.")
                    raise e

    def save_wav_file(self, audio_data, filename):
        with wave.open(filename, 'wb') as wav_file:
            wav_file.setnchannels(self.CHANNELS)
            wav_file.setsampwidth(2)
            wav_file.setframerate(self.SAMPLE_RATE)

            # Convert mono to stereo by duplicating the audio data
            stereo_data = bytearray()
            for i in range(0, len(audio_data), 2):
                sample = audio_data[i:i+2]
                stereo_data.extend(sample)  # Left channel
                stereo_data.extend(sample)  # Right channel

            wav_file.writeframes(stereo_data)

        # Update the progress bar instead of printing
        if self.progress_bar:
            self.progress_bar.update(1)
        # Optionally, you can remove the following line if you don't want any output
        # print(f"Audio saved to {filename}")

    async def run(self, dialogues, voices, output_files):
        for dialogue, voice, output_file in zip(dialogues, voices, output_files):
            await self.process_dialogue(dialogue, voice, output_file)


def parse_conversation(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()

    lines = content.strip().split('\n')
    speaker_a_lines = []
    speaker_b_lines = []

    for line in lines:
        if line.strip():
            if line.startswith("Speaker A:"):
                speaker_a_lines.append(line.replace("Speaker A:", "").strip())
            elif line.startswith("Speaker B:"):
                speaker_b_lines.append(line.replace("Speaker B:", "").strip())

    return speaker_a_lines, speaker_b_lines


def combine_audio_files(file_list, output_file):
    combined = AudioSegment.empty()
    for file in file_list:
        audio = AudioSegment.from_wav(file)
        # Ensure stereo
        if audio.channels == 1:
            audio = audio.set_channels(2)
        combined += audio
    combined.export(output_file, format="wav")


def read_file_content(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        return file.read()


async def setup_environment():
    if not os.getenv('GOOGLE_API_KEY'):
        raise EnvironmentError("GOOGLE_API_KEY not found in environment variables")
    script_dir = os.path.dirname(os.path.abspath(__file__))
    return script_dir


def read_and_parse_inputs():
    system_instructions = read_file_content('system_instructions_audio.txt')
    full_script = read_file_content('podcast_script.txt')
    speaker_a_lines, speaker_b_lines = parse_conversation('podcast_script.txt')
    return system_instructions, full_script, speaker_a_lines, speaker_b_lines


def prepare_speaker_dialogues(system_instructions, full_script, speaker_lines, voice, temp_dir):
    dialogues = [system_instructions + "\n\n" + full_script]
    voices = [voice]
    output_files = [os.path.join(temp_dir, f"speaker_{voice}_initial.wav")]

    for i, line in enumerate(speaker_lines):
        dialogues.append(line)
        voices.append(voice)
        output_files.append(os.path.join(temp_dir, f"speaker_{voice}_{i}.wav"))

    return dialogues, voices, output_files


async def process_speaker(generator, dialogues, output_files, voices):
    await generator.run(dialogues, voices, output_files)


def interleave_output_files(speaker_a_files, speaker_b_files):
    all_output_files = []
    min_length = min(len(speaker_a_files), len(speaker_b_files))

    for i in range(min_length):
        all_output_files.extend([speaker_a_files[i], speaker_b_files[i]])
    all_output_files.extend(speaker_a_files[min_length:])
    all_output_files.extend(speaker_b_files[min_length:])

    return all_output_files


async def main():
    script_dir = await setup_environment()

    with tempfile.TemporaryDirectory(dir=script_dir) as temp_dir:
        system_instructions, full_script, speaker_a_lines, speaker_b_lines = read_and_parse_inputs()

        dialogues_a, voices_a, output_files_a = prepare_speaker_dialogues(
            system_instructions, full_script, speaker_a_lines, VOICE_A, temp_dir)
        dialogues_b, voices_b, output_files_b = prepare_speaker_dialogues(
            system_instructions, full_script, speaker_b_lines, VOICE_B, temp_dir)

        # Calculate total number of audio files to generate
        total_audio_files = len(output_files_a) + len(output_files_b)

        # Initialize the tqdm progress bar
        with tqdm(total=total_audio_files, desc="Generating audio files", unit="file") as progress_bar:
            # Initialize the AudioGenerator with the progress bar
            generator = AudioGenerator(progress_bar=progress_bar)

            # Process Speaker A
            await process_speaker(generator, dialogues_a, output_files_a, voices_a)

            # Process Speaker B
            await process_speaker(generator, dialogues_b, output_files_b, voices_b)
        # No need to manually close the progress bar as it's handled by the context manager

        all_output_files = interleave_output_files(output_files_a[1:], output_files_b[1:])
        final_output = "final_podcast.wav"
        combine_audio_files(all_output_files, final_output)
        print(f"\nFinal podcast audio created: {final_output}")
        print("Temporary files cleaned up")


if __name__ == "__main__":
    asyncio.run(main())
```

Key Changes and Explanations

  1. Sequential Processing of Dialogues:

    • Previous Approach: The original modification processed both speakers concurrently using asyncio.gather, leading to multiple active WebSocket connections simultaneously. This exceeded the quota for active sessions.

    • Revised Approach: The updated code processes each dialogue sequentially. This is achieved by iterating over each dialogue in the run method without using asyncio.gather. This ensures that only one WebSocket connection is active at any given time, adhering to the quota limits.

    ```python
    async def run(self, dialogues, voices, output_files):
        for dialogue, voice, output_file in zip(dialogues, voices, output_files):
            await self.process_dialogue(dialogue, voice, output_file)
    ```
  2. Processing Speakers Sequentially:

    • Previous Approach: Both Speaker A and Speaker B were processed concurrently, which likely resulted in multiple active sessions and quota exhaustion.

    • Revised Approach: Speakers are now processed one after the other. First, all dialogues for Speaker A are processed, followed by Speaker B. This further ensures that only one active session exists at any point.

    ```python
    # Process Speaker A
    await process_speaker(generator, dialogues_a, output_files_a, voices_a)

    # Process Speaker B
    await process_speaker(generator, dialogues_b, output_files_b, voices_b)
    ```
  3. Removed Concurrent Task Execution:

    • The use of asyncio.gather for processing multiple tasks concurrently has been removed to prevent exceeding the quota.
  4. Enhanced Error Handling:

    • The error handling within process_dialogue has been retained to retry connections up to max_retries times in case of transient errors. If retries are exhausted, the dialogue is skipped, and the error is raised.
  5. Progress Bar Integration:

    • The progress bar (tqdm) is updated after each audio file is successfully saved, providing real-time feedback on the progress of audio generation.
  6. Resource Cleanup:

    • Using with tempfile.TemporaryDirectory ensures that temporary files are cleaned up automatically after processing.

Additional Recommendations

  1. Monitor and Respect Quotas:

    • Understand Quotas: Review your Google Cloud Console to understand the exact quotas for your API usage. This includes the number of active sessions, requests per minute, etc.

    • Adjust Accordingly: If your application requirements exceed the current quotas, consider requesting an increase from Google. However, ensure that your application's design aligns with the quota limits to prevent interruptions.

  2. Implement Rate Limiting:

    • If you anticipate spikes in usage or if processing dialogues sequentially is too slow for your needs, implement rate limiting using asyncio.Semaphore to control the number of concurrent sessions without exceeding the quota.
    ```python
    from asyncio import Semaphore

    class AudioGenerator:
        def __init__(self, progress_bar=None, max_concurrent_sessions=1):
            self.progress_bar = progress_bar
            self.semaphore = Semaphore(max_concurrent_sessions)
            # ... rest of the initialization

        async def process_dialogue(self, dialogue, voice, output_file, max_retries=3):
            async with self.semaphore:
                ...  # existing process_dialogue code
    ```
    • Usage: Set max_concurrent_sessions based on your quota limits.
  3. Implement Exponential Backoff:

    • Instead of retrying after a fixed delay, use exponential backoff to handle retries more gracefully, especially in high-failure scenarios. A minimal sketch follows this list.
  4. Logging:

    • Implement a more robust logging mechanism to capture detailed information about successes, failures, retries, and other important events. This will aid in monitoring and debugging; a basic setup sketch follows this list.
  5. Optimize Dialogue Batching:

    • If possible, batch multiple dialogues within a single session to minimize the number of active sessions. However, ensure that the total duration of the session does not exceed the API's session duration limits. A batching sketch follows this list.
  6. Graceful Shutdown:

    • Ensure that your application can gracefully handle shutdown signals, closing any active WebSocket connections properly to avoid resource leaks (see the shutdown sketch after this list).
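
To make the exponential-backoff suggestion (point 3) concrete, here is a minimal sketch; the helper name `retry_with_backoff` and the delay constants are assumptions for illustration, not part of any API:

```python
import asyncio
import random

async def retry_with_backoff(operation, max_retries=3, base_delay=1.0, max_delay=30.0):
    """Retry an async operation, doubling the delay after each failure.

    `operation` is a zero-argument callable returning a coroutine; the jitter
    term spreads retries out so concurrent clients do not reconnect in lockstep.
    """
    for attempt in range(max_retries):
        try:
            return await operation()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            delay += random.uniform(0, delay / 2)  # jitter
            print(f"Attempt {attempt + 1} failed ({e}); retrying in {delay:.1f}s")
            await asyncio.sleep(delay)

# Hypothetical usage, replacing the fixed 5-second sleep:
# await retry_with_backoff(lambda: generator.process_dialogue(dialogue, voice, output_file))
```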
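
For point 4, a basic `logging` setup might look like the following; the logger name `audiogen` is an arbitrary choice for this sketch:

```python
import logging

# Route diagnostics through the standard logging module instead of print().
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
log = logging.getLogger("audiogen")

# The print() calls in process_dialogue could then become, for example:
# log.warning("Connection closed: %s (attempt %d/%d)", e, attempt, max_retries)
# log.error("Max retries reached for dialogue %r; skipping", dialogue)
```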
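
For point 5, one possible batching sketch is shown below. The per-dialogue duration estimate is an assumption you would need to tune from measured generation times, leaving headroom under the documented 15-minute session cap:

```python
def batch_dialogues(dialogues, est_seconds_per_dialogue=60, session_limit_seconds=15 * 60):
    """Group dialogues into batches whose estimated duration stays under the limit."""
    batches, current, current_seconds = [], [], 0
    for dialogue in dialogues:
        # Start a new batch once adding another dialogue would exceed the budget.
        if current and current_seconds + est_seconds_per_dialogue > session_limit_seconds:
            batches.append(current)
            current, current_seconds = [], 0
        current.append(dialogue)
        current_seconds += est_seconds_per_dialogue
    if current:
        batches.append(current)
    return batches

# Each batch would then get its own connect()/setup/send/receive cycle,
# rather than one connection per dialogue.
```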
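
And for point 6, a minimal shutdown sketch using asyncio signal handlers; note that `loop.add_signal_handler` is POSIX-only, and `run_pipeline` is a hypothetical stand-in for the actual generation work:

```python
import asyncio
import signal

async def run_pipeline():
    # Hypothetical placeholder for the real audio-generation coroutine.
    await asyncio.sleep(3600)

async def main_with_shutdown():
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()

    # Translate SIGINT/SIGTERM into a shutdown event.
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop.set)

    worker = asyncio.create_task(run_pipeline())
    stopper = asyncio.create_task(stop.wait())
    await asyncio.wait({worker, stopper}, return_when=asyncio.FIRST_COMPLETED)

    if not worker.done():
        worker.cancel()  # unwinds `async with connect(...)`, closing the socket cleanly
        try:
            await worker
        except asyncio.CancelledError:
            pass
    stopper.cancel()
```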

Summary

By modifying your application to process dialogues sequentially, you ensure that only one active WebSocket connection exists at any given time. This approach aligns with the quota restrictions imposed by the Google Gemini 2.0 Flash API, preventing quota exceedance errors. Additionally, adhering to best practices like robust error handling, rate limiting, and effective resource management will enhance the reliability and maintainability of your application.

If you continue to face quota-related issues even after these adjustments, consider reaching out to Google Cloud support to discuss your usage patterns and explore potential quota increases.
