feat:大幅优化聊天流控制,更精准简洁

This commit is contained in:
SengokuCola
2025-08-31 12:35:01 +08:00
parent 4bee6002ff
commit a11e65f794
18 changed files with 458 additions and 313 deletions

View File

@@ -18,8 +18,7 @@ from src.chat.planner_actions.action_modifier import ActionModifier
from src.chat.planner_actions.action_manager import ActionManager
from src.chat.heart_flow.hfc_utils import CycleDetail
from src.chat.heart_flow.hfc_utils import send_typing, stop_typing
from src.chat.frequency_control.talk_frequency_control import talk_frequency_control
from src.chat.frequency_control.focus_value_control import focus_value_control
from src.chat.frequency_control.frequency_control import frequency_control_manager
from src.chat.express.expression_learner import expression_learner_manager
from src.person_info.person_info import Person
from src.plugin_system.base.component_types import ChatMode, EventType, ActionInfo
@@ -85,8 +84,7 @@ class HeartFChatting:
self.expression_learner = expression_learner_manager.get_expression_learner(self.stream_id)
self.talk_frequency_control = talk_frequency_control.get_talk_frequency_control(self.stream_id)
self.focus_value_control = focus_value_control.get_focus_value_control(self.stream_id)
self.frequency_control = frequency_control_manager.get_or_create_frequency_control(self.stream_id)
self.action_manager = ActionManager()
self.action_planner = ActionPlanner(chat_id=self.stream_id, action_manager=self.action_manager)
@@ -101,15 +99,8 @@ class HeartFChatting:
self._cycle_counter = 0
self._current_cycle_detail: CycleDetail = None # type: ignore
self.reply_timeout_count = 0
self.plan_timeout_count = 0
self.last_read_time = time.time() - 10
self.focus_energy = 1
self.no_action_consecutive = 0
# 最近三次no_action的新消息兴趣度记录
self.recent_interest_records: deque = deque(maxlen=3)
async def start(self):
"""检查是否需要启动主循环,如果未激活则启动。"""
@@ -187,87 +178,14 @@ class HeartFChatting:
f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}秒, " # type: ignore
f"选择动作: {action_type}" + (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
)
def _determine_form_type(self) -> None:
"""判断使用哪种形式的no_action"""
# 如果连续no_action次数少于3次使用waiting形式
if self.no_action_consecutive <= 3:
self.focus_energy = 1
else:
# 计算最近三次记录的兴趣度总和
total_recent_interest = sum(self.recent_interest_records)
# 计算调整后的阈值
adjusted_threshold = 1 / self.talk_frequency_control.get_current_talk_frequency()
logger.info(
f"{self.log_prefix} 最近三次兴趣度总和: {total_recent_interest:.2f}, 调整后阈值: {adjusted_threshold:.2f}"
)
# 如果兴趣度总和小于阈值进入breaking形式
if total_recent_interest < adjusted_threshold:
logger.info(f"{self.log_prefix} 兴趣度不足,进入休息")
self.focus_energy = random.randint(3, 6)
else:
logger.info(f"{self.log_prefix} 兴趣度充足,等待新消息")
self.focus_energy = 1
async def _should_process_messages(self, new_message: List["DatabaseMessages"]) -> tuple[bool, float]:
"""
判断是否应该处理消息
Args:
new_message: 新消息列表
mode: 当前聊天模式
Returns:
bool: 是否应该处理消息
"""
new_message_count = len(new_message)
talk_frequency = self.talk_frequency_control.get_current_talk_frequency()
modified_exit_count_threshold = self.focus_energy * 0.5 / talk_frequency
modified_exit_interest_threshold = 1.5 / talk_frequency
async def caculate_interest_value(self, recent_messages_list: List["DatabaseMessages"]) -> float:
total_interest = 0.0
for msg in new_message:
for msg in recent_messages_list:
interest_value = msg.interest_value
if interest_value is not None and msg.processed_plain_text:
total_interest += float(interest_value)
if new_message_count >= modified_exit_count_threshold:
self.recent_interest_records.append(total_interest)
logger.info(
f"{self.log_prefix} 累计消息数量达到{new_message_count}条(>{modified_exit_count_threshold:.1f}),结束等待"
)
# logger.info(self.last_read_time)
# logger.info(new_message)
return True, total_interest / new_message_count if new_message_count > 0 else 0.0
# 检查累计兴趣值
if new_message_count > 0:
# 只在兴趣值变化时输出log
if not hasattr(self, "_last_accumulated_interest") or total_interest != self._last_accumulated_interest:
logger.info(
f"{self.log_prefix} 休息中,新消息:{new_message_count}条,累计兴趣值: {total_interest:.2f}, 活跃度: {talk_frequency:.1f}"
)
self._last_accumulated_interest = total_interest
if total_interest >= modified_exit_interest_threshold:
# 记录兴趣度到列表
self.recent_interest_records.append(total_interest)
logger.info(
f"{self.log_prefix} 累计兴趣值达到{total_interest:.2f}(>{modified_exit_interest_threshold:.1f}),结束等待"
)
return True, total_interest / new_message_count if new_message_count > 0 else 0.0
# 每10秒输出一次等待状态
if int(time.time() - self.last_read_time) > 0 and int(time.time() - self.last_read_time) % 15 == 0:
logger.debug(
f"{self.log_prefix} 已等待{time.time() - self.last_read_time:.0f}秒,累计{new_message_count}条消息,累计兴趣{total_interest:.1f},继续等待..."
)
await asyncio.sleep(0.5)
return False, 0.0
return total_interest / len(recent_messages_list)
async def _loopbody(self):
recent_messages_list = message_api.get_messages_by_time_in_chat(
@@ -279,16 +197,13 @@ class HeartFChatting:
filter_mai=True,
filter_command=True,
)
# 统一的消息处理逻辑
should_process, interest_value = await self._should_process_messages(recent_messages_list)
if should_process:
if recent_messages_list:
self.last_read_time = time.time()
await self._observe(interest_value=interest_value)
await self._observe(interest_value=await self.caculate_interest_value(recent_messages_list),recent_messages_list=recent_messages_list)
else:
# Normal模式消息数量不足等待
await asyncio.sleep(0.5)
await asyncio.sleep(0.2)
return True
return True
@@ -342,8 +257,7 @@ class HeartFChatting:
return loop_info, reply_text, cycle_timers
async def _observe(self, interest_value: float = 0.0) -> bool:
action_type = "no_action"
async def _observe(self, interest_value: float = 0.0,recent_messages_list: List["DatabaseMessages"] = []) -> bool:
reply_text = "" # 初始化reply_text变量避免UnboundLocalError
# 使用sigmoid函数将interest_value转换为概率
@@ -362,22 +276,28 @@ class HeartFChatting:
normal_mode_probability = (
calculate_normal_mode_probability(interest_value)
* 2
* self.talk_frequency_control.get_current_talk_frequency()
* self.frequency_control.get_final_talk_frequency()
)
#对呼唤名字进行增幅
for msg in recent_messages_list:
if msg.reply_probability_boost is not None and msg.reply_probability_boost > 0.0:
normal_mode_probability += msg.reply_probability_boost
if global_config.chat.mentioned_bot_reply and msg.is_mentioned:
normal_mode_probability += global_config.chat.mentioned_bot_reply
if global_config.chat.at_bot_inevitable_reply and msg.is_at:
normal_mode_probability += global_config.chat.at_bot_inevitable_reply
# 根据概率决定使用哪种模式
# 根据概率决定使用直接回复
interest_triggerd = False
focus_triggerd = False
if random.random() < normal_mode_probability:
mode = ChatMode.NORMAL
interest_triggerd = True
logger.info(
f"{self.log_prefix}兴趣({interest_value:.2f}),在{normal_mode_probability * 100:.0f}%概率下选择回复"
f"{self.log_prefix}新消息,在{normal_mode_probability * 100:.0f}%概率下选择回复"
)
else:
mode = ChatMode.FOCUS
# 创建新的循环信息
cycle_timers, thinking_id = self.start_cycle()
logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次思考")
if s4u_config.enable_s4u:
await send_typing()
@@ -386,16 +306,21 @@ class HeartFChatting:
await self.expression_learner.trigger_learning_for_chat()
available_actions: Dict[str, ActionInfo] = {}
if random.random() > self.focus_value_control.get_current_focus_value() and mode == ChatMode.FOCUS:
# 如果激活度没有激活并且聊天活跃度低有可能不进行plan相当于不在电脑前不进行认真思考
action_to_use_info = [
ActionPlannerInfo(
action_type="no_action",
reasoning="专注不足",
action_data={},
)
]
else:
#如果兴趣度不足以激活
if not interest_triggerd:
#看看专注值够不够
if random.random() < self.frequency_control.get_final_focus_value():
#专注值足够,仍然进入正式思考
focus_triggerd = True #都没触发,路边
# 任意一种触发都行
if interest_triggerd or focus_triggerd:
# 进入正式思考模式
cycle_timers, thinking_id = self.start_cycle()
logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次思考")
# 第一步:动作检查
with Timer("动作检查", cycle_timers):
try:
@@ -433,103 +358,93 @@ class HeartFChatting:
):
return False
with Timer("规划器", cycle_timers):
# 根据不同触发进入不同plan
if focus_triggerd:
mode = ChatMode.FOCUS
else:
mode = ChatMode.NORMAL
action_to_use_info, _ = await self.action_planner.plan(
mode=mode,
loop_start_time=self.last_read_time,
available_actions=available_actions,
)
# for action in action_to_use_info:
# print(action.action_type)
# 3. 并行执行所有动作
action_tasks = [
asyncio.create_task(
self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers)
)
for action in action_to_use_info
]
# 3. 并行执行所有动作
action_tasks = [
asyncio.create_task(
self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers)
)
for action in action_to_use_info
]
# 并行执行所有任务
results = await asyncio.gather(*action_tasks, return_exceptions=True)
# 并行执行所有任务
results = await asyncio.gather(*action_tasks, return_exceptions=True)
# 处理执行结果
reply_loop_info = None
reply_text_from_reply = ""
action_success = False
action_reply_text = ""
action_command = ""
# 处理执行结果
reply_loop_info = None
reply_text_from_reply = ""
action_success = False
action_reply_text = ""
action_command = ""
for i, result in enumerate(results):
if isinstance(result, BaseException):
logger.error(f"{self.log_prefix} 动作执行异常: {result}")
continue
for i, result in enumerate(results):
if isinstance(result, BaseException):
logger.error(f"{self.log_prefix} 动作执行异常: {result}")
continue
_cur_action = action_to_use_info[i]
if result["action_type"] != "reply":
action_success = result["success"]
action_reply_text = result["reply_text"]
action_command = result.get("command", "")
elif result["action_type"] == "reply":
if result["success"]:
reply_loop_info = result["loop_info"]
reply_text_from_reply = result["reply_text"]
else:
logger.warning(f"{self.log_prefix} 回复动作执行失败")
_cur_action = action_to_use_info[i]
if result["action_type"] != "reply":
action_success = result["success"]
action_reply_text = result["reply_text"]
action_command = result.get("command", "")
elif result["action_type"] == "reply":
if result["success"]:
reply_loop_info = result["loop_info"]
reply_text_from_reply = result["reply_text"]
else:
logger.warning(f"{self.log_prefix} 回复动作执行失败")
# 构建最终的循环信息
if reply_loop_info:
# 如果有回复信息,使用回复的loop_info作为基础
loop_info = reply_loop_info
# 更新动作执行信息
loop_info["loop_action_info"].update(
{
"action_taken": action_success,
"command": action_command,
"taken_time": time.time(),
# 构建最终的循环信息
if reply_loop_info:
# 如果有回复信息使用回复的loop_info作为基础
loop_info = reply_loop_info
# 更新动作执行信息
loop_info["loop_action_info"].update(
{
"action_taken": action_success,
"command": action_command,
"taken_time": time.time(),
}
)
reply_text = reply_text_from_reply
else:
# 有回复信息,构建纯动作的loop_info
loop_info = {
"loop_plan_info": {
"action_result": action_to_use_info,
},
"loop_action_info": {
"action_taken": action_success,
"reply_text": action_reply_text,
"command": action_command,
"taken_time": time.time(),
},
}
)
reply_text = reply_text_from_reply
else:
# 没有回复信息构建纯动作的loop_info
loop_info = {
"loop_plan_info": {
"action_result": action_to_use_info,
},
"loop_action_info": {
"action_taken": action_success,
"reply_text": action_reply_text,
"command": action_command,
"taken_time": time.time(),
},
}
reply_text = action_reply_text
reply_text = action_reply_text
self.end_cycle(loop_info, cycle_timers)
self.print_cycle_info(cycle_timers)
if s4u_config.enable_s4u:
await stop_typing()
await mai_thinking_manager.get_mai_think(self.stream_id).do_think_after_response(reply_text)
"""S4U内容暂时保留"""
if s4u_config.enable_s4u:
await stop_typing()
await mai_thinking_manager.get_mai_think(self.stream_id).do_think_after_response(reply_text)
"""S4U内容暂时保留"""
self.end_cycle(loop_info, cycle_timers)
self.print_cycle_info(cycle_timers)
# await self.willing_manager.after_generate_reply_handle(message_data.get("message_id", ""))
action_type = action_to_use_info[0].action_type if action_to_use_info else "no_action"
# 管理no_action计数器当执行了非no_action动作时重置计数器
if action_type != "no_action":
# no_action逻辑已集成到heartFC_chat.py中直接重置计数器
self.recent_interest_records.clear()
self.no_action_consecutive = 0
logger.debug(f"{self.log_prefix} 执行了{action_type}动作重置no_action计数器")
return True
if action_type == "no_action":
self.no_action_consecutive += 1
self._determine_form_type()
return True
async def _main_chat_loop(self):
"""主循环,持续进行计划并可能回复消息,直到被外部取消。"""
try:

View File

@@ -32,10 +32,10 @@ async def _calculate_interest(message: MessageRecv) -> Tuple[float, list[str]]:
Returns:
Tuple[float, bool, list[str]]: (兴趣度, 是否被提及, 关键词)
"""
if message.is_picid:
if message.is_picid or message.is_emoji:
return 0.0, []
is_mentioned, _ = is_mentioned_bot_in_message(message)
is_mentioned,is_at,reply_probability_boost = is_mentioned_bot_in_message(message)
interested_rate = 0.0
with Timer("记忆激活"):
@@ -79,17 +79,13 @@ async def _calculate_interest(message: MessageRecv) -> Tuple[float, list[str]]:
# 确保在范围内
base_interest = min(max(base_interest, 0.01), 0.3)
interested_rate += base_interest
if is_mentioned:
interest_increase_on_mention = 2
interested_rate += interest_increase_on_mention
message.interest_value = interested_rate
message.interest_value = base_interest
message.is_mentioned = is_mentioned
return interested_rate, keywords
message.is_at = is_at
message.reply_probability_boost = reply_probability_boost
return base_interest, keywords
class HeartFCMessageReceiver: