diff --git a/prompts/zh-CN/planner.prompt b/prompts/zh-CN/planner.prompt deleted file mode 100644 index d6bf0de7..00000000 --- a/prompts/zh-CN/planner.prompt +++ /dev/null @@ -1,44 +0,0 @@ -{time_block} -{name_block} -{chat_context_description},以下是具体的聊天内容 -**聊天内容** -{chat_content_block} - -**可选的action** -reply -动作描述: -1.你可以选择呼叫了你的名字,但是你没有做出回应的消息进行回复 -2.你可以自然的顺着正在进行的聊天内容进行回复或自然的提出一个问题 -3.最好一次对一个话题进行回复,免得啰嗦或者回复内容太乱。 -4.不要选择回复你自己发送的消息 -5.不要单独对表情包进行回复 -6.将上下文中所有含义不明的,疑似黑话的,缩写词均写入unknown_words中 -{reply_action_example} - -no_reply -动作描述: -保持沉默,不回复直到有新消息 -控制聊天频率,不要太过频繁的发言 -{{"action":"no_reply"}} - -{action_options_text} - -**你之前的action执行和思考记录** -{actions_before_now_block} - -请选择**可选的**且符合使用条件的action,并说明触发action的消息id(消息id格式:m+数字) -先输出你的简短的选择思考理由,再输出你选择的action,理由不要分点,精简。 -**动作选择要求** -请你根据聊天内容,用户的最新消息和以下标准选择合适的动作: -{plan_style} -{moderation_prompt} - -target_message_id为必填,表示触发消息的id -请选择所有符合使用要求的action,每个动作最多选择一次,但是可以选择多个动作; -动作用json格式输出,用```json包裹,如果输出多个json,每个json都要单独一行放在同一个```json代码块内: -**示例** -// 理由文本(简短) -```json -{{"action":"动作名", "target_message_id":"m123", .....}} -{{"action":"动作名", "target_message_id":"m456", .....}} -``` \ No newline at end of file diff --git a/src/chat/brain_chat/PFC/pfc_KnowledgeFetcher.py b/src/chat/brain_chat/PFC/pfc_KnowledgeFetcher.py deleted file mode 100644 index fe875540..00000000 --- a/src/chat/brain_chat/PFC/pfc_KnowledgeFetcher.py +++ /dev/null @@ -1,133 +0,0 @@ -from typing import Any, Dict, List, Tuple - -from src.chat.message_receive.chat_manager import chat_manager as _chat_manager -from src.common.logger import get_logger - -from src.person_info.person_info import resolve_person_id_for_memory -from src.services.memory_service import memory_service - -logger = get_logger("knowledge_fetcher") - - -class KnowledgeFetcher: - """知识调取器""" - - def __init__(self, private_name: str, stream_id: str): - self.private_name = private_name - self.stream_id = stream_id - - def _resolve_private_memory_context(self) -> Dict[str, str]: - session = _chat_manager.get_session_by_session_id(self.stream_id) - if session is None: - return {"chat_id": self.stream_id} - - group_id = str(getattr(session, "group_id", "") or "").strip() - user_id = str(getattr(session, "user_id", "") or "").strip() - platform = str(getattr(session, "platform", "") or "").strip() - - person_id = "" - if not group_id: - try: - person_id = resolve_person_id_for_memory( - person_name=self.private_name, - platform=platform, - user_id=user_id, - ) - except Exception as exc: - logger.debug(f"[私聊][{self.private_name}]解析人物ID失败: {exc}") - - return { - "chat_id": self.stream_id, - "person_id": person_id, - "user_id": user_id, - "group_id": group_id, - } - - async def _memory_get_knowledge(self, query: str) -> str: - """获取相关知识 - - Args: - query: 查询内容 - - Returns: - str: 构造好的,带相关度的知识 - """ - - logger.debug(f"[私聊][{self.private_name}]正在从长期记忆中获取知识") - try: - context = self._resolve_private_memory_context() - search_kwargs = { - "limit": 5, - "mode": "search", - "chat_id": context.get("chat_id", ""), - "person_id": context.get("person_id", ""), - "user_id": context.get("user_id", ""), - "group_id": context.get("group_id", ""), - "respect_filter": True, - } - result = await memory_service.search(query, **search_kwargs) - if not result.success: - logger.warning( - f"[私聊][{self.private_name}]长期记忆查询失败: {result.error or '未知错误'}" - ) - return f"长期记忆检索失败:{result.error or '未知错误'}" - if not result.filtered and not result.hits and search_kwargs["person_id"]: - fallback_kwargs = dict(search_kwargs) - fallback_kwargs["person_id"] = "" - logger.debug(f"[私聊][{self.private_name}]人物过滤未命中,退回仅按会话检索长期记忆") - result = await memory_service.search(query, **fallback_kwargs) - if not result.success: - logger.warning( - f"[私聊][{self.private_name}]长期记忆回退查询失败: {result.error or '未知错误'}" - ) - return f"长期记忆检索失败:{result.error or '未知错误'}" - knowledge_info = result.to_text(limit=5) - if result.filtered: - logger.debug(f"[私聊][{self.private_name}]长期记忆查询被聊天过滤策略跳过") - else: - logger.debug(f"[私聊][{self.private_name}]长期记忆查询结果: {knowledge_info[:150]}") - return knowledge_info or "未找到匹配的知识" - except Exception as e: - logger.error(f"[私聊][{self.private_name}]长期记忆搜索工具执行失败: {str(e)}") - return "未找到匹配的知识" - - async def fetch(self, query: str, chat_history: List[Dict[str, Any]]) -> Tuple[str, str]: - """获取相关知识 - - Args: - query: 查询内容 - chat_history: 聊天历史 (PFC dict format) - - Returns: - Tuple[str, str]: (获取的知识, 知识来源) - """ - _ = chat_history - - # NOTE: Hippocampus memory system was redesigned in v0.12.2 - # The old get_memory_from_text API no longer exists - # For now, we'll skip the memory retrieval part and only use LPMM knowledge - # TODO: Integrate with new memory system if needed - knowledge_text = "" - sources_text = "无记忆匹配" # 默认值 - - # # 从记忆中获取相关知识 (DISABLED - old Hippocampus API) - # related_memory = await HippocampusManager.get_instance().get_memory_from_text( - # text=f"{query}\n{chat_history_text}", - # max_memory_num=3, - # max_memory_length=2, - # max_depth=3, - # fast_retrieval=False, - # ) - # if related_memory: - # sources = [] - # for memory in related_memory: - # knowledge_text += memory[1] + "\n" - # sources.append(f"记忆片段{memory[0]}") - # knowledge_text = knowledge_text.strip() - # sources_text = ",".join(sources) - - knowledge_text += "\n现在有以下**知识**可供参考:\n " - knowledge_text += await self._memory_get_knowledge(query) - knowledge_text += "\n请记住这些**知识**,并根据**知识**回答问题。\n" - - return knowledge_text or "未找到相关知识", sources_text or "无记忆匹配" diff --git a/src/chat/replyer/maisaka_expression_selector.py b/src/chat/replyer/maisaka_expression_selector.py new file mode 100644 index 00000000..aa460350 --- /dev/null +++ b/src/chat/replyer/maisaka_expression_selector.py @@ -0,0 +1,280 @@ +from dataclasses import dataclass, field +from datetime import datetime +import json +from typing import Any, Awaitable, Callable, List, Optional + +from json_repair import repair_json +from sqlmodel import select + +from src.chat.message_receive.message import SessionMessage +from src.common.database.database import get_db_session +from src.common.database.database_model import Expression +from src.common.logger import get_logger +from src.common.utils.utils_config import ExpressionConfigUtils +from src.common.utils.utils_session import SessionUtils +from src.config.config import global_config +from src.learners.learner_utils_old import weighted_sample +from src.maisaka.context_messages import LLMContextMessage + +logger = get_logger("maisaka_expression_selector") + +SubAgentRunner = Callable[[str], Awaitable[str]] + + +@dataclass +class MaisakaExpressionSelectionResult: + """Maisaka replyer 的表达方式选择结果。""" + + expression_habits: str = "" + selected_expression_ids: List[int] = field(default_factory=list) + + +class MaisakaExpressionSelector: + """负责在 replyer 侧完成表达方式筛选与子代理选择。""" + + def _can_use_expressions(self, session_id: str) -> bool: + try: + use_expression, _, _ = ExpressionConfigUtils.get_expression_config_for_chat(session_id) + return use_expression + except Exception as exc: + logger.error(f"检查表达方式使用开关失败: {exc}") + return False + + def _get_related_session_ids(self, session_id: str) -> List[str]: + related_session_ids = {session_id} + expression_groups = global_config.expression.expression_groups + + for expression_group in expression_groups: + target_items = expression_group.expression_groups + group_session_ids: set[str] = set() + contains_current_session = False + + for target_item in target_items: + platform = target_item.platform.strip() + item_id = target_item.item_id.strip() + if not platform or not item_id: + continue + + rule_type = target_item.rule_type + target_session_id = SessionUtils.calculate_session_id( + platform, + group_id=item_id if rule_type == "group" else None, + user_id=None if rule_type == "group" else item_id, + ) + group_session_ids.add(target_session_id) + if target_session_id == session_id: + contains_current_session = True + + if contains_current_session: + related_session_ids.update(group_session_ids) + + return list(related_session_ids) + + def _load_expression_candidates(self, session_id: str) -> List[dict[str, Any]]: + related_session_ids = self._get_related_session_ids(session_id) + + with get_db_session(auto_commit=False) as session: + base_query = select(Expression).where(Expression.rejected.is_(False)) # type: ignore[attr-defined] + scoped_query = base_query.where( + (Expression.session_id.in_(related_session_ids)) | (Expression.session_id.is_(None)) # type: ignore[attr-defined] + ) + if global_config.expression.expression_checked_only: + scoped_query = scoped_query.where(Expression.checked.is_(True)) # type: ignore[attr-defined] + expressions = session.exec(scoped_query).all() + + all_candidates = [ + { + "id": expression.id, + "situation": expression.situation, + "style": expression.style, + "count": expression.count if getattr(expression, "count", None) is not None else 1, + } + for expression in expressions + if expression.id is not None and expression.situation and expression.style + ] + if len(all_candidates) < 10: + return [] + + high_count_candidates = [item for item in all_candidates if (item.get("count", 1) or 1) > 1] + selected_high = ( + weighted_sample(high_count_candidates, min(len(high_count_candidates), 5)) + if len(high_count_candidates) >= 10 + else [] + ) + selected_random = weighted_sample(all_candidates, min(len(all_candidates), 5)) + + candidate_pool: List[dict[str, Any]] = [] + seen_ids: set[int] = set() + for candidate in [*selected_high, *selected_random]: + candidate_id = candidate.get("id") + if not isinstance(candidate_id, int) or candidate_id in seen_ids: + continue + seen_ids.add(candidate_id) + candidate_pool.append(candidate) + + return candidate_pool + + @staticmethod + def _format_candidate_preview(candidates: List[dict[str, Any]]) -> str: + """构建候选表达方式的简短日志预览。""" + preview_items: List[str] = [] + for candidate in candidates[:5]: + candidate_id = candidate.get("id") + situation = str(candidate.get("situation") or "").strip() + style = str(candidate.get("style") or "").strip() + count = candidate.get("count") + preview_items.append( + f"id={candidate_id}, situation={situation!r}, style={style!r}, count={count}" + ) + return "; ".join(preview_items) + + @staticmethod + def _build_expression_habits_block(selected_expressions: List[dict[str, Any]]) -> str: + if not selected_expressions: + return "" + lines = [ + f"- 当{expression['situation']}时,可以自然地用{expression['style']}这种表达习惯。" + for expression in selected_expressions + ] + return "【表达习惯参考】\n" + "\n".join(lines) + + @staticmethod + def _normalize_history_line(message: LLMContextMessage) -> str: + content = " ".join((message.processed_plain_text or "").split()).strip() + if len(content) > 120: + content = content[:120] + "..." + timestamp = message.timestamp.strftime("%H:%M:%S") if isinstance(message.timestamp, datetime) else "" + return f"- {timestamp} {message.role}: {content}".strip() + + def _build_selector_prompt( + self, + *, + chat_history: List[LLMContextMessage], + reply_message: Optional[SessionMessage], + reply_reason: str, + candidates: List[dict[str, Any]], + ) -> str: + history_lines = [ + self._normalize_history_line(message) + for message in chat_history[-10:] + if (message.processed_plain_text or "").strip() + ] + history_block = "\n".join(history_lines) if history_lines else "- 无可用上下文" + candidate_lines = [ + f"{candidate['id']}: 情景={candidate['situation']} | 风格={candidate['style']} | count={candidate['count']}" + for candidate in candidates + ] + target_text = (reply_message.processed_plain_text or "").strip() if reply_message is not None else "" + + return ( + "你是 Maisaka 的表达方式选择子代理。\n" + "你只负责根据最近聊天上下文,为这一次可见回复挑选最合适的表达方式。\n" + "请只从下面候选中选择 0 到 3 条最适合当前语境的表达方式。\n" + "优先考虑自然、贴合上下文、不生硬、不模板化。\n" + "如果没有明显合适的,就返回空列表。\n" + '严格只输出 JSON,对象格式为 {"selected_ids":[123,456]}。\n\n' + f"最近上下文:\n{history_block}\n\n" + f"目标消息:{target_text or '无'}\n" + f"回复理由:{reply_reason.strip() or '无'}\n\n" + f"候选表达方式:\n{chr(10).join(candidate_lines)}" + ) + + def _parse_selected_ids(self, raw_response: str, candidates: List[dict[str, Any]]) -> List[int]: + if not raw_response.strip(): + return [] + try: + parsed_result = json.loads(repair_json(raw_response)) + except Exception: + logger.warning(f"表达方式选择结果解析失败: {raw_response!r}") + return [] + + raw_selected_ids = parsed_result.get("selected_ids", []) if isinstance(parsed_result, dict) else [] + if not isinstance(raw_selected_ids, list): + return [] + + candidate_map = { + candidate["id"]: candidate + for candidate in candidates + if isinstance(candidate.get("id"), int) + } + selected_ids: List[int] = [] + for candidate_id in raw_selected_ids: + if not isinstance(candidate_id, int): + continue + if candidate_id not in candidate_map or candidate_id in selected_ids: + continue + selected_ids.append(candidate_id) + if len(selected_ids) >= 3: + break + return selected_ids + + def _update_last_active_time(self, selected_ids: List[int]) -> None: + if not selected_ids: + return + with get_db_session() as session: + expressions = session.exec(select(Expression).where(Expression.id.in_(selected_ids))).all() # type: ignore[attr-defined] + now = datetime.now() + for expression in expressions: + expression.last_active_time = now + session.add(expression) + + async def select_for_reply( + self, + *, + session_id: str, + chat_history: List[LLMContextMessage], + reply_message: Optional[SessionMessage], + reply_reason: str, + sub_agent_runner: Optional[SubAgentRunner], + ) -> MaisakaExpressionSelectionResult: + if not session_id: + logger.info("表达方式选择已跳过:缺少 session_id") + return MaisakaExpressionSelectionResult() + if not self._can_use_expressions(session_id): + logger.info(f"表达方式选择已跳过:当前会话未启用表达方式,session_id={session_id}") + return MaisakaExpressionSelectionResult() + if sub_agent_runner is None: + logger.info(f"表达方式选择已跳过:缺少 sub_agent_runner,session_id={session_id}") + return MaisakaExpressionSelectionResult() + + candidates = self._load_expression_candidates(session_id) + if not candidates: + logger.info(f"表达方式选择已跳过:本地候选不足,session_id={session_id}") + return MaisakaExpressionSelectionResult() + + logger.info( + f"表达方式选择开始:session_id={session_id} 候选数={len(candidates)} " + f"候选预览={self._format_candidate_preview(candidates)}" + ) + + selector_prompt = self._build_selector_prompt( + chat_history=chat_history, + reply_message=reply_message, + reply_reason=reply_reason, + candidates=candidates, + ) + try: + raw_response = await sub_agent_runner(selector_prompt) + except Exception: + logger.exception("表达方式选择子代理执行失败") + return MaisakaExpressionSelectionResult() + + logger.info(f"表达方式子代理原始结果:session_id={session_id} response={raw_response!r}") + selected_ids = self._parse_selected_ids(raw_response, candidates) + if not selected_ids: + logger.info(f"表达方式选择完成但未命中:session_id={session_id}") + return MaisakaExpressionSelectionResult() + + selected_expressions = [candidate for candidate in candidates if candidate.get("id") in selected_ids] + self._update_last_active_time(selected_ids) + logger.info( + f"表达方式选择完成:session_id={session_id} 已选数={len(selected_ids)} " + f"selected_ids={selected_ids!r} 已选预览={self._format_candidate_preview(selected_expressions)}" + ) + return MaisakaExpressionSelectionResult( + expression_habits=self._build_expression_habits_block(selected_expressions), + selected_expression_ids=selected_ids, + ) + + +maisaka_expression_selector = MaisakaExpressionSelector() diff --git a/src/chat/replyer/maisaka_generator.py b/src/chat/replyer/maisaka_generator.py index 05df760a..41154ab0 100644 --- a/src/chat/replyer/maisaka_generator.py +++ b/src/chat/replyer/maisaka_generator.py @@ -1,15 +1,12 @@ from dataclasses import dataclass, field from datetime import datetime -from typing import Dict, List, Optional, Tuple +from typing import Awaitable, Callable, Dict, List, Optional, Tuple import random import time -from sqlmodel import select - from src.chat.message_receive.chat_manager import BotChatSession -from src.common.database.database import get_db_session -from src.common.database.database_model import Expression +from src.chat.message_receive.message import SessionMessage from src.common.data_models.reply_generation_data_models import ( GenerationMetrics, LLMCompletionResult, @@ -17,13 +14,12 @@ from src.common.data_models.reply_generation_data_models import ( ) from src.common.logger import get_logger from src.common.prompt_i18n import load_prompt -from src.common.utils.utils_session import SessionUtils from src.config.config import global_config from src.core.types import ActionInfo from src.services.llm_service import LLMServiceClient -from src.chat.message_receive.message import SessionMessage from src.maisaka.context_messages import AssistantMessage, LLMContextMessage, ReferenceMessage, SessionBackedMessage, ToolResultMessage +from .maisaka_expression_selector import maisaka_expression_selector from src.maisaka.message_adapter import parse_speaker_content logger = get_logger("replyer") @@ -37,15 +33,6 @@ class MaisakaReplyContext: selected_expression_ids: List[int] = field(default_factory=list) -@dataclass -class _ExpressionRecord: - """表达方式的轻量记录。""" - - expression_id: Optional[int] - situation: str - style: str - - class MaisakaReplyGenerator: """生成 Maisaka 的最终可见回复。""" @@ -238,109 +225,30 @@ class MaisakaReplyGenerator: reply_message: Optional[SessionMessage], reply_reason: str, stream_id: Optional[str], + sub_agent_runner: Optional[Callable[[str], Awaitable[str]]], ) -> MaisakaReplyContext: - """在 replyer 内部构建表达习惯和黑话解释。""" + """构建回复上下文:表达习惯和已选表达 ID。""" session_id = self._resolve_session_id(stream_id) if not session_id: logger.warning("构建 Maisaka 回复上下文失败:缺少会话标识") return MaisakaReplyContext() - expression_habits, selected_expression_ids = self._build_expression_habits( + if sub_agent_runner is None: + logger.info("表达方式选择跳过:缺少子代理执行器") + return MaisakaReplyContext() + + selection_result = await maisaka_expression_selector.select_for_reply( session_id=session_id, chat_history=chat_history, reply_message=reply_message, reply_reason=reply_reason, + sub_agent_runner=sub_agent_runner, ) return MaisakaReplyContext( - expression_habits=expression_habits, - selected_expression_ids=selected_expression_ids, + expression_habits=selection_result.expression_habits, + selected_expression_ids=selection_result.selected_expression_ids, ) - def _build_expression_habits( - self, - session_id: str, - chat_history: List[LLMContextMessage], - reply_message: Optional[SessionMessage], - reply_reason: str, - ) -> tuple[str, List[int]]: - """查询并格式化适合当前会话的表达习惯。""" - del chat_history - del reply_message - del reply_reason - - expression_records = self._load_expression_records(session_id) - if not expression_records: - return "", [] - - lines: List[str] = [] - selected_ids: List[int] = [] - for expression in expression_records: - if expression.expression_id is not None: - selected_ids.append(expression.expression_id) - lines.append(f"- 当{expression.situation}时,可以自然地用{expression.style}这种表达习惯。") - - block = "【表达习惯参考】\n" + "\n".join(lines) - logger.info( - f"已构建 Maisaka 表达习惯: 会话标识={session_id} " - f"数量={len(selected_ids)} 表达编号={selected_ids!r}" - ) - return block, selected_ids - - def _get_related_session_ids(self, session_id: str) -> List[str]: - """根据表达互通组配置,解析当前会话可共享的会话 ID。""" - related_session_ids = {session_id} - expression_groups = global_config.expression.expression_groups - - for expression_group in expression_groups: - target_items = expression_group.expression_groups - group_session_ids: set[str] = set() - contains_current_session = False - - for target_item in target_items: - platform = target_item.platform.strip() - item_id = target_item.item_id.strip() - if not platform or not item_id: - continue - - rule_type = target_item.rule_type - target_session_id = SessionUtils.calculate_session_id( - platform, - group_id=item_id if rule_type == "group" else None, - user_id=None if rule_type == "group" else item_id, - ) - group_session_ids.add(target_session_id) - if target_session_id == session_id: - contains_current_session = True - - if contains_current_session: - related_session_ids.update(group_session_ids) - - return list(related_session_ids) - - def _load_expression_records(self, session_id: str) -> List[_ExpressionRecord]: - """提取表达方式静态数据,避免 detached ORM 对象。""" - related_session_ids = self._get_related_session_ids(session_id) - - with get_db_session(auto_commit=False) as session: - base_query = select(Expression).where(Expression.rejected.is_(False)) # type: ignore[attr-defined] - scoped_query = base_query.where( - (Expression.session_id.in_(related_session_ids)) | (Expression.session_id.is_(None)) # type: ignore[attr-defined] - ).order_by(Expression.count.desc(), Expression.last_active_time.desc()) # type: ignore[attr-defined] - - if global_config.expression.expression_checked_only: - scoped_query = scoped_query.where(Expression.checked.is_(True)) # type: ignore[attr-defined] - - expressions = session.exec(scoped_query.limit(5)).all() - - return [ - _ExpressionRecord( - expression_id=expression.id, - situation=expression.situation, - style=expression.style, - ) - for expression in expressions - ] - async def generate_reply_with_context( self, extra_info: str = "", @@ -357,6 +265,7 @@ class MaisakaReplyGenerator: chat_history: Optional[List[LLMContextMessage]] = None, expression_habits: str = "", selected_expression_ids: Optional[List[int]] = None, + sub_agent_runner: Optional[Callable[[str], Awaitable[str]]] = None, ) -> Tuple[bool, ReplyGenerationResult]: """结合上下文生成 Maisaka 的最终可见回复。""" del available_actions @@ -399,6 +308,7 @@ class MaisakaReplyGenerator: reply_message=reply_message, reply_reason=reply_reason or "", stream_id=stream_id, + sub_agent_runner=sub_agent_runner, ) except Exception as exc: import traceback diff --git a/src/chat/replyer/maisaka_generator_multi.py b/src/chat/replyer/maisaka_generator_multi.py index 626dac5a..74631441 100644 --- a/src/chat/replyer/maisaka_generator_multi.py +++ b/src/chat/replyer/maisaka_generator_multi.py @@ -2,18 +2,15 @@ import random import time from dataclasses import dataclass, field from datetime import datetime -from typing import Dict, List, Optional, Tuple +from typing import Awaitable, Callable, Dict, List, Optional, Tuple from rich.console import Group, RenderableType from rich.panel import Panel from rich.text import Text -from sqlmodel import select from src.chat.message_receive.chat_manager import BotChatSession from src.chat.message_receive.message import SessionMessage from src.cli.console import console -from src.common.database.database import get_db_session -from src.common.database.database_model import Expression from src.common.data_models.message_component_data_model import MessageSequence, TextComponent from src.common.data_models.reply_generation_data_models import ( GenerationMetrics, @@ -22,7 +19,6 @@ from src.common.data_models.reply_generation_data_models import ( ) from src.common.logger import get_logger from src.common.prompt_i18n import load_prompt -from src.common.utils.utils_session import SessionUtils from src.config.config import global_config from src.core.types import ActionInfo from src.llm_models.payload_content.message import ImageMessagePart, Message, MessageBuilder, RoleType, TextMessagePart @@ -35,6 +31,7 @@ from src.maisaka.context_messages import ( SessionBackedMessage, ToolResultMessage, ) +from .maisaka_expression_selector import maisaka_expression_selector from src.maisaka.message_adapter import clone_message_sequence, parse_speaker_content from src.maisaka.prompt_cli_renderer import PromptCLIVisualizer @@ -49,17 +46,8 @@ class MaisakaReplyContext: selected_expression_ids: List[int] = field(default_factory=list) -@dataclass -class _ExpressionRecord: - """表达方式的轻量记录。""" - - expression_id: Optional[int] - situation: str - style: str - - class MaisakaReplyGenerator: - """生成 Maisaka 的最终可见回复。""" + """生成 Maisaka 的最终可见回复(多模态管线)。""" def __init__( self, @@ -75,7 +63,7 @@ class MaisakaReplyGenerator: self._personality_prompt = self._build_personality_prompt() def _build_personality_prompt(self) -> str: - """构建 replyer 使用的人设描述。""" + """构建 replyer 使用的人设提示。""" try: bot_name = global_config.bot.nickname alias_names = global_config.bot.alias_names @@ -117,7 +105,6 @@ class MaisakaReplyGenerator: @staticmethod def _split_user_message_segments(raw_content: str) -> List[tuple[Optional[str], str]]: - """按说话人拆分用户消息。""" segments: List[tuple[Optional[str], str]] = [] current_speaker: Optional[str] = None current_lines: List[str] = [] @@ -139,7 +126,6 @@ class MaisakaReplyGenerator: return segments def _build_target_message_block(self, reply_message: Optional[SessionMessage]) -> str: - """构建当前需要回复的目标消息摘要。""" if reply_message is None: return "" @@ -155,7 +141,7 @@ class MaisakaReplyGenerator: f"- 目标消息ID:{target_message_id}\n" f"- 发送者:{sender_name}\n" f"- 消息内容:{target_content}\n" - "- 你这次要回复的就是这条目标消息,请结合整段上下文理解,但不要误把其他历史消息当成当前回复对象。" + "- 你这次要回复的就是这条目标消息,请结合整段上下文理解,但不要把其他历史消息当成当前回复对象。" ) def _build_system_prompt( @@ -164,7 +150,6 @@ class MaisakaReplyGenerator: reply_reason: str, expression_habits: str = "", ) -> str: - """构建 Maisaka replyer 使用的系统提示词。""" current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") target_message_block = self._build_target_message_block(reply_message) @@ -179,27 +164,25 @@ class MaisakaReplyGenerator: except Exception: system_prompt = "你是一个友好的 AI 助手,请根据聊天记录自然回复。" - extra_sections: List[str] = [] + sections: List[str] = [] if expression_habits.strip(): - extra_sections.append(expression_habits.strip()) + sections.append(expression_habits.strip()) if target_message_block: - extra_sections.append(target_message_block) + sections.append(target_message_block) if reply_reason.strip(): - extra_sections.append(f"【回复信息参考】\n{reply_reason}") - if not extra_sections: + sections.append(f"【回复信息参考】\n{reply_reason}") + if not sections: return system_prompt - return f"{system_prompt}\n\n" + "\n\n".join(extra_sections) + return f"{system_prompt}\n\n" + "\n\n".join(sections) def _build_reply_instruction(self) -> str: - """构建追加在上下文末尾的回复指令。""" - return "请基于以上逐条对话消息,自然地继续回复。直接输出你要说的话,不要额外解释。" + return "请基于以上上下文,自然地继续回复。直接输出你要说的话,不需要额外解释。" def _build_multimodal_user_message( self, message: SessionBackedMessage, default_user_name: str, ) -> Optional[Message]: - """构建保留图片等多模态片段的用户消息。""" speaker_name, _ = parse_speaker_content(message.processed_plain_text.strip()) visible_speaker = speaker_name or default_user_name @@ -223,7 +206,6 @@ class MaisakaReplyGenerator: return multimodal_message.to_llm_message() def _build_history_messages(self, chat_history: List[LLMContextMessage]) -> List[Message]: - """将 replyer 上下文拆成多条 LLM 消息。""" bot_nickname = global_config.bot.nickname.strip() or "Bot" default_user_name = global_config.maisaka.cli_user_name.strip() or "User" messages: List[Message] = [] @@ -277,7 +259,6 @@ class MaisakaReplyGenerator: reply_reason: str, expression_habits: str = "", ) -> List[Message]: - """构建发给大模型的消息列表。""" messages: List[Message] = [] system_prompt = self._build_system_prompt( reply_message=reply_message, @@ -293,7 +274,6 @@ class MaisakaReplyGenerator: @staticmethod def _build_request_prompt_preview(messages: List[Message]) -> str: - """将消息列表转为便于调试的文本预览。""" preview_lines: List[str] = [] for message in messages: role_name = message.role.value.capitalize() @@ -308,7 +288,6 @@ class MaisakaReplyGenerator: return "\n\n".join(preview_lines) def _resolve_session_id(self, stream_id: Optional[str]) -> str: - """解析当前回复使用的会话 ID。""" if stream_id: return stream_id if self.chat_stream is not None: @@ -321,109 +300,29 @@ class MaisakaReplyGenerator: reply_message: Optional[SessionMessage], reply_reason: str, stream_id: Optional[str], + sub_agent_runner: Optional[Callable[[str], Awaitable[str]]], ) -> MaisakaReplyContext: - """在 replyer 内部构建表达习惯和黑话解释。""" session_id = self._resolve_session_id(stream_id) if not session_id: logger.warning("构建 Maisaka 回复上下文失败:缺少会话标识") return MaisakaReplyContext() - expression_habits, selected_expression_ids = self._build_expression_habits( + if sub_agent_runner is None: + logger.info("表达方式选择跳过:缺少子代理执行器") + return MaisakaReplyContext() + + selection_result = await maisaka_expression_selector.select_for_reply( session_id=session_id, chat_history=chat_history, reply_message=reply_message, reply_reason=reply_reason, + sub_agent_runner=sub_agent_runner, ) return MaisakaReplyContext( - expression_habits=expression_habits, - selected_expression_ids=selected_expression_ids, + expression_habits=selection_result.expression_habits, + selected_expression_ids=selection_result.selected_expression_ids, ) - def _build_expression_habits( - self, - session_id: str, - chat_history: List[LLMContextMessage], - reply_message: Optional[SessionMessage], - reply_reason: str, - ) -> tuple[str, List[int]]: - """查询并格式化适合当前会话的表达习惯。""" - del chat_history - del reply_message - del reply_reason - - expression_records = self._load_expression_records(session_id) - if not expression_records: - return "", [] - - lines: List[str] = [] - selected_ids: List[int] = [] - for expression in expression_records: - if expression.expression_id is not None: - selected_ids.append(expression.expression_id) - lines.append(f"- 当{expression.situation}时,可以自然地用{expression.style}这种表达习惯。") - - block = "【表达习惯参考】\n" + "\n".join(lines) - logger.info( - f"已构建 Maisaka 表达习惯: 会话标识={session_id} " - f"数量={len(selected_ids)} 表达编号={selected_ids!r}" - ) - return block, selected_ids - - def _get_related_session_ids(self, session_id: str) -> List[str]: - """根据表达互通组配置,解析当前会话可共享的会话 ID。""" - related_session_ids = {session_id} - expression_groups = global_config.expression.expression_groups - - for expression_group in expression_groups: - target_items = expression_group.expression_groups - group_session_ids: set[str] = set() - contains_current_session = False - - for target_item in target_items: - platform = target_item.platform.strip() - item_id = target_item.item_id.strip() - if not platform or not item_id: - continue - - rule_type = target_item.rule_type - target_session_id = SessionUtils.calculate_session_id( - platform, - group_id=item_id if rule_type == "group" else None, - user_id=None if rule_type == "group" else item_id, - ) - group_session_ids.add(target_session_id) - if target_session_id == session_id: - contains_current_session = True - - if contains_current_session: - related_session_ids.update(group_session_ids) - - return list(related_session_ids) - - def _load_expression_records(self, session_id: str) -> List[_ExpressionRecord]: - """提取表达方式静态数据,避免 detached ORM 对象。""" - related_session_ids = self._get_related_session_ids(session_id) - - with get_db_session(auto_commit=False) as session: - base_query = select(Expression).where(Expression.rejected.is_(False)) # type: ignore[attr-defined] - scoped_query = base_query.where( - (Expression.session_id.in_(related_session_ids)) | (Expression.session_id.is_(None)) # type: ignore[attr-defined] - ).order_by(Expression.count.desc(), Expression.last_active_time.desc()) # type: ignore[attr-defined] - - if global_config.expression.expression_checked_only: - scoped_query = scoped_query.where(Expression.checked.is_(True)) # type: ignore[attr-defined] - - expressions = session.exec(scoped_query.limit(5)).all() - - return [ - _ExpressionRecord( - expression_id=expression.id, - situation=expression.situation, - style=expression.style, - ) - for expression in expressions - ] - async def generate_reply_with_context( self, extra_info: str = "", @@ -440,8 +339,8 @@ class MaisakaReplyGenerator: chat_history: Optional[List[LLMContextMessage]] = None, expression_habits: str = "", selected_expression_ids: Optional[List[int]] = None, + sub_agent_runner: Optional[Callable[[str], Awaitable[str]]] = None, ) -> Tuple[bool, ReplyGenerationResult]: - """结合上下文生成 Maisaka 的最终可见回复。""" del available_actions del chosen_actions del extra_info @@ -457,9 +356,8 @@ class MaisakaReplyGenerator: return False, result logger.info( - f"Maisaka 回复器开始生成: 会话流标识={stream_id} 回复原因={reply_reason!r} " - f"历史消息数={len(chat_history)} 目标消息编号=" - f"{reply_message.message_id if reply_message else None}" + f"Maisaka 回复器开始生成: 流={stream_id} 原因={reply_reason!r} " + f"历史条数={len(chat_history)} 目标ID={reply_message.message_id if reply_message else None}" ) filtered_history = [ @@ -468,11 +366,8 @@ class MaisakaReplyGenerator: if not isinstance(message, (ReferenceMessage, ToolResultMessage)) ] - logger.debug(f"Maisaka 回复器过滤后历史消息数={len(filtered_history)}") - - # Validate that express_model is properly initialized if self.express_model is None: - logger.error("Maisaka 回复器的回复模型未初始化") + logger.error("回复模型未初始化") result.error_message = "回复模型尚未初始化" return False, result @@ -482,10 +377,11 @@ class MaisakaReplyGenerator: reply_message=reply_message, reply_reason=reply_reason or "", stream_id=stream_id, + sub_agent_runner=sub_agent_runner, ) except Exception as exc: import traceback - logger.error(f"Maisaka 回复器构建回复上下文失败: {exc}\n{traceback.format_exc()}") + logger.error(f"构建回复上下文失败: {exc}\n{traceback.format_exc()}") result.error_message = f"构建回复上下文失败: {exc}" return False, result @@ -497,8 +393,7 @@ class MaisakaReplyGenerator: ) logger.info( - f"Maisaka 回复上下文构建完成: 会话流标识={stream_id} " - f"已选表达编号={result.selected_expression_ids!r}" + f"回复上下文完成: 流={stream_id} 已选表达={result.selected_expression_ids!r}" ) try: @@ -510,7 +405,7 @@ class MaisakaReplyGenerator: ) except Exception as exc: import traceback - logger.error(f"Maisaka 回复器构建提示词失败: {exc}\n{traceback.format_exc()}") + logger.error(f"构建提示词失败: {exc}\n{traceback.format_exc()}") result.error_message = f"构建提示词失败: {exc}" return False, result @@ -528,13 +423,15 @@ class MaisakaReplyGenerator: category="replyer", chat_id=preview_chat_id, request_kind="replyer", - subtitle=f"会话流标识:{preview_chat_id}", + subtitle=f"流ID: {preview_chat_id}", folded=global_config.debug.fold_maisaka_thinking, ) started_at = time.perf_counter() try: - generation_result = await self.express_model.generate_response_with_messages(message_factory=message_factory) + generation_result = await self.express_model.generate_response_with_messages( + message_factory=message_factory + ) except Exception as exc: logger.exception("Maisaka 回复器调用失败") result.error_message = str(exc) @@ -565,17 +462,15 @@ class MaisakaReplyGenerator: return False, result logger.info( - f"Maisaka 回复器生成成功: 回复文本={response_text!r} " - f"总耗时毫秒={result.metrics.overall_ms} " - f"已选表达编号={result.selected_expression_ids!r}" + f"Maisaka 回复器生成成功: 文本={response_text!r} 总耗时ms={result.metrics.overall_ms} 已选表达={result.selected_expression_ids!r}" ) if global_config.debug.show_replyer_prompt or global_config.debug.show_replyer_reasoning: summary_lines = [ - f"会话流标识: {preview_chat_id or 'unknown'}", - f"总耗时: {result.metrics.overall_ms} ms", + f"流ID: {preview_chat_id or 'unknown'}", + f"耗时: {result.metrics.overall_ms} ms", ] if result.selected_expression_ids: - summary_lines.append(f"表达习惯编号: {result.selected_expression_ids!r}") + summary_lines.append(f"表达编号: {result.selected_expression_ids!r}") renderables: List[RenderableType] = [Text("\n".join(summary_lines))] if replyer_prompt_section is not None: @@ -584,7 +479,7 @@ class MaisakaReplyGenerator: renderables.append( Panel( Text(result.completion.reasoning_text), - title="回复器思考", + title="思考内容", border_style="magenta", padding=(0, 1), ) @@ -600,7 +495,7 @@ class MaisakaReplyGenerator: console.print( Panel( Group(*renderables), - title="MaiSaka 回复器结果", + title="MaiSaka 回复器", border_style="bright_yellow", padding=(0, 1), ) diff --git a/src/know_u/knowledge.py b/src/know_u/knowledge.py index 05573008..fb8df1a3 100644 --- a/src/know_u/knowledge.py +++ b/src/know_u/knowledge.py @@ -80,28 +80,23 @@ class KnowledgeLearner: self._store = get_knowledge_store() self._llm = LLMServiceClient(task_name="utils", request_type="maisaka.knowledge.learn") self._learning_lock = asyncio.Lock() - self._messages_cache: List[SessionMessage] = [] + self._last_processed_index = 0 + self.min_messages_for_extraction = 10 - def add_messages(self, messages: List[SessionMessage]) -> None: - """缓存待学习的消息。""" - self._messages_cache.extend(messages) + def get_pending_count(self, message_cache: List[SessionMessage]) -> int: + """??????????????""" + return max(0, len(message_cache) - self._last_processed_index) - def get_cache_size(self) -> int: - """获取缓存消息数量。""" - return len(self._messages_cache) - - async def learn(self) -> int: - """ - 从缓存消息中提取知识并落库。 - - Returns: - 新增入库的知识条数 - """ - if not self._messages_cache: + async def learn(self, message_cache: List[SessionMessage]) -> int: + """?????????????????????""" + pending_messages = message_cache[self._last_processed_index :] + if not pending_messages: + return 0 + if len(pending_messages) < self.min_messages_for_extraction: return 0 async with self._learning_lock: - chat_excerpt = self._build_chat_excerpt() + chat_excerpt = self._build_chat_excerpt(pending_messages) if not chat_excerpt: return 0 @@ -115,12 +110,13 @@ class KnowledgeLearner: ), ) except Exception: - logger.exception("知识学习模型调用失败") + logger.exception("??????????") return 0 knowledge_items = self._parse_learning_result(result.response or "") if not knowledge_items: - logger.debug("知识学习已完成,但未提取到有效条目") + logger.debug("?????????????????") + self._last_processed_index = len(message_cache) return 0 added_count = 0 @@ -146,23 +142,25 @@ class KnowledgeLearner: ): added_count += 1 + self._last_processed_index = len(message_cache) + if added_count > 0: logger.info( - f"Maisaka 知识学习已完成: 会话标识={self._session_id} 新增条数={added_count}" + f"Maisaka ???????: ????={self._session_id} ????={added_count}" ) else: logger.debug( - f"Maisaka 知识学习已完成,但没有新增条目: 会话标识={self._session_id}" + f"Maisaka ???????????????: ????={self._session_id}" ) return added_count - def _build_chat_excerpt(self) -> str: + def _build_chat_excerpt(self, messages: List[SessionMessage]) -> str: """ 构建适合画像提取的对话片段,只保留用户可见文本。 """ lines: List[str] = [] - for message in self._messages_cache[-30:]: + for message in messages[-30:]: if isinstance(message, (AssistantMessage, ToolResultMessage)): continue if isinstance(message, SessionBackedMessage): diff --git a/src/learners/expression_learner.py b/src/learners/expression_learner.py index c8ced532..cd3d1522 100644 --- a/src/learners/expression_learner.py +++ b/src/learners/expression_learner.py @@ -159,7 +159,8 @@ class ExpressionLearner: self._learning_lock = asyncio.Lock() # 消息缓存 - self._messages_cache: List["SessionMessage"] = [] + self._last_processed_index = 0 + self.min_messages_for_extraction = 10 @staticmethod def _get_runtime_manager() -> Any: @@ -265,27 +266,25 @@ class ExpressionLearner: normalized_entries.append((content, source_id)) return normalized_entries - def add_messages(self, messages: List["SessionMessage"]) -> None: - """添加消息到缓存""" - self._messages_cache.extend(messages) + def get_pending_count(self, message_cache: List["SessionMessage"]) -> int: + """??????????????""" + return max(0, len(message_cache) - self._last_processed_index) - def get_cache_size(self) -> int: - """获取当前消息缓存的大小""" - return len(self._messages_cache) + async def learn( + self, + message_cache: List["SessionMessage"], + jargon_miner: Optional["JargonMiner"] = None, + ) -> bool: + """?????????????????????""" + pending_messages = message_cache[self._last_processed_index :] + if not pending_messages: + logger.debug("??????????????????") + return False + if len(pending_messages) < self.min_messages_for_extraction: + return False - async def learn(self, jargon_miner: Optional["JargonMiner"] = None) -> None: - """执行表达方式学习主流程。 - - Args: - jargon_miner: 可选的黑话学习器实例,用于同步处理黑话候选。 - """ - if not self._messages_cache: - logger.debug("没有消息可供学习,跳过学习过程") - return - - # 构建可读消息 readable_message, _, _ = await MessageUtils.build_readable_message( - self._messages_cache, + pending_messages, anonymize=True, show_lineno=True, extract_pictures=True, @@ -293,57 +292,54 @@ class ExpressionLearner: target_bot_name="SELF", ) - # 准备提示词 prompt_template = prompt_manager.get_prompt("learn_style") prompt_template.add_context("bot_name", global_config.bot.nickname) prompt_template.add_context("chat_str", readable_message) prompt = await prompt_manager.render_prompt(prompt_template) - # 调用 LLM 学习表达方式 try: generation_result = await express_learn_model.generate_response( - prompt, options=LLMGenerationOptions(temperature=0.3) + prompt, + options=LLMGenerationOptions(temperature=0.3), ) response = generation_result.response except Exception as e: - logger.error(f"学习表达方式失败,模型生成出错:{e}") - return + logger.error(f"????????????????{e}") + return False - # 解析 LLM 返回的表达方式列表和黑话列表(包含来源行编号) expressions: List[Tuple[str, str, str]] - jargon_entries: List[Tuple[str, str]] # (content, source_id) + jargon_entries: List[Tuple[str, str]] expressions, jargon_entries = parse_expression_response(response) - # 从缓存中检查 jargon 是否出现在 messages 中 - if cached_jargon_entries := self._check_cached_jargons_in_messages(jargon_miner): - # 合并缓存中的 jargon 条目(去重:如果 content 已存在则跳过) + cached_jargon_entries = self._check_cached_jargons_in_messages(pending_messages, jargon_miner) + if cached_jargon_entries: existing_contents = {content for content, _ in jargon_entries} for content, source_id in cached_jargon_entries: - if content not in existing_contents: - jargon_entries.append((content, source_id)) - existing_contents.add(content) - logger.info(f"从缓存中检查到黑话:{content}") + if content in existing_contents: + continue + jargon_entries.append((content, source_id)) + existing_contents.add(content) + logger.info(f"??????????{content}") - # 检查表达方式数量,如果超过 20 个则放弃本次表达学习 if len(expressions) > 20: - logger.info(f"表达方式提取数量超过 20 个(实际{len(expressions)}个),放弃本次表达学习") + logger.info(f"?????????? 20 ???????????{len(expressions)}") expressions = [] - # 检查黑话数量,如果超过 30 个则放弃本次黑话学习 if len(jargon_entries) > 30: - logger.info(f"黑话提取数量超过 30 个(实际{len(jargon_entries)}个),放弃本次黑话学习") + logger.info(f"???????? 30 ???????????{len(jargon_entries)}") jargon_entries = [] after_extract_result = await self._get_runtime_manager().invoke_hook( "expression.learn.after_extract", session_id=self.session_id, - message_count=len(self._messages_cache), + message_count=len(pending_messages), expressions=self._serialize_expressions(expressions), jargon_entries=self._serialize_jargon_entries(jargon_entries), ) if after_extract_result.aborted: - logger.info(f"{self.session_id} 的表达方式学习结果被 Hook 中止") - return + logger.info(f"{self.session_id} ?????????? Hook ??") + self._last_processed_index = len(message_cache) + return False after_extract_kwargs = after_extract_result.kwargs raw_expressions = after_extract_kwargs.get("expressions") @@ -353,31 +349,26 @@ class ExpressionLearner: if raw_jargon_entries is not None: jargon_entries = self._deserialize_jargon_entries(raw_jargon_entries) - # 处理黑话条目,路由到 jargon_miner(即使没有表达方式也要处理黑话) - # TODO: 检测是否开启了 if jargon_entries: - await self._process_jargon_entries(jargon_entries, jargon_miner) + await self._process_jargon_entries(jargon_entries, pending_messages, jargon_miner) - # 如果没有表达方式,直接返回 if not expressions: - logger.info("解析后没有可用的表达方式") - return + logger.info("????????????") + self._last_processed_index = len(message_cache) + return False - logger.info(f"学习的 expressions: {expressions}") - logger.info(f"学习的 jargon_entries: {jargon_entries}") - - # 过滤表达方式,根据 source_id 溯源并应用各种过滤规则 - learnt_expressions = self._filter_expressions(expressions) + logger.info(f"???? expressions: {expressions}") + logger.info(f"???? jargon_entries: {jargon_entries}") + learnt_expressions = self._filter_expressions(expressions, pending_messages) if not learnt_expressions: - logger.info("没有学习到表达风格") - return + logger.info("????????????") + self._last_processed_index = len(message_cache) + return False - # 展示学到的表达方式 learnt_expressions_str = "\n".join(f"{situation}->{style}" for situation, style in learnt_expressions) - logger.info(f"在 {self.session_id} 学习到表达风格:\n{learnt_expressions_str}") + logger.info(f"? {self.session_id} ????????\n{learnt_expressions_str}") - # 存储到数据库 Expression 表 for situation, style in learnt_expressions: before_upsert_result = await self._get_runtime_manager().invoke_hook( "expression.learn.before_upsert", @@ -386,19 +377,25 @@ class ExpressionLearner: style=style, ) if before_upsert_result.aborted: - logger.info(f"{self.session_id} 的表达方式写入被 Hook 跳过: situation={situation!r}") + logger.info(f"{self.session_id} ???????? Hook ??: situation={situation!r}") continue upsert_kwargs = before_upsert_result.kwargs situation = str(upsert_kwargs.get("situation", situation) or "").strip() style = str(upsert_kwargs.get("style", style) or "").strip() if not situation or not style: - logger.info(f"{self.session_id} 的表达方式写入被 Hook 清空,已跳过") + logger.info(f"{self.session_id} ???????? Hook ??????") continue await self._upsert_expression_to_db(situation, style) - # ====== 黑话相关 ====== - def _check_cached_jargons_in_messages(self, jargon_miner: Optional["JargonMiner"] = None) -> List[Tuple[str, str]]: + self._last_processed_index = len(message_cache) + return True + + def _check_cached_jargons_in_messages( + self, + messages: List["SessionMessage"], + jargon_miner: Optional["JargonMiner"] = None, + ) -> List[Tuple[str, str]]: """ 检查缓存中的 jargon 是否出现在 messages 中 @@ -418,7 +415,7 @@ class ExpressionLearner: matched_entries: List[Tuple[str, str]] = [] - for i, msg in enumerate(self._messages_cache): + for i, msg in enumerate(messages): # 跳过机器人自己的消息 if is_bot_self(msg.platform, msg.message_info.user_info.user_id): continue @@ -454,7 +451,10 @@ class ExpressionLearner: return matched_entries async def _process_jargon_entries( - self, jargon_entries: List[Tuple[str, str]], jargon_miner: Optional["JargonMiner"] = None + self, + jargon_entries: List[Tuple[str, str]], + messages: List["SessionMessage"], + jargon_miner: Optional["JargonMiner"] = None, ): """ 处理从 expression learner 提取的黑话条目,路由到 jargon_miner @@ -463,7 +463,7 @@ class ExpressionLearner: jargon_entries: 黑话条目列表,每个元素是 (content, source_id) jargon_miner: JargonMiner 实例 """ - if not jargon_entries or not self._messages_cache: + if not jargon_entries or not messages: return if not jargon_miner: @@ -497,20 +497,20 @@ class ExpressionLearner: # build_readable_message 的编号从 1 开始 line_index = int(source_id) - 1 - if line_index < 0 or line_index >= len(self._messages_cache): + if line_index < 0 or line_index >= len(messages): logger.warning(f"黑话条目 source_id 超出范围:content={content}, source_id={source_id}") continue # 检查是否是机器人自己的消息 - target_msg = self._messages_cache[line_index] + target_msg = messages[line_index] if is_bot_self(target_msg.platform, target_msg.message_info.user_info.user_id): logger.info(f"跳过引用机器人自身消息的黑话:content={content}, source_id={source_id}") continue # 构建上下文段落(取前后各 3 条消息) start_idx = max(0, line_index - 3) - end_idx = min(len(self._messages_cache), line_index + 4) - context_msgs = self._messages_cache[start_idx:end_idx] + end_idx = min(len(messages), line_index + 4) + context_msgs = messages[start_idx:end_idx] context_paragraph = "\n".join( [f"[{i + 1}] {msg.processed_plain_text or ''}" for i, msg in enumerate(context_msgs)] @@ -529,7 +529,11 @@ class ExpressionLearner: logger.info(f"成功处理 {len(entries)} 个黑话条目") # ====== 过滤方法 ====== - def _filter_expressions(self, expressions: List[Tuple[str, str, str]]) -> List[Tuple[str, str]]: + def _filter_expressions( + self, + expressions: List[Tuple[str, str, str]], + messages: List["SessionMessage"], + ) -> List[Tuple[str, str]]: """ 过滤表达方式,移除不符合条件的条目 @@ -558,10 +562,10 @@ class ExpressionLearner: if not source_id_str.isdigit(): continue # 无效的来源行编号,跳过 line_index = int(source_id_str) - 1 # build_readable_message 的编号从 1 开始 - if line_index < 0 or line_index >= len(self._messages_cache): + if line_index < 0 or line_index >= len(messages): continue # 超出范围,跳过 # 当前行的原始消息 - current_msg = self._messages_cache[line_index] + current_msg = messages[line_index] # 过滤掉从 bot 自己发言中提取到的表达方式 if is_bot_self(current_msg.platform, current_msg.message_info.user_info.user_id): continue diff --git a/src/maisaka/builtin_tool/reply.py b/src/maisaka/builtin_tool/reply.py index ad7e9260..8a3d2f4f 100644 --- a/src/maisaka/builtin_tool/reply.py +++ b/src/maisaka/builtin_tool/reply.py @@ -14,6 +14,18 @@ from .context import BuiltinToolRuntimeContext logger = get_logger("maisaka_builtin_reply") +async def _run_expression_selector(tool_ctx: BuiltinToolRuntimeContext, system_prompt: str) -> str: + """运行 replyer 侧表达方式选择子代理,并返回文本结果。""" + response = await tool_ctx.runtime.run_sub_agent( + context_message_limit=10, + system_prompt=system_prompt, + request_kind="expression_selector", + max_tokens=256, + temperature=0.1, + ) + return (response.content or "").strip() + + def get_tool_spec() -> ToolSpec: """获取 reply 工具声明。""" @@ -102,6 +114,10 @@ async def handle_tool( stream_id=tool_ctx.runtime.session_id, reply_message=target_message, chat_history=tool_ctx.runtime._chat_history, + sub_agent_runner=lambda system_prompt: _run_expression_selector( + tool_ctx, + system_prompt, + ), log_reply=False, ) except Exception as exc: diff --git a/src/maisaka/prompt_cli_renderer.py b/src/maisaka/prompt_cli_renderer.py index 9652eb7b..c32a1ab3 100644 --- a/src/maisaka/prompt_cli_renderer.py +++ b/src/maisaka/prompt_cli_renderer.py @@ -652,10 +652,14 @@ class PromptCLIVisualizer: dump_uri = cls._build_file_uri(prompt_dump_path) body = Group( - Text(f"富文本预览:{viewer_html_path}", style="bold green"), - Text(f"原始文本备份:{prompt_dump_path}", style="magenta"), - Text.from_markup(f"[link={viewer_uri}]点击在浏览器打开富文本 Prompt 视图[/link]", style="bold green"), - Text.from_markup(f"[link={dump_uri}]点击直接打开 Prompt 文本[/link]", style="cyan"), + Text.from_markup( + f"[bold green]富文本预览:{viewer_html_path}[/bold green] " + f"[link={viewer_uri}]点击在浏览器打开富文本 Prompt 视图[/link]" + ), + Text.from_markup( + f"[magenta]原始文本备份:{prompt_dump_path}[/magenta] " + f"[cyan][link={dump_uri}]点击直接打开 Prompt 文本[/link][/cyan]" + ), ) return body @@ -812,10 +816,14 @@ class PromptCLIVisualizer: dump_uri = cls._build_file_uri(text_dump_path) body = Group( - Text(f"富文本预览:{viewer_html_path}", style="bold green"), - Text(f"原始文本备份:{text_dump_path}", style="magenta"), - Text.from_markup(f"[link={viewer_uri}]点击在浏览器打开富文本 Prompt 视图[/link]", style="bold green"), - Text.from_markup(f"[link={dump_uri}]点击直接打开 Prompt 文本[/link]", style="cyan"), + Text.from_markup( + f"[bold green]富文本预览:{viewer_html_path}[/bold green] " + f"[link={viewer_uri}]点击在浏览器打开富文本 Prompt 视图[/link]" + ), + Text.from_markup( + f"[magenta]原始文本备份:{text_dump_path}[/magenta] " + f"[cyan][link={dump_uri}]点击直接打开 Prompt 文本[/link][/cyan]" + ), ) return body diff --git a/src/maisaka/runtime.py b/src/maisaka/runtime.py index c68998e8..bcdf4cc8 100644 --- a/src/maisaka/runtime.py +++ b/src/maisaka/runtime.py @@ -84,7 +84,6 @@ class MaisakaHeartFlowChatting: self._enable_expression_use = expr_use self._enable_expression_learning = expr_learn self._enable_jargon_learning = jargon_learn - self._min_messages_for_extraction = 10 self._min_extraction_interval = 30 self._last_expression_extraction_time = 0.0 self._last_knowledge_extraction_time = 0.0 @@ -351,88 +350,84 @@ class MaisakaHeartFlowChatting: logger.error(f"{self.log_prefix} 知识学习任务异常退出: {knowledge_result}") async def _trigger_expression_learning(self, messages: list[SessionMessage]) -> None: - """基于新收集的一批消息触发表达学习。""" - self._expression_learner.add_messages(messages) - + """?????????????????""" if not self._enable_expression_learning: - logger.debug(f"{self.log_prefix} 表达学习未启用,跳过当前批次") + logger.debug(f"{self.log_prefix} ??????????????") return elapsed = time.time() - self._last_expression_extraction_time if elapsed < self._min_extraction_interval: logger.debug( - f"{self.log_prefix} 表达学习尚未达到触发间隔: " - f"已过={elapsed:.2f} 秒 阈值={self._min_extraction_interval} 秒" + f"{self.log_prefix} ????????????: " + f"??={elapsed:.2f} ? ??={self._min_extraction_interval} ?" ) return - cache_size = self._expression_learner.get_cache_size() - if cache_size < self._min_messages_for_extraction: + pending_count = self._expression_learner.get_pending_count(self.message_cache) + if pending_count < self._expression_learner.min_messages_for_extraction: logger.debug( - f"{self.log_prefix} 表达学习因缓存数量不足而跳过: " - f"学习器缓存={cache_size} 阈值={self._min_messages_for_extraction} " - f"消息总缓存={len(self.message_cache)}" + f"{self.log_prefix} ??????????????: " + f"??????={pending_count} ??={self._expression_learner.min_messages_for_extraction} " + f"?????={len(self.message_cache)}" ) return self._last_expression_extraction_time = time.time() logger.info( - f"{self.log_prefix} 开始表达学习: " - f"新批次消息数={len(messages)} 学习器缓存={cache_size} " - f"消息总缓存={len(self.message_cache)} " - f"启用黑话学习={self._enable_jargon_learning}" + f"{self.log_prefix} ??????: " + f"??????={len(messages)} ??????={pending_count} " + f"?????={len(self.message_cache)} " + f"??????={self._enable_jargon_learning}" ) try: jargon_miner = self._jargon_miner if self._enable_jargon_learning else None - learnt_style = await self._expression_learner.learn(jargon_miner) + learnt_style = await self._expression_learner.learn(self.message_cache, jargon_miner) if learnt_style: - logger.info(f"{self.log_prefix} 表达学习已完成") + logger.info(f"{self.log_prefix} ???????") else: - logger.debug(f"{self.log_prefix} 表达学习已完成,但没有可用结果") + logger.debug(f"{self.log_prefix} ???????????????") except Exception: - logger.exception(f"{self.log_prefix} 表达学习失败") + logger.exception(f"{self.log_prefix} ??????") async def _trigger_knowledge_learning(self, messages: list[SessionMessage]) -> None: - """基于新收集的一批消息触发知识学习。""" - self._knowledge_learner.add_messages(messages) - + """?????????????????""" if not global_config.maisaka.enable_knowledge_module: - logger.debug(f"{self.log_prefix} 知识学习未启用,跳过当前批次") + logger.debug(f"{self.log_prefix} ??????????????") return elapsed = time.time() - self._last_knowledge_extraction_time if elapsed < self._min_extraction_interval: logger.debug( - f"{self.log_prefix} 知识学习尚未达到触发间隔: " - f"已过={elapsed:.2f} 秒 阈值={self._min_extraction_interval} 秒" + f"{self.log_prefix} ????????????: " + f"??={elapsed:.2f} ? ??={self._min_extraction_interval} ?" ) return - cache_size = self._knowledge_learner.get_cache_size() - if cache_size < self._min_messages_for_extraction: + pending_count = self._knowledge_learner.get_pending_count(self.message_cache) + if pending_count < self._knowledge_learner.min_messages_for_extraction: logger.debug( - f"{self.log_prefix} 知识学习因缓存数量不足而跳过: " - f"学习器缓存={cache_size} 阈值={self._min_messages_for_extraction} " - f"消息总缓存={len(self.message_cache)}" + f"{self.log_prefix} ??????????????: " + f"??????={pending_count} ??={self._knowledge_learner.min_messages_for_extraction} " + f"?????={len(self.message_cache)}" ) return self._last_knowledge_extraction_time = time.time() logger.info( - f"{self.log_prefix} 开始知识学习: " - f"新批次消息数={len(messages)} 学习器缓存={cache_size} " - f"消息总缓存={len(self.message_cache)}" + f"{self.log_prefix} ??????: " + f"??????={len(messages)} ??????={pending_count} " + f"?????={len(self.message_cache)}" ) try: - added_count = await self._knowledge_learner.learn() + added_count = await self._knowledge_learner.learn(self.message_cache) if added_count > 0: - logger.info(f"{self.log_prefix} 知识学习已完成: 新增条目数={added_count}") + logger.info(f"{self.log_prefix} ???????: ?????={added_count}") else: - logger.debug(f"{self.log_prefix} 知识学习已完成,但没有可用结果") + logger.debug(f"{self.log_prefix} ???????????????") except Exception: - logger.exception(f"{self.log_prefix} 知识学习失败") + logger.exception(f"{self.log_prefix} ??????") async def _init_mcp(self) -> None: """初始化 MCP 工具并注册到统一工具层。""" @@ -502,9 +497,8 @@ class MaisakaHeartFlowChatting: session_name = chat_manager.get_session_name(self.session_id) or self.session_id body_lines = [ - f"聊天流: {session_name}", - f"Chat ID: {self.session_id}", - f"上下文占用: {selected_history_count}条 / {self._format_token_count(prompt_tokens)}", + f"上下文占用:{selected_history_count}/{self._max_context_size} 条", + f"本次请求token消耗:{self._format_token_count(prompt_tokens)}", ] renderables: list[RenderableType] = [Text("\n".join(body_lines))] @@ -584,7 +578,7 @@ class MaisakaHeartFlowChatting: def _log_history_trimmed(self, removed_count: int, user_message_count: int) -> None: logger.info( f"{self.log_prefix} 已裁剪 {removed_count} 条历史消息; " - f"剩余计入上下文的消息数={user_message_count}" + # f"剩余计入上下文的消息数={user_message_count}" ) def _log_internal_loop_cancelled(self) -> None: