feat:添加提及必回,部分尺寸过大自动重试,移除无用配置项,正确解析at消息

This commit is contained in:
SengokuCola
2026-04-07 01:31:58 +08:00
parent d3fc044a39
commit 50a51757a8
9 changed files with 398 additions and 248 deletions

View File

@@ -55,7 +55,7 @@ CONFIG_DIR: Path = PROJECT_ROOT / "config"
BOT_CONFIG_PATH: Path = (CONFIG_DIR / "bot_config.toml").resolve().absolute()
MODEL_CONFIG_PATH: Path = (CONFIG_DIR / "model_config.toml").resolve().absolute()
MMC_VERSION: str = "1.0.0"
CONFIG_VERSION: str = "8.3.2"
CONFIG_VERSION: str = "8.3.4"
MODEL_CONFIG_VERSION: str = "1.13.1"
logger = get_logger("config")

View File

@@ -346,6 +346,10 @@ def try_migrate_legacy_bot_config_dict(data: dict[str, Any]) -> MigrationResult:
if chat is None:
chat = {}
data["chat"] = chat
elif "private_plan_style" in chat:
chat.pop("private_plan_style", None)
migrated_any = True
reasons.append("chat.private_plan_style_removed")
mem = _as_dict(data.get("memory"))
if mem is not None:
@@ -366,7 +370,12 @@ def try_migrate_legacy_bot_config_dict(data: dict[str, Any]) -> MigrationResult:
migrated_any = True
reasons.append("experimental.chat_prompts")
for key in ("private_plan_style", "group_chat_prompt", "private_chat_prompts", "chat_prompts"):
if "private_plan_style" in exp:
exp.pop("private_plan_style", None)
migrated_any = True
reasons.append("experimental.private_plan_style_removed")
for key in ("group_chat_prompt", "private_chat_prompts", "chat_prompts"):
if key in exp and key not in chat:
chat[key] = exp[key]
migrated_any = True

View File

@@ -113,21 +113,6 @@ class PersonalityConfig(ConfigBase):
)
"""每次构建回复时,从 multiple_reply_style 中随机替换 reply_style 的概率0.0-1.0"""
plan_style: str = Field(
default=(
"1.思考**所有**的可用的action中的**每个动作**是否符合当下条件,如果动作使用条件符合聊天内容就使用"
"2.如果相同的action已经被执行请不要重复执行该action"
"3.如果有人对你感到厌烦,请减少回复"
"4.如果有人在追问你,或者话题没有说完,请你继续回复"
"5.请分析哪些对话是和你说的,哪些是其他人之间的互动,不要误认为其他人之间的互动是和你说的"
),
json_schema_extra={
"x-widget": "textarea",
"x-icon": "book-open",
},
)
"""_wrap_麦麦的说话规则和行为规则"""
visual_style: str = Field(
default="请用中文描述这张图片的内容。如果有文字请把文字描述概括出来请留意其主题直观感受输出为一段平文本最多30字请注意不要分点就输出一段文本",
json_schema_extra={
@@ -242,20 +227,6 @@ class ChatConfig(ConfigBase):
},
)
"""每个聊天流最大保存的Plan/Reply日志数量超过此数量时会自动删除最老的日志"""
private_plan_style: str = Field(
default=(
"1.思考**所有**的可用的action中的**每个动作**是否符合当下条件,如果动作使用条件符合聊天内容就使用\n"
"2.如果相同的内容已经被执行,请不要重复执行\n"
"3.某句话如果已经被回复过,不要重复回复"
),
json_schema_extra={
"x-widget": "textarea",
"x-icon": "user",
},
)
"""_wrap_私聊说话规则行为风格"""
group_chat_prompt: str = Field(
default="你需要控制自己发言的频率,如果是一对一聊天,可以以较均匀的频率发言;如果用户较多,不要每句都回复,控制回复频率,不要回复的太频繁!控制回复的频率,不要每个人的消息都回复。",
json_schema_extra={

View File

@@ -2,14 +2,14 @@ from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Set, Tuple
from rich.traceback import install
import asyncio
import random
import re
import time
import traceback
from rich.traceback import install
from src.common.logger import get_logger
from src.common.data_models.llm_service_data_models import (
LLMAudioTranscriptionResult,
@@ -51,6 +51,13 @@ install(extra_lines=3)
logger = get_logger("model_utils")
DATA_URI_LIMIT_PATTERN = re.compile(
r"Exceeded limit on max bytes per data-uri item\s*:\s*(?P<limit>\d+)",
re.IGNORECASE,
)
DATA_URI_RETRY_MARGIN_BYTES = 128 * 1024
MIN_COMPRESSED_IMAGE_TARGET_SIZE_BYTES = 512 * 1024
class RequestType(Enum):
"""请求类型枚举"""
@@ -132,6 +139,49 @@ class LLMOrchestrator:
f" 如果你认为该警告出现得过于频繁请调整model_config.toml中对应任务的slow_threshold至符合你实际情况的合理值"
)
@staticmethod
def _can_retry_with_compressed_images(
    active_request: ClientRequest,
    original_response_request: ResponseRequest | None,
) -> bool:
    """Decide whether one fallback retry with compressed images is still possible.

    The retry is only allowed while the active request is a ResponseRequest
    whose non-empty message list has not yet been rewritten, i.e. it still
    equals the original request's message list.
    """
    if not isinstance(active_request, ResponseRequest):
        return False
    if not active_request.message_list:
        return False
    if original_response_request is None:
        return False
    return active_request.message_list == original_response_request.message_list
@staticmethod
def _extract_data_uri_limit_bytes(error: RespNotOkException) -> int | None:
    """Extract the per-item data-URI size limit from an OpenAI-compatible error.

    Scans the error message, its string form and, when present, the string
    form of its __cause__; returns the parsed byte limit, or None when no
    candidate text contains a parsable figure.
    """
    candidate_texts: list[str] = [error.message, str(error)]
    cause = error.__cause__
    if cause is not None:
        candidate_texts.append(str(cause))
    for text in candidate_texts:
        if not text:
            continue
        found = DATA_URI_LIMIT_PATTERN.search(text)
        if found is None:
            continue
        try:
            return int(found.group("limit"))
        except (TypeError, ValueError):
            # Malformed capture: give up instead of trusting later candidates.
            return None
    return None
@staticmethod
def _build_data_uri_retry_target_size(limit_bytes: int) -> int:
    """Compute a safe compression target from the upstream data-URI limit.

    Leaves a retry margin below the reported limit, but never drops under
    the minimum useful compressed-image size.
    """
    margin_adjusted = limit_bytes - DATA_URI_RETRY_MARGIN_BYTES
    if margin_adjusted < MIN_COMPRESSED_IMAGE_TARGET_SIZE_BYTES:
        return MIN_COMPRESSED_IMAGE_TARGET_SIZE_BYTES
    return margin_adjusted
@staticmethod
def _build_generation_result(
content: str,
@@ -735,6 +785,11 @@ class LLMOrchestrator:
task_display = self.request_type or "未知任务"
# 可重试的HTTP错误
can_retry_with_compression = self._can_retry_with_compressed_images(
active_request,
original_response_request,
)
if e.status_code == 429 or e.status_code >= 500:
retry_remain -= 1
if retry_remain <= 0:
@@ -750,12 +805,23 @@ class LLMOrchestrator:
continue
# 特殊处理413尝试压缩
data_uri_limit_bytes = self._extract_data_uri_limit_bytes(e)
if data_uri_limit_bytes is not None and can_retry_with_compression:
target_size = self._build_data_uri_retry_target_size(data_uri_limit_bytes)
logger.warning(
f"任务 '{task_display}' 的模型 '{model_info.name}' 返回 data URI 图片过大错误,"
f"检测到单项上限 {data_uri_limit_bytes} 字节,尝试压缩图片后重试..."
)
compressed_messages = compress_messages(
active_request.message_list,
img_target_size=target_size,
)
active_request = active_request.copy_with(message_list=compressed_messages)
continue
if (
e.status_code == 413
and isinstance(active_request, ResponseRequest)
and active_request.message_list
and original_response_request is not None
and active_request.message_list == original_response_request.message_list
and can_retry_with_compression
):
logger.warning(
f"任务 '{task_display}' 的模型 '{model_info.name}' 返回413请求体过大尝试压缩后重试..."

View File

@@ -191,6 +191,7 @@ async def handle_tool(
else:
for sent_message in sent_messages:
tool_ctx.append_sent_message_to_chat_history(sent_message)
tool_ctx.runtime._clear_force_continue_until_reply()
return tool_ctx.build_success_result(
invocation.tool_name,
"回复已生成并发送。",

View File

@@ -78,6 +78,24 @@ def _append_reply_component(builder: MessageBuilder, component: ReplyComponent)
return True
def _render_at_component_text(component: AtComponent) -> str:
    """Render an AtComponent as its plain-text "@name" form.

    Prefers the group card name, then the nickname, then the raw user id.
    (Original docstring was mojibake; restored in English.)
    """
    display_name = (
        component.target_user_cardname
        or component.target_user_nickname
        or component.target_user_id
    )
    return f"@{display_name}".strip()
def _append_at_component(builder: MessageBuilder, component: AtComponent) -> bool:
    """Convert an @ component to text and write it into the LLM message.

    Returns True when any text was appended, False otherwise.
    (Original docstring was mojibake; restored in English.)
    """
    text = _render_at_component_text(component)
    if text:
        builder.add_text_content(text)
        return True
    return False
def contains_complex_message(message_sequence: MessageSequence) -> bool:
"""判断消息序列中是否包含复杂消息组件。"""
@@ -119,8 +137,7 @@ def _render_component_for_prompt(component: StandardMessageComponents) -> str:
return component.content.strip() if component.content else "[语音消息]"
if isinstance(component, AtComponent):
target_name = component.target_user_cardname or component.target_user_nickname or component.target_user_id
return f"@{target_name}".strip()
return _render_at_component_text(component)
if isinstance(component, ReplyComponent):
sender_name = (
@@ -224,6 +241,10 @@ def _build_message_from_sequence(
has_content = _append_image_component(builder, component) or has_content
continue
if isinstance(component, AtComponent):
has_content = _append_at_component(builder, component) or has_content
continue
if isinstance(component, ReplyComponent):
has_content = _append_reply_component(builder, component) or has_content
continue

View File

@@ -3,9 +3,11 @@
from copy import deepcopy
from datetime import datetime
from typing import Optional
import re
from src.common.data_models.message_component_data_model import (
AtComponent,
EmojiComponent,
ImageComponent,
MessageSequence,
@@ -26,13 +28,15 @@ def format_speaker_content(
message_id: Optional[str] = None,
) -> str:
"""将可见文本格式化为带说话人前缀的样式。"""
time_prefix = timestamp.strftime("%H:%M:%S") if timestamp is not None else ""
message_id_prefix = f"[msg_id:{message_id}]" if message_id else ""
return f"{time_prefix}{message_id_prefix}[{speaker_name}]{content}"
def parse_speaker_content(content: str) -> tuple[Optional[str], str]:
"""解析形如 [speaker]message 的可见文本。"""
"""解析形如 `[speaker]message` 的可见文本。"""
match = SPEAKER_PREFIX_PATTERN.match(content or "")
if not match:
return None, content or ""
@@ -41,11 +45,20 @@ def parse_speaker_content(content: str) -> tuple[Optional[str], str]:
def clone_message_sequence(message_sequence: MessageSequence) -> MessageSequence:
    """Return a deep copy of the message fragment sequence."""
    cloned_components = [deepcopy(item) for item in message_sequence.components]
    return MessageSequence(cloned_components)
def _render_at_component_text(component: AtComponent) -> str:
    """Render an AtComponent into plain "@name" text."""
    name = (
        component.target_user_cardname
        or component.target_user_nickname
        or component.target_user_id
    )
    return f"@{name}".strip()
def build_visible_text_from_sequence(message_sequence: MessageSequence) -> str:
"""从消息片段序列提取可见文本。"""
parts: list[str] = []
for component in message_sequence.components:
if isinstance(component, TextComponent):
@@ -73,6 +86,10 @@ def build_visible_text_from_sequence(message_sequence: MessageSequence) -> str:
parts.append(component.content or "[图片]")
continue
if isinstance(component, AtComponent):
parts.append(_render_at_component_text(component))
continue
if isinstance(component, ReplyComponent):
target_message_id = component.target_message_id.strip()
if target_message_id:

View File

@@ -23,9 +23,11 @@ from src.services import database_service as database_api
from .builtin_tool import get_action_tool_specs
from .builtin_tool import build_builtin_tool_handlers as build_split_builtin_tool_handlers
from .builtin_tool import get_timing_tools
from .chat_loop_service import ChatResponse
from .chat_history_visual_refresher import refresh_chat_history_visual_placeholders
from .builtin_tool.context import BuiltinToolRuntimeContext
from .context_messages import (
AssistantMessage,
ComplexSessionMessage,
LLMContextMessage,
SessionBackedMessage,
@@ -229,6 +231,9 @@ class MaisakaReasoningEngine:
) -> tuple[Literal["continue", "no_reply", "wait"], Any, list[str]]:
"""运行 Timing Gate 子代理并返回控制决策。"""
if self._runtime._force_continue_until_reply:
return self._build_forced_continue_timing_result()
response = await self._run_interruptible_sub_agent(
context_message_limit=TIMING_GATE_CONTEXT_LIMIT,
system_prompt=self._build_timing_gate_system_prompt(),
@@ -264,191 +269,210 @@ class MaisakaReasoningEngine:
return "continue", response, tool_result_summaries
return timing_action, response, tool_result_summaries
def _build_forced_continue_timing_result(self) -> tuple[Literal["continue"], ChatResponse, list[str]]:
    """Build the synthetic "continue" result used when the Timing Gate is skipped."""
    runtime = self._runtime
    reason = runtime._build_force_continue_timing_reason()
    logger.info(f"{runtime.log_prefix} {reason}")
    # Mirror the history-selection accounting a real gate run would report.
    countable_history = sum(1 for message in runtime._chat_history if message.count_in_context)
    selected_history_count = min(countable_history, runtime._max_context_size)
    placeholder_message = AssistantMessage(
        content="",
        timestamp=datetime.now(),
        source_kind="perception",
    )
    forced_response = ChatResponse(
        content=reason,
        tool_calls=[],
        raw_message=placeholder_message,
        selected_history_count=selected_history_count,
        prompt_tokens=0,
        built_message_count=0,
        completion_tokens=0,
        total_tokens=0,
        prompt_section=None,
    )
    return "continue", forced_response, [f"- continue [强制跳过]: {reason}"]
async def run_loop(self) -> None:
"""独立消费消息批次,并执行对应的内部思考轮次。"""
try:
while self._runtime._running:
queue_item_done_count = 0
try:
queued_trigger = await self._runtime._internal_turn_queue.get()
(
message_triggered,
timeout_triggered,
queue_item_done_count,
) = self._drain_ready_turn_triggers(queued_trigger)
queued_trigger = await self._runtime._internal_turn_queue.get()
message_triggered, timeout_triggered = self._drain_ready_turn_triggers(queued_trigger)
if message_triggered:
await self._runtime._wait_for_message_quiet_period()
self._runtime._message_turn_scheduled = False
if message_triggered:
await self._runtime._wait_for_message_quiet_period()
self._runtime._message_turn_scheduled = False
cached_messages = (
self._runtime._collect_pending_messages()
if self._runtime._has_pending_messages()
else []
)
if not timeout_triggered and not cached_messages and not message_triggered:
cached_messages = (
self._runtime._collect_pending_messages()
if self._runtime._has_pending_messages()
else []
)
if not timeout_triggered and not cached_messages and not message_triggered:
continue
self._runtime._agent_state = self._runtime._STATE_RUNNING
if cached_messages:
asyncio.create_task(self._runtime._trigger_batch_learning(cached_messages))
self._append_wait_interrupted_message_if_needed()
await self._ingest_messages(cached_messages)
anchor_message = cached_messages[-1]
else:
anchor_message = self._get_timeout_anchor_message()
if anchor_message is None:
logger.warning(
f"{self._runtime.log_prefix} 等待超时后缺少可复用的锚点消息,跳过本轮继续思考"
)
continue
logger.info(f"{self._runtime.log_prefix} 等待超时后开始新一轮思考")
if self._runtime._pending_wait_tool_call_id:
self._runtime._chat_history.append(self._build_wait_timeout_message())
self._trim_chat_history()
self._runtime._agent_state = self._runtime._STATE_RUNNING
if cached_messages:
asyncio.create_task(self._runtime._trigger_batch_learning(cached_messages))
self._append_wait_interrupted_message_if_needed()
await self._ingest_messages(cached_messages)
anchor_message = cached_messages[-1]
else:
anchor_message = self._get_timeout_anchor_message()
if anchor_message is None:
logger.warning(
f"{self._runtime.log_prefix} 等待超时后缺少可复用的锚点消息,跳过本轮继续思考"
)
continue
logger.info(f"{self._runtime.log_prefix} 等待超时后开始新一轮思考")
if self._runtime._pending_wait_tool_call_id:
self._runtime._chat_history.append(self._build_wait_timeout_message())
self._trim_chat_history()
try:
for round_index in range(self._runtime._max_internal_rounds):
cycle_detail = self._start_cycle()
self._runtime._log_cycle_started(cycle_detail, round_index)
await emit_cycle_start(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
round_index=round_index,
max_rounds=self._runtime._max_internal_rounds,
history_count=len(self._runtime._chat_history),
)
planner_started_at = 0.0
try:
visual_refresh_started_at = time.time()
refreshed_message_count = await self._refresh_chat_history_visual_placeholders()
cycle_detail.time_records["visual_refresh"] = time.time() - visual_refresh_started_at
if refreshed_message_count > 0:
logger.info(
f"{self._runtime.log_prefix} 本轮思考前已刷新 {refreshed_message_count} 条视觉占位历史消息"
)
try:
for round_index in range(self._runtime._max_internal_rounds):
cycle_detail = self._start_cycle()
self._runtime._log_cycle_started(cycle_detail, round_index)
await emit_cycle_start(
timing_started_at = time.time()
timing_action, timing_response, timing_tool_results = await self._run_timing_gate(anchor_message)
timing_duration_ms = (time.time() - timing_started_at) * 1000
cycle_detail.time_records["timing_gate"] = timing_duration_ms / 1000
await emit_timing_gate_result(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
round_index=round_index,
max_rounds=self._runtime._max_internal_rounds,
history_count=len(self._runtime._chat_history),
action=timing_action,
content=timing_response.content,
tool_calls=timing_response.tool_calls,
messages=[],
prompt_tokens=timing_response.prompt_tokens,
selected_history_count=timing_response.selected_history_count,
duration_ms=timing_duration_ms,
)
planner_started_at = 0.0
try:
visual_refresh_started_at = time.time()
refreshed_message_count = await self._refresh_chat_history_visual_placeholders()
cycle_detail.time_records["visual_refresh"] = time.time() - visual_refresh_started_at
if refreshed_message_count > 0:
logger.info(
f"{self._runtime.log_prefix} 本轮思考前已刷新 {refreshed_message_count} 条视觉占位历史消息"
)
timing_started_at = time.time()
timing_action, timing_response, timing_tool_results = await self._run_timing_gate(anchor_message)
timing_duration_ms = (time.time() - timing_started_at) * 1000
cycle_detail.time_records["timing_gate"] = timing_duration_ms / 1000
await emit_timing_gate_result(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
action=timing_action,
content=timing_response.content,
tool_calls=timing_response.tool_calls,
messages=[],
prompt_tokens=timing_response.prompt_tokens,
selected_history_count=timing_response.selected_history_count,
duration_ms=timing_duration_ms,
)
self._runtime._render_context_usage_panel(
selected_history_count=timing_response.selected_history_count,
prompt_tokens=timing_response.prompt_tokens,
planner_response=timing_response.content or "",
tool_calls=timing_response.tool_calls,
tool_results=timing_tool_results,
prompt_section=timing_response.prompt_section,
)
if timing_action != "continue":
logger.info(
f"{self._runtime.log_prefix} Timing Gate 结束当前回合: "
f"回合={round_index + 1} 动作={timing_action}"
)
break
planner_started_at = time.time()
action_tool_definitions = await self._build_action_tool_definitions()
self._runtime._render_context_usage_panel(
selected_history_count=timing_response.selected_history_count,
prompt_tokens=timing_response.prompt_tokens,
planner_response=timing_response.content or "",
tool_calls=timing_response.tool_calls,
tool_results=timing_tool_results,
prompt_section=timing_response.prompt_section,
)
if timing_action != "continue":
logger.info(
f"{self._runtime.log_prefix} 规划器开始执行: "
f"回合={round_index + 1} "
f"历史消息数={len(self._runtime._chat_history)} "
f"开始时间={planner_started_at:.3f}"
f"{self._runtime.log_prefix} Timing Gate 结束当前回合: "
f"回合={round_index + 1} 动作={timing_action}"
)
response = await self._run_interruptible_planner(
tool_definitions=action_tool_definitions,
break
planner_started_at = time.time()
action_tool_definitions = await self._build_action_tool_definitions()
logger.info(
f"{self._runtime.log_prefix} 规划器开始执行: "
f"回合={round_index + 1} "
f"历史消息数={len(self._runtime._chat_history)} "
f"开始时间={planner_started_at:.3f}"
)
response = await self._run_interruptible_planner(
tool_definitions=action_tool_definitions,
)
planner_duration_ms = (time.time() - planner_started_at) * 1000
cycle_detail.time_records["planner"] = planner_duration_ms / 1000
logger.info(
f"{self._runtime.log_prefix} 规划器执行完成: "
f"回合={round_index + 1} "
f"耗时={cycle_detail.time_records['planner']:.3f}"
)
await emit_planner_response(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
content=response.content,
tool_calls=response.tool_calls,
prompt_tokens=response.prompt_tokens,
completion_tokens=response.completion_tokens,
total_tokens=response.total_tokens,
duration_ms=planner_duration_ms,
)
reasoning_content = response.content or ""
if self._should_replace_reasoning(reasoning_content):
response.content = "我应该根据我上面思考的内容进行反思,重新思考我下一步的行动,我需要分析当前场景,对话,以及我可以使用的工具,然后先输出想法再使用工具"
response.raw_message.content = "我应该根据我上面思考的内容进行反思,重新思考我下一步的行动,我需要分析当前场景,对话,以及我可以使用的工具,然后先输出想法再使用工具"
logger.info(f"{self._runtime.log_prefix} 当前思考与上一轮过于相似,已替换为重新思考提示")
self._last_reasoning_content = reasoning_content
self._runtime._chat_history.append(response.raw_message)
tool_result_summaries: list[str] = []
if response.tool_calls:
tool_started_at = time.time()
should_pause, tool_result_summaries = await self._handle_tool_calls(
response.tool_calls,
response.content or "",
anchor_message,
)
planner_duration_ms = (time.time() - planner_started_at) * 1000
cycle_detail.time_records["planner"] = planner_duration_ms / 1000
logger.info(
f"{self._runtime.log_prefix} 规划器执行完成: "
f"回合={round_index + 1} "
f"耗时={cycle_detail.time_records['planner']:.3f}"
)
await emit_planner_response(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
content=response.content,
tool_calls=response.tool_calls,
prompt_tokens=response.prompt_tokens,
completion_tokens=response.completion_tokens,
total_tokens=response.total_tokens,
duration_ms=planner_duration_ms,
)
reasoning_content = response.content or ""
if self._should_replace_reasoning(reasoning_content):
response.content = "我应该根据我上面思考的内容进行反思,重新思考我下一步的行动,我需要分析当前场景,对话,以及我可以使用的工具,然后先输出想法再使用工具"
response.raw_message.content = "我应该根据我上面思考的内容进行反思,重新思考我下一步的行动,我需要分析当前场景,对话,以及我可以使用的工具,然后先输出想法再使用工具"
logger.info(f"{self._runtime.log_prefix} 当前思考与上一轮过于相似,已替换为重新思考提示")
self._last_reasoning_content = reasoning_content
self._runtime._chat_history.append(response.raw_message)
tool_result_summaries: list[str] = []
if response.tool_calls:
tool_started_at = time.time()
should_pause, tool_result_summaries = await self._handle_tool_calls(
response.tool_calls,
response.content or "",
anchor_message,
)
cycle_detail.time_records["tool_calls"] = time.time() - tool_started_at
self._runtime._render_context_usage_panel(
selected_history_count=response.selected_history_count,
prompt_tokens=response.prompt_tokens,
planner_response=response.content or "",
tool_calls=response.tool_calls,
tool_results=tool_result_summaries,
prompt_section=response.prompt_section,
)
if should_pause:
break
continue
cycle_detail.time_records["tool_calls"] = time.time() - tool_started_at
self._runtime._render_context_usage_panel(
selected_history_count=response.selected_history_count,
prompt_tokens=response.prompt_tokens,
planner_response=response.content or "",
tool_calls=response.tool_calls,
tool_results=tool_result_summaries,
prompt_section=response.prompt_section,
)
if not response.content:
if should_pause:
break
except ReqAbortException:
interrupted_at = time.time()
logger.info(
f"{self._runtime.log_prefix} 规划器打断成功: "
f"回合={round_index + 1} "
f"开始时间={planner_started_at:.3f} "
f"打断时间={interrupted_at:.3f} "
f"耗时={interrupted_at - planner_started_at:.3f}"
)
continue
self._runtime._render_context_usage_panel(
selected_history_count=response.selected_history_count,
prompt_tokens=response.prompt_tokens,
planner_response=response.content or "",
prompt_section=response.prompt_section,
)
if not response.content:
break
finally:
self._end_cycle(cycle_detail)
await emit_cycle_end(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
time_records=dict(cycle_detail.time_records),
agent_state=self._runtime._agent_state,
)
finally:
if self._runtime._agent_state == self._runtime._STATE_RUNNING:
self._runtime._agent_state = self._runtime._STATE_STOP
except ReqAbortException:
interrupted_at = time.time()
logger.info(
f"{self._runtime.log_prefix} 规划器打断成功: "
f"回合={round_index + 1} "
f"开始时间={planner_started_at:.3f} "
f"打断时间={interrupted_at:.3f} "
f"耗时={interrupted_at - planner_started_at:.3f}"
)
break
finally:
self._end_cycle(cycle_detail)
await emit_cycle_end(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
time_records=dict(cycle_detail.time_records),
agent_state=self._runtime._agent_state,
)
finally:
for _ in range(queue_item_done_count):
self._runtime._internal_turn_queue.task_done()
if self._runtime._agent_state == self._runtime._STATE_RUNNING:
self._runtime._agent_state = self._runtime._STATE_STOP
except asyncio.CancelledError:
self._runtime._log_internal_loop_cancelled()
raise
@@ -460,10 +484,9 @@ class MaisakaReasoningEngine:
def _drain_ready_turn_triggers(
self,
queued_trigger: Literal["message", "timeout"],
) -> tuple[bool, bool, int]:
) -> tuple[bool, bool]:
"""合并当前已就绪的 turn 触发信号。"""
queue_item_done_count = 1
message_triggered = queued_trigger == "message"
timeout_triggered = queued_trigger == "timeout"
@@ -473,7 +496,6 @@ class MaisakaReasoningEngine:
except asyncio.QueueEmpty:
break
queue_item_done_count += 1
if next_trigger == "message":
message_triggered = True
continue
@@ -481,11 +503,7 @@ class MaisakaReasoningEngine:
timeout_triggered = True
continue
if message_triggered:
# 这些消息触发将由当前 turn 接手,旧的事件位不应再污染后续 wait 判定。
self._runtime._new_message_event.clear()
return message_triggered, timeout_triggered, queue_item_done_count
return message_triggered, timeout_triggered
def _get_timeout_anchor_message(self) -> Optional[SessionMessage]:
"""在 wait 超时后复用最近一条真实用户消息作为锚点。"""

View File

@@ -13,6 +13,7 @@ from src.cli.console import console
from src.chat.heart_flow.heartFC_utils import CycleDetail
from src.chat.message_receive.chat_manager import BotChatSession, chat_manager
from src.chat.message_receive.message import SessionMessage
from src.chat.utils.utils import is_mentioned_bot_in_message
from src.common.data_models.mai_message_data_model import GroupInfo, UserInfo
from src.common.logger import get_logger
from src.common.utils.utils_config import ExpressionConfigUtils
@@ -72,8 +73,6 @@ class MaisakaHeartFlowChatting:
self._running = False
self._cycle_counter = 0
self._internal_loop_task: Optional[asyncio.Task] = None
self._loop_task: Optional[asyncio.Task] = None
self._new_message_event = asyncio.Event()
self._message_turn_scheduled = False
self._message_debounce_seconds = 1.0
self._message_debounce_required = False
@@ -84,6 +83,9 @@ class MaisakaHeartFlowChatting:
self._agent_state: Literal["running", "wait", "stop"] = self._STATE_STOP
self._wait_until: Optional[float] = None
self._pending_wait_tool_call_id: Optional[str] = None
self._force_continue_until_reply = False
self._force_continue_trigger_message_id = ""
self._force_continue_trigger_reason = ""
self._planner_interrupt_flag: Optional[asyncio.Event] = None
self._planner_interrupt_requested = False
self._planner_interrupt_consecutive_count = 0
@@ -118,6 +120,7 @@ class MaisakaHeartFlowChatting:
self._running = True
self._ensure_background_tasks_running()
self._schedule_message_turn()
logger.info(f"{self.log_prefix} Maisaka 运行时已启动")
async def stop(self) -> None:
@@ -126,22 +129,12 @@ class MaisakaHeartFlowChatting:
return
self._running = False
self._new_message_event.set()
self._message_turn_scheduled = False
self._message_debounce_required = False
self._cancel_wait_timeout_task()
while not self._internal_turn_queue.empty():
_ = self._internal_turn_queue.get_nowait()
if self._loop_task is not None:
self._loop_task.cancel()
try:
await self._loop_task
except asyncio.CancelledError:
pass
finally:
self._loop_task = None
if self._internal_loop_task is not None:
self._internal_loop_task.cancel()
try:
@@ -166,6 +159,7 @@ class MaisakaHeartFlowChatting:
if self._running:
self._ensure_background_tasks_running()
self._last_message_received_at = time.time()
self._update_message_trigger_state(message)
self.message_cache.append(message)
self._source_messages_by_id[message.message_id] = message
if self._agent_state == self._STATE_WAIT:
@@ -199,7 +193,78 @@ class MaisakaHeartFlowChatting:
f"{self._planner_interrupt_max_consecutive_count}"
)
self._planner_interrupt_flag.set()
self._new_message_event.set()
if self._running:
self._schedule_message_turn()
def _update_message_trigger_state(self, message: SessionMessage) -> None:
    """Backfill @/mention flags on the message and arm forced-continue on a hit."""
    mentioned, at_hit, _ = is_mentioned_bot_in_message(message)
    # Only ever upgrade the flags; never clear ones already set upstream.
    if at_hit:
        message.is_at = True
    if mentioned:
        message.is_mentioned = True
    if message.is_at or message.is_mentioned:
        self._arm_force_continue_until_reply(
            message,
            is_at=message.is_at,
            is_mentioned=message.is_mentioned,
        )
def _arm_force_continue_until_reply(
    self,
    message: SessionMessage,
    *,
    is_at: bool,
    is_mentioned: bool,
) -> None:
    """Arm (or refresh) the forced-continue state after an @ or mention.

    While armed, subsequent rounds skip the Timing Gate until one reply
    has been sent successfully.
    """
    if is_at:
        trigger_reason = "@消息"
    elif is_mentioned:
        trigger_reason = "提及消息"
    else:
        trigger_reason = "触发消息"
    already_armed = self._force_continue_until_reply
    self._force_continue_until_reply = True
    self._force_continue_trigger_message_id = message.message_id
    self._force_continue_trigger_reason = trigger_reason
    if already_armed:
        # Refresh rather than re-announce: just record the newest trigger.
        logger.info(
            f"{self.log_prefix} 检测到新的{trigger_reason},刷新强制 continue 状态;"
            f"消息编号={message.message_id}"
        )
    else:
        logger.info(
            f"{self.log_prefix} 检测到{trigger_reason},将跳过 Timing Gate 直到成功发送一条 reply"
            f"消息编号={message.message_id}"
        )
def _clear_force_continue_until_reply(self) -> None:
    """Reset the forced-continue state once a reply has gone out successfully."""
    if not self._force_continue_until_reply:
        return
    reason_text = self._force_continue_trigger_reason or '未知'
    message_id_text = self._force_continue_trigger_message_id or 'unknown'
    logger.info(
        f"{self.log_prefix} 已成功发送 reply恢复 Timing Gate"
        f"触发原因={reason_text} "
        f"触发消息编号={message_id_text}"
    )
    # Drop both the flag and the recorded trigger details.
    self._force_continue_until_reply = False
    self._force_continue_trigger_message_id = ""
    self._force_continue_trigger_reason = ""
def _build_force_continue_timing_reason(self) -> str:
"""返回当前强制跳过 Timing Gate 的原因描述。"""
trigger_reason = self._force_continue_trigger_reason or "@/提及消息"
trigger_message_id = self._force_continue_trigger_message_id or "unknown"
return (
f"检测到新的{trigger_reason}(消息编号={trigger_message_id}"
"本轮直接跳过 Timing Gate 并视作 continue直到成功发送一条 reply。"
)
def _bind_planner_interrupt_flag(self, interrupt_flag: asyncio.Event) -> None:
"""绑定当前可打断请求使用的中断标记。"""
@@ -235,17 +300,6 @@ class MaisakaHeartFlowChatting:
self._internal_loop_task = asyncio.create_task(self._reasoning_engine.run_loop())
logger.warning(f"{self.log_prefix} 已重新拉起 Maisaka 内部循环任务")
if self._loop_task is None or self._loop_task.done():
if self._loop_task is not None and not self._loop_task.cancelled():
try:
exc = self._loop_task.exception()
except Exception:
exc = None
if exc is not None:
logger.error(f"{self.log_prefix} 主循环任务异常退出: {exc}")
self._loop_task = asyncio.create_task(self._main_loop())
logger.warning(f"{self.log_prefix} 已重新拉起 Maisaka 主循环任务")
def _register_tool_providers(self) -> None:
"""注册 Maisaka 运行时默认启用的工具 Provider。"""
@@ -293,22 +347,17 @@ class MaisakaHeartFlowChatting:
tool_definitions=[] if tool_definitions is None else tool_definitions,
)
async def _main_loop(self) -> None:
try:
while self._running:
if self._has_pending_messages() and not self._message_turn_scheduled:
self._message_turn_scheduled = True
await self._internal_turn_queue.put("message")
continue
self._new_message_event.clear()
await self._new_message_event.wait()
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} Maisaka 运行时主循环已取消")
def _has_pending_messages(self) -> bool:
return self._last_processed_index < len(self.message_cache)
def _schedule_message_turn(self) -> None:
"""为当前待处理消息安排一次内部 turn。"""
if not self._has_pending_messages() or self._message_turn_scheduled:
return
self._message_turn_scheduled = True
self._internal_turn_queue.put_nowait("message")
def _collect_pending_messages(self) -> list[SessionMessage]:
"""从消息缓存中收集一批尚未处理的消息。"""
start_index = self._last_processed_index
@@ -360,8 +409,6 @@ class MaisakaHeartFlowChatting:
self._wait_timeout_task = asyncio.create_task(
self._schedule_wait_timeout(seconds=seconds, tool_call_id=tool_call_id)
)
# 清理旧的消息触发信号,避免 wait 被历史消息残留事件立即唤醒。
self._new_message_event.clear()
def _enter_stop_state(self) -> None:
"""切换到停止状态。"""