插件agent:# video_chat_plugin/ai_companion_bot.py "...

作成日: 2025年7月30日

使用モデル O3 by Chat01

質問

插件agent:# video_chat_plugin/ai_companion_bot.py
"""
AI Companion Bot - LiveKit Agents Implementation
基于官方文档最佳实践重构的语音AI助手
"""
import asyncio
import logging
import base64
import re
import os
import io
import json
import httpx
import soundfile as sf
import numpy as np
import time
from pathlib import Path
from typing import Dict, Any, Optional, AsyncGenerator
from PIL import Image
from io import BytesIO
from dotenv import load_dotenv

加载环境变量

from pathlib import Path
plugin_dir = Path(file).parent
load_dotenv(plugin_dir / '.env')

配置日志级别

log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
debug_mode = os.getenv('DEBUG', 'false').lower() == 'true'

if debug_mode:
log_level = 'DEBUG'

使用增强的日志系统替代基础logging

try:
from enhanced_logging import setup_global_logging, log_session_event, log_agent_status, log_error_with_context
logger = setup_global_logging()
logger.info("✅ AI Companion Bot - 增强日志系统已启用")
except ImportError:
# 回退到基础日志系统
logging.basicConfig(
level=getattr(logging, log_level, logging.INFO),
format="%(asctime)s - %(levelname)s - %(name)s - %(message)s"
)
logger = logging.getLogger(name)
logger.warning("⚠️ 增强日志系统不可用,使用基础日志系统")
# 创建占位符函数
def log_session_event(event_type: str, session_id: str, data: dict): pass
def log_agent_status(agent_id: str, status: str, details: dict = None): pass
def log_error_with_context(error: Exception, context: dict = None): pass

if os.name == "nt":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

LiveKit Agents核心导入

from livekit import agents, rtc
from livekit.agents import (
Agent, AgentSession, JobContext, RunContext,
function_tool, ToolError, stt, llm, tts,
ChatContext, ChatMessage, MetricsCollectedEvent,
RoomInputOptions, RoomOutputOptions,
AutoSubscribe, WorkerOptions, WorkerPermissions,
AgentStateChangedEvent, UserStateChangedEvent, SpeechCreatedEvent,
ConversationItemAddedEvent, APIConnectOptions, get_job_context
)

方案2A需要的额外导入 - 为了捕获usage相关异常

from livekit.agents._exceptions import APIConnectionError
from pydantic_core import ValidationError

导入插件

from livekit.plugins import silero

✅ 按官方文档导入视觉相关模块

from livekit.agents.llm import ImageContent
from livekit.agents.utils.images import encode, EncodeOptions, ResizeOptions

尝试导入OpenAI插件,如果版本冲突则跳过

try:
from livekit.plugins import openai
OPENAI_PLUGIN_AVAILABLE = True
logger.info("LiveKit OpenAI插件加载成功")
except ImportError as e:
logger.warning(f"LiveKit OpenAI插件加载失败,将使用兼容实现: {e}")
openai = None
OPENAI_PLUGIN_AVAILABLE = False

尝试导入转向检测器,兼容不同版本的导入路径

try:
from livekit.plugins.turn_detector import MultilingualModel
except ImportError:
try:
from livekit.plugins.turn_detector.multilingual import MultilingualModel
except ImportError:
logger.warning("Turn detector plugin not available, using VAD-only turn detection")
MultilingualModel = None

导入辅助工具

from livekit.agents import metrics, utils

✅ 修正导入:确保导入ImageContent用于视觉识别

from livekit.agents.voice import SpeechHandle
from typing import AsyncIterable
from livekit.agents.llm import ImageContent

导入metrics模块,用于性能指标收集

from livekit.agents import metrics as agent_metrics

导入新的STT和TTS适配器

from stt_paraformer_streaming import ParaformerStreamingSTT
from tts_fish import FishTTS

不再需要StreamAdapter,FishTTS已支持ChunkedStream

导入DashScope的识别相关类

try:
import dashscope
from dashscope.audio.asr import Recognition, RecognitionCallback, RecognitionResult
except ImportError:
logger.error("DashScope SDK not found. Please install: pip install dashscope")
raise

class AICompanionAgent(Agent):
"""AI情感陪伴助手Agent (带动态表情工具)"""

text
def __init__(self, config: Dict[str, Any], character_data: Dict[str, Any], system_prompt: str = None, chat_ctx = None): # 首先设置基础属性 self.config = config self.character_data = character_data self.session_id = config.get("session_id", "unknown") self.use_function_call = config.get("use_function_call_for_avatar", False) # 是否使用function call # 添加调试日志 logger.info(f"[{self.session_id}] 🔧 Function Call设置: {self.use_function_call}") logger.info(f"[{self.session_id}] 🔧 原始配置值: {config.get('use_function_call_for_avatar')}") logger.info(f"[{self.session_id}] 🔧 环境变量值: '{os.getenv('USE_FUNCTION_CALL_FOR_AVATAR')}'") logger.info(f"[{self.session_id}] 🔧 config对象: {type(config)}, 所有键: {list(config.keys()) if isinstance(config, dict) else 'N/A'}") self._latest_frame: Optional[rtc.VideoFrame] = None self._video_stream: Optional[rtc.VideoStream] = None self._tasks = [] self._avatar_done = False # 控制每轮对话只执行一次表情切换 # --- 核心改动:自动从角色文件夹加载预置图片 --- tools = [] preset_images = self._load_character_images() if preset_images and self.use_function_call: # 只有在启用function call时才创建工具 available_expressions = list(preset_images.keys()) # 创建一个动态的工具描述 expressions_list_str = ", ".join([f"'{kw}'" for kw in available_expressions]) tool_description = f"""根据对话内容,选择一个最合适的表情或动作来改变你的形象。

你必须从以下可用的关键词中选择一个:{expressions_list_str}"""

text
# 创建工具函数 async def set_expression_wrapper(expression_keyword: str) -> str: return await self.set_expression(expression_keyword) # 创建工具 expression_tool = function_tool(name="set_character_expression", description=tool_description)(set_expression_wrapper) tools.append(expression_tool) # 构建完整的 instructions(系统提示词) if system_prompt: base_prompt = f"{system_prompt}\n\n{character_data.get('additive_prompt', '')}" else: base_prompt = f"{config.get('system_prompt', '你是一个AI助手。')}\n\n{character_data.get('additive_prompt', '')}" # 动态添加情感控制指令(仅当启用情感控制时) enable_emotion_control = config.get("enable_emotion_control", True) if enable_emotion_control: emotion_instruction = f"""

语音情感控制指令(仅语音对话时使用)

当你在语音对话中回复时,可以使用Fish Audio的情感控制指令来让你的声音更有表现力。

指令格式

使用括号()包含情感指令,格式:(情感指令)要朗读的文本

可用的情感指令

情感指令

  • (开心)、(高兴)、(愉快)、(兴奋):表达积极正面的情绪
  • (悲伤)、(难过)、(失望)、(伤心):表达消极负面的情绪
  • (愤怒)、(生气)、(恼火):表达愤怒情绪
  • (惊讶)、(震惊)、(吃惊):表达惊讶情绪
  • (平静)、(温柔)、(亲切)、(轻柔):表达温和情绪
  • (严肃)、(认真):表达严肃情绪
  • (疑惑)、(困惑)、(不解):表达困惑情绪

语气控制指令

  • (语速变慢):用于强调或思考
  • (大声):用于强调重要内容
  • (声音变小):用于温柔或悄悄话的效果
  • (语气激动):用于表达强烈情绪

副语言指令(需要配合拟声词):

  • (大笑)哈哈哈:表达开怀大笑
  • (哭泣)呜呜呜:表达伤心哭泣

使用示例

  • 正确:(开心)今天天气真不错呢!
  • 正确:(温柔)没关系,我会一直陪着你的。
  • 错误:开心今天天气真不错呢!(缺少括号)

重要提醒

  • 情感指令只在语音对话时使用,文字聊天时请正常回复

  • 情感指令应该与你回复的内容情感保持一致

  • 如果不确定用什么情感,可以不使用指令,让声音保持自然
    """
    base_prompt += emotion_instruction

    text
    # 添加【形象=xxx】使用规则(仅在不使用function call时) if not self.use_function_call and preset_images: available_expressions = list(preset_images.keys()) expressions_str = "、".join([f"'{expr}'" for expr in available_expressions]) avatar_instruction = f"""

⚠️ 形象切换规则

  • 每条回复的第一个token必须且只能包含一次【形象=xxx】标记

  • 可用形象关键词:{expressions_str}

  • 其后禁止再出现此格式

  • 根据对话情绪选择合适的形象表达"""
    base_prompt += avatar_instruction

    text
    # 初始化父类,传递工具、指令和聊天上下文 super().__init__(tools=tools, instructions=base_prompt, chat_ctx=chat_ctx) # 验证聊天上下文是否正确传递 if chat_ctx: logger.info(f"[{self.session_id}] ✅ 初始聊天上下文已传递") else: logger.info(f"[{self.session_id}] ℹ️ 未提供初始聊天上下文,将使用空上下文") logger.info(f"[{self.session_id}] AI Companion Agent initialized with character: {character_data.get('name', 'default')}") if preset_images: logger.info(f"[{self.session_id}] Available expressions: {list(preset_images.keys())}")

    def _load_character_images(self) -> Dict[str, str]:
    """自动从角色文件夹加载所有图片作为预置形象"""
    character_name = self.character_data.get("name", "default")
    characters_base_path = Path(file).parent / "assets" / "images" / "characters" / character_name

    text
    logger.info(f"[{self.session_id}] 🔍 扫描角色图片文件夹: {characters_base_path}") if not characters_base_path.exists(): logger.warning(f"[{self.session_id}] ⚠️ 角色文件夹不存在: {characters_base_path}") return {} # 支持的图片格式 image_extensions = ['.jpg', '.jpeg', '.png', '.webp', '.gif'] preset_images = {} try: # 扫描文件夹中的所有图片 for file_path in characters_base_path.iterdir(): if file_path.is_file() and file_path.suffix.lower() in image_extensions: # 使用文件名(不含扩展名)作为表情关键词 expression_name = file_path.stem # 构建相对路径 relative_path = file_path.relative_to(Path(__file__).parent) preset_images[expression_name] = str(relative_path).replace('\\', '/') logger.info(f"[{self.session_id}] ✅ 自动加载了 {len(preset_images)} 个预置形象: {list(preset_images.keys())}") except Exception as e: logger.error(f"[{self.session_id}] ❌ 扫描角色文件夹失败: {e}") return preset_images

    async def llm_node(
    self,
    chat_ctx: llm.ChatContext,
    tools: list[agents.FunctionTool],
    model_settings: agents.ModelSettings,
    **kwargs
    ) -> AsyncIterable[llm.ChatChunk]:
    """
    官方插件版本:使用super().llm_node()获得稳定的流式响应,
    在此基础上实现【形象=xxx】流式解析
    """
    logger.debug(f"[{self.session_id}] 🤖 LLM节点启动(官方版),使用function call: {self.use_function_call}")

    text
    # 只在不使用function call时启用【形象=xxx】解析 avatar_pattern = re.compile(r'【形象=([^】]+)】') if not self.use_function_call else None try: # 显式获取 LLMStream,确保用到新版逻辑 # ✅ 修复:去掉对 model_settings.conn_options 的访问,使用默认连接选项 llm_stream = self.session.llm.chat( chat_ctx=chat_ctx, tools=tools, # 如需自定义连接选项可取消注释下一行,否则使用SDK默认值 # conn_options=APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0), tool_choice=getattr(model_settings, "tool_choice", "auto"), ) async for chunk in llm_stream: # 流式【形象=xxx】解析 if avatar_pattern: text = None try: if hasattr(chunk, "choices") and chunk.choices: choice = chunk.choices[0] # 处理dict格式的choice(官方插件格式) if isinstance(choice, dict) and "delta" in choice: delta = choice.get("delta", {}) text = delta.get("content") # 处理对象格式的choice elif hasattr(choice, "delta") and hasattr(choice.delta, "content"): text = choice.delta.content except Exception: # 静默忽略不兼容的chunk结构 pass if text: # 检测【形象=xxx】标记 - 只处理第一次 if not self._avatar_done: matches = list(avatar_pattern.finditer(text)) if matches: # 只执行第一个匹配 first_match = matches[0] keyword = first_match.group(1) logger.info(f"[{self.session_id}] 🎭 检测到首个形象切换标记: {keyword}") asyncio.create_task(self._handle_avatar_change(keyword)) self._avatar_done = True # 标记已处理 # 清理文本中的形象标记(只清理第一个) cleaned_text = avatar_pattern.sub('', text, count=1) if cleaned_text != text: # 只有在实际清理了内容时才修改 try: if isinstance(choice, dict) and "delta" in choice: # 对于dict格式,创建新的chunk cleaned_chunk = llm.ChatChunk( id=chunk.id, choices=[{ "delta": { "role": choice["delta"].get("role", "assistant"), "content": cleaned_text }, "index": choice.get("index", 0) }] ) yield cleaned_chunk continue # 跳过原始chunk的输出 elif hasattr(choice, "delta") and hasattr(choice.delta, "content"): # 对于对象格式,直接修改 choice.delta.content = cleaned_text except Exception as e: logger.debug(f"[{self.session_id}] 清理形象标记失败,保持原文本: {e}") # 继续把 chunk 向上游传递,保证前端仍可流式显示 yield chunk # 正常结束 - 重置表情切换标志 self._avatar_done = False logger.debug(f"[{self.session_id}] ✅ LLM流式处理正常完成,已重置表情标志") return except Exception as e: # 其他意外异常,尽量不中断用户体验 logger.error(f"[{self.session_id}] LLM流式处理意外错误: {e!r}", exc_info=True) # 增强错误分类处理 error_msg = "收到,我在听。请继续。" # 默认回复 if "timeout" in str(e).lower(): error_msg = "网络有点慢,请再说一遍?" elif "rate" in str(e).lower() or "limit" in str(e).lower(): error_msg = "请求太频繁了,稍等一下再试。" elif "token" in str(e).lower() or "unauthorized" in str(e).lower(): error_msg = "系统有点问题,管理员正在处理中。" elif "model" in str(e).lower() or "not found" in str(e).lower(): error_msg = "AI服务暂时不可用,请稍后再试。" # 使用session.say而不是手写ChatChunk,保证跨版本兼容 await self.session.say(error_msg) # 异常结束时也要重置表情切换标志 self._avatar_done = False return

    async def _handle_avatar_change(self, keyword: str):
    """🔄 异步处理Bot形象切换。"""
    try:
    logger.info(f"[{self.session_id}] 🔄 开始处理Bot形象切换: {keyword}")
    await self.set_expression(keyword)
    logger.info(f"[{self.session_id}] ✅ Bot形象切换完成: {keyword}")
    except Exception as e:
    logger.error(f"[{self.session_id}] ❌ Bot形象切换失败: {e}")

    async def on_enter(self) -> None:
    """
    生命周期钩子:当Agent加入视频通话会话时,初始化视觉识别功能。
    按照官方文档优化纯视频通话场景的视频帧处理。
    """
    logger.info(f"[{self.session_id}] Agent已加入视频通话会话")

    text
    # 使用 get_job_context() 获取 room try: room = get_job_context().room if not room: logger.error(f"[{self.session_id}] Room not available from job context") return except Exception as e: logger.error(f"[{self.session_id}] Failed to get room from job context: {e}") return logger.info(f"[{self.session_id}] 视频通话环境就绪,检查用户连接") # 🔥 关键修改:检查是否有非Agent参与者 non_agent_participants = [ p for p in room.remote_participants.values() if getattr(p, 'kind', None) != 'agent' ] if not non_agent_participants: logger.info(f"[{self.session_id}] 等待用户加入视频通话...") # 监听参与者连接事件 def on_user_connected(participant): if getattr(participant, 'kind', None) != 'agent': logger.info(f"[{self.session_id}] 用户已加入视频通话: {participant.identity}") # 移除事件监听器,避免重复初始化 try: room.off("participant_connected", on_user_connected) except Exception as e: logger.debug(f"[{self.session_id}] 移除事件监听器失败: {e}") # 异步执行初始化 asyncio.create_task(self._initialize_after_user_connected()) room.on("participant_connected", on_user_connected) return # 如果用户已经连接,直接初始化 logger.info(f"[{self.session_id}] 发现已连接的用户: {[p.identity for p in non_agent_participants]}") await self._initialize_after_user_connected()

    async def _initialize_after_user_connected(self):
    """用户连接后的初始化逻辑,包含视觉输出初始化。"""
    logger.info(f"[{self.session_id}] 🚀 开始初始化Agent(包含视觉功能)...")

    text
    try: # 获取 room room = get_job_context().room if not room: logger.error(f"[{self.session_id}] Room not available in _initialize_after_user_connected") return # 等待足够时间确保用户端准备就绪(页面加载、设备初始化等) await asyncio.sleep(2.0) # 订阅用户视频流(Bot视觉输入) logger.info(f"[{self.session_id}] 👁️ 开始订阅用户视频流(作为Bot视觉输入)...") self._subscribe_to_video_stream() # 首先发送初始图片(Bot视觉输出) preset_images = self._load_character_images() # 重新加载确保最新 logger.info(f"[{self.session_id}] 🎭 可用的Bot预置形象: {list(preset_images.keys())}") if preset_images: default_keyword = "默认" if "默认" in preset_images else list(preset_images.keys())[0] logger.info(f"[{self.session_id}] 🎭 设置初始Bot形象: {default_keyword}") await self.set_expression(default_keyword) else: logger.warning(f"[{self.session_id}] ⚠️ 没有可用的Bot预置形象") # 发送连接状态到前端 await room.local_participant.publish_data( json.dumps({ "type": "agent_ready", "message": "AI助手已就绪", "character": self.character_data.get('name', 'default') }).encode("utf-8"), topic="agent_status" ) logger.info(f"[{self.session_id}] 📡 已发送Agent就绪状态到前端") # 等待更长时间再发送欢迎消息,确保图片完全加载和用户准备好 await asyncio.sleep(3.0) # 发送欢迎消息 display_name = self.character_data.get('display_name', 'AI助手') logger.info(f"[{self.session_id}] 👋 发送欢迎消息: {display_name}") await self.session.say(f"你好!我是{display_name},很高兴见到你!", allow_interruptions=False) logger.info(f"[{self.session_id}] ✅ Agent初始化完成(包含视觉功能)") except Exception as e: logger.error(f"[{self.session_id}] ❌ Agent初始化失败: {e}", exc_info=True)

    def _subscribe_to_video_stream(self):
    """
    🔥 正确理解:订阅用户的摄像头视频流,用于视觉输入。
    这是用户对Bot的视觉输入,不是Bot对用户的视觉输出!
    符合LiveKit Agents官方标准的视频轨道处理实现。
    """
    try:
    room = get_job_context().room
    if not room:
    logger.error(f"[{self.session_id}] Room not available in _subscribe_to_video_stream")
    return
    except Exception as e:
    logger.error(f"[{self.session_id}] Failed to get room: {e}")
    return

    text
    # ✅ 查找用户的视频轨道(作为Bot的视觉输入) for participant in room.remote_participants.values(): for publication in participant.track_publications.values(): if publication.track and publication.track.kind == rtc.TrackKind.KIND_VIDEO: logger.info(f"[{self.session_id}] 找到用户视频轨道(作为视觉输入): {publication.track.sid}") self._create_video_stream(publication.track) return # 只订阅第一个找到的 # ✅ 监听用户后续发布的轨道 @room.on("track_subscribed") def on_track_subscribed(track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant): if track.kind == rtc.TrackKind.KIND_VIDEO and self._video_stream is None: logger.info(f"[{self.session_id}] 用户新视频轨道已订阅(视觉输入): {track.sid}") self._create_video_stream(track) elif track.kind == rtc.TrackKind.KIND_AUDIO: # ✅ 音频轨道防御逻辑:检测静音轨道问题 logger.info(f"[{self.session_id}] 用户音频轨道已订阅: {track.sid}") asyncio.create_task(self._monitor_audio_track_activity(track, publication, participant))

    async def _monitor_audio_track_activity(self, track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant, check_duration: float = 3.0):
    """
    ✅ 保险逻辑:监控音频轨道活动,如果检测到当前轨道长时间静音而有新的活跃轨道,则切换
    """
    try:
    await asyncio.sleep(check_duration) # 等待一段时间检测活动

    text
    # 检查当前轨道是否有音频活动(简化实现:通过音频帧检测) # 如果需要更精确的检测,可以集成音量检测逻辑 # 检查是否有其他音频轨道更活跃 all_audio_tracks = [] for pub in participant.track_publications.values(): if pub.track and pub.track.kind == rtc.TrackKind.KIND_AUDIO and pub.track.sid != track.sid: all_audio_tracks.append(pub.track) if all_audio_tracks: logger.warning(f"[{self.session_id}] 检测到多个音频轨道,当前={track.sid},其他={[t.sid for t in all_audio_tracks]}") logger.warning(f"[{self.session_id}] 这可能表示前端重复发布了音频轨道,请检查前端日志") # 这里可以添加更复杂的切换逻辑,但前端修复应该已经解决根本问题 # 记录警告便于调试 except Exception as e: logger.error(f"[{self.session_id}] 音频轨道监控失败: {e}")

    def _create_video_stream(self, track: rtc.Track):
    """
    🔥 正确理解:为用户的视频轨道创建视频流读取器。
    这是用户摄像头的视频帧,作为Bot的视觉输入用于理解用户。
    符合LiveKit Agents官方标准的视频帧采样实现。
    """
    # 关闭现有流(一次只保持一个)
    if self._video_stream is not None:
    # 使用任务调度异步关闭,避免在同步上下文中阻塞
    asyncio.create_task(self._video_stream.aclose())

    text
    # 创建新流接收用户视频帧 self._video_stream = rtc.VideoStream(track) async def read_stream(): logger.info(f"[{self.session_id}] 开始读取用户视频流 {track.sid}(作为视觉输入)") async for event in self._video_stream: # ✅ 存储用户最新帧供后续 AI 分析 self._latest_frame = event.frame logger.debug(f"[{self.session_id}] 捕获用户视频帧,时间戳: {event.frame.timestamp}") logger.info(f"[{self.session_id}] 用户视频流 {track.sid} 已关闭") # 存储异步任务 task = asyncio.create_task(read_stream()) self._tasks.append(task) task.add_done_callback(lambda t: self._tasks.remove(t))

    async def on_user_turn_completed(self, turn_ctx: ChatContext, new_message: ChatMessage) -> None:
    """
    🔥 正确理解:在用户说话结束后,将用户摄像头的视频帧作为视觉输入发送给AI。
    AI看到的是用户的样子,而不是Bot自己的形象!
    符合LiveKit Agents官方标准的视觉识别实现。
    """
    try:
    logger.info(f"[{self.session_id}] 🎯 on_user_turn_completed 被调用")

    text
    # 安全提取用户文本 - 修复 ChatMessage.text 属性错误 user_text = "" try: # ChatMessage 使用 content 属性,不是 text if hasattr(new_message, 'content'): content = new_message.content if isinstance(content, str): user_text = content elif isinstance(content, list): # content 可能是内容块数组 for item in content: if isinstance(item, str): user_text = item break elif hasattr(item, 'text'): user_text = item.text break except Exception as e: logger.warning(f"[{self.session_id}] 提取消息文本时出错: {e}") logger.info(f"[{self.session_id}] 📝 用户消息: {user_text[:100] if user_text else '(空)'}") logger.info(f"[{self.session_id}] 🎤 用户说话结束,内容: {user_text[:100]}...") if self._latest_frame: logger.info(f"[{self.session_id}] 将用户摄像头帧作为视觉输入发送给AI") # ✅ 使用优化的视频帧编码(用户摄像头内容) try: image_content = await _encode_video_frame_optimized(self._latest_frame, self.session_id) # 确保 content 是 list if not isinstance(new_message.content, list): user_text = new_message.content if isinstance(new_message.content, str) else "" new_message.content = [user_text] if user_text else [] new_message.content.append(image_content) logger.debug(f"[{self.session_id}] 用户视频帧已添加到AI视觉输入") except Exception as e: logger.error(f"[{self.session_id}] 用户视频帧处理失败: {e}") # 回退到基本实现 image_content = ImageContent( image=self._latest_frame, inference_width=1024, inference_height=1024, inference_detail="auto" ) # 确保 content 是 list if not isinstance(new_message.content, list): user_text = new_message.content if isinstance(new_message.content, str) else "" new_message.content = [user_text] if user_text else [] new_message.content.append(image_content) # 重置,确保每回合只用一次 self._latest_frame = None # 确保调用父类方法以触发默认的LLM响应流程 logger.info(f"[{self.session_id}] 🚀 准备调用父类的on_user_turn_completed") await super().on_user_turn_completed(turn_ctx, new_message) logger.info(f"[{self.session_id}] ✅ 已触发AI响应流程") except Exception as e: logger.error(f"[{self.session_id}] ❌ on_user_turn_completed 处理异常: {e}", exc_info=True) # 可选:向前端发送安全的状态事件 try: room = get_job_context().room if room and room.local_participant: await room.local_participant.publish_data( json.dumps({"type": "error", "message": "处理用户输入时出现问题"}).encode("utf-8"), topic="agent_status" ) except Exception: pass # 静默忽略状态发送错误

    async def on_exit(self) -> None:
    """
    生命周期钩子:当Agent离开会话时,关闭视频流。
    """
    logger.info(f"[{self.session_id}] Agent is exiting the session, cleaning up video stream.")
    if self._video_stream:
    await self._video_stream.aclose()
    self._video_stream = None

    text
    for task in self._tasks: task.cancel() self._tasks.clear()

    --- 核心改动:新的工具实现 ---

    async def set_expression(self, expression_keyword: str) -> str:
    """
    🎭 Bot视觉输出核心方法:设置角色表情的核心方法。
    自动从角色文件夹中查找对应的图片。
    """
    logger.info(f"[{self.session_id}] 🎭 Bot视觉输出触发: '{expression_keyword}'")
    logger.info(f"[{self.session_id}] 📁 当前角色: {self.character_data.get('name', 'unknown')}")

    text
    # 重新加载最新的预置图片 preset_images = self._load_character_images() logger.info(f"[{self.session_id}] 🖼️ 可用预置形象: {list(preset_images.keys())}") image_path_str = preset_images.get(expression_keyword) # 如果预置图片库中没有,尝试模糊匹配 if not image_path_str: logger.info(f"[{self.session_id}] 🔍 '关键词{expression_keyword}'不在预置图库中,尝试模糊匹配...") # 尝试模糊匹配 for key, path in preset_images.items(): if expression_keyword.lower() in key.lower() or key.lower() in expression_keyword.lower(): image_path_str = path logger.info(f"[{self.session_id}] ✅ 模糊匹配成功: {key} -> {path}") break else: logger.info(f"[{self.session_id}] ✅ 在预置图库中找到 '{expression_keyword}': {image_path_str}") if not image_path_str: logger.warning(f"[{self.session_id}] ⚠️ 关键词 '{expression_keyword}' 既不在预置图库也不在角色文件夹中") # 即使关键词不存在,也返回一个友好的错误,让LLM处理 raise ToolError(f"我还不懂 '{expression_keyword}' 这个表情呢。") # 构建相对于插件目录的完整路径 image_full_path = Path(__file__).parent / image_path_str logger.info(f"[{self.session_id}] 📂 构建图片路径: {image_full_path}") logger.info(f"[{self.session_id}] 📝 图片文件存在: {image_full_path.exists()}") if not image_full_path.exists(): logger.error(f"[{self.session_id}] ❌ 图片文件不存在: {image_full_path}") raise ToolError("抱歉,我好像找不到对应的图片文件。") try: # 关键修复:把同步文件读取移到线程池避免事件循环阻塞 def read_image_file(): with open(image_full_path, "rb") as f: return f.read() image_bytes = await asyncio.to_thread(read_image_file) logger.info(f"[{self.session_id}] ✅ 图片加载成功,大小: {len(image_bytes)} 字节") # 获取 room try: room = get_job_context().room if not room: logger.error(f"[{self.session_id}] ❌ Room不可用,无法发送图片") raise ToolError("房间未连接,无法发送图片。") except Exception as e: logger.error(f"[{self.session_id}] ❌ 获取Room失败: {e}") raise ToolError("会话未就绪,无法发送图片。") logger.info(f"[{self.session_id}] 📤 开始发送Bot形象数据到前端...") # 使用 ByteStream API 发送图片数据 (LiveKit SDK v2.0+) try: # 尝试使用新的 stream_bytes API writer = await room.local_participant.stream_bytes( topic='agent.avatar', name=f'{expression_keyword}.jpg', mime_type='image/jpeg' ) await writer.write(image_bytes) await writer.aclose() # 修复:使用 aclose() 而不是 close() logger.info(f"[{self.session_id}] ✅ 使用ByteStream API发送成功") except AttributeError: # 回退到旧的 publish_data API (兼容旧版本) logger.warning(f"[{self.session_id}] ⚠️ ByteStream API不可用,使用旧版publish_data") await room.local_participant.publish_data( image_bytes, # 作为第一个位置参数 reliable=True, topic='agent.avatar' ) logger.info(f"[{self.session_id}] ✅ Bot形象发送成功: '{expression_keyword}' ({len(image_bytes)} 字节)") logger.info(f"[{self.session_id}] 👤 用户应该现在看到Bot新形象") # 返回一个成功信息给LLM,LLM可以基于此生成口头回复 return f"成功将形象切换为 '{expression_keyword}'。" except Exception as e: logger.error(f"[{self.session_id}] ❌ Bot形象发送失败 '{expression_keyword}': {e}", exc_info=True) raise ToolError("发送图片时出错了。")

    _find_image_in_character_folder 方法已被 _load_character_images 替代

--- 按官方文档添加用户视频帧优化编码功能 ---

async def _encode_video_frame_optimized(frame: rtc.VideoFrame, session_id: str = "unknown") -> ImageContent:
"""
🔥 正确理解:优化编码用户摄像头的视频帧,作为Bot的视觉输入。
这是用户的摄像头视频帧 → AI的视觉识别,不是Bot形象。
支持自定义尺寸和质量设置,优化LLM推理性能。

text
Args: frame: 用户摄像头的LiveKit视频帧 session_id: 会话 ID,用于日志 Returns: ImageContent: 官方标准的图像内容对象,作为AI视觉输入 """ try: import base64 # 关键修复:把同步JPEG编码移到线程池避免事件循环阻塞 image_bytes = await asyncio.to_thread( encode, frame, EncodeOptions( format="JPEG", quality=75, # 降低质量以减少文件大小和token消耗 resize_options=ResizeOptions( width=512, # 官方推荐的512-768px范围,选择512px height=512, # 保持正方形比例 strategy="scale_aspect_fit" # 保持宽高比 ) ) ) # 创建数据 URL data_url = f"data:image/jpeg;base64,{base64.b64encode(image_bytes).decode('utf-8')}" # ✅ 使用官方ImageContent格式(用户视觉输入) return ImageContent( image=data_url, inference_detail="auto" # 自动选择推理精度 ) except Exception as e: logger.warning(f"[{session_id}] 用户视频帧优化编码失败,使用默认方式: {e}") # 回退到默认编码,同样使用512px尺寸 return ImageContent( image=frame, inference_width=512, inference_height=512, inference_detail="auto" )

--- 结束用户视频帧优化功能 ---

async def entrypoint(ctx: JobContext) -> None:
"""
LiveKit Agents入口点函数 - 基于LiveKit v1.0最佳实践
包括:STT-LLM-TTS管道、VAD、转向检测、错误处理、Agent连接诊断等
"""
import os # 确保os模块在函数作用域内可用
# 开启asyncio调试模式,定位事件循环阻塞点
import asyncio, logging
loop = asyncio.get_running_loop()
loop.set_debug(True)
loop.slow_callback_duration = 0.2 # 200ms阈值
logging.getLogger("asyncio").setLevel(logging.WARNING)
logger.info("🔧 Asyncio debug mode enabled with 200ms slow callback detection")

text
# 重新加载环境变量,确保获取最新配置 from pathlib import Path env_path = Path(__file__).parent / '.env' if env_path.exists(): load_dotenv(env_path, override=True) logger.info(f"✅ 重新加载环境变量文件: {env_path}") # 调试:检查环境变量的实际值 use_func_call_raw = os.getenv("USE_FUNCTION_CALL_FOR_AVATAR") logger.info(f"🔍 USE_FUNCTION_CALL_FOR_AVATAR原始值: '{use_func_call_raw}' (type: {type(use_func_call_raw)})") logger.info(f"🔍 解析后的布尔值: {use_func_call_raw.lower() == 'true' if use_func_call_raw else False}") # ✅ 关键诊断日志 - 确保能看到entrypoint被调用 print(f"🚀🚀🚀 AGENT ENTRYPOINT CALLED 🚀🚀🚀") print(f"Room: {ctx.room.name}") print(f"Job ID: {ctx.job.id}") logger.info(f"🚀 Agent entrypoint called for room: {ctx.room.name}") logger.info(f"💼 Job ID: {ctx.job.id}") logger.info(f"🏷️ Job metadata: {ctx.job.metadata}") logger.info(f"🏷️ Job metadata type: {type(ctx.job.metadata)}") logger.info(f"🏷️ Job metadata length: {len(ctx.job.metadata) if ctx.job.metadata else 0}") # ✅ 修复:立即设置默认的日志上下文,避免显示unknown ctx.log_context_fields = { "worker_id": ctx.worker_id, "room_name": ctx.room.name, "user_id": "pending", "character_name": "pending", "session_id": "pending" } # 初始化Agent连接诊断系统(可选,不影响核心功能) diagnostics = None try: from agent_diagnostics import get_diagnostics diagnostics = get_diagnostics() await diagnostics.initialize() logger.info("🔍 Agent连接诊断系统已初始化") except ImportError: logger.debug("⚠️ Agent诊断系统不可用(模块未找到)") except Exception as e: logger.debug(f"⚠️ Agent诊断系统初始化失败: {e}") # 不影响核心功能,继续执行 # 设置遥测(可选) try: from .telemetry_setup import setup_telemetry setup_telemetry() except (ImportError, Exception) as e: logger.debug(f"遥测设置跳过: {e}") # 初始化默认值 user_id = None character_name = "default" system_prompt = "你是一个AI助手。" chat_history = [] # 根据官方文档,从job metadata获取基本信息,添加UTF-8编码错误处理 try: import json basic_info = {} if ctx.job.metadata and ctx.job.metadata.strip(): logger.info(f"🔍 开始解析Job元数据...") # 处理潜在的UTF-8编码问题 try: # 如果metadata是bytes,先解码 if isinstance(ctx.job.metadata, bytes): metadata_str = ctx.job.metadata.decode('utf-8', errors='ignore') logger.info("📝 从bytes解码metadata") else: metadata_str = str(ctx.job.metadata) basic_info = json.loads(metadata_str) logger.info(f"📋 解析成功,基本信息: {basic_info}") except UnicodeDecodeError as e: logger.error(f"❌ UTF-8解码metadata失败: {e}") log_error_with_context(e, {"metadata_type": type(ctx.job.metadata), "metadata_length": len(ctx.job.metadata) if ctx.job.metadata else 0}) basic_info = {} else: logger.warning("⚠️ 未收到有效的job metadata,使用默认配置") except json.JSONDecodeError as e: logger.error(f"❌ JSON解析job metadata失败: {e}") logger.error(f"❌ 原始metadata: '{ctx.job.metadata}'") log_error_with_context(e, {"raw_metadata": str(ctx.job.metadata)[:200]}) basic_info = {} except Exception as e: logger.error(f"❌ 处理job metadata时发生未知错误: {e}") log_error_with_context(e, {"metadata_type": type(ctx.job.metadata)}) basic_info = {} # 从基本信息中提取关键字段 user_id = basic_info.get("user_id") character_name = basic_info.get("character_name", "default") session_id = basic_info.get("session_id") # ✅ 增强调试信息 logger.info(f"👤 用户ID: {user_id}") logger.info(f"🎭 角色名称: {character_name}") logger.info(f"🔗 会话ID: {session_id}") logger.info(f"📋 完整metadata解析结果: {basic_info}") # 检查metadata传递是否正确 if not session_id: logger.error("❌ 关键问题:session_id为空!") logger.error(f"❌ 原始metadata: '{ctx.job.metadata}'") logger.error(f"❌ 解析后basic_info: {basic_info}") else: logger.info(f"✅ session_id提取成功: {session_id}") # ✅ 修复:更新日志上下文为实际值 ctx.log_context_fields = { "worker_id": ctx.worker_id, "room_name": ctx.room.name, "user_id": user_id or "unknown", "character_name": character_name or "default", "session_id": session_id or "unknown" } # 启动Agent连接诊断(如果可用) if diagnostics and session_id and user_id: try: diagnostics.start_session_diagnosis( session_id=session_id, room_name=ctx.room.name, user_id=user_id, character_name=character_name ) logger.info(f"🔍 已启动会话连接诊断: {session_id}") log_session_event("diagnostics_started", session_id, { "room_name": ctx.room.name, "user_id": user_id, "character_name": character_name }) except Exception as e: logger.error(f"❌ 启动连接诊断失败: {e}") log_error_with_context(e, {"session_id": session_id, "user_id": user_id}) # 根据官方文档建议,在连接前异步获取完整的session数据 system_prompt = "你是一个AI助手。" chat_history = [] if session_id: try: # 根据官方文档建议,在连接前完成数据获取 logger.info(f"🔄 正在获取session {session_id} 的完整数据...") import httpx import os # 获取FastAPI服务器地址 fastapi_port = int(os.getenv("FASTAPI_PORT", "8008")) api_url = f"http://localhost:{fastapi_port}/session/{session_id}/context" logger.info(f"🌐 请求URL: {api_url}") async with httpx.AsyncClient(timeout=5.0) as client: response = await client.get(api_url) logger.info(f"📡 HTTP响应状态码: {response.status_code}") if response.status_code == 200: session_data = response.json() logger.info(f"📦 API响应数据: {session_data}") if session_data.get("success"): system_prompt = session_data.get("system_prompt", system_prompt) chat_history = session_data.get("chat_history", []) logger.info(f"✅ 成功获取session数据:") logger.info(f" - system_prompt长度: {len(system_prompt)}") logger.info(f" - 历史消息数量: {len(chat_history)}条") logger.info(f" - 历史消息详情: {chat_history}") else: logger.warning(f"⚠️ Session数据获取失败: {session_data}") else: logger.warning(f"⚠️ HTTP请求失败: {response.status_code}") logger.warning(f"⚠️ 响应内容: {response.text}") except Exception as e: logger.error(f"❌ 获取session数据失败: {e}") logger.error(f"❌ 异常详情: {e.__class__.__name__}: {str(e)}") logger.warning("⚠️ 将使用基本配置继续") else: logger.error("❌ 无法获取session数据:session_id为空") logger.info(f"✅ 最终配置 - 用户: {user_id}, 角色: {character_name}, 会话: {session_id}") logger.info(f"💬 系统提示词长度: {len(system_prompt)}, 历史消息: {len(chat_history)}条") # 加载本地配置文件(用于获取API密钥等) config = load_agent_config() # 从本地配置获取角色数据 character_data = config.get("characters", {}).get(character_name, { "name": "default", "display_name": "默认角色", "voice_id": "", "additive_prompt": "", "tts_params": {"temperature": 0.7, "top_p": 0.7, "backend": "s1"} }) logger.info(f"使用角色: {character_name} ({character_data.get('display_name', character_name)})") # 创建AI服务组件 try: logger.info(f"🔌 正在连接到房间: {ctx.room.name}") # 首先连接到房间 await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY) logger.info(f"✅ Agent已连接到房间: {ctx.room.name}") # 🔥 关键修改:等待用户参与者连接前先设置日志上下文 ctx.log_context_fields = { "worker_id": ctx.worker_id, "room_name": ctx.room.name, "user_id": user_id or "unknown", "character_name": character_name, "session_id": session_id or "unknown" } logger.info("⏳ 等待用户参与者连接...") try: # 等待第一个非Agent参与者连接,最多等待2分钟 participant = await asyncio.wait_for(ctx.wait_for_participant(), timeout=120.0) logger.info(f"👤 用户参与者已连接: {participant.identity} (kind: {getattr(participant, 'kind', 'unknown')})") except asyncio.TimeoutError: logger.warning("⏰ 等待用户连接超时(2分钟),继续初始化") except Exception as e: logger.error(f"❌ 等待用户连接时出错: {e}") # 发送连接成功消息 await ctx.room.local_participant.publish_data( json.dumps({"type": "agent_connected", "message": "AI助手已连接", "character": character_name}).encode("utf-8"), topic="agent_status" ) logger.info(f"📢 已发送Agent连接状态") # 使用官方openai.LLM实现 llm_config = config.get("llm_config", {}) if OPENAI_PLUGIN_AVAILABLE: logger.info("使用官方LiveKit OpenAI插件") llm_instance = openai.LLM( model=llm_config.get('model', 'gpt-4o'), api_key=llm_config.get('api_key', ''), base_url=llm_config.get('base_url', ''), temperature=0.7 ) else: logger.error("LiveKit OpenAI插件不可用,请检查安装") raise RuntimeError("需要安装 livekit-plugins-openai") logger.info(f"🤖 LLM配置完成: {llm_config.get('model', 'gpt-4o')} (官方插件)") # VAD - 语音活动检测(使用预加载的实例或创建新的) vad = ctx.proc.userdata.get("vad") if not vad: vad = silero.VAD.load( min_speech_duration=0.05, min_silence_duration=0.55, activation_threshold=0.5 ) logger.info(f"🎤 VAD加载完成(非预热)") else: logger.info(f"✅ 使用预加载的VAD") # 转向检测器 - 使用预加载的实例或创建新的 turn_detector = ctx.proc.userdata.get("turn_detector") if not turn_detector: if MultilingualModel: try: # 尝试创建 MultilingualModel 实例 turn_detector = MultilingualModel() logger.info(f"🔄 转向检测器加载完成(非预热)") except Exception as e: logger.warning(f"⚠️ 无法加载 MultilingualModel: {e}") logger.info(f"💡 提示: 运行 'python ai_companion_bot.py download-files' 下载模型文件") # 回退到VAD-only模式 turn_detector = "vad" logger.info(f"🔄 使用VAD-only转向检测模式") else: turn_detector = "vad" # 回退到VAD-only模式 logger.info(f"🔄 使用VAD-only转向检测(turn detector插件不可用)") else: logger.info(f"✅ 使用预加载的转向检测器") # 准备完整的配置,包含session_id和初始聊天上下文 complete_config = config.copy() # 确保不会意外替换有效的session_id if session_id: complete_config["session_id"] = session_id else: complete_config["session_id"] = f"session_{int(time.time())}" logger.warning(f"⚠️ 未获取到session_id,生成新的: {complete_config['session_id']}") logger.info(f"🔧 使用session_id: {complete_config['session_id']}") # ✅ 根据官方文档:在ctx.connect()之后,创建Agent之前构建ChatContext logger.info(f"📜 正在构建初始聊天上下文...") initial_ctx = ChatContext() initial_message_count = 0 # 记录初始消息数量,用于会话结束时分离新消息 for msg in chat_history: role = msg.get("role", "user") content = msg.get("content", "") if role in ["user", "assistant"] and content: initial_ctx.add_message(role=role, content=content) initial_message_count += 1 logger.info(f"📜 构建初始聊天上下文完成,包含 {initial_message_count} 条历史消息") # 创建AI助手实例,传递系统提示词和初始聊天上下文 agent = AICompanionAgent( config=complete_config, character_data=character_data, system_prompt=system_prompt, chat_ctx=initial_ctx # 根据文档,在构造函数中传递chat_ctx ) logger.info(f"🎭 AI助手实例创建完成,session_id: {agent.session_id}") # 记录Agent状态 log_agent_status( agent_id=agent.session_id, status="created", details={ "user_id": user_id, "character_name": character_name, "room_name": ctx.room.name, "system_prompt_length": len(system_prompt), "chat_history_length": len(chat_history) } ) # STT - 使用优化后的流式Paraformer适配器(已移除保活机制) custom_stt = ParaformerStreamingSTT( api_key=config.get("dashscope_api_key"), model="paraformer-realtime-8k-v2", sample_rate=8000, semantic_punctuation_enabled=False, # 关闭以启用情感识别 max_sentence_silence=1500, # 增加静音超时到1.5秒 ) logger.info(f"🗣️ STT服务配置完成: 8kHz + 情感识别(已优化性能)") # TTS - 使用新的Fish Audio适配器 # 初始化TTS服务 character_tts = character_data.get("tts_params", {}) fish_api_key = config.get("fish_audio_api_key") voice_id = character_data.get("voice_id") fish_tts_raw = FishTTS( api_key=fish_api_key, reference_id=voice_id, # FishTTS使用reference_id temperature=character_tts.get("temperature", 0.7), top_p=character_tts.get("top_p", 0.7), backend=character_tts.get("backend", "s1"), # 默认使用s1以支持情感 enable_emotion_control=config.get("enable_emotion_control", True), streaming=False, # ✅ 关键:非流式模式,避免WebSocket阻塞 sentences_per_chunk=int(os.getenv("FISH_TTS_SENTENCES_PER_CHUNK", "1")), # 可配置批量大小 output_format=os.getenv("FISH_TTS_FORMAT", "pcm"), # pcm|wav|mp3 frame_size_ms=int(os.getenv("FISH_TTS_FRAME_MS", "40")), # 40ms 推荐 ) # 关键修复:直接使用FishTTS,不再套StreamAdapter fish_tts = fish_tts_raw # 直接使用,符合官方文档建议 logger.info(f"🔊 TTS服务配置完成: Fish Audio (直接模式)") # 启动会话 - 使用增强的配置 # 现在我们同时指定stt和tts插件,依靠框架的默认行为 logger.info(f"[{session_id}] 🎬 创建AgentSession...") logger.info(f"[{session_id}] 📊 配置: STT={type(custom_stt)}, TTS={type(fish_tts)}, LLM={type(llm_instance)}") session = AgentSession( stt=custom_stt, tts=fish_tts, # 使用Fish Audio TTS llm=llm_instance, vad=vad, turn_detection=turn_detector, # 添加转向检测 allow_interruptions=True, # 允许中断 min_interruption_duration=0.5, # 最小中断时长 min_endpointing_delay=0.5, # 最小结束点延迟 max_endpointing_delay=3.0, # 最大结束点延迟 preemptive_generation=True, # 启用预生成以减少延迟 use_tts_aligned_transcript=True, # 启用TTS对齐的转录转发 ) logger.info(f"📞 Agent Session创建完成") # 添加调试日志来追踪Session对象 logger.info(f"🔍 Session对象类型: {type(session)}") logger.info(f"🔍 Session属性: {[attr for attr in dir(session) if not attr.startswith('_')][:10]}...") logger.info(f"🔊 TTS配置:使用 Fish Audio TTS 服务 (直接模式)") if fish_tts_raw: logger.info(f"🔊 TTS服务状态: 模型={fish_tts_raw.backend}, 情感控制={'启用' if fish_tts_raw.enable_emotion_control else '禁用'}") else: logger.warning(f"⚠️ TTS服务未正确初始化") # ✅ 根据官方文档配置房间输入输出选项 room_input_options = RoomInputOptions( text_enabled=True, # 启用文本输入 audio_enabled=True, # 启用音频输入 - 这是关键! video_enabled=False # 禁用视频输入(如需视频支持可设为True) ) room_output_options = RoomOutputOptions( audio_enabled=True, # 启用音频输出 transcription_enabled=True # 启用转录输出 ) # 添加调试日志 logger.info(f"🚀 启动Agent Session...") logger.info(f" - STT: {type(custom_stt).__name__}") logger.info(f" - TTS: {type(fish_tts).__name__}") logger.info(f" - LLM: {type(llm_instance).__name__}") logger.info(f" - VAD: {type(vad).__name__}") logger.info(f" - Turn Detection: {type(turn_detector).__name__ if turn_detector != 'vad' else 'VAD-only'}") await session.start( room=ctx.room, agent=agent, room_input_options=room_input_options, room_output_options=room_output_options ) logger.info(f"✅ Agent Session已启动,等待用户语音输入...") # ✅ 关键改动:使用 v1.0+ 推荐的文本流方式发布转录 # 不再使用 STTSegmentsForwarder,而是直接发布到 lk.transcription 主题 @session.on("stt_event") def on_stt_event(ev): """STT事件处理器:将 ParaformerSTT 的文本发布到文本流""" try: # 只处理最终和临时转录事件 from livekit.agents.stt import SpeechEventType if ev.type in [SpeechEventType.FINAL_TRANSCRIPT, SpeechEventType.INTERIM_TRANSCRIPT]: if ev.alternatives and len(ev.alternatives) > 0: text = ev.alternatives[0].text is_final = ev.type == SpeechEventType.FINAL_TRANSCRIPT # 记录STT事件 if is_final: logger.info(f"🎤 STT最终转录: {text}") else: logger.debug(f"🎤 STT临时转录: {text[:50]}...") # 异步发布到文本流 async def publish_transcription(): try: # ✅ 使用 stream_text 发送带属性的文本流,明确标识为用户转录 if hasattr(ctx.room.local_participant, 'stream_text'): # 使用新的 stream_text API,支持属性设置 writer = await ctx.room.local_participant.stream_text( topic="lk.transcription", attributes={ "participant_type": "user", # ✅ 明确标识为用户 "is_final": str(is_final).lower(), "source": "stt" } ) await writer.write(text) await writer.aclose() logger.debug(f"📤 STT文本已通过stream_text发布: final={is_final}, text: {text[:50]}...") else: # 回退到 publish_data(旧版本兼容) await ctx.room.local_participant.publish_data( text.encode("utf-8"), topic="lk.transcription", reliable=False ) logger.debug(f"📤 STT文本已通过publish_data发布: final={is_final}, text: {text[:50]}...") except Exception as e: logger.error(f"❌ STT文本流发布失败: {e}") # 创建异步任务 asyncio.create_task(publish_transcription()) except Exception as e: logger.error(f"❌ STT文本流发布错误: {e}") logger.info(f"📡 STT文本流发布已启用,前端将通过 lk.transcription 接收用户语音转录") logger.info(f"🔊 TTS转录对齐已启用 (use_tts_aligned_transcript=True),前端将接收AI语音同步文本") # 记录会话启动事件 - 修复NotGiven对象序列化问题 log_session_event("session_started", agent.session_id, { "room_name": ctx.room.name, "user_id": user_id, "character_name": character_name, "text_enabled": room_input_options.text_enabled, "video_enabled": room_input_options.video_enabled, "audio_output_enabled": room_output_options.audio_enabled, "transcription_enabled": room_output_options.transcription_enabled }) log_agent_status( agent_id=agent.session_id, status="session_started", details={ "room_name": ctx.room.name, "transcription_enabled": room_output_options.transcription_enabled } ) # 添加事件监听器 @session.on("metrics_collected") def on_metrics_collected(ev: MetricsCollectedEvent): """收集和记录性能指标""" agent_metrics.log_metrics(ev.metrics) # 选择性记录关键指标 if hasattr(ev.metrics, 'ttft'): # LLM指标 logger.info(f"📊 LLM TTFT: {ev.metrics.ttft:.3f}s, 总时长: {ev.metrics.duration:.3f}s") elif hasattr(ev.metrics, 'ttfb'): # TTS指标 logger.info(f"🔊 TTS TTFB: {ev.metrics.ttfb:.3f}s, 音频时长: {ev.metrics.audio_duration:.3f}s") elif hasattr(ev.metrics, 'end_of_utterance_delay'): # EOU指标 logger.info(f"🎯 EOU延迟: {ev.metrics.end_of_utterance_delay:.3f}s") @session.on("error") def on_error_sync(ev): async def handle_error_async(): logger.error(f"Session error: {ev.error}", exc_info=ev.error) if not getattr(ev.error, 'recoverable', True): await session.say("抱歉,我遇到了一些技术问题,请稍后再试。") asyncio.create_task(handle_error_async()) session.on("error", on_error_sync) # ✅ 修复: 将异步回调包装在同步函数中 def on_agent_state_changed_sync(ev: AgentStateChangedEvent): async def handle_state_async(): logger.info(f"🤖 Agent state changed: {ev.old_state} -> {ev.new_state}") await ctx.room.local_participant.publish_data( json.dumps({"type": "agent_state_changed", "new_state": ev.new_state}).encode("utf-8"), topic="agent_status" ) asyncio.create_task(handle_state_async()) session.on("agent_state_changed", on_agent_state_changed_sync) def on_user_state_changed_sync(ev: UserStateChangedEvent): async def handle_user_state_async(): logger.info(f"👤 用户状态变化: {ev.old_state} -> {ev.new_state}") # 如果用户离开,给更多时间再询问 if ev.new_state == "away": await asyncio.sleep(60) # 等待1分钟,给用户充足时间 if session.user_state == "away": # 再次检查 await session.say("您还在吗?如果需要帮助,请随时告诉我。") asyncio.create_task(handle_user_state_async()) session.on("user_state_changed", on_user_state_changed_sync) @session.on("speech_created") def on_speech_created(ev: SpeechCreatedEvent): """语音创建事件""" logger.debug(f"🗣️ 语音创建: source={ev.source}, user_initiated={ev.user_initiated}") try: @session.on("agent_speech_interrupted") def on_agent_speech_interrupted(ev): logger.info(f"⚠️ Agent回应被中断") except Exception: pass # 事件可能不存在 # 添加conversation_item_added监听器 @session.on("conversation_item_added") def on_conversation_item_added(ev: ConversationItemAddedEvent): """记录对话历史项目""" item = ev.item text_content = "" if hasattr(item, 'text_content'): text_content = item.text_content[:100] if item.text_content else "" logger.info(f"💬 对话项目添加 - 角色: {item.role}, 内容长度: {len(text_content)}, 被中断: {getattr(item, 'interrupted', False)}") # 记录完整内容用于调试 if item.role == "user": logger.info(f"👤 用户说: {text_content}") elif item.role == "assistant": logger.info(f"🤖 AI回复: {text_content}") # 统计对话轮次 if hasattr(session, '_conversation_stats'): if item.role == "user": session._conversation_stats['user_turns'] += 1 elif item.role == "assistant": session._conversation_stats['assistant_turns'] += 1 # 初始化对话统计 session._conversation_stats = { 'user_turns': 0, 'assistant_turns': 0, 'start_time': time.time() } # 初始化使用量收集器 usage_collector = agent_metrics.UsageCollector() # 在metrics_collected事件中收集使用量 @session.on("metrics_collected") def collect_usage(ev: MetricsCollectedEvent): usage_collector.collect(ev.metrics) async def log_usage_summary(): """记录使用量摘要""" summary = usage_collector.get_summary() logger.info(f"📊 会话使用量统计:") logger.info(f" STT: {summary.stt_audio_duration:.1f}秒") logger.info(f" LLM: {summary.llm_prompt_tokens} 输入/{summary.llm_completion_tokens} 输出") logger.info(f" TTS: {summary.tts_characters_count} 字符") # 添加到关闭回调 ctx.add_shutdown_callback(log_usage_summary) logger.info(f"🎉 Agent session started successfully for room: {ctx.room.name}") logger.info(f"🎧 等待用户音频输入...") # 添加优雅关闭回调 async def cleanup_callback(): """清理资源和保存状态,包含通话历史回调""" try: logger.info(f"🧹 开始清理会话资源: {ctx.job.id}") # 获取会话历史和统计信息 if hasattr(session, 'history'): history_summary = { "job_id": ctx.job.id, "room_name": ctx.room.name, "user_id": user_id or "unknown", "character_name": character_name, "session_id": session_id or "unknown", "message_count": len(session.history.messages) if hasattr(session.history, 'messages') else 0, "end_time": time.time() } # 添加对话统计 if hasattr(session, '_conversation_stats'): history_summary.update({ "user_turns": session._conversation_stats.get('user_turns', 0), "assistant_turns": session._conversation_stats.get('assistant_turns', 0), "duration": time.time() - session._conversation_stats.get('start_time', time.time()) }) logger.info(f"📊 会话统计: {history_summary}") # 🆕 提取本次通话的新增消息 if hasattr(session.history, 'messages'): all_messages = session.history.messages current_call_messages = all_messages[initial_message_count:] logger.info(f"📞 本次通话产生 {len(current_call_messages)} 条新消息。") # 准备发送给主程序的数据 callback_data = { "session_id": session_id, "user_id": user_id, "character_name": character_name, "final_history": [ {"role": msg.role, "content": msg.content} for msg in current_call_messages ], "call_statistics": history_summary } # 通过HTTP回调将数据发送给主程序 try: fastapi_port = int(os.getenv("FASTAPI_PORT", "8008")) callback_url = f"http://localhost:{fastapi_port}/session/callback/call_ended" async with httpx.AsyncClient(timeout=10.0) as client: await client.post(callback_url, json=callback_data) logger.info(f"✅ 已通过回调将通话记录发送至主程序: {callback_url}") except Exception as e: logger.error(f"❌ 发送通话结束回调失败: {e}") # 清理背景音频(如果存在) # 注意:当前版本中未实现背景音频功能 # 如果未来添加背景音频支持,请在此处添加清理逻辑 # if 'background_audio' in locals() and background_audio: # await background_audio.cleanup() # logger.info("🔇 背景音频已清理") # 安全发送断开连接消息 - 检查房间和参与者状态 try: if (ctx.room and ctx.room.local_participant and hasattr(ctx.room.local_participant, 'is_connected') and ctx.room.local_participant.is_connected): # 检查房间状态 room_state = getattr(ctx.room, 'state', None) if room_state == "CONNECTED" or room_state is None: # 如果房间状态是CONNECTED或无法检测,尝试发送 await ctx.room.local_participant.publish_data( json.dumps({"type": "agent_disconnected", "message": "AI助手已断开连接"}).encode("utf-8"), reliable=True, topic="agent_status" ) logger.info(f"[{session_id}] 已发送断开连接消息") else: logger.info(f"[{session_id}] 房间状态为{room_state},跳过消息发送") else: logger.info(f"[{session_id}] 房间或参与者已断开,跳过消息发送") except Exception as e: logger.warning(f"[{session_id}] 清理阶段发送消息失败: {e}") logger.info(f"✅ 会话清理完成: {ctx.job.id}") except Exception as e: logger.error(f"清理过程中出错: {e}", exc_info=True) # 注册关闭回调 ctx.add_shutdown_callback(cleanup_callback) except Exception as e: logger.error(f"Failed to start agent session: {e}", exc_info=True) raise

def load_agent_config() -> Dict[str, Any]:
"""从环境变量加载Agent配置"""
try:
# 构建字符配置
characters = {}
character_names = set()

text
# 扫描所有字符配置 for key in os.environ: if key.startswith('CHARACTER_') and key.endswith('_NAME') and '_DISPLAY_NAME' not in key: char_prefix = key[:-5] # 移除 '_NAME' char_name = os.getenv(key) if char_name: character_names.add(char_prefix) # 为每个字符构建完整配置 for char_prefix in character_names: char_name = os.getenv(f'{char_prefix}_NAME') if char_name: voice_id = os.getenv(f'{char_prefix}_VOICE_ID', '') characters[char_name] = { "name": char_name, "display_name": os.getenv(f'{char_prefix}_DISPLAY_NAME', char_name), "voice_id": voice_id, "additive_prompt": os.getenv(f'{char_prefix}_ADDITIVE_PROMPT', ''), "base_image_path": os.getenv(f'{char_prefix}_BASE_IMAGE_PATH', f'video_chat_plugin/assets/images/characters/{char_name}.jpg'), "tts_params": { "temperature": float(os.getenv(f'{char_prefix}_TTS_TEMPERATURE', '0.7')), "top_p": float(os.getenv(f'{char_prefix}_TTS_TOP_P', '0.7')), "backend": os.getenv(f'{char_prefix}_TTS_BACKEND', 's1') } } # 确保有默认角色 if "default" not in characters and not characters: characters["default"] = { "name": "default", "display_name": "默认角色", "voice_id": "", "additive_prompt": "你是一个AI助手。", "base_image_path": "video_chat_plugin/assets/images/characters/default_char.png", "tts_params": {"temperature": 0.7, "top_p": 0.7, "backend": "s1"} } # Debug logging for function call config final_config = { "characters": characters, "llm_config": { "api_key": os.getenv("OPENAI_API_KEY", ""), "base_url": os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"), "model": os.getenv("OPENAI_MODEL", "gpt-4o") }, "image_generation_config": { "api_key": os.getenv("IMAGE_GENERATION_API_KEY", ""), "base_url": os.getenv("IMAGE_GENERATION_BASE_URL", ""), "model": os.getenv("IMAGE_GENERATION_MODEL", "") }, "dashscope_api_key": os.getenv("DASHSCOPE_API_KEY", ""), "fish_audio_api_key": os.getenv("FISH_AUDIO_API_KEY", ""), "use_function_call_for_avatar": os.getenv("USE_FUNCTION_CALL_FOR_AVATAR", "false").strip("'\"").lower() == "true", "enable_emotion_control": os.getenv("ENABLE_EMOTION_CONTROL", "true").lower() == "true" } logger.info(f"🔍 Config loaded - use_function_call_for_avatar: {final_config['use_function_call_for_avatar']}") logger.info(f"🔍 Raw env value: '{os.getenv('USE_FUNCTION_CALL_FOR_AVATAR')}'") return final_config except Exception as e: logger.error(f"从环境变量加载配置失败: {e}") # 返回最小可用配置 return { "characters": { "default": { "name": "default", "display_name": "默认角色", "voice_id": "", "additive_prompt": "你是一个AI助手。", "base_image_path": "video_chat_plugin/assets/images/characters/default_char.png", "preset_images": {}, # 新增字段 "users": [], "tts_params": {"temperature": 0.7, "top_p": 0.7, "backend": "s1"} } }, "llm_config": { "api_key": os.getenv("OPENAI_API_KEY", ""), "base_url": os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1"), "model": os.getenv("OPENAI_MODEL", "gpt-4o") }, "enable_emotion_control": os.getenv("ENABLE_EMOTION_CONTROL", "true").lower() == "true", "use_function_call_for_avatar": os.getenv("USE_FUNCTION_CALL_FOR_AVATAR", "false").strip("'\"").lower() == "true" }

async def generate_image_from_prompt(
prompt: str,
api_key: str,
model: str,
base_url: str,
base_image_path: str,
) -> Optional[bytes]:
"""简单图像生成示例:传递带底图的多模态提示到远程模型,返回 JPEG bytes。"""
if not Path(base_image_path).exists():
logger.error(f"底图文件未找到: {base_image_path}")
return None
try:
base_image = Image.open(base_image_path)
buffered = BytesIO()
base_image.save(buffered, format="PNG")
base64_image = base64.b64encode(buffered.getvalue()).decode('utf-8')
messages = [
{"role": "system", "content": "你是一个图像生成助手。"},
{
"role": "user",
"content": [
{"type": "text", "text": "这是基础形象。"},
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}},
],
},
{
"role": "user",
"content": [
{
"type": "text",
"text": f"请根据基础形象,重绘一张图片,让她表现出:'{prompt}'。保持人物特征、画风和构图一致。",
}
],
},
]
proxies = {}
if os.environ.get("HTTPS_PROXY"):
proxies["https://"] = os.environ.get("HTTPS_PROXY")
if os.environ.get("HTTP_PROXY"):
proxies["http://"] = os.environ.get("HTTP_PROXY")
async with httpx.AsyncClient(proxies=proxies, timeout=60.0) as client:
response = await client.post(
f"{base_url.strip('/')}/chat/completions",
headers={"Authorization": f"Bearer {api_key}"},
json={"model": model, "messages": messages, "max_tokens": 1000},
)
response.raise_for_status()
data = response.json()
content = data["choices"][0]["message"]["content"]
image_url_match = re.search(r'https?://[^\s)]+', content)
if not image_url_match:
logger.warning(f"在 LLM 响应中未找到图片 URL: {content}")
return None
image_url = image_url_match.group(0)
img_response = await client.get(image_url, timeout=30.0)
img_response.raise_for_status()
image = Image.open(BytesIO(img_response.content))
if image.mode != 'RGB':
image = image.convert('RGB')
output_buffer = BytesIO()
image.save(output_buffer, format="JPEG", quality=90)
return output_buffer.getvalue()
except Exception as e:
logger.error(f"图像生成失败: {e}", exc_info=True)
return None

保持向后兼容的函数,供main.py调用

async def run_bot_session(connection_data: Any, config: Dict[str, Any]) -> Any:
"""
向后兼容函数,将在main.py重构后移除
现在LiveKit Agents使用不同的架构,不需要connection参数
"""
logger.warning("run_bot_session is deprecated, use LiveKit Agents entrypoint instead")
# 返回一个占位符,实际的Agent通过entrypoint启动
return None

def prewarm_agent(proc: agents.JobProcess):
"""预加载模型和资源以减少启动延迟"""
logger.info("开始预热Agent进程...")

text
# 预加载VAD模型 try: proc.userdata["vad"] = silero.VAD.load( min_speech_duration=0.05, min_silence_duration=0.55, activation_threshold=0.5 ) logger.info("✅ VAD模型预加载完成") except Exception as e: logger.warning(f"VAD预加载失败: {e}") # 注意:turn_detector需要在job context中创建,不能在prewarm中预加载 logger.info("✅ Agent预热完成") logger.info("🎉 Agent进程预热完成")

def compute_worker_load(worker: agents.Worker) -> float:
"""计算Worker负载,用于控制并发"""
# 获取活跃任务数
active_jobs = len(worker.active_jobs)

text
# 设置最大并发任务数为10(从5增加到10) max_concurrent_jobs = 10 job_load = active_jobs / max_concurrent_jobs # 获取CPU使用率(如果可用) try: import psutil # 降低CPU权重,只考虑50%的CPU负载 cpu_load = psutil.cpu_percent(interval=0.1) / 100.0 * 0.5 except: # 如果psutil不可用,使用较低的默认值 cpu_load = 0.3 # 返回综合负载(取较高值) total_load = max(job_load, cpu_load) logger.debug(f"Worker负载 - 任务: {active_jobs}/{max_concurrent_jobs} ({job_load:.2f}), CPU: {cpu_load:.2f}, 总计: {total_load:.2f}") return min(total_load, 1.0)

if name == "main":
print("🚀 启动AI Agent Worker...")
print(f"📍 工作目录: {os.getcwd()}")
print(f"🔗 LiveKit URL: {os.getenv('LIVEKIT_URL', 'Not Set')}")
print(f"🗝️ LiveKit API Key: {os.getenv('LIVEKIT_API_KEY', 'Not Set')[:10]}...")

text
# 启用自动dispatch模式 worker_opts = WorkerOptions( # 移除agent_name以启用自动dispatch # agent_name="ai-companion-agent", # 与main.py中dispatch请求的agent_name匹配 entrypoint_fnc=entrypoint, # 指定entrypoint函数 # Prewarm函数 - 预加载模型 prewarm_fnc=prewarm_agent, # 负载函数 - 控制并发 load_fnc=compute_worker_load, load_threshold=0.8, # 负载超过80%时停止接受新任务 # 权限设置 permissions=WorkerPermissions( can_publish=True, can_subscribe=True, can_publish_data=True, hidden=False ), # 优雅关闭配置 - 用于系统维护时的处理 drain_timeout=600, # 10分钟优雅关闭时间,确保不会突然中断用户对话 shutdown_process_timeout=900, # 15分钟强制关闭时间 # 负载配置 num_idle_processes=2, # 保持2个空闲进程以提高响应速度 ) print(f"⚙️ Worker配置:") print(f" Agent名称: ai-companion-agent") print(f" Entrypoint: {entrypoint.__name__}") print("🎯 开始注册Worker并等待dispatch...") # 运行Agent agents.cli.run_app(worker_opts)

1. LiveKit Agents框架架构

核心框架特性

python
# requirements.txt中的关键依赖 livekit-agents[openai,silero,turn-detector]>=1.0.0 # 主框架 livekit>=0.10.0 # LiveKit客户端

技术特点

  • Agents 1.0+框架:支持实时多模态AI代理
  • WebRTC原生支持:基于LiveKit Server的SFU架构
  • 自动Agent调度:基于房间和用户身份的智能分派
  • 插件化架构:OpenAI、Silero、转向检测器模块化集成

2. 音频处理链路

STT(语音识别)链路

text
音频流 → LiveKit AudioFrame → DashScope Paraformer → 实时转写文本 + 情感分析 → AI LLM处理

TTS(语音合成)链路

text
AI文本回复 → Fish Audio SDK → 句子级批处理 → PCM/WAV音频流 → LiveKit AudioEmitter

优化配置

bash
# .env中的音频优化参数 FISH_TTS_SENTENCES_PER_CHUNK=1 # 逐句处理,降低延迟 FISH_TTS_FORMAT=pcm # PCM格式,减少编码时间 FISH_TTS_FRAME_MS=40 # 40ms音频帧,平衡延迟和质量

3. AI服务集成

多模态AI服务栈

  • STT: DashScope Paraformer (阿里云) → 中文语音识别 + 情感分析
  • LLM: OpenAI兼容API (DeepSeek/OpenAI) → 对话生成
  • TTS: Fish Audio SDK → 中文语音合成
  • Vision: LiveKit ImageContent → 视觉识别能力

遗留文件分析

核心功能文件(保留)

text
根目录核心: ├── bot.py # 主程序入口 ├── config.py # 全局配置 ├── config_editor.py # 配置管理界面 ├── plugin_config_accessor.py # 插件配置访问器 ├── Run.bat # 系统启动脚本 └── requirements.txt # Python依赖 (不完整,仅包含pip) 插件核心: video_chat_plugin/ ├── main.py # 插件主程序 ├── ai_companion_bot.py # LiveKit AI Agent核心 ├── wechat_integration.py # 微信集成桥接 ├── livekit_token_server.py # JWT Token服务 ├── .env # 环境变量配置 (当前使用) ├── requirements.txt # 插件完整依赖清单 ├── static/ # 前端静态资源 ├── templates/ # HTML模板 └── assets/ # 音频图像资源

系统工作流程总结

完整工作流程

text
1. 用户微信消息 → bot.py检测视频聊天关键词 2. 后台线程启动 → 获取角色配置和历史记录 3. 调用wechat_integration.handle_video_chat_request_async() 4. FastAPI创建会话上下文和唯一链接 5. 用户点击链接 → 前端获取JWT token 6. LiveKit Room连接建立 → Agent自动分配 7. 开始实时AI语音对话

技术优势

  • 无隐藏依赖:完全移除Pipecat和Daily.js的第三方依赖
  • 完全自托管:基于开源LiveKit Server,无PaaS锁定
  • 高性能WebRTC:SFU架构支持多方通话和负载均衡
  • 流式处理:句子级别的实时STT和TTS处理
  • 模块化设计:插件与主程序松耦合,便于维护和扩展
  • 异步非阻塞:所有耗时操作都在后台线程中执行
  • 多层容错:多层次的错误处理和重试机制

开发指南

常用启动命令

bash
# 主系统启动 (自动处理依赖和兼容性) Run.bat # 直接运行主程序 python bot.py # 完整插件系统启动 (推荐) cd video_chat_plugin start_complete_system.bat # 单独组件启动 python video_chat_plugin/main.py # FastAPI服务器 + Agent Worker python video_chat_plugin/main.py agent # 仅启动Agent Worker

配置文件位置

  • 主程序配置: config.py
  • 插件环境配置: video_chat_plugin/.env
  • 角色Prompt: prompts/{角色名}.md
  • LiveKit连接: video_chat_plugin/.env中的LIVEKIT_*变量

日志和调试

  • 主程序日志: 控制台输出
  • 插件日志: video_chat_plugin/logs/
  • 错误日志: video_chat_plugin/logs/errors_*.log
  • 兼容性检查: python video_chat_plugin/check_compatibility.py

本文档基于2025年7月30日的代码库状态生成,涵盖了从Pipecat到LiveKit的完整迁移架构。

  【1.  用户转录显示bug(前端):

在发送欢迎语后,用户说话时,没有及时显示转录(后端有发送,前端未显示)
2. 通话结束流程:需要调用主程序,主程序使用两个AI功能:第一次调用,是为了让AI用角色的口吻(查
看bot点py中是如何获取当前用户所对应的角色prompt的),向用户发送一段结束告别语;第二次调用AI是把用
户跟AI的语音聊天历史总结成一段文字,并写入到临时聊天历史中。不得强制触发临时聊天记忆总结(主程序与
videochat插件是两个不同的系统),临时聊天总结会根据主程序的定时器自动触发。

  3.  UI信息杂乱: 前端直接显示了后端调试用的情感分析文本

【用户情感可能是...】,破坏了沉浸感。前端返回的stt最终转录文本包含【】时,【】内应该被前端去除。
现在LLM输出了形象示之后,【】的形象提示并没有正确的被处理显示和清除,导致在TTS中就把形象朗读出
来了,转录中也包含了这个无用的信息
4. TTS换行符错误: AI生成的 \\n
被TTS直接朗读,导致发音怪异。应该把LLM输出时,把这两种换行符替换为逗号。
5. AI角色感不强 (Prompt问题):
AI没有按预期生成情感指令和形象切换指令,导致表现力不足。优化提示词要求。
6. 通用文案问题:
“AI情感助手”等通用文案没有替换为角色名,代入感不强。通过正确方式获取角色名称,并代入。
8.这是一个角色模拟聊天程序。对角色的输出,结束语等,都尽量避免硬编码,而是用角色prompt引导AI输
出符合语境的回答。
9.尽量少改动原先代码,而是复用原先模块。由于重构的原因,前后端很多接受参数不一样,务必使其正确
匹配。
以下是部分详细修改方案。】
2。### 根本原因

  • 你现在只在 拿到 完整【形象=xxx】 之后sub 一次就立即把当前 chunk 改写并
    yield
  • 流式输出会把一个 token 拆成多个 chunk——常见的情况是:
text
chunk‑1 【形 chunk‑2 象=开心】嗨,同学…

chunk‑1 里还没有闭合的 ,正则匹配不到,于是这一半就直接被送进 TTS;
chunk‑2 再匹配并删除时,为时已晚——TTS 已经朗读了 “【形”.

修正思路

  1. 在解析 avatar 之前把所有内容暂存起来,直到检测到

  2. 一旦找到完整标签

    • 触发 set_expression()
    • 把缓存中过去的 “【形象=xxx】” 整段裁掉
    • 把真正的文本一次性 yield 出去
  3. 后续 chunk 就可以直接 passthrough,不再影响延迟。

关键代码片段(放在 llm_node 里,直接替换现有 avatar 处理逻辑即可)

python
class AICompanionAgent(Agent): def __init__(...): ... self._avatar_buffer = "" # <— 新增:缓存区 self._avatar_handled = False async def llm_node(...): avatar_re = re.compile(r'【形象=([^】]+)】') async for chunk in llm_stream: # 1. 先把这段文本抓出来 delta_txt = _extract_text(chunk) # 见下 if delta_txt is None: # 非文本 delta 直接 pass yield chunk continue # 2. 还没处理过形象 → 继续累积 if not self._avatar_handled: self._avatar_buffer += delta_txt if '】' not in self._avatar_buffer: # 标签还没闭合,什么都不发给上游 continue # 3. 缓冲区里已有完整标签 —— 开始处理 m = avatar_re.search(self._avatar_buffer) if m: keyword = m.group(1) asyncio.create_task(self._handle_avatar_change(keyword)) # 4. 把整段缓存剪掉标签 cleaned_text = avatar_re.sub('', self._avatar_buffer, count=1) self._avatar_handled = True self._avatar_buffer = "" # 清空缓存 # 5. 把真正的内容重新包进一个 chunk 发出去 yield _make_new_chunk(chunk, cleaned_text) continue # 原生 chunk 不再下发 # 6. 形象已经处理过,后面的 chunk 直接 passthrough yield chunk # ---------- 小工具 ---------- def _extract_text(chunk): """兼容 OpenAI / dict 两种结构,取出当前 chunk 的文本""" try: if isinstance(chunk.choices[0], dict): return chunk.choices[0].get("delta", {}).get("content") return chunk.choices[0].delta.content except Exception: return None def _make_new_chunk(orig_chunk, text): """用同一 id & index 造一个新的 ChatChunk 放干净文本""" if isinstance(orig_chunk.choices[0], dict): # dict 结构 return llm.ChatChunk( id=orig_chunk.id, choices=[{ "delta": {"role": "assistant", "content": text}, "index": orig_chunk.choices[0].get("index", 0) }] ) # 对象结构 new_choice = copy.deepcopy(orig_chunk.choices[0]) new_choice.delta.content = text return llm.ChatChunk(id=orig_chunk.id, choices=[new_choice])

为什么不用 avatar_pattern.sub 直接改当前 chunk?
因为第一批字符已经被送出,所以必须在 真正输出前
暂存,并把缓存里的半截标签剪掉;否则会无法回收已经播放的语音。

其他小细节

| 问题 | 建议
|
| --------------------- |

|
| 多行文本换行符导致朗读突兀 | 继续保留你现有的 replace('\n', ', ') 逻辑,但要在
avatar 处理之后 再做,避免打乱缓存匹配。 |
| Function‑call 模式无需此分支 | 在 use_function_call_for_avatar=True
时可直接跳过这整个缓存逻辑,保持你现在的实现。 |
| 兼容性 | 如果担心一次把所有文本 flush
会带来较大首包延迟,可以在缓存期间对外发送 chunk(只含 role, 没有 content),这样前端 UI
仍能保持 “正在输入…” 动画。 |

一句话总结

核心就是 “流式前先缓存,拿到 后再一次性裁剪+下发”。只要保证标签从未到达 TTS
pipeline,就不会再被朗读和写入转录。祝你调试顺利

5.升级AI情感/形象提示词
诊断分析:
您提供的Fish Audio情感控制文档非常详尽。之前的Prompt过于简单,现在我们可以用这份官方文档来构建一个
强大、精确、模型无法忽视的指令集。
解决方案:在 ai_companion_bot.py 中重写 emotion_instruction。
将 AICompanionAgent.init 方法中的 emotion_instruction
字符串替换为以下内容。这几乎是您文档的直接转译,并加入了强制性要求。
Generated python

ai_companion_bot.py - AICompanionAgent.init

...

动态添加情感控制指令(使用您提供的详细文档)

if enable_emotion_control:
emotion_instruction = f"""

