feat:改为单planner,并解析多个动作

This commit is contained in:
SengokuCola
2025-09-11 14:25:02 +08:00
parent 8ed94d1f26
commit a4285673aa
7 changed files with 1284 additions and 886 deletions

View File

@@ -1,6 +1,9 @@
from typing import Optional
from datetime import datetime, timedelta
import statistics
from src.config.config import global_config
from src.chat.frequency_control.utils import parse_stream_config_to_chat_id
from src.common.database.database_model import Messages
def get_config_base_talk_frequency(chat_id: Optional[str] = None) -> float:
@@ -124,3 +127,146 @@ def get_global_frequency() -> Optional[float]:
return get_time_based_frequency(config_item[1:])
return None
def get_weekly_hourly_message_stats(chat_id: str):
"""
计算指定聊天最近一周每个小时的消息数量和用户数量
Args:
chat_id: 聊天ID对应 Messages 表的 chat_id 字段)
Returns:
dict: 包含24个小时统计数据格式为:
{
"0": {"message_count": [5, 8, 3, 12, 6, 9, 7], "message_std_dev": 2.1},
"1": {"message_count": [10, 15, 8, 20, 12, 18, 14], "message_std_dev": 3.2},
...
}
"""
# 计算一周前的时间戳
one_week_ago = datetime.now() - timedelta(days=7)
one_week_ago_timestamp = one_week_ago.timestamp()
# 初始化数据结构:按小时存储每天的消息计数
hourly_data = {}
for hour in range(24):
hourly_data[f"hour_{hour}"] = {"daily_counts": []}
try:
# 查询指定聊天最近一周的消息
messages = Messages.select().where(
(Messages.time >= one_week_ago_timestamp) &
(Messages.chat_id == chat_id)
)
# 统计每个小时的数据
for message in messages:
# 将时间戳转换为datetime
msg_time = datetime.fromtimestamp(message.time)
hour = msg_time.hour
# 记录每天的消息计数(按日期分组)
day_key = msg_time.strftime("%Y-%m-%d")
hour_key = f"{hour}"
# 为该小时添加当天的消息计数
found = False
for day_count in hourly_data[hour_key]["daily_counts"]:
if day_count["date"] == day_key:
day_count["count"] += 1
found = True
break
if not found:
hourly_data[hour_key]["daily_counts"].append({"date": day_key, "count": 1})
except Exception as e:
# 如果查询失败,返回空的统计结果
print(f"Error getting weekly hourly message stats for chat {chat_id}: {e}")
hourly_stats = {}
for hour in range(24):
hourly_stats[f"hour_{hour}"] = {
"message_count": [],
"message_std_dev": 0.0
}
return hourly_stats
# 计算每个小时的统计结果
hourly_stats = {}
for hour in range(24):
hour_key = f"hour_{hour}"
daily_counts = [day["count"] for day in hourly_data[hour_key]["daily_counts"]]
# 计算总消息数
total_messages = sum(daily_counts)
# 计算标准差
message_std_dev = 0.0
if len(daily_counts) > 1:
message_std_dev = statistics.stdev(daily_counts)
elif len(daily_counts) == 1:
message_std_dev = 0.0
# 按日期排序每日消息计数
daily_counts_sorted = sorted(hourly_data[hour_key]["daily_counts"], key=lambda x: x["date"])
hourly_stats[hour_key] = {
"message_count": [day["count"] for day in daily_counts_sorted],
"message_std_dev": message_std_dev
}
return hourly_stats
def get_recent_15min_stats(chat_id: str):
"""
获取最近15分钟指定聊天的消息数量和发言人数
Args:
chat_id: 聊天ID对应 Messages 表的 chat_id 字段)
Returns:
dict: 包含消息数量和发言人数,格式为:
{
"message_count": 25,
"user_count": 8,
"time_range": "2025-01-01 14:30:00 - 2025-01-01 14:45:00"
}
"""
# 计算15分钟前的时间戳
fifteen_min_ago = datetime.now() - timedelta(minutes=15)
fifteen_min_ago_timestamp = fifteen_min_ago.timestamp()
current_time = datetime.now()
# 初始化统计结果
message_count = 0
user_set = set()
try:
# 查询最近15分钟的消息
messages = Messages.select().where(
(Messages.time >= fifteen_min_ago_timestamp) &
(Messages.chat_id == chat_id)
)
# 统计消息数量和用户
for message in messages:
message_count += 1
if message.user_id:
user_set.add(message.user_id)
except Exception as e:
# 如果查询失败,返回空结果
print(f"Error getting recent 15min stats for chat {chat_id}: {e}")
return {
"message_count": 0,
"user_count": 0,
"time_range": f"{fifteen_min_ago.strftime('%Y-%m-%d %H:%M:%S')} - {current_time.strftime('%Y-%m-%d %H:%M:%S')}"
}
return {
"message_count": message_count,
"user_count": len(user_set),
"time_range": f"{fifteen_min_ago.strftime('%Y-%m-%d %H:%M:%S')} - {current_time.strftime('%Y-%m-%d %H:%M:%S')}"
}

