应要求提交上未完成的HFC, expression部分

This commit is contained in:
UnCLAS-Prommer
2026-02-28 21:14:46 +08:00
parent c58da95353
commit a39ccedb9a
6 changed files with 423 additions and 1080 deletions

View File

@@ -0,0 +1,60 @@
from typing import TYPE_CHECKING, Optional
import time
from src.common.logger import get_logger
from src.common.database.database import get_db_session
from src.llm_models.utils_model import LLMRequest
from src.config.config import model_config
if TYPE_CHECKING:
from src.common.data_models.expression_data_model import MaiExpression
# TODO: replace this module-level LLMRequest instance with a more elegant mechanism.
# Built from the `tool_use` task model set; its requests are tagged "reflect.tracker".
judge_model = LLMRequest(model_set=model_config.model_task_config.tool_use, request_type="reflect.tracker")
# Module logger, registered under the name "reflect_tracker".
logger = get_logger("reflect_tracker")
class ReflectTracker:
    """Track a single expression under reflection for one session.

    At most one expression is tracked at a time. Tracking starts with
    `register_expression_and_track` and ends when `trigger_tracker`
    decides the run is over (currently: only when the time limit expires).
    """

    def __init__(self, session_id: str):
        """Create an idle tracker bound to `session_id`."""
        self.session_id = session_id
        self.last_check_msg_count = 0
        # Message-count limit; no code in this class consults it yet.
        self.max_msg_count = 30
        self.max_duration = 15 * 60  # 15 minutes, in seconds
        # Expression currently being tracked; set from outside through
        # register_expression_and_track().
        self.expression: Optional["MaiExpression"] = None
        # Runtime state
        self.tracking = False
        self.tracking_start_time: float = 0.0

    def register_expression_and_track(self, expression: "MaiExpression"):
        """Register `expression` and start tracking it.

        Raises:
            RuntimeError: if another expression is already being tracked.
        """
        if self.tracking:
            raise RuntimeError("ReflectTracker is already tracking an expression.")
        self.expression = expression
        self.tracking = True
        self.tracking_start_time = time.time()

    def _reset_tracker(self):
        """Return the tracker to its idle state."""
        self.expression = None
        self.tracking = False
        self.last_check_msg_count = 0
        # Also drop the start timestamp so no stale value outlives the run.
        self.tracking_start_time = 0.0

    async def trigger_tracker(self) -> bool:
        """Run one tracking check.

        Returns:
            bool: True when tracking has finished (the tracker has been reset
            and `tracking` is False again); False when nothing is being
            tracked or tracking should continue.
        """
        # Nothing is being tracked: there is nothing to finish.
        if not self.tracking:
            return False
        # Time limit check. Only elapsed time is checked here; the
        # message-count limit is not implemented yet.
        if time.time() - self.tracking_start_time > self.max_duration:
            self._reset_tracker()
            return True
        # TODO: implement the actual tracking check logic.
        # Until then, report "keep tracking" explicitly instead of falling
        # through and returning None from a function annotated `-> bool`.
        return False

View File

@@ -1,22 +1,35 @@
from rich.traceback import install
from sqlmodel import select
from typing import TYPE_CHECKING
import random
import time
from typing import Optional, Dict
from src.common.logger import get_logger
from src.common.database.database_model import Expression
from src.config.config import global_config
from src.chat.message_receive.chat_stream import get_chat_manager
from src.plugin_system.apis import send_api
from src.common.database.database_model import Expression
from src.common.database.database import get_db_session
from src.common.data_models.expression_data_model import MaiExpression
from src.common.utils.utils_session import SessionUtils
from .expression_reflect_tracker import ReflectTracker
if TYPE_CHECKING:
from src.config.official_configs import TargetItem
logger = get_logger("expression_reflector")
install(extra_lines=3)
LOG_PREFIX = "[Expression Reflector]"
class ExpressionReflector:
"""表达反思器,管理单个聊天流的表达反思提问"""
"""表达反思器,管理单个聊天流的表达反思提问使用每个session_id独立的实例"""
def __init__(self, chat_id: str):
self.chat_id = chat_id
self.last_ask_time: float = 0.0
def __init__(self, session_id: str):
self.session_id = session_id
self.last_ask_time: float = time.time()
self.reflect_tracker: ReflectTracker = ReflectTracker(session_id)
async def check_and_ask(self) -> bool:
"""
@@ -25,226 +38,99 @@ class ExpressionReflector:
Returns:
bool: 是否执行了提问
"""
try:
logger.debug(f"[Expression Reflection] 开始检查是否需要提问 (stream_id: {self.chat_id})")
if not global_config.expression.expression_manual_reflect:
logger.debug("[Expression Reflection] 表达反思功能未启用,跳过")
return False
operator_config = global_config.expression.manual_reflect_operator_id
if not operator_config:
logger.debug("[Expression Reflection] Operator ID 未配置,跳过")
return False
# 检查是否在允许列表中
allow_reflect = global_config.expression.allow_reflect
if allow_reflect:
# 将 allow_reflect 中的 platform:id:type 格式转换为 chat_id 列表
allow_reflect_chat_ids = []
for stream_config in allow_reflect:
parsed_chat_id = global_config.expression._parse_stream_config_to_chat_id(stream_config)
if parsed_chat_id:
allow_reflect_chat_ids.append(parsed_chat_id)
else:
logger.warning(f"[Expression Reflection] 无法解析 allow_reflect 配置项: {stream_config}")
if self.chat_id not in allow_reflect_chat_ids:
logger.info(f"[Expression Reflection] 当前聊天流 {self.chat_id} 不在允许列表中,跳过")
return False
# 检查上一次提问时间
current_time = time.time()
time_since_last_ask = current_time - self.last_ask_time
# 10-15分钟间隔随机选择
min_interval = 10 * 60 # 10分钟
max_interval = 15 * 60 # 15分钟
interval = random.uniform(min_interval, max_interval)
logger.info(
f"[Expression Reflection] 上次提问时间: {self.last_ask_time:.2f}, 当前时间: {current_time:.2f}, 已过时间: {time_since_last_ask:.2f}秒 ({time_since_last_ask / 60:.2f}分钟), 需要间隔: {interval:.2f}秒 ({interval / 60:.2f}分钟)"
)
if time_since_last_ask < interval:
remaining_time = interval - time_since_last_ask
logger.info(
f"[Expression Reflection] 距离上次提问时间不足,还需等待 {remaining_time:.2f}秒 ({remaining_time / 60:.2f}分钟),跳过"
)
return False
# 检查是否已经有针对该 Operator 的 Tracker 在运行
logger.info(f"[Expression Reflection] 检查 Operator {operator_config} 是否已有活跃的 Tracker")
if await _check_tracker_exists(operator_config):
logger.info(f"[Expression Reflection] Operator {operator_config} 已有活跃的 Tracker跳过本次提问")
return False
# 获取未检查的表达
try:
logger.info("[Expression Reflection] 查询未检查且未拒绝的表达")
expressions = Expression.select().where((~Expression.checked) & (~Expression.rejected)).limit(50)
expr_list = list(expressions)
logger.info(f"[Expression Reflection] 找到 {len(expr_list)} 个候选表达")
if not expr_list:
logger.info("[Expression Reflection] 没有可用的表达,跳过")
return False
target_expr: Expression = random.choice(expr_list)
logger.info(
f"[Expression Reflection] 随机选择了表达 ID: {target_expr.id}, Situation: {target_expr.situation}, Style: {target_expr.style}"
)
# 生成询问文本
ask_text = _generate_ask_text(target_expr)
if not ask_text:
logger.warning("[Expression Reflection] 生成询问文本失败,跳过")
return False
logger.info(f"[Expression Reflection] 准备向 Operator {operator_config} 发送提问")
# 发送给 Operator
await _send_to_operator(operator_config, ask_text, target_expr)
# 更新上一次提问时间
self.last_ask_time = current_time
logger.info(f"[Expression Reflection] 提问成功,已更新上次提问时间为 {current_time:.2f}")
return True
except Exception as e:
logger.error(f"[Expression Reflection] 检查或提问过程中出错: {e}")
import traceback
logger.error(traceback.format_exc())
return False
except Exception as e:
logger.error(f"[Expression Reflection] 检查或提问过程中出错: {e}")
import traceback
logger.error(traceback.format_exc())
if not await self.check_need_ask():
return False
class ExpressionReflectorManager:
    """Registry that hands out one ExpressionReflector per chat stream."""

    def __init__(self):
        # chat_id -> reflector; entries are created lazily on first request.
        self.reflectors: Dict[str, ExpressionReflector] = {}

    def get_or_create_reflector(self, chat_id: str) -> ExpressionReflector:
        """Return the reflector for `chat_id`, creating and caching it on first use."""
        try:
            return self.reflectors[chat_id]
        except KeyError:
            reflector = ExpressionReflector(chat_id)
            self.reflectors[chat_id] = reflector
            return reflector
# 创建全局实例
expression_reflector_manager = ExpressionReflectorManager()
async def _check_tracker_exists(operator_config: str) -> bool:
"""检查指定 Operator 是否已有活跃的 Tracker"""
from src.bw_learner.reflect_tracker import reflect_tracker_manager
chat_manager = get_chat_manager()
chat_stream = None
# 尝试解析配置字符串 "platform:id:type"
parts = operator_config.split(":")
if len(parts) == 3:
platform = parts[0]
id_str = parts[1]
stream_type = parts[2]
user_info = None
group_info = None
from maim_message import UserInfo, GroupInfo
if stream_type == "group":
group_info = GroupInfo(group_id=id_str, platform=platform)
user_info = UserInfo(user_id="system", user_nickname="System", platform=platform)
elif stream_type == "private":
user_info = UserInfo(user_id=id_str, platform=platform, user_nickname="Operator")
else:
operator_config = global_config.expression.manual_reflect_operator_id
if not operator_config:
logger.debug(f"{LOG_PREFIX} Operator ID 未配置,跳过")
return False
if user_info:
try:
chat_stream = await chat_manager.get_or_create_stream(platform, user_info, group_info)
except Exception as e:
logger.error(f"Failed to get or create chat stream for checking tracker: {e}")
return False
else:
chat_stream = chat_manager.get_stream(operator_config)
if not chat_stream:
if await self.ask_reflection(operator_config):
self.last_ask_time = time.time()
return True
return False
return reflect_tracker_manager.get_tracker(chat_stream.stream_id) is not None
async def check_need_ask(self) -> bool:
    """Decide whether a reflection question should be asked right now.

    The checks run in this order: feature switch, operator configured,
    session allowed (only when an allow list is configured), enough time
    since the last question, and no tracker currently active.

    Returns:
        bool: True when every check passes and a question should be asked;
        False as soon as any check fails.
    """
    expression_config = global_config.expression

    # Feature switch.
    if not expression_config.expression_manual_reflect:
        logger.debug(f"{LOG_PREFIX} 表达反思功能未启用,跳过")
        return False

    logger.debug(f"{LOG_PREFIX} 开始检查是否需要提问 (session_id: {self.session_id})")

    # An operator must be configured.
    operator_config = expression_config.manual_reflect_operator_id
    if not operator_config:
        logger.debug(f"{LOG_PREFIX} Operator ID 未配置,跳过")
        return False

    # An empty allow list means "no restriction"; otherwise this session must be listed.
    allowed_targets = expression_config.allow_reflect
    if allowed_targets:
        # Convert every configured item to its session_id before comparing.
        allowed_session_ids = list(map(self._parse_config_item_2_session_id, allowed_targets))
        if self.session_id not in allowed_session_ids:
            logger.info(f"{LOG_PREFIX} 当前聊天流 {self.session_id} 不在允许列表中,跳过")
            return False

    # Rate limit: the required gap is drawn afresh on each call, between 10 and 15 minutes.
    time_since_last_ask = time.time() - self.last_ask_time
    ask_interval = random.uniform(10 * 60, 15 * 60)
    if time_since_last_ask < ask_interval:
        logger.info(
            f"{LOG_PREFIX} 距离上次提问时间 {time_since_last_ask:.2f} 秒,未达到随机间隔 {ask_interval:.2f} 秒,跳过"
        )
        return False

    # Do not ask again while an earlier question is still being tracked.
    if self.reflect_tracker.tracking:
        logger.info(f"{LOG_PREFIX} Operator {operator_config} 已有活跃的 Tracker跳过本次提问")
        return False

    return True
