feat:pfc Lite(hearfFC)在群聊初步可用

This commit is contained in:
SengokuCola
2025-04-17 23:43:41 +08:00
parent a2333f9f82
commit e3d22b571b
10 changed files with 985 additions and 121 deletions

View File

@@ -1,8 +1,9 @@
import time
from random import random
import traceback
from typing import List, Optional
from typing import List, Optional, Dict
import asyncio
from asyncio import Lock
from ...moods.moods import MoodManager
from ....config.config import global_config
from ...chat.emoji_manager import emoji_manager
@@ -19,7 +20,8 @@ from ...utils.timer_calculater import Timer
from src.do_tool.tool_use import ToolUser
from .interest import InterestManager, InterestChatting
from src.plugins.chat.chat_stream import chat_manager
from src.plugins.chat.message import MessageInfo
from src.plugins.chat.message import BaseMessageInfo
from .pf_chatting import PFChatting
# 定义日志配置
chat_config = LogConfig(
@@ -33,13 +35,32 @@ logger = get_module_logger("heartFC_chat", config=chat_config)
INTEREST_MONITOR_INTERVAL_SECONDS = 1
class HeartFC_Chat:
_instance = None # For potential singleton access if needed by MessageManager
def __init__(self):
# --- Updated Init ---
if HeartFC_Chat._instance is not None:
# Prevent re-initialization if used as a singleton
return
self.logger = logger # Make logger accessible via self
self.gpt = ResponseGenerator()
self.mood_manager = MoodManager.get_instance()
self.mood_manager.start_mood_update()
self.tool_user = ToolUser()
self.interest_manager = InterestManager()
self._interest_monitor_task: Optional[asyncio.Task] = None
# --- New PFChatting Management ---
self.pf_chatting_instances: Dict[str, PFChatting] = {}
self._pf_chatting_lock = Lock()
# --- End New PFChatting Management ---
HeartFC_Chat._instance = self # Register instance
# --- End Updated Init ---
# --- Added Class Method for Singleton Access ---
@classmethod
def get_instance(cls):
return cls._instance
# --- End Added Class Method ---
async def start(self):
"""启动异步任务,如兴趣监控器"""
@@ -61,14 +82,29 @@ class HeartFC_Chat:
else:
logger.warning("跳过兴趣监控任务创建:任务已存在或正在运行。")
# --- Added PFChatting Instance Manager ---
async def _get_or_create_pf_chatting(self, stream_id: str) -> Optional[PFChatting]:
"""获取现有PFChatting实例或创建新实例。"""
async with self._pf_chatting_lock:
if stream_id not in self.pf_chatting_instances:
self.logger.info(f"为流 {stream_id} 创建新的PFChatting实例")
# 传递 self (HeartFC_Chat 实例) 进行依赖注入
instance = PFChatting(stream_id, self)
# 执行异步初始化
if not await instance._initialize():
self.logger.error(f"为流 {stream_id} 初始化PFChatting失败")
return None
self.pf_chatting_instances[stream_id] = instance
return self.pf_chatting_instances[stream_id]
# --- End Added PFChatting Instance Manager ---
async def _interest_monitor_loop(self):
"""后台任务,定期检查兴趣度变化并触发回复"""
logger.info("兴趣监控循环开始...")
while True:
await asyncio.sleep(INTEREST_MONITOR_INTERVAL_SECONDS)
try:
# --- 修改:遍历 SubHeartflow 并检查触发器 ---
active_stream_ids = list(heartflow.get_all_subheartflows_streams_ids()) # 需要 heartflow 提供此方法
active_stream_ids = list(heartflow.get_all_subheartflows_streams_ids())
logger.trace(f"检查 {len(active_stream_ids)} 个活跃流是否足以开启心流对话...")
for stream_id in active_stream_ids:
@@ -77,26 +113,28 @@ class HeartFC_Chat:
logger.warning(f"监控循环: 无法获取活跃流 {stream_id} 的 sub_hf")
continue
# --- 获取 Observation 和消息列表 --- #
observation = sub_hf._get_primary_observation()
if not observation:
logger.warning(f"[{stream_id}] SubHeartflow 没有在观察,无法检查触发器。")
continue
observed_messages = observation.talking_message # 获取消息字典列表
# --- 结束获取 --- #
should_trigger = False
try:
# check_reply_trigger 可以选择性地接收 observed_messages 作为参数
should_trigger = await sub_hf.check_reply_trigger() # 目前 check_reply_trigger 还不处理这个
interest_chatting = self.interest_manager.get_interest_chatting(stream_id)
if interest_chatting:
should_trigger = interest_chatting.should_evaluate_reply()
if should_trigger:
logger.info(f"[{stream_id}] 基于兴趣概率决定启动交流模式 (概率: {interest_chatting.current_reply_probability:.4f})。")
else:
logger.trace(f"[{stream_id}] 没有找到对应的 InterestChatting 实例,跳过基于兴趣的触发检查。")
except Exception as e:
logger.error(f"错误调用 check_reply_trigger{stream_id}: {e}")
logger.error(f"检查兴趣触发器时出错{stream_id}: {e}")
logger.error(traceback.format_exc())
if should_trigger:
logger.info(f"[{stream_id}] SubHeartflow 决定开启心流对话。")
# 调用修改后的处理函数,传递 stream_id 和 observed_messages
asyncio.create_task(self._process_triggered_reply(stream_id, observed_messages))
logger.info(f"[{stream_id}] 触发条件满足, 委托给PFChatting.")
# --- 修改: 获取 PFChatting 实例并调用 add_time (无参数,时间由内部衰减逻辑决定) ---
pf_instance = await self._get_or_create_pf_chatting(stream_id)
if pf_instance:
# 调用 add_time 启动或延长循环,时间由 PFChatting 内部决定
asyncio.create_task(pf_instance.add_time())
else:
logger.error(f"[{stream_id}] 无法获取或创建PFChatting实例。跳过触发。")
except asyncio.CancelledError:
@@ -107,32 +145,6 @@ class HeartFC_Chat:
logger.error(traceback.format_exc())
await asyncio.sleep(5) # 发生错误时等待
async def _process_triggered_reply(self, stream_id: str, observed_messages: List[dict]):
"""Helper coroutine to handle the processing of a triggered reply based on SubHeartflow trigger."""
try:
logger.info(f"[{stream_id}] SubHeartflow 触发回复...")
# 调用修改后的 trigger_reply_generation
await self.trigger_reply_generation(stream_id, observed_messages)
# --- 调整兴趣降低逻辑 ---
# 这里的兴趣降低可能不再适用,或者需要基于不同的逻辑
# 例如,回复后可以将 SubHeartflow 的某种"回复意愿"状态重置
# 暂时注释掉,或根据需要调整
# chatting_instance = self.interest_manager.get_interest_chatting(stream_id)
# if chatting_instance:
# decrease_value = chatting_instance.trigger_threshold / 2 # 使用实例的阈值
# self.interest_manager.decrease_interest(stream_id, value=decrease_value)
# post_trigger_interest = self.interest_manager.get_interest(stream_id) # 获取更新后的兴趣
# logger.info(f"[{stream_id}] Interest decreased by {decrease_value:.2f} (InstanceThreshold/2) after processing triggered reply. Current interest: {post_trigger_interest:.2f}")
# else:
# logger.warning(f"[{stream_id}] Could not find InterestChatting instance after reply processing to decrease interest.")
logger.debug(f"[{stream_id}] Reply processing finished. (Interest decrease logic needs review).")
except Exception as e:
logger.error(f"Error processing SubHeartflow-triggered reply for stream_id {stream_id}: {e}") # 更新日志信息
logger.error(traceback.format_exc())
# --- 结束修改 ---
async def _create_thinking_message(self, anchor_message: Optional[MessageRecv]):
"""创建思考消息 (尝试锚定到 anchor_message)"""
if not anchor_message or not anchor_message.chat_stream:
@@ -270,7 +282,7 @@ class HeartFC_Chat:
sub_hf = None
anchor_message: Optional[MessageRecv] = None # <--- 重命名,用于锚定回复的消息对象
userinfo: Optional[UserInfo] = None
messageinfo: Optional[MessageInfo] = None
messageinfo: Optional[BaseMessageInfo] = None
timing_results = {}
current_mind = None
@@ -295,33 +307,58 @@ class HeartFC_Chat:
logger.error(traceback.format_exc())
return
# --- 2. 尝试从 observed_messages 重建最后一条消息作为锚点 --- #
# --- 2. 尝试从 observed_messages 重建最后一条消息作为锚点, 失败则创建占位符 --- #
try:
with Timer("获取最后消息锚点", timing_results):
with Timer("获取或创建锚点消息", timing_results):
reconstruction_failed = False
if observed_messages:
last_msg_dict = observed_messages[-1] # 直接从传入列表获取最后一条
# 尝试从字典重建 MessageRecv 对象(可能需要调整 MessageRecv 的构造方式或创建一个辅助函数)
# 这是一个简化示例,假设 MessageRecv 可以从字典初始化
# 你可能需要根据 MessageRecv 的实际 __init__ 来调整
try:
anchor_message = MessageRecv(last_msg_dict) # 假设 MessageRecv 支持从字典创建
last_msg_dict = observed_messages[-1]
logger.debug(f"[{stream_id}] Attempting to reconstruct MessageRecv from last observed message.")
anchor_message = MessageRecv(last_msg_dict, chat_stream=chat)
if not (anchor_message and anchor_message.message_info and anchor_message.message_info.message_id and anchor_message.message_info.user_info):
raise ValueError("Reconstructed MessageRecv missing essential info.")
userinfo = anchor_message.message_info.user_info
messageinfo = anchor_message.message_info
logger.debug(f"[{stream_id}] 获取到最后消息作为锚点: ID={messageinfo.message_id}, Sender={userinfo.user_nickname}")
except Exception as e_msg:
logger.error(f"[{stream_id}] 从字典重建最后消息 MessageRecv 失败: {e_msg}. 字典: {last_msg_dict}")
anchor_message = None # 重置以表示失败
logger.debug(f"[{stream_id}] Successfully reconstructed anchor message: ID={messageinfo.message_id}, Sender={userinfo.user_nickname}")
except Exception as e_reconstruct:
logger.warning(f"[{stream_id}] Reconstructing MessageRecv from observed message failed: {e_reconstruct}. Will create placeholder.")
reconstruction_failed = True
else:
logger.warning(f"[{stream_id}] 无法从 Observation 获取最后消息锚点。")
except Exception as e:
logger.error(f"[{stream_id}] 获取最后消息锚点时出错: {e}")
logger.error(traceback.format_exc())
# 即使没有锚点,也可能继续尝试生成非回复性消息,取决于后续逻辑
logger.warning(f"[{stream_id}] observed_messages is empty. Will create placeholder anchor message.")
reconstruction_failed = True # Treat empty observed_messages as a failure to reconstruct
# --- 3. 检查是否能继续 (需要思考消息锚点) ---
if not anchor_message:
logger.warning(f"[{stream_id}] 没有有效的消息锚点,无法创建思考消息和发送回复。取消回复生成。")
return
# 如果重建失败或 observed_messages 为空,创建占位符
if reconstruction_failed:
placeholder_id = f"mid_{int(time.time() * 1000)}" # 使用毫秒时间戳增加唯一性
placeholder_user = UserInfo(user_id="system_trigger", user_nickname="系统触发")
placeholder_msg_info = BaseMessageInfo(
message_id=placeholder_id,
platform=chat.platform,
group_info=chat.group_info,
user_info=placeholder_user,
time=time.time()
# 其他 BaseMessageInfo 可能需要的字段设为默认值或 None
)
# 创建 MessageRecv 实例,注意它需要消息字典结构,我们创建一个最小化的
placeholder_msg_dict = {
"message_info": placeholder_msg_info.to_dict(),
"processed_plain_text": "", # 提供空文本
"raw_message": "",
"time": placeholder_msg_info.time,
}
# 先只用字典创建实例
anchor_message = MessageRecv(placeholder_msg_dict)
# 然后调用方法更新 chat_stream
anchor_message.update_chat_stream(chat)
userinfo = anchor_message.message_info.user_info
messageinfo = anchor_message.message_info
logger.info(f"[{stream_id}] Created placeholder anchor message: ID={messageinfo.message_id}, Sender={userinfo.user_nickname}")
except Exception as e:
logger.error(f"[{stream_id}] 获取或创建锚点消息时出错: {e}")
logger.error(traceback.format_exc())
anchor_message = None # 确保出错时 anchor_message 为 None
# --- 4. 检查并发思考限制 (使用 anchor_message 简化获取) ---
try:
@@ -399,6 +436,7 @@ class HeartFC_Chat:
with Timer("生成内心想法(SubHF)", timing_results):
# 不再传递 message_txt 和 sender_info, SubHeartflow 应基于其内部观察
current_mind, past_mind = await sub_hf.do_thinking_before_reply(
# sender_info=userinfo,
chat_stream=chat,
extra_info=tool_result_info,
obs_id=get_mid_memory_id,
@@ -415,7 +453,8 @@ class HeartFC_Chat:
# --- 9. 调用 ResponseGenerator 生成回复 (使用 anchor_message 和 current_mind) ---
try:
with Timer("生成最终回复(GPT)", timing_results):
response_set = await self.gpt.generate_response(anchor_message, thinking_id, current_mind=current_mind)
# response_set = await self.gpt.generate_response(anchor_message, thinking_id, current_mind=current_mind)
response_set = await self.gpt.generate_response(anchor_message, thinking_id)
except Exception as e:
logger.error(f"[{stream_id}] GPT 生成回复失败: {e}")
logger.error(traceback.format_exc())