规则:语音表现力指令 (强制遵循)

你必须使用一套特殊的指令来控制你声音的情感、语速和语调。这是为了让你的声音更有表现力。

1. 核心语法

  • 格式: 所有指令都必须放在括号 () 内,格式为 (指令)要朗读的文本
  • 作用范围: 一个指令会对其后的所有文本生效,直到遇到一个新的指令。

2. 指令分类与规则

2.1 情感指令 (必须使用)

这些指令用于设定一句话的情感基调。你应当在每句有明确情感的回复前,选择一个最合适的指令。

  • 正面情绪: (开心), (兴奋), (高兴), (得意), (坚定), (感激), (自信), (愉快)
  • 负面情绪: (生气), (气愤), (愤怒), (讽刺), (讥讽), (悲伤), (难过), (无奈),
    (失望), (着急), (内疚), (沮丧)
  • 中性/疑问: (疑惑), (疑问), (质问), (思考), (惊讶), (震惊), (严肃), (平静)

示例:

  • (开心)太好了,我们终于见面了!
  • 我对你这么信任,(愤怒)你就是这样回报我的吗?

2.2 语气控制指令

这些指令用于调整声音的表达方式,可以放在句子的任何位置。

  • 速度:
    • (语速变快): 用于紧张、急切的场景。
    • (语速变慢): 用于庄重、思考或强调。
  • 音量/音调:
    • (大声): 用于喊叫或强调。
    • (声音变小): 用于耳语或自言自语。
    • (语气激动): 用于表达强烈的惊喜、激动。

示例: 快走!后面有人在追,(语速变快)再不跑就来不及了!

2.3 副语言指令

用于模拟非语言声音,指令后必须紧跟对应的拟声词

  • (大笑)哈哈哈哈
  • (哭泣)呜呜呜
  • (叹气)唉

示例: 听到这个笑话,他再也忍不住了,(大笑)哈哈哈哈!

3. 重要提醒

  • 强制要求: 你必须严格遵守上述规则。如果对话内容情感平淡,可以使用 (平静)
  • 组合使用: 你可以组合使用指令,例如:(愤怒)你竟然敢背叛我!(语气激动) 我对你这么信任,(大声) 你就是这样回报我的吗?
  • 避免创造指令: 不要使用 (用悲伤安静的声音) 这样的复杂指令,而应组合 (悲伤)(声音变小)
    """
    base_prompt += emotion_instruction

... (后续的形象切换规则等保持不变)

Use code with caution.
Python
这个新的Prompt详细、具体且带有强制性,将极大地提升AI的表现力。

【上网搜索验证以上任务的可行性(不会对原先代码实现造成影响),根据提供的代码给出具体的实现方式,分多个部分编写代码重构的详细技术文档。等待我发送主程序后再开始回答。先分析这个代码。使用了很多livekit的方法。】

思考

Thought for 19 seconds

回答

下面先做 代码架构与可行性分析(分章节),不动主程 main.py ;待你把主程贴上来后,再给出逐段重构指引和完整技术文档。


1 整体流程梳理

阶段关键对象说明
Worker 启动entrypoint()AgentSession读取 .env → 预热模型 → ctx.connect() 加入房间 → 频道内等待真人加入
会话初始化AICompanionAgent.__init__拼装系统提示词、加载角色图片、注册 set_character_expression 工具
实时管线AgentSession 默认管线<br>+覆写的 AICompanionAgent 节点VAD → Paraformer STT → llm_node → FishTTS → avatar worker 推流
收尾cleanup_callback()汇总用量、推送通话记录、发布 agent_disconnected

LiveKit Agents 1.x 提供的 Pipeline/Hook 机制 支撑你的自定义节点;覆写 llm_node 的做法与官方文档一致 (LiveKit Docs)。
Avatar 切换通过 room.local_participant.stream_bytes(topic="agent.avatar") 符合虚拟形象集成规范 (LiveKit Docs)。
STT / TTS 转录实时下发依赖 Text Streams API,stream_text() 属于最新 SDK,文档也确认支持 (LiveKit Docs)。


2 AICompanionAgent 关键职责

  1. 角色资源加载

    • _load_character_images() 动态扫描 assets/images/characters/{role}/,返回 dict[keyword -> path]
    • 避免阻塞:文件读取 & JPEG 编码均已放入 asyncio.to_thread()
  2. 视觉输出(set_expression)

    • 模糊匹配关键词 → 读取文件 → 通过 Byte Stream 或回退 publish_data() 发送。
    • 该方法被工具或【形象=】标签触发。
  3. llm_node(已覆写)

    • 当前实现:在每个 chunk 中直接 avatar_pattern.sub(),无法处理 拆分标签 的流式场景——这正是你们在问题 1/3 中提到的 “朗读了半截 【形” 的根因。
    • 可行修复:改为“缓冲‑直到闭合符”方案(详见下一节)。
  4. 用户视频流作为 Vision 输入

    • _subscribe_to_video_stream() 找到第一条 video track → _create_video_stream()_latest_frame
    • _encode_video_frame_optimized() 把 JPEG Data‑URL 塞进 ImageContent,附到用户回合的 ChatMessage 中,供 LLM 调用 Vision 能力。
  5. STT / TTS 通道

    • Paraformer‑Streaming‑STT + FishTTS,均以 8 kHz / 40 ms 帧参数运行。
    • STT 事件通过 stream_text(topic="lk.transcription") 转发;若前端未显示,问题多半在前端解码/过滤逻辑而非后端(见任务 1)。

3 核心 Bug 与改动可行性

#症状现状分析可行修补点
1转录不显示 / 情感提示裸露STT 已发出带属性的 text‑stream,但前端简单把含 【…】 的整段过滤掉。前端正则替换 or split on 再渲染即可;后端无需改。
2【形象=】朗读 & 残留llm_node 逐 chunk sub,半截标签已进入 TTS。参考官方 ChatChunk 缓冲 模式;一次裁剪后再 yield,新流式延迟 ≈ 2–3 tokens,可接受。
3\n / \ 被朗读目前替换发生在 avatar 处理前,遇到多 chunk 容易漏掉在「avatar 裁剪完毕 → 决定输出前」做统一 text.replace('\\n', ', ').replace('\\', '')
4结束语 & 聊天总结已在 cleanup_callback 内将新增 messages 发回 FastAPI;主程需要二次调用 LLM。保持接口;主程逻辑改即可,不影响 Agent 侧。
5Prompt 表现力不足emotion_instruction 简易版直接替换为你提供的详细规则字符串,拼接处已留钩子;对现有逻辑透明。

验证资料

  • ChatChunk / LLM node 自定义:官方文档示例 (LiveKit Docs)
  • Avatar byte‑stream 发送:虚拟形象集成文档 (LiveKit Docs)
  • Text stream + transcription:LiveKit Text Streams 指南 (LiveKit Docs)

4 缓冲式 avatar 处理示例(可行性证明)

官方允许在 llm_node异步 yield 自定义对象,因此维护一个 self._avatar_buffer 再构造新 ChatChunk 是安全的。
GitHub demo(Agents examples)里对 function‑call 缓冲 的做法等同 (GitHub),证明可行。


5 结论 & 下一步

  • 以上修改全部局限在 AICompanionAgent,不用动主程逻辑,也不会破坏 LiveKit 事件流。
  • 目标任务(1‑6, 8‑9)均可在当前 SDK 能力范围内完成,无需额外依赖。
  • 请把 main.py 或相关前端片段贴上来,我会按模块输出「重构步骤 + 完整代码 diff + 风险提示」的多段技术文档。

質問

import os
import sys
import time
import json
import asyncio
import logging
import threading
from pathlib import Path
from typing import Optional, Dict, Any
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, FileResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from dotenv import load_dotenv

确保当前目录在Python路径中 - 必须在其他导入之前

current_dir = Path(file).parent
if str(current_dir) not in sys.path:
sys.path.insert(0, str(current_dir))

LiveKit 导入

from livekit import api as livekit_api

✅ 修复: 添加缺失的 WorkerPermissions 导入

from livekit.agents import WorkerOptions, cli as agents_cli, WorkerPermissions

本地模块导入

try:
from livekit_token_server import generate_room_token, get_token_server
from ai_companion_bot import entrypoint
# 导入Agent显式分配器(备选方案)
from agent_dispatch import get_agent_dispatcher
except ImportError as e:
# 在日志系统初始化前,使用 print
print(f"FATAL: Failed to import local modules: {e}")
print(f"Current working directory: {os.getcwd()}")
print(f"Python path: {sys.path}")
raise

加载环境变量

load_dotenv()

配置日志级别

log_level = os.getenv('LOG_LEVEL', 'INFO').upper()
debug_mode = os.getenv('DEBUG', 'false').lower() == 'true'

设置日志级别

if debug_mode:
log_level = 'DEBUG'

使用增强的日志系统替代基础logging

try:
from enhanced_logging import setup_global_logging
logger = setup_global_logging()
logger.info("✅ 增强日志系统已启用 - 支持文件输出和结构化日志")
except ImportError:
# 回退到基础日志系统
logging.basicConfig(
level=getattr(logging, log_level, logging.INFO),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(name)
logger.warning("⚠️ 增强日志系统不可用,使用基础日志系统")

active_sessions: Dict[str, Dict[str, Any]] = {}
session_contexts: Dict[str, Dict[str, Any]] = {}
CONTEXT_EXPIRATION_SECONDS = 3600 # 1小时过期,防止用户在创建会话后较长时间才访问

class SessionStartRequest(BaseModel):
"""启动会话请求"""
user_id: str
character_name: str = "default"
system_prompt: str = ""
chat_history: list = []

class JoinRoomRequest(BaseModel):
"""加入房间请求"""
session_id: str
participant_name: Optional[str] = None

class TokenRequest(BaseModel):
"""获取连接token请求"""
room_name: str
participant_identity: str
participant_name: Optional[str] = None
session_id: Optional[str] = None # 新增:关联到session context

=============================================================

主服务类

=============================================================

class VideoChatServer:
"""基于LiveKit的视频聊天服务器"""

text
def __init__(self): self.config = self.load_config_from_env() self.setup_global_proxy() # 创建lifespan上下文管理器 @asynccontextmanager async def lifespan(app: FastAPI): # 启动事件 await self.startup() yield # 关闭事件 await self.shutdown() self.app = FastAPI( title="AI Companion Video Chat API - LiveKit", version="5.1.0", lifespan=lifespan ) self.app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) self.session_lock = asyncio.Lock() self.livekit_api = None self.setup_routes() def setup_global_proxy(self) -> None: """设置全局代理""" enable_proxy = os.getenv("ENABLE_PROXY", "false").lower() == "true" if not enable_proxy: return proxy_url = os.getenv("HTTP_PROXY") if not proxy_url: return os.environ["HTTPS_PROXY"] = proxy_url os.environ["ALL_PROXY"] = proxy_url os.environ["NO_PROXY"] = "api.dashscope.aliyuncs.com,dashscope.aliyuncs.com,ecs.aliyuncs.com,oss.aliyuncs.com,localhost,127.0.0.1" logger.info(f"已设置全局代理: {proxy_url}") def load_config_from_env(self) -> Dict[str, Any]: """从环境变量加载配置""" try: # 构建字符配置 characters = {} character_names = set() # 扫描所有字符配置 for key in os.environ: if key.startswith('CHARACTER_') and key.endswith('_NAME'): char_prefix = key[:-5] # 移除 '_NAME' char_name = os.getenv(key) if char_name: character_names.add(char_prefix) # 为每个字符构建完整配置 for char_prefix in character_names: char_name = os.getenv(f'{char_prefix}_NAME') if char_name: preset_images_json = os.getenv(f'{char_prefix}_PRESET_IMAGES', '{}') try: preset_images = json.loads(preset_images_json) # 修复路径分隔符问题 preset_images = {k: v.replace('\\', '/') for k, v in preset_images.items()} except json.JSONDecodeError as e: preset_images = {} logger.warning(f"角色 {char_name} 的预置形象配置格式错误: {str(e)}") characters[char_name] = { "name": char_name, "display_name": os.getenv(f'{char_prefix}_DISPLAY_NAME', char_name), "voice_id": os.getenv(f'{char_prefix}_VOICE_ID', ''), "additive_prompt": os.getenv(f'{char_prefix}_ADDITIVE_PROMPT', ''), "base_image_path": os.getenv(f'{char_prefix}_BASE_IMAGE_PATH', f'video_chat_plugin/assets/images/characters/{char_name}.jpg'), "preset_images": preset_images, # 新增字段 "tts_params": { "temperature": float(os.getenv(f'{char_prefix}_TTS_TEMPERATURE', '0.7')), "top_p": float(os.getenv(f'{char_prefix}_TTS_TOP_P', '0.7')), "backend": os.getenv(f'{char_prefix}_TTS_BACKEND', 'speech-1.6') } } return { "llm_config": { "api_key": os.getenv("OPENAI_API_KEY"), "base_url": os.getenv("OPENAI_BASE_URL"), "model": os.getenv("OPENAI_MODEL", "gpt-4o") }, "image_generation_config": { "api_key": os.getenv("IMAGE_GENERATION_API_KEY"), "base_url": os.getenv("IMAGE_GENERATION_BASE_URL"), "model": os.getenv("IMAGE_GENERATION_MODEL") }, "dashscope_api_key": os.getenv("DASHSCOPE_API_KEY"), "fish_audio_api_key": os.getenv("FISH_AUDIO_API_KEY"), "enable_proxy": os.getenv("ENABLE_PROXY", "false").lower() == "true", "http_proxy": os.getenv("HTTP_PROXY", ""), "https_proxy": os.getenv("HTTPS_PROXY", ""), "fastapi_port": int(os.getenv("FASTAPI_PORT", "8008")), "public_domain": os.getenv("PUBLIC_DOMAIN"), "use_function_call_for_avatar": os.getenv("USE_FUNCTION_CALL_FOR_AVATAR", "false").strip("'\"").lower() == "true", "characters": characters } except Exception as e: logger.error(f"从环境变量加载配置失败: {e}") raise async def init_livekit_services(self): """初始化LiveKit服务""" try: # 根据官方文档使用LiveKitAPI from livekit.api import LiveKitAPI self.livekit_api = LiveKitAPI( url=os.getenv('LIVEKIT_URL', 'ws://localhost:7880'), api_key=os.getenv('LIVEKIT_API_KEY'), api_secret=os.getenv('LIVEKIT_API_SECRET') ) logger.info("LiveKit API客户端初始化成功") except Exception as e: logger.error(f"LiveKit服务初始化失败: {e}") raise async def startup(self): """FastAPI启动事件""" # 挂载静态文件 static_dir = current_dir / "static" assets_dir = current_dir / "assets" if static_dir.exists(): self.app.mount("/static", StaticFiles(directory=str(static_dir)), name="static") if assets_dir.exists(): self.app.mount("/assets", StaticFiles(directory=str(assets_dir)), name="assets") # 初始化LiveKit服务 await self.init_livekit_services() # 启动定期清理任务 asyncio.create_task(self.periodic_cleanup()) # 启动Worker健康监控 try: from worker_health_monitor import get_health_monitor self.health_monitor = get_health_monitor() if await self.health_monitor.initialize(): asyncio.create_task(self.health_monitor.start_monitoring()) logger.info("✅ Worker健康监控已启动") else: logger.warning("⚠️ Worker健康监控初始化失败") self.health_monitor = None except ImportError: logger.warning("⚠️ Worker健康监控模块不可用") self.health_monitor = None except Exception as e: logger.error(f"❌ Worker健康监控启动失败: {e}") self.health_monitor = None logger.info("Video Chat Server started successfully with LiveKit") async def shutdown(self): """FastAPI关闭事件""" logger.info("Shutting down server...") # 停止健康监控 if hasattr(self, 'health_monitor') and self.health_monitor: try: await self.health_monitor.stop_monitoring() logger.info("✅ Worker健康监控已停止") except Exception as e: logger.error(f"❌ 停止健康监控失败: {e}") # 清理活跃会话 for session_id in list(active_sessions.keys()): await self.cleanup_session(session_id) logger.info("Video Chat Server shut down.") def setup_routes(self) -> None: """设置路由""" @self.app.get("/") async def root(): return {"service": "AI Companion - LiveKit", "status": "running", "version": "5.1.0"} @self.app.get("/health") async def health_check(): return { "status": "healthy", "active_sessions": len(active_sessions), "session_contexts": len(session_contexts), "livekit_url": os.getenv('LIVEKIT_URL'), "architecture": "LiveKit Agents" } @self.app.get("/debug/sessions") async def debug_sessions(): """调试:查看当前活跃的sessions""" return { "session_contexts": list(session_contexts.keys()), "active_sessions": list(active_sessions.keys()), "total_contexts": len(session_contexts), "total_active": len(active_sessions) } @self.app.get("/debug/worker-health") async def debug_worker_health(): """调试:查看Worker健康状态""" if not hasattr(self, 'health_monitor') or not self.health_monitor: return {"error": "健康监控未启用"} return { "worker_status": await self.health_monitor.get_worker_status(), "agent_health": await self.health_monitor.get_agent_health(), "monitoring_active": self.health_monitor.monitoring_active } @self.app.get("/test/image-display") async def test_image_display(): """测试图片显示页面""" return FileResponse(current_dir / "test_image_display.html") @self.app.get("/debug/connection-diagnostics/{session_id}") async def debug_connection_diagnostics(session_id: str): """调试:获取连接诊断信息""" if not hasattr(self, 'health_monitor') or not self.health_monitor: return {"error": "健康监控未启用"} return await self.health_monitor.diagnose_connection_issues(session_id) @self.app.post("/session/start") async def start_session_route(req: SessionStartRequest): """ 优化的会话启动流程: 创建房间、分派Agent、生成用户Token,并返回包含所有连接参数的响应。 """ session_id = f"session_{req.user_id}_{req.character_name}_{int(time.time())}" room_name = f"room_{session_id}" try: await self.ensure_room_exists(room_name) # ✅ 修复Agent dispatch - 显式分配Agent到房间 agent_metadata = json.dumps({ "type": "agent", # 明确标识这是一个agent "user_id": req.user_id, "character_name": req.character_name, "session_id": session_id # 用于后续查询完整数据 }) logger.info(f"🔧 创建Agent元数据: {agent_metadata}") # 显式分配Agent到房间 try: dispatch_request = livekit_api.CreateAgentDispatchRequest( agent_name="ai-companion-agent", room=room_name, metadata=agent_metadata ) await self.livekit_api.agent_dispatch.create_dispatch(dispatch_request) logger.info(f"🤖 Agent已显式分配到房间 {room_name}") except Exception as e: logger.error(f"❌ Agent分配失败: {e}") # 继续执行,不中断用户流程 logger.warning("⚠️ 将依赖自动dispatch机制") user_token_data = generate_room_token( room_name=room_name, user_identity=req.user_id, user_name=req.user_id ) if not user_token_data["success"]: raise Exception("Failed to generate user token") async with self.session_lock: # 完整数据存储在session_contexts中,Agent通过session_id查询 session_contexts[session_id] = { "user_id": req.user_id, "character_name": req.character_name, "system_prompt": req.system_prompt, "chat_history": req.chat_history, "room_name": room_name, "created_at": time.time(), } logger.info(f"✅ Session context 已存储: {session_id}") logger.info(f"📊 当前 session_contexts 数量: {len(session_contexts)}") # 返回所有前端需要的信息,以便构建直连URL return JSONResponse({ "success": True, "session_id": session_id, "room_name": room_name, "user_token": user_token_data["token"], "livekit_url": user_token_data["livekit_url"], "message": "会话创建成功,Agent已分派。" }) except Exception as e: logger.error(f"Session start failed: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"会话创建失败: {str(e)}") @self.app.get("/chat") async def chat_page(request: Request): session_id = request.query_params.get('session_id') if not session_id or session_id not in session_contexts: return HTMLResponse("<h1>404 - 无效的会话</h1><p>此聊天链接无效或已过期,请通过您的应用重新发起视频聊天。</p>", status_code=404) html_file = current_dir / "templates" / "index.html" return FileResponse(str(html_file)) @self.app.post("/token") async def get_connection_token(req: TokenRequest): """获取LiveKit连接token""" try: # Token生成不需要传递agent metadata, # Agent会通过session_id查询完整的session context if req.session_id and req.session_id in session_contexts: logger.info(f"📋 找到session context: {req.session_id}") context = session_contexts[req.session_id] logger.info(f"📦 Session包含: user_id={context.get('user_id')}, character={context.get('character_name')}, history_length={len(context.get('chat_history', []))}") elif req.session_id: logger.warning(f"⚠️ 未找到session_id {req.session_id} 的context") logger.warning(f"⚠️ 可用的session contexts: {list(session_contexts.keys())}") else: logger.warning(f"⚠️ token请求缺少session_id") # 生成房间token,包含agent dispatch配置 token_data = generate_room_token( room_name=req.room_name, user_identity=req.participant_identity, user_name=req.participant_name or req.participant_identity ) if not token_data["success"]: raise HTTPException(status_code=500, detail=token_data["error"]) return JSONResponse(token_data) except Exception as e: logger.error(f"❌ Token生成失败: {str(e)}") raise HTTPException(status_code=500, detail=f"Token生成失败: {str(e)}") @self.app.get("/session/{session_id}/context") async def get_session_context(session_id: str): """获取session上下文(供Agent查询使用)""" if session_id not in session_contexts: raise HTTPException(status_code=404, detail=f"Session {session_id} not found") context = session_contexts[session_id] return JSONResponse({ "success": True, "user_id": context.get("user_id"), "character_name": context.get("character_name", "default"), "system_prompt": context.get("system_prompt", "你是一个AI助手。"), "chat_history": context.get("chat_history", []), "room_name": context.get("room_name"), "created_at": context.get("created_at") }) @self.app.post("/session/{session_id}/leave") async def leave_session_route(session_id: str): """离开会话""" await self.cleanup_session(session_id) return JSONResponse({"status": "success", "message": "Session ended."}) @self.app.post("/session/{session_id}/decline") async def decline_session_route(session_id: str): """拒绝通话请求""" try: async with self.session_lock: # 从 session_contexts 获取会话数据 session_data = session_contexts.get(session_id) if not session_data: logger.warning(f"⚠️ 拒绝通话时未找到session_id: {session_id}") raise HTTPException(status_code=404, detail="Session not found") # 记录拒绝通话日志 user_id = session_data.get('user_id', 'unknown') character_name = session_data.get('character_name', 'unknown') logger.info(f"📞❌ 用户拒绝通话: session_id={session_id}, user_id={user_id}, character={character_name}") # 清理会话资源 await self.cleanup_session_properly(session_id, session_data) # 通知主程序 bot.py 通话被拒绝(如果配置了通知) try: await self.notify_bot_call_declined(session_data) except Exception as notify_error: logger.warning(f"通知主程序拒绝通话失败: {notify_error}") return JSONResponse({ "status": "success", "message": "Call declined and session cleaned up" }) except HTTPException: raise except Exception as e: logger.error(f"❌ 拒绝通话处理失败: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Failed to decline call: {str(e)}") async def ensure_room_exists(self, room_name: str, dispatch_agent: bool = True): """确保房间存在并可选地分配Agent""" try: if self.livekit_api: from livekit.api import CreateRoomRequest # 尝试创建房间(如果已存在会返回现有房间) await self.livekit_api.room.create_room( CreateRoomRequest( name=room_name, empty_timeout=int(os.getenv('LIVEKIT_EMPTY_TIMEOUT', '1800')), max_participants=int(os.getenv('LIVEKIT_MAX_PARTICIPANTS', '10')) ) ) logger.info(f"房间 {room_name} 已确保存在") except Exception as e: # 房间可能已存在,这通常不是错误 logger.debug(f"房间创建/检查: {e}") async def cleanup_session(self, session_id: str, session_data: Dict = None) -> None: """清理会话(兼容旧接口)""" if not session_data: async with self.session_lock: # 修复:首先从session_contexts获取数据 session_data = session_contexts.pop(session_id, None) # 同时从active_sessions中删除(如果存在) active_sessions.pop(session_id, None) await self._perform_cleanup(session_id, session_data, "leave") async def cleanup_session_properly(self, session_id: str, session_data: Dict) -> None: """正确清理会话(新接口,用于拒绝通话)""" async with self.session_lock: # 从两个数据结构中删除 session_contexts.pop(session_id, None) active_sessions.pop(session_id, None) await self._perform_cleanup(session_id, session_data, "decline") async def _perform_cleanup(self, session_id: str, session_data: Dict, action_type: str) -> None: """执行实际的清理工作""" if session_data and self.livekit_api: room_name = session_data.get("room_name") if room_name: try: # 删除房间(会断开所有参与者) await self.livekit_api.room.delete_room( livekit_api.DeleteRoomRequest(room=room_name) ) logger.info(f"🧹 房间 {room_name} 已删除 (action: {action_type})") except Exception as e: logger.error(f"❌ 删除房间失败: {e}") logger.info(f"✅ Session cleaned up: {session_id} (action: {action_type})") async def notify_bot_call_declined(self, session_data: Dict) -> None: """通知主程序bot.py通话被拒绝""" try: # 这里可以添加向主程序发送通知的逻辑 # 例如:向消息队列发送事件、调用回调函数等 user_id = session_data.get('user_id') character_name = session_data.get('character_name') # 记录通知日志(实际通知机制可根据需要实现) logger.info(f"📢 已记录通话拒绝事件: user_id={user_id}, character={character_name}") # TODO: 根据具体需求实现通知机制 # 可能的实现方式: # 1. 写入数据库事件表 # 2. 发送HTTP请求到主程序API # 3. 写入共享文件或消息队列 # 4. 调用主程序提供的回调接口 except Exception as e: logger.error(f"❌ 通知主程序失败: {e}") # 不抛出异常,避免影响主要的拒绝流程 async def periodic_cleanup(self) -> None: """定期清理过期的会话上下文""" while True: await asyncio.sleep(60) try: current_time = time.time() expired_contexts = [] async with self.session_lock: for sid, context_data in session_contexts.items(): created_at = context_data.get("created_at", current_time) age_seconds = current_time - created_at if age_seconds > CONTEXT_EXPIRATION_SECONDS: expired_contexts.append((sid, age_seconds)) if expired_contexts: async with self.session_lock: for session_id, age in expired_contexts: session_contexts.pop(session_id, None) logger.info(f"清理了 {len(expired_contexts)} 个过期的会话上下文 (过期时间: {CONTEXT_EXPIRATION_SECONDS}秒)") for session_id, age in expired_contexts: logger.debug(f" 清理会话 {session_id}, 年龄: {age:.0f}秒") except Exception as e: logger.error(f"周期性清理任务出错: {e}", exc_info=True)

=============================================================

启动入口

=============================================================

def main():
"""主启动函数 - 兼容微信Bot的调用方式"""
if os.name == "nt":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

text
# 检查启动模式 if len(sys.argv) > 1: mode = sys.argv[1] if mode in ['agent', 'dev', 'console']: # 启动LiveKit Agent Worker logger.info("Starting LiveKit Agent Worker...") # 导入prewarm和load函数 from ai_companion_bot import prewarm_agent, compute_worker_load # ✅ 修复自动dispatch - 设置agent_name以确保正确调度 worker_options = WorkerOptions( agent_name="ai-companion-agent", # 恢复agent_name以确保正确调度 entrypoint_fnc=entrypoint, # 添加预热函数,预加载模型 prewarm_fnc=prewarm_agent, # 添加负载函数,控制并发 load_fnc=compute_worker_load, load_threshold=0.8, # 负载超过80%时停止接受新任务 # 设置优雅关闭配置 drain_timeout=30, # 30秒优雅关闭时间 shutdown_process_timeout=60, # 60秒强制关闭时间 # 设置为1个进程,避免重复启动 num_idle_processes=1, # 启用权限设置,确保Agent可以发布媒体 permissions=WorkerPermissions( can_publish=True, can_subscribe=True, can_publish_data=True, hidden=False ) ) logger.info("Worker配置已增强: 启用预热、负载管理、优雅关闭") agents_cli.run_app(worker_options) return # 默认模式:启动FastAPI服务器和Agent Worker logger.info("Starting Video Chat Plugin with integrated FastAPI Server and Agent Worker...") # 启动FastAPI服务器 server = VideoChatServer() port = server.config.get("fastapi_port", 8008) # Windows端口释放函数 def kill_process_on_port(port): """查找并结束占用指定端口的进程""" try: import subprocess # 使用netstat查找占用端口的进程 result = subprocess.run( f'netstat -ano | findstr :{port}', shell=True, capture_output=True, text=True ) if result.stdout: lines = result.stdout.strip().split('\n') pids = set() for line in lines: parts = line.split() if len(parts) >= 5 and 'LISTENING' in line: pid = parts[-1] if pid.isdigit(): pids.add(pid) for pid in pids: try: # 结束进程 subprocess.run(f'taskkill /F /PID {pid}', shell=True, check=True) logger.info(f"✅ 成功结束占用端口 {port} 的进程 PID: {pid}") except Exception as e: logger.warning(f"无法结束进程 PID {pid}: {e}") if pids: # 等待端口释放 time.sleep(1) return True return False except Exception as e: logger.error(f"检查端口占用时出错: {e}") return False # 检查并释放端口 logger.info(f"检查端口 {port} 是否被占用...") if kill_process_on_port(port): logger.info(f"端口 {port} 已被释放") # 使用新的Agent启动器 logger.info("Using subprocess launcher for Agent Worker...") from agent_launcher import launch_agent_worker, cleanup_agent_process # 启动Agent Worker子进程 agent_process = launch_agent_worker(current_dir) if agent_process: server.agent_process = agent_process # 注册清理函数 import atexit def cleanup(): script_path = current_dir / '_agent_worker_process.py' cleanup_agent_process(agent_process, script_path) atexit.register(cleanup) logger.info("Agent Worker subprocess launched successfully") else: logger.error("Failed to launch Agent Worker subprocess") # 等待Agent初始化 time.sleep(3) # 启动FastAPI服务器(主线程) import uvicorn logger.info("FastAPI Server ready for WeChat Bot integration") # 多次尝试启动服务器 max_retries = 3 retry_count = 0 try: while retry_count < max_retries: try: uvicorn.run( server.app, host=server.config.get("fastapi_host", "0.0.0.0"), port=port, ) break # 成功启动则退出循环 except OSError as e: if "Only one usage of each socket address" in str(e) or "[Errno 10048]" in str(e): retry_count += 1 logger.warning(f"端口 {port} 仍被占用,尝试再次释放... (尝试 {retry_count}/{max_retries})") if kill_process_on_port(port): time.sleep(2) # 等待更长时间确保端口释放 else: logger.error(f"无法释放端口 {port}") if retry_count >= max_retries: logger.error(f"无法启动服务器,端口 {port} 持续被占用") raise else: raise except KeyboardInterrupt: logger.info("Received shutdown signal") break except Exception as e: logger.error(f"Server startup failed: {e}") raise finally: logger.info("Cleaning up...") if agent_process: cleanup_agent_process(agent_process, current_dir / '_agent_worker_process.py')

if name == "main":
main()主程序bot.py(负责调用我的插件):

-- coding: utf-8 --

***********************************************************************

Modified based on the KouriChat project

Copyright of this modification: Copyright (C) 2025, iwyxdxl

Licensed under GNU GPL-3.0 or higher, see the LICENSE file for details.

This file is part of WeChatBot, which includes modifications to the KouriChat project.

The original KouriChat project's copyright and license information are preserved in the LICENSE file.

For any further details regarding the license, please refer to the LICENSE file.

***********************************************************************

import sys
import base64
import subprocess
import requests
import logging
from datetime import datetime
import datetime as dt
import threading
import asyncio
import time
from wxautox_wechatbot import WeChat
from openai import OpenAI
import random
from typing import Optional
import pyautogui
import shutil
import re
from config import *
import config_utils # 导入配置工具函数
from plugin_config_accessor import get_video_chat_keywords, get_character_switch_keywords
import queue
import json
from threading import Timer
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import os
os.environ["PROJECT_NAME"] = 'iwyxdxl/WeChatBot_WXAUTO_SE'
from wxautox_wechatbot.param import WxParam
WxParam.ENABLE_FILE_LOGGER = False
WxParam.FORCE_MESSAGE_XBIAS = True

video_chat_plugin 现在是一个包,可以直接导入

生成用户昵称列表和prompt映射字典

user_names = [entry[0] for entry in LISTEN_LIST]
prompt_mapping = {entry[0]: entry[1] for entry in LISTEN_LIST}

群聊信息缓存

group_chat_cache = {} # {user_name: is_group_chat}
group_cache_lock = threading.Lock()

持续监听消息,并且收到消息后回复

wait = 1 # 设置1秒查看一次是否有新消息

获取程序根目录

root_dir = os.path.dirname(os.path.abspath(file))

用户消息队列和聊天上下文管理

user_queues = {} # {user_id: {'messages': [], 'last_message_time': 时间戳, ...}}
queue_lock = threading.Lock() # 队列访问锁
chat_contexts = {} # {user_id: [{'role': 'user', 'content': '...'}, ...]}
CHAT_CONTEXTS_FILE = "chat_contexts.json" # 存储聊天上下文的文件名
USER_TIMERS_FILE = "user_timers.json" # 存储用户计时器状态的文件名

心跳相关全局变量

HEARTBEAT_INTERVAL = 5 # 秒
FLASK_SERVER_URL_BASE = f'http://localhost:{PORT}' # 使用从config导入的PORT

--- REMINDER RELATED GLOBALS ---

RECURRING_REMINDERS_FILE = "recurring_reminders.json" # 存储重复和长期一次性提醒的文件名

recurring_reminders 结构:

[{'reminder_type': 'recurring', 'user_id': 'xxx', 'time_str': 'HH:MM', 'content': '...'},

{'reminder_type': 'one-off', 'user_id': 'xxx', 'target_datetime_str': 'YYYY-MM-DD HH:MM', 'content': '...'}]

recurring_reminders = [] # 内存中加载的提醒列表
recurring_reminder_lock = threading.RLock() # 锁,用于处理提醒文件和列表的读写

active_timers = {} # { (user_id, timer_id): Timer_object } (用于短期一次性提醒 < 10min)
timer_lock = threading.Lock()
next_timer_id = 0
video_chat_process = None # 视频聊天插件子进程

class AsyncHTTPHandler(logging.Handler):
def init(self, url, retry_attempts=3, timeout=3, max_queue_size=1000, batch_size=20, batch_timeout=5):
"""
初始化异步 HTTP 日志处理器。