async def ask_reflection(self, operator_config: "TargetItem") -> bool:
"""执行提问表达反思的操作"""
# 选取未检查过的表达
logger.info(f"{LOG_PREFIX} 查询未检查且未拒绝的表达")
try:
with get_db_session() as session:
statement = select(Expression).filter_by(checked=False, rejected=False).limit(50)
results = session.exec(statement).all()
if not results:
logger.info(f"{LOG_PREFIX} 未找到未检查且未拒绝的表达")
return False
logger.info(f"{LOG_PREFIX} 找到 {len(results)} 个未检查且未拒绝的表达")
except Exception as selected_expression:
logger.error(f"{LOG_PREFIX} 查询表达时发生错误: {selected_expression}")
return False
# 随机选取一个表达进行提问
selected_expression = MaiExpression.from_db_instance(random.choice(results))
item_id = selected_expression.item_id
situation = selected_expression.situation
style = selected_expression.style
logger.info(f"{LOG_PREFIX} 随机选择了表达 ID: {item_id}, Situation: {situation}, Style: {style}")
def _generate_ask_text(expr: Expression) -> Optional[str]:
try:
ask_text = (
f"我正在学习新的表达方式,请帮我看看这个是否合适?\n\n"
f"**学习到的表达信息**\n"
f"- 情景 (Situation): {expr.situation}\n"
f"- 风格 (Style): {expr.style}\n"
f"- 情景 (Situation): {situation}\n"
f"- 风格 (Style): {style}\n"
)
return ask_text
except Exception as e:
logger.error(f"Failed to generate ask text: {e}")
return None
# TODO: 在发送相关API重构完成后完成发送给operator的逻辑
async def _send_to_operator(operator_config: str, text: str, expr: Expression):
chat_manager = get_chat_manager()
chat_stream = None
self.reflect_tracker.register_expression_and_track(selected_expression)
return True
# 尝试解析配置字符串 "platform:id:type"
parts = operator_config.split(":")
if len(parts) == 3:
platform = parts[0]
id_str = parts[1]
stream_type = parts[2]
user_info = None
group_info = None
from maim_message import UserInfo, GroupInfo
if stream_type == "group":
group_info = GroupInfo(group_id=id_str, platform=platform)
user_info = UserInfo(user_id="system", user_nickname="System", platform=platform)
elif stream_type == "private":
user_info = UserInfo(user_id=id_str, platform=platform, user_nickname="Operator")
def _parse_config_item_2_session_id(self, config_item: "TargetItem") -> str:
if config_item.rule_type == "group":
return SessionUtils.calculate_session_id(config_item.platform, group_id=str(config_item.item_id))
else:
logger.warning(f"Unknown stream type in operator config: {stream_type}")
return
if user_info:
try:
chat_stream = await chat_manager.get_or_create_stream(platform, user_info, group_info)
except Exception as e:
logger.error(f"Failed to get or create chat stream for operator {operator_config}: {e}")
return
else:
chat_stream = chat_manager.get_stream(operator_config)
if not chat_stream:
logger.warning(f"Could not find or create chat stream for operator: {operator_config}")
return
stream_id = chat_stream.stream_id
# 注册 Tracker
from src.bw_learner.reflect_tracker import ReflectTracker, reflect_tracker_manager
tracker = ReflectTracker(chat_stream=chat_stream, expression=expr, created_time=time.time())
reflect_tracker_manager.add_tracker(stream_id, tracker)
# 发送消息
await send_api.text_to_stream(text=text, stream_id=stream_id, typing=True)
logger.info(f"Sent expression reflect query to operator {operator_config} for expr {expr.id}")
return SessionUtils.calculate_session_id(config_item.platform, user_id=str(config_item.item_id))