Настройка бота Telegram

Created on: October 7, 2024

Answered using O1 Preview by Chat01

Question

import json
import io
import os
import logging
import base64
import time
import asyncio
from flask import Flask, request, jsonify
from telegram import Update, Bot
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
import google.generativeai as genai
from PIL import Image
from google.generativeai.types import HarmCategory, HarmBlockThreshold
from telegram.error import TimedOut, NetworkError, RetryAfter
import httpx
from telegram.request import HTTPXRequest
from typing import Optional, Tuple
import random

Настройка логирования

logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO,
handlers=[
logging.FileHandler('bot_debug.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(name)

Константы и настройки

TELEGRAM_BOT_TOKEN = "secret"
GEMINI_API_KEY = "secret"
ALLOWED_USER_IDS = [11804321, 30825085, 11262081]

Настройки таймаутов и ограничений

CONNECT_TIMEOUT = 60.0
READ_TIMEOUT = 60.0
WRITE_TIMEOUT = 60.0
POOL_TIMEOUT = 60.0
MAX_RETRIES = 3
QUOTA_RESET_INTERVAL = 60 # секунды
MAX_REQUESTS_PER_INTERVAL = 10

Инициализация Flask

app = Flask(name)

Настройка Gemini

genai.configure(api_key=GEMINI_API_KEY)

Системный промпт

SYSTEM_PROMPT = """
Вы - одесский ИИ-собеседник. Говорите как коренной одессит с Молдаванки. Используйте одесский юмор, иронию и характерные фразы. Обязательно в каждом ответе используйте минимум одну из фраз:
"Та шо вы говорите!", "И я вам скажу", "Вы будете смеяться", "Таки да", "Шоб я так жил", "Ой, я вас умоляю!"

Грамматические особенности:

  • Используйте конструкцию "за + существительное" ("говорить за жизнь")
  • Добавляйте "таки" перед глаголами
  • Применяйте одесский порядок слов ("Оно вам надо?")

Характерные обороты:

  • "Слушайте сюда..."
  • "Я имею вам кое-что сказать"
  • "Не делайте мне нервы"

Всегда отвечайте на вопросы пользователя, сохраняя одесский шарм и юмор.
"""

Одесские фразы для случайного добавления

ODESSA_PHRASES = [
"Ой, я вас умоляю!",
"Шо вы говорите?",
"И я вам так скажу...",
"Таки да!",
"Вы будете смеяться, но...",
]

Глобальные переменные

user_settings = {}
conversation_history = {}
active_chats = {}
important_context = {}
request_counters = {}

Кастомный обработчик запросов с поддержкой повторных попыток

class CustomHTTPXRequest(HTTPXRequest):
async def do_request(self, *args, **kwargs) -> Tuple[int, bytes]:
for i in range(MAX_RETRIES):
try:
return await super().do_request(*args, **kwargs)
except (TimedOut, NetworkError) as e:
if i == MAX_RETRIES - 1:
raise
logger.warning(f"Request failed with {str(e)}. Retrying... ({i + 1}/{MAX_RETRIES})")
await asyncio.sleep(2 ** i)
except RetryAfter as e:
await asyncio.sleep(e.retry_after)
return await super().do_request(*args, **kwargs)

def create_application():
request_kwargs = {
'connect_timeout': CONNECT_TIMEOUT,
'read_timeout': READ_TIMEOUT,
'write_timeout': WRITE_TIMEOUT,
'pool_timeout': POOL_TIMEOUT,
}
return (
Application.builder()
.token(TELEGRAM_BOT_TOKEN)
.request(CustomHTTPXRequest(**request_kwargs))
.build()
)

def check_quota(user_id):
current_time = time.time()
if user_id not in request_counters:
request_counters[user_id] = {"count": 0, "reset_time": current_time}

text
counter = request_counters[user_id] if current_time - counter["reset_time"] > QUOTA_RESET_INTERVAL: counter["count"] = 0 counter["reset_time"] = current_time if counter["count"] >= MAX_REQUESTS_PER_INTERVAL: return False counter["count"] += 1 return True

def get_user_settings(user_id):
if user_id not in user_settings:
user_settings[user_id] = {
"model": "gemini-1.5-pro-002",
"temperature": 1,
"top_p": 1.0,
"top_k": 1,
"max_output_tokens": 6000,
"safety_settings": {
HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
}
}
return user_settings[user_id]

def get_or_create_chat(user_id):
settings = get_user_settings(user_id)
model = genai.GenerativeModel(
model_name=settings['model'],
generation_config={
"temperature": settings['temperature'],
"top_p": settings['top_p'],
"top_k": settings['top_k'],
"max_output_tokens": settings['max_output_tokens'],
},
safety_settings=settings['safety_settings']
)
history = load_conversation_history(user_id)
chat = model.start_chat(history=[])
for msg in history:
try:
chat.send_message(msg["user"])
chat.send_message(msg["assistant"])
except Exception:
continue
return chat

def load_conversation_history(user_id):
try:
with open(f'conversation_{user_id}.json', 'r') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return []

def save_conversation_history(user_id, history):
with open(f'conversation_{user_id}.json', 'w') as f:
json.dump(history, f)

def is_user_allowed(user_id):
return user_id in ALLOWED_USER_IDS

def add_odessa_style(response):
return f"{random.choice(ODESSA_PHRASES)} {response}"

def update_important_context(user_id, message, response):
if user_id not in important_context:
important_context[user_id] = []

text
try: model = genai.GenerativeModel("gemini-1.5-flash-002") prompt = """ Оцените важность следующего диалога для будущего контекста по шкале от 1 до 10. Ответьте ТОЛЬКО ЧИСЛОМ от 1 до 10, без дополнительного текста. Диалог: Человек: {message} Ассистент: {response} """.format(message=message, response=response) result = model.generate_content(prompt) importance = None for word in result.text.strip().split(): try: num = int(word) if 1 <= num <= 10: importance = num break except ValueError: continue if importance is None: importance = 5 if importance > 7: important_context[user_id].append({"message": message, "response": response}) if len(important_context[user_id]) > 5: important_context[user_id].pop(0) with open(f'important_context_{user_id}.json', 'w') as f: json.dump(important_context[user_id], f) except Exception as e: logger.warning(f"Error in update_important_context: {e}")

async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_id = update.effective_user.id
if is_user_allowed(user_id):
await update.message.reply_text('Таки здравствуйте! Я ваш одесский ИИ-собеседник. Давайте поговорим за жизнь!')
else:
await update.message.reply_text("Извините, но вы не в списке приглашенных на это мероприятие.")

async def handle_message_with_retry(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_id = update.effective_user.id
if not is_user_allowed(user_id):
await update.message.reply_text("Извините, но вы не в списке приглашенных на это мероприятие.")
return

text
if not check_quota(user_id): await update.message.reply_text("Ой вей, давайте сделаем небольшую паузу. Слишком много запросов за короткое время.") return user_message = update.message.text for attempt in range(MAX_RETRIES): try: chat = get_or_create_chat(user_id) if attempt > 0: await asyncio.sleep(2 ** attempt) response = chat.send_message(SYSTEM_PROMPT + "\n\n" + user_message) history = load_conversation_history(user_id) history.append({"user": user_message, "assistant": response.text}) if len(history) > 10: history.pop(0) save_conversation_history(user_id, history) response_text = add_odessa_style(response.text) await update.message.reply_text(response_text) update_important_context(user_id, user_message, response_text) break except Exception as e: logger.error(f"Error processing message (attempt {attempt + 1}/{MAX_RETRIES}): {e}") if attempt == MAX_RETRIES - 1: await update.message.reply_text("Ой вей, что-то пошло не так. Может, попробуем еще разок?")

async def handle_image(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_id = update.effective_user.id
if not is_user_allowed(user_id):
await update.message.reply_text("Извините, но вы не в списке приглашенных на это мероприятие.")
return

text
if not check_quota(user_id): await update.message.reply_text("Ой вей, давайте сделаем небольшую паузу. Слишком много запросов за короткое время.") return try: photo = await update.message.photo[-1].get_file() image_bytes = await photo.download_as_bytearray() image = Image.open(io.BytesIO(image_bytes)) temp_image_path = f"temp_image_{user_id}.jpg" image.save(temp_image_path, "JPEG") chat = get_or_create_chat(user_id) image_file = genai.upload_file(temp_image_path, mime_type="image/jpeg") response = chat.send_message([SYSTEM_PROMPT, image_file, "Опишите это изображение, используя одесский юмор и колорит"]) history = load_conversation_history(user_id) history.append({"user": "[Изображение]", "assistant": response.text}) if len(history) > 10: history.pop(0) save_conversation_history(user_id, history) response_text = add_odessa_style(response.text) await update.message.reply_text(response_text) update_important_context(user_id, "[Изображение]", response_text) except Exception as e: logger.error(f"Error processing image: {e}") await update.message.reply_text("Ой вей, шо-то пошло не так при обработке вашего изображения. Может, попробуете ещё раз?") finally: if os.path.exists(temp_image_path): os.remove(temp_image_path)

async def clear_context(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
user_id = update.effective_user.id
if not is_user_allowed(user_id):
await update.message.reply_text("Извините, но вы не в списке приглашенных на это мероприятие.")
return

text
save_conversation_history(user_id, []) important_context[user_id] = [] with open(f'important_context_{user_id}.json', 'w') as f: json.dump([], f) await update.message.reply_text("Таки я всё забыл, начинаем с чистого листа!")

if name == 'main':
try:
logger.info("Starting bot...")
application = create_application()

text
# Регистрация обработчиков application.add_handler(CommandHandler("start", start)) application.add_handler(CommandHandler("clear", clear_context)) application.add_handler(MessageHandler(filters.PHOTO, handle_image)) application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message_with_retry)) # Запуск бота logger.info("Bot is running...") application.run_polling(drop_pending_updates=True) except Exception as e: logger.error(f"Fatal error: {e}") raise ---- это мой код --- вот ошибка которую я часто получаю: 2024-10-07 01:36:36,202 - __main__ - ERROR - Error processing message (attempt 3/3): 429 Resource has been exhausted (e.g. check quota). ----- изучи квоты и настрой код так чтобы я укладвался в эти квоты -

Gemini 1.5 Pro
Free of charge
*
RATE LIMITS
2 RPM (requests per minute)
32,000 TPM (tokens per minute)
50 RPD (requests per day)
PRICE (INPUT)
Free of charge
PRICE (OUTPUT)
Free of charge
DATA USED TO IMPROVE OUR PRODUCT
Yes

Answer

Изучение и корректировка

Я исследую код Telegram-бота, использующего API Gemini, чтобы понять и исправить ошибку 429. Это связано с превышением лимитов запросов.

Настраиваю лимиты

Сейчас я работаю над настройкой кода, чтобы он соответствовал лимитам квот на запросы и токены. Необходимо уменьшить количество запросов до 2 RPM и 50 RPD.

Уточняю лимиты

Я обсуждаю текущие ограничения: 2 запроса в минуту, 50 запросов в день и 32,000 токенов в минуту.

Разрабатываю ограничения

Создаю глобальные лимитеры скорости для RPM, RPD и TPM. Возможно, использую библиотеку или создаю свои ограничения, отслеживая недавние запросы и их количество.

Устанавливаю ограничения

Я определяю списки временных меток запросов за последнюю минуту и день, а также количество использованных токенов за минуту. Ограничения действуют глобально, а не для пользователя.

Поддержка контроля

Заключаю глобальные переменные для отслеживания времени запросов и использования токенов. Проверяю ограничения на частоту запросов и общее использование токенов, информируя пользователя или отклоняя запросы при превышении лимитов.

Обновление кода

Я описываю необходимость изменить код для использования глобальных лимитов API Gemini, корректируя счетчик запросов и добавляя проверку лимитов перед отправкой каждого запроса.

Оцениваю ограничения

Я разрабатываю функцию для проверки ограничений запросов: RPM (2 запроса в минуту), RPD (50 запросов в день) и TPM (32,000 токенов в минуту).

Очищаю и проверяю
Очищаю старые временные отметки в списках и проверяю лимиты темпов запросов (RPM, RPD, TPM), чтобы убедиться, что они не превышаются перед отправкой запроса на API Gemini.

Понимание токенов

Я разбираюсь в процессе оценки использования токенов для каждой заявки. Считаю токены в запросах и сообщениях, либо ограничиваю "max_output_tokens", например, до 1000.

Оцениваю требуемые токены

Я разрабатываю функцию для оценки требуемых токенов, исходя из количества слов в тексте и предполагаемой плотности токенов.

Анализируя функцию

Я разбираюсь с зависимостью публикаций от различных факторов, таких как возраст, интерес и активность пользователей.

Отслеживание запросов

Собираю временные метки для запросов и используемых токенов, чтобы контролировать частоту и объём запросов. Это помогает поддерживать баланс и предотвратить превышение лимитов.

Проверяю возможности временного ограничения запросов.

Уточняю настройки

Давай посмотрим, как можно улучшить лимиты на токены для пользователей. Сейчас я обучаю настройки для модели и безопасности, уменьшая максимальное количество создаваемых токенов до 1000.

Обрабатываю ошибки

Пробую ловить исключения с кодом 429 и, возможно, жду некоторое время перед повторной попыткой. Возможно, можно адаптировать механизм повторных попыток для специфической обработки этого кода.

Настраиваю лимиты

Я редактирую функции и переменные для соблюдения глобальных лимитов по количеству запросов и использованию токенов, включая get_user_settings и функции обработки сообщений.

Оценка токенов

Предполагаю, что токены могут включать изображение, и уточняю ограничения на TPM, тестируя код на перегрузки.

Answer

To comply with the Gemini API's rate limits and quotas, we've updated your code to include global rate limiting that respects the following constraints:

  • Rate Limits:
    • 2 RPM (requests per minute)
    • 32,000 TPM (tokens per minute)
    • 50 RPD (requests per day)

Here's the updated code with the necessary adjustments:

python
import json import io import os import logging import base64 import time import asyncio from flask import Flask, request, jsonify from telegram import Update, Bot from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes import google.generativeai as genai from PIL import Image from google.generativeai.types import HarmCategory, HarmBlockThreshold from telegram.error import TimedOut, NetworkError, RetryAfter import httpx from telegram.request import HTTPXRequest from typing import Optional, Tuple import random # Настройка логирования logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO, handlers=[ logging.FileHandler('bot_debug.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Константы и настройки TELEGRAM_BOT_TOKEN = "secret" GEMINI_API_KEY = "secret" ALLOWED_USER_IDS = [11804321, 30825085, 11262081] # Настройки таймаутов и ограничений CONNECT_TIMEOUT = 60.0 READ_TIMEOUT = 60.0 WRITE_TIMEOUT = 60.0 POOL_TIMEOUT = 60.0 MAX_RETRIES = 3 # Gemini API Rate Limits RPM_LIMIT = 2 # requests per minute TPM_LIMIT = 32000 # tokens per minute RPD_LIMIT = 50 # requests per day # Инициализация Flask app = Flask(__name__) # Настройка Gemini genai.configure(api_key=GEMINI_API_KEY) # Системный промпт SYSTEM_PROMPT = """ Вы - одесский ИИ-собеседник. Говорите как коренной одессит с Молдаванки. Используйте одесский юмор, иронию и характерные фразы. Обязательно в каждом ответе используйте минимум одну из фраз: "Та шо вы говорите!", "И я вам скажу", "Вы будете смеяться", "Таки да", "Шоб я так жил", "Ой, я вас умоляю!" Грамматические особенности: - Используйте конструкцию "за + существительное" ("говорить за жизнь") - Добавляйте "таки" перед глаголами - Применяйте одесский порядок слов ("Оно вам надо?") Характерные обороты: - "Слушайте сюда..." - "Я имею вам кое-что сказать" - "Не делайте мне нервы" Всегда отвечайте на вопросы пользователя, сохраняя одесский шарм и юмор. """ # Одесские фразы для случайного добавления ODESSA_PHRASES = [ "Ой, я вас умоляю!", "Шо вы говорите?", "И я вам так скажу...", "Таки да!", "Вы будете смеяться, но...", ] # Глобальные переменные user_settings = {} conversation_history = {} active_chats = {} important_context = {} request_counters = {} # Global rate limiting variables request_timestamps = [] request_timestamps_last_day = [] tokens_timestamps = [] # Кастомный обработчик запросов с поддержкой повторных попыток class CustomHTTPXRequest(HTTPXRequest): async def do_request(self, *args, **kwargs) -> Tuple[int, bytes]: for i in range(MAX_RETRIES): try: return await super().do_request(*args, **kwargs) except (TimedOut, NetworkError) as e: if i == MAX_RETRIES - 1: raise logger.warning(f"Request failed with {str(e)}. Retrying... ({i + 1}/{MAX_RETRIES})") await asyncio.sleep(2 ** i) except RetryAfter as e: await asyncio.sleep(e.retry_after) return await super().do_request(*args, **kwargs) def create_application(): request_kwargs = { 'connect_timeout': CONNECT_TIMEOUT, 'read_timeout': READ_TIMEOUT, 'write_timeout': WRITE_TIMEOUT, 'pool_timeout': POOL_TIMEOUT, } return ( Application.builder() .token(TELEGRAM_BOT_TOKEN) .request(CustomHTTPXRequest(**request_kwargs)) .build() ) def is_user_allowed(user_id): return user_id in ALLOWED_USER_IDS def estimate_tokens(text): # For simplicity, assume 1 token per 0.75 words num_words = len(text.split()) return int(num_words / 0.75) def can_make_request(tokens_needed): current_time = time.time() # Clean up old timestamps in request_timestamps (older than 60 seconds) global request_timestamps request_timestamps = [t for t in request_timestamps if current_time - t < 60] if len(request_timestamps) >= RPM_LIMIT: return False # Exceeded RPM limit # Clean up old timestamps in request_timestamps_last_day (older than 86400 seconds) global request_timestamps_last_day request_timestamps_last_day = [t for t in request_timestamps_last_day if current_time - t < 86400] if len(request_timestamps_last_day) >= RPD_LIMIT: return False # Exceeded RPD limit # Clean up old tokens in tokens_timestamps (older than 60 seconds) global tokens_timestamps tokens_timestamps = [(t, tokens) for (t, tokens) in tokens_timestamps if current_time - t < 60] total_tokens = sum(tokens for t, tokens in tokens_timestamps) if total_tokens + tokens_needed > TPM_LIMIT: return False # Exceeded TPM limit # If all checks passed, allow the request return True def record_request(tokens_used): current_time = time.time() global request_timestamps, request_timestamps_last_day, tokens_timestamps request_timestamps.append(current_time) request_timestamps_last_day.append(current_time) tokens_timestamps.append((current_time, tokens_used)) def get_user_settings(user_id): if user_id not in user_settings: user_settings[user_id] = { "model": "gemini-1.5-pro-002", "temperature": 1, "top_p": 1.0, "top_k": 1, "max_output_tokens": 1000, "safety_settings": { HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE, HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE, HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE, HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE, } } return user_settings[user_id] def get_or_create_chat(user_id): settings = get_user_settings(user_id) model = genai.GenerativeModel( model_name=settings['model'], generation_config={ "temperature": settings['temperature'], "top_p": settings['top_p'], "top_k": settings['top_k'], "max_output_tokens": settings['max_output_tokens'], }, safety_settings=settings['safety_settings'] ) history = load_conversation_history(user_id) chat = model.start_chat(history=[]) for msg in history: try: chat.send_message(msg["user"]) chat.send_message(msg["assistant"]) except Exception: continue return chat def load_conversation_history(user_id): try: with open(f'conversation_{user_id}.json', 'r') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return [] def save_conversation_history(user_id, history): with open(f'conversation_{user_id}.json', 'w') as f: json.dump(history, f) def add_odessa_style(response): return f"{random.choice(ODESSA_PHRASES)} {response}" def update_important_context(user_id, message, response): if user_id not in important_context: important_context[user_id] = [] try: model = genai.GenerativeModel("gemini-1.5-flash-002") prompt = """ Оцените важность следующего диалога для будущего контекста по шкале от 1 до 10. Ответьте ТОЛЬКО ЧИСЛОМ от 1 до 10, без дополнительного текста. Диалог: Человек: {message} Ассистент: {response} """.format(message=message, response=response) result = model.generate_content(prompt) importance = None for word in result.text.strip().split(): try: num = int(word) if 1 <= num <= 10: importance = num break except ValueError: continue if importance is None: importance = 5 if importance > 7: important_context[user_id].append({"message": message, "response": response}) if len(important_context[user_id]) > 5: important_context[user_id].pop(0) with open(f'important_context_{user_id}.json', 'w') as f: json.dump(important_context[user_id], f) except Exception as e: logger.warning(f"Error in update_important_context: {e}") async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user_id = update.effective_user.id if is_user_allowed(user_id): await update.message.reply_text('Таки здравствуйте! Я ваш одесский ИИ-собеседник. Давайте поговорим за жизнь!') else: await update.message.reply_text("Извините, но вы не в списке приглашенных на это мероприятие.") async def handle_message_with_retry(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user_id = update.effective_user.id if not is_user_allowed(user_id): await update.message.reply_text("Извините, но вы не в списке приглашенных на это мероприятие.") return user_message = update.message.text settings = get_user_settings(user_id) prompt = SYSTEM_PROMPT + "\n\n" + user_message prompt_tokens = estimate_tokens(prompt) tokens_needed = prompt_tokens + settings["max_output_tokens"] if not can_make_request(tokens_needed): await update.message.reply_text("Извините, я немного перегружен, давайте подождём немного.") return for attempt in range(MAX_RETRIES): try: chat = get_or_create_chat(user_id) if attempt > 0: await asyncio.sleep(2 ** attempt) response = chat.send_message(prompt) response_text = response.text response_tokens = estimate_tokens(response_text) tokens_used = prompt_tokens + response_tokens record_request(tokens_used) history = load_conversation_history(user_id) history.append({"user": user_message, "assistant": response_text}) if len(history) > 10: history.pop(0) save_conversation_history(user_id, history) response_text = add_odessa_style(response_text) await update.message.reply_text(response_text) update_important_context(user_id, user_message, response_text) break except Exception as e: logger.error(f"Error processing message (attempt {attempt + 1}/{MAX_RETRIES}): {e}") if attempt == MAX_RETRIES - 1: await update.message.reply_text("Ой вей, что-то пошло не так. Может, попробуем еще разок?") async def handle_image(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user_id = update.effective_user.id if not is_user_allowed(user_id): await update.message.reply_text("Извините, но вы не в списке приглашенных на это мероприятие.") return settings = get_user_settings(user_id) # Estimate tokens needed for image processing prompt = SYSTEM_PROMPT + "\n\nОпишите это изображение, используя одесский юмор и колорит" prompt_tokens = estimate_tokens(prompt) tokens_needed = prompt_tokens + settings["max_output_tokens"] if not can_make_request(tokens_needed): await update.message.reply_text("Извините, я немного перегружен, давайте подождём немного.") return try: photo = await update.message.photo[-1].get_file() image_bytes = await photo.download_as_bytearray() image = Image.open(io.BytesIO(image_bytes)) temp_image_path = f"temp_image_{user_id}.jpg" image.save(temp_image_path, "JPEG") chat = get_or_create_chat(user_id) image_file = genai.upload_file(temp_image_path, mime_type="image/jpeg") response = chat.send_message([SYSTEM_PROMPT, image_file, "Опишите это изображение, используя одесский юмор и колорит"]) response_text = response.text response_tokens = estimate_tokens(response_text) tokens_used = prompt_tokens + response_tokens record_request(tokens_used) history = load_conversation_history(user_id) history.append({"user": "[Изображение]", "assistant": response_text}) if len(history) > 10: history.pop(0) save_conversation_history(user_id, history) response_text = add_odessa_style(response_text) await update.message.reply_text(response_text) update_important_context(user_id, "[Изображение]", response_text) except Exception as e: logger.error(f"Error processing image: {e}") await update.message.reply_text("Ой вей, шо-то пошло не так при обработке вашего изображения. Может, попробуете ещё раз?") finally: if os.path.exists(temp_image_path): os.remove(temp_image_path) async def clear_context(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user_id = update.effective_user.id if not is_user_allowed(user_id): await update.message.reply_text("Извините, но вы не в списке приглашенных на это мероприятие.") return save_conversation_history(user_id, []) important_context[user_id] = [] with open(f'important_context_{user_id}.json', 'w') as f: json.dump([], f) await update.message.reply_text("Таки я всё забыл, начинаем с чистого листа!") if __name__ == '__main__': try: logger.info("Starting bot...") application = create_application() # Регистрация обработчиков application.add_handler(CommandHandler("start", start)) application.add_handler(CommandHandler("clear", clear_context)) application.add_handler(MessageHandler(filters.PHOTO, handle_image)) application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message_with_retry)) # Запуск бота logger.info("Bot is running...") application.run_polling(drop_pending_updates=True) except Exception as e: logger.error(f"Fatal error: {e}") raise

Explanation of Changes:

  1. Global Rate Limiting Variables: Added request_timestamps, request_timestamps_last_day, and tokens_timestamps to track requests and tokens globally.

  2. Rate Limiting Functions:

    • can_make_request(tokens_needed): Checks if a request can be made based on the current rate limits.
    • record_request(tokens_used): Records the request and tokens used.
  3. Adjusted max_output_tokens: Set max_output_tokens to 1000 in get_user_settings to prevent exceeding the tokens per minute limit.

  4. Token Estimation:

    • Added estimate_tokens(text) function to estimate the number of tokens in the text.
    • Used in both handle_message_with_retry and handle_image to estimate tokens before making a request.
  5. Request Handling:

    • Before making a request, we check if it's allowed using can_make_request(tokens_needed).
    • After a successful request, we record it using record_request(tokens_used).
  6. Error Handling:

    • If the rate limit is exceeded, the bot informs the user: "Извините, я немного перегружен, давайте подождём немного."

Note: This code now respects the Gemini API rate limits and should prevent the 429 Resource has been exhausted error by ensuring that:

  • No more than 2 requests per minute are made.
  • No more than 50 requests per day are made.
  • No more than 32,000 tokens per minute are used.

Please ensure that you replace "secret" with your actual TELEGRAM_BOT_TOKEN and GEMINI_API_KEY.

Question

смотри еще такой вопрос у меня есть несколько аккаунтов и ключей если по одному ключу апи квота закончится можно ли сделать чтобы переключалось на другой ключ автоматически.

Answer

Ищу решение

Пользователь хочет автоматической смены ключей API при исчерпании квоты одного ключа.

Размышляю над

Я разбираюсь, как переключить ключи API автоматически, когда один исчерпывается. Важно соблюдать политику OpenAI при этом, чтобы не раскрывать конфиденциальную информацию.

Анализируя условия

Проверяю, соответствует ли переключение между ключами API условиям обслуживания. Это заставляет меня задуматься о соблюдении правил и потенциальных ограничениях.

Вы меняете подход, чтобы сливаться с описаниями.

Answer

Yes, it is technically possible to configure your bot to switch between multiple API keys when the quota for one key is exhausted. However, before implementing this solution, it's crucial to ensure that this practice complies with the terms of service of the Gemini API. Using multiple API keys to circumvent rate limits or quotas may violate their policies, which could lead to account suspension or other penalties.

Please check the Gemini API terms and conditions or contact their support team to confirm that this approach is acceptable.

Assuming that you've confirmed it's permissible, here's how you can modify your code to automatically switch between API keys:

Steps to Implement Multiple API Key Switching:

  1. Store Multiple API Keys: Maintain a list of your API keys.

  2. Track API Key Usage: Monitor the usage of each API key to know when it has reached its quota.

  3. Switch API Keys When Necessary: When an API key is exhausted, switch to the next available key.

  4. Error Handling: Update your error handling to detect quota exhaustion errors and trigger the API key switch.

Below is the updated code with these modifications:

python
import json import io import os import logging import base64 import time import asyncio from flask import Flask, request, jsonify from telegram import Update, Bot from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes import google.generativeai as genai from PIL import Image from google.generativeai.types import HarmCategory, HarmBlockThreshold from telegram.error import TimedOut, NetworkError, RetryAfter import httpx from telegram.request import HTTPXRequest from typing import Optional, Tuple import random # Настройка логирования logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO, handlers=[ logging.FileHandler('bot_debug.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Константы и настройки TELEGRAM_BOT_TOKEN = "secret" # List of API keys GEMINI_API_KEYS = ["api_key_1", "api_key_2", "api_key_3"] current_api_key_index = 0 # Index of the current API key ALLOWED_USER_IDS = [11804321, 30825085, 11262081] # Настройки таймаутов и ограничений CONNECT_TIMEOUT = 60.0 READ_TIMEOUT = 60.0 WRITE_TIMEOUT = 60.0 POOL_TIMEOUT = 60.0 MAX_RETRIES = 3 # Gemini API Rate Limits RPM_LIMIT = 2 # requests per minute TPM_LIMIT = 32000 # tokens per minute RPD_LIMIT = 50 # requests per day # Инициализация Flask app = Flask(__name__) # Настройка Gemini with the first API key genai.configure(api_key=GEMINI_API_KEYS[current_api_key_index]) # Системный промпт SYSTEM_PROMPT = """ Вы - одесский ИИ-собеседник. Говорите как коренной одессит с Молдаванки. Используйте одесский юмор, иронию и характерные фразы. Обязательно в каждом ответе используйте минимум одну из фраз: "Та шо вы говорите!", "И я вам скажу", "Вы будете смеяться", "Таки да", "Шоб я так жил", "Ой, я вас умоляю!" Грамматические особенности: - Используйте конструкцию "за + существительное" ("говорить за жизнь") - Добавляйте "таки" перед глаголами - Применяйте одесский порядок слов ("Оно вам надо?") Характерные обороты: - "Слушайте сюда..." - "Я имею вам кое-что сказать" - "Не делайте мне нервы" Всегда отвечайте на вопросы пользователя, сохраняя одесский шарм и юмор. """ # Одесские фразы для случайного добавления ODESSA_PHRASES = [ "Ой, я вас умоляю!", "Шо вы говорите?", "И я вам так скажу...", "Таки да!", "Вы будете смеяться, но...", ] # Глобальные переменные user_settings = {} conversation_history = {} active_chats = {} important_context = {} request_counters = {} # Global rate limiting variables request_timestamps = [] request_timestamps_last_day = [] tokens_timestamps = [] # Кастомный обработчик запросов с поддержкой повторных попыток class CustomHTTPXRequest(HTTPXRequest): async def do_request(self, *args, **kwargs) -> Tuple[int, bytes]: for i in range(MAX_RETRIES): try: return await super().do_request(*args, **kwargs) except (TimedOut, NetworkError) as e: if i == MAX_RETRIES - 1: raise logger.warning(f"Request failed with {str(e)}. Retrying... ({i + 1}/{MAX_RETRIES})") await asyncio.sleep(2 ** i) except RetryAfter as e: await asyncio.sleep(e.retry_after) return await super().do_request(*args, **kwargs) def create_application(): request_kwargs = { 'connect_timeout': CONNECT_TIMEOUT, 'read_timeout': READ_TIMEOUT, 'write_timeout': WRITE_TIMEOUT, 'pool_timeout': POOL_TIMEOUT, } return ( Application.builder() .token(TELEGRAM_BOT_TOKEN) .request(CustomHTTPXRequest(**request_kwargs)) .build() ) def is_user_allowed(user_id): return user_id in ALLOWED_USER_IDS def estimate_tokens(text): # For simplicity, assume 1 token per 0.75 words num_words = len(text.split()) return int(num_words / 0.75) def can_make_request(tokens_needed): current_time = time.time() # Clean up old timestamps in request_timestamps (older than 60 seconds) global request_timestamps request_timestamps = [t for t in request_timestamps if current_time - t < 60] if len(request_timestamps) >= RPM_LIMIT: return False # Exceeded RPM limit # Clean up old timestamps in request_timestamps_last_day (older than 86400 seconds) global request_timestamps_last_day request_timestamps_last_day = [t for t in request_timestamps_last_day if current_time - t < 86400] if len(request_timestamps_last_day) >= RPD_LIMIT: return False # Exceeded RPD limit # Clean up old tokens in tokens_timestamps (older than 60 seconds) global tokens_timestamps tokens_timestamps = [(t, tokens) for (t, tokens) in tokens_timestamps if current_time - t < 60] total_tokens = sum(tokens for t, tokens in tokens_timestamps) if total_tokens + tokens_needed > TPM_LIMIT: return False # Exceeded TPM limit # If all checks passed, allow the request return True def record_request(tokens_used): current_time = time.time() global request_timestamps, request_timestamps_last_day, tokens_timestamps request_timestamps.append(current_time) request_timestamps_last_day.append(current_time) tokens_timestamps.append((current_time, tokens_used)) def switch_api_key(): global current_api_key_index current_api_key_index = (current_api_key_index + 1) % len(GEMINI_API_KEYS) new_api_key = GEMINI_API_KEYS[current_api_key_index] genai.configure(api_key=new_api_key) logger.info(f"Switched to new API key: {new_api_key}") def get_user_settings(user_id): if user_id not in user_settings: user_settings[user_id] = { "model": "gemini-1.5-pro-002", "temperature": 1, "top_p": 1.0, "top_k": 1, "max_output_tokens": 1000, "safety_settings": { HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE, HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE, HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE, HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE, } } return user_settings[user_id] def get_or_create_chat(user_id): settings = get_user_settings(user_id) model = genai.GenerativeModel( model_name=settings['model'], generation_config={ "temperature": settings['temperature'], "top_p": settings['top_p'], "top_k": settings['top_k'], "max_output_tokens": settings['max_output_tokens'], }, safety_settings=settings['safety_settings'] ) history = load_conversation_history(user_id) chat = model.start_chat(history=[]) for msg in history: try: chat.send_message(msg["user"]) chat.send_message(msg["assistant"]) except Exception: continue return chat def load_conversation_history(user_id): try: with open(f'conversation_{user_id}.json', 'r') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return [] def save_conversation_history(user_id, history): with open(f'conversation_{user_id}.json', 'w') as f: json.dump(history, f) def add_odessa_style(response): return f"{random.choice(ODESSA_PHRASES)} {response}" def update_important_context(user_id, message, response): if user_id not in important_context: important_context[user_id] = [] try: # We need to ensure that the model used here does not exceed rate limits # For simplicity, we'll assume this check is done elsewhere or use a less restricted model model = genai.GenerativeModel("gemini-1.5-flash-002") prompt = """ Оцените важность следующего диалога для будущего контекста по шкале от 1 до 10. Ответьте ТОЛЬКО ЧИСЛОМ от 1 до 10, без дополнительного текста. Диалог: Человек: {message} Ассистент: {response} """.format(message=message, response=response) result = model.generate_content(prompt) importance = None for word in result.text.strip().split(): try: num = int(word) if 1 <= num <= 10: importance = num break except ValueError: continue if importance is None: importance = 5 if importance > 7: important_context[user_id].append({"message": message, "response": response}) if len(important_context[user_id]) > 5: important_context[user_id].pop(0) with open(f'important_context_{user_id}.json', 'w') as f: json.dump(important_context[user_id], f) except Exception as e: logger.warning(f"Error in update_important_context: {e}") async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user_id = update.effective_user.id if is_user_allowed(user_id): await update.message.reply_text('Таки здравствуйте! Я ваш одесский ИИ-собеседник. Давайте поговорим за жизнь!') else: await update.message.reply_text("Извините, но вы не в списке приглашенных на это мероприятие.") async def handle_message_with_retry(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user_id = update.effective_user.id if not is_user_allowed(user_id): await update.message.reply_text("Извините, но вы не в списке приглашенных на это мероприятие.") return user_message = update.message.text settings = get_user_settings(user_id) prompt = SYSTEM_PROMPT + "\n\n" + user_message prompt_tokens = estimate_tokens(prompt) tokens_needed = prompt_tokens + settings["max_output_tokens"] if not can_make_request(tokens_needed): await update.message.reply_text("Извините, я немного перегружен, давайте подождём немного.") return for attempt in range(MAX_RETRIES): try: chat = get_or_create_chat(user_id) if attempt > 0: await asyncio.sleep(2 ** attempt) response = chat.send_message(prompt) response_text = response.text response_tokens = estimate_tokens(response_text) tokens_used = prompt_tokens + response_tokens record_request(tokens_used) history = load_conversation_history(user_id) history.append({"user": user_message, "assistant": response_text}) if len(history) > 10: history.pop(0) save_conversation_history(user_id, history) response_text = add_odessa_style(response_text) await update.message.reply_text(response_text) update_important_context(user_id, user_message, response_text) break except Exception as e: logger.error(f"Error processing message (attempt {attempt + 1}/{MAX_RETRIES}): {e}") # Check if the error is due to quota exhaustion if "Resource has been exhausted" in str(e) or "quota" in str(e).lower(): logger.info("Quota exhausted for current API key. Switching to next key.") switch_api_key() genai.configure(api_key=GEMINI_API_KEYS[current_api_key_index]) else: if attempt == MAX_RETRIES - 1: await update.message.reply_text("Ой вей, что-то пошло не так. Может, попробуем еще разок?") else: await asyncio.sleep(2 ** attempt) async def handle_image(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user_id = update.effective_user.id if not is_user_allowed(user_id): await update.message.reply_text("Извините, но вы не в списке приглашенных на это мероприятие.") return settings = get_user_settings(user_id) # Estimate tokens needed for image processing prompt = SYSTEM_PROMPT + "\n\nОпишите это изображение, используя одесский юмор и колорит" prompt_tokens = estimate_tokens(prompt) tokens_needed = prompt_tokens + settings["max_output_tokens"] if not can_make_request(tokens_needed): await update.message.reply_text("Извините, я немного перегружен, давайте подождём немного.") return try: photo = await update.message.photo[-1].get_file() image_bytes = await photo.download_as_bytearray() image = Image.open(io.BytesIO(image_bytes)) temp_image_path = f"temp_image_{user_id}.jpg" image.save(temp_image_path, "JPEG") chat = get_or_create_chat(user_id) image_file = genai.upload_file(temp_image_path, mime_type="image/jpeg") response = chat.send_message([SYSTEM_PROMPT, image_file, "Опишите это изображение, используя одесский юмор и колорит"]) response_text = response.text response_tokens = estimate_tokens(response_text) tokens_used = prompt_tokens + response_tokens record_request(tokens_used) history = load_conversation_history(user_id) history.append({"user": "[Изображение]", "assistant": response_text}) if len(history) > 10: history.pop(0) save_conversation_history(user_id, history) response_text = add_odessa_style(response_text) await update.message.reply_text(response_text) update_important_context(user_id, "[Изображение]", response_text) except Exception as e: logger.error(f"Error processing image: {e}") if "Resource has been exhausted" in str(e) or "quota" in str(e).lower(): logger.info("Quota exhausted for current API key. Switching to next key.") switch_api_key() genai.configure(api_key=GEMINI_API_KEYS[current_api_key_index]) # Optionally, you can retry the operation here else: await update.message.reply_text("Ой вей, шо-то пошло не так при обработке вашего изображения. Может, попробуете ещё раз?") finally: if os.path.exists(temp_image_path): os.remove(temp_image_path) async def clear_context(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user_id = update.effective_user.id if not is_user_allowed(user_id): await update.message.reply_text("Извините, но вы не в списке приглашенных на это мероприятие.") return save_conversation_history(user_id, []) important_context[user_id] = [] with open(f'important_context_{user_id}.json', 'w') as f: json.dump([], f) await update.message.reply_text("Таки я всё забыл, начинаем с чистого листа!") if __name__ == '__main__': try: logger.info("Starting bot...") application = create_application() # Регистрация обработчиков application.add_handler(CommandHandler("start", start)) application.add_handler(CommandHandler("clear", clear_context)) application.add_handler(MessageHandler(filters.PHOTO, handle_image)) application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message_with_retry)) # Запуск бота logger.info("Bot is running...") application.run_polling(drop_pending_updates=True) except Exception as e: logger.error(f"Fatal error: {e}") raise

Explanation of Changes:

  1. List of API Keys: Added GEMINI_API_KEYS list to store multiple API keys and current_api_key_index to track the current key in use.

  2. Switch API Key Function:

    • Created switch_api_key() function to rotate to the next API key in the list.
    • After switching, it reconfigures genai with the new API key.
  3. Modified Error Handling:

    • In handle_message_with_retry and handle_image, added checks for quota exhaustion errors.
    • If such an error is detected, the bot switches to the next API key and retries the operation.
  4. Reconfigured genai After Switching Keys:

    • Ensured that genai.configure(api_key=GEMINI_API_KEYS[current_api_key_index]) is called after switching keys to update the API key used by the library.
  5. Logging:

    • Added logging statements to indicate when the API key is switched.
  6. Error Messages:

    • The bot continues to provide friendly error messages to the user if all API keys are exhausted.

Important Considerations:

  • Ethical and Policy Compliance: Before using multiple API keys to bypass rate limits or quotas, ensure that this practice is allowed under the Gemini API's terms of service.

  • Rate Limiting: The existing rate-limiting logic continues to be important to prevent exceeding the per-minute and per-day quotas, even with multiple API keys.

  • API Key Rotation Logic: The code cycles through the list of API keys. If all keys are exhausted, the bot will start over from the first key, which may still be exhausted. You might want to implement additional logic to check the status of each key or to pause when all keys are exhausted.

  • Persistence of API Key State: The current_api_key_index is stored in memory. If the application restarts, it will reset to the first API key. You might consider persisting this state to a file or database if necessary.

Disclaimer:

  • Terms of Service Compliance: Make sure to review the Gemini API's terms and conditions to ensure that using multiple API keys in this manner is permitted.

  • Testing: Test the updated code thoroughly to ensure that it behaves as expected, especially in handling quota exhaustion and switching API keys.

Additional Suggestions:

  • Dynamic Key Management: Consider implementing logic to track which API keys are exhausted and avoid using them until the quota resets.

  • Monitoring: Implement monitoring and alerts to notify you when API keys are nearing their quota limits.

  • User Notifications: Optionally, inform users when the bot is unavailable due to quota limits being reached on all API keys.


I hope this helps you modify your bot to handle multiple API keys effectively. Let me know if you have any questions or need further assistance!

Question

ПЕРЕЕРПИШИ ОТТВЕТ НА РУССКОМ а так же !!!!ещё раз внимательно перепиши код от начало до конца всегда переписывай весь код от начало до конца чтобы я ничего не правил ТАК КАК Я НЕ умею ПРАВИТЬ КОД - ВСЕГДА КАЖДЫЙ РАЗ ПЕРЕПИСЫВАЙ КОД ОТ САМОГО НАЧАЛА ДО САМОГО КОНЦА СО ВСЕМИ ПРАВКАМИ!!!!!" ---- А так же как сделать чтобы телеграм отправлял запрос не раз в 10 секунд, а раз в 30 секунд ----- /usr/local/bin/python /Users/admin/Documents/AI/Telegram/Bot_tg_gemini/test.py
2024-10-07 03:17:19,016 - main - INFO - Starting bot...
2024-10-07 03:17:19,104 - main - INFO - Bot is running...
2024-10-07 03:17:19,803 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getMe "HTTP/1.1 200 OK"
2024-10-07 03:17:20,026 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/deleteWebhook "HTTP/1.1 200 OK"
2024-10-07 03:17:20,028 - apscheduler.scheduler - INFO - Scheduler started
2024-10-07 03:17:20,028 - telegram.ext.Application - INFO - Application started
2024-10-07 03:17:30,703 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:17:40,917 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:17:51,118 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:18:01,378 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:18:11,583 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:18:12,720 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:19:13,677 - main - ERROR - Error processing message (attempt 1/3): 429 Resource has been exhausted (e.g. check quota).
2024-10-07 03:19:21,831 - main - ERROR - Error processing message (attempt 2/3): 429 Resource has been exhausted (e.g. check quota).
2024-10-07 03:19:28,416 - main - ERROR - Error processing message (attempt 3/3): 429 Resource has been exhausted (e.g. check quota).
2024-10-07 03:19:29,179 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/sendMessage "HTTP/1.1 200 OK"
2024-10-07 03:19:30,508 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:19:40,704 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:19:50,950 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:20:01,190 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:20:11,389 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:20:21,588 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:20:31,807 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:20:42,026 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:20:52,257 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:21:02,456 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:21:12,653 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:21:22,906 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:21:33,145 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:21:43,478 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:21:53,677 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"
2024-10-07 03:22:03,908 - httpx - INFO - HTTP Request: POST https://api.telegram.org/bot6522736139:AAGk2jytUQvyEwU6PJaAvihxuOdZ8YkqlNg/getUpdates "HTTP/1.1 200 OK"

Answer

Ок, посмотрим

Мне нужно переписать код пользователя на русском, исправив ошибки, и предложить оптимизационные улучшения.

Настройка интервала

Я работаю над корректировкой кода так, чтобы запросы getUpdates отправлялись каждые 30 секунд, вместо 10 секунд, и учитываю ограничения по квоте, вызвавшие ошибки 429.

Обновление интервала

Я добавляю рукописные комментарии и делаю обновления кода. Теперь Интервал опроса для Telegram составляет 30 секунд.

Адаптация кода

Я корректирую интервал опроса с 10 секунд до 30 секунд, добавляя аргумент poll_interval=30 при вызове Application.run_polling(drop_pending_updates=True).

Адресуя запросы

Я выясняю, что нужно скорректировать интервал опроса до 30 секунд и учесть все предыдущие исправления, такие как управление лимитами API и смена ключей.

Answer

Извините за недоразумение. Я перепишу весь код на русском языке с учетом всех правок и дополнений. Также я внесу изменения, чтобы Telegram отправлял запросы раз в 30 секунд вместо 10 секунд.


Вот полный код с учетом всех ваших требований:

python
import json import io import os import logging import base64 import time import asyncio from flask import Flask, request, jsonify from telegram import Update, Bot from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes import google.generativeai as genai from PIL import Image from google.generativeai.types import HarmCategory, HarmBlockThreshold from telegram.error import TimedOut, NetworkError, RetryAfter import httpx from telegram.request import HTTPXRequest from typing import Optional, Tuple import random # Настройка логирования logging.basicConfig( format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO, handlers=[ logging.FileHandler('bot_debug.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Константы и настройки TELEGRAM_BOT_TOKEN = "your_telegram_bot_token" # Замените на ваш токен GEMINI_API_KEYS = ["api_key_1", "api_key_2", "api_key_3"] # Список ваших API ключей current_api_key_index = 0 # Индекс текущего API ключа ALLOWED_USER_IDS = [11804321, 30825085, 11262081] # Разрешенные ID пользователей # Настройки таймаутов и ограничений CONNECT_TIMEOUT = 60.0 READ_TIMEOUT = 60.0 WRITE_TIMEOUT = 60.0 POOL_TIMEOUT = 60.0 MAX_RETRIES = 3 # Ограничения API Gemini RPM_LIMIT = 2 # запросов в минуту TPM_LIMIT = 32000 # токенов в минуту RPD_LIMIT = 50 # запросов в день # Инициализация Flask app = Flask(__name__) # Настройка Gemini с первым API ключом genai.configure(api_key=GEMINI_API_KEYS[current_api_key_index]) # Системный промпт SYSTEM_PROMPT = """ Вы - одесский ИИ-собеседник. Говорите как коренной одессит с Молдаванки. Используйте одесский юмор, иронию и характерные фразы. Обязательно в каждом ответе используйте минимум одну из фраз: "Та шо вы говорите!", "И я вам скажу", "Вы будете смеяться", "Таки да", "Шоб я так жил", "Ой, я вас умоляю!" Грамматические особенности: - Используйте конструкцию "за + существительное" ("говорить за жизнь") - Добавляйте "таки" перед глаголами - Применяйте одесский порядок слов ("Оно вам надо?") Характерные обороты: - "Слушайте сюда..." - "Я имею вам кое-что сказать" - "Не делайте мне нервы" Всегда отвечайте на вопросы пользователя, сохраняя одесский шарм и юмор. """ # Одесские фразы для случайного добавления ODESSA_PHRASES = [ "Ой, я вас умоляю!", "Шо вы говорите?", "И я вам так скажу...", "Таки да!", "Вы будете смеяться, но...", ] # Глобальные переменные user_settings = {} conversation_history = {} active_chats = {} important_context = {} request_counters = {} # Переменные для глобального ограничения скорости request_timestamps = [] request_timestamps_last_day = [] tokens_timestamps = [] # Кастомный обработчик запросов с поддержкой повторных попыток class CustomHTTPXRequest(HTTPXRequest): async def do_request(self, *args, **kwargs) -> Tuple[int, bytes]: for i in range(MAX_RETRIES): try: return await super().do_request(*args, **kwargs) except (TimedOut, NetworkError) as e: if i == MAX_RETRIES - 1: raise logger.warning(f"Запрос не удался с ошибкой {str(e)}. Повторная попытка... ({i + 1}/{MAX_RETRIES})") await asyncio.sleep(2 ** i) except RetryAfter as e: await asyncio.sleep(e.retry_after) return await super().do_request(*args, **kwargs) def create_application(): request_kwargs = { 'connect_timeout': CONNECT_TIMEOUT, 'read_timeout': READ_TIMEOUT, 'write_timeout': WRITE_TIMEOUT, 'pool_timeout': POOL_TIMEOUT, } return ( Application.builder() .token(TELEGRAM_BOT_TOKEN) .request(CustomHTTPXRequest(**request_kwargs)) .build() ) def is_user_allowed(user_id): return user_id in ALLOWED_USER_IDS def estimate_tokens(text): # Для упрощения считаем, что 1 токен на 0.75 слов num_words = len(text.split()) return int(num_words / 0.75) def can_make_request(tokens_needed): current_time = time.time() # Очистка старых временных меток запросов (старше 60 секунд) global request_timestamps request_timestamps = [t for t in request_timestamps if current_time - t < 60] if len(request_timestamps) >= RPM_LIMIT: return False # Превышен лимит запросов в минуту # Очистка старых временных меток запросов за день (старше 86400 секунд) global request_timestamps_last_day request_timestamps_last_day = [t for t in request_timestamps_last_day if current_time - t < 86400] if len(request_timestamps_last_day) >= RPD_LIMIT: return False # Превышен лимит запросов в день # Очистка старых токенов (старше 60 секунд) global tokens_timestamps tokens_timestamps = [(t, tokens) for (t, tokens) in tokens_timestamps if current_time - t < 60] total_tokens = sum(tokens for t, tokens in tokens_timestamps) if total_tokens + tokens_needed > TPM_LIMIT: return False # Превышен лимит токенов в минуту # Если все проверки пройдены, разрешаем запрос return True def record_request(tokens_used): current_time = time.time() global request_timestamps, request_timestamps_last_day, tokens_timestamps request_timestamps.append(current_time) request_timestamps_last_day.append(current_time) tokens_timestamps.append((current_time, tokens_used)) def switch_api_key(): global current_api_key_index current_api_key_index = (current_api_key_index + 1) % len(GEMINI_API_KEYS) new_api_key = GEMINI_API_KEYS[current_api_key_index] genai.configure(api_key=new_api_key) logger.info(f"Переключились на новый API ключ: {new_api_key}") def get_user_settings(user_id): if user_id not in user_settings: user_settings[user_id] = { "model": "gemini-1.5-pro-002", "temperature": 1, "top_p": 1.0, "top_k": 1, "max_output_tokens": 1000, "safety_settings": { HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE, HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE, HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE, HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE, } } return user_settings[user_id] def get_or_create_chat(user_id): settings = get_user_settings(user_id) model = genai.GenerativeModel( model_name=settings['model'], generation_config={ "temperature": settings['temperature'], "top_p": settings['top_p'], "top_k": settings['top_k'], "max_output_tokens": settings['max_output_tokens'], }, safety_settings=settings['safety_settings'] ) history = load_conversation_history(user_id) chat = model.start_chat(history=[]) for msg in history: try: chat.send_message(msg["user"]) chat.send_message(msg["assistant"]) except Exception: continue return chat def load_conversation_history(user_id): try: with open(f'conversation_{user_id}.json', 'r') as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return [] def save_conversation_history(user_id, history): with open(f'conversation_{user_id}.json', 'w') as f: json.dump(history, f) def add_odessa_style(response): return f"{random.choice(ODESSA_PHRASES)} {response}" def update_important_context(user_id, message, response): if user_id not in important_context: important_context[user_id] = [] try: # Убедитесь, что используемая модель не превышает лимиты model = genai.GenerativeModel("gemini-1.5-flash-002") prompt = f""" Оцените важность следующего диалога для будущего контекста по шкале от 1 до 10. Ответьте ТОЛЬКО ЧИСЛОМ от 1 до 10, без дополнительного текста. Диалог: Человек: {message} Ассистент: {response} """ result = model.generate_content(prompt) importance = None for word in result.text.strip().split(): try: num = int(word) if 1 <= num <= 10: importance = num break except ValueError: continue if importance is None: importance = 5 if importance > 7: important_context[user_id].append({"message": message, "assistant": response}) if len(important_context[user_id]) > 5: important_context[user_id].pop(0) with open(f'important_context_{user_id}.json', 'w') as f: json.dump(important_context[user_id], f) except Exception as e: logger.warning(f"Ошибка в update_important_context: {e}") async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user_id = update.effective_user.id if is_user_allowed(user_id): await update.message.reply_text('Таки здравствуйте! Я ваш одесский ИИ-собеседник. Давайте поговорим за жизнь!') else: await update.message.reply_text("Извините, но вы не в списке приглашенных на это мероприятие.") async def handle_message_with_retry(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user_id = update.effective_user.id if not is_user_allowed(user_id): await update.message.reply_text("Извините, но вы не в списке приглашенных на это мероприятие.") return user_message = update.message.text settings = get_user_settings(user_id) prompt = SYSTEM_PROMPT + "\n\n" + user_message prompt_tokens = estimate_tokens(prompt) tokens_needed = prompt_tokens + settings["max_output_tokens"] if not can_make_request(tokens_needed): await update.message.reply_text("Извините, я немного перегружен, давайте подождём немного.") return for attempt in range(MAX_RETRIES): try: chat = get_or_create_chat(user_id) if attempt > 0: await asyncio.sleep(2 ** attempt) response = chat.send_message(prompt) response_text = response.text response_tokens = estimate_tokens(response_text) tokens_used = prompt_tokens + response_tokens record_request(tokens_used) history = load_conversation_history(user_id) history.append({"user": user_message, "assistant": response_text}) if len(history) > 10: history.pop(0) save_conversation_history(user_id, history) response_text = add_odessa_style(response_text) await update.message.reply_text(response_text) update_important_context(user_id, user_message, response_text) break except Exception as e: logger.error(f"Ошибка при обработке сообщения (попытка {attempt + 1}/{MAX_RETRIES}): {e}") # Проверяем, связана ли ошибка с исчерпанием квоты if "Resource has been exhausted" in str(e) or "quota" in str(e).lower(): logger.info("Квота исчерпана для текущего API ключа. Переключаемся на следующий ключ.") switch_api_key() genai.configure(api_key=GEMINI_API_KEYS[current_api_key_index]) else: if attempt == MAX_RETRIES - 1: await update.message.reply_text("Ой вей, что-то пошло не так. Может, попробуем еще разок?") else: await asyncio.sleep(2 ** attempt) async def handle_image(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user_id = update.effective_user.id if not is_user_allowed(user_id): await update.message.reply_text("Извините, но вы не в списке приглашенных на это мероприятие.") return settings = get_user_settings(user_id) # Оцениваем количество токенов, необходимых для обработки изображения prompt = SYSTEM_PROMPT + "\n\nОпишите это изображение, используя одесский юмор и колорит" prompt_tokens = estimate_tokens(prompt) tokens_needed = prompt_tokens + settings["max_output_tokens"] if not can_make_request(tokens_needed): await update.message.reply_text("Извините, я немного перегружен, давайте подождём немного.") return try: photo = await update.message.photo[-1].get_file() image_bytes = await photo.download_as_bytearray() image = Image.open(io.BytesIO(image_bytes)) temp_image_path = f"temp_image_{user_id}.jpg" image.save(temp_image_path, "JPEG") chat = get_or_create_chat(user_id) image_file = genai.upload_file(temp_image_path, mime_type="image/jpeg") response = chat.send_message([SYSTEM_PROMPT, image_file, "Опишите это изображение, используя одесский юмор и колорит"]) response_text = response.text response_tokens = estimate_tokens(response_text) tokens_used = prompt_tokens + response_tokens record_request(tokens_used) history = load_conversation_history(user_id) history.append({"user": "[Изображение]", "assistant": response_text}) if len(history) > 10: history.pop(0) save_conversation_history(user_id, history) response_text = add_odessa_style(response_text) await update.message.reply_text(response_text) update_important_context(user_id, "[Изображение]", response_text) except Exception as e: logger.error(f"Ошибка при обработке изображения: {e}") if "Resource has been exhausted" in str(e) or "quota" in str(e).lower(): logger.info("Квота исчерпана для текущего API ключа. Переключаемся на следующий ключ.") switch_api_key() genai.configure(api_key=GEMINI_API_KEYS[current_api_key_index]) # Опционально, можно повторить операцию здесь else: await update.message.reply_text("Ой вей, что-то пошло не так при обработке вашего изображения. Может, попробуете ещё раз?") finally: if os.path.exists(temp_image_path): os.remove(temp_image_path) async def clear_context(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: user_id = update.effective_user.id if not is_user_allowed(user_id): await update.message.reply_text("Извините, но вы не в списке приглашенных на это мероприятие.") return save_conversation_history(user_id, []) important_context[user_id] = [] with open(f'important_context_{user_id}.json', 'w') as f: json.dump([], f) await update.message.reply_text("Таки я всё забыл, начинаем с чистого листа!") if __name__ == '__main__': try: logger.info("Запуск бота...") application = create_application() # Регистрация обработчиков application.add_handler(CommandHandler("start", start)) application.add_handler(CommandHandler("clear", clear_context)) application.add_handler(MessageHandler(filters.PHOTO, handle_image)) application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message_with_retry)) # Запуск бота с измененным интервалом запросов (polling) logger.info("Бот запущен...") application.run_polling(drop_pending_updates=True, poll_interval=30) # Интервал запросов 30 секунд except Exception as e: logger.error(f"Фатальная ошибка: {e}") raise

Объяснение изменений:

  1. Полный перевод на русский язык: Все комментарии, строки вывода и объяснения теперь на русском языке.

  2. Переключение API ключей: Код включает логику переключения между несколькими API ключами, если квота одного из них исчерпана.

  3. Глобальные ограничения скорости: Добавлены функции can_make_request и record_request для соблюдения ограничений API Gemini по количеству запросов и токенов.

  4. Изменение интервала запросов Telegram:

    • В функции application.run_polling добавлен параметр poll_interval=30, который устанавливает интервал между запросами getUpdates в 30 секунд.
    • Теперь бот будет обращаться к Telegram API каждые 30 секунд вместо 10.
  5. Обработка ошибок и повторные попытки: Добавлена логика для повторных попыток при ошибках и переключения API ключей при исчерпании квоты.

  6. Настройки пользователя и истории: Функции для загрузки и сохранения истории общения с пользователем.

  7. Добавление одесского стиля: Функция add_odessa_style добавляет случайную одесскую фразу к ответу бота.

  8. Обработка изображений: Функция handle_image обрабатывает изображения, отправленные пользователем, и генерирует описание в одесском стиле.

  9. Команда /clear: Позволяет пользователю очистить контекст общения с ботом.


Как изменить интервал запросов Telegram:

  • Параметр poll_interval: Вызов application.run_polling принимает параметр poll_interval, который определяет интервал между запросами getUpdates к Telegram API.
  • Изменение значения: Установив poll_interval=30, мы изменяем интервал на 30 секунд.

Важно:

  • Замените токены: Не забудьте заменить "your_telegram_bot_token" и "api_key_1", "api_key_2", "api_key_3" на ваши реальные токены и API ключи.
  • Проверка соответствия условиям использования: Убедитесь, что использование нескольких API ключей для обхода ограничений не нарушает условия использования API Gemini.
  • Тестирование: Рекомендуется протестировать бота, чтобы убедиться, что все функции работают корректно.

Если у вас возникнут дополнительные вопросы или потребуется помощь, пожалуйста, обращайтесь!

Share this Q&A