diff --git a/src/config/config.py b/src/config/config.py index f1e27e72..f8b5f91b 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -55,7 +55,7 @@ CONFIG_DIR: Path = PROJECT_ROOT / "config" BOT_CONFIG_PATH: Path = (CONFIG_DIR / "bot_config.toml").resolve().absolute() MODEL_CONFIG_PATH: Path = (CONFIG_DIR / "model_config.toml").resolve().absolute() MMC_VERSION: str = "1.0.0" -CONFIG_VERSION: str = "8.3.2" +CONFIG_VERSION: str = "8.3.4" MODEL_CONFIG_VERSION: str = "1.13.1" logger = get_logger("config") diff --git a/src/config/legacy_migration.py b/src/config/legacy_migration.py index 31f6e392..bdae3254 100644 --- a/src/config/legacy_migration.py +++ b/src/config/legacy_migration.py @@ -346,6 +346,10 @@ def try_migrate_legacy_bot_config_dict(data: dict[str, Any]) -> MigrationResult: if chat is None: chat = {} data["chat"] = chat + elif "private_plan_style" in chat: + chat.pop("private_plan_style", None) + migrated_any = True + reasons.append("chat.private_plan_style_removed") mem = _as_dict(data.get("memory")) if mem is not None: @@ -366,7 +370,12 @@ def try_migrate_legacy_bot_config_dict(data: dict[str, Any]) -> MigrationResult: migrated_any = True reasons.append("experimental.chat_prompts") - for key in ("private_plan_style", "group_chat_prompt", "private_chat_prompts", "chat_prompts"): + if "private_plan_style" in exp: + exp.pop("private_plan_style", None) + migrated_any = True + reasons.append("experimental.private_plan_style_removed") + + for key in ("group_chat_prompt", "private_chat_prompts", "chat_prompts"): if key in exp and key not in chat: chat[key] = exp[key] migrated_any = True diff --git a/src/config/official_configs.py b/src/config/official_configs.py index e5d934a6..03399994 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -113,21 +113,6 @@ class PersonalityConfig(ConfigBase): ) """每次构建回复时,从 multiple_reply_style 中随机替换 reply_style 的概率(0.0-1.0)""" - plan_style: str = Field( - default=( - "1.思考**所有**的可用的action中的**每个动作**是否符合当下条件,如果动作使用条件符合聊天内容就使用" - "2.如果相同的action已经被执行,请不要重复执行该action" - "3.如果有人对你感到厌烦,请减少回复" - "4.如果有人在追问你,或者话题没有说完,请你继续回复" - "5.请分析哪些对话是和你说的,哪些是其他人之间的互动,不要误认为其他人之间的互动是和你说的" - ), - json_schema_extra={ - "x-widget": "textarea", - "x-icon": "book-open", - }, - ) - """_wrap_麦麦的说话规则和行为规则""" - visual_style: str = Field( default="请用中文描述这张图片的内容。如果有文字,请把文字描述概括出来,请留意其主题,直观感受,输出为一段平文本,最多30字,请注意不要分点,就输出一段文本", json_schema_extra={ @@ -242,20 +227,6 @@ class ChatConfig(ConfigBase): }, ) """每个聊天流最大保存的Plan/Reply日志数量,超过此数量时会自动删除最老的日志""" - private_plan_style: str = Field( - default=( - "1.思考**所有**的可用的action中的**每个动作**是否符合当下条件,如果动作使用条件符合聊天内容就使用\n" - "2.如果相同的内容已经被执行,请不要重复执行\n" - "3.某句话如果已经被回复过,不要重复回复" - ), - json_schema_extra={ - "x-widget": "textarea", - "x-icon": "user", - }, - ) - """_wrap_私聊说话规则,行为风格""" - - group_chat_prompt: str = Field( default="你需要控制自己发言的频率,如果是一对一聊天,可以以较均匀的频率发言;如果用户较多,不要每句都回复,控制回复频率,不要回复的太频繁!控制回复的频率,不要每个人的消息都回复。", json_schema_extra={ diff --git a/src/llm_models/utils_model.py b/src/llm_models/utils_model.py index 794617b4..29aa9d86 100644 --- a/src/llm_models/utils_model.py +++ b/src/llm_models/utils_model.py @@ -2,14 +2,14 @@ from dataclasses import dataclass from enum import Enum from typing import Any, Callable, Dict, List, Optional, Set, Tuple -from rich.traceback import install - import asyncio import random import re import time import traceback +from rich.traceback import install + from src.common.logger import get_logger from src.common.data_models.llm_service_data_models import ( LLMAudioTranscriptionResult, @@ -51,6 +51,13 @@ install(extra_lines=3) logger = get_logger("model_utils") +DATA_URI_LIMIT_PATTERN = re.compile( + r"Exceeded limit on max bytes per data-uri item\s*:\s*(?P\d+)", + re.IGNORECASE, +) +DATA_URI_RETRY_MARGIN_BYTES = 128 * 1024 +MIN_COMPRESSED_IMAGE_TARGET_SIZE_BYTES = 512 * 1024 + class RequestType(Enum): """请求类型枚举""" @@ -132,6 +139,49 @@ class LLMOrchestrator: f" 如果你认为该警告出现得过于频繁,请调整model_config.toml中对应任务的slow_threshold至符合你实际情况的合理值" ) + @staticmethod + def _can_retry_with_compressed_images( + active_request: ClientRequest, + original_response_request: ResponseRequest | None, + ) -> bool: + """判断当前请求是否还可以通过压缩图片进行一次兜底重试。""" + return ( + isinstance(active_request, ResponseRequest) + and bool(active_request.message_list) + and original_response_request is not None + and active_request.message_list == original_response_request.message_list + ) + + @staticmethod + def _extract_data_uri_limit_bytes(error: RespNotOkException) -> int | None: + """从兼容 OpenAI 的错误文本中提取 data URI 单项大小限制。""" + candidate_messages = [error.message, str(error)] + if error.__cause__ is not None: + candidate_messages.append(str(error.__cause__)) + + for candidate_message in candidate_messages: + if not candidate_message: + continue + + match = DATA_URI_LIMIT_PATTERN.search(candidate_message) + if match is None: + continue + + try: + return int(match.group("limit")) + except (TypeError, ValueError): + return None + + return None + + @staticmethod + def _build_data_uri_retry_target_size(limit_bytes: int) -> int: + """根据上游返回的 data URI 上限,计算压缩重试的安全目标值。""" + return max( + MIN_COMPRESSED_IMAGE_TARGET_SIZE_BYTES, + limit_bytes - DATA_URI_RETRY_MARGIN_BYTES, + ) + @staticmethod def _build_generation_result( content: str, @@ -735,6 +785,11 @@ class LLMOrchestrator: task_display = self.request_type or "未知任务" # 可重试的HTTP错误 + can_retry_with_compression = self._can_retry_with_compressed_images( + active_request, + original_response_request, + ) + if e.status_code == 429 or e.status_code >= 500: retry_remain -= 1 if retry_remain <= 0: @@ -750,12 +805,23 @@ class LLMOrchestrator: continue # 特殊处理413,尝试压缩 + data_uri_limit_bytes = self._extract_data_uri_limit_bytes(e) + if data_uri_limit_bytes is not None and can_retry_with_compression: + target_size = self._build_data_uri_retry_target_size(data_uri_limit_bytes) + logger.warning( + f"任务 '{task_display}' 的模型 '{model_info.name}' 返回 data URI 图片过大错误," + f"检测到单项上限 {data_uri_limit_bytes} 字节,尝试压缩图片后重试..." + ) + compressed_messages = compress_messages( + active_request.message_list, + img_target_size=target_size, + ) + active_request = active_request.copy_with(message_list=compressed_messages) + continue + if ( e.status_code == 413 - and isinstance(active_request, ResponseRequest) - and active_request.message_list - and original_response_request is not None - and active_request.message_list == original_response_request.message_list + and can_retry_with_compression ): logger.warning( f"任务 '{task_display}' 的模型 '{model_info.name}' 返回413请求体过大,尝试压缩后重试..." diff --git a/src/maisaka/builtin_tool/reply.py b/src/maisaka/builtin_tool/reply.py index f996e5a5..d8c2dd24 100644 --- a/src/maisaka/builtin_tool/reply.py +++ b/src/maisaka/builtin_tool/reply.py @@ -191,6 +191,7 @@ async def handle_tool( else: for sent_message in sent_messages: tool_ctx.append_sent_message_to_chat_history(sent_message) + tool_ctx.runtime._clear_force_continue_until_reply() return tool_ctx.build_success_result( invocation.tool_name, "回复已生成并发送。", diff --git a/src/maisaka/context_messages.py b/src/maisaka/context_messages.py index 164e913d..18111cfa 100644 --- a/src/maisaka/context_messages.py +++ b/src/maisaka/context_messages.py @@ -78,6 +78,24 @@ def _append_reply_component(builder: MessageBuilder, component: ReplyComponent) return True +def _render_at_component_text(component: AtComponent) -> str: + """灏?AtComponent 娓叉煋涓烘枃鏈舰寮忋€?""" + + target_name = component.target_user_cardname or component.target_user_nickname or component.target_user_id + return f"@{target_name}".strip() + + +def _append_at_component(builder: MessageBuilder, component: AtComponent) -> bool: + """灏?@ 缁勪欢杞崲涓烘枃鏈苟鍐欏叆 LLM 娑堟伅銆?""" + + rendered_text = _render_at_component_text(component) + if not rendered_text: + return False + + builder.add_text_content(rendered_text) + return True + + def contains_complex_message(message_sequence: MessageSequence) -> bool: """判断消息序列中是否包含复杂消息组件。""" @@ -119,8 +137,7 @@ def _render_component_for_prompt(component: StandardMessageComponents) -> str: return component.content.strip() if component.content else "[语音消息]" if isinstance(component, AtComponent): - target_name = component.target_user_cardname or component.target_user_nickname or component.target_user_id - return f"@{target_name}".strip() + return _render_at_component_text(component) if isinstance(component, ReplyComponent): sender_name = ( @@ -224,6 +241,10 @@ def _build_message_from_sequence( has_content = _append_image_component(builder, component) or has_content continue + if isinstance(component, AtComponent): + has_content = _append_at_component(builder, component) or has_content + continue + if isinstance(component, ReplyComponent): has_content = _append_reply_component(builder, component) or has_content continue diff --git a/src/maisaka/message_adapter.py b/src/maisaka/message_adapter.py index 06e55a0e..5d48ac19 100644 --- a/src/maisaka/message_adapter.py +++ b/src/maisaka/message_adapter.py @@ -3,9 +3,11 @@ from copy import deepcopy from datetime import datetime from typing import Optional + import re from src.common.data_models.message_component_data_model import ( + AtComponent, EmojiComponent, ImageComponent, MessageSequence, @@ -26,13 +28,15 @@ def format_speaker_content( message_id: Optional[str] = None, ) -> str: """将可见文本格式化为带说话人前缀的样式。""" + time_prefix = timestamp.strftime("%H:%M:%S") if timestamp is not None else "" message_id_prefix = f"[msg_id:{message_id}]" if message_id else "" return f"{time_prefix}{message_id_prefix}[{speaker_name}]{content}" def parse_speaker_content(content: str) -> tuple[Optional[str], str]: - """解析形如 [speaker]message 的可见文本。""" + """解析形如 `[speaker]message` 的可见文本。""" + match = SPEAKER_PREFIX_PATTERN.match(content or "") if not match: return None, content or "" @@ -41,11 +45,20 @@ def parse_speaker_content(content: str) -> tuple[Optional[str], str]: def clone_message_sequence(message_sequence: MessageSequence) -> MessageSequence: """复制消息片段序列。""" + return MessageSequence([deepcopy(component) for component in message_sequence.components]) +def _render_at_component_text(component: AtComponent) -> str: + """将 AtComponent 渲染为文本。""" + + target_name = component.target_user_cardname or component.target_user_nickname or component.target_user_id + return f"@{target_name}".strip() + + def build_visible_text_from_sequence(message_sequence: MessageSequence) -> str: """从消息片段序列提取可见文本。""" + parts: list[str] = [] for component in message_sequence.components: if isinstance(component, TextComponent): @@ -73,6 +86,10 @@ def build_visible_text_from_sequence(message_sequence: MessageSequence) -> str: parts.append(component.content or "[图片]") continue + if isinstance(component, AtComponent): + parts.append(_render_at_component_text(component)) + continue + if isinstance(component, ReplyComponent): target_message_id = component.target_message_id.strip() if target_message_id: diff --git a/src/maisaka/reasoning_engine.py b/src/maisaka/reasoning_engine.py index 66775fa1..583865f6 100644 --- a/src/maisaka/reasoning_engine.py +++ b/src/maisaka/reasoning_engine.py @@ -23,9 +23,11 @@ from src.services import database_service as database_api from .builtin_tool import get_action_tool_specs from .builtin_tool import build_builtin_tool_handlers as build_split_builtin_tool_handlers from .builtin_tool import get_timing_tools +from .chat_loop_service import ChatResponse from .chat_history_visual_refresher import refresh_chat_history_visual_placeholders from .builtin_tool.context import BuiltinToolRuntimeContext from .context_messages import ( + AssistantMessage, ComplexSessionMessage, LLMContextMessage, SessionBackedMessage, @@ -229,6 +231,9 @@ class MaisakaReasoningEngine: ) -> tuple[Literal["continue", "no_reply", "wait"], Any, list[str]]: """运行 Timing Gate 子代理并返回控制决策。""" + if self._runtime._force_continue_until_reply: + return self._build_forced_continue_timing_result() + response = await self._run_interruptible_sub_agent( context_message_limit=TIMING_GATE_CONTEXT_LIMIT, system_prompt=self._build_timing_gate_system_prompt(), @@ -264,191 +269,210 @@ class MaisakaReasoningEngine: return "continue", response, tool_result_summaries return timing_action, response, tool_result_summaries + def _build_forced_continue_timing_result(self) -> tuple[Literal["continue"], ChatResponse, list[str]]: + """构造跳过 Timing Gate 时使用的伪 continue 结果。""" + + reason = self._runtime._build_force_continue_timing_reason() + logger.info(f"{self._runtime.log_prefix} {reason}") + return ( + "continue", + ChatResponse( + content=reason, + tool_calls=[], + raw_message=AssistantMessage( + content="", + timestamp=datetime.now(), + source_kind="perception", + ), + selected_history_count=min( + sum(1 for message in self._runtime._chat_history if message.count_in_context), + self._runtime._max_context_size, + ), + prompt_tokens=0, + built_message_count=0, + completion_tokens=0, + total_tokens=0, + prompt_section=None, + ), + [f"- continue [强制跳过]: {reason}"], + ) + async def run_loop(self) -> None: """独立消费消息批次,并执行对应的内部思考轮次。""" try: while self._runtime._running: - queue_item_done_count = 0 - try: - queued_trigger = await self._runtime._internal_turn_queue.get() - ( - message_triggered, - timeout_triggered, - queue_item_done_count, - ) = self._drain_ready_turn_triggers(queued_trigger) + queued_trigger = await self._runtime._internal_turn_queue.get() + message_triggered, timeout_triggered = self._drain_ready_turn_triggers(queued_trigger) - if message_triggered: - await self._runtime._wait_for_message_quiet_period() - self._runtime._message_turn_scheduled = False + if message_triggered: + await self._runtime._wait_for_message_quiet_period() + self._runtime._message_turn_scheduled = False - cached_messages = ( - self._runtime._collect_pending_messages() - if self._runtime._has_pending_messages() - else [] - ) - if not timeout_triggered and not cached_messages and not message_triggered: + cached_messages = ( + self._runtime._collect_pending_messages() + if self._runtime._has_pending_messages() + else [] + ) + if not timeout_triggered and not cached_messages and not message_triggered: + continue + + self._runtime._agent_state = self._runtime._STATE_RUNNING + if cached_messages: + asyncio.create_task(self._runtime._trigger_batch_learning(cached_messages)) + self._append_wait_interrupted_message_if_needed() + await self._ingest_messages(cached_messages) + anchor_message = cached_messages[-1] + else: + anchor_message = self._get_timeout_anchor_message() + if anchor_message is None: + logger.warning( + f"{self._runtime.log_prefix} 等待超时后缺少可复用的锚点消息,跳过本轮继续思考" + ) continue + logger.info(f"{self._runtime.log_prefix} 等待超时后开始新一轮思考") + if self._runtime._pending_wait_tool_call_id: + self._runtime._chat_history.append(self._build_wait_timeout_message()) + self._trim_chat_history() - self._runtime._agent_state = self._runtime._STATE_RUNNING - if cached_messages: - asyncio.create_task(self._runtime._trigger_batch_learning(cached_messages)) - self._append_wait_interrupted_message_if_needed() - await self._ingest_messages(cached_messages) - anchor_message = cached_messages[-1] - else: - anchor_message = self._get_timeout_anchor_message() - if anchor_message is None: - logger.warning( - f"{self._runtime.log_prefix} 等待超时后缺少可复用的锚点消息,跳过本轮继续思考" - ) - continue - logger.info(f"{self._runtime.log_prefix} 等待超时后开始新一轮思考") - if self._runtime._pending_wait_tool_call_id: - self._runtime._chat_history.append(self._build_wait_timeout_message()) - self._trim_chat_history() + try: + for round_index in range(self._runtime._max_internal_rounds): + cycle_detail = self._start_cycle() + self._runtime._log_cycle_started(cycle_detail, round_index) + await emit_cycle_start( + session_id=self._runtime.session_id, + cycle_id=cycle_detail.cycle_id, + round_index=round_index, + max_rounds=self._runtime._max_internal_rounds, + history_count=len(self._runtime._chat_history), + ) + planner_started_at = 0.0 + try: + visual_refresh_started_at = time.time() + refreshed_message_count = await self._refresh_chat_history_visual_placeholders() + cycle_detail.time_records["visual_refresh"] = time.time() - visual_refresh_started_at + if refreshed_message_count > 0: + logger.info( + f"{self._runtime.log_prefix} 本轮思考前已刷新 {refreshed_message_count} 条视觉占位历史消息" + ) - try: - for round_index in range(self._runtime._max_internal_rounds): - cycle_detail = self._start_cycle() - self._runtime._log_cycle_started(cycle_detail, round_index) - await emit_cycle_start( + timing_started_at = time.time() + timing_action, timing_response, timing_tool_results = await self._run_timing_gate(anchor_message) + timing_duration_ms = (time.time() - timing_started_at) * 1000 + cycle_detail.time_records["timing_gate"] = timing_duration_ms / 1000 + await emit_timing_gate_result( session_id=self._runtime.session_id, cycle_id=cycle_detail.cycle_id, - round_index=round_index, - max_rounds=self._runtime._max_internal_rounds, - history_count=len(self._runtime._chat_history), + action=timing_action, + content=timing_response.content, + tool_calls=timing_response.tool_calls, + messages=[], + prompt_tokens=timing_response.prompt_tokens, + selected_history_count=timing_response.selected_history_count, + duration_ms=timing_duration_ms, ) - planner_started_at = 0.0 - try: - visual_refresh_started_at = time.time() - refreshed_message_count = await self._refresh_chat_history_visual_placeholders() - cycle_detail.time_records["visual_refresh"] = time.time() - visual_refresh_started_at - if refreshed_message_count > 0: - logger.info( - f"{self._runtime.log_prefix} 本轮思考前已刷新 {refreshed_message_count} 条视觉占位历史消息" - ) - - timing_started_at = time.time() - timing_action, timing_response, timing_tool_results = await self._run_timing_gate(anchor_message) - timing_duration_ms = (time.time() - timing_started_at) * 1000 - cycle_detail.time_records["timing_gate"] = timing_duration_ms / 1000 - await emit_timing_gate_result( - session_id=self._runtime.session_id, - cycle_id=cycle_detail.cycle_id, - action=timing_action, - content=timing_response.content, - tool_calls=timing_response.tool_calls, - messages=[], - prompt_tokens=timing_response.prompt_tokens, - selected_history_count=timing_response.selected_history_count, - duration_ms=timing_duration_ms, - ) - self._runtime._render_context_usage_panel( - selected_history_count=timing_response.selected_history_count, - prompt_tokens=timing_response.prompt_tokens, - planner_response=timing_response.content or "", - tool_calls=timing_response.tool_calls, - tool_results=timing_tool_results, - prompt_section=timing_response.prompt_section, - ) - if timing_action != "continue": - logger.info( - f"{self._runtime.log_prefix} Timing Gate 结束当前回合: " - f"回合={round_index + 1} 动作={timing_action}" - ) - break - - planner_started_at = time.time() - action_tool_definitions = await self._build_action_tool_definitions() + self._runtime._render_context_usage_panel( + selected_history_count=timing_response.selected_history_count, + prompt_tokens=timing_response.prompt_tokens, + planner_response=timing_response.content or "", + tool_calls=timing_response.tool_calls, + tool_results=timing_tool_results, + prompt_section=timing_response.prompt_section, + ) + if timing_action != "continue": logger.info( - f"{self._runtime.log_prefix} 规划器开始执行: " - f"回合={round_index + 1} " - f"历史消息数={len(self._runtime._chat_history)} " - f"开始时间={planner_started_at:.3f}" + f"{self._runtime.log_prefix} Timing Gate 结束当前回合: " + f"回合={round_index + 1} 动作={timing_action}" ) - response = await self._run_interruptible_planner( - tool_definitions=action_tool_definitions, + break + + planner_started_at = time.time() + action_tool_definitions = await self._build_action_tool_definitions() + logger.info( + f"{self._runtime.log_prefix} 规划器开始执行: " + f"回合={round_index + 1} " + f"历史消息数={len(self._runtime._chat_history)} " + f"开始时间={planner_started_at:.3f}" + ) + response = await self._run_interruptible_planner( + tool_definitions=action_tool_definitions, + ) + planner_duration_ms = (time.time() - planner_started_at) * 1000 + cycle_detail.time_records["planner"] = planner_duration_ms / 1000 + logger.info( + f"{self._runtime.log_prefix} 规划器执行完成: " + f"回合={round_index + 1} " + f"耗时={cycle_detail.time_records['planner']:.3f} 秒" + ) + await emit_planner_response( + session_id=self._runtime.session_id, + cycle_id=cycle_detail.cycle_id, + content=response.content, + tool_calls=response.tool_calls, + prompt_tokens=response.prompt_tokens, + completion_tokens=response.completion_tokens, + total_tokens=response.total_tokens, + duration_ms=planner_duration_ms, + ) + + reasoning_content = response.content or "" + if self._should_replace_reasoning(reasoning_content): + response.content = "我应该根据我上面思考的内容进行反思,重新思考我下一步的行动,我需要分析当前场景,对话,以及我可以使用的工具,然后先输出想法再使用工具" + response.raw_message.content = "我应该根据我上面思考的内容进行反思,重新思考我下一步的行动,我需要分析当前场景,对话,以及我可以使用的工具,然后先输出想法再使用工具" + logger.info(f"{self._runtime.log_prefix} 当前思考与上一轮过于相似,已替换为重新思考提示") + + self._last_reasoning_content = reasoning_content + self._runtime._chat_history.append(response.raw_message) + tool_result_summaries: list[str] = [] + + if response.tool_calls: + tool_started_at = time.time() + should_pause, tool_result_summaries = await self._handle_tool_calls( + response.tool_calls, + response.content or "", + anchor_message, ) - planner_duration_ms = (time.time() - planner_started_at) * 1000 - cycle_detail.time_records["planner"] = planner_duration_ms / 1000 - logger.info( - f"{self._runtime.log_prefix} 规划器执行完成: " - f"回合={round_index + 1} " - f"耗时={cycle_detail.time_records['planner']:.3f} 秒" - ) - await emit_planner_response( - session_id=self._runtime.session_id, - cycle_id=cycle_detail.cycle_id, - content=response.content, - tool_calls=response.tool_calls, - prompt_tokens=response.prompt_tokens, - completion_tokens=response.completion_tokens, - total_tokens=response.total_tokens, - duration_ms=planner_duration_ms, - ) - - reasoning_content = response.content or "" - if self._should_replace_reasoning(reasoning_content): - response.content = "我应该根据我上面思考的内容进行反思,重新思考我下一步的行动,我需要分析当前场景,对话,以及我可以使用的工具,然后先输出想法再使用工具" - response.raw_message.content = "我应该根据我上面思考的内容进行反思,重新思考我下一步的行动,我需要分析当前场景,对话,以及我可以使用的工具,然后先输出想法再使用工具" - logger.info(f"{self._runtime.log_prefix} 当前思考与上一轮过于相似,已替换为重新思考提示") - - self._last_reasoning_content = reasoning_content - self._runtime._chat_history.append(response.raw_message) - tool_result_summaries: list[str] = [] - - if response.tool_calls: - tool_started_at = time.time() - should_pause, tool_result_summaries = await self._handle_tool_calls( - response.tool_calls, - response.content or "", - anchor_message, - ) - cycle_detail.time_records["tool_calls"] = time.time() - tool_started_at - self._runtime._render_context_usage_panel( - selected_history_count=response.selected_history_count, - prompt_tokens=response.prompt_tokens, - planner_response=response.content or "", - tool_calls=response.tool_calls, - tool_results=tool_result_summaries, - prompt_section=response.prompt_section, - ) - if should_pause: - break - continue - + cycle_detail.time_records["tool_calls"] = time.time() - tool_started_at self._runtime._render_context_usage_panel( selected_history_count=response.selected_history_count, prompt_tokens=response.prompt_tokens, planner_response=response.content or "", + tool_calls=response.tool_calls, + tool_results=tool_result_summaries, prompt_section=response.prompt_section, ) - if not response.content: + if should_pause: break - except ReqAbortException: - interrupted_at = time.time() - logger.info( - f"{self._runtime.log_prefix} 规划器打断成功: " - f"回合={round_index + 1} " - f"开始时间={planner_started_at:.3f} " - f"打断时间={interrupted_at:.3f} " - f"耗时={interrupted_at - planner_started_at:.3f} 秒" - ) + continue + + self._runtime._render_context_usage_panel( + selected_history_count=response.selected_history_count, + prompt_tokens=response.prompt_tokens, + planner_response=response.content or "", + prompt_section=response.prompt_section, + ) + if not response.content: break - finally: - self._end_cycle(cycle_detail) - await emit_cycle_end( - session_id=self._runtime.session_id, - cycle_id=cycle_detail.cycle_id, - time_records=dict(cycle_detail.time_records), - agent_state=self._runtime._agent_state, - ) - finally: - if self._runtime._agent_state == self._runtime._STATE_RUNNING: - self._runtime._agent_state = self._runtime._STATE_STOP + except ReqAbortException: + interrupted_at = time.time() + logger.info( + f"{self._runtime.log_prefix} 规划器打断成功: " + f"回合={round_index + 1} " + f"开始时间={planner_started_at:.3f} " + f"打断时间={interrupted_at:.3f} " + f"耗时={interrupted_at - planner_started_at:.3f} 秒" + ) + break + finally: + self._end_cycle(cycle_detail) + await emit_cycle_end( + session_id=self._runtime.session_id, + cycle_id=cycle_detail.cycle_id, + time_records=dict(cycle_detail.time_records), + agent_state=self._runtime._agent_state, + ) finally: - for _ in range(queue_item_done_count): - self._runtime._internal_turn_queue.task_done() + if self._runtime._agent_state == self._runtime._STATE_RUNNING: + self._runtime._agent_state = self._runtime._STATE_STOP except asyncio.CancelledError: self._runtime._log_internal_loop_cancelled() raise @@ -460,10 +484,9 @@ class MaisakaReasoningEngine: def _drain_ready_turn_triggers( self, queued_trigger: Literal["message", "timeout"], - ) -> tuple[bool, bool, int]: + ) -> tuple[bool, bool]: """合并当前已就绪的 turn 触发信号。""" - queue_item_done_count = 1 message_triggered = queued_trigger == "message" timeout_triggered = queued_trigger == "timeout" @@ -473,7 +496,6 @@ class MaisakaReasoningEngine: except asyncio.QueueEmpty: break - queue_item_done_count += 1 if next_trigger == "message": message_triggered = True continue @@ -481,11 +503,7 @@ class MaisakaReasoningEngine: timeout_triggered = True continue - if message_triggered: - # 这些消息触发将由当前 turn 接手,旧的事件位不应再污染后续 wait 判定。 - self._runtime._new_message_event.clear() - - return message_triggered, timeout_triggered, queue_item_done_count + return message_triggered, timeout_triggered def _get_timeout_anchor_message(self) -> Optional[SessionMessage]: """在 wait 超时后复用最近一条真实用户消息作为锚点。""" diff --git a/src/maisaka/runtime.py b/src/maisaka/runtime.py index 3801f5cd..47bfb9e6 100644 --- a/src/maisaka/runtime.py +++ b/src/maisaka/runtime.py @@ -13,6 +13,7 @@ from src.cli.console import console from src.chat.heart_flow.heartFC_utils import CycleDetail from src.chat.message_receive.chat_manager import BotChatSession, chat_manager from src.chat.message_receive.message import SessionMessage +from src.chat.utils.utils import is_mentioned_bot_in_message from src.common.data_models.mai_message_data_model import GroupInfo, UserInfo from src.common.logger import get_logger from src.common.utils.utils_config import ExpressionConfigUtils @@ -72,8 +73,6 @@ class MaisakaHeartFlowChatting: self._running = False self._cycle_counter = 0 self._internal_loop_task: Optional[asyncio.Task] = None - self._loop_task: Optional[asyncio.Task] = None - self._new_message_event = asyncio.Event() self._message_turn_scheduled = False self._message_debounce_seconds = 1.0 self._message_debounce_required = False @@ -84,6 +83,9 @@ class MaisakaHeartFlowChatting: self._agent_state: Literal["running", "wait", "stop"] = self._STATE_STOP self._wait_until: Optional[float] = None self._pending_wait_tool_call_id: Optional[str] = None + self._force_continue_until_reply = False + self._force_continue_trigger_message_id = "" + self._force_continue_trigger_reason = "" self._planner_interrupt_flag: Optional[asyncio.Event] = None self._planner_interrupt_requested = False self._planner_interrupt_consecutive_count = 0 @@ -118,6 +120,7 @@ class MaisakaHeartFlowChatting: self._running = True self._ensure_background_tasks_running() + self._schedule_message_turn() logger.info(f"{self.log_prefix} Maisaka 运行时已启动") async def stop(self) -> None: @@ -126,22 +129,12 @@ class MaisakaHeartFlowChatting: return self._running = False - self._new_message_event.set() self._message_turn_scheduled = False self._message_debounce_required = False self._cancel_wait_timeout_task() while not self._internal_turn_queue.empty(): _ = self._internal_turn_queue.get_nowait() - if self._loop_task is not None: - self._loop_task.cancel() - try: - await self._loop_task - except asyncio.CancelledError: - pass - finally: - self._loop_task = None - if self._internal_loop_task is not None: self._internal_loop_task.cancel() try: @@ -166,6 +159,7 @@ class MaisakaHeartFlowChatting: if self._running: self._ensure_background_tasks_running() self._last_message_received_at = time.time() + self._update_message_trigger_state(message) self.message_cache.append(message) self._source_messages_by_id[message.message_id] = message if self._agent_state == self._STATE_WAIT: @@ -199,7 +193,78 @@ class MaisakaHeartFlowChatting: f"{self._planner_interrupt_max_consecutive_count}" ) self._planner_interrupt_flag.set() - self._new_message_event.set() + if self._running: + self._schedule_message_turn() + + def _update_message_trigger_state(self, message: SessionMessage) -> None: + """补齐消息中的 @/提及 标记,并在命中时启用强制 continue。""" + + detected_mentioned, detected_at, _ = is_mentioned_bot_in_message(message) + if detected_at: + message.is_at = True + if detected_mentioned: + message.is_mentioned = True + + if not message.is_at and not message.is_mentioned: + return + + self._arm_force_continue_until_reply( + message, + is_at=message.is_at, + is_mentioned=message.is_mentioned, + ) + + def _arm_force_continue_until_reply( + self, + message: SessionMessage, + *, + is_at: bool, + is_mentioned: bool, + ) -> None: + """在检测到 @ 或提及时,要求后续轮次跳过 Timing Gate 直到成功 reply。""" + + trigger_reason = "@消息" if is_at else "提及消息" if is_mentioned else "触发消息" + was_armed = self._force_continue_until_reply + self._force_continue_until_reply = True + self._force_continue_trigger_message_id = message.message_id + self._force_continue_trigger_reason = trigger_reason + + if was_armed: + logger.info( + f"{self.log_prefix} 检测到新的{trigger_reason},刷新强制 continue 状态;" + f"消息编号={message.message_id}" + ) + return + + logger.info( + f"{self.log_prefix} 检测到{trigger_reason},将跳过 Timing Gate 直到成功发送一条 reply;" + f"消息编号={message.message_id}" + ) + + def _clear_force_continue_until_reply(self) -> None: + """在成功发送 reply 后清理强制 continue 状态。""" + + if not self._force_continue_until_reply: + return + + logger.info( + f"{self.log_prefix} 已成功发送 reply,恢复 Timing Gate;" + f"触发原因={self._force_continue_trigger_reason or '未知'} " + f"触发消息编号={self._force_continue_trigger_message_id or 'unknown'}" + ) + self._force_continue_until_reply = False + self._force_continue_trigger_message_id = "" + self._force_continue_trigger_reason = "" + + def _build_force_continue_timing_reason(self) -> str: + """返回当前强制跳过 Timing Gate 的原因描述。""" + + trigger_reason = self._force_continue_trigger_reason or "@/提及消息" + trigger_message_id = self._force_continue_trigger_message_id or "unknown" + return ( + f"检测到新的{trigger_reason}(消息编号={trigger_message_id})," + "本轮直接跳过 Timing Gate 并视作 continue,直到成功发送一条 reply。" + ) def _bind_planner_interrupt_flag(self, interrupt_flag: asyncio.Event) -> None: """绑定当前可打断请求使用的中断标记。""" @@ -235,17 +300,6 @@ class MaisakaHeartFlowChatting: self._internal_loop_task = asyncio.create_task(self._reasoning_engine.run_loop()) logger.warning(f"{self.log_prefix} 已重新拉起 Maisaka 内部循环任务") - if self._loop_task is None or self._loop_task.done(): - if self._loop_task is not None and not self._loop_task.cancelled(): - try: - exc = self._loop_task.exception() - except Exception: - exc = None - if exc is not None: - logger.error(f"{self.log_prefix} 主循环任务异常退出: {exc}") - self._loop_task = asyncio.create_task(self._main_loop()) - logger.warning(f"{self.log_prefix} 已重新拉起 Maisaka 主循环任务") - def _register_tool_providers(self) -> None: """注册 Maisaka 运行时默认启用的工具 Provider。""" @@ -293,22 +347,17 @@ class MaisakaHeartFlowChatting: tool_definitions=[] if tool_definitions is None else tool_definitions, ) - async def _main_loop(self) -> None: - try: - while self._running: - if self._has_pending_messages() and not self._message_turn_scheduled: - self._message_turn_scheduled = True - await self._internal_turn_queue.put("message") - continue - - self._new_message_event.clear() - await self._new_message_event.wait() - except asyncio.CancelledError: - logger.info(f"{self.log_prefix} Maisaka 运行时主循环已取消") - def _has_pending_messages(self) -> bool: return self._last_processed_index < len(self.message_cache) + def _schedule_message_turn(self) -> None: + """为当前待处理消息安排一次内部 turn。""" + if not self._has_pending_messages() or self._message_turn_scheduled: + return + + self._message_turn_scheduled = True + self._internal_turn_queue.put_nowait("message") + def _collect_pending_messages(self) -> list[SessionMessage]: """从消息缓存中收集一批尚未处理的消息。""" start_index = self._last_processed_index @@ -360,8 +409,6 @@ class MaisakaHeartFlowChatting: self._wait_timeout_task = asyncio.create_task( self._schedule_wait_timeout(seconds=seconds, tool_call_id=tool_call_id) ) - # 清理旧的消息触发信号,避免 wait 被历史消息残留事件立即唤醒。 - self._new_message_event.clear() def _enter_stop_state(self) -> None: """切换到停止状态。"""