text
Args: url (str): 发送日志的目标 URL。 retry_attempts (int): 发送失败时的重试次数。 timeout (int): HTTP 请求的超时时间(秒)。 max_queue_size (int): 内存中日志队列的最大容量。 当队列满时,新的日志消息将被丢弃。 batch_size (int): 批量处理的日志数量,达到此数量会触发发送。 batch_timeout (int): 批处理超时时间(秒),即使未达到batch_size, 经过此时间也会发送当前累积的日志。 """ super().__init__() self.url = url self.retry_attempts = retry_attempts self.timeout = timeout self.log_queue = queue.Queue(maxsize=max_queue_size) self._stop_event = threading.Event() self.dropped_logs_count = 0 # 添加一个计数器来跟踪被丢弃的日志数量 self.batch_size = batch_size # 批处理大小 self.batch_timeout = batch_timeout # 批处理超时时间 # 新增: 断路器相关属性 self.consecutive_failures = 0 # 跟踪连续失败次数 self.circuit_breaker_open = False # 断路器状态 self.circuit_breaker_reset_time = None # 断路器重置时间 self.CIRCUIT_BREAKER_THRESHOLD = 5 # 触发断路器的连续失败次数 self.CIRCUIT_BREAKER_RESET_TIMEOUT = 60 # 断路器重置时间(秒) # 新增: HTTP请求统计 self.total_requests = 0 self.failed_requests = 0 self.last_success_time = time.time() # 后台线程用于处理日志队列 self.worker = threading.Thread(target=self._process_queue, daemon=True) self.worker.start() def emit(self, record): """ 格式化日志记录并尝试将其放入队列。 如果队列已满,则放弃该日志并记录警告。 """ try: log_entry = self.format(record) # 使用非阻塞方式放入队列 self.log_queue.put(log_entry, block=False) except queue.Full: # 当队列满时,捕获 queue.Full 异常 self.dropped_logs_count += 1 # 避免在日志处理器内部再次调用 logger (可能导致死循环) # 每丢弃一定数量的日志后才记录一次,避免刷屏 if self.dropped_logs_count % 100 == 1: # 每丢弃100条日志记录一次(第1, 101, 201...条时记录) logging.warning(f"日志队列已满 (容量 {self.log_queue.maxsize}),已丢弃 {self.dropped_logs_count} 条日志。请检查日志接收端或网络。") except Exception: # 处理其他可能的格式化或放入队列前的错误 self.handleError(record) def _should_attempt_send(self): """检查断路器是否开启,决定是否尝试发送""" if not self.circuit_breaker_open: return True now = time.time() if self.circuit_breaker_reset_time and now >= self.circuit_breaker_reset_time: # 重置断路器 logging.info("日志发送断路器重置,恢复尝试发送") self.circuit_breaker_open = False self.consecutive_failures = 0 return True return False def _process_queue(self): """ 后台工作线程,积累一定数量的日志后批量发送到目标 URL。 """ headers = { 'Content-Type': 'application/json', 'User-Agent': 'WeChatBot/1.0' } batch = [] # 用于存储批处理日志 last_batch_time = time.time() # 上次发送批处理的时间 while not self._stop_event.is_set(): try: # 等待日志消息,设置超时以便能响应停止事件和批处理超时 try: # 使用较短的超时时间以便及时检查批处理超时 log_entry = self.log_queue.get(timeout=0.5) batch.append(log_entry) # 标记队列任务完成 self.log_queue.task_done() except queue.Empty: # 队列为空时,检查是否应该发送当前批次(超时) pass current_time = time.time() batch_timeout_reached = current_time - last_batch_time >= self.batch_timeout batch_size_reached = len(batch) >= self.batch_size # 如果达到批量大小或超时,且有日志要发送 if (batch_size_reached or batch_timeout_reached) and batch: # 新增: 检查断路器状态 if self._should_attempt_send(): success = self._send_batch(batch, headers) if success: self.consecutive_failures = 0 # 重置失败计数 self.last_success_time = time.time() else: self.consecutive_failures += 1 self.failed_requests += 1 if self.consecutive_failures >= self.CIRCUIT_BREAKER_THRESHOLD: # 打开断路器 self.circuit_breaker_open = True self.circuit_breaker_reset_time = time.time() + self.CIRCUIT_BREAKER_RESET_TIMEOUT logging.warning(f"日志发送连续失败 {self.consecutive_failures} 次,断路器开启 {self.CIRCUIT_BREAKER_RESET_TIMEOUT} 秒") else: # 断路器开启,暂时不发送 reset_remaining = self.circuit_breaker_reset_time - time.time() if self.circuit_breaker_reset_time else 0 logging.debug(f"断路器开启状态,暂不发送 {len(batch)} 条日志,将在 {reset_remaining:.1f} 秒后尝试恢复") batch = [] # 无论是否发送成功,都清空批次 last_batch_time = current_time # 重置批处理时间 except Exception as e: # 出错时清空当前批次,避免卡住 logging.error(f"日志处理队列异常: {str(e)}", exc_info=True) batch = [] last_batch_time = time.time() time.sleep(1) # 出错后暂停一下,避免CPU占用过高 # 关闭前发送剩余的日志 if batch: self._send_batch(batch, headers) def _send_batch(self, batch, headers): """ 发送一批日志记录,使用改进的重试策略 返回: bool: 是否成功发送 """ data = {'logs': batch} # 改进1: 使用固定的最大重试延迟上限 MAX_RETRY_DELAY = 2.0 # 最大重试延迟(秒) BASE_DELAY = 0.5 # 基础延迟(秒) self.total_requests += 1 for attempt in range(self.retry_attempts): try: # 修复:为localhost连接禁用代理 proxies = None if 'localhost' in self.url or '127.0.0.1' in self.url: proxies = {'http': '', 'https': ''} resp = requests.post( self.url, json=data, headers=headers, timeout=self.timeout, proxies=proxies ) resp.raise_for_status() # 检查 HTTP 错误状态码 # 成功发送,记录日志数量 if attempt > 0: logging.info(f"在第 {attempt+1} 次尝试后成功发送 {len(batch)} 条日志") else: logging.debug(f"成功批量发送 {len(batch)} 条日志") return True # 成功返回 except requests.exceptions.RequestException as e: # 改进2: 根据错误类型区分处理 if isinstance(e, requests.exceptions.Timeout): logging.warning(f"日志发送超时 (尝试 {attempt+1}/{self.retry_attempts})") delay = min(BASE_DELAY, MAX_RETRY_DELAY) # 对超时使用较短的固定延迟 elif isinstance(e, requests.exceptions.ConnectionError): logging.warning(f"日志发送连接错误 (尝试 {attempt+1}/{self.retry_attempts}): {e}") delay = min(BASE_DELAY * (1.5 ** attempt), MAX_RETRY_DELAY) # 有限的指数退避 else: logging.warning(f"日志发送失败 (尝试 {attempt+1}/{self.retry_attempts}): {e}") delay = min(BASE_DELAY * (1.5 ** attempt), MAX_RETRY_DELAY) # 有限的指数退避 # 最后一次尝试不需要等待 if attempt < self.retry_attempts - 1: time.sleep(delay) # 改进3: 所有重试都失败,记录警告并返回失败状态 downtime = time.time() - self.last_success_time logging.error(f"发送日志批次失败,已达到最大重试次数 ({self.retry_attempts}),丢弃 {len(batch)} 条日志 (连续失败: {self.consecutive_failures+1}, 持续时间: {downtime:.1f}秒)") return False # 返回失败状态 def get_stats(self): """返回日志处理器的统计信息""" return { 'queue_size': self.log_queue.qsize(), 'queue_capacity': self.log_queue.maxsize, 'dropped_logs': self.dropped_logs_count, 'total_requests': self.total_requests, 'failed_requests': self.failed_requests, 'circuit_breaker_status': 'open' if self.circuit_breaker_open else 'closed', 'consecutive_failures': self.consecutive_failures } def close(self): """ 停止工作线程并等待队列处理完成(或超时)。 """ if not self.log_queue.empty(): logging.info(f"关闭日志处理器,还有 {self.log_queue.qsize()} 条日志待处理") try: # 尝试最多等待30秒处理剩余日志 self.log_queue.join(timeout=30) except: pass self._stop_event.set() self.worker.join(timeout=self.timeout * self.retry_attempts + 5) # 等待一个合理的时间 if self.worker.is_alive(): logging.warning("日志处理线程未能正常退出") else: logging.info("日志处理线程已正常退出") super().close()

创建日志格式器

formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

初始化异步HTTP处理器

async_http_handler = AsyncHTTPHandler(
url=f'http://localhost:{PORT}/api/log',
batch_size=20, # 一次发送20条日志
batch_timeout=1 # 即使不满20条,最多等待1秒也发送
)
async_http_handler.setFormatter(formatter)

配置根Logger

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.handlers.clear()

添加异步HTTP日志处理器

logger.addHandler(async_http_handler)

同时可以保留控制台日志处理器

console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

获取微信窗口对象

try:
wx = WeChat()
except:
logger.error(f"\033[31m无法初始化微信接口,请确保您安装的是微信3.9版本,并且已经登录!\033[0m")
logger.error("\033[31m微信3.9版本下载地址:https://dldir1v6.qq.com/weixin/Windows/WeChatSetup.exe \033[0m")
exit(1)

获取登录用户的名字

ROBOT_WX_NAME = wx.nickname

存储用户的计时器和随机等待时间

user_timers = {}
user_wait_times = {}
emoji_timer = None
emoji_timer_lock = threading.Lock()

全局变量,控制消息发送状态

can_send_messages = True
is_sending_message = False

--- 定时重启相关全局变量 ---

program_start_time = 0.0 # 程序启动时间戳
last_received_message_timestamp = 0.0 # 最后一次活动(收到/处理消息)的时间戳

初始化OpenAI客户端

client = OpenAI(
api_key=DEEPSEEK_API_KEY,
base_url=DEEPSEEK_BASE_URL
)

#初始化在线 AI 客户端 (如果启用)
online_client: Optional[OpenAI] = None
if ENABLE_ONLINE_API:
try:
online_client = OpenAI(
api_key=ONLINE_API_KEY,
base_url=ONLINE_BASE_URL
)
logger.info("联网搜索 API 客户端已初始化。")
except Exception as e:
logger.error(f"初始化联网搜索 API 客户端失败: {e}", exc_info=True)
ENABLE_ONLINE_API = False # 初始化失败则禁用该功能
logger.warning("由于初始化失败,联网搜索功能已被禁用。")

初始化辅助模型客户端 (如果启用)

assistant_client: Optional[OpenAI] = None
if ENABLE_ASSISTANT_MODEL:
try:
assistant_client = OpenAI(
api_key=ASSISTANT_API_KEY,
base_url=ASSISTANT_BASE_URL
)
logger.info("辅助模型 API 客户端已初始化。")
except Exception as e:
logger.error(f"初始化辅助模型 API 客户端失败: {e}", exc_info=True)
ENABLE_ASSISTANT_MODEL = False # 初始化失败则禁用该功能
logger.warning("由于初始化失败,辅助模型功能已被禁用。")

def get_chat_type_info(user_name):
"""
获取指定用户的聊天窗口类型信息(群聊或私聊)

text
Args: user_name (str): 用户昵称 Returns: bool: True表示群聊,False表示私聊,None表示未找到或出错 """ try: # 获取所有聊天窗口 chats = wx.GetAllSubWindow() for chat in chats: chat_info = chat.ChatInfo() if chat_info.get('chat_name') == user_name: chat_type = chat_info.get('chat_type') is_group = (chat_type == 'group') logger.info(f"检测到用户 '{user_name}' 的聊天类型: {chat_type} ({'群聊' if is_group else '私聊'})") return is_group logger.warning(f"未找到用户 '{user_name}' 的聊天窗口信息") return None except Exception as e: logger.error(f"获取用户 '{user_name}' 聊天类型时出错: {e}") return None

def update_group_chat_cache():
"""
更新群聊缓存信息
"""
global group_chat_cache

text
try: with group_cache_lock: logger.info("开始更新群聊类型缓存...") for user_name in user_names: chat_type_result = get_chat_type_info(user_name) if chat_type_result is not None: group_chat_cache[user_name] = chat_type_result logger.info(f"缓存用户 '{user_name}': {'群聊' if chat_type_result else '私聊'}") else: logger.warning(f"无法确定用户 '{user_name}' 的聊天类型,将默认处理为私聊") group_chat_cache[user_name] = False logger.info(f"群聊类型缓存更新完成,共缓存 {len(group_chat_cache)} 个用户信息") except Exception as e: logger.error(f"更新群聊缓存时出错: {e}")

def is_user_group_chat(user_name):
"""
检查指定用户是否为群聊

text
Args: user_name (str): 用户昵称 Returns: bool: True表示群聊,False表示私聊 """ with group_cache_lock: # 如果缓存中没有该用户信息,则实时获取 if user_name not in group_chat_cache: chat_type_result = get_chat_type_info(user_name) if chat_type_result is not None: group_chat_cache[user_name] = chat_type_result else: # 如果无法获取,默认为私聊 group_chat_cache[user_name] = False return group_chat_cache.get(user_name, False)

def parse_time(time_str):
try:
TimeResult = datetime.strptime(time_str, "%H:%M").time()
return TimeResult
except Exception as e:
logger.error("\033[31m错误:主动消息安静时间设置有误!请填00:00-23:59 不要填24:00,并请注意中间的符号为英文冒号!\033[0m")

quiet_time_start = parse_time(QUIET_TIME_START)
quiet_time_end = parse_time(QUIET_TIME_END)

def check_user_timeouts():
"""
检查用户是否超时未活动,并将主动消息加入队列以触发联网检查流程。
"""
global last_received_message_timestamp # 引用全局变量
if ENABLE_AUTO_MESSAGE:
while True:
current_epoch_time = time.time()

text
for user in user_names: last_active = user_timers.get(user) wait_time = user_wait_times.get(user) if isinstance(last_active, (int, float)) and isinstance(wait_time, (int, float)): if current_epoch_time - last_active >= wait_time and not is_quiet_time(): # 检查是否启用了忽略群聊主动消息的配置 if IGNORE_GROUP_CHAT_FOR_AUTO_MESSAGE and is_user_group_chat(user): logger.info(f"用户 {user} 是群聊且配置为忽略群聊主动消息,跳过发送主动消息") # 重置计时器以避免频繁检查 reset_user_timer(user) continue # 构造主动消息(模拟用户消息格式) formatted_now = datetime.now().strftime("%Y-%m-%d %A %H:%M:%S") auto_content = f"触发主动发消息:[{formatted_now}] {AUTO_MESSAGE}" logger.info(f"为用户 {user} 生成主动消息并加入队列: {auto_content}") # 将主动消息加入队列(模拟用户消息) with queue_lock: if user not in user_queues: user_queues[user] = { 'messages': [auto_content], 'sender_name': user, 'username': user, 'last_message_time': time.time() } else: user_queues[user]['messages'].append(auto_content) user_queues[user]['last_message_time'] = time.time() # 更新全局的最后消息活动时间戳,因为机器人主动发消息也算一种活动 last_received_message_timestamp = time.time() # 重置计时器(不触发 on_user_message) reset_user_timer(user) time.sleep(10)

def reset_user_timer(user):
user_timers[user] = time.time()
user_wait_times[user] = get_random_wait_time()

def get_random_wait_time():
return random.uniform(MIN_COUNTDOWN_HOURS, MAX_COUNTDOWN_HOURS) * 3600 # 转换为秒

当接收到用户的新消息时,调用此函数

def on_user_message(user):
if user not in user_names:
user_names.append(user)
reset_user_timer(user)

修改get_user_prompt函数

def get_user_prompt(user_id):
# 查找映射中的文件名,若不存在则使用user_id
prompt_file = prompt_mapping.get(user_id, user_id)
prompt_path = os.path.join(root_dir, 'prompts', f'{prompt_file}.md')

text
if not os.path.exists(prompt_path): logger.error(f"Prompt文件不存在: {prompt_path}") raise FileNotFoundError(f"Prompt文件 {prompt_file}.md 未找到于 prompts 目录") with open(prompt_path, 'r', encoding='utf-8') as file: prompt_content = file.read() if UPLOAD_MEMORY_TO_AI: return prompt_content else: memory_marker = "## 记忆片段" if memory_marker in prompt_content: prompt_content = prompt_content.split(memory_marker, 1)[0].strip() return prompt_content

加载聊天上下文

def load_chat_contexts():
"""从文件加载聊天上下文。"""
global chat_contexts # 声明我们要修改全局变量
try:
if os.path.exists(CHAT_CONTEXTS_FILE):
with open(CHAT_CONTEXTS_FILE, 'r', encoding='utf-8') as f:
loaded_contexts = json.load(f)
if isinstance(loaded_contexts, dict):
chat_contexts = loaded_contexts
logger.info(f"成功从 {CHAT_CONTEXTS_FILE} 加载 {len(chat_contexts)} 个用户的聊天上下文。")
else:
logger.warning(f"{CHAT_CONTEXTS_FILE} 文件内容格式不正确(非字典),将使用空上下文。")
chat_contexts = {} # 重置为空
else:
logger.info(f"{CHAT_CONTEXTS_FILE} 未找到,将使用空聊天上下文启动。")
chat_contexts = {} # 初始化为空
except json.JSONDecodeError:
logger.error(f"解析 {CHAT_CONTEXTS_FILE} 失败,文件可能已损坏。将使用空上下文。")
# 可以考虑在这里备份损坏的文件
# shutil.copy(CHAT_CONTEXTS_FILE, CHAT_CONTEXTS_FILE + ".corrupted")
chat_contexts = {} # 重置为空
except Exception as e:
logger.error(f"加载聊天上下文失败: {e}", exc_info=True)
chat_contexts = {} # 出现其他错误也重置为空,保证程序能启动

保存聊天上下文

def save_chat_contexts():
"""将当前聊天上下文保存到文件。"""
global chat_contexts
temp_file_path = CHAT_CONTEXTS_FILE + ".tmp"
try:
# 创建要保存的上下文副本,以防在写入时被其他线程修改
# 如果在 queue_lock 保护下调用,则直接使用全局 chat_contexts 即可
contexts_to_save = dict(chat_contexts) # 创建浅拷贝

text
with open(temp_file_path, 'w', encoding='utf-8') as f: json.dump(contexts_to_save, f, ensure_ascii=False, indent=4) shutil.move(temp_file_path, CHAT_CONTEXTS_FILE) # 原子替换 logger.debug(f"聊天上下文已成功保存到 {CHAT_CONTEXTS_FILE}") except Exception as e: logger.error(f"保存聊天上下文到 {CHAT_CONTEXTS_FILE} 失败: {e}", exc_info=True) if os.path.exists(temp_file_path): try: os.remove(temp_file_path) # 清理临时文件 except OSError: pass # 忽略清理错误

def get_deepseek_response(message, user_id, store_context=True, is_summary=False):
"""
从 DeepSeek API 获取响应,确保正确的上下文处理,并持久化上下文。

