feat:优化timing门控逻辑,减少消耗,提高速度

This commit is contained in:
SengokuCola
2026-04-09 13:56:34 +08:00
parent daef71b7e9
commit b28481d205
17 changed files with 371 additions and 49 deletions

View File

@@ -52,7 +52,7 @@ logger = get_logger("maisaka_reasoning_engine")
TIMING_GATE_CONTEXT_LIMIT = 24
TIMING_GATE_MAX_TOKENS = 384
TIMING_GATE_TOOL_NAMES = {"continue", "no_reply", "wait"}
ACTION_HIDDEN_TOOL_NAMES = {"continue", "no_reply", "wait"}
ACTION_HIDDEN_TOOL_NAMES = {"continue", "no_reply"}
ACTION_BUILTIN_TOOL_NAMES = {tool_spec.name for tool_spec in get_action_tool_specs()}
@@ -297,6 +297,21 @@ class MaisakaReasoningEngine:
[f"- continue [强制跳过]: {reason}"],
)
@staticmethod
def _mark_timing_gate_completed(timing_action: str) -> bool:
"""根据门控动作决定下一轮是否还需要重新执行 timing。"""
return timing_action != "continue"
@staticmethod
def _should_retry_planner_after_interrupt(
*,
round_index: int,
max_internal_rounds: int,
has_pending_messages: bool,
) -> bool:
return has_pending_messages and round_index + 1 < max_internal_rounds
async def run_loop(self) -> None:
"""独立消费消息批次,并执行对应的内部思考轮次。"""
try:
@@ -313,7 +328,7 @@ class MaisakaReasoningEngine:
if self._runtime._has_pending_messages()
else []
)
if not timeout_triggered and not cached_messages and not message_triggered:
if not timeout_triggered and not cached_messages:
continue
self._runtime._agent_state = self._runtime._STATE_RUNNING
@@ -335,6 +350,7 @@ class MaisakaReasoningEngine:
self._trim_chat_history()
try:
timing_gate_required = True
for round_index in range(self._runtime._max_internal_rounds):
cycle_detail = self._start_cycle()
self._runtime._log_cycle_started(cycle_detail, round_index)
@@ -363,27 +379,36 @@ class MaisakaReasoningEngine:
f"{self._runtime.log_prefix} 本轮思考前已刷新 {refreshed_message_count} 条视觉占位历史消息"
)
timing_started_at = time.time()
timing_action, timing_response, timing_tool_results = await self._run_timing_gate(anchor_message)
timing_duration_ms = (time.time() - timing_started_at) * 1000
cycle_detail.time_records["timing_gate"] = timing_duration_ms / 1000
await emit_timing_gate_result(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
action=timing_action,
content=timing_response.content,
tool_calls=timing_response.tool_calls,
messages=[],
prompt_tokens=timing_response.prompt_tokens,
selected_history_count=timing_response.selected_history_count,
duration_ms=timing_duration_ms,
)
if timing_action != "continue":
logger.info(
f"{self._runtime.log_prefix} Timing Gate 结束当前回合: "
f"回合={round_index + 1} 动作={timing_action}"
if timing_gate_required:
timing_started_at = time.time()
timing_action, timing_response, timing_tool_results = await self._run_timing_gate(
anchor_message
)
timing_duration_ms = (time.time() - timing_started_at) * 1000
cycle_detail.time_records["timing_gate"] = timing_duration_ms / 1000
await emit_timing_gate_result(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
action=timing_action,
content=timing_response.content,
tool_calls=timing_response.tool_calls,
messages=[],
prompt_tokens=timing_response.prompt_tokens,
selected_history_count=timing_response.selected_history_count,
duration_ms=timing_duration_ms,
)
timing_gate_required = self._mark_timing_gate_completed(timing_action)
if timing_action != "continue":
logger.info(
f"{self._runtime.log_prefix} Timing Gate 结束当前回合: "
f"回合={round_index + 1} 动作={timing_action}"
)
break
else:
logger.info(
f"{self._runtime.log_prefix} 跳过 Timing Gate继续执行 Planner: "
f"回合={round_index + 1}"
)
break
planner_started_at = time.time()
action_tool_definitions = await self._build_action_tool_definitions()
@@ -436,7 +461,28 @@ class MaisakaReasoningEngine:
f"打断时间={interrupted_at:.3f} "
f"耗时={interrupted_at - planner_started_at:.3f}"
)
break
if not self._should_retry_planner_after_interrupt(
round_index=round_index,
max_internal_rounds=self._runtime._max_internal_rounds,
has_pending_messages=self._runtime._has_pending_messages(),
):
break
await self._runtime._wait_for_message_quiet_period()
self._runtime._message_turn_scheduled = False
interrupted_messages = self._runtime._collect_pending_messages()
if not interrupted_messages:
break
asyncio.create_task(self._runtime._trigger_batch_learning(interrupted_messages))
self._append_wait_interrupted_message_if_needed()
await self._ingest_messages(interrupted_messages)
anchor_message = interrupted_messages[-1]
logger.info(
f"{self._runtime.log_prefix} 淇濇寔娲昏穬鐘舵€侊紝璺宠繃 Timing Gate 鐩存帴閲嶈瘯 Planner: "
f"鍥炲悎={round_index + 2}"
)
continue
finally:
completed_cycle = self._end_cycle(cycle_detail)
self._runtime._render_context_usage_panel(
@@ -933,6 +979,9 @@ class MaisakaReasoningEngine:
if invocation.tool_name == "no_reply":
return "你暂停了当前对话循环,等待新的外部消息。"
if invocation.tool_name == "finish":
return "你结束了本轮思考,等待新的外部消息后再继续。"
if invocation.tool_name == "continue":
return "你允许当前对话继续进入下一轮完整思考与工具执行。"