View File

@@ -18,7 +18,6 @@ from src.chat.planner_actions.action_modifier import ActionModifier
from src.chat.planner_actions.action_manager import ActionManager
from src.chat.heart_flow.hfc_utils import CycleDetail
from src.chat.heart_flow.hfc_utils import send_typing, stop_typing
from src.chat.frequency_control.frequency_control import frequency_control_manager
from src.chat.express.expression_learner import expression_learner_manager
from src.person_info.person_info import Person
from src.plugin_system.base.component_types import ChatMode, EventType, ActionInfo
@@ -52,6 +51,16 @@ ERROR_LOOP_INFO = {
},
}
# ?什么时候发言:
# 1.聊天频率较低:与过去该时段发言频率进行比较
# 2.感兴趣的话题暂时使用Emb计算
# 3.感兴趣的人:认识次数
# 4.直接提及
# 什么时候不发言:
# 1.敏感话题:判断较难
# 2.发言频率太高:近时判断,例如发言频率> 1/人数 *2
# 3.明确被拒绝planner判断
install(extra_lines=3)
@@ -85,8 +94,6 @@ class HeartFChatting:
self.expression_learner = expression_learner_manager.get_expression_learner(self.stream_id)
self.frequency_control = frequency_control_manager.get_or_create_frequency_control(self.stream_id)
self.action_manager = ActionManager()
self.action_planner = ActionPlanner(chat_id=self.stream_id, action_manager=self.action_manager)
self.action_modifier = ActionModifier(action_manager=self.action_manager, chat_id=self.stream_id)
@@ -101,6 +108,10 @@ class HeartFChatting:
self._current_cycle_detail: CycleDetail = None # type: ignore
self.last_read_time = time.time() - 10
self.no_reply_until_call = False
async def start(self):
"""检查是否需要启动主循环,如果未激活则启动。"""
@@ -156,38 +167,12 @@ class HeartFChatting:
formatted_time = f"{elapsed * 1000:.2f}毫秒" if elapsed < 1 else f"{elapsed:.2f}"
timer_strings.append(f"{name}: {formatted_time}")
# 获取动作类型,兼容新旧格式
# 移除无用代码
# action_type = "未知动作"
# if hasattr(self, "_current_cycle_detail") and self._current_cycle_detail:
# loop_plan_info = self._current_cycle_detail.loop_plan_info
# if isinstance(loop_plan_info, dict):
# action_result = loop_plan_info.get("action_result", {})
# if isinstance(action_result, dict):
# # 旧格式action_result是字典
# action_type = action_result.get("action_type", "未知动作")
# elif isinstance(action_result, list) and action_result:
# # 新格式action_result是actions列表
# # TODO: 把这里写明白
# action_type = action_result[0].action_type or "未知动作"
# elif isinstance(loop_plan_info, list) and loop_plan_info:
# # 直接是actions列表的情况
# action_type = loop_plan_info[0].get("action_type", "未知动作")
logger.info(
f"{self.log_prefix}{self._current_cycle_detail.cycle_id}次思考,"
f"耗时: {self._current_cycle_detail.end_time - self._current_cycle_detail.start_time:.1f}" # type: ignore
+ (f"\n详情: {'; '.join(timer_strings)}" if timer_strings else "")
)
async def calculate_interest_value(self, recent_messages_list: List["DatabaseMessages"]) -> float:
total_interest = 0.0
for msg in recent_messages_list:
interest_value = msg.interest_value
if interest_value is not None and msg.processed_plain_text:
total_interest += float(interest_value)
return total_interest / len(recent_messages_list)
async def _loopbody(self):
recent_messages_list = message_api.get_messages_by_time_in_chat(
chat_id=self.stream_id,
@@ -200,9 +185,22 @@ class HeartFChatting:
)
if recent_messages_list:
# !处理no_reply_until_call逻辑
if self.no_reply_until_call:
for message in recent_messages_list:
if message.is_mentioned or message.is_at:
self.no_reply_until_call = False
break
# 没有提到,继续保持沉默
if self.no_reply_until_call:
logger.info(f"{self.log_prefix} 没有提到,继续保持沉默")
await asyncio.sleep(1)
return True
self.last_read_time = time.time()
await self._observe(
interest_value=await self.calculate_interest_value(recent_messages_list),
recent_messages_list=recent_messages_list,
)
else:
@@ -262,190 +260,140 @@ class HeartFChatting:
return loop_info, reply_text, cycle_timers
async def _observe(
self, interest_value: float = 0.0, recent_messages_list: Optional[List["DatabaseMessages"]] = None
self, # interest_value: float = 0.0,
recent_messages_list: Optional[List["DatabaseMessages"]] = None
) -> bool:
if recent_messages_list is None:
recent_messages_list = []
reply_text = "" # 初始化reply_text变量避免UnboundLocalError
# 使用sigmoid函数将interest_value转换为概率
# 当interest_value为0时概率接近0使用Focus模式
# 当interest_value很高时概率接近1使用Normal模式
def calculate_normal_mode_probability(interest_val: float) -> float:
# 使用sigmoid函数调整参数使概率分布更合理
# 当interest_value = 0时概率约为0.1
# 当interest_value = 1时概率约为0.5
# 当interest_value = 2时概率约为0.8
# 当interest_value = 3时概率约为0.95
k = 2.0 # 控制曲线陡峭程度
x0 = 1.0 # 控制曲线中心点
return 1.0 / (1.0 + math.exp(-k * (interest_val - x0)))
normal_mode_probability = (
calculate_normal_mode_probability(interest_value) * 2 * self.frequency_control.get_final_talk_frequency()
)
# 对呼唤名字进行增幅
for msg in recent_messages_list:
if msg.reply_probability_boost is not None and msg.reply_probability_boost > 0.0:
normal_mode_probability += msg.reply_probability_boost
if global_config.chat.mentioned_bot_reply and msg.is_mentioned:
normal_mode_probability += global_config.chat.mentioned_bot_reply
if global_config.chat.at_bot_inevitable_reply and msg.is_at:
normal_mode_probability += global_config.chat.at_bot_inevitable_reply
# 根据概率决定使用直接回复
interest_triggered = False
focus_triggered = False
if random.random() < normal_mode_probability:
interest_triggered = True
logger.info(f"{self.log_prefix} 有新消息,在{normal_mode_probability * 100:.0f}%概率下选择回复")
if s4u_config.enable_s4u:
await send_typing()
async with global_prompt_manager.async_message_scope(self.chat_stream.context.get_template_name()):
await self.expression_learner.trigger_learning_for_chat()
cycle_timers, thinking_id = self.start_cycle()
logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次思考")
# 第一步:动作检查
available_actions: Dict[str, ActionInfo] = {}
try:
await self.action_modifier.modify_actions()
available_actions = self.action_manager.get_using_actions()
except Exception as e:
logger.error(f"{self.log_prefix} 动作修改失败: {e}")
# 如果兴趣度不足以激活
if not interest_triggered:
# 看看专注值够不够
if random.random() < self.frequency_control.get_final_focus_value():
# 专注值足够,仍然进入正式思考
focus_triggered = True # 都没触发,路边
# 执行planner
is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info()
# 任意一种触发都行
if interest_triggered or focus_triggered:
# 进入正式思考模式
cycle_timers, thinking_id = self.start_cycle()
logger.info(f"{self.log_prefix} 开始第{self._cycle_counter}次思考")
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
chat_id=self.stream_id,
timestamp=time.time(),
limit=int(global_config.chat.max_context_size * 0.6),
)
chat_content_block, message_id_list = build_readable_messages_with_id(
messages=message_list_before_now,
timestamp_mode="normal_no_YMD",
read_mark=self.action_planner.last_obs_time_mark,
truncate=True,
show_actions=True,
)
# 第一步:动作检查
try:
await self.action_modifier.modify_actions()
available_actions = self.action_manager.get_using_actions()
except Exception as e:
logger.error(f"{self.log_prefix} 动作修改失败: {e}")
# 执行planner
is_group_chat, chat_target_info, _ = self.action_planner.get_necessary_info()
message_list_before_now = get_raw_msg_before_timestamp_with_chat(
chat_id=self.stream_id,
timestamp=time.time(),
limit=int(global_config.chat.max_context_size * 0.6),
)
chat_content_block, message_id_list = build_readable_messages_with_id(
messages=message_list_before_now,
timestamp_mode="normal_no_YMD",
read_mark=self.action_planner.last_obs_time_mark,
truncate=True,
show_actions=True,
prompt_info = await self.action_planner.build_planner_prompt(
is_group_chat=is_group_chat,
chat_target_info=chat_target_info,
current_available_actions=available_actions,
chat_content_block=chat_content_block,
message_id_list=message_id_list,
interest=global_config.personality.interest,
)
continue_flag, modified_message = await events_manager.handle_mai_events(
EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id
)
if not continue_flag:
return False
if modified_message and modified_message._modify_flags.modify_llm_prompt:
prompt_info = (modified_message.llm_prompt, prompt_info[1])
with Timer("规划器", cycle_timers):
action_to_use_info, _ = await self.action_planner.plan(
loop_start_time=self.last_read_time,
available_actions=available_actions,
)
prompt_info = await self.action_planner.build_planner_prompt(
is_group_chat=is_group_chat,
chat_target_info=chat_target_info,
# current_available_actions=planner_info[2],
chat_content_block=chat_content_block,
# actions_before_now_block=actions_before_now_block,
message_id_list=message_id_list,
# 3. 并行执行所有动作
action_tasks = [
asyncio.create_task(
self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers)
)
continue_flag, modified_message = await events_manager.handle_mai_events(
EventType.ON_PLAN, None, prompt_info[0], None, self.chat_stream.stream_id
)
if not continue_flag:
return False
if modified_message and modified_message._modify_flags.modify_llm_prompt:
prompt_info = (modified_message.llm_prompt, prompt_info[1])
with Timer("规划器", cycle_timers):
# 根据不同触发进入不同plan
if focus_triggered:
mode = ChatMode.FOCUS
for action in action_to_use_info
]
# 并行执行所有任务
results = await asyncio.gather(*action_tasks, return_exceptions=True)
# 处理执行结果
reply_loop_info = None
reply_text_from_reply = ""
action_success = False
action_reply_text = ""
action_command = ""
for i, result in enumerate(results):
if isinstance(result, BaseException):
logger.error(f"{self.log_prefix} 动作执行异常: {result}")
continue
_cur_action = action_to_use_info[i]
if result["action_type"] != "reply":
action_success = result["success"]
action_reply_text = result["reply_text"]
action_command = result.get("command", "")
elif result["action_type"] == "reply":
if result["success"]:
reply_loop_info = result["loop_info"]
reply_text_from_reply = result["reply_text"]
else:
mode = ChatMode.NORMAL
logger.warning(f"{self.log_prefix} 回复动作执行失败")
action_to_use_info, _ = await self.action_planner.plan(
mode=mode,
loop_start_time=self.last_read_time,
available_actions=available_actions,
)
# 3. 并行执行所有动作
action_tasks = [
asyncio.create_task(
self._execute_action(action, action_to_use_info, thinking_id, available_actions, cycle_timers)
)
for action in action_to_use_info
]
# 并行执行所有任务
results = await asyncio.gather(*action_tasks, return_exceptions=True)
# 处理执行结果
reply_loop_info = None
reply_text_from_reply = ""
action_success = False
action_reply_text = ""
action_command = ""
for i, result in enumerate(results):
if isinstance(result, BaseException):
logger.error(f"{self.log_prefix} 动作执行异常: {result}")
continue
_cur_action = action_to_use_info[i]
if result["action_type"] != "reply":
action_success = result["success"]
action_reply_text = result["reply_text"]
action_command = result.get("command", "")
elif result["action_type"] == "reply":
if result["success"]:
reply_loop_info = result["loop_info"]
reply_text_from_reply = result["reply_text"]
else:
logger.warning(f"{self.log_prefix} 回复动作执行失败")
# 构建最终的循环信息
if reply_loop_info:
# 如果有回复信息使用回复的loop_info作为基础
loop_info = reply_loop_info
# 更新动作执行信息
loop_info["loop_action_info"].update(
{
"action_taken": action_success,
"command": action_command,
"taken_time": time.time(),
}
)
reply_text = reply_text_from_reply
else:
# 没有回复信息构建纯动作的loop_info
loop_info = {
"loop_plan_info": {
"action_result": action_to_use_info,
},
"loop_action_info": {
"action_taken": action_success,
"reply_text": action_reply_text,
"command": action_command,
"taken_time": time.time(),
},
# 构建最终的循环信息
if reply_loop_info:
# 如果有回复信息使用回复的loop_info作为基础
loop_info = reply_loop_info
# 更新动作执行信息
loop_info["loop_action_info"].update(
{
"action_taken": action_success,
"command": action_command,
"taken_time": time.time(),
}
reply_text = action_reply_text
)
reply_text = reply_text_from_reply
else:
# 没有回复信息构建纯动作的loop_info
loop_info = {
"loop_plan_info": {
"action_result": action_to_use_info,
},
"loop_action_info": {
"action_taken": action_success,
"reply_text": action_reply_text,
"command": action_command,
"taken_time": time.time(),
},
}
reply_text = action_reply_text
self.end_cycle(loop_info, cycle_timers)
self.print_cycle_info(cycle_timers)
self.end_cycle(loop_info, cycle_timers)
self.print_cycle_info(cycle_timers)
"""S4U内容暂时保留"""
if s4u_config.enable_s4u:
await stop_typing()
await mai_thinking_manager.get_mai_think(self.stream_id).do_think_after_response(reply_text)
"""S4U内容暂时保留"""
"""S4U内容暂时保留"""
if s4u_config.enable_s4u:
await stop_typing()
await mai_thinking_manager.get_mai_think(self.stream_id).do_think_after_response(reply_text)
"""S4U内容暂时保留"""
return True
@@ -579,7 +527,7 @@ class HeartFChatting:
):
"""执行单个动作的通用函数"""
try:
if action_planner_info.action_type == "no_action":
if action_planner_info.action_type == "no_reply":
# 直接处理no_action逻辑不再通过动作系统
reason = action_planner_info.reasoning or "选择不回复"
logger.info(f"{self.log_prefix} 选择不回复,原因: {reason}")
@@ -594,26 +542,19 @@ class HeartFChatting:
action_data={"reason": reason},
action_name="no_action",
)
return {"action_type": "no_action", "success": True, "reply_text": "", "command": ""}
elif action_planner_info.action_type != "reply":
# 执行普通动作
with Timer("动作执行", cycle_timers):
success, reply_text, command = await self._handle_action(
action_planner_info.action_type,
action_planner_info.reasoning or "",
action_planner_info.action_data or {},
cycle_timers,
thinking_id,
action_planner_info.action_message,
)
return {
"action_type": action_planner_info.action_type,
"success": success,
"reply_text": reply_text,
"command": command,
}
else:
elif action_planner_info.action_type == "wait_time":
logger.info(f"{self.log_prefix} 等待{action_planner_info.action_data['time']}秒后回复")
await asyncio.sleep(action_planner_info.action_data["time"])
return {"action_type": "wait_time", "success": True, "reply_text": "", "command": ""}
elif action_planner_info.action_type == "no_reply_until_call":
logger.info(f"{self.log_prefix} 保持沉默,直到有人直接叫的名字")
self.no_reply_until_call = True
return {"action_type": "no_reply_until_call", "success": True, "reply_text": "", "command": ""}
elif action_planner_info.action_type == "reply":
try:
success, llm_response = await generator_api.generate_reply(
chat_stream=self.chat_stream,
@@ -652,6 +593,26 @@ class HeartFChatting:
"reply_text": reply_text,
"loop_info": loop_info,
}
# 其他动作
else:
# 执行普通动作
with Timer("动作执行", cycle_timers):
success, reply_text, command = await self._handle_action(
action_planner_info.action_type,
action_planner_info.reasoning or "",
action_planner_info.action_data or {},
cycle_timers,
thinking_id,
action_planner_info.action_message,
)
return {
"action_type": action_planner_info.action_type,
"success": success,
"reply_text": reply_text,
"command": command,
}
except Exception as e:
logger.error(f"{self.log_prefix} 执行动作时出错: {e}")
logger.error(f"{self.log_prefix} 错误信息: {traceback.format_exc()}")

File diff suppressed because it is too large Load Diff