text
参数: message (str): 用户的消息或系统提示词(用于工具调用)。 user_id (str): 用户或系统组件的标识符。 store_context (bool): 是否将此交互存储到聊天上下文中。 对于工具调用(如解析或总结),设置为 False。 """ try: # 每次调用都重新加载聊天上下文,以应对文件被外部修改的情况 load_chat_contexts() logger.info(f"调用 Chat API - ID: {user_id}, 是否存储上下文: {store_context}, 消息: {message[:100]}...") # 日志记录消息片段 messages_to_send = [] context_limit = MAX_GROUPS * 2 # 最大消息总数(不包括系统消息) if store_context: # --- 处理需要上下文的常规聊天消息 --- # 1. 获取该用户的系统提示词 try: user_prompt = get_user_prompt(user_id) messages_to_send.append({"role": "system", "content": user_prompt}) except FileNotFoundError as e: logger.error(f"用户 {user_id} 的提示文件错误: {e},使用默认提示。") messages_to_send.append({"role": "system", "content": "你是一个乐于助人的助手。"}) # 2. 管理并检索聊天历史记录 with queue_lock: # 确保对 chat_contexts 的访问是线程安全的 if user_id not in chat_contexts: chat_contexts[user_id] = [] # 在添加当前消息之前获取现有历史记录 history = list(chat_contexts.get(user_id, [])) # 获取副本 # 如果历史记录超过限制,则进行裁剪 if len(history) > context_limit: history = history[-context_limit:] # 保留最近的消息 # 将历史消息添加到 API 请求列表中 messages_to_send.extend(history) # 3. 将当前用户消息添加到 API 请求列表中 messages_to_send.append({"role": "user", "content": message}) # 4. 在准备 API 调用后更新持久上下文 # 将用户消息添加到持久存储中 chat_contexts[user_id].append({"role": "user", "content": message}) # 如果需要,裁剪持久存储(在助手回复后会再次裁剪) if len(chat_contexts[user_id]) > context_limit + 1: # +1 因为刚刚添加了用户消息 chat_contexts[user_id] = chat_contexts[user_id][-(context_limit + 1):] # 保存上下文到文件 save_chat_contexts() # 在用户消息添加后保存一次 else: # --- 处理工具调用(如提醒解析、总结) --- messages_to_send.append({"role": "user", "content": message}) logger.info(f"工具调用 (store_context=False),ID: {user_id}。仅发送提供的消息。") # --- 调用 API --- reply = call_chat_api_with_retry(messages_to_send, user_id, is_summary=is_summary) # --- 如果需要,存储助手回复到上下文中 --- if store_context: with queue_lock: # 再次获取锁来更新和保存 if user_id not in chat_contexts: chat_contexts[user_id] = [] # 安全初始化 (理论上此时应已存在) chat_contexts[user_id].append({"role": "assistant", "content": reply}) if len(chat_contexts[user_id]) > context_limit: chat_contexts[user_id] = chat_contexts[user_id][-context_limit:] # 保存上下文到文件 save_chat_contexts() # 在助手回复添加后再次保存 return reply except Exception as e: logger.error(f"Chat 调用失败 (ID: {user_id}): {str(e)}", exc_info=True) return "抱歉,我现在有点忙,稍后再聊吧。"

def strip_before_thought_tags(text):
# 匹配并截取 </thought></think> 后面的内容
match = re.search(r'(?:</thought>|</think>)([\s\S]*)', text)
if match:
return match.group(1)
else:
return text

def call_chat_api_with_retry(messages_to_send, user_id, max_retries=2, is_summary=False):
"""
调用 Chat API 并在第一次失败或返回空结果时重试。

text
参数: messages_to_send (list): 要发送给 API 的消息列表。 user_id (str): 用户或系统组件的标识符。 max_retries (int): 最大重试次数。 返回: str: API 返回的文本回复。 """ attempt = 0 while attempt <= max_retries: try: logger.debug(f"发送给 API 的消息 (ID: {user_id}): {messages_to_send}") response = client.chat.completions.create( model=MODEL, messages=messages_to_send, temperature=TEMPERATURE, max_tokens=MAX_TOKEN, stream=False ) if response.choices: content = response.choices[0].message.content.strip() if content and "[image]" not in content: filtered_content = strip_before_thought_tags(content) if filtered_content: return filtered_content # 记录错误日志 logger.error(f"错误请求消息体: {MODEL}") logger.error(json.dumps(messages_to_send, ensure_ascii=False, indent=2)) logger.error(f"\033[31m错误:API 返回了空的选择项或内容为空。模型名:{MODEL}\033[0m") logger.error(f"完整响应对象: {response}") except Exception as e: logger.error(f"错误请求消息体: {MODEL}") logger.error(json.dumps(messages_to_send, ensure_ascii=False, indent=2)) error_info = str(e) logger.error(f"自动重试:第 {attempt + 1} 次调用 {MODEL}失败 (ID: {user_id}) 原因: {error_info}", exc_info=False) # 细化错误分类 if "real name verification" in error_info: logger.error("\033[31m错误:API 服务商反馈请完成实名认证后再使用!\033[0m") break # 终止循环,不再重试 elif "rate limit" in error_info: logger.error("\033[31m错误:API 服务商反馈当前访问 API 服务频次达到上限,请稍后再试!\033[0m") elif "payment required" in error_info: logger.error("\033[31m错误:API 服务商反馈您正在使用付费模型,请先充值再使用或使用免费额度模型!\033[0m") break # 终止循环,不再重试 elif "user quota" in error_info or "is not enough" in error_info or "UnlimitedQuota" in error_info: logger.error("\033[31m错误:API 服务商反馈,你的余额不足,请先充值再使用! 如有余额,请检查令牌是否为无限额度。\033[0m") break # 终止循环,不再重试 elif "Api key is invalid" in error_info: logger.error("\033[31m错误:API 服务商反馈 API KEY 不可用,请检查配置选项!\033[0m") elif "service unavailable" in error_info: logger.error("\033[31m错误:API 服务商反馈服务器繁忙,请稍后再试!\033[0m") elif "sensitive words detected" in error_info: logger.error("\033[31m错误:Prompt或消息中含有敏感词,无法生成回复,请联系API服务商!\033[0m") if ENABLE_SENSITIVE_CONTENT_CLEARING: logger.warning(f"已开启敏感词自动清除上下文功能,开始清除用户 {user_id} 的聊天上下文") clear_chat_context(user_id) if is_summary: clear_memory_temp_files(user_id) # 如果是总结任务,清除临时文件 break # 终止循环,不再重试 else: logger.error("\033[31m未知错误:" + error_info + "\033[0m") attempt += 1 raise RuntimeError("抱歉,我现在有点忙,稍后再聊吧。")

def get_assistant_response(message, user_id, is_summary=False):
"""
从辅助模型 API 获取响应,专用于判断型任务(表情、联网、提醒解析等)。
不存储聊天上下文,仅用于辅助判断。

text
参数: message (str): 要发送给辅助模型的消息。 user_id (str): 用户或系统组件的标识符。 返回: str: 辅助模型返回的文本回复。 """ if not assistant_client: logger.warning(f"辅助模型客户端未初始化,回退使用主模型。用户ID: {user_id}") # 回退到主模型 return get_deepseek_response(message, user_id, store_context=False, is_summary=is_summary) try: logger.info(f"调用辅助模型 API - ID: {user_id}, 消息: {message[:100]}...") messages_to_send = [{"role": "user", "content": message}] # 调用辅助模型 API reply = call_assistant_api_with_retry(messages_to_send, user_id, is_summary=is_summary) return reply except Exception as e: logger.error(f"辅助模型调用失败 (ID: {user_id}): {str(e)}", exc_info=True) logger.warning(f"辅助模型调用失败,回退使用主模型。用户ID: {user_id}") # 回退到主模型 return get_deepseek_response(message, user_id, store_context=False, is_summary=is_summary)

def call_assistant_api_with_retry(messages_to_send, user_id, max_retries=2, is_summary=False):
"""
调用辅助模型 API 并在第一次失败或返回空结果时重试。

text
参数: messages_to_send (list): 要发送给辅助模型的消息列表。 user_id (str): 用户或系统组件的标识符。 max_retries (int): 最大重试次数。 返回: str: 辅助模型返回的文本回复。 """ attempt = 0 while attempt <= max_retries: try: logger.debug(f"发送给辅助模型 API 的消息 (ID: {user_id}): {messages_to_send}") response = assistant_client.chat.completions.create( model=ASSISTANT_MODEL, messages=messages_to_send, temperature=ASSISTANT_TEMPERATURE, max_tokens=ASSISTANT_MAX_TOKEN, stream=False ) if response.choices: content = response.choices[0].message.content.strip() if content and "[image]" not in content: filtered_content = strip_before_thought_tags(content) if filtered_content: return filtered_content # 记录错误日志 logger.error("辅助模型错误请求消息体:") logger.error(f"{ASSISTANT_MODEL}") logger.error(json.dumps(messages_to_send, ensure_ascii=False, indent=2)) logger.error("辅助模型 API 返回了空的选择项或内容为空。") logger.error(f"完整响应对象: {response}") except Exception as e: logger.error("辅助模型错误请求消息体:") logger.error(f"{ASSISTANT_MODEL}") logger.error(json.dumps(messages_to_send, ensure_ascii=False, indent=2)) error_info = str(e) logger.error(f"辅助模型自动重试:第 {attempt + 1} 次调用失败 (ID: {user_id}) 原因: {error_info}", exc_info=False) # 细化错误分类 if "real name verification" in error_info: logger.error("\033[31m错误:API 服务商反馈请完成实名认证后再使用!\033[0m") break # 终止循环,不再重试 elif "rate limit" in error_info: logger.error("\033[31m错误:API 服务商反馈当前访问 API 服务频次达到上限,请稍后再试!\033[0m") elif "payment required" in error_info: logger.error("\033[31m错误:API 服务商反馈您正在使用付费模型,请先充值再使用或使用免费额度模型!\033[0m") break # 终止循环,不再重试 elif "user quota" in error_info or "is not enough" in error_info or "UnlimitedQuota" in error_info: logger.error("\033[31m错误:API 服务商反馈,你的余额不足,请先充值再使用! 如有余额,请检查令牌是否为无限额度。\033[0m") break # 终止循环,不再重试 elif "Api key is invalid" in error_info: logger.error("\033[31m错误:API 服务商反馈 API KEY 不可用,请检查配置选项!\033[0m") elif "service unavailable" in error_info: logger.error("\033[31m错误:API 服务商反馈服务器繁忙,请稍后再试!\033[0m") elif "sensitive words detected" in error_info: logger.error("\033[31m错误:提示词中含有敏感词,无法生成回复,请联系API服务商!\033[0m") if ENABLE_SENSITIVE_CONTENT_CLEARING: logger.warning(f"已开启敏感词自动清除上下文功能,开始清除用户 {user_id} 的聊天上下文") clear_chat_context(user_id) if is_summary: clear_memory_temp_files(user_id) # 如果是总结任务,清除临时文件 break # 终止循环,不再重试 else: logger.error("\033[31m未知错误:" + error_info + "\033[0m") attempt += 1 raise RuntimeError("抱歉,辅助模型现在有点忙,稍后再试吧。")

def keep_alive():
"""
定期检查监听列表,确保所有在 user_names 中的用户都被持续监听。
如果发现有用户从监听列表中丢失,则会尝试重新添加。
这是一个守护线程,用于增强程序的健壮性。
"""
check_interval = 5 # 每30秒检查一次,避免过于频繁
logger.info(f"窗口保活/监听守护线程已启动,每 {check_interval} 秒检查一次监听状态。")

text
while True: try: # 获取当前所有正在监听的用户昵称集合 current_listening_users = set(wx.listen.keys()) # 获取应该被监听的用户昵称集合 expected_users_to_listen = set(user_names) # 找出配置中应该监听但当前未在监听列表中的用户 missing_users = expected_users_to_listen - current_listening_users if missing_users: logger.warning(f"检测到 {len(missing_users)} 个用户从监听列表中丢失: {', '.join(missing_users)}") for user in missing_users: try: logger.info(f"正在尝试重新添加用户 '{user}' 到监听列表...") # 使用与程序启动时相同的回调函数 `message_listener` 重新添加监听 wx.AddListenChat(nickname=user, callback=message_listener) logger.info(f"已成功将用户 '{user}' 重新添加回监听列表。") except Exception as e: logger.error(f"重新添加用户 '{user}' 到监听列表时失败: {e}", exc_info=True) else: # 使用 debug 级别,因为正常情况下这条日志会频繁出现,避免刷屏 logger.debug(f"监听列表状态正常,所有 {len(expected_users_to_listen)} 个目标用户都在监听中。") except Exception as e: # 捕获在检查过程中可能发生的任何意外错误,使线程能继续运行 logger.error(f"keep_alive 线程在检查监听列表时发生未知错误: {e}", exc_info=True) # 等待指定间隔后再进行下一次检查 time.sleep(check_interval)

def message_listener(msg, chat):
global can_send_messages
who = chat.who
msgtype = msg.type
original_content = msg.content
sender = msg.sender
msgattr = msg.attr
logger.info(f'收到来自聊天窗口 "{who}" 中用户 "{sender}" 的原始消息 (类型: {msgtype}, 属性: {msgattr}): {original_content[:100]}')

text
if msgattr != 'friend': logger.info(f"非好友消息,已忽略。") return if msgtype == 'voice': voicetext = msg.to_text() original_content = (f"[语音消息]: {voicetext}") if msgtype == 'link': cardurl = msg.get_url() original_content = (f"[卡片链接]: {cardurl}") if msgtype == 'quote': # 引用消息处理 quoted_msg = msg.quote_content if quoted_msg: original_content = f"[引用<{quoted_msg}>消息]: {msg.content}" else: original_content = msg.content if msgtype == 'merge': logger.info(f"收到合并转发消息,开始处理") mergecontent = msg.get_messages() logger.info(f"收到合并转发消息,处理完成") # mergecontent 是一个列表,每个元素是 [发送者, 内容, 时间] # 转换为多行文本,每行格式: [时间] 发送者: 内容 if isinstance(mergecontent, list): merged_text_lines = [] for item in mergecontent: if isinstance(item, list) and len(item) == 3: sender, content, timestamp = item # 修改这里的判断逻辑,正确处理WindowsPath对象 # 检查是否为WindowsPath对象 if hasattr(content, 'suffix') and str(content.suffix).lower() in ('.png', '.jpg', '.jpeg', '.gif', '.bmp'): # 是WindowsPath对象且是图片 if ENABLE_IMAGE_RECOGNITION: try: logger.info(f"开始识别图片: {str(content)}") # 将WindowsPath对象转换为字符串 image_path = str(content) # 保存当前状态 original_can_send_messages = can_send_messages # 处理图片 content = recognize_image_with_moonshot(image_path, is_emoji=False) if content: logger.info(f"图片识别成功: {content}") content = f"[图片识别结果]: {content}" else: content = "[图片识别结果]: 无法识别图片内容" # 确保状态恢复 can_send_messages = original_can_send_messages except Exception as e: content = "[图片识别失败]" logger.error(f"图片识别失败: {e}") # 确保状态恢复 can_send_messages = True else: content = "[图片]" # 处理字符串路径的判断 (兼容性保留) elif isinstance(content, str) and content.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp')): if ENABLE_IMAGE_RECOGNITION: try: logger.info(f"开始识别图片: {content}") # 保存当前状态 original_can_send_messages = can_send_messages # 处理图片 image_content = recognize_image_with_moonshot(content, is_emoji=False) if image_content: logger.info(f"图片识别成功: {image_content}") content = f"[图片识别结果]: {image_content}" else: content = "[图片识别结果]: 无法识别图片内容" # 确保状态恢复 can_send_messages = original_can_send_messages except Exception as e: content = "[图片识别失败]" logger.error(f"图片识别失败: {e}") # 确保状态恢复 can_send_messages = True else: content = "[图片]" merged_text_lines.append(f"[{timestamp}] {sender}: {content}") else: merged_text_lines.append(str(item)) merged_text = "\n".join(merged_text_lines) original_content = f"[合并转发消息]:\n{merged_text}" else: original_content = f"[合并转发消息]: {mergecontent}" # 在处理完所有消息类型后检查内容是否为空 if not original_content: logger.info("消息内容为空,已忽略。") return should_process_this_message = False content_for_handler = original_content is_group_chat = is_user_group_chat(who) if not is_group_chat: if who in user_names: should_process_this_message = True logger.info(f"收到来自监听列表用户 {who} 的个人私聊消息,准备处理。") else: logger.info(f"收到来自用户 {sender} (聊天窗口 {who}) 的个人私聊消息,但用户 {who} 不在监听列表或发送者与聊天窗口不符,已忽略。") else: processed_group_content = original_content at_triggered = False keyword_triggered = False if not ACCEPT_ALL_GROUP_CHAT_MESSAGES and ENABLE_GROUP_AT_REPLY and ROBOT_WX_NAME: temp_content_after_at_check = processed_group_content unicode_at_pattern = f'@{re.escape(ROBOT_WX_NAME)}\u2005' space_at_pattern = f'@{re.escape(ROBOT_WX_NAME)} ' exact_at_string = f'@{re.escape(ROBOT_WX_NAME)}' if re.search(unicode_at_pattern, processed_group_content): at_triggered = True temp_content_after_at_check = re.sub(unicode_at_pattern, '', processed_group_content, 1).strip() elif re.search(space_at_pattern, processed_group_content): at_triggered = True temp_content_after_at_check = re.sub(space_at_pattern, '', processed_group_content, 1).strip() elif processed_group_content.strip() == exact_at_string: at_triggered = True temp_content_after_at_check = '' if at_triggered: logger.info(f"群聊 '{who}' 中检测到 @机器人。") processed_group_content = temp_content_after_at_check if ENABLE_GROUP_KEYWORD_REPLY: if any(keyword in processed_group_content for keyword in GROUP_KEYWORD_LIST): keyword_triggered = True logger.info(f"群聊 '{who}' 中检测到关键词。") basic_trigger_met = ACCEPT_ALL_GROUP_CHAT_MESSAGES or at_triggered or keyword_triggered if basic_trigger_met: if not ACCEPT_ALL_GROUP_CHAT_MESSAGES: if at_triggered and keyword_triggered: logger.info(f"群聊 '{who}' 消息因 @机器人 和关键词触发基本处理条件。") elif at_triggered: logger.info(f"群聊 '{who}' 消息因 @机器人 触发基本处理条件。") elif keyword_triggered: logger.info(f"群聊 '{who}' 消息因关键词触发基本处理条件。") else: logger.info(f"群聊 '{who}' 消息符合全局接收条件,触发基本处理条件。") if keyword_triggered and GROUP_KEYWORD_REPLY_IGNORE_PROBABILITY: should_process_this_message = True logger.info(f"群聊 '{who}' 消息因触发关键词且配置为忽略回复概率,将进行处理。") elif random.randint(1, 100) <= GROUP_CHAT_RESPONSE_PROBABILITY: should_process_this_message = True logger.info(f"群聊 '{who}' 消息满足基本触发条件并通过总回复概率 {GROUP_CHAT_RESPONSE_PROBABILITY}%,将进行处理。") else: should_process_this_message = False logger.info(f"群聊 '{who}' 消息满足基本触发条件,但未通过总回复概率 {GROUP_CHAT_RESPONSE_PROBABILITY}%,将忽略。") else: should_process_this_message = False logger.info(f"群聊 '{who}' 消息 (发送者: {sender}) 未满足任何基本触发条件(全局、@、关键词),将忽略。") if should_process_this_message: if not msgtype == 'image': content_for_handler = f"[群聊消息-来自群'{who}'-发送者:{sender}]:{processed_group_content}" else: content_for_handler = processed_group_content if not content_for_handler and at_triggered and not keyword_triggered: logger.info(f"群聊 '{who}' 中单独 @机器人,处理后内容为空,仍将传递给后续处理器。") if should_process_this_message: msg.content = content_for_handler logger.info(f'最终准备处理消息 from chat "{who}" by sender "{sender}": {msg.content[:100]}') if msgtype == 'emotion': is_animation_emoji_in_original = True else: is_animation_emoji_in_original = False if is_animation_emoji_in_original and ENABLE_EMOJI_RECOGNITION: handle_emoji_message(msg, who) else: handle_wxauto_message(msg, who)

def recognize_image_with_moonshot(image_path, is_emoji=False):
# 先暂停向API发送消息队列
global can_send_messages
can_send_messages = False

text
"""使用AI识别图片内容并返回文本""" try: processed_image_path = image_path # 读取图片内容并编码 with open(processed_image_path, 'rb') as img_file: image_content = base64.b64encode(img_file.read()).decode('utf-8') headers = { 'Authorization': f'Bearer {MOONSHOT_API_KEY}', 'Content-Type': 'application/json' } text_prompt = "请用中文描述这张图片的主要内容或主题。不要使用'这是'、'这张'等开头,直接描述。如果有文字,请包含在描述中。" if not is_emoji else "请用中文简洁地描述这个聊天窗口最后一张表情包所表达的情绪、含义或内容。如果表情包含文字,请一并描述。注意:1. 只描述表情包本身,不要添加其他内容 2. 不要出现'这是'、'这个'等词语" data = { "model": MOONSHOT_MODEL, "messages": [ { "role": "user", "content": [ {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image_content}"}}, {"type": "text", "text": text_prompt} ] } ], "temperature": MOONSHOT_TEMPERATURE } response = requests.post(f"{MOONSHOT_BASE_URL}/chat/completions", headers=headers, json=data) response.raise_for_status() result = response.json() recognized_text = result['choices'][0]['message']['content'] if is_emoji: # 如果recognized_text包含"最后一张表情包是",只保留后面的文本 if "最后一张表情包" in recognized_text: recognized_text = recognized_text.split("最后一张表情包", 1)[1].strip() recognized_text = "发送了表情包:" + recognized_text else: recognized_text = "发送了图片:" + recognized_text logger.info(f"AI图片识别结果: {recognized_text}") # 清理临时文件 if is_emoji and os.path.exists(processed_image_path): try: os.remove(processed_image_path) logger.debug(f"已清理临时表情: {processed_image_path}") except Exception as clean_err: logger.warning(f"清理临时表情图片失败: {clean_err}") # 恢复向Deepseek发送消息队列 can_send_messages = True return recognized_text except Exception as e: logger.error(f"调用AI识别图片失败: {str(e)}", exc_info=True) # 恢复向Deepseek发送消息队列 can_send_messages = True return ""

def handle_emoji_message(msg, who):
global emoji_timer
global can_send_messages
can_send_messages = False

text
def timer_callback(): with emoji_timer_lock: handle_wxauto_message(msg, who) emoji_timer = None with emoji_timer_lock: if emoji_timer is not None: emoji_timer.cancel() emoji_timer = threading.Timer(3.0, timer_callback) emoji_timer.start()

def fetch_and_extract_text(url: str) -> Optional[str]:
"""
获取给定 URL 的网页内容并提取主要文本。

