feat:wait现在会正确重试,给planner打断添加防抖

This commit is contained in:
SengokuCola
2026-04-05 02:05:03 +08:00
parent 7b924774be
commit 87cd992a9f
5 changed files with 17958 additions and 179 deletions

17709
dashboard/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,15 +1,16 @@
# 使用基于时间戳的文件处理器,简单的轮转份数限制
import logging
import json
import threading
import time
import structlog
import tomlkit
from datetime import datetime, timedelta
from pathlib import Path
from typing import Callable, Optional
from datetime import datetime, timedelta
import json
import logging
import threading
import time
import structlog
import tomlkit
from .logger_color_and_mapping import MODULE_ALIASES, RESET_COLOR, CONVERTED_MODULE_COLORS as MODULE_COLORS
@@ -200,6 +201,8 @@ class WebSocketLogHandler(logging.Handler):
"""发送日志到 WebSocket 客户端"""
if not self._initialized or self.loop is None:
return
if self.loop.is_closed():
return
try:
# 获取格式化后的消息
@@ -233,7 +236,12 @@ class WebSocketLogHandler(logging.Handler):
import asyncio
from src.webui.logs_ws import broadcast_log
asyncio.run_coroutine_threadsafe(broadcast_log(log_data), self.loop)
coroutine = broadcast_log(log_data)
try:
asyncio.run_coroutine_threadsafe(coroutine, self.loop)
except Exception:
coroutine.close()
raise
except Exception:
# WebSocket 推送失败不影响日志记录
pass

View File

@@ -190,12 +190,12 @@ class ExpressionAutoCheckTask(AsyncTask):
set_review_state(expression.id, True, not suitable, "ai")
status = "通过" if suitable else "不通过"
logger.info(
f"表达方式评估完成 [ID: {expression.id}] - {status} | "
f"Situation: {expression.situation}... | "
f"Style: {expression.style}... | "
f"Reason: {reason[:50]}..."
)
# logger.info(
# f"表达方式评估完成 [ID: {expression.id}] - {status} | "
# f"Situation: {expression.situation}... | "
# f"Style: {expression.style}... | "
# f"Reason: {reason[:50]}..."
# )
if error:
logger.warning(f"表达方式评估时出现错误 [ID: {expression.id}]: {error}")

View File

@@ -253,124 +253,142 @@ class MaisakaReasoningEngine:
"""独立消费消息批次,并执行对应的内部思考轮次。"""
try:
while self._runtime._running:
cached_messages = await self._runtime._internal_turn_queue.get()
timeout_triggered = cached_messages is None
if not timeout_triggered and not cached_messages:
self._runtime._internal_turn_queue.task_done()
continue
self._runtime._agent_state = self._runtime._STATE_RUNNING
if cached_messages:
self._clear_pending_wait_tool_call_id()
await self._ingest_messages(cached_messages)
anchor_message = cached_messages[-1]
else:
anchor_message = self._get_timeout_anchor_message()
if anchor_message is None:
logger.warning(
f"{self._runtime.log_prefix} 等待超时后缺少可复用的锚点消息,跳过本轮继续思考"
)
self._runtime._internal_turn_queue.task_done()
continue
logger.info(f"{self._runtime.log_prefix} 等待超时后开始新一轮思考")
self._clear_pending_wait_tool_call_id()
self._trim_chat_history()
queue_item_done_count = 0
try:
for round_index in range(self._runtime._max_internal_rounds):
cycle_detail = self._start_cycle()
self._runtime._log_cycle_started(cycle_detail, round_index)
planner_started_at = 0.0
try:
timing_started_at = time.time()
timing_action, timing_response, timing_tool_results = await self._run_timing_gate(anchor_message)
cycle_detail.time_records["timing_gate"] = time.time() - timing_started_at
self._runtime._render_context_usage_panel(
selected_history_count=timing_response.selected_history_count,
prompt_tokens=timing_response.prompt_tokens,
planner_response=timing_response.content or "",
tool_calls=timing_response.tool_calls,
tool_results=timing_tool_results,
prompt_section=timing_response.prompt_section,
queued_trigger = await self._runtime._internal_turn_queue.get()
(
message_triggered,
timeout_triggered,
queue_item_done_count,
) = self._drain_ready_turn_triggers(queued_trigger)
if message_triggered:
await self._runtime._wait_for_message_quiet_period()
self._runtime._message_turn_scheduled = False
cached_messages = (
self._runtime._collect_pending_messages()
if self._runtime._has_pending_messages()
else []
)
if not timeout_triggered and not cached_messages and not message_triggered:
continue
self._runtime._agent_state = self._runtime._STATE_RUNNING
if cached_messages:
asyncio.create_task(self._runtime._trigger_batch_learning(cached_messages))
self._append_wait_interrupted_message_if_needed()
await self._ingest_messages(cached_messages)
anchor_message = cached_messages[-1]
else:
anchor_message = self._get_timeout_anchor_message()
if anchor_message is None:
logger.warning(
f"{self._runtime.log_prefix} 等待超时后缺少可复用的锚点消息,跳过本轮继续思考"
)
if timing_action != "continue":
continue
logger.info(f"{self._runtime.log_prefix} 等待超时后开始新一轮思考")
if self._runtime._pending_wait_tool_call_id:
self._runtime._chat_history.append(self._build_wait_timeout_message())
self._trim_chat_history()
try:
for round_index in range(self._runtime._max_internal_rounds):
cycle_detail = self._start_cycle()
self._runtime._log_cycle_started(cycle_detail, round_index)
planner_started_at = 0.0
try:
timing_started_at = time.time()
timing_action, timing_response, timing_tool_results = await self._run_timing_gate(anchor_message)
cycle_detail.time_records["timing_gate"] = time.time() - timing_started_at
self._runtime._render_context_usage_panel(
selected_history_count=timing_response.selected_history_count,
prompt_tokens=timing_response.prompt_tokens,
planner_response=timing_response.content or "",
tool_calls=timing_response.tool_calls,
tool_results=timing_tool_results,
prompt_section=timing_response.prompt_section,
)
if timing_action != "continue":
logger.info(
f"{self._runtime.log_prefix} Timing Gate 结束当前回合: "
f"回合={round_index + 1} 动作={timing_action}"
)
break
planner_started_at = time.time()
action_tool_definitions = await self._build_action_tool_definitions()
logger.info(
f"{self._runtime.log_prefix} Timing Gate 结束当前回合: "
f"回合={round_index + 1} 动作={timing_action}"
f"{self._runtime.log_prefix} 规划器开始执行: "
f"回合={round_index + 1} "
f"历史消息数={len(self._runtime._chat_history)} "
f"开始时间={planner_started_at:.3f}"
)
break
planner_started_at = time.time()
action_tool_definitions = await self._build_action_tool_definitions()
logger.info(
f"{self._runtime.log_prefix} 规划器开始执行: "
f"回合={round_index + 1} "
f"历史消息数={len(self._runtime._chat_history)} "
f"开始时间={planner_started_at:.3f}"
)
response = await self._run_interruptible_planner(
tool_definitions=action_tool_definitions,
)
cycle_detail.time_records["planner"] = time.time() - planner_started_at
logger.info(
f"{self._runtime.log_prefix} 规划器执行完成: "
f"回合={round_index + 1} "
f"耗时={cycle_detail.time_records['planner']:.3f}"
)
reasoning_content = response.content or ""
if self._should_replace_reasoning(reasoning_content):
response.content = "我应该根据我上面思考的内容进行反思,重新思考我下一步的行动,我需要分析当前场景,对话,以及我可以使用的工具,然后先输出想法再使用工具"
response.raw_message.content = "我应该根据我上面思考的内容进行反思,重新思考我下一步的行动,我需要分析当前场景,对话,以及我可以使用的工具,然后先输出想法再使用工具"
logger.info(f"{self._runtime.log_prefix} 当前思考与上一轮过于相似,已替换为重新思考提示")
self._last_reasoning_content = reasoning_content
self._runtime._chat_history.append(response.raw_message)
tool_result_summaries: list[str] = []
if response.tool_calls:
tool_started_at = time.time()
should_pause, tool_result_summaries = await self._handle_tool_calls(
response.tool_calls,
response.content or "",
anchor_message,
response = await self._run_interruptible_planner(
tool_definitions=action_tool_definitions,
)
cycle_detail.time_records["tool_calls"] = time.time() - tool_started_at
cycle_detail.time_records["planner"] = time.time() - planner_started_at
logger.info(
f"{self._runtime.log_prefix} 规划器执行完成: "
f"回合={round_index + 1} "
f"耗时={cycle_detail.time_records['planner']:.3f}"
)
reasoning_content = response.content or ""
if self._should_replace_reasoning(reasoning_content):
response.content = "我应该根据我上面思考的内容进行反思,重新思考我下一步的行动,我需要分析当前场景,对话,以及我可以使用的工具,然后先输出想法再使用工具"
response.raw_message.content = "我应该根据我上面思考的内容进行反思,重新思考我下一步的行动,我需要分析当前场景,对话,以及我可以使用的工具,然后先输出想法再使用工具"
logger.info(f"{self._runtime.log_prefix} 当前思考与上一轮过于相似,已替换为重新思考提示")
self._last_reasoning_content = reasoning_content
self._runtime._chat_history.append(response.raw_message)
tool_result_summaries: list[str] = []
if response.tool_calls:
tool_started_at = time.time()
should_pause, tool_result_summaries = await self._handle_tool_calls(
response.tool_calls,
response.content or "",
anchor_message,
)
cycle_detail.time_records["tool_calls"] = time.time() - tool_started_at
self._runtime._render_context_usage_panel(
selected_history_count=response.selected_history_count,
prompt_tokens=response.prompt_tokens,
planner_response=response.content or "",
tool_calls=response.tool_calls,
tool_results=tool_result_summaries,
prompt_section=response.prompt_section,
)
if should_pause:
break
continue
self._runtime._render_context_usage_panel(
selected_history_count=response.selected_history_count,
prompt_tokens=response.prompt_tokens,
planner_response=response.content or "",
tool_calls=response.tool_calls,
tool_results=tool_result_summaries,
prompt_section=response.prompt_section,
)
if should_pause:
if not response.content:
break
continue
self._runtime._render_context_usage_panel(
selected_history_count=response.selected_history_count,
prompt_tokens=response.prompt_tokens,
planner_response=response.content or "",
prompt_section=response.prompt_section,
)
if not response.content:
except ReqAbortException:
interrupted_at = time.time()
logger.info(
f"{self._runtime.log_prefix} 规划器打断成功: "
f"回合={round_index + 1} "
f"开始时间={planner_started_at:.3f} "
f"打断时间={interrupted_at:.3f} "
f"耗时={interrupted_at - planner_started_at:.3f}"
)
break
except ReqAbortException:
interrupted_at = time.time()
logger.info(
f"{self._runtime.log_prefix} 规划器打断成功: "
f"回合={round_index + 1} "
f"开始时间={planner_started_at:.3f} "
f"打断时间={interrupted_at:.3f} "
f"耗时={interrupted_at - planner_started_at:.3f}"
)
break
finally:
self._end_cycle(cycle_detail)
finally:
self._end_cycle(cycle_detail)
finally:
if self._runtime._agent_state == self._runtime._STATE_RUNNING:
self._runtime._agent_state = self._runtime._STATE_STOP
finally:
if self._runtime._agent_state == self._runtime._STATE_RUNNING:
self._runtime._agent_state = self._runtime._STATE_STOP
self._runtime._internal_turn_queue.task_done()
for _ in range(queue_item_done_count):
self._runtime._internal_turn_queue.task_done()
except asyncio.CancelledError:
self._runtime._log_internal_loop_cancelled()
raise
@@ -379,16 +397,42 @@ class MaisakaReasoningEngine:
logger.error(traceback.format_exc())
raise
def _drain_ready_turn_triggers(
self,
queued_trigger: Literal["message", "timeout"],
) -> tuple[bool, bool, int]:
"""合并当前已就绪的 turn 触发信号。"""
queue_item_done_count = 1
message_triggered = queued_trigger == "message"
timeout_triggered = queued_trigger == "timeout"
while True:
try:
next_trigger = self._runtime._internal_turn_queue.get_nowait()
except asyncio.QueueEmpty:
break
queue_item_done_count += 1
if next_trigger == "message":
message_triggered = True
continue
if next_trigger == "timeout":
timeout_triggered = True
continue
if message_triggered:
# 这些消息触发将由当前 turn 接手,旧的事件位不应再污染后续 wait 判定。
self._runtime._new_message_event.clear()
return message_triggered, timeout_triggered, queue_item_done_count
def _get_timeout_anchor_message(self) -> Optional[SessionMessage]:
"""在 wait 超时后复用最近一条真实用户消息作为锚点。"""
if self._runtime.message_cache:
return self._runtime.message_cache[-1]
return None
def _clear_pending_wait_tool_call_id(self) -> None:
"""清理等待状态残留的 wait 工具调用编号。"""
self._runtime._pending_wait_tool_call_id = None
def _build_wait_timeout_message(self) -> ToolResultMessage:
"""构造 wait 超时后的工具结果消息。"""
tool_call_id = self._runtime._pending_wait_tool_call_id or "wait_timeout"

View File

@@ -62,7 +62,7 @@ class MaisakaHeartFlowChatting:
# Keep all original messages for batching and later learning.
self.message_cache: list[SessionMessage] = []
self._last_processed_index = 0
self._internal_turn_queue: asyncio.Queue[Optional[list[SessionMessage]]] = asyncio.Queue()
self._internal_turn_queue: asyncio.Queue[Literal["message", "timeout"]] = asyncio.Queue()
self._mcp_manager: Optional[MCPManager] = None
self._mcp_host_bridge: Optional[MCPHostLLMBridge] = None
@@ -73,6 +73,11 @@ class MaisakaHeartFlowChatting:
self._internal_loop_task: Optional[asyncio.Task] = None
self._loop_task: Optional[asyncio.Task] = None
self._new_message_event = asyncio.Event()
self._message_turn_scheduled = False
self._message_debounce_seconds = 1.0
self._message_debounce_required = False
self._last_message_received_at = 0.0
self._wait_timeout_task: Optional[asyncio.Task[None]] = None
self._max_internal_rounds = global_config.maisaka.max_internal_rounds
self._max_context_size = max(1, int(global_config.chat.max_context_size))
self._agent_state: Literal["running", "wait", "stop"] = self._STATE_STOP
@@ -115,6 +120,9 @@ class MaisakaHeartFlowChatting:
self._running = False
self._new_message_event.set()
self._message_turn_scheduled = False
self._message_debounce_required = False
self._cancel_wait_timeout_task()
while not self._internal_turn_queue.empty():
_ = self._internal_turn_queue.get_nowait()
@@ -150,8 +158,14 @@ class MaisakaHeartFlowChatting:
"""缓存一条新消息并唤醒主循环。"""
if self._running:
self._ensure_background_tasks_running()
self._last_message_received_at = time.time()
self.message_cache.append(message)
self._source_messages_by_id[message.message_id] = message
if self._agent_state == self._STATE_WAIT:
self._cancel_wait_timeout_task()
self._wait_until = None
if self._agent_state == self._STATE_RUNNING:
self._message_debounce_required = True
if self._agent_state == self._STATE_RUNNING and self._planner_interrupt_flag is not None:
logger.info(
f"{self.log_prefix} 收到新消息,发起规划器打断; "
@@ -159,8 +173,6 @@ class MaisakaHeartFlowChatting:
f"时间戳={time.time():.3f}"
)
self._planner_interrupt_flag.set()
if self._agent_state in (self._STATE_WAIT, self._STATE_STOP):
self._agent_state = self._STATE_RUNNING
self._new_message_event.set()
def _ensure_background_tasks_running(self) -> None:
@@ -240,36 +252,13 @@ class MaisakaHeartFlowChatting:
async def _main_loop(self) -> None:
try:
while self._running:
if not self._has_pending_messages():
if self._agent_state == self._STATE_WAIT:
trigger_reason = await self._wait_for_trigger()
else:
self._new_message_event.clear()
await self._new_message_event.wait()
trigger_reason: Literal["message", "timeout", "stop"] = "message" if self._running else "stop"
else:
trigger_reason = "message"
if not self._running:
return
if trigger_reason == "stop":
self._agent_state = self._STATE_STOP
if self._has_pending_messages() and not self._message_turn_scheduled:
self._message_turn_scheduled = True
await self._internal_turn_queue.put("message")
continue
self._new_message_event.clear()
if trigger_reason == "timeout":
# 等待超时后继续下一轮内部思考,但不要重复注入旧消息。
logger.info(f"{self.log_prefix} 等待超时后已投递继续思考触发信号")
await self._internal_turn_queue.put(None)
continue
while self._has_pending_messages():
cached_messages = self._collect_pending_messages()
if not cached_messages:
break
await self._internal_turn_queue.put(cached_messages)
asyncio.create_task(self._trigger_batch_learning(cached_messages))
await self._new_message_event.wait()
except asyncio.CancelledError:
logger.info(f"{self.log_prefix} Maisaka 运行时主循环已取消")
@@ -293,49 +282,78 @@ class MaisakaHeartFlowChatting:
unique_messages.append(message)
self._last_processed_index = len(self.message_cache)
logger.info(
f"{self.log_prefix} 已从消息缓存区[{start_index}:{self._last_processed_index}] "
f"收集 {len(unique_messages)} 条新消息"
)
# logger.info(
# f"{self.log_prefix} 已从消息缓存区[{start_index}:{self._last_processed_index}] "
# f"收集 {len(unique_messages)} 条新消息"
# )
return unique_messages
async def _wait_for_trigger(self) -> Literal["message", "timeout", "stop"]:
"""等待 wait 状态的触发结果"""
if self._agent_state != self._STATE_WAIT:
await self._new_message_event.wait()
return "message"
async def _wait_for_message_quiet_period(self) -> None:
"""等待消息静默窗口结束后,再启动由打断触发的新一轮"""
if not self._message_debounce_required:
return
if self._wait_until is None:
await self._new_message_event.wait()
return "message"
if self._message_debounce_seconds <= 0:
self._message_debounce_required = False
return
timeout = self._wait_until - time.time()
if timeout <= 0:
logger.info(f"{self.log_prefix} Maisaka 等待已超时")
self._agent_state = self._STATE_RUNNING
self._wait_until = None
return "timeout"
while self._running:
elapsed = time.time() - self._last_message_received_at
remaining = self._message_debounce_seconds - elapsed
if remaining <= 0:
break
await asyncio.sleep(remaining)
try:
await asyncio.wait_for(self._new_message_event.wait(), timeout=timeout)
return "message"
except asyncio.TimeoutError:
logger.info(f"{self.log_prefix} Maisaka 等待已超时")
self._agent_state = self._STATE_RUNNING
self._wait_until = None
return "timeout"
self._message_debounce_required = False
def _enter_wait_state(self, seconds: Optional[float] = None, tool_call_id: Optional[str] = None) -> None:
"""切换到等待状态。"""
self._agent_state = self._STATE_WAIT
self._wait_until = None if seconds is None else time.time() + seconds
self._pending_wait_tool_call_id = tool_call_id
self._cancel_wait_timeout_task()
if seconds is not None:
self._wait_timeout_task = asyncio.create_task(
self._schedule_wait_timeout(seconds=seconds, tool_call_id=tool_call_id)
)
# 清理旧的消息触发信号,避免 wait 被历史消息残留事件立即唤醒。
self._new_message_event.clear()
def _enter_stop_state(self) -> None:
"""切换到停止状态。"""
self._agent_state = self._STATE_STOP
self._wait_until = None
self._pending_wait_tool_call_id = None
self._cancel_wait_timeout_task()
def _cancel_wait_timeout_task(self) -> None:
"""取消当前 wait 对应的超时任务。"""
if self._wait_timeout_task is None:
return
self._wait_timeout_task.cancel()
self._wait_timeout_task = None
async def _schedule_wait_timeout(self, seconds: float, tool_call_id: Optional[str]) -> None:
"""在 wait 到期后向内部循环投递 timeout 触发。"""
try:
if seconds > 0:
await asyncio.sleep(seconds)
if not self._running:
return
if self._agent_state != self._STATE_WAIT:
return
if self._pending_wait_tool_call_id != tool_call_id:
return
logger.info(f"{self.log_prefix} Maisaka 等待已超时")
self._agent_state = self._STATE_RUNNING
self._wait_until = None
await self._internal_turn_queue.put("timeout")
except asyncio.CancelledError:
return
finally:
if self._wait_timeout_task is not None and self._pending_wait_tool_call_id == tool_call_id:
self._wait_timeout_task = None
async def _trigger_batch_learning(self, messages: list[SessionMessage]) -> None:
"""按同一批消息触发表达方式、黑话和 knowledge 学习。"""