feat:为私聊增加checker和次级回复,更多动作

This commit is contained in:
SengokuCola
2025-12-04 23:39:09 +08:00
parent a155194157
commit 95a1712c90
19 changed files with 4146 additions and 42 deletions

View File

@@ -25,6 +25,7 @@ from src.chat.utils.chat_message_builder import (
build_readable_messages_with_id,
get_raw_msg_before_timestamp_with_chat,
)
from src.chat.brain_chat.brain_reply_checker import BrainReplyChecker, BrainLLMReplyChecker
if TYPE_CHECKING:
from src.common.data_models.database_data_model import DatabaseMessages
@@ -87,6 +88,11 @@ class BrainChatting:
self.running: bool = False
self._loop_task: Optional[asyncio.Task] = None # 主循环任务
# 轻量级回复检查器(比 PFC 更宽松)
self.reply_checker = BrainReplyChecker(chat_id=self.stream_id)
# 使用 planner 模型的一次性 LLM 检查器
self.llm_reply_checker = BrainLLMReplyChecker(chat_id=self.stream_id, max_retries=1)
# 添加循环信息管理相关的属性
self.history_loop: List[CycleDetail] = []
self._cycle_counter = 0
@@ -96,6 +102,12 @@ class BrainChatting:
self.more_plan = False
# 最近一次是否成功进行了 reply用于选择 BrainPlanner 的 Prompt
self._last_successful_reply: bool = False
# 类似 PFC 的 block_and_ignore在该时间点之前不主动参与该聊天
self._ignore_until_timestamp: Optional[float] = None
async def start(self):
"""检查是否需要启动主循环,如果未激活则启动。"""
@@ -157,6 +169,14 @@ class BrainChatting:
)
async def _loopbody(self): # sourcery skip: hoist-if-from-if
# 如果当前处于 block_and_ignore 冷却期,直接跳过本轮思考
if self._ignore_until_timestamp and time.time() < self._ignore_until_timestamp:
await asyncio.sleep(0.5)
return True
elif self._ignore_until_timestamp and time.time() >= self._ignore_until_timestamp:
logger.info(f"{self.log_prefix} block_and_ignore 冷却结束,恢复该聊天的正常思考")
self._ignore_until_timestamp = None
recent_messages_list = message_api.get_messages_by_time_in_chat(
chat_id=self.stream_id,
start_time=self.last_read_time,
@@ -296,6 +316,10 @@ class BrainChatting:
chat_content_block=chat_content_block,
message_id_list=message_id_list,
interest=global_config.personality.interest,
prompt_key=(
"brain_planner_prompt_follow_up" if self._last_successful_reply else "brain_planner_prompt_initial"
),
log_prompt=True,
)
continue_flag, modified_message = await events_manager.handle_mai_events(
EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id
@@ -309,6 +333,7 @@ class BrainChatting:
action_to_use_info = await self.action_planner.plan(
loop_start_time=self.last_read_time,
available_actions=available_actions,
last_successful_reply=self._last_successful_reply,
)
# 3. 并行执行所有动作
@@ -524,50 +549,131 @@ class BrainChatting:
return {"action_type": "no_reply", "success": True, "reply_text": "", "command": ""}
elif action_planner_info.action_type == "reply":
try:
success, llm_response = await generator_api.generate_reply(
chat_stream=self.chat_stream,
reply_message=action_planner_info.action_message,
available_actions=available_actions,
chosen_actions=chosen_action_plan_infos,
reply_reason=action_planner_info.reasoning or "",
enable_tool=global_config.tool.enable_tool,
request_type="replyer",
from_plugin=False,
)
# 使用规则 + 一次 LLM ReplyChecker 包一层重试逻辑
retry_count = 0
if not success or not llm_response or not llm_response.reply_set:
if action_planner_info.action_message:
logger.info(
f"{action_planner_info.action_message.processed_plain_text} 的回复生成失败"
)
else:
logger.info("回复生成失败")
while True:
try:
success, llm_response = await generator_api.generate_reply(
chat_stream=self.chat_stream,
reply_message=action_planner_info.action_message,
available_actions=available_actions,
chosen_actions=chosen_action_plan_infos,
reply_reason=action_planner_info.reasoning or "",
enable_tool=global_config.tool.enable_tool,
request_type="replyer",
from_plugin=False,
)
if not success or not llm_response or not llm_response.reply_set:
if action_planner_info.action_message:
logger.info(
f"{action_planner_info.action_message.processed_plain_text} 的回复生成失败"
)
else:
logger.info("回复生成失败")
return {
"action_type": "reply",
"success": False,
"reply_text": "",
"loop_info": None,
}
except asyncio.CancelledError:
logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消")
return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None}
except asyncio.CancelledError:
logger.debug(f"{self.log_prefix} 并行执行:回复生成任务已被取消")
return {"action_type": "reply", "success": False, "reply_text": "", "loop_info": None}
response_set = llm_response.reply_set
selected_expressions = llm_response.selected_expressions
loop_info, reply_text, _ = await self._send_and_store_reply(
response_set=response_set,
action_message=action_planner_info.action_message, # type: ignore
cycle_timers=cycle_timers,
thinking_id=thinking_id,
actions=chosen_action_plan_infos,
selected_expressions=selected_expressions,
)
return {
"action_type": "reply",
"success": True,
"reply_text": reply_text,
"loop_info": loop_info,
}
response_set = llm_response.reply_set
# 预先拼接一次纯文本,供检查使用(与发送逻辑解耦)
preview_text = ""
for reply_content in response_set.reply_data:
if reply_content.content_type != ReplyContentType.TEXT:
continue
data: str = reply_content.content # type: ignore
preview_text += data
# 规则检查(不调用 LLM
rule_suitable, rule_reason, rule_need_retry = self.reply_checker.check(
reply_text=preview_text, retry_count=retry_count
)
# LLM 检查(使用 planner 模型,一次机会)
llm_suitable, llm_reason, llm_need_retry = await self.llm_reply_checker.check(
reply_text=preview_text, retry_count=retry_count
)
# 是否需要重生成:只要有一方建议重试,且还在重试次数之内
if (rule_need_retry or llm_need_retry) and retry_count < max(
self.reply_checker.max_retries, self.llm_reply_checker.max_retries
):
retry_count += 1
logger.info(
f"{self.log_prefix} ReplyChecker 建议重试(第 {retry_count} 次),"
f"rule: {rule_reason}; llm: {llm_reason}"
)
continue
# 到这里为止,不再重试:即使有一方认为“不太理想”,也只记录原因并放行
if not rule_suitable or not llm_suitable:
logger.info(
f"{self.log_prefix} ReplyChecker 判断回复可能不太理想,"
f"rule: {rule_reason}; llm: {llm_reason},本次仍将发送。"
)
selected_expressions = llm_response.selected_expressions
loop_info, reply_text, _ = await self._send_and_store_reply(
response_set=response_set,
action_message=action_planner_info.action_message, # type: ignore
cycle_timers=cycle_timers,
thinking_id=thinking_id,
actions=chosen_action_plan_infos,
selected_expressions=selected_expressions,
)
# 标记这次循环已经成功进行了回复,下一轮 Planner 使用 follow_up Prompt
self._last_successful_reply = True
return {
"action_type": "reply",
"success": True,
"reply_text": reply_text,
"loop_info": loop_info,
}
# 其他动作
else:
# 执行普通动作
# 内建 wait / listening / block_and_ignore不通过插件系统直接在这里处理
if action_planner_info.action_type in ["wait", "listening", "block_and_ignore"]:
reason = action_planner_info.reasoning or ""
if action_planner_info.action_type == "block_and_ignore":
# 设置一段时间的忽略窗口,例如 10 分钟
ignore_minutes = 10
self._ignore_until_timestamp = time.time() + ignore_minutes * 60
logger.info(
f"{self.log_prefix} 收到 block_and_ignore 动作,将在接下来 {ignore_minutes} 分钟内不再主动参与该聊天"
)
# 统一将这三种策略动作记录到数据库,便于后续分析
await database_api.store_action_info(
chat_stream=self.chat_stream,
action_build_into_prompt=False,
action_prompt_display=reason or f"执行动作: {action_planner_info.action_type}",
action_done=True,
thinking_id=thinking_id,
action_data={"reason": reason},
action_name=action_planner_info.action_type,
)
# 这些动作本身不产生文本回复
self._last_successful_reply = False
return {
"action_type": action_planner_info.action_type,
"success": True,
"reply_text": "",
"command": "",
}
# 其余动作:走原有插件 Action 体系
with Timer("动作执行", cycle_timers):
success, reply_text, command = await self._handle_action(
action_planner_info.action_type,
@@ -577,6 +683,10 @@ class BrainChatting:
thinking_id,
action_planner_info.action_message,
)
# 非 reply 类动作执行成功时,清空最近成功回复标记,让下一轮回到 initial Prompt
if success and action_planner_info.action_type != "reply":
self._last_successful_reply = False
return {
"action_type": action_planner_info.action_type,
"success": success,