text
Args: url (str): 要抓取的网页链接。 Returns: Optional[str]: 提取并清理后的网页文本内容(限制了最大长度),如果失败则返回 None。 """ try: # 基本 URL 格式验证 (非常基础) parsed_url = urlparse(url) if not all([parsed_url.scheme, parsed_url.netloc]): logger.warning(f"无效的URL格式,跳过抓取: {url}") return None headers = {'User-Agent': REQUESTS_USER_AGENT} logger.info(f"开始抓取链接内容: {url}") response = requests.get(url, headers=headers, timeout=REQUESTS_TIMEOUT, allow_redirects=True) response.raise_for_status() # 检查HTTP请求是否成功 (状态码 2xx) # 检查内容类型,避免处理非HTML内容(如图片、PDF等) content_type = response.headers.get('Content-Type', '').lower() if 'html' not in content_type: logger.warning(f"链接内容类型非HTML ({content_type}),跳过文本提取: {url}") return None # 使用BeautifulSoup解析HTML # 指定 lxml 解析器以获得更好的性能和兼容性 soup = BeautifulSoup(response.content, 'lxml') # 使用 response.content 获取字节流,让BS自动处理编码 # --- 文本提取策略 --- # 尝试查找主要内容区域 (这部分可能需要根据常见网站结构调整优化) main_content_tags = ['article', 'main', '.main-content', '#content', '.post-content'] # 示例选择器 main_text = "" for tag_selector in main_content_tags: element = soup.select_one(tag_selector) if element: main_text = element.get_text(separator='\n', strip=True) break # 找到一个就停止 # 如果没有找到特定的主要内容区域,则获取整个 body 的文本作为备选 if not main_text and soup.body: main_text = soup.body.get_text(separator='\n', strip=True) elif not main_text: # 如果连 body 都没有,则使用整个 soup main_text = soup.get_text(separator='\n', strip=True) # 清理文本:移除过多空行 lines = [line for line in main_text.splitlines() if line.strip()] cleaned_text = '\n'.join(lines) # 限制内容长度 if len(cleaned_text) > MAX_WEB_CONTENT_LENGTH: cleaned_text = cleaned_text[:MAX_WEB_CONTENT_LENGTH] + "..." # 截断并添加省略号 logger.info(f"网页内容已提取,并截断至 {MAX_WEB_CONTENT_LENGTH} 字符。") elif cleaned_text: logger.info(f"成功提取网页文本内容 (长度 {len(cleaned_text)}).") else: logger.warning(f"未能从链接 {url} 提取到有效文本内容。") return None # 如果提取后为空,也视为失败 return cleaned_text except requests.exceptions.Timeout: logger.error(f"抓取链接超时 ({REQUESTS_TIMEOUT}秒): {url}") return None except requests.exceptions.RequestException as e: logger.error(f"抓取链接时发生网络错误: {url}, 错误: {e}") return None except Exception as e: # 捕获其他可能的错误,例如 BS 解析错误 logger.error(f"处理链接时发生未知错误: {url}, 错误: {e}", exc_info=True) return None

辅助函数:将用户消息记录到记忆日志 (如果启用)

def log_user_message_to_memory(username, original_content):
"""将用户的原始消息记录到记忆日志文件。"""
if ENABLE_MEMORY:
try:
prompt_name = prompt_mapping.get(username, username)
log_file = os.path.join(root_dir, MEMORY_TEMP_DIR, f'{username}_{prompt_name}_log.txt')
log_entry = f"{datetime.now().strftime('%Y-%m-%d %A %H:%M:%S')} | [{username}] {original_content}\n"
os.makedirs(os.path.dirname(log_file), exist_ok=True)
with open(log_file, 'a', encoding='utf-8') as f:
f.write(log_entry)
except Exception as write_err:
logger.error(f"写入用户 {username} 的记忆日志失败: {write_err}")

✨ NEW: 视频聊天后台处理函数

def start_video_chat_in_background(user_id, sender_name, username, merged_message):
"""
在一个新的后台线程中异步启动视频聊天。
此版本会合并主程序的System Prompt和插件的Additive Prompt。
"""
try:
from video_chat_plugin.wechat_integration import handle_video_chat_request_async

text
# 1. 准备插件调用所需的全部参数 character_name = prompt_mapping.get(user_id, "default") # 2. 获取主程序的 System Prompt try: base_prompt = get_user_prompt(user_id) except FileNotFoundError: logger.error(f"无法找到角色 {character_name} 的Prompt文件,使用默认Prompt。") base_prompt = "你是一个AI助手。" # 3. 从插件配置中读取 Additive Prompt additive_prompt = "" try: # ✨ MODIFICATION: Read additive_prompt from plugin config plugin_config_path = os.path.join(root_dir, 'video_chat_plugin', 'video_chat_config.json') if os.path.exists(plugin_config_path): with open(plugin_config_path, 'r', encoding='utf-8') as f: plugin_config = json.load(f) additive_prompt = plugin_config.get("characters", {}).get(character_name, {}).get("additive_prompt", "") except Exception as e: logger.warning(f"读取插件配置中的 additive_prompt 失败: {e}") # 4. 合并 Prompt final_system_prompt = base_prompt if additive_prompt: final_system_prompt += f"\n\n--- 视频聊天附加指令 ---\n{additive_prompt}" # 5. 获取聊天历史 current_history = chat_contexts.get(user_id, []) # 6. 异步调用插件接口,传递最终合并后的Prompt success, response_message = asyncio.run(handle_video_chat_request_async( user_id=user_id, character_name=character_name, system_prompt=final_system_prompt, # 传递合并后的Prompt chat_history=current_history )) # 7. 后台线程直接发送最终的URL回复 if success: send_reply(user_id, sender_name, username, merged_message, response_message) logger.info(f"✅ (后台线程) 视频聊天会话已为用户 {user_id} 创建并发送链接。") else: send_reply(user_id, sender_name, username, merged_message, response_message) logger.error(f"❌ (后台线程) 为用户 {user_id} 创建视频聊天会话失败: {response_message}") except Exception as e: logger.error(f"❌ (后台线程) 启动视频聊天插件时发生严重错误: {e}", exc_info=True) try: send_reply(user_id, sender_name, username, merged_message, "抱歉,视频聊天功能暂时不可用,请稍后重试。") except Exception as send_err: logger.error(f"❌ (后台线程) 发送视频聊天错误消息也失败: {send_err}")

def handle_wxauto_message(msg, who):
"""
处理来自Wxauto的消息,包括可能的提醒、图片/表情、链接内容获取和常规聊天。
"""
global can_send_messages # 引用全局变量以控制发送状态
global last_received_message_timestamp # 引用全局变量以更新活动时间
try:
last_received_message_timestamp = time.time()
username = who
# 获取原始消息内容
original_content = getattr(msg, 'content', None) or getattr(msg, 'text', None)

text
# 如果消息内容为空,则直接返回 if not original_content: logger.warning("收到的消息没有内容。") return # 重置该用户的自动消息计时器 on_user_message(username) # --- 1. 提醒检查 (基于原始消息内容) --- reminder_keywords = ["每日","每天","提醒","提醒我", "定时", "分钟后", "小时后", "计时", "闹钟", "通知我", "叫我", "提醒一下", "倒计时", "稍后提醒", "稍后通知", "提醒时间", "设置提醒", "喊我"] if ENABLE_REMINDERS and any(keyword in original_content for keyword in reminder_keywords): logger.info(f"检测到可能的提醒请求,用户 {username}: {original_content}") # 尝试解析并设置提醒 reminder_set = try_parse_and_set_reminder(original_content, username) # 如果成功设置了提醒,则处理完毕,直接返回 if reminder_set: logger.info(f"成功为用户 {username} 设置提醒,消息处理结束。") return # 停止进一步处理此消息 # --- 2. 图片/表情处理 (基于原始消息内容) --- img_path = None # 图片路径 is_emoji = False # 是否为表情包 # processed_content 初始化为原始消息,后续步骤可能修改它 processed_content = original_content # 检查是否为图片文件路径 if msg.type in ('image'): if ENABLE_IMAGE_RECOGNITION: img_path = msg.download() is_emoji = False processed_content = None # 标记为None,稍后会被识别结果替换 logger.info(f"检测到图片消息,准备识别: {img_path}") else: logger.info("检测到图片消息,但图片识别功能已禁用。") # 检查是否为动画表情 elif msg.type in ('emotion'): if ENABLE_EMOJI_RECOGNITION: img_path = msg.capture() # 截图 is_emoji = True processed_content = None # 标记为None,稍后会被识别结果替换 logger.info("检测到动画表情,准备截图识别...") else: clean_up_temp_files() # 清理可能的临时文件 logger.info("检测到动画表情,但表情识别功能已禁用。") # 如果需要进行图片/表情识别 if img_path: logger.info(f"开始识别图片/表情 - 用户 {username}: {img_path}") # 调用识别函数 recognized_text = recognize_image_with_moonshot(img_path, is_emoji=is_emoji) # 使用识别结果或回退占位符更新 processed_content processed_content = recognized_text if recognized_text else ("[图片]" if not is_emoji else "[动画表情]") clean_up_temp_files() # 清理临时截图文件 can_send_messages = True # 确保识别后可以发送消息 logger.info(f"图片/表情识别完成,结果: {processed_content}") # --- 3. 链接内容获取 (仅当ENABLE_URL_FETCHING为True且当前非图片/表情处理流程时) --- fetched_web_content = None # 只有在启用了URL抓取,并且当前处理的不是图片/表情(即processed_content不为None)时才进行 if ENABLE_URL_FETCHING and processed_content is not None: # 使用正则表达式查找 URL url_pattern = r'https?://[^\s<>"]+|www\.[^\s<>"]+' urls_found = re.findall(url_pattern, original_content) # 仍在原始消息中查找URL if urls_found: # 优先处理第一个找到的有效链接 url_to_fetch = urls_found[0] logger.info(f"检测到链接,用户 {username},准备抓取: {url_to_fetch}") # 调用辅助函数抓取和提取文本 fetched_web_content = fetch_and_extract_text(url_to_fetch) if fetched_web_content: logger.info(f"成功获取链接内容摘要 (长度 {len(fetched_web_content)})。") # 构建包含链接摘要的新消息内容,用于发送给AI # 注意:这里替换了 processed_content,AI将收到包含原始消息和链接摘要的组合信息 processed_content = f"用户发送了消息:\"{original_content}\"\n其中包含的链接的主要内容摘要如下(可能不完整):\n---\n{fetched_web_content}\n---\n" else: logger.warning(f"未能从链接 {url_to_fetch} 提取有效文本内容。将按原始消息处理。") # 如果抓取失败,processed_content 保持不变(可能是原始文本,或图片/表情占位符) # else: (如果没找到URL) 不需要操作,继续使用当前的 processed_content # --- 4. 记录用户消息到记忆 (如果启用) --- log_user_message_to_memory(username, processed_content) # --- 5. 视频聊天关键词即时检查 --- # ✨ REFACTORED LOGIC: This section is moved to process_user_messages # for better control and user experience. It's now handled after # message queueing. # --- 6. 将最终处理后的消息加入队列 --- # 只有在 processed_content 有效时才加入队列 if processed_content: # 获取当前时间戳,添加到消息内容前 current_time_str = datetime.now().strftime("%Y-%m-%d %A %H:%M:%S") content_with_time = f"[{current_time_str}] {processed_content}" # 使用最终处理过的内容 logger.info(f"准备将处理后的消息加入队列 - 用户 {username}: {content_with_time[:150]}...") # 日志截断防止过长 sender_name = username # 发送者名字(对于好友聊天,who就是username) # 使用锁保护对共享队列的访问 with queue_lock: # 如果用户队列不存在,则初始化 if username not in user_queues: user_queues[username] = { 'messages': [content_with_time], 'sender_name': sender_name, 'username': username, 'last_message_time': time.time() } logger.info(f"已为用户 {sender_name} 初始化消息队列并加入消息。") else: # 用户队列已存在,追加消息并管理队列长度 user_queues[username]['messages'].append(content_with_time) # 更新最后消息时间戳 user_queues[username]['last_message_time'] = time.time() logger.info(f"用户 {sender_name} 的消息已加入队列(当前 {len(user_queues[username]['messages'])} 条)并更新时间。") else: # 如果经过所有处理后 processed_content 变为 None 或空字符串,则记录警告 logger.warning(f"在处理后未找到用户 {username} 的可处理内容。原始消息: '{original_content}'") except Exception as e: can_send_messages = True # 确保发生错误时可以恢复发送消息 logger.error(f"消息处理失败 (handle_wxauto_message): {str(e)}", exc_info=True)

def check_inactive_users():
global can_send_messages
while True:
current_time = time.time()
inactive_users = []
with queue_lock:
for username, user_data in user_queues.items():
last_time = user_data.get('last_message_time', 0)
if current_time - last_time > QUEUE_WAITING_TIME and can_send_messages and not is_sending_message:
inactive_users.append(username)

text
for username in inactive_users: process_user_messages(username) time.sleep(1) # 每秒检查一次

def process_user_messages(user_id):
"""处理指定用户的消息队列,包括可能的联网搜索。"""
global can_send_messages # 引用全局变量

text
with queue_lock: if user_id not in user_queues: return # 从队列获取数据并移除该用户条目 user_data = user_queues.pop(user_id) messages = user_data['messages'] sender_name = user_data['sender_name'] username = user_data['username'] # username 可能是群聊名或好友昵称 # 合并消息 merged_message = ' '.join(messages) logger.info(f"开始处理用户 '{sender_name}' (ID: {user_id}) 的合并消息: {merged_message[:100]}...") # ===== ✨ MODIFICATION START: 视频聊天和角色管理逻辑重构 ===== if ENABLE_VIDEO_CHAT: video_chat_keywords = get_video_chat_keywords() if any(keyword in merged_message for keyword in video_chat_keywords): logger.info(f"🎥 检测到视频聊天关键词,为用户 {user_id} 准备会话...") try: # 1. 立即回复,提供即时反馈 send_reply(user_id, sender_name, username, merged_message, "好的,我正在为您准备语音通话,请稍等片刻,链接马上就来...") # 2. 在后台线程中启动耗时的插件调用 thread = threading.Thread( target=start_video_chat_in_background, args=(user_id, sender_name, username, merged_message), daemon=True ) thread.start() # 3. 立即返回,不进行后续的常规AI对话 return except Exception as e: logger.error(f"❌ 启动视频聊天后台线程失败: {e}") send_reply(user_id, sender_name, username, merged_message, "抱歉,启动视频聊天功能时遇到问题。") return # 同样返回,避免继续处理 character_switch_keywords = get_character_switch_keywords() if any(keyword in merged_message for keyword in character_switch_keywords): try: available_characters = list(set(prompt_mapping.values())) char_list_str = "\n".join([f"• {char}" for char in available_characters]) current_character_for_user = prompt_mapping.get(user_id, "默认") reply_message = ( f"当前为您服务的角色是:{current_character_for_user}\n\n" f"🎭 所有可用角色列表如下:\n{char_list_str}\n\n" "您可以在发起语音通话时通过关键词触发特定角色。" ) send_reply(user_id, sender_name, username, merged_message, reply_message) return # 处理完角色查询后,结束本次消息处理 except Exception as e: logger.error(f"❌ 查询角色列表失败: {e}") send_reply(user_id, sender_name, username, merged_message, "查询角色列表时出错,请稍后重试。") return # ===== ✨ MODIFICATION END ===== # 检查是否为主动消息 is_auto_message = "触发主动发消息:" in merged_message reply = None online_info = None try: # --- 新增:联网搜索逻辑 --- if ENABLE_ONLINE_API: # 1. 检测是否需要联网 search_content = needs_online_search(merged_message, user_id) if search_content: # 2. 如果需要,调用在线 API logger.info(f"尝试为用户 {user_id} 执行在线搜索...") merged_message = f"用户原始信息:\n{merged_message}\n\n需要进行联网搜索的信息:\n{search_content}" online_info = get_online_model_response(merged_message, user_id) if online_info: # 3. 如果成功获取在线信息,构建新的提示给主 AI logger.info(f"成功获取在线信息,为用户 {user_id} 准备最终回复...") # 结合用户原始问题、在线信息,让主 AI 生成最终回复 # 注意:get_deepseek_response 会自动加载用户的 prompt 文件 (角色设定) final_prompt = f"""

用户的原始问题是:
"{merged_message}"

根据以下联网搜索到的参考信息:

{online_info}

请结合你的角色设定,以自然的方式回答用户的原始问题。请直接给出回答内容,不要提及你是联网搜索的。
"""
# 调用主 AI 生成最终回复,存储上下文
reply = get_deepseek_response(final_prompt, user_id, store_context=True)
# 这里可以考虑如果在线信息是错误消息(如"在线搜索有点忙..."),是否要特殊处理
# 当前逻辑是:即使在线搜索返回错误信息,也会让主AI尝试基于这个错误信息来回复

text
else: # 在线搜索失败或未返回有效信息 logger.warning(f"在线搜索未能获取有效信息,用户: {user_id}。将按常规流程处理。") # 这里可以选择发送一个错误提示,或者直接回退到无联网信息的回复 # 当前选择回退:下面会执行常规的 get_deepseek_response pass # 继续执行下面的常规流程 # --- 常规回复逻辑 (如果未启用联网、检测不需要联网、或联网失败) --- if reply is None: # 只有在尚未通过联网逻辑生成回复时才执行 logger.info(f"为用户 {user_id} 执行常规回复(无联网信息)。") reply = get_deepseek_response(merged_message, user_id, store_context=True) # --- 发送最终回复 --- if reply: # 如果回复中包含思考标签(如 Deepseek R1),移除它 if "</think>" in reply: reply = reply.split("</think>", 1)[1].strip() # 屏蔽记忆片段发送(如果包含) if "## 记忆片段" not in reply: send_reply(user_id, sender_name, username, merged_message, reply) else: logger.info(f"回复包含记忆片段标记,已屏蔽发送给用户 {user_id}。") else: logger.error(f"未能为用户 {user_id} 生成任何回复。") except Exception as e: if is_auto_message: # 如果是主动消息出错,只记录日志,不发送错误消息给用户 logger.error(f"主动消息处理失败 (用户: {user_id}): {str(e)}") logger.info(f"主动消息API调用失败,已静默处理,不发送错误提示给用户 {user_id}") else: # 如果是正常用户消息出错,记录日志并重新抛出异常(保持原有的错误处理逻辑) logger.error(f"用户消息处理失败 (用户: {user_id}): {str(e)}") raise

def send_reply(user_id, sender_name, username, original_merged_message, reply):
"""发送回复消息,可能分段发送,并管理发送标志。"""
global is_sending_message
if not reply:
logger.warning(f"尝试向 {user_id} 发送空回复。")
return

text
# --- 如果正在发送,等待 --- wait_start_time = time.time() MAX_WAIT_SENDING = 15.0 # 最大等待时间(秒) while is_sending_message: if time.time() - wait_start_time > MAX_WAIT_SENDING: logger.warning(f"等待 is_sending_message 标志超时,准备向 {user_id} 发送回复,继续执行。") break # 避免无限等待 logger.debug(f"等待向 {user_id} 发送回复,另一个发送正在进行中。") time.sleep(0.5) # 短暂等待 try: is_sending_message = True # <<< 在发送前设置标志 logger.info(f"准备向 {sender_name} (用户ID: {user_id}) 发送消息") # --- 表情包发送逻辑 --- emoji_path = None if ENABLE_EMOJI_SENDING: emotion = is_emoji_request(reply) if emotion: logger.info(f"触发表情请求(概率{EMOJI_SENDING_PROBABILITY}%) 用户 {user_id},情绪: {emotion}") emoji_path = send_emoji(emotion) # --- 文本消息处理 --- reply = remove_timestamps(reply) if REMOVE_PARENTHESES: reply = remove_parentheses_and_content(reply) parts = split_message_with_context(reply) if not parts: logger.warning(f"回复消息在分割/清理后为空,无法发送给 {user_id}。") is_sending_message = False return # --- 构建消息队列(文本+表情随机插入)--- message_actions = [('text', part) for part in parts] if emoji_path: # 随机选择插入位置(0到len(message_actions)之间,包含末尾) insert_pos = random.randint(0, len(message_actions)) message_actions.insert(insert_pos, ('emoji', emoji_path)) # --- 发送混合消息队列 --- for idx, (action_type, content) in enumerate(message_actions): if action_type == 'emoji': try: wx.SendFiles(filepath=content, who=user_id) logger.info(f"已向 {user_id} 发送表情包") time.sleep(random.uniform(0.5, 1.5)) # 表情包发送后随机延迟 except Exception as e: logger.error(f"发送表情包失败: {str(e)}") else: wx.SendMsg(msg=content, who=user_id) logger.info(f"分段回复 {idx+1}/{len(message_actions)} 给 {sender_name}: {content[:50]}...") if ENABLE_MEMORY: log_ai_reply_to_memory(username, content) # 处理分段延迟(仅当下一动作为文本时计算) if idx < len(message_actions) - 1: next_action = message_actions[idx + 1] if action_type == 'text' and next_action[0] == 'text': next_part_len = len(next_action[1]) base_delay = next_part_len * AVERAGE_TYPING_SPEED random_delay = random.uniform(RANDOM_TYPING_SPEED_MIN, RANDOM_TYPING_SPEED_MAX) total_delay = max(1.0, base_delay + random_delay) time.sleep(total_delay) else: # 表情包前后使用固定随机延迟 time.sleep(random.uniform(0.5, 1.5)) except Exception as e: logger.error(f"向 {user_id} 发送回复失败: {str(e)}", exc_info=True) finally: is_sending_message = False

def split_message_with_context(text):
"""
将消息文本分割为多个部分,处理换行符、转义字符和符号。处理文本中的换行符和转义字符,并根据配置决定是否分割。无论配置如何,都会以符号。 处理文本中的换行符和转义字符,并根据配置决定是否分割。 无论配置如何,都会以作为分隔符分割消息。

text
特别说明: - 每个$都会作为独立分隔符,所以"Hello$World$Python"会分成三部分 - 连续的$$会产生空部分,这些会被自动跳过 """ result_parts = [] # 首先用$符号分割文本(无论SEPARATE_ROW_SYMBOLS设置如何) # 这会处理多个$的情况,每个$都作为分隔符 dollar_parts = re.split(r'\$', text) # 对每个由$分割的部分应用原有的分隔逻辑 for dollar_part in dollar_parts: # 跳过空的部分(比如连续的$$之间没有内容的情况) if not dollar_part.strip(): continue # 应用原有的分隔逻辑 if SEPARATE_ROW_SYMBOLS: main_parts = re.split(r'(?:\\{3,}|\n)', dollar_part) else: main_parts = re.split(r'\\{3,}', dollar_part) for part in main_parts: part = part.strip() if not part: continue segments = [] last_end = 0 for match in re.finditer(r'\\', part): pos = match.start() should_split_at_current_pos = False advance_by = 1 if pos + 1 < len(part) and part[pos + 1] == 'n': should_split_at_current_pos = True advance_by = 2 else: prev_char = part[pos - 1] if pos > 0 else '' is_last_char_in_part = (pos == len(part) - 1) next_char = '' if not is_last_char_in_part: next_char = part[pos + 1] if not is_last_char_in_part and \ re.match(r'[a-zA-Z0-9]', next_char) and \ (re.match(r'[a-zA-Z0-9]', prev_char) if prev_char else True): should_split_at_current_pos = True else: is_in_emoticon = False i = pos - 1 while i >= 0 and i > pos - 10: if part[i] in '({[(【{': is_in_emoticon = True break if part[i].isalnum() and i < pos - 1: break i -= 1 if not is_last_char_in_part and not is_in_emoticon: _found_forward_emoticon_char = False j = pos + 1 while j < len(part) and j < pos + 10: if part[j] in ')}])】}': _found_forward_emoticon_char = True break if part[j].isalnum() and j > pos + 1: break j += 1 if _found_forward_emoticon_char: is_in_emoticon = True if not is_in_emoticon: should_split_at_current_pos = True if should_split_at_current_pos: segment_to_add = part[last_end:pos].strip() if segment_to_add: segments.append(segment_to_add) last_end = pos + advance_by if last_end < len(part): final_segment = part[last_end:].strip() if final_segment: segments.append(final_segment) if segments: result_parts.extend(segments) elif not segments and part: result_parts.append(part) return [p for p in result_parts if p]

def remove_timestamps(text):
"""
移除文本中所有[YYYY-MM-DD (Weekday) HH:MM(:SS)]格式的时间戳
支持四种格式:
1. [YYYY-MM-DD Weekday HH:MM:SS] - 带星期和秒
2. [YYYY-MM-DD Weekday HH:MM] - 带星期但没有秒
3. [YYYY-MM-DD HH:MM:SS] - 带秒但没有星期
4. [YYYY-MM-DD HH:MM] - 基本格式
并自动清理因去除时间戳产生的多余空格
"""
# 定义支持多种格式的时间戳正则模式
timestamp_pattern = r'''
# 起始方括号 \d{4} # 年份:4位数字 -(?:0[1-9]|1[0-2]) # 月份:01-12 (使用非捕获组) -(?:0[1-9]|[12]\d|3[01]) # 日期:01-31 (使用非捕获组) (?:\s[A-Za-z]+)? # 可选的星期部分 \s # 日期与时间之间的空格 (?:2[0-3]|[01]\d) # 小时:00-23 :[0-5]\d # 分钟:00-59 (?::[0-5]\d)? # 可选的秒数 # 匹配结束方括号 <--- 修正点
'''
# 替换时间戳为空格
text_no_timestamps = re.sub(
pattern = timestamp_pattern,
repl = ' ', # 统一替换为单个空格 (lambda m: ' ' 与 ' ' 等效)
string = text,
flags = re.X | re.M # re.X 等同于 re.VERBOSE
)
# 清理可能产生的连续空格,将其合并为单个空格
cleaned_text = re.sub(r'[^\S\r\n]+', ' ', text_no_timestamps)
# 最后统一清理首尾空格
return cleaned_text.strip()

def remove_parentheses_and_content(text: str) -> str:
"""
去除文本中中文括号、英文括号及其中的内容。
同时去除因移除括号而可能产生的多余空格(例如,连续空格变单个,每行首尾空格去除)。
不去除其它符号和换行符。
"""
processed_text = re.sub(r".?.*?|(.*?)", "", text, flags=re.DOTALL)
processed_text = re.sub(r" {2,}", " ", processed_text)
lines = processed_text.split('\n')
stripped_lines = [line.strip(" ") for line in lines]
processed_text = "\n".join(stripped_lines)
return processed_text

def is_emoji_request(text: str) -> Optional[str]:
"""使用AI判断消息情绪并返回对应的表情文件夹名称"""
try:
# 概率判断
if ENABLE_EMOJI_SENDING and random.randint(0, 100) > EMOJI_SENDING_PROBABILITY:
logger.info(f"未触发表情请求(概率{EMOJI_SENDING_PROBABILITY}%)")
return None

text
# 获取emojis目录下的所有情绪分类文件夹 emoji_categories = [d for d in os.listdir(EMOJI_DIR) if os.path.isdir(os.path.join(EMOJI_DIR, d))] if not emoji_categories: logger.warning("表情包目录下未找到有效情绪分类文件夹") return None # 构造AI提示词 prompt = f"""请判断以下消息表达的情绪,并仅回复一个词语的情绪分类:

{text}
可选的分类有:{', '.join(emoji_categories)}。请直接回复分类名称,不要包含其他内容,注意大小写。若对话未包含明显情绪,请回复None。"""

text
# 根据配置选择使用辅助模型或主模型 if ENABLE_ASSISTANT_MODEL: response = get_assistant_response(prompt, "emoji_detection").strip() logger.info(f"辅助模型情绪识别结果: {response}") else: response = get_deepseek_response(prompt, "system", store_context=False).strip() logger.info(f"主模型情绪识别结果: {response}") # 清洗响应内容 response = re.sub(r"[^\w\u4e00-\u9fff]", "", response) # 移除非文字字符 # 验证是否为有效分类 if response in emoji_categories: return response # 尝试模糊匹配 for category in emoji_categories: if category in response or response in category: return category logger.warning(f"未匹配到有效情绪分类,AI返回: {response}") return None except Exception as e: logger.error(f"情绪判断失败: {str(e)}") return None

def send_emoji(emotion: str) -> Optional[str]:
"""根据情绪类型发送对应表情包"""
if not emotion:
return None

text
emoji_folder = os.path.join(EMOJI_DIR, emotion) try: # 获取文件夹中的所有表情文件 emoji_files = [ f for f in os.listdir(emoji_folder) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif')) ] if not emoji_files: logger.warning(f"表情文件夹 {emotion} 为空") return None # 随机选择并返回表情路径 selected_emoji = random.choice(emoji_files) return os.path.join(emoji_folder, selected_emoji) except FileNotFoundError: logger.error(f"表情文件夹不存在: {emoji_folder}") except Exception as e: logger.error(f"表情发送失败: {str(e)}") return None

def clean_up_temp_files ():
if os.path.isdir("wxautox文件下载"):
try:
shutil.rmtree("wxautox文件下载")
except Exception as e:
logger.error(f"删除目录 wxautox文件下载 失败: {str(e)}")
return
logger.info(f"目录 wxautox文件下载 已成功删除")
else:
logger.info(f"目录 wxautox文件下载 不存在,无需删除")

def is_quiet_time():
current_time = datetime.now().time()
if quiet_time_start <= quiet_time_end:
return quiet_time_start <= current_time <= quiet_time_end
else:
return current_time >= quiet_time_start or current_time <= quiet_time_end

记忆管理功能

def append_to_memory_section(user_id, content):
"""将内容追加到用户prompt文件的记忆部分"""
try:
prompts_dir = os.path.join(root_dir, 'prompts')
user_file = os.path.join(prompts_dir, f'{user_id}.md')

text
# 确保用户文件存在 if not os.path.exists(user_file): raise FileNotFoundError(f"用户文件 {user_id}.md 不存在") # 读取并处理文件内容 with open(user_file, 'r+', encoding='utf-8') as file: lines = file.readlines() # 查找记忆插入点 memory_marker = "开始更新:" insert_index = next((i for i, line in enumerate(lines) if memory_marker in line), -1) # 如果没有找到标记,追加到文件末尾 if (insert_index == -1): insert_index = len(lines) lines.append(f"\n{memory_marker}\n") logger.info(f"在用户文件 {user_id}.md 中添加记忆标记") # 插入记忆内容 current_date = datetime.now().strftime("%Y-%m-%d") new_content = f"\n### {current_date}\n{content}\n" # 写入更新内容 lines.insert(insert_index + 1, new_content) file.seek(0) file.writelines(lines) file.truncate() except PermissionError as pe: logger.error(f"文件权限拒绝: {pe} (尝试访问 {user_file})") except IOError as ioe: logger.error(f"文件读写错误: {ioe} (路径: {os.path.abspath(user_file)})") except Exception as e: logger.error(f"记忆存储失败: {str(e)}", exc_info=True) raise # 重新抛出异常供上层处理 except FileNotFoundError as e: logger.error(f"文件未找到: {str(e)}") raise

def summarize_and_save(user_id, force_summarize=False):
"""
总结聊天记录并存储记忆

text
Args: user_id (str): 用户ID force_summarize (bool): 是否强制总结,无视日志长度限制 """ log_file = None temp_file = None backup_file = None try: # --- 前置检查 --- prompt_name = prompt_mapping.get(user_id, user_id) log_file = os.path.join(root_dir, MEMORY_TEMP_DIR, f'{user_id}_{prompt_name}_log.txt') if not os.path.exists(log_file): logger.warning(f"日志文件不存在: {log_file}") return if os.path.getsize(log_file) == 0: logger.info(f"空日志文件: {log_file}") return # --- 读取日志 --- with open(log_file, 'r', encoding='utf-8') as f: logs = [line.strip() for line in f if line.strip()] # 如果不是强制总结,则检查日志长度 if not force_summarize and len(logs) < MAX_MESSAGE_LOG_ENTRIES: logger.info(f"日志条目不足({len(logs)}条),未触发记忆总结。") return # --- 生成总结 --- full_logs = '\n'.join(logs) summary_prompt = f"请以{prompt_name}的视角,用中文总结与{user_id}的对话,提取重要信息总结为一段话作为记忆片段(直接回复一段话):\n{full_logs}" if USE_ASSISTANT_FOR_MEMORY_SUMMARY and ENABLE_ASSISTANT_MODEL: logger.info(f"使用辅助模型为用户 {user_id} 生成记忆总结") summary = get_assistant_response(summary_prompt, "memory_summary", is_summary=True) else: logger.info(f"使用主模型为用户 {user_id} 生成记忆总结") summary = get_deepseek_response(summary_prompt, "system", store_context=False, is_summary=True) summary = re.sub( r'\*{0,2}(重要度|摘要)\*{0,2}[\s:]*\d*[\.]?\d*[\s\\]*|## 记忆片段 $$\d{4}-\d{2}-\d{2}( [A-Za-z]+)? \d{2}:\d{2}(:\d{2})?$$', '', summary, flags=re.MULTILINE ).strip() # --- 评估重要性 --- importance_prompt = f"为以下记忆的重要性评分(1-5,直接回复数字):\n{summary}" if USE_ASSISTANT_FOR_MEMORY_SUMMARY and ENABLE_ASSISTANT_MODEL: importance_response = get_assistant_response(importance_prompt, "memory_importance", is_summary=True) else: importance_response = get_deepseek_response(importance_prompt, "system", store_context=False, is_summary=True) importance_match = re.search(r'[1-5]', importance_response) importance = min(max(int(importance_match.group()), 1), 5) if importance_match else 3 if not importance_match: logger.warning(f"无法解析重要性评分,使用默认值3。原始响应:{importance_response}") # --- 存储记忆 --- current_time = datetime.now().strftime("%Y-%m-%d %A %H:%M") memory_entry = f"""## 记忆片段 [{current_time}]

重要度: {importance}
摘要: {summary}

"""
prompt_name = prompt_mapping.get(user_id, user_id)
prompts_dir = os.path.join(root_dir, 'prompts')
os.makedirs(prompts_dir, exist_ok=True)
user_prompt_file = os.path.join(prompts_dir, f'{prompt_name}.md')
temp_file = f"{user_prompt_file}.tmp"
backup_file = f"{user_prompt_file}.bak"

text
try: with open(temp_file, 'w', encoding='utf-8') as f: if os.path.exists(user_prompt_file): with open(user_prompt_file, 'r', encoding='utf-8') as src: f.write(src.read().rstrip() + '\n\n') f.write(memory_entry) if os.path.exists(user_prompt_file): shutil.copyfile(user_prompt_file, backup_file) shutil.move(temp_file, user_prompt_file) except Exception as e: if os.path.exists(backup_file): shutil.move(backup_file, user_prompt_file) raise # --- 清理日志 --- with open(log_file, 'w', encoding='utf-8') as f: f.truncate() except Exception as e: logger.error(f"记忆保存失败: {str(e)}", exc_info=True) finally: for f in [temp_file, backup_file]: if f and os.path.exists(f): try: os.remove(f) except Exception as e: logger.error(f"清理临时文件失败: {str(e)}")

def memory_manager():
"""记忆管理定时任务"""
while True:
try:
# 检查所有监听用户
for user in user_names:
prompt_name = prompt_mapping.get(user, user) # 获取配置的prompt名
log_file = os.path.join(root_dir, MEMORY_TEMP_DIR, f'{user}_{prompt_name}_log.txt')

text
try: prompt_name = prompt_mapping.get(user, user) # 获取配置的文件名,没有则用昵称 user_prompt_file = os.path.join(root_dir, 'prompts', f'{prompt_name}.md') manage_memory_capacity(user_prompt_file) except Exception as e: logger.error(f"内存管理失败: {str(e)}") if os.path.exists(log_file): with open(log_file, 'r', encoding='utf-8') as f: line_count = sum(1 for _ in f) if line_count >= MAX_MESSAGE_LOG_ENTRIES: summarize_and_save(user) except Exception as e: logger.error(f"记忆管理异常: {str(e)}") finally: time.sleep(60) # 每分钟检查一次

def manage_memory_capacity(user_file):
"""记忆淘汰机制"""
# 允许重要度缺失(使用可选捕获组)
MEMORY_SEGMENT_PATTERN = r'## 记忆片段 (.?)(.*?)\n(?:*{2}重要度*{2}: (\d*)\n)?*{2}摘要*{2}:(.*?)(?=\n## 记忆片段 |\Z)'
try:
with open(user_file, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()

text
# 解析记忆片段 segments = re.findall(MEMORY_SEGMENT_PATTERN, content, re.DOTALL) if len(segments) <= MAX_MEMORY_NUMBER: return # 构建评分体系 now = datetime.now() memory_scores = [] for timestamp, importance, _ in segments: try: # 尝试多种时间格式,支持新旧格式 formats = [ "%Y-%m-%d %A %H:%M:%S", # 新格式,带星期和秒 "%Y-%m-%d %A %H:%M", # 新格式,带星期但没有秒 "%Y-%m-%d %H:%M:%S", # 带秒但没有星期 "%Y-%m-%d %H:%M" # 原始格式 ] parsed_time = None for fmt in formats: try: parsed_time = datetime.strptime(timestamp, fmt) break except ValueError: continue if parsed_time: time_diff = (now - parsed_time).total_seconds() else: # 如果所有格式都解析失败 logger.warning(f"无法解析时间戳: {timestamp}") time_diff = 0 except Exception as e: logger.warning(f"时间戳解析错误: {str(e)}") time_diff = 0 # 处理重要度缺失,默认值为3 importance_value = int(importance) if importance else 3 score = 0.6 * importance_value - 0.4 * (time_diff / 3600) memory_scores.append(score) # 获取保留索引 sorted_indices = sorted(range(len(memory_scores)), key=lambda k: (-memory_scores[k], segments[k][0])) keep_indices = set(sorted_indices[:MAX_MEMORY_NUMBER]) # 重建内容 memory_blocks = re.split(r'(?=## 记忆片段 \[)', content) new_content = [] # 解析时处理缺失值 for idx, block in enumerate(memory_blocks): if idx == 0: new_content.append(block) continue try: # 显式关联 memory_blocks 与 segments 的索引 segment_idx = idx - 1 if segment_idx < len(segments) and segment_idx in keep_indices: new_content.append(block) except Exception as e: logger.warning(f"跳过无效记忆块: {str(e)}") continue # 原子写入 with open(f"{user_file}.tmp", 'w', encoding='utf-8') as f: f.write(''.join(new_content).strip()) shutil.move(f"{user_file}.tmp", user_file) logger.info(f"成功清理记忆") except Exception as e: logger.error(f"记忆整理失败: {str(e)}")

def clear_memory_temp_files(user_id):
"""清除指定用户的Memory_Temp文件"""
try:
logger.warning(f"已开启自动清除Memory_Temp文件功能,尝试清除用户 {user_id} 的Memory_Temp文件")
prompt_name = prompt_mapping.get(user_id, user_id)
log_file = os.path.join(root_dir, MEMORY_TEMP_DIR, f'{user_id}_{prompt_name}_log.txt')
if os.path.exists(log_file):
os.remove(log_file)
logger.warning(f"已清除用户 {user_id} 的Memory_Temp文件: {log_file}")
except Exception as e:
logger.error(f"清除Memory_Temp文件失败: {str(e)}")

def clear_chat_context(user_id):
"""清除指定用户的聊天上下文"""
logger.info(f"已开启自动清除上下文功能,尝试清除用户 {user_id} 的聊天上下文")
try:
with queue_lock:
if user_id in chat_contexts:
del chat_contexts[user_id]
save_chat_contexts()
logger.warning(f"已清除用户 {user_id} 的聊天上下文")
except Exception as e:
logger.error(f"清除聊天上下文失败: {str(e)}")

def send_error_reply(user_id, error_description_for_ai, fallback_message, error_context_log=""):
"""
生成并发送符合人设的错误回复。
Args:
user_id (str): 目标用户ID。
error_description_for_ai (str): 给AI的提示,描述错误情况,要求其生成用户回复。
fallback_message (str): 如果AI生成失败,使用的备用消息。
error_context_log (str): 用于日志记录的错误上下文描述。
"""
logger.warning(f"准备为用户 {user_id} 发送错误提示: {error_context_log}")
try:
# 调用AI生成符合人设的错误消息
ai_error_reply = get_deepseek_response(error_description_for_ai, user_id=user_id, store_context=True)
logger.info(f"AI生成的错误回复: {ai_error_reply[:100]}...")
# 使用send_reply发送AI生成的回复
send_reply(user_id, user_id, user_id, f"[错误处理: {error_context_log}]", ai_error_reply)
except Exception as ai_err:
logger.error(f"调用AI生成错误回复失败 ({error_context_log}): {ai_err}. 使用备用消息。")
try:
# AI失败,使用备用消息通过send_reply发送
send_reply(user_id, user_id, user_id, f"[错误处理备用: {error_context_log}]", fallback_message)
except Exception as send_fallback_err:
# 如果连send_reply都失败了,记录严重错误
logger.critical(f"发送备用错误消息也失败 ({error_context_log}): {send_fallback_err}")

def try_parse_and_set_reminder(message_content, user_id):
"""
尝试解析消息内容,区分短期一次性、长期一次性、重复提醒。
使用 AI 进行分类和信息提取,然后设置短期定时器或保存到文件。
如果成功设置了任一类型的提醒,返回 True,否则返回 False。
"""
global next_timer_id # 引用全局变量,用于生成短期一次性提醒的ID
logger.debug(f"尝试为用户 {user_id} 解析提醒请求 (需要识别类型和时长): '{message_content}'")

text
try: # --- 1. 获取当前时间,准备给 AI 的上下文信息 --- now = dt.datetime.now() # AI 需要知道当前完整日期时间来计算目标时间 current_datetime_str_for_ai = now.strftime("%Y-%m-%d %A %H:%M:%S") logger.debug(f"当前时间: {current_datetime_str_for_ai} (用于AI分析)") # --- 2. 构建新的 AI 提示,要求 AI 分类并提取信息 --- # --- 更新: 增加短期/长期一次性提醒的区分 --- parsing_prompt = f"""

