diff --git a/.gitignore b/.gitignore index 8fcdc015..156a41dc 100644 --- a/.gitignore +++ b/.gitignore @@ -278,6 +278,8 @@ logs .vscode /config/* +config/mcp_config.json +!config/mcp_config.json.template config/old/bot_config_20250405_212257.toml temp/ diff --git a/code_scripts/migrate_expression_jargon_db.py b/code_scripts/migrate_expression_jargon_db.py new file mode 100644 index 00000000..24353511 --- /dev/null +++ b/code_scripts/migrate_expression_jargon_db.py @@ -0,0 +1,355 @@ +from argparse import ArgumentParser, Namespace +from collections.abc import Iterable +from datetime import datetime +from pathlib import Path +from sys import path as sys_path +from typing import Any, Optional + +import json +import sqlite3 + +from sqlmodel import Session, SQLModel, create_engine, delete + +ROOT_PATH = Path(__file__).resolve().parent.parent +if str(ROOT_PATH) not in sys_path: + sys_path.insert(0, str(ROOT_PATH)) + +from src.common.database.database_model import Expression, Jargon, ModifiedBy + + +def build_argument_parser() -> ArgumentParser: + """构建命令行参数解析器。""" + parser = ArgumentParser( + description="将旧版 expression/jargon 数据迁移到新版 expressions/jargons 数据库。" + ) + parser.add_argument("--source-db", dest="source_db", help="旧版 SQLite 数据库路径") + parser.add_argument("--target-db", dest="target_db", help="新版 SQLite 数据库路径") + parser.add_argument( + "--clear-target", + dest="clear_target", + action="store_true", + help="迁移前清空目标库中的 expressions 和 jargons 表", + ) + return parser + + +def prompt_path(prompt_text: str, current_value: Optional[str] = None) -> Path: + """读取数据库路径输入。""" + while True: + suffix = f" [{current_value}]" if current_value else "" + raw_text = input(f"{prompt_text}{suffix}: ").strip() + value = raw_text or current_value or "" + if not value: + print("路径不能为空,请重新输入。") + continue + return Path(value).expanduser().resolve() + + +def prompt_yes_no(prompt_text: str, default: bool = False) -> bool: + """读取是否确认输入。""" + default_hint = "Y/n" if default else "y/N" + raw_text = input(f"{prompt_text} [{default_hint}]: ").strip().lower() + if not raw_text: + return default + return raw_text in {"y", "yes"} + + +def ensure_sqlite_file(path: Path, should_exist: bool) -> None: + """校验 SQLite 文件路径。""" + if should_exist and not path.is_file(): + raise FileNotFoundError(f"数据库文件不存在:{path}") + if not should_exist: + path.parent.mkdir(parents=True, exist_ok=True) + + +def connect_sqlite(path: Path) -> sqlite3.Connection: + """创建 SQLite 连接。""" + connection = sqlite3.connect(path) + connection.row_factory = sqlite3.Row + return connection + + +def table_exists(connection: sqlite3.Connection, table_name: str) -> bool: + """检查表是否存在。""" + result = connection.execute( + "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ? LIMIT 1", + (table_name,), + ).fetchone() + return result is not None + + +def resolve_source_table_name(connection: sqlite3.Connection, candidates: list[str]) -> str: + """从候选表名中解析实际存在的表名。""" + for table_name in candidates: + if table_exists(connection, table_name): + return table_name + raise ValueError(f"未找到候选表:{', '.join(candidates)}") + + +def get_table_columns(connection: sqlite3.Connection, table_name: str) -> set[str]: + """获取表字段名集合。""" + rows = connection.execute(f"PRAGMA table_info('{table_name}')").fetchall() + return {str(row["name"]) for row in rows} + + +def load_rows(connection: sqlite3.Connection, table_name: str) -> list[sqlite3.Row]: + """读取整张表的数据。""" + return connection.execute(f"SELECT * FROM {table_name}").fetchall() + + +def normalize_string_list(raw_value: Any) -> list[str]: + """将旧库中的 JSON/文本字段标准化为字符串列表。""" + if raw_value is None: + return [] + if isinstance(raw_value, list): + return [str(item).strip() for item in raw_value if str(item).strip()] + if isinstance(raw_value, str): + raw_text = raw_value.strip() + if not raw_text: + return [] + try: + parsed = json.loads(raw_text) + except json.JSONDecodeError: + return [raw_text] + if isinstance(parsed, list): + return [str(item).strip() for item in parsed if str(item).strip()] + if isinstance(parsed, str): + parsed_text = parsed.strip() + return [parsed_text] if parsed_text else [] + if parsed is None: + return [] + return [str(parsed).strip()] + return [str(raw_value).strip()] + + +def normalize_modified_by(raw_value: Any) -> Optional[ModifiedBy]: + """标准化审核来源字段。""" + if raw_value is None: + return None + value = str(raw_value).strip().lower() + if value == ModifiedBy.AI.value: + return ModifiedBy.AI + if value == ModifiedBy.USER.value: + return ModifiedBy.USER + return None + + +def timestamp_to_datetime(raw_value: Any, fallback_now: bool) -> Optional[datetime]: + """将旧库中的 Unix 时间戳转换为 datetime。""" + if raw_value is None or raw_value == "": + return datetime.now() if fallback_now else None + if isinstance(raw_value, datetime): + return raw_value + try: + return datetime.fromtimestamp(float(raw_value)) + except (TypeError, ValueError, OSError, OverflowError): + return datetime.now() if fallback_now else None + + +def build_session_id_dict(raw_chat_id: Any, fallback_count: int) -> str: + """将旧版 jargon.chat_id 转换为新版 session_id_dict。""" + if raw_chat_id is None: + return json.dumps({}, ensure_ascii=False) + + if isinstance(raw_chat_id, str): + raw_text = raw_chat_id.strip() + else: + raw_text = str(raw_chat_id).strip() + + if not raw_text: + return json.dumps({}, ensure_ascii=False) + + try: + parsed = json.loads(raw_text) + except json.JSONDecodeError: + return json.dumps({raw_text: max(fallback_count, 1)}, ensure_ascii=False) + + if isinstance(parsed, str): + parsed_text = parsed.strip() + session_counts = {parsed_text: max(fallback_count, 1)} if parsed_text else {} + return json.dumps(session_counts, ensure_ascii=False) + + if not isinstance(parsed, list): + return json.dumps({}, ensure_ascii=False) + + session_counts: dict[str, int] = {} + for item in parsed: + if not isinstance(item, list) or not item: + continue + session_id = str(item[0]).strip() + if not session_id: + continue + item_count = 1 + if len(item) > 1: + try: + item_count = int(item[1]) + except (TypeError, ValueError): + item_count = 1 + session_counts[session_id] = max(item_count, 1) + + return json.dumps(session_counts, ensure_ascii=False) + + +def create_target_engine(target_db_path: Path): + """创建目标数据库引擎。""" + return create_engine( + f"sqlite:///{target_db_path.as_posix()}", + echo=False, + connect_args={"check_same_thread": False}, + ) + + +def clear_target_tables(session: Session) -> None: + """清空目标表。""" + session.exec(delete(Expression)) + session.exec(delete(Jargon)) + + +def migrate_expressions( + old_rows: Iterable[sqlite3.Row], + target_session: Session, + expression_columns: set[str], +) -> int: + """迁移 expression 数据。""" + migrated_count = 0 + for row in old_rows: + create_time = timestamp_to_datetime(row["create_date"] if "create_date" in expression_columns else None, True) + last_active_time = timestamp_to_datetime( + row["last_active_time"] if "last_active_time" in expression_columns else None, + True, + ) + content_list = normalize_string_list(row["content_list"] if "content_list" in expression_columns else None) + + expression = Expression( + id=int(row["id"]) if row["id"] is not None else None, + situation=str(row["situation"]).strip(), + style=str(row["style"]).strip(), + content_list=json.dumps(content_list, ensure_ascii=False), + count=int(row["count"]) if "count" in expression_columns and row["count"] is not None else 1, + last_active_time=last_active_time or datetime.now(), + create_time=create_time or datetime.now(), + session_id=str(row["chat_id"]).strip() if "chat_id" in expression_columns and row["chat_id"] else None, + checked=bool(row["checked"]) if "checked" in expression_columns and row["checked"] is not None else False, + rejected=bool(row["rejected"]) if "rejected" in expression_columns and row["rejected"] is not None else False, + modified_by=normalize_modified_by(row["modified_by"] if "modified_by" in expression_columns else None), + ) + target_session.add(expression) + migrated_count += 1 + return migrated_count + + +def migrate_jargons( + old_rows: Iterable[sqlite3.Row], + target_session: Session, + jargon_columns: set[str], +) -> int: + """迁移 jargon 数据。""" + migrated_count = 0 + for row in old_rows: + count = int(row["count"]) if "count" in jargon_columns and row["count"] is not None else 0 + raw_content_list = normalize_string_list(row["raw_content"] if "raw_content" in jargon_columns else None) + inference_content_key = ( + "inference_content_only" + if "inference_content_only" in jargon_columns + else "inference_with_content_only" + if "inference_with_content_only" in jargon_columns + else None + ) + + jargon = Jargon( + id=int(row["id"]) if row["id"] is not None else None, + content=str(row["content"]).strip(), + raw_content=json.dumps(raw_content_list, ensure_ascii=False), + meaning=str(row["meaning"]).strip() if "meaning" in jargon_columns and row["meaning"] is not None else "", + session_id_dict=build_session_id_dict( + row["chat_id"] if "chat_id" in jargon_columns else None, + fallback_count=count, + ), + count=count, + is_jargon=bool(row["is_jargon"]) if "is_jargon" in jargon_columns and row["is_jargon"] is not None else None, + is_complete=bool(row["is_complete"]) if "is_complete" in jargon_columns and row["is_complete"] is not None else False, + is_global=bool(row["is_global"]) if "is_global" in jargon_columns and row["is_global"] is not None else False, + last_inference_count=( + int(row["last_inference_count"]) + if "last_inference_count" in jargon_columns and row["last_inference_count"] is not None + else 0 + ), + inference_with_context=( + str(row["inference_with_context"]) + if "inference_with_context" in jargon_columns and row["inference_with_context"] is not None + else None + ), + inference_with_content_only=( + str(row[inference_content_key]) if inference_content_key and row[inference_content_key] is not None else None + ), + ) + target_session.add(jargon) + migrated_count += 1 + return migrated_count + + +def confirm_target_replacement(target_db_path: Path, clear_target: bool) -> bool: + """确认是否写入目标数据库。""" + if clear_target: + return prompt_yes_no(f"将清空目标库中的 expressions/jargons 后再迁移,确认继续吗?\n目标库:{target_db_path}") + return prompt_yes_no(f"将写入目标库,若主键冲突会导致迁移失败,确认继续吗?\n目标库:{target_db_path}") + + +def parse_arguments() -> Namespace: + """解析参数。""" + return build_argument_parser().parse_args() + + +def main() -> None: + """脚本入口。""" + args = parse_arguments() + + print("旧版 expression/jargon -> 新版 expressions/jargons 迁移工具") + source_db_path = prompt_path("请输入旧版数据库路径", args.source_db) + target_db_path = prompt_path("请输入新版数据库路径", args.target_db) + clear_target = args.clear_target or prompt_yes_no("迁移前是否清空目标库中的 expressions 和 jargons 表?", False) + + if source_db_path == target_db_path: + raise ValueError("旧版数据库路径和新版数据库路径不能相同。") + + ensure_sqlite_file(source_db_path, should_exist=True) + ensure_sqlite_file(target_db_path, should_exist=False) + + print(f"旧库:{source_db_path}") + print(f"新库:{target_db_path}") + print(f"清空目标表:{'是' if clear_target else '否'}") + + if not confirm_target_replacement(target_db_path, clear_target): + print("已取消迁移。") + return + + source_connection = connect_sqlite(source_db_path) + try: + expression_table_name = resolve_source_table_name(source_connection, ["expression", "expressions"]) + jargon_table_name = resolve_source_table_name(source_connection, ["jargon", "jargons"]) + expression_columns = get_table_columns(source_connection, expression_table_name) + jargon_columns = get_table_columns(source_connection, jargon_table_name) + expression_rows = load_rows(source_connection, expression_table_name) + jargon_rows = load_rows(source_connection, jargon_table_name) + finally: + source_connection.close() + + target_engine = create_target_engine(target_db_path) + SQLModel.metadata.create_all(target_engine) + + with Session(target_engine) as target_session: + if clear_target: + clear_target_tables(target_session) + target_session.commit() + + expression_count = migrate_expressions(expression_rows, target_session, expression_columns) + jargon_count = migrate_jargons(jargon_rows, target_session, jargon_columns) + target_session.commit() + + print("迁移完成。") + print(f"已迁移 expression 记录:{expression_count}") + print(f"已迁移 jargon 记录:{jargon_count}") + + +if __name__ == "__main__": + main() diff --git a/prompts/zh-CN/maidairy_chat.prompt b/prompts/zh-CN/maidairy_chat.prompt index f0cff929..aac8e8ac 100644 --- a/prompts/zh-CN/maidairy_chat.prompt +++ b/prompts/zh-CN/maidairy_chat.prompt @@ -1,5 +1,5 @@ 你的任务是分析聊天和聊天中的互动情况。 -你需要关注 {bot_name}(AI) 与不同用户的对话来为选择正确的动作和行为提供建议 +你需要关注 {bot_name}(AI) 与不同用户的对话来为选择正确的动作和行为以及搜集信息提供建议 【参考信息】 {bot_name}的人设:{identity} @@ -8,27 +8,28 @@ 你需要根据提供的参考信息,当前场景和输出规则来进行分析 在当前场景中,用户正在与AI麦麦进行聊天互动,你的任务不是生成对用户可见的发言,而是进行分析来指导AI进行回复。 “分析”应该体现你对当前局面的判断、你的建议、你的下一步计划,以及你为什么这样想。 -没必要刻意友好回复,符合你的人格就行 +你需要先搜集能够帮助{bot_name}回复的信息,然后再给出回复意见 + 你可以使用这些工具: - wait(seconds) - 暂时停止对话,等待(seconds)秒,把话语权交给用户,等待对方新的发言。 -- stop() - 结束对话,不进行任何回复,直到对方有新消息。 -- reply():当你判断现在应该正式对用户发出一条可见回复时调用。调用后系统会基于你当前这轮的想法生成一条真正展示给用户的回复。 -- no_reply():当你判断现在不应该发言,应该继续内部思考时调用。这个工具不会做任何外部行为,只会继续下一轮循环。 -{file_tools_section} +- stop() - 当你判断{bot_name}现在不应该发言,结束对话,不进行任何回复,直到对方有新消息。 +- reply():当你判断{bot_name}现在应该正式对用户发出一条可见回复时调用。调用后系统会基于你当前这轮的想法生成一条真正展示给用户的回复。 +- query_jargon():当你认为某些词的含义不明确,或用户询问某些词的含义,需要进行查询 +- 其他定义的工具,你可以视情况合适使用 工具使用规则: -1.如果麦麦已经回复,但用户暂时没有新的回复,且没有新信息需要搜集,使用wait或者stop进行等待 +1.如果{bot_name}已经回复,但用户暂时没有新的回复,且没有新信息需要搜集,使用wait或者stop进行等待 2.如果用户有新发言,但是你评估用户还有后续发言尚未发送,可以适当等待让用户说完 3.在特定情况下也可以连续回复,例如想要追问,或者补充自己先前的发言,可以不使用stop或者wait -4.如果你想指导麦麦直接发言,可以不使用任何工具 -5.你需要控制自己发言的频率,如果用户一对一聊天,可以以均匀地频率发言,如果用户较多,不要每句都回复,控制回复频率。当你决定暂时不发言,可以使用wait暂时等待一定时间或者stop等待新消息 +4.你需要控制自己发言的频率,如果用户一对一聊天,可以以均匀地频率发言,如果用户较多,不要每句都回复,控制回复频率。当你决定暂时不发言,可以使用wait暂时等待一定时间或者stop等待新消息 +5.如果存在用户的疑问,或者对某些概念的不确定,你可以使用工具来搜集信息或者查询含义,你可以使用多个工具 你的分析规则: 1. 默认直接输出你当前的最新分析,不要重复之前的分析内容。 2. 最新分析应尽量具体,贴近上下文,不要空泛重复。 -3. 只有在确实需要等待或停止时才调用工具,否则优先直接输出分析想法。 -4. 如果你刚刚做了工具调用,下一轮应结合工具结果继续输出新的分析。 -5. 你需要评估哪些话是对你的发言,哪些是用户之间的交流或者自言自语,不要频繁插入无关的话题。 +3. 如果你刚刚做了工具调用,下一轮应结合工具结果继续输出新的分析。 +4. 你需要评估哪些话是对{bot_name}的发言,哪些是用户之间的交流或者自言自语,不要频繁插入无关的话题。 +5. 如果你上一轮没有发言,需要重新进行分析,输出新的分析内容,不要重复上一轮的分析内容 -现在,请你输出你的分析: +现在,请你输出你对{bot_name}发言的分析,你必须先输出文本内容的分析,然后再进行工具调用: diff --git a/prompts/zh-CN/maidairy_cognition.prompt b/prompts/zh-CN/maidairy_cognition.prompt deleted file mode 100644 index 7c5c814a..00000000 --- a/prompts/zh-CN/maidairy_cognition.prompt +++ /dev/null @@ -1,11 +0,0 @@ -你是一个认知感知分析模块。你的任务是根据对话上下文,分析对话中用户的: -1. 核心意图(如:寻求帮助、纯粹聊天、请求任务、发泄情绪、获取信息、表达观点等) -2. 认知状态(如:明确具体、模糊试探、犹豫不决、困惑迷茫、思路清晰、逻辑混乱等) -3. 隐含目的(如:解决问题、获得安慰、打发时间、寻求认同、交换想法、表达自我等) - -要求: -- 只分析用户(对话中 role=user 的内容),不要分析助手自己 -- 根据用户最新发言重点分析,同时结合上下文理解深层动机 -- 输出简洁(2-4 句话),不要太长 -- 如果信息太少无法判断,就说信息不足,给出初步印象 -- 直接输出分析结果,不要有格式标题 diff --git a/prompts/zh-CN/maidairy_emotion.prompt b/prompts/zh-CN/maidairy_emotion.prompt deleted file mode 100644 index b8440527..00000000 --- a/prompts/zh-CN/maidairy_emotion.prompt +++ /dev/null @@ -1,11 +0,0 @@ -你是一个情绪感知分析模块。你的任务是根据对话上下文,分析对话中用户的: -1. 当前情绪状态(如:开心、沮丧、焦虑、平静、兴奋、愤怒等) -2. 言语态度(如:友好、冷淡、热情、敷衍、试探、认真、调侃等) -3. 潜在的情感需求(如:需要倾听、需要鼓励、想要倾诉、只是闲聊等) - -要求: -- 只分析用户(对话中 role=user 的内容),不要分析助手自己 -- 根据用户最新发言重点分析,同时结合上下文理解变化趋势 -- 输出简洁(2-4 句话),不要太长 -- 如果信息太少无法判断,就说信息不足,给出初步印象 -- 直接输出分析结果,不要有格式标题 diff --git a/src/chat/replyer/maisaka_generator.py b/src/chat/replyer/maisaka_generator.py index 76bbc93f..946014ff 100644 --- a/src/chat/replyer/maisaka_generator.py +++ b/src/chat/replyer/maisaka_generator.py @@ -29,7 +29,7 @@ logger = get_logger("maisaka_replyer") class MaisakaReplyGenerator: - """Maisaka 可见回复生成器。""" + """生成 Maisaka 的最终可见回复。""" def __init__( self, @@ -45,7 +45,7 @@ class MaisakaReplyGenerator: self._personality_prompt = self._build_personality_prompt() def _build_personality_prompt(self) -> str: - """构建回复器使用的人设描述。""" + """构建 replyer 使用的人设描述。""" try: bot_name = global_config.bot.nickname alias_names = global_config.bot.alias_names @@ -79,8 +79,7 @@ class MaisakaReplyGenerator: @staticmethod def _extract_visible_assistant_reply(message: SessionMessage) -> str: - if is_perception_message(message): - return "" + del message return "" def _extract_guided_bot_reply(self, message: SessionMessage) -> str: @@ -91,11 +90,11 @@ class MaisakaReplyGenerator: return "" @staticmethod - def _split_user_message_segments(raw_content: str) -> list[tuple[Optional[str], str]]: + def _split_user_message_segments(raw_content: str) -> List[tuple[Optional[str], str]]: """按说话人拆分用户消息。""" - segments: list[tuple[Optional[str], str]] = [] + segments: List[tuple[Optional[str], str]] = [] current_speaker: Optional[str] = None - current_lines: list[str] = [] + current_lines: List[str] = [] for raw_line in raw_content.splitlines(): speaker_name, content_body = parse_speaker_content(raw_line) @@ -113,10 +112,10 @@ class MaisakaReplyGenerator: return segments - def _format_chat_history(self, messages: list[SessionMessage]) -> str: - """格式化回复器使用的可见聊天历史。""" + def _format_chat_history(self, messages: List[SessionMessage]) -> str: + """格式化 replyer 使用的可见聊天记录。""" bot_nickname = global_config.bot.nickname.strip() or "Bot" - parts: list[str] = [] + parts: List[str] = [] for message in messages: role = get_message_role(message) @@ -144,7 +143,13 @@ class MaisakaReplyGenerator: return "\n".join(parts) - def _build_prompt(self, chat_history: List[SessionMessage], reply_reason: str) -> str: + def _build_prompt( + self, + chat_history: List[SessionMessage], + reply_reason: str, + expression_habits: str = "", + jargon_explanation: str = "", + ) -> str: """构建 Maisaka replyer 提示词。""" current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") formatted_history = self._format_chat_history(chat_history) @@ -157,14 +162,24 @@ class MaisakaReplyGenerator: reply_style=global_config.personality.reply_style, ) except Exception: - system_prompt = "你是一个友好的 AI 助手,请根据用户的想法生成自然的回复。" + system_prompt = "你是一个友好的 AI 助手,请根据聊天记录自然回复。" - user_prompt = ( - f"当前时间:{current_time}\n\n" - f"【聊天记录】\n{formatted_history}\n\n" - f"【你的想法】\n{reply_reason}\n\n" - "现在,你说:" - ) + extra_sections: List[str] = [] + if expression_habits.strip(): + extra_sections.append(expression_habits.strip()) + if jargon_explanation.strip(): + extra_sections.append(jargon_explanation.strip()) + + user_sections = [ + f"当前时间:{current_time}", + f"【聊天记录】\n{formatted_history}", + ] + if extra_sections: + user_sections.append("\n\n".join(extra_sections)) + user_sections.append(f"【你的想法】\n{reply_reason}") + user_sections.append("现在,你说:") + + user_prompt = "\n\n".join(user_sections) return f"System: {system_prompt}\n\nUser: {user_prompt}" async def generate_reply_with_context( @@ -182,6 +197,9 @@ class MaisakaReplyGenerator: unknown_words: Optional[List[str]] = None, log_reply: bool = True, chat_history: Optional[List[SessionMessage]] = None, + expression_habits: str = "", + jargon_explanation: str = "", + selected_expression_ids: Optional[List[int]] = None, ) -> Tuple[bool, ReplyGenerationResult]: """结合上下文生成 Maisaka 的最终可见回复。""" del available_actions @@ -195,14 +213,18 @@ class MaisakaReplyGenerator: del unknown_words result = ReplyGenerationResult() - if not reply_reason or chat_history is None: - result.error_message = "reply_reason or chat_history is empty" + result.selected_expression_ids = list(selected_expression_ids or []) + + if chat_history is None: + result.error_message = "chat_history is empty" return False, result logger.info( f"Maisaka replyer start: stream_id={stream_id} reply_reason={reply_reason!r} " f"history_size={len(chat_history)} target_message_id=" - f"{reply_message.message_id if reply_message else None}" + f"{reply_message.message_id if reply_message else None} " + f"expression_count={len(result.selected_expression_ids)} " + f"jargon_enabled={bool(jargon_explanation.strip())}" ) filtered_history = [ @@ -210,7 +232,12 @@ class MaisakaReplyGenerator: for message in chat_history if get_message_role(message) != "system" and get_message_kind(message) != "perception" ] - prompt = self._build_prompt(filtered_history, reply_reason) + prompt = self._build_prompt( + chat_history=filtered_history, + reply_reason=reply_reason or "", + expression_habits=expression_habits, + jargon_explanation=jargon_explanation, + ) result.completion.request_prompt = prompt if global_config.debug.show_replyer_prompt: @@ -250,7 +277,8 @@ class MaisakaReplyGenerator: logger.info( f"Maisaka replyer success: response_text={response_text!r} " - f"overall_ms={result.metrics.overall_ms}" + f"overall_ms={result.metrics.overall_ms} " + f"selected_expression_ids={result.selected_expression_ids!r}" ) result.text_fragments = [response_text] return True, result diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 1bc93914..ea20f57d 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -1510,24 +1510,6 @@ class MaiSakaConfig(ConfigBase): __ui_icon__ = "message-circle" __ui_parent__ = "experimental" - enable_emotion_module: bool = Field( - default=True, - json_schema_extra={ - "x-widget": "switch", - "x-icon": "heart", - }, - ) - """启用情绪感知模块""" - - enable_cognition_module: bool = Field( - default=True, - json_schema_extra={ - "x-widget": "switch", - "x-icon": "brain", - }, - ) - """启用认知分析模块""" - enable_knowledge_module: bool = Field( default=True, json_schema_extra={ @@ -1546,33 +1528,6 @@ class MaiSakaConfig(ConfigBase): ) """启用 MCP (Model Context Protocol) 支持""" - enable_write_file: bool = Field( - default=True, - json_schema_extra={ - "x-widget": "switch", - "x-icon": "file-plus", - }, - ) - """启用文件写入工具""" - - enable_read_file: bool = Field( - default=True, - json_schema_extra={ - "x-widget": "switch", - "x-icon": "file-text", - }, - ) - """启用文件读取工具""" - - enable_list_files: bool = Field( - default=True, - json_schema_extra={ - "x-widget": "switch", - "x-icon": "list", - }, - ) - """启用文件列表工具""" - show_analyze_cognition_prompt: bool = Field( default=False, json_schema_extra={ diff --git a/src/maisaka/LICENSE b/src/maisaka/LICENSE deleted file mode 100644 index cb1ae897..00000000 --- a/src/maisaka/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2026 SengokuCola - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/src/maisaka/builtin_tools.py b/src/maisaka/builtin_tools.py index c57b1259..62590157 100644 --- a/src/maisaka/builtin_tools.py +++ b/src/maisaka/builtin_tools.py @@ -27,7 +27,10 @@ def create_builtin_tools() -> List[ToolOption]: reply_builder = ToolOptionBuilder() reply_builder.set_name("reply") - reply_builder.set_description("Generate and emit a visible reply based on the current thought. You must specify the target user msg_id to reply to.") + reply_builder.set_description( + "Generate and emit a visible reply based on the current thought. " + "You must specify the target user msg_id to reply to." + ) reply_builder.add_param( name="msg_id", param_type=ToolParamType.STRING, @@ -35,8 +38,38 @@ def create_builtin_tools() -> List[ToolOption]: required=True, enum_values=None, ) + reply_builder.add_param( + name="quote", + param_type=ToolParamType.BOOLEAN, + description="Whether the visible reply should be sent as a quoted reply to the target msg_id.", + required=False, + enum_values=None, + ) + reply_builder.add_param( + name="unknown_words", + param_type=ToolParamType.ARRAY, + description="Optional list of words or phrases that may need jargon lookup before replying.", + required=False, + enum_values=None, + items_schema={"type": "string"}, + ) tools.append(reply_builder.build()) + query_jargon_builder = ToolOptionBuilder() + query_jargon_builder.set_name("query_jargon") + query_jargon_builder.set_description( + "Query the meanings of one or more jargon words in the current chat context." + ) + query_jargon_builder.add_param( + name="words", + param_type=ToolParamType.ARRAY, + description="A list of words or phrases to query from the jargon store.", + required=True, + enum_values=None, + items_schema={"type": "string"}, + ) + tools.append(query_jargon_builder.build()) + no_reply_builder = ToolOptionBuilder() no_reply_builder.set_name("no_reply") no_reply_builder.set_description("Do not emit a visible reply this round and continue thinking.") diff --git a/src/maisaka/cli.py b/src/maisaka/cli.py index 8c84b997..5f2f14cd 100644 --- a/src/maisaka/cli.py +++ b/src/maisaka/cli.py @@ -3,6 +3,7 @@ MaiSaka CLI and conversation loop. """ from datetime import datetime +from pathlib import Path from typing import Optional import asyncio @@ -16,6 +17,7 @@ from rich.text import Text from src.chat.message_receive.message import SessionMessage from src.chat.replyer.maisaka_generator import MaisakaReplyGenerator from src.config.config import config_manager, global_config +from src.mcp_module import MCPManager from .chat_loop_service import MaisakaChatLoopService from .console import console @@ -23,7 +25,6 @@ from .input_reader import InputReader from .knowledge import retrieve_relevant_knowledge from .knowledge_store import get_knowledge_store from .message_adapter import build_message, format_speaker_content, remove_last_perception -from .mcp_client import MCPManager from .tool_handlers import ( ToolHandlerContext, handle_mcp_tool, @@ -289,11 +290,8 @@ class BufferCLI: async def _init_mcp(self) -> None: """初始化 MCP 服务并注册暴露的工具。""" - config_path = os.path.join( - os.path.dirname(os.path.abspath(__file__)), - "mcp_config.json", - ) - self._mcp_manager = await MCPManager.from_config(config_path) + config_path = Path(__file__).resolve().parents[2] / "config" / "mcp_config.json" + self._mcp_manager = await MCPManager.from_config(str(config_path)) if self._mcp_manager and self._chat_loop_service: mcp_tools = self._mcp_manager.get_openai_tools() diff --git a/src/maisaka/reasoning_engine.py b/src/maisaka/reasoning_engine.py index f3b38171..1a7ee665 100644 --- a/src/maisaka/reasoning_engine.py +++ b/src/maisaka/reasoning_engine.py @@ -1,17 +1,24 @@ """Maisaka 推理引擎。""" +import json import asyncio +import re import time from datetime import datetime from typing import TYPE_CHECKING, Optional +from sqlmodel import select + from src.chat.heart_flow.heartFC_utils import CycleDetail from src.chat.message_receive.message import SessionMessage from src.chat.replyer.replyer_manager import replyer_manager +from src.common.database.database import get_db_session +from src.common.database.database_model import Jargon from src.common.data_models.mai_message_data_model import UserInfo -from src.common.data_models.message_component_data_model import MessageSequence +from src.common.data_models.message_component_data_model import MessageSequence, TextComponent from src.common.logger import get_logger from src.config.config import global_config +from src.learners.jargon_explainer import search_jargon from src.llm_models.payload_content.tool_option import ToolCall from src.services import send_service @@ -20,8 +27,10 @@ from .message_adapter import ( build_visible_text_from_sequence, clone_message_sequence, format_speaker_content, + get_message_text, get_message_role, ) +from .reply_context_builder import MaisakaReplyContextBuilder from .tool_handlers import ( handle_mcp_tool, handle_unknown_tool, @@ -38,6 +47,7 @@ class MaisakaReasoningEngine: def __init__(self, runtime: "MaisakaHeartFlowChatting") -> None: self._runtime = runtime + self._reply_context_builder = MaisakaReplyContextBuilder(runtime.session_id) async def run_loop(self) -> None: """独立消费消息批次,并执行对应的内部思考轮次。""" @@ -99,8 +109,7 @@ class MaisakaReasoningEngine: """处理传入消息列表,将其转换为历史消息并加入聊天历史缓存。""" for message in messages: # 构建用户消息序列 - user_sequence = await self._build_message_sequence(message) - visible_text = build_visible_text_from_sequence(user_sequence).strip() + user_sequence, visible_text = await self._build_message_sequence(message) if not user_sequence.components: continue @@ -116,14 +125,15 @@ class MaisakaReasoningEngine: raw_message=user_sequence, display_text=visible_text, ) - self._runtime._chat_history.append(history_message) + insert_index = self._insert_chat_history_message(history_message) + reference_message = await self._build_jargon_reference_message(message) + if reference_message is not None: + self._runtime._chat_history.insert(insert_index + 1, reference_message) self._trim_chat_history() - async def _build_message_sequence(self, message: SessionMessage) -> MessageSequence: + async def _build_message_sequence(self, message: SessionMessage) -> tuple[MessageSequence, str]: message_sequence = MessageSequence([]) - user_info = message.message_info.user_info - speaker_name = user_info.user_cardname or user_info.user_nickname or user_info.user_id - message_sequence.text(format_speaker_content(speaker_name, "", message.timestamp, message.message_id)) + planner_prefix = self._build_planner_user_prefix(message) appended_component = False if global_config.maisaka.direct_image_input: @@ -131,18 +141,153 @@ class MaisakaReasoningEngine: else: source_sequence = message.raw_message - for component in clone_message_sequence(source_sequence).components: + planner_components = clone_message_sequence(source_sequence).components + if planner_components and isinstance(planner_components[0], TextComponent): + planner_components[0].text = planner_prefix + planner_components[0].text + else: + planner_components.insert(0, TextComponent(planner_prefix)) + + for component in planner_components: message_sequence.components.append(component) appended_component = True + legacy_visible_text = self._build_legacy_visible_text(message, source_sequence) if not appended_component: if not message.processed_plain_text: await message.process() content = (message.processed_plain_text or "").strip() if content: - message_sequence.text(content) + message_sequence.text(planner_prefix + content) + legacy_visible_text = self._build_legacy_visible_text_from_text(message, content) - return message_sequence + return message_sequence, legacy_visible_text + + @staticmethod + def _build_planner_user_prefix(message: SessionMessage) -> str: + user_info = message.message_info.user_info + timestamp_text = message.timestamp.strftime("%H:%M:%S") + user_name = user_info.user_nickname or user_info.user_id + group_card = user_info.user_cardname or "" + message_id = message.message_id or "" + return ( + f"[时间]{timestamp_text}\n" + f"[用户]{user_name}\n" + f"[用户群昵称]{group_card}\n" + f"[msg_id]{message_id}\n" + "[发言内容]" + ) + + def _build_legacy_visible_text(self, message: SessionMessage, source_sequence: MessageSequence) -> str: + user_info = message.message_info.user_info + speaker_name = user_info.user_cardname or user_info.user_nickname or user_info.user_id + legacy_sequence = MessageSequence([]) + legacy_sequence.text(format_speaker_content(speaker_name, "", message.timestamp, message.message_id)) + for component in clone_message_sequence(source_sequence).components: + legacy_sequence.components.append(component) + return build_visible_text_from_sequence(legacy_sequence).strip() + + def _build_legacy_visible_text_from_text(self, message: SessionMessage, content: str) -> str: + user_info = message.message_info.user_info + speaker_name = user_info.user_cardname or user_info.user_nickname or user_info.user_id + return format_speaker_content(speaker_name, content, message.timestamp, message.message_id).strip() + + def _insert_chat_history_message(self, message: SessionMessage) -> int: + """按时间顺序将消息插入聊天历史,同时保留 system 消息在最前。""" + if not self._runtime._chat_history: + self._runtime._chat_history.append(message) + return 0 + + insert_at = len(self._runtime._chat_history) + for index, existing_message in enumerate(self._runtime._chat_history): + if get_message_role(existing_message) == "system": + continue + if existing_message.timestamp > message.timestamp: + insert_at = index + break + + self._runtime._chat_history.insert(insert_at, message) + return insert_at + + async def _build_jargon_reference_message(self, message: SessionMessage) -> Optional[SessionMessage]: + """如果命中了黑话词条,则构建一条额外的参考信息消息。""" + content = (get_message_text(message) or "").strip() + if not content: + if not message.processed_plain_text: + await message.process() + content = (message.processed_plain_text or "").strip() + if not content: + return None + + matched_words = self._find_jargon_words_in_text(content) + if not matched_words: + return None + + reference_text = ( + "[参考信息]\n" + f"{','.join(matched_words)}可能是jargon,可以使用query_jargon来查看其含义" + ) + reference_sequence = MessageSequence([TextComponent(reference_text)]) + return build_message( + role="user", + content="", + source="user_reference", + timestamp=message.timestamp, + platform=message.platform, + session_id=self._runtime.session_id, + group_info=self._runtime._build_group_info(message), + user_info=self._runtime._build_runtime_user_info(), + raw_message=reference_sequence, + display_text=reference_text, + ) + + def _find_jargon_words_in_text(self, content: str) -> list[str]: + """匹配正文中出现的 jargon 词条。""" + lowered_content = content.lower() + matches: list[str] = [] + seen_words: set[str] = set() + + with get_db_session(auto_commit=False) as session: + query = select(Jargon).where(Jargon.is_jargon.is_(True)).order_by(Jargon.count.desc()).limit(200) # type: ignore[attr-defined] + jargons = session.exec(query).all() + + for jargon in jargons: + jargon_content = str(jargon.content or "").strip() + if not jargon_content: + continue + if jargon_content in seen_words: + continue + if not self._is_visible_jargon(jargon): + continue + if not self._jargon_matches_text(jargon_content, lowered_content, content): + continue + + seen_words.add(jargon_content) + matches.append(jargon_content) + if len(matches) >= 8: + break + + return matches + + def _is_visible_jargon(self, jargon: Jargon) -> bool: + """判断当前会话是否可见该 jargon。""" + if global_config.expression.all_global_jargon or bool(jargon.is_global): + return True + + try: + session_id_dict = json.loads(jargon.session_id_dict or "{}") + except (TypeError, json.JSONDecodeError): + logger.warning(f"Failed to parse jargon.session_id_dict: jargon_id={jargon.id}") + return False + return self._runtime.session_id in session_id_dict + + @staticmethod + def _jargon_matches_text(jargon_content: str, lowered_content: str, original_content: str) -> bool: + """判断词条是否命中消息正文。""" + if re.search(r"[\u4e00-\u9fff]", jargon_content): + return jargon_content in original_content + + pattern = rf"\b{re.escape(jargon_content.lower())}\b" + return re.search(pattern, lowered_content) is not None def _start_cycle(self) -> CycleDetail: """开始一轮 Maisaka 思考循环。""" @@ -166,21 +311,24 @@ class MaisakaReasoningEngine: def _trim_chat_history(self) -> None: """裁剪聊天历史,保证用户消息数量不超过配置限制。""" - user_message_count = sum(1 for message in self._runtime._chat_history if get_message_role(message) == "user") - if user_message_count <= self._runtime._max_context_size: + counted_roles = {"user", "assistant"} + conversation_message_count = sum( + 1 for message in self._runtime._chat_history if get_message_role(message) in counted_roles + ) + if conversation_message_count <= self._runtime._max_context_size: return trimmed_history = list(self._runtime._chat_history) removed_count = 0 - while user_message_count >= self._runtime._max_context_size and trimmed_history: + while conversation_message_count >= self._runtime._max_context_size and trimmed_history: removed_message = trimmed_history.pop(0) removed_count += 1 - if get_message_role(removed_message) == "user": - user_message_count -= 1 + if get_message_role(removed_message) in counted_roles: + conversation_message_count -= 1 self._runtime._chat_history = trimmed_history - self._runtime._log_history_trimmed(removed_count, user_message_count) + self._runtime._log_history_trimmed(removed_count, conversation_message_count) async def _handle_tool_calls( self, @@ -204,6 +352,10 @@ class MaisakaReasoningEngine: ) continue + if tool_call.func_name == "query_jargon": + await self._handle_query_jargon(tool_call) + continue + if tool_call.func_name == "wait": seconds = (tool_call.args or {}).get("seconds", 30) try: @@ -238,6 +390,68 @@ class MaisakaReasoningEngine: return False + async def _handle_query_jargon(self, tool_call: ToolCall) -> None: + tool_args = tool_call.args or {} + raw_words = tool_args.get("words") + + if not isinstance(raw_words, list): + self._runtime._chat_history.append( + self._build_tool_message(tool_call, "query_jargon requires a words array.") + ) + return + + words: list[str] = [] + seen_words: set[str] = set() + for item in raw_words: + if not isinstance(item, str): + continue + word = item.strip() + if not word or word in seen_words: + continue + seen_words.add(word) + words.append(word) + + if not words: + self._runtime._chat_history.append( + self._build_tool_message(tool_call, "query_jargon requires at least one non-empty word.") + ) + return + + logger.info(f"{self._runtime.log_prefix} query_jargon triggered: words={words!r}") + + results: list[dict[str, object]] = [] + for word in words: + exact_matches = search_jargon( + keyword=word, + chat_id=self._runtime.session_id, + limit=5, + case_sensitive=False, + fuzzy=False, + ) + matched_entries = exact_matches or search_jargon( + keyword=word, + chat_id=self._runtime.session_id, + limit=5, + case_sensitive=False, + fuzzy=True, + ) + + results.append( + { + "word": word, + "found": bool(matched_entries), + "matches": matched_entries, + } + ) + + logger.info(f"{self._runtime.log_prefix} query_jargon finished: results={results!r}") + self._runtime._chat_history.append( + self._build_tool_message( + tool_call, + json.dumps({"results": results}, ensure_ascii=False), + ) + ) + async def _handle_reply( self, tool_call: ToolCall, @@ -246,6 +460,9 @@ class MaisakaReasoningEngine: ) -> bool: tool_args = tool_call.args or {} target_message_id = str(tool_args.get("msg_id") or "").strip() + quote_reply = bool(tool_args.get("quote", True)) + raw_unknown_words = tool_args.get("unknown_words") + unknown_words = raw_unknown_words if isinstance(raw_unknown_words, list) else None if not target_message_id: self._runtime._chat_history.append( self._build_tool_message(tool_call, "reply requires a valid msg_id argument.") @@ -261,7 +478,7 @@ class MaisakaReasoningEngine: logger.info( f"{self._runtime.log_prefix} reply tool triggered: " - f"target_msg_id={target_message_id} latest_thought={latest_thought!r}" + f"target_msg_id={target_message_id} quote={quote_reply} latest_thought={latest_thought!r}" ) logger.info(f"{self._runtime.log_prefix} acquiring Maisaka reply generator") try: @@ -288,6 +505,34 @@ class MaisakaReasoningEngine: return False logger.info(f"{self._runtime.log_prefix} acquired Maisaka reply generator successfully") + logger.info( + f"{self._runtime.log_prefix} building reply context: " + f"target_msg_id={target_message_id} unknown_words={unknown_words!r}" + ) + + try: + reply_context = await self._reply_context_builder.build( + chat_history=self._runtime._chat_history, + reply_message=target_message, + reply_reason=latest_thought, + unknown_words=unknown_words, + ) + except Exception: + logger.exception( + f"{self._runtime.log_prefix} reply context builder crashed: " + f"target_msg_id={target_message_id}" + ) + self._runtime._chat_history.append( + self._build_tool_message(tool_call, "Reply context preparation crashed.") + ) + return False + + logger.info( + f"{self._runtime.log_prefix} reply context built: " + f"target_msg_id={target_message_id} " + f"selected_expression_ids={reply_context.selected_expression_ids!r} " + f"has_jargon_explanation={bool(reply_context.jargon_explanation.strip())}" + ) try: success, reply_result = await replyer.generate_reply_with_context( @@ -295,7 +540,11 @@ class MaisakaReasoningEngine: stream_id=self._runtime.session_id, reply_message=target_message, chat_history=self._runtime._chat_history, + unknown_words=unknown_words, log_reply=False, + expression_habits=reply_context.expression_habits, + jargon_explanation=reply_context.jargon_explanation, + selected_expression_ids=reply_context.selected_expression_ids, ) except Exception: logger.exception(f"{self._runtime.log_prefix} reply generator crashed: target_msg_id={target_message_id}") @@ -322,14 +571,15 @@ class MaisakaReasoningEngine: logger.info( f"{self._runtime.log_prefix} sending guided reply: " - f"target_msg_id={target_message_id} reply_text={reply_text!r}" + f"target_msg_id={target_message_id} quote={quote_reply} reply_text={reply_text!r}" ) try: sent = await send_service.text_to_stream( text=reply_text, stream_id=self._runtime.session_id, - set_reply=True, - reply_message=target_message, + set_reply=quote_reply, + reply_message=target_message if quote_reply else None, + selected_expressions=reply_result.selected_expression_ids or None, typing=False, ) except Exception: diff --git a/src/maisaka/reply_context_builder.py b/src/maisaka/reply_context_builder.py new file mode 100644 index 00000000..f619216a --- /dev/null +++ b/src/maisaka/reply_context_builder.py @@ -0,0 +1,277 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import List, Optional + +import json +import re + +from sqlmodel import select + +from src.chat.message_receive.message import SessionMessage +from src.common.database.database import get_db_session +from src.common.database.database_model import Expression, Jargon +from src.common.logger import get_logger +from src.config.config import global_config +from src.learners.jargon_explainer import search_jargon + +from .message_adapter import get_message_text, parse_speaker_content + +logger = get_logger("maisaka_reply_context") + + +@dataclass +class ReplyContextBuildResult: + """Reply 前置上下文构建结果。""" + + expression_habits: str = "" + jargon_explanation: str = "" + selected_expression_ids: List[int] = field(default_factory=list) + + +@dataclass +class _ExpressionRecord: + expression_id: Optional[int] + situation: str + style: str + + +@dataclass +class _JargonRecord: + jargon_id: Optional[int] + content: str + meaning: str + session_id_dict: str + is_global: bool + + +class MaisakaReplyContextBuilder: + """为 Maisaka reply 构建表达方式和黑话解释。""" + + def __init__(self, session_id: str) -> None: + self._session_id = session_id + + async def build( + self, + chat_history: List[SessionMessage], + reply_message: Optional[SessionMessage], + reply_reason: str, + unknown_words: Optional[List[str]] = None, + ) -> ReplyContextBuildResult: + """构建 reply 前置上下文。""" + expression_habits, selected_expression_ids = self._build_expression_habits( + chat_history=chat_history, + reply_message=reply_message, + reply_reason=reply_reason, + ) + jargon_explanation = self._build_jargon_explanation( + chat_history=chat_history, + reply_message=reply_message, + unknown_words=unknown_words, + ) + return ReplyContextBuildResult( + expression_habits=expression_habits, + jargon_explanation=jargon_explanation, + selected_expression_ids=selected_expression_ids, + ) + + def _build_expression_habits( + self, + chat_history: List[SessionMessage], + reply_message: Optional[SessionMessage], + reply_reason: str, + ) -> tuple[str, List[int]]: + """查询并格式化适合当前会话的表达方式。""" + del chat_history + del reply_message + del reply_reason + + expression_records = self._load_expression_records() + if not expression_records: + return "", [] + + lines: List[str] = [] + selected_ids: List[int] = [] + for expression in expression_records: + if expression.expression_id is not None: + selected_ids.append(expression.expression_id) + lines.append(f"- 当{expression.situation}时,可以自然地用{expression.style}这种表达习惯。") + + block = "【表达习惯参考】\n" + "\n".join(lines) + logger.info( + f"Built Maisaka expression habits: session_id={self._session_id} " + f"count={len(selected_ids)} ids={selected_ids!r}" + ) + return block, selected_ids + + def _load_expression_records(self) -> List[_ExpressionRecord]: + """在 session 内提取表达方式的静态数据,避免 detached ORM 对象。""" + with get_db_session(auto_commit=False) as session: + query = select(Expression).where(Expression.rejected.is_(False)) # type: ignore[attr-defined] + if global_config.expression.expression_checked_only: + query = query.where(Expression.checked.is_(True)) # type: ignore[attr-defined] + + query = query.where( + (Expression.session_id == self._session_id) | (Expression.session_id.is_(None)) # type: ignore[attr-defined] + ).order_by(Expression.count.desc(), Expression.last_active_time.desc()) # type: ignore[attr-defined] + + expressions = session.exec(query.limit(5)).all() + return [ + _ExpressionRecord( + expression_id=expression.id, + situation=expression.situation, + style=expression.style, + ) + for expression in expressions + ] + + def _build_jargon_explanation( + self, + chat_history: List[SessionMessage], + reply_message: Optional[SessionMessage], + unknown_words: Optional[List[str]], + ) -> str: + """查询并格式化黑话解释。""" + if not global_config.expression.enable_jargon_explanation: + return "" + + if global_config.expression.jargon_mode == "planner": + return self._build_planner_jargon_explanation(unknown_words or []) + + return self._build_context_jargon_explanation(chat_history, reply_message) + + def _build_planner_jargon_explanation(self, unknown_words: List[str]) -> str: + """基于 planner 传入的 unknown_words 构建黑话解释。""" + normalized_words: List[str] = [] + seen_words: set[str] = set() + for raw_word in unknown_words: + word = str(raw_word or "").strip() + if not word: + continue + lowered = word.lower() + if lowered in seen_words: + continue + seen_words.add(lowered) + normalized_words.append(word) + + if not normalized_words: + return "" + + lines: List[str] = [] + seen_entries: set[str] = set() + for word in normalized_words: + matches = search_jargon(word, chat_id=self._session_id, limit=3, fuzzy=False) + if not matches: + matches = search_jargon(word, chat_id=self._session_id, limit=3, fuzzy=True) + for match in matches: + content = str(match.get("content") or "").strip() + meaning = str(match.get("meaning") or "").strip() + if not content or not meaning: + continue + entry_key = f"{content}\n{meaning}" + if entry_key in seen_entries: + continue + seen_entries.add(entry_key) + lines.append(f"- {content}: {meaning}") + + if not lines: + return "" + + return "【黑话解释】\n" + "\n".join(lines[:8]) + + def _build_context_jargon_explanation( + self, + chat_history: List[SessionMessage], + reply_message: Optional[SessionMessage], + ) -> str: + """基于当前上下文自动匹配黑话。""" + corpus = self._build_context_corpus(chat_history, reply_message) + if not corpus: + return "" + + jargon_records = self._load_jargon_records() + lines: List[str] = [] + seen_contents: set[str] = set() + for jargon in jargon_records: + if not jargon.content or not jargon.meaning: + continue + if jargon.content in seen_contents: + continue + if not self._is_visible_jargon(jargon): + continue + if not self._is_jargon_in_corpus(jargon.content, corpus): + continue + + seen_contents.add(jargon.content) + lines.append(f"- {jargon.content}: {jargon.meaning}") + if len(lines) >= 8: + break + + if not lines: + return "" + + logger.info( + f"Built Maisaka jargon explanation: session_id={self._session_id} " + f"count={len(lines)}" + ) + return "【黑话解释】\n" + "\n".join(lines) + + def _load_jargon_records(self) -> List[_JargonRecord]: + """在 session 内提取黑话的静态数据,避免 detached ORM 对象。""" + with get_db_session(auto_commit=False) as session: + query = select(Jargon).where(Jargon.meaning != "") # type: ignore[attr-defined] + query = query.order_by(Jargon.count.desc()).limit(200) # type: ignore[attr-defined] + jargons = session.exec(query).all() + return [ + _JargonRecord( + jargon_id=jargon.id, + content=(jargon.content or "").strip(), + meaning=(jargon.meaning or "").strip(), + session_id_dict=jargon.session_id_dict or "{}", + is_global=bool(jargon.is_global), + ) + for jargon in jargons + ] + + def _build_context_corpus( + self, + chat_history: List[SessionMessage], + reply_message: Optional[SessionMessage], + ) -> str: + """将最近上下文拼成待匹配文本。""" + parts: List[str] = [] + for message in chat_history[-20:]: + text = get_message_text(message).strip() + if not text: + continue + _, body = parse_speaker_content(text) + parts.append(body.strip() or text) + + if reply_message is not None: + reply_text = get_message_text(reply_message).strip() + if reply_text: + _, body = parse_speaker_content(reply_text) + parts.append(body.strip() or reply_text) + + return "\n".join(parts) + + def _is_visible_jargon(self, jargon: _JargonRecord) -> bool: + """判断当前会话是否可见该黑话。""" + if global_config.expression.all_global_jargon or jargon.is_global: + return True + + try: + session_id_dict = json.loads(jargon.session_id_dict or "{}") + except (TypeError, json.JSONDecodeError): + logger.warning(f"Failed to parse jargon.session_id_dict: jargon_id={jargon.jargon_id}") + return False + return self._session_id in session_id_dict + + @staticmethod + def _is_jargon_in_corpus(content: str, corpus: str) -> bool: + """判断黑话词条是否出现在上下文中。""" + if re.search(r"[\u4e00-\u9fff]", content): + return re.search(re.escape(content), corpus, flags=re.IGNORECASE) is not None + + pattern = rf"\b{re.escape(content)}\b" + return re.search(pattern, corpus, flags=re.IGNORECASE) is not None diff --git a/src/maisaka/runtime.py b/src/maisaka/runtime.py index 0d8b28da..d5be0c54 100644 --- a/src/maisaka/runtime.py +++ b/src/maisaka/runtime.py @@ -1,43 +1,30 @@ -""" -Maisaka runtime for non-CLI integrations. -""" +"""Maisaka runtime for non-CLI integrations.""" + +from pathlib import Path +from typing import Literal, Optional import asyncio import time -from datetime import datetime -from pathlib import Path -from typing import Literal, Optional from src.chat.heart_flow.heartFC_utils import CycleDetail from src.chat.message_receive.chat_manager import BotChatSession, chat_manager from src.chat.message_receive.message import SessionMessage from src.common.data_models.mai_message_data_model import GroupInfo, UserInfo -from src.common.data_models.message_component_data_model import MessageSequence from src.common.logger import get_logger +from src.common.utils.utils_config import ExpressionConfigUtils from src.config.config import global_config -from src.llm_models.payload_content.tool_option import ToolCall -from src.services import send_service +from src.mcp_module import MCPManager +from src.learners.expression_learner import ExpressionLearner +from src.learners.jargon_miner import JargonMiner from .chat_loop_service import MaisakaChatLoopService -from .mcp_client import MCPManager -from .message_adapter import ( - build_message, - build_visible_text_from_sequence, - clone_message_sequence, - format_speaker_content, - get_message_role, -) from .reasoning_engine import MaisakaReasoningEngine -from .tool_handlers import ( - handle_mcp_tool, - handle_unknown_tool, -) logger = get_logger("maisaka_runtime") class MaisakaHeartFlowChatting: - """Session-scoped Maisaka runtime that replaces the HFC planner and reply loop.""" + """Session-scoped Maisaka runtime.""" _STATE_RUNNING: Literal["running"] = "running" _STATE_WAIT: Literal["wait"] = "wait" @@ -54,9 +41,12 @@ class MaisakaHeartFlowChatting: self._chat_loop_service = MaisakaChatLoopService() self._chat_history: list[SessionMessage] = [] self.history_loop: list[CycleDetail] = [] + + # Keep all original messages for batching and later learning. self.message_cache: list[SessionMessage] = [] + self._last_processed_index = 0 self._internal_turn_queue: asyncio.Queue[list[SessionMessage]] = asyncio.Queue() - self._message_queue: asyncio.Queue[SessionMessage] = asyncio.Queue() + self._mcp_manager: Optional[MCPManager] = None self._current_cycle_detail: Optional[CycleDetail] = None self._source_messages_by_id: dict[str, SessionMessage] = {} @@ -69,6 +59,17 @@ class MaisakaHeartFlowChatting: self._max_context_size = max(1, int(global_config.chat.max_context_size)) self._agent_state: Literal["running", "wait", "stop"] = self._STATE_STOP self._wait_until: Optional[float] = None + + expr_use, jargon_learn, expr_learn = ExpressionConfigUtils.get_expression_config_for_chat(session_id) + self._enable_expression_use = expr_use + self._enable_expression_learning = expr_learn + self._enable_jargon_learning = jargon_learn + self._min_messages_for_extraction = 10 + self._min_extraction_interval = 30 + self._last_extraction_time = 0.0 + self._expression_learner = ExpressionLearner(session_id) + self._jargon_miner = JargonMiner(session_id, session_name=session_name) + self._reasoning_engine = MaisakaReasoningEngine(self) async def start(self) -> None: @@ -82,7 +83,7 @@ class MaisakaHeartFlowChatting: self._running = True self._internal_loop_task = asyncio.create_task(self._reasoning_engine.run_loop()) self._loop_task = asyncio.create_task(self._main_loop()) - logger.info(f"{self.log_prefix} MaiSaka 启动") + logger.info(f"{self.log_prefix} Maisaka runtime started") async def stop(self) -> None: """Stop the runtime loop.""" @@ -91,9 +92,6 @@ class MaisakaHeartFlowChatting: self._running = False self._new_message_event.set() - self.message_cache.clear() - while not self._message_queue.empty(): - _ = self._message_queue.get_nowait() while not self._internal_turn_queue.empty(): _ = self._internal_turn_queue.get_nowait() @@ -119,16 +117,15 @@ class MaisakaHeartFlowChatting: await self._mcp_manager.close() self._mcp_manager = None - logger.info(f"{self.log_prefix} MaiSaka runtime stopped") + logger.info(f"{self.log_prefix} Maisaka runtime stopped") def adjust_talk_frequency(self, frequency: float) -> None: """Compatibility shim for the existing manager API.""" _ = frequency async def register_message(self, message: SessionMessage) -> None: - """Append a newly received message into the HFC-style message cache.""" + """Cache a new message and wake the main loop.""" self.message_cache.append(message) - await self._message_queue.put(message) self._source_messages_by_id[message.message_id] = message if self._agent_state in (self._STATE_WAIT, self._STATE_STOP): self._agent_state = self._STATE_RUNNING @@ -137,7 +134,7 @@ class MaisakaHeartFlowChatting: async def _main_loop(self) -> None: try: while self._running: - if self._message_queue.empty(): + if not self._has_pending_messages(): if self._agent_state == self._STATE_WAIT: message_arrived = await self._wait_for_trigger() else: @@ -146,6 +143,7 @@ class MaisakaHeartFlowChatting: message_arrived = self._running else: message_arrived = True + if not self._running: return if not message_arrived: @@ -154,28 +152,54 @@ class MaisakaHeartFlowChatting: self._new_message_event.clear() - # 加锁灌注消息 - while not self._message_queue.empty(): - cached_messages = self._drain_message_cache() - if cached_messages: - await self._internal_turn_queue.put(cached_messages) + while self._has_pending_messages(): + cached_messages = self._collect_pending_messages() + if not cached_messages: + break + await self._internal_turn_queue.put(cached_messages) + asyncio.create_task(self._trigger_expression_learning(cached_messages)) except asyncio.CancelledError: - logger.info(f"{self.log_prefix} MaiSaka runtime loop cancelled") + logger.info(f"{self.log_prefix} Maisaka runtime loop cancelled") + + def _has_pending_messages(self) -> bool: + return self._last_processed_index < len(self.message_cache) + + def _collect_pending_messages(self) -> list[SessionMessage]: + """Collect one batch of unprocessed messages from message_cache.""" + start_index = self._last_processed_index + pending_messages = self.message_cache[start_index:] + if not pending_messages: + return [] + + unique_messages: list[SessionMessage] = [] + seen_message_ids: set[str] = set() + for message in pending_messages: + message_id = message.message_id + if message_id in seen_message_ids: + continue + seen_message_ids.add(message_id) + unique_messages.append(message) + + self._last_processed_index = len(self.message_cache) + logger.info( + f"{self.log_prefix} collected {len(unique_messages)} new messages " + f"from message_cache[{start_index}:{self._last_processed_index}]" + ) + return unique_messages async def _wait_for_trigger(self) -> bool: - """等待外部触发。返回 True 表示有新消息事件,返回 False 表示等待超时。""" + """Return True on new message, False on timeout.""" if self._agent_state != self._STATE_WAIT: await self._new_message_event.wait() return True - # 处理 wait 工具调用带来的等待窗口:超时后恢复 idle;有新消息则继续处理缓存消息 if self._wait_until is None: await self._new_message_event.wait() return True timeout = self._wait_until - time.time() if timeout <= 0: - logger.info(f"{self.log_prefix} Maisaka 等待超时,继续查看新消息") + logger.info(f"{self.log_prefix} Maisaka wait timed out") self._enter_stop_state() self._wait_until = None return False @@ -184,47 +208,67 @@ class MaisakaHeartFlowChatting: await asyncio.wait_for(self._new_message_event.wait(), timeout=timeout) return True except asyncio.TimeoutError: - logger.info(f"{self.log_prefix} Maisaka 等待超时,继续查看新消息") + logger.info(f"{self.log_prefix} Maisaka wait timed out") self._enter_stop_state() self._wait_until = None return False def _enter_wait_state(self, seconds: Optional[float] = None) -> None: - """进入等待状态,seconds 为 None 时表示一直等待直到新消息到达。""" + """Enter wait state.""" self._agent_state = self._STATE_WAIT self._wait_until = None if seconds is None else time.time() + seconds def _enter_stop_state(self) -> None: - """进入停顿状态:仅等待新消息。""" + """Enter stop state.""" self._agent_state = self._STATE_STOP self._wait_until = None - def _drain_message_cache(self) -> list[SessionMessage]: - """Drain the current message cache as one processing batch.""" - drained_messages: list[SessionMessage] = [] - seen_message_ids: set[str] = set() + async def _trigger_expression_learning(self, messages: list[SessionMessage]) -> None: + """Trigger expression learning from the newly collected batch.""" + self._expression_learner.add_messages(messages) - def append_unique(message: SessionMessage) -> None: - message_id = message.message_id - if message_id in seen_message_ids: - return - seen_message_ids.add(message_id) - drained_messages.append(message) + if not self._enable_expression_learning: + logger.debug(f"{self.log_prefix} expression learning disabled, skip this batch") + return - for message in self.message_cache: - append_unique(message) + elapsed = time.time() - self._last_extraction_time + if elapsed < self._min_extraction_interval: + logger.debug( + f"{self.log_prefix} expression learning interval not reached: " + f"elapsed={elapsed:.2f}s threshold={self._min_extraction_interval}s" + ) + return - self.message_cache.clear() - while not self._message_queue.empty(): - try: - append_unique(self._message_queue.get_nowait()) - except asyncio.QueueEmpty: - break - return drained_messages + cache_size = self._expression_learner.get_cache_size() + if cache_size < self._min_messages_for_extraction: + logger.debug( + f"{self.log_prefix} expression learning skipped due to cache size: " + f"learner_cache={cache_size} threshold={self._min_messages_for_extraction} " + f"message_cache_total={len(self.message_cache)}" + ) + return + + self._last_extraction_time = time.time() + logger.info( + f"{self.log_prefix} starting expression learning: " + f"new_batch={len(messages)} learner_cache={cache_size} " + f"message_cache_total={len(self.message_cache)} " + f"enable_jargon_learning={self._enable_jargon_learning}" + ) + + try: + jargon_miner = self._jargon_miner if self._enable_jargon_learning else None + learnt_style = await self._expression_learner.learn(jargon_miner) + if learnt_style: + logger.info(f"{self.log_prefix} expression learning finished") + else: + logger.debug(f"{self.log_prefix} expression learning finished without usable result") + except Exception: + logger.exception(f"{self.log_prefix} expression learning failed") async def _init_mcp(self) -> None: - """Initialize MCP tools for the runtime and inject them into the planner.""" - config_path = Path(__file__).with_name("mcp_config.json") + """Initialize MCP tools and inject them into the planner.""" + config_path = Path(__file__).resolve().parents[2] / "config" / "mcp_config.json" self._mcp_manager = await MCPManager.from_config(str(config_path)) if self._mcp_manager is None: logger.info(f"{self.log_prefix} MCP manager is unavailable") @@ -241,101 +285,6 @@ class MaisakaHeartFlowChatting: f"{self._mcp_manager.get_tool_summary()}" ) - async def _ingest_messages(self, messages: list[SessionMessage]) -> None: - """处理传入消息列表,将其转换为历史消息并加入聊天历史缓存。""" - for message in messages: - # 构建用户消息序列 - user_sequence = await self._build_message_sequence(message) - visible_text = build_visible_text_from_sequence(user_sequence).strip() - if not user_sequence.components: - continue - - history_message = build_message( - role="user", - content=visible_text, - source="user", - timestamp=message.timestamp, - platform=message.platform, - session_id=self.session_id, - group_info=self._build_group_info(message), - user_info=self._build_runtime_user_info(), - raw_message=user_sequence, - display_text=visible_text, - ) - self._chat_history.append(history_message) - self._trim_chat_history() - - async def _build_message_sequence(self, message: SessionMessage) -> MessageSequence: - message_sequence = MessageSequence([]) - user_info = message.message_info.user_info - speaker_name = user_info.user_cardname or user_info.user_nickname or user_info.user_id - message_sequence.text(format_speaker_content(speaker_name, "", message.timestamp, message.message_id)) - - appended_component = False - if global_config.maisaka.direct_image_input: - source_sequence = getattr(message, "maisaka_original_raw_message", message.raw_message) - else: - source_sequence = message.raw_message - - for component in clone_message_sequence(source_sequence).components: - message_sequence.components.append(component) - appended_component = True - - if not appended_component: - if not message.processed_plain_text: - await message.process() - content = (message.processed_plain_text or "").strip() - if content: - message_sequence.text(content) - - return message_sequence - - - def _start_cycle(self) -> CycleDetail: - """Start a Maisaka thinking cycle.""" - self._cycle_counter += 1 - self._current_cycle_detail = CycleDetail(cycle_id=self._cycle_counter) - self._current_cycle_detail.thinking_id = f"maisaka_tid{round(time.time(), 2)}" - return self._current_cycle_detail - - def _end_cycle(self, cycle_detail: CycleDetail, only_long_execution: bool = True) -> CycleDetail: - """End and record a Maisaka thinking cycle.""" - cycle_detail.end_time = time.time() - self.history_loop.append(cycle_detail) - - timer_strings = [ - f"{name}: {duration:.2f}s" - for name, duration in cycle_detail.time_records.items() - if not only_long_execution or duration >= 0.1 - ] - logger.info( - f"{self.log_prefix} MaiSaka cycle={cycle_detail.cycle_id} completed " - f"in {cycle_detail.end_time - cycle_detail.start_time:.2f}s; " - f"stages={', '.join(timer_strings) if timer_strings else 'none'}" - ) - return cycle_detail - - def _trim_chat_history(self) -> None: - """Trim the oldest history until the user-message count is below the configured limit.""" - user_message_count = sum(1 for message in self._chat_history if get_message_role(message) == "user") - if user_message_count <= self._max_context_size: - return - - trimmed_history = list(self._chat_history) - removed_count = 0 - - while user_message_count >= self._max_context_size and trimmed_history: - removed_message = trimmed_history.pop(0) - removed_count += 1 - if get_message_role(removed_message) == "user": - user_message_count -= 1 - - self._chat_history = trimmed_history - logger.info( - f"{self.log_prefix} Trimmed {removed_count} history messages; " - f"remaining_user_messages={user_message_count}" - ) - def _build_runtime_user_info(self) -> UserInfo: if self.chat_stream.user_id: return UserInfo( @@ -345,13 +294,6 @@ class MaisakaHeartFlowChatting: ) return UserInfo(user_id="maisaka_user", user_nickname="user", user_cardname=None) - def _build_runtime_bot_user_info(self) -> UserInfo: - return UserInfo( - user_id=str(global_config.bot.qq_account) if global_config.bot.qq_account else "maisaka_assistant", - user_nickname=global_config.bot.nickname.strip() or "MaiSaka", - user_cardname=None, - ) - def _build_group_info(self, message: Optional[SessionMessage] = None) -> Optional[GroupInfo]: group_info = None if message is not None: diff --git a/src/maisaka/tool_handlers.py b/src/maisaka/tool_handlers.py index 6cd8c2d6..dc326c09 100644 --- a/src/maisaka/tool_handlers.py +++ b/src/maisaka/tool_handlers.py @@ -19,7 +19,7 @@ from .input_reader import InputReader from .message_adapter import build_message if TYPE_CHECKING: - from .mcp_client import MCPManager + from src.mcp_module import MCPManager MAI_FILES_DIR = Path(os.path.join(os.path.dirname(os.path.abspath(__file__)), "mai_files"))