请分析用户的提醒或定时请求。
当前时间是: {current_datetime_str_for_ai}.
用户的请求是: "{message_content}"

请判断这个请求属于以下哪种类型,并计算相关时间:
A) 重复性每日提醒:例如 "每天早上8点叫我起床", "提醒我每天晚上10点睡觉"。
B) 一次性提醒 (延迟 > 10分钟 / 600秒):例如 "1小时后提醒我", "今天下午3点开会", "明天早上叫我"。
C) 一次性提醒 (延迟 <= 10分钟 / 600秒):例如 "5分钟后提醒我", "提醒我600秒后喝水"。
D) 非提醒请求:例如 "今天天气怎么样?", "取消提醒"。

根据判断结果,请严格按照以下格式输出:

  • 如果是 A (重复每日提醒): 返回 JSON 对象 {{"type": "recurring", "time_str": "HH:MM", "message": "提醒的具体内容"}}time_str 必须是 24 小时制的 HH:MM 格式。
  • 如果是 B (长期一次性提醒): 返回 JSON 对象 {{"type": "one-off-long", "target_datetime_str": "YYYY-MM-DD HH:MM", "message": "提醒的具体内容"}}target_datetime_str 必须是计算出的未来目标时间的 YYYY-MM-DD HH:MM 格式。
  • 如果是 C (短期一次性提醒): 返回 JSON 对象 {{"type": "one-off-short", "delay_seconds": number, "message": "提醒的具体内容"}}delay_seconds 必须是从现在开始计算的、小于等于 600 的正整数总秒数。
  • 如果是 D (非提醒): 请直接返回字面单词 null

请看以下例子 (假设当前时间是 2024-05-29 星期三 10:00:00):

  1. "每天早上8点叫我起床" -> {{"type": "recurring", "time_str": "08:00", "message": "叫我起床"}}
  2. "提醒我30分钟后喝水" -> {{"type": "one-off-long", "target_datetime_str": "2024-05-29 10:30", "message": "喝水"}} (超过10分钟)
  3. "下午2点提醒我开会" -> {{"type": "one-off-long", "target_datetime_str": "2024-05-29 14:00", "message": "开会"}}
  4. "明天早上7点叫我起床" -> {{"type": "one-off-long", "target_datetime_str": "2024-05-30 07:00", "message": "叫我起床"}}
  5. "提醒我5分钟后站起来活动" -> {{"type": "one-off-short", "delay_seconds": 300, "message": "站起来活动"}} (小于等于10分钟)
  6. "10分钟后叫我" -> {{"type": "one-off-short", "delay_seconds": 600, "message": "叫我"}} (等于10分钟)
  7. "今天怎么样?" -> null

请务必严格遵守输出格式,只返回指定的 JSON 对象或 null,不要添加任何解释性文字。
"""
# --- 3. 调用 AI 进行解析和分类 ---
# 根据配置选择使用辅助模型或主模型
if ENABLE_ASSISTANT_MODEL:
logger.info(f"向辅助模型发送提醒解析请求(区分时长),用户: {user_id},内容: '{message_content}'")
ai_raw_response = get_assistant_response(parsing_prompt, "reminder_parser_classifier_v2_" + user_id)
logger.debug(f"辅助模型提醒解析原始响应 (分类器 v2): {ai_raw_response}")
else:
logger.info(f"向主模型发送提醒解析请求(区分时长),用户: {user_id},内容: '{message_content}'")
ai_raw_response = get_deepseek_response(parsing_prompt, user_id="reminder_parser_classifier_v2_" + user_id, store_context=False)
logger.debug(f"主模型提醒解析原始响应 (分类器 v2): {ai_raw_response}")

text
# 使用新的清理函数处理AI的原始响应 cleaned_ai_output_str = extract_last_json_or_null(ai_raw_response) logger.debug(f"AI响应清理并提取后内容: '{cleaned_ai_output_str}'") response = cleaned_ai_output_str # --- 4. 解析 AI 的响应 --- # 修改判断条件,使用清理后的结果 if cleaned_ai_output_str is None or cleaned_ai_output_str == "null": # "null" 是AI明确表示非提醒的方式 logger.info(f"AI 未在用户 '{user_id}' 的消息中检测到有效的提醒请求 (清理后结果为 None 或 'null')。原始AI响应: '{ai_raw_response}'") return False try: response_cleaned = re.sub(r"```json\n?|\n?```", "", response).strip() reminder_data = json.loads(response_cleaned) logger.debug(f"解析后的JSON数据 (分类器 v2): {reminder_data}") reminder_type = reminder_data.get("type") reminder_msg = str(reminder_data.get("message", "")).strip() # --- 5. 验证共享数据(提醒内容不能为空)--- if not reminder_msg: logger.warning(f"从AI解析得到的提醒消息为空。用户: {user_id}, 数据: {reminder_data}") error_prompt = f"用户尝试设置提醒,但似乎没有说明要提醒的具体内容(用户的原始请求可能是 '{message_content}')。请用你的语气向用户解释需要提供提醒内容,并鼓励他们再说一次。" fallback = "嗯... 光设置时间还不行哦,得告诉我你要我提醒你做什么事呀?" send_error_reply(user_id, error_prompt, fallback, "提醒内容为空") return False # --- 6. 根据 AI 判断的类型分别处理 --- # --- 6a. 短期一次性提醒 (<= 10分钟) --- if reminder_type == "one-off-short": try: delay_seconds = int(reminder_data['delay_seconds']) if not (0 < delay_seconds <= 600): # 验证延迟在 (0, 600] 秒之间 logger.warning(f"AI 返回的 'one-off-short' 延迟时间无效: {delay_seconds} 秒 (应 > 0 且 <= 600)。用户: {user_id}, 数据: {reminder_data}") error_prompt = f"用户想设置一个短期提醒(原始请求 '{message_content}'),但我计算出的时间 ({delay_seconds}秒) 不在10分钟内或已过去。请用你的语气告诉用户这个时间有点问题,建议他们检查一下或换个说法。" fallback = "哎呀,这个短期提醒的时间好像有点不对劲(要么超过10分钟,要么已经过去了),能麻烦你再说一次吗?" send_error_reply(user_id, error_prompt, fallback, "短期延迟时间无效") return False except (KeyError, ValueError, TypeError) as val_e: logger.error(f"解析AI返回的 'one-off-short' 提醒数据失败。用户: {user_id}, 数据: {reminder_data}, 错误: {val_e}") error_prompt = f"用户想设置短期提醒(原始请求 '{message_content}'),但我没理解好时间({type(val_e).__name__})。请用你的语气抱歉地告诉用户没听懂,并请他们换种方式说,比如'5分钟后提醒我...'" fallback = "抱歉呀,我好像没太明白你的时间意思,设置短期提醒失败了。能麻烦你换种方式再说一遍吗?比如 '5分钟后提醒我...'" send_error_reply(user_id, error_prompt, fallback, f"One-off-short数据解析失败 ({type(val_e).__name__})") return False # 设置 threading.Timer 定时器 target_dt = now + dt.timedelta(seconds=delay_seconds) confirmation_time_str = target_dt.strftime('%Y-%m-%d %H:%M:%S') delay_str_approx = format_delay_approx(delay_seconds, target_dt) logger.info(f"准备为用户 {user_id} 设置【短期一次性】提醒 (<=10min),计划触发时间: {confirmation_time_str} (延迟 {delay_seconds:.2f} 秒),内容: '{reminder_msg}'") with timer_lock: timer_id = next_timer_id next_timer_id += 1 timer_key = (user_id, timer_id) timer = Timer(float(delay_seconds), trigger_reminder, args=[user_id, timer_id, reminder_msg]) active_timers[timer_key] = timer timer.start() logger.info(f"【短期一次性】提醒定时器 (ID: {timer_id}) 已为用户 {user_id} 成功启动。") log_original_message_to_memory(user_id, message_content) # 记录原始请求 confirmation_prompt = f"""用户刚才的请求是:"{message_content}"。

根据这个请求,你已经成功将一个【短期一次性】提醒(10分钟内)安排在 {confirmation_time_str} (也就是 {delay_str_approx}) 触发。
提醒的核心内容是:'{reminder_msg}'。
请你用自然、友好的语气回复用户,告诉他这个【短期】提醒已经设置好了,确认时间和提醒内容。"""
send_confirmation_reply(user_id, confirmation_prompt, f"[短期一次性提醒已设置: {reminder_msg}]", f"收到!【短期提醒】设置好啦,我会在 {delay_str_approx} ({target_dt.strftime('%H:%M')}) 提醒你:{reminder_msg}")
return True

text
# --- 6b. 长期一次性提醒 (> 10分钟) --- elif reminder_type == "one-off-long": try: target_datetime_str = reminder_data['target_datetime_str'] # 在本地再次验证时间格式是否为 YYYY-MM-DD HH:MM target_dt = datetime.strptime(target_datetime_str, '%Y-%m-%d %H:%M') # 验证时间是否在未来 if target_dt <= now: logger.warning(f"AI 返回的 'one-off-long' 目标时间无效: {target_datetime_str} (已过去或就是现在)。用户: {user_id}, 数据: {reminder_data}") error_prompt = f"用户想设置一个提醒(原始请求 '{message_content}'),但我计算出的目标时间 ({target_datetime_str}) 好像是过去或就是现在了。请用你的语气告诉用户这个时间点无法设置,建议他们指定一个未来的时间。" fallback = "哎呀,这个时间点 ({target_dt.strftime('%m月%d日 %H:%M')}) 好像已经过去了或就是现在啦,没办法设置过去的提醒哦。要不试试说一个未来的时间?" send_error_reply(user_id, error_prompt, fallback, "长期目标时间无效") return False except (KeyError, ValueError, TypeError) as val_e: logger.error(f"解析AI返回的 'one-off-long' 提醒数据失败。用户: {user_id}, 数据: {reminder_data}, 错误: {val_e}") error_prompt = f"用户想设置一个较远时间的提醒(原始请求 '{message_content}'),但我没理解好目标时间 ({type(val_e).__name__})。请用你的语气抱歉地告诉用户没听懂,并请他们用明确的日期和时间再说,比如'明天下午3点'或'2024-06-15 10:00'。" fallback = "抱歉呀,我好像没太明白你说的那个未来的时间点,设置提醒失败了。能麻烦你说得更清楚一点吗?比如 '明天下午3点' 或者 '6月15号上午10点' 这样。" send_error_reply(user_id, error_prompt, fallback, f"One-off-long数据解析失败 ({type(val_e).__name__})") return False logger.info(f"准备为用户 {user_id} 添加【长期一次性】提醒 (>10min),目标时间: {target_datetime_str},内容: '{reminder_msg}'") # 创建要存储的提醒信息字典 (包含类型) new_reminder = { "reminder_type": "one-off", # 在存储时统一用 'one-off' "user_id": user_id, "target_datetime_str": target_datetime_str, # 存储目标时间 "content": reminder_msg } # 添加到内存列表并保存到文件 with recurring_reminder_lock: recurring_reminders.append(new_reminder) save_recurring_reminders() # 保存更新后的列表 logger.info(f"【长期一次性】提醒已添加并保存到文件。用户: {user_id}, 时间: {target_datetime_str}, 内容: '{reminder_msg}'") log_original_message_to_memory(user_id, message_content) # 发送确认消息 confirmation_prompt = f"""用户刚才的请求是:"{message_content}"。

根据这个请求,你已经成功为他设置了一个【一次性】提醒。
这个提醒将在【指定时间】 {target_datetime_str} 触发。
提醒的核心内容是:'{reminder_msg}'。
请你用自然、友好的语气回复用户,告诉他这个【一次性】提醒已经设置好了,确认好具体的日期时间和提醒内容。"""
# 使用格式化后的时间发送给用户
friendly_time = target_dt.strftime('%Y年%m月%d日 %H:%M')
send_confirmation_reply(user_id, confirmation_prompt, f"[长期一次性提醒已设置: {reminder_msg}]", f"好嘞!【一次性提醒】设置好啦,我会在 {friendly_time} 提醒你:{reminder_msg}")
return True

text
# --- 6c. 重复性每日提醒 --- elif reminder_type == "recurring": try: time_str = reminder_data['time_str'] datetime.strptime(time_str, '%H:%M') # 验证 HH:MM 格式 except (KeyError, ValueError, TypeError) as val_e: logger.error(f"解析AI返回的 'recurring' 提醒数据失败。用户: {user_id}, 数据: {reminder_data}, 错误: {val_e}") error_prompt = f"用户想设置每日提醒(原始请求 '{message_content}'),但我没理解好时间 ({type(val_e).__name__})。请用你的语气抱歉地告诉用户没听懂,并请他们用明确的'每天几点几分'格式再说,比如'每天早上8点'或'每天22:30'。" fallback = "抱歉呀,我好像没太明白你说的每日提醒时间,设置失败了。能麻烦你说清楚是'每天几点几分'吗?比如 '每天早上8点' 或者 '每天22:30' 这样。" send_error_reply(user_id, error_prompt, fallback, f"Recurring数据解析失败 ({type(val_e).__name__})") return False logger.info(f"准备为用户 {user_id} 添加【每日重复】提醒,时间: {time_str},内容: '{reminder_msg}'") # 创建要存储的提醒信息字典 (包含类型) new_reminder = { "reminder_type": "recurring", # 明确类型 "user_id": user_id, "time_str": time_str, # 存储 HH:MM "content": reminder_msg } # 添加到内存列表并保存到文件 with recurring_reminder_lock: # 检查是否已存在完全相同的重复提醒 exists = any( r.get('reminder_type') == 'recurring' and r.get('user_id') == user_id and r.get('time_str') == time_str and r.get('content') == reminder_msg for r in recurring_reminders ) if not exists: recurring_reminders.append(new_reminder) save_recurring_reminders() logger.info(f"【每日重复】提醒已添加并保存。用户: {user_id}, 时间: {time_str}, 内容: '{reminder_msg}'") else: logger.info(f"相同的【每日重复】提醒已存在,未重复添加。用户: {user_id}, 时间: {time_str}") # 可以选择告知用户提醒已存在 # send_reply(user_id, user_id, user_id, "[重复提醒已存在]", f"嗯嗯,这个 '{reminder_msg}' 的每日 {time_str} 提醒我已经记下啦,不用重复设置哦。") # return True # 即使未添加,也认为设置意图已满足 log_original_message_to_memory(user_id, message_content) # 向用户发送确认消息 confirmation_prompt = f"""用户刚才的请求是:"{message_content}"。

根据这个请求,你已经成功为他设置了一个【每日重复】提醒。
这个提醒将在【每天】的 {time_str} 触发。
提醒的核心内容是:'{reminder_msg}'。
请你用自然、友好的语气回复用户,告诉他【每日】提醒已经设置好了,确认时间和提醒内容。强调这是每天都会提醒的。"""
send_confirmation_reply(user_id, confirmation_prompt, f"[每日提醒已设置: {reminder_msg}]", f"好嘞!【每日提醒】设置好啦,以后我【每天】 {time_str} 都会提醒你:{reminder_msg}")
return True

text
# --- 6d. 未知类型 --- else: logger.error(f"AI 返回了未知的提醒类型: '{reminder_type}'。用户: {user_id}, 数据: {reminder_data}") error_prompt = f"用户想设置提醒(原始请求 '{message_content}'),但我有点糊涂了,没搞清楚时间或者类型。请用你的语气抱歉地告诉用户,请他们说得更清楚一点,比如是几分钟后、明天几点、还是每天提醒。" fallback = "哎呀,我有点没搞懂你的提醒要求,是几分钟后提醒,还是指定某个时间点,或者是每天都提醒呀?麻烦说清楚点我才能帮你设置哦。" send_error_reply(user_id, error_prompt, fallback, f"未知提醒类型 '{reminder_type}'") return False except (json.JSONDecodeError, KeyError, ValueError, TypeError) as json_e: # 处理 JSON 解析本身或后续访问键值对的错误 response_cleaned_str = response_cleaned if 'response_cleaned' in locals() else 'N/A' logger.error(f"解析AI返回的提醒JSON失败 (分类器 v2)。用户: {user_id}, 原始响应: '{response}', 清理后: '{response_cleaned_str}', 错误: {json_e}") error_prompt = f"用户想设置提醒(原始请求可能是 '{message_content}'),但我好像没完全理解时间或者内容,解析的时候出错了 ({type(json_e).__name__})。请用你的语气抱歉地告诉用户没听懂,并请他们换种方式说,比如'30分钟后提醒我...'或'每天下午3点叫我...'。" fallback = "抱歉呀,我好像没太明白你的意思,设置提醒失败了。能麻烦你换种方式再说一遍吗?比如 '30分钟后提醒我...' 或者 '每天下午3点叫我...' 这种。" send_error_reply(user_id, error_prompt, fallback, f"JSON解析失败 ({type(json_e).__name__})") return False except Exception as e: # 捕获此函数中其他所有未预料的错误 logger.error(f"处理用户 {user_id} 的提醒请求 '{message_content}' 时发生未预料的错误 (分类器 v2): {str(e)}", exc_info=True) error_prompt = f"在处理用户设置提醒的请求(可能是 '{message_content}')时,发生了一个我没预料到的内部错误({type(e).__name__})。请用你的语气向用户表达歉意,说明暂时无法完成设置,并建议他们稍后再试。" fallback = "哎呀,好像内部出了点小问题,暂时没法帮你设置提醒了,非常抱歉!要不稍等一下再试试看?" send_error_reply(user_id, error_prompt, fallback, f"通用处理错误 ({type(e).__name__})") return False

def extract_last_json_or_null(ai_response_text: str) -> Optional[str]:
"""
从AI的原始响应文本中清理并提取最后一个有效的JSON对象字符串或字面量 "null"。

text
Args: ai_response_text: AI返回的原始文本。 Returns: 如果找到有效的JSON对象,则返回其字符串形式。 如果AI明确返回 "null" (清理后),则返回字符串 "null"。 如果没有找到有效的JSON或 "null",则返回 None。 """ if ai_response_text is None: return None # 步骤 1: 移除常见的Markdown代码块标记,并去除首尾空格 # 这个正则表达式会移除 ```json\n, ```json, \n```, ``` processed_text = re.sub(r"```json\n?|\n?```", "", ai_response_text).strip() # 步骤 2: 检查清理后的文本是否完全是 "null" (不区分大小写) # 这是AI指示非提醒请求的明确信号 if processed_text.lower() == 'null': return "null" # 返回字面量字符串 "null" # 步骤 3: 查找所有看起来像JSON对象的子字符串 # re.DOTALL 使得 '.' 可以匹配换行符 # 这个正则表达式会找到所有以 '{' 开头并以 '}' 结尾的非重叠子串 json_candidates = re.findall(r'\{.*?\}', processed_text, re.DOTALL) if not json_candidates: # 没有找到任何类似JSON的结构,并且它也不是 "null" return None # 步骤 4: 从后往前尝试解析每个候选JSON字符串 for candidate_str in reversed(json_candidates): try: # 尝试解析以验证它是否是有效的JSON json.loads(candidate_str) # 如果成功解析,说明这是最后一个有效的JSON对象字符串 return candidate_str except json.JSONDecodeError: # 解析失败,继续尝试前一个候选者 continue # 如果所有候选者都解析失败 return None

def format_delay_approx(delay_seconds, target_dt):
"""将延迟秒数格式化为用户友好的大致时间描述。"""
if delay_seconds < 60:
# 少于1分钟,显示秒
return f"大约 {int(delay_seconds)} 秒后"
elif delay_seconds < 3600:
# 少于1小时,显示分钟
return f"大约 {int(delay_seconds / 60)} 分钟后"
elif delay_seconds < 86400:
# 少于1天,显示小时和分钟
hours = int(delay_seconds / 3600)
minutes = int((delay_seconds % 3600) / 60)
# 如果分钟数为0,则只显示小时
return f"大约 {hours} 小时" + (f" {minutes} 分钟后" if minutes > 0 else "后")
else:
# 超过1天,显示天数和目标日期时间
days = int(delay_seconds / 86400)
# 使用中文日期时间格式
return f"大约 {days} 天后 ({target_dt.strftime('%Y年%m月%d日 %H:%M')}左右)"

def log_original_message_to_memory(user_id, message_content):
"""将设置提醒的原始用户消息记录到记忆日志文件(如果启用了记忆功能)。"""
if ENABLE_MEMORY: # 检查是否启用了记忆功能
try:
# 获取用户对应的 prompt 文件名(或用户昵称)
prompt_name = prompt_mapping.get(user_id, user_id)
# 构建日志文件路径
log_file = os.path.join(root_dir, MEMORY_TEMP_DIR, f'{user_id}_{prompt_name}_log.txt')
# 准备日志条目,记录原始用户消息
log_entry = f"{datetime.now().strftime('%Y-%m-%d %A %H:%M:%S')} | [{user_id}] {message_content}\n"
# 确保目录存在
os.makedirs(os.path.dirname(log_file), exist_ok=True)

text
# 以追加模式写入日志条目 with open(log_file, 'a', encoding='utf-8') as f: f.write(log_entry) except Exception as write_err: logger.error(f"写入用户 {user_id} 的提醒设置记忆日志失败: {write_err}")

def send_confirmation_reply(user_id, confirmation_prompt, log_context, fallback_message):
"""使用 AI 生成并发送提醒设置成功的确认消息,包含备用消息逻辑。"""
logger.debug(f"准备发送给 AI 用于生成确认消息的提示词(部分): {confirmation_prompt[:250]}...")
try:
# 调用 AI 生成确认回复,存储上下文
confirmation_msg = get_deepseek_response(confirmation_prompt, user_id=user_id, store_context=True)
logger.info(f"已为用户 {user_id} 生成提醒确认消息: {confirmation_msg[:100]}...")
# 使用 send_reply 发送 AI 生成的确认消息
send_reply(user_id, user_id, user_id, log_context, confirmation_msg)
logger.info(f"已通过 send_reply 向用户 {user_id} 发送提醒确认消息。")
except Exception as api_err:
# 如果 AI 调用失败
logger.error(f"调用API为用户 {user_id} 生成提醒确认消息失败: {api_err}. 将使用备用消息。")
try:
# 尝试使用 send_reply 发送预设的备用确认消息
send_reply(user_id, user_id, user_id, f"{log_context} [备用确认]", fallback_message)
except Exception as send_fallback_err:
# 如果连发送备用消息都失败了,记录严重错误
logger.critical(f"发送备用确认消息也失败 ({log_context}): {send_fallback_err}")

def trigger_reminder(user_id, timer_id, reminder_message):
"""当短期提醒到期时由 threading.Timer 调用的函数。"""
global is_sending_message

text
timer_key = (user_id, timer_id) logger.info(f"触发【短期】提醒 (ID: {timer_id}),用户 {user_id},内容: {reminder_message}") # 从活动计时器列表中移除 (短期提醒) with timer_lock: if timer_key in active_timers: del active_timers[timer_key] else: logger.warning(f"触发时未在 active_timers 中找到短期计时器键 {timer_key}。") if is_quiet_time() and not ALLOW_REMINDERS_IN_QUIET_TIME: logger.info(f"当前为安静时间:抑制【短期】提醒 (ID: {timer_id}),用户 {user_id}。") return try: # 创建提醒前缀,让AI知道这是一个提醒触发 reminder_prefix = f"提醒触发:{reminder_message}" # 将提醒消息添加到用户的消息队列,而不是直接调用API current_time_str = datetime.now().strftime("%Y-%m-%d %A %H:%M:%S") formatted_message = f"[{current_time_str}] {reminder_prefix}" with queue_lock: if user_id not in user_queues: user_queues[user_id] = { 'messages': [formatted_message], 'sender_name': user_id, 'username': user_id, 'last_message_time': time.time() } else: user_queues[user_id]['messages'].append(formatted_message) user_queues[user_id]['last_message_time'] = time.time() logger.info(f"已将提醒消息 '{reminder_message}' 添加到用户 {user_id} 的消息队列,用以执行联网检查流程") # 可选:如果仍需语音通话功能,保留这部分 if USE_VOICE_CALL_FOR_REMINDERS: try: wx.VoiceCall(user_id) logger.info(f"通过语音通话提醒用户 {user_id} (短期提醒)。") except Exception as voice_err: logger.error(f"语音通话提醒失败 (短期提醒),用户 {user_id}: {voice_err}") except Exception as e: logger.error(f"处理【短期】提醒失败 (ID: {timer_id}),用户 {user_id}: {str(e)}", exc_info=True) # 即使出错,也不再使用原来的直接发送备用消息方法 # 而是尽可能添加到队列 try: fallback_msg = f"[{datetime.now().strftime('%Y-%m-%d %A %H:%M:%S')}] 提醒时间到:{reminder_message}" with queue_lock: if user_id in user_queues: user_queues[user_id]['messages'].append(fallback_msg) user_queues[user_id]['last_message_time'] = time.time() else: user_queues[user_id] = { 'messages': [fallback_msg], 'sender_name': user_id, 'username': user_id, 'last_message_time': time.time() } logger.info(f"已将备用提醒消息添加到用户 {user_id} 的消息队列") except Exception as fallback_e: logger.error(f"添加提醒备用消息到队列失败,用户 {user_id}: {fallback_e}")

def log_ai_reply_to_memory(username, reply_part):
"""将 AI 的回复部分记录到用户的记忆日志文件中。"""
if not ENABLE_MEMORY: # 双重检查是否意外调用
return
try:
prompt_name = prompt_mapping.get(username, username) # 使用配置的提示名作为 AI 身份
log_file = os.path.join(root_dir, MEMORY_TEMP_DIR, f'{username}_{prompt_name}_log.txt')
log_entry = f"{datetime.now().strftime('%Y-%m-%d %A %H:%M:%S')} | [{prompt_name}] {reply_part}\n"

text
# 确保日志目录存在 os.makedirs(os.path.dirname(log_file), exist_ok=True) with open(log_file, 'a', encoding='utf-8') as f: f.write(log_entry) except Exception as log_err: logger.error(f"记录 AI 回复到记忆日志失败,用户 {username}: {log_err}")

def load_recurring_reminders():
"""从 JSON 文件加载重复和长期一次性提醒到内存中。"""
global recurring_reminders
reminders_loaded = []
try:
if os.path.exists(RECURRING_REMINDERS_FILE):
with open(RECURRING_REMINDERS_FILE, 'r', encoding='utf-8') as f:
loaded_data = json.load(f)
if isinstance(loaded_data, list):
valid_reminders_count = 0
now = datetime.now() # 获取当前时间用于检查一次性提醒是否已过期
for item in loaded_data:
# 基本结构验证
if not (isinstance(item, dict) and
'reminder_type' in item and
'user_id' in item and
'content' in item):
logger.warning(f"跳过无效格式的提醒项: {item}")
continue

text
user_id = item.get('user_id') reminder_type = item.get('reminder_type') content = item.get('content') # 用户有效性检查 if user_id not in user_names: logger.warning(f"跳过未在监听列表中的用户提醒: {user_id}") continue # 类型特定验证 is_valid = False if reminder_type == 'recurring': time_str = item.get('time_str') if time_str: try: datetime.strptime(time_str, '%H:%M') is_valid = True except ValueError: logger.warning(f"跳过无效时间格式的重复提醒: {item}") else: logger.warning(f"跳过缺少 time_str 的重复提醒: {item}") elif reminder_type == 'one-off': target_datetime_str = item.get('target_datetime_str') if target_datetime_str: try: target_dt = datetime.strptime(target_datetime_str, '%Y-%m-%d %H:%M') # 只加载未过期的一次性提醒 if target_dt > now: is_valid = True else: logger.info(f"跳过已过期的一次性提醒: {item}") except ValueError: logger.warning(f"跳过无效日期时间格式的一次性提醒: {item}") else: logger.warning(f"跳过缺少 target_datetime_str 的一次性提醒: {item}") else: logger.warning(f"跳过未知 reminder_type 的提醒: {item}") if is_valid: reminders_loaded.append(item) valid_reminders_count += 1 # 使用锁安全地更新全局列表 with recurring_reminder_lock: recurring_reminders = reminders_loaded logger.info(f"成功从 {RECURRING_REMINDERS_FILE} 加载 {valid_reminders_count} 条有效提醒。") else: logger.error(f"{RECURRING_REMINDERS_FILE} 文件内容不是有效的列表格式。将初始化为空列表。") with recurring_reminder_lock: recurring_reminders = [] else: logger.info(f"{RECURRING_REMINDERS_FILE} 文件未找到。将以无提醒状态启动。") with recurring_reminder_lock: recurring_reminders = [] except json.JSONDecodeError: logger.error(f"解析 {RECURRING_REMINDERS_FILE} 文件 JSON 失败。将初始化为空列表。") with recurring_reminder_lock: recurring_reminders = [] except Exception as e: logger.error(f"加载提醒失败: {str(e)}", exc_info=True) with recurring_reminder_lock: recurring_reminders = [] # 确保出错时列表也被初始化

def save_recurring_reminders():
"""将内存中的当前提醒列表(重复和长期一次性)保存到 JSON 文件。"""
global recurring_reminders
with recurring_reminder_lock: # 获取锁保证线程安全
temp_file_path = RECURRING_REMINDERS_FILE + ".tmp"
# 创建要保存的列表副本,以防在写入时列表被其他线程修改
reminders_to_save = list(recurring_reminders)
try:
with open(temp_file_path, 'w', encoding='utf-8') as f:
json.dump(reminders_to_save, f, ensure_ascii=False, indent=4)
shutil.move(temp_file_path, RECURRING_REMINDERS_FILE)
logger.info(f"成功将 {len(reminders_to_save)} 条提醒保存到 {RECURRING_REMINDERS_FILE}")
except Exception as e:
logger.error(f"保存提醒失败: {str(e)}", exc_info=True)
if os.path.exists(temp_file_path):
try:
os.remove(temp_file_path)
except OSError:
pass

def recurring_reminder_checker():
"""后台线程函数,每分钟检查是否有到期的重复或长期一次性提醒。"""
last_checked_minute_str = None # 记录上次检查的 YYYY-MM-DD HH:MM
while True:
try:
now = datetime.now()
# 需要精确到分钟进行匹配
current_datetime_minute_str = now.strftime("%Y-%m-%d %H:%M")
current_time_minute_str = now.strftime("%H:%M") # 仅用于匹配每日重复

text
# 仅当分钟数变化时才执行检查 if current_datetime_minute_str != last_checked_minute_str: reminders_to_trigger_now = [] reminders_to_remove_indices = [] # 记录需要删除的一次性提醒的索引 # 在锁保护下读取当前的提醒列表副本 with recurring_reminder_lock: current_reminders_copy = list(recurring_reminders) # 创建副本 for index, reminder in enumerate(current_reminders_copy): reminder_type = reminder.get('reminder_type') user_id = reminder.get('user_id') content = reminder.get('content') should_trigger = False if reminder_type == 'recurring': # 检查每日重复提醒 (HH:MM) if reminder.get('time_str') == current_time_minute_str: should_trigger = True logger.info(f"匹配到每日重复提醒: 用户 {user_id}, 时间 {current_time_minute_str}, 内容: {content}") elif reminder_type == 'one-off': # 检查长期一次性提醒 (YYYY-MM-DD HH:MM) if reminder.get('target_datetime_str') == current_datetime_minute_str: should_trigger = True # 标记此一次性提醒以便稍后删除 reminders_to_remove_indices.append(index) logger.info(f"匹配到长期一次性提醒: 用户 {user_id}, 时间 {current_datetime_minute_str}, 内容: {content}") if should_trigger: reminders_to_trigger_now.append(reminder.copy()) # 添加副本到触发列表 # --- 触发提醒 --- if reminders_to_trigger_now: logger.info(f"当前时间 {current_datetime_minute_str},发现 {len(reminders_to_trigger_now)} 条到期的提醒。") if is_quiet_time() and not ALLOW_REMINDERS_IN_QUIET_TIME: logger.info(f"处于安静时间,将抑制 {len(reminders_to_trigger_now)} 条提醒。") else: for reminder in reminders_to_trigger_now: user_id = reminder['user_id'] content = reminder['content'] reminder_type = reminder['reminder_type'] # 获取类型用于日志和提示 logger.info(f"正在为用户 {user_id} 触发【{reminder_type}】提醒:{content}") # 修改:不再直接调用API,而是将提醒添加到消息队列 try: # 构造提醒消息前缀 if reminder_type == 'recurring': prefix = f"每日提醒:{content}" else: # one-off prefix = f"一次性提醒:{content}" # 将提醒添加到用户的消息队列 formatted_message = f"[{now.strftime('%Y-%m-%d %A %H:%M:%S')}] {prefix}" with queue_lock: if user_id not in user_queues: user_queues[user_id] = { 'messages': [formatted_message], 'sender_name': user_id, 'username': user_id, 'last_message_time': time.time() } else: user_queues[user_id]['messages'].append(formatted_message) user_queues[user_id]['last_message_time'] = time.time() logger.info(f"已将{reminder_type}提醒 '{content}' 添加到用户 {user_id} 的消息队列,用以执行联网检查流程") # 保留语音通话功能(如果启用) if USE_VOICE_CALL_FOR_REMINDERS: try: wx.VoiceCall(user_id) logger.info(f"通过语音通话提醒用户 {user_id} ({reminder_type}提醒)。") except Exception as voice_err: logger.error(f"语音通话提醒失败 ({reminder_type}提醒),用户 {user_id}: {voice_err}") except Exception as trigger_err: logger.error(f"将提醒添加到消息队列失败,用户 {user_id},提醒:{content}:{trigger_err}") # --- 删除已触发的一次性提醒 --- if reminders_to_remove_indices: logger.info(f"准备从列表中删除 {len(reminders_to_remove_indices)} 条已触发的一次性提醒。") something_removed = False with recurring_reminder_lock: # 从后往前删除,避免索引错乱 indices_to_delete_sorted = sorted(reminders_to_remove_indices, reverse=True) original_length = len(recurring_reminders) for index in indices_to_delete_sorted: # 再次检查索引是否有效(理论上应该总是有效) if 0 <= index < len(recurring_reminders): removed_item = recurring_reminders.pop(index) logger.debug(f"已从内存列表中删除索引 {index} 的一次性提醒: {removed_item.get('content')}") something_removed = True else: logger.warning(f"尝试删除索引 {index} 时发现其无效(当前列表长度 {len(recurring_reminders)})。") if something_removed: # 只有实际删除了内容才保存文件 logger.info(f"已从内存中删除 {original_length - len(recurring_reminders)} 条一次性提醒,正在保存更新后的列表...") save_recurring_reminders() # 保存更新后的列表 else: logger.info("没有实际删除任何一次性提醒(可能索引无效或列表已空)。") # 更新上次检查的分钟数 last_checked_minute_str = current_datetime_minute_str # 休眠,接近一分钟检查一次 time.sleep(58) except Exception as e: logger.error(f"提醒检查器循环出错: {str(e)}", exc_info=True) time.sleep(60) # 出错后等待时间稍长

--- 检测是否需要联网搜索的函数 ---

def needs_online_search(message: str, user_id: str) -> Optional[str]:
"""
使用主 AI 判断用户消息是否需要联网搜索,并返回需要搜索的内容。

text
参数: message (str): 用户的消息。 user_id (str): 用户标识符 (用于日志)。 返回: Optional[str]: 如果需要联网搜索,返回需要搜索的内容;否则返回 None。 """ if not ENABLE_ONLINE_API: # 如果全局禁用,直接返回 None return None # 构建用于检测的提示词 detection_prompt = f"""

请判断以下用户消息是否明确需要查询当前、实时或非常具体的外部信息(例如:{SEARCH_DETECTION_PROMPT})。
用户消息:"{message}"

如果需要联网搜索,请回答 "需要联网",并在下一行提供你认为需要搜索的内容。
如果不需要联网搜索(例如:常规聊天、询问一般知识、历史信息、角色扮演对话等),请只回答 "不需要联网"。
请不要添加任何其他解释。
"""
try:
# 根据配置选择使用辅助模型或主模型
if ENABLE_ASSISTANT_MODEL:
logger.info(f"向辅助模型发送联网检测请求,用户: {user_id},消息: '{message[:50]}...'")
response = get_assistant_response(detection_prompt, f"online_detection_{user_id}")
else:
logger.info(f"向主 AI 发送联网检测请求,用户: {user_id},消息: '{message[:50]}...'")
response = get_deepseek_response(detection_prompt, user_id=f"online_detection_{user_id}", store_context=False)

text
# 清理并判断响应 cleaned_response = response.strip() if "</think>" in cleaned_response: cleaned_response = cleaned_response.split("</think>", 1)[1].strip() if ENABLE_ASSISTANT_MODEL: logger.info(f"辅助模型联网检测响应: '{cleaned_response}'") else: logger.info(f"主模型联网检测响应: '{cleaned_response}'") if "不需要联网" in cleaned_response: logger.info(f"用户 {user_id} 的消息不需要联网。") return None elif "需要联网" in cleaned_response: # 提取需要搜索的内容 search_content = cleaned_response.split("\n", 1)[1].strip() if "\n" in cleaned_response else "" logger.info(f"检测到用户 {user_id} 的消息需要联网,搜索内容: '{search_content}'") return search_content else: logger.warning(f"无法解析联网检测响应,用户: {user_id},响应: '{cleaned_response}'") return None except Exception as e: logger.error(f"联网检测失败,用户: {user_id},错误: {e}", exc_info=True) return None # 出错时默认不需要联网

--- 调用在线模型的函数 ---

def get_online_model_response(query: str, user_id: str) -> Optional[str]:
"""
使用配置的在线 API 获取搜索结果。

text
参数: query (str): 要发送给在线模型的查询(通常是用户消息)。 user_id (str): 用户标识符 (用于日志)。 返回: Optional[str]: 在线 API 的回复内容,如果失败则返回 None。 """ if not online_client: # 检查在线客户端是否已成功初始化 logger.error(f"在线 API 客户端未初始化,无法为用户 {user_id} 执行在线搜索。") return None # 获取当前时间并格式化为字符串 current_time_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 结合固定的提示词、当前时间和用户查询 online_query_prompt = f"请在互联网上查找相关信息,忽略过时信息,并给出简要的回答。\n{ONLINE_FIXED_PROMPT}\n当前时间:{current_time_str}\n\n{query}" try: logger.info(f"调用在线 API - 用户: {user_id}, 查询: '{query[:100]}...'") # 使用 online_client 调用在线模型 response = online_client.chat.completions.create( model=ONLINE_MODEL, messages=[{"role": "user", "content": online_query_prompt}], temperature=ONLINE_API_TEMPERATURE, max_tokens=ONLINE_API_MAX_TOKEN, stream=False ) if not response.choices: logger.error(f"在线 API 返回了空的选择项,用户: {user_id}") return None reply = response.choices[0].message.content.strip() # 清理回复,去除思考过程 if "</think>" in reply: reply = reply.split("</think>", 1)[1].strip() logger.info(f"在线 API 响应 (用户 {user_id}): {reply}") return reply except Exception as e: logger.error(f"调用在线 API 失败,用户: {user_id}: {e}", exc_info=True) return "抱歉,在线搜索功能暂时出错了。"

def monitor_memory_usage():
import psutil
MEMORY_THRESHOLD = 328 # 内存使用阈值328MB
while True:
process = psutil.Process(os.getpid())
memory_usage = process.memory_info().rss / 1024 / 1024 # MB
logger.info(f"当前内存使用: {memory_usage:.2f} MB")
if memory_usage > MEMORY_THRESHOLD:
logger.warning(f"内存使用超过阈值 ({MEMORY_THRESHOLD} MB),执行垃圾回收")
import gc
gc.collect()
time.sleep(600)

def scheduled_restart_checker():
"""
定时检查是否需要重启程序。
重启条件:
1. 已达到RESTART_INTERVAL_HOURS的运行时间
2. 在RESTART_INACTIVITY_MINUTES内没有活动,或活动结束后又等待了RESTART_INACTIVITY_MINUTES
3. 没有正在进行的短期提醒事件
4. 没有即将到来(5分钟内)的长期提醒或每日重复提醒事件
"""
global program_start_time, last_received_message_timestamp # 引用全局变量

text
if not ENABLE_SCHEDULED_RESTART: logger.info("定时重启功能已禁用。") return logger.info(f"定时重启功能已启用。重启间隔: {RESTART_INTERVAL_HOURS} 小时,不活跃期: {RESTART_INACTIVITY_MINUTES} 分钟。") restart_interval_seconds = RESTART_INTERVAL_HOURS * 3600 inactivity_seconds = RESTART_INACTIVITY_MINUTES * 60 if restart_interval_seconds <= 0: logger.error("重启间隔时间必须大于0,定时重启功能将不会启动。") return # 初始化下一次检查重启的时间点 next_restart_time = program_start_time + restart_interval_seconds restart_pending = False # 标记是否处于待重启状态(已达到间隔时间但在等待不活跃期) while True: current_time = time.time() time_since_last_activity = current_time - last_received_message_timestamp # 准备重启的三个条件检查 interval_reached = current_time >= next_restart_time or restart_pending inactive_enough = time_since_last_activity >= inactivity_seconds # 只有在准备重启时才检查提醒事件,避免不必要的检查 if interval_reached and inactive_enough: # 检查是否有正在进行的短期提醒 has_active_short_reminders = False with timer_lock: if active_timers: logger.info(f"当前有 {len(active_timers)} 个短期提醒进行中,等待它们完成后再重启。") has_active_short_reminders = True # 检查是否有即将到来的提醒(5分钟内) has_upcoming_reminders = False now = datetime.now() five_min_later = now + dt.timedelta(minutes=5) with recurring_reminder_lock: for reminder in recurring_reminders: target_dt = None # 处理长期一次性提醒 if reminder.get('reminder_type') == 'one-off': try: target_dt = datetime.strptime(reminder.get('target_datetime_str'), '%Y-%m-%d %H:%M') except (ValueError, TypeError): continue # 处理每日重复提醒 - 需要结合当前日期计算今天的触发时间 elif reminder.get('reminder_type') == 'recurring': try: time_str = reminder.get('time_str') if time_str: # 解析时间字符串获取小时和分钟 reminder_time = datetime.strptime(time_str, '%H:%M').time() # 结合当前日期构建完整的目标时间 target_dt = datetime.combine(now.date(), reminder_time) # 如果今天的触发时间已过,检查明天的触发时间是否在5分钟内 # (极少情况:如果定时检查恰好在23:55-00:00之间,且有0:00-0:05的提醒) if target_dt < now: target_dt = datetime.combine(now.date() + dt.timedelta(days=1), reminder_time) except (ValueError, TypeError): continue # 检查目标时间是否在5分钟内 if target_dt and now <= target_dt <= five_min_later: reminder_type = "长期一次性" if reminder.get('reminder_type') == 'one-off' else "每日重复" display_time = target_dt.strftime('%Y-%m-%d %H:%M') if reminder.get('reminder_type') == 'one-off' else target_dt.strftime('%H:%M') logger.info(f"检测到5分钟内即将执行的{reminder_type}提醒,延迟重启。提醒时间: {display_time}") has_upcoming_reminders = True break # 如果没有提醒阻碍,则可以重启 if not has_active_short_reminders and not has_upcoming_reminders: logger.warning(f"满足重启条件:已运行约 {(current_time - program_start_time)/3600:.2f} 小时,已持续 {time_since_last_activity/60:.1f} 分钟无活动,且没有即将执行的提醒。准备重启程序...") try: # --- 执行重启前的清理操作 --- logger.info("定时重启前:保存聊天上下文...") with queue_lock: save_chat_contexts() # 保存用户计时器状态 if ENABLE_AUTO_MESSAGE: logger.info("定时重启前:保存用户计时器状态...") save_user_timers() if ENABLE_REMINDERS: logger.info("定时重启前:保存提醒列表...") with recurring_reminder_lock: save_recurring_reminders() # 关闭异步HTTP日志处理器 if 'async_http_handler' in globals() and isinstance(async_http_handler, AsyncHTTPHandler): logger.info("定时重启前:关闭异步HTTP日志处理器...") async_http_handler.close() logger.info("定时重启前:执行最终临时文件清理...") clean_up_temp_files() logger.info("正在执行重启...") # 替换当前进程为新启动的 Python 脚本实例 os.execv(sys.executable, ['python'] + sys.argv) except Exception as e: logger.error(f"执行重启操作时发生错误: {e}", exc_info=True) # 如果重启失败,推迟下一次检查,避免短时间内连续尝试 restart_pending = False next_restart_time = current_time + restart_interval_seconds logger.info(f"重启失败,下一次重启检查时间推迟到: {datetime.fromtimestamp(next_restart_time).strftime('%Y-%m-%d %H:%M:%S')}") elif has_upcoming_reminders: # 有提醒即将执行,延长10分钟后再检查 logger.info(f"由于5分钟内有提醒将执行,延长重启时间10分钟。") next_restart_time = current_time + 600 # 延长10分钟 restart_pending = True # 保持待重启状态 else: # 有短期提醒正在进行,稍后再检查 logger.info(f"由于有短期提醒正在进行,将在下一轮检查是否可以重启。") restart_pending = True # 保持待重启状态 elif interval_reached and not inactive_enough: # 已达到间隔时间但最近有活动,设置待重启状态 if not restart_pending: logger.info(f"已达到重启间隔({RESTART_INTERVAL_HOURS}小时),但最近 {time_since_last_activity/60:.1f} 分钟内有活动,将在 {RESTART_INACTIVITY_MINUTES} 分钟无活动后重启。") restart_pending = True # 不更新next_restart_time,因为我们现在是等待不活跃期 elif current_time >= next_restart_time and not restart_pending: # 第一次达到重启时间点 logger.info(f"已达到计划重启检查点 ({RESTART_INTERVAL_HOURS}小时)。距离上次活动: {time_since_last_activity/60:.1f}分钟 (不活跃阈值: {RESTART_INACTIVITY_MINUTES}分钟)。") restart_pending = True # 进入待重启状态 # 每分钟检查一次条件 time.sleep(60)

发送心跳的函数

def send_heartbeat():
"""向Flask后端发送心跳信号"""
heartbeat_url = f"{FLASK_SERVER_URL_BASE}/bot_heartbeat"
payload = {
'status': 'alive',
'pid': os.getpid() # 发送当前进程PID,方便调试
}
try:
response = requests.post(heartbeat_url, json=payload, timeout=5)
if response.status_code == 200:
logger.debug(f"心跳发送成功至 {heartbeat_url} (PID: {os.getpid()})")
else:
logger.warning(f"发送心跳失败,状态码: {response.status_code} (PID: {os.getpid()})")
except requests.exceptions.RequestException as e:
logger.error(f"发送心跳时发生网络错误: {e} (PID: {os.getpid()})")
except Exception as e:
logger.error(f"发送心跳时发生未知错误: {e} (PID: {os.getpid()})")

心跳线程函数

def heartbeat_thread_func():
"""心跳线程,定期发送心跳"""
logger.info(f"机器人心跳线程启动 (PID: {os.getpid()}),每 {HEARTBEAT_INTERVAL} 秒发送一次心跳。")
while True:
send_heartbeat()
time.sleep(HEARTBEAT_INTERVAL)

视频聊天回调服务器

def start_video_chat_callback_server():
"""启动视频聊天回调API服务器"""
try:
from flask import Flask, request, jsonify

text
callback_app = Flask(__name__) callback_app.logger.disabled = True # 禁用Flask默认日志避免干扰 @callback_app.route('/api/conversation_callback', methods=['POST']) def handle_conversation_callback(): """接收并处理视频聊天结束的回调""" try: data = request.get_json() user_id = data.get('user_id') conversation_log = data.get('conversation_log') session_id = data.get('session_id') if not user_id or not conversation_log: logger.error("回调数据不完整,缺少user_id或conversation_log") return jsonify({"status": "error", "message": "无效的回调数据"}), 400 logger.info(f"收到来自会话 {session_id} 用户 {user_id} 的对话回调,共 {len(conversation_log)} 条记录") # 1. 将收到的对话记录写入临时文件,以便 summarize_and_save 复用 prompt_name = prompt_mapping.get(user_id, user_id) log_file = os.path.join(root_dir, MEMORY_TEMP_DIR, f'{user_id}_{prompt_name}_log.txt') os.makedirs(os.path.dirname(log_file), exist_ok=True) with open(log_file, 'w', encoding='utf-8') as f: for turn in conversation_log: # 确保 'content' 和 'role' 键存在 text = turn.get('content', '') role = turn.get('role', 'unknown') speaker = role.replace('assistant', prompt_name).replace('user', user_id) # 模拟原始日志格式 f.write(f"{datetime.now().strftime('%Y-%m-%d %A %H:%M:%S')} | [{speaker}] {text}\n") logger.info(f"对话记录已写入临时文件: {log_file}") # 2. 调用强制记忆总结和保存功能 summarize_and_save(user_id, force_summarize=True) logger.info(f"已为用户 {user_id} 完成记忆总结和保存") # 3. 向用户发送结束通知 summary_message = f"💕 视频聊天结束" send_reply(user_id, user_id, user_id, "[视频聊天结束]", summary_message) return jsonify({"status": "success", "message": "回调处理成功"}) except Exception as e: logger.error(f"处理视频聊天回调时发生严重错误: {e}", exc_info=True) return jsonify({"status": "error", "message": str(e)}), 500 @callback_app.route('/api/send_message', methods=['POST']) def handle_send_message(): """接收发送消息的请求""" try: data = request.get_json() user_id = data['user_id'] message = data['message'] # 调用主程序的发送消息函数 send_reply(user_id, message) logger.info(f"总结消息已发送给用户: {user_id}") return jsonify({"status": "success", "message": "消息发送成功"}) except Exception as e: logger.error(f"发送消息失败: {e}") return jsonify({"status": "error", "message": str(e)}), 500 # 启动服务器(使用不同的端口避免冲突) # 回调服务器只需要本机访问,使用127.0.0.1避免外网暴露 callback_port = PORT + 1 logger.info(f"视频聊天回调API服务器启动在端口: {callback_port}") # 使用threaded=True确保Flask在线程中非阻塞运行 callback_app.run(host='127.0.0.1', port=callback_port, debug=False, use_reloader=False, threaded=True) except Exception as e: logger.error(f"启动视频聊天回调服务器失败: {e}")

保存用户计时器状态的函数

def save_user_timers():
"""将用户计时器状态保存到文件"""
temp_file_path = USER_TIMERS_FILE + ".tmp"
try:
timer_data = {
'user_timers': dict(user_timers),
'user_wait_times': dict(user_wait_times)
}
with open(temp_file_path, 'w', encoding='utf-8') as f:
json.dump(timer_data, f, ensure_ascii=False, indent=4)
shutil.move(temp_file_path, USER_TIMERS_FILE)
logger.info(f"用户计时器状态已保存到 {USER_TIMERS_FILE}")
except Exception as e:
logger.error(f"保存用户计时器状态失败: {e}", exc_info=True)
if os.path.exists(temp_file_path):
try:
os.remove(temp_file_path)
except OSError:
pass

加载用户计时器状态的函数

def load_user_timers():
"""从文件加载用户计时器状态"""
global user_timers, user_wait_times
try:
if os.path.exists(USER_TIMERS_FILE):
with open(USER_TIMERS_FILE, 'r', encoding='utf-8') as f:
timer_data = json.load(f)
if isinstance(timer_data, dict):
loaded_user_timers = timer_data.get('user_timers', {})
loaded_user_wait_times = timer_data.get('user_wait_times', {})

text
# 验证并恢复有效的计时器状态 restored_count = 0 for user in user_names: if (user in loaded_user_timers and user in loaded_user_wait_times and isinstance(loaded_user_timers[user], (int, float)) and isinstance(loaded_user_wait_times[user], (int, float))): user_timers[user] = loaded_user_timers[user] user_wait_times[user] = loaded_user_wait_times[user] restored_count += 1 logger.debug(f"已恢复用户 {user} 的计时器状态") else: # 如果没有保存的状态或状态无效,则初始化 reset_user_timer(user) logger.debug(f"为用户 {user} 重新初始化计时器状态") logger.info(f"成功从 {USER_TIMERS_FILE} 恢复 {restored_count} 个用户的计时器状态") else: logger.warning(f"{USER_TIMERS_FILE} 文件格式不正确,将重新初始化所有计时器") initialize_all_user_timers() else: logger.info(f"{USER_TIMERS_FILE} 未找到,将初始化所有用户计时器") initialize_all_user_timers() except json.JSONDecodeError: logger.error(f"解析 {USER_TIMERS_FILE} 失败,将重新初始化所有计时器") initialize_all_user_timers() except Exception as e: logger.error(f"加载用户计时器状态失败: {e}", exc_info=True) initialize_all_user_timers()

def initialize_all_user_timers():
"""初始化所有用户的计时器"""
for user in user_names:
reset_user_timer(user)
logger.info("所有用户计时器已重新初始化")
def main():
# --- 全局变量声明提前 ---
global wx, program_start_time, last_received_message_timestamp, video_chat_process

text
try: # --- 启动前检查 --- logger.info("\033[32m进行启动前检查...\033[0m") # 预检查所有用户prompt文件 for user in user_names: prompt_file = prompt_mapping.get(user, user) prompt_path = os.path.join(root_dir, 'prompts', f'{prompt_file}.md') if not os.path.exists(prompt_path): raise FileNotFoundError(f"用户 {user} 的prompt文件 {prompt_file}.md 不存在") # 确保临时目录存在 memory_temp_dir = os.path.join(root_dir, MEMORY_TEMP_DIR) os.makedirs(memory_temp_dir, exist_ok=True) # 加载聊天上下文 logger.info("正在加载聊天上下文...") load_chat_contexts() # 调用加载函数 if ENABLE_REMINDERS: logger.info("提醒功能已启用。") load_recurring_reminders() if not isinstance(ALLOW_REMINDERS_IN_QUIET_TIME, bool): logger.warning("配置项 ALLOW_REMINDERS_IN_QUIET_TIME 的值不是布尔类型 (True/False),可能导致意外行为。") else: logger.info("提醒功能已禁用 (所有类型提醒将无法使用)。") # --- 初始化视频聊天插件 --- if ENABLE_VIDEO_CHAT: logger.info("\033[32m初始化视频聊天插件...\033[0m") try: video_chat_plugin_path = os.path.join(root_dir, 'video_chat_plugin') # 不再需要,因为 bot.py 现在从根目录运行 plugin_config_path = os.path.join(video_chat_plugin_path, 'video_chat_config.json') if not os.path.exists(plugin_config_path): raise FileNotFoundError(f"视频聊天插件的配置文件 'video_chat_config.json' 未找到。请先运行 config_gui.py 进行配置。") with open(plugin_config_path, 'r', encoding='utf-8') as f: plugin_config = json.load(f) plugin_config['root_dir'] = root_dir plugin_config['callback_url'] = f"http://127.0.0.1:{PORT+1}/api/conversation_callback" logger.info("成功加载 video_chat_config.json") from video_chat_plugin.wechat_integration import init_wechat_integration init_wechat_integration(plugin_config) logger.info("✅ 视频聊天插件初始化成功") logger.info("启动视频聊天插件的 FastAPI 服务器...") # 使用模块化启动,确保Python路径正确 video_server_module = 'video_chat_plugin.main' creation_flags = subprocess.CREATE_NEW_PROCESS_GROUP if os.name == 'nt' else 0 video_chat_process = subprocess.Popen( [sys.executable, "-m", video_server_module], cwd=root_dir, # 从项目根目录启动 creationflags=creation_flags ) logger.info(f"🚀 视频聊天插件服务器已启动,PID: {video_chat_process.pid}") except Exception as e: logger.error(f"❌ 视频聊天插件初始化失败: {e}", exc_info=True) logger.warning("⚠️ 视频聊天功能将不可用") else: logger.info("视频聊天功能已禁用") # --- 初始化 --- logger.info("\033[32m初始化微信接口和清理临时文件...\033[0m") clean_up_temp_files() try: wx = WeChat() except Exception: logger.error(f"\033[31m无法初始化微信接口,请确保您安装的是微信3.9版本,并且已经登录!\033[0m") exit(1) for user_name in user_names: if user_name == ROBOT_WX_NAME: logger.error(f"\033[31m您填写的用户列表中包含自己登录的微信昵称,请删除后再试!\033[0m") exit(1) ListenChat = wx.AddListenChat(nickname=user_name, callback=message_listener) if ListenChat: logger.info(f"成功添加监听用户{ListenChat}") else: logger.error(f"\033[31m添加监听用户{user_name}失败,请确保您在用户列表填写的微信昵称/备注与实际完全匹配,并且不要包含表情符号和特殊符号,注意填写的不是自己登录的微信昵称!\033[0m") exit(1) logger.info("监听用户添加完成") if ENABLE_AUTO_MESSAGE: logger.info("正在加载用户自动消息计时器状态...") load_user_timers() logger.info("用户自动消息计时器状态加载完成。") if IGNORE_GROUP_CHAT_FOR_AUTO_MESSAGE: logger.info("主动消息群聊忽略功能已启用,正在初始化群聊类型缓存...") update_group_chat_cache() logger.info("群聊类型缓存初始化完成。") else: logger.info("主动消息群聊忽略功能已禁用。") else: logger.info("自动消息功能已禁用,跳过计时器初始化。") # --- 启动窗口保活线程 --- logger.info("\033[32m启动窗口保活线程...\033[0m") listener_thread = threading.Thread(target=keep_alive, name="keep_alive", daemon=True) listener_thread.start() logger.info("消息窗口保活已启动。") checker_thread = threading.Thread(target=check_inactive_users, name="InactiveUserChecker", daemon=True) checker_thread.start() logger.info("非活跃用户检查与消息处理线程已启动。") program_start_time = time.time() last_received_message_timestamp = time.time() if ENABLE_SCHEDULED_RESTART: restart_checker_thread = threading.Thread(target=scheduled_restart_checker, name="ScheduledRestartChecker", daemon=True) restart_checker_thread.start() logger.info("定时重启检查线程已启动。") if ENABLE_MEMORY: memory_thread = threading.Thread(target=memory_manager, name="MemoryManager", daemon=True) memory_thread.start() logger.info("记忆管理线程已启动。") else: logger.info("记忆功能已禁用。") if ENABLE_REMINDERS: reminder_checker_thread = threading.Thread(target=recurring_reminder_checker, name="ReminderChecker", daemon=True) reminder_checker_thread.start() logger.info("提醒检查线程(重复和长期一次性)已启动。") if ENABLE_AUTO_MESSAGE: auto_message_thread = threading.Thread(target=check_user_timeouts, name="AutoMessageChecker", daemon=True) auto_message_thread.start() logger.info("主动消息检查线程已启动。") heartbeat_th = threading.Thread(target=heartbeat_thread_func, name="BotHeartbeatThread", daemon=True) heartbeat_th.start() if ENABLE_VIDEO_CHAT: callback_server_thread = threading.Thread(target=start_video_chat_callback_server, name="VideoChatCallbackServer", daemon=True) callback_server_thread.start() logger.info("视频聊天回调API服务器已启动。") logger.info("\033[32mBOT已成功启动并运行中...\033[0m") monitor_memory_usage_thread = threading.Thread(target=monitor_memory_usage, name="MemoryUsageMonitor", daemon=True) monitor_memory_usage_thread.start() logger.info("内存使用监控线程已启动。") wx.KeepRunning() while True: time.sleep(60) while True: time.sleep(60) except FileNotFoundError as e: logger.critical(f"初始化失败: 缺少必要的文件或目录 - {str(e)}") logger.error(f"\033[31m错误:{str(e)}\033[0m") except Exception as e: logger.critical(f"主程序发生严重错误: {str(e)}", exc_info=True) finally: logger.info("程序准备退出,执行清理操作...") # 终止视频聊天插件服务器 if video_chat_process and video_chat_process.poll() is None: logger.info("正在停止视频聊天插件服务器...") try: if os.name == 'nt': subprocess.call(['taskkill', '/F', '/T', '/PID', str(video_chat_process.pid)]) else: video_chat_process.terminate() video_chat_process.wait(timeout=5) logger.info("视频聊天插件服务器已停止。") except Exception as e: logger.error(f"停止视频聊天插件服务器失败: {e}") if video_chat_process.poll() is None: video_chat_process.kill() # 保存用户计时器状态(如果启用了自动消息) if ENABLE_AUTO_MESSAGE: logger.info("程序退出前:保存用户计时器状态...") save_user_timers() # 取消活动的短期一次性提醒定时器 with timer_lock: if active_timers: logger.info(f"正在取消 {len(active_timers)} 个活动的短期一次性提醒定时器...") cancelled_count = 0 # 使用 list(active_timers.items()) 创建副本进行迭代 for timer_key, timer in list(active_timers.items()): try: timer.cancel() cancelled_count += 1 except Exception as cancel_err: logger.warning(f"取消短期定时器 {timer_key} 时出错: {cancel_err}") active_timers.clear() logger.info(f"已取消 {cancelled_count} 个短期一次性定时器。") else: logger.info("没有活动的短期一次性提醒定时器需要取消。") if 'async_http_handler' in globals() and isinstance(async_http_handler, AsyncHTTPHandler): logger.info("正在关闭异步HTTP日志处理器...") try: async_http_handler.close() logger.info("异步HTTP日志处理器已关闭。") except Exception as log_close_err: logger.error(f"关闭异步日志处理器时出错: {log_close_err}") logger.info("执行最终临时文件清理...") clean_up_temp_files() logger.info("程序退出。")

if name == 'main':
try:
main()
except KeyboardInterrupt:
logger.info("接收到用户中断信号 (Ctrl+C),程序将退出。")
except Exception as e:
logger.error(f"程序启动或运行期间发生未捕获的顶层异常: {str(e)}", exc_info=True)
print(f"FALLBACK LOG: {datetime.now()} - CRITICAL ERROR - {str(e)}")

思考

Clarifying code assistance

It looks like the user pasted two large Python files, including a FastAPI implementation and a bot.py with a lot of features. Since there's no explicit question, it seems like they might need help with debugging or reviewing the code. They also mentioned fixing an import error related to WorkerPermissions. I should ask how they'd like me to assist — perhaps with error explanation, improvements, or anything specific they want help with. I'll ask, "How can I help you with this code?"

Thought for 5 seconds

回答

Looks like you just dropped the full source for your video_chat_plugin/main.py and bot.py—quite a hefty codebase! 🙌

I can jump in wherever you need:

  • Debugging (import errors, race conditions, memory spikes, LiveKit mis‑fires, etc.)
  • Architecture / readability review (threading model, separation of concerns, logging strategy)
  • Performance tuning (async vs threading, memory, I/O, API‑rate‑limit handling)
  • Feature tweaks (reminder logic, video‑chat flow, prompt/role handling, auto‑restart rules)
  • Docker/CI deployment advice, or anything else.

Let me know the exact pain‑point—or share an error trace/log—and we’ll zero in on a fix or improvement.

このQ&Aを共有