feat:合并timing和plan展示,回复频率控制

This commit is contained in:
SengokuCola
2026-04-07 20:26:07 +08:00
parent 297b1bf5e3
commit f058bc3189
12 changed files with 409 additions and 1108 deletions

View File

@@ -205,7 +205,7 @@ async def handle_tool(
else:
for sent_message in sent_messages:
tool_ctx.append_sent_message_to_chat_history(sent_message)
tool_ctx.runtime._clear_force_continue_until_reply()
tool_ctx.runtime._record_reply_sent()
return tool_ctx.build_success_result(
invocation.tool_name,
"回复已生成并发送。",

View File

@@ -163,6 +163,103 @@ def _serialize_tool_results(tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]
return serialized_tools
def _serialize_request_block(
    messages: Optional[List[Any]],
    selected_history_count: Optional[int],
    tool_count: Optional[int],
) -> Optional[Dict[str, Any]]:
    """Build the normalized "request" section of a monitor payload.

    Args:
        messages: Request messages, or ``None`` when unavailable.
        selected_history_count: Number of selected history entries, or ``None``.
        tool_count: Number of tools offered with the request, or ``None``.

    Returns:
        ``None`` when every argument is ``None``; otherwise a dict whose
        missing/falsy inputs fall back to an empty message list and zero counts.
    """
    supplied = (messages, selected_history_count, tool_count)
    if all(value is None for value in supplied):
        return None

    request_block: Dict[str, Any] = {}
    request_block["messages"] = _serialize_messages(list(messages or []))
    request_block["selected_history_count"] = int(selected_history_count or 0)
    request_block["tool_count"] = int(tool_count or 0)
    return request_block
def _serialize_planner_block(
    content: Optional[str],
    tool_calls: Optional[List[Any]],
    prompt_tokens: Optional[int],
    completion_tokens: Optional[int],
    total_tokens: Optional[int],
    duration_ms: Optional[float],
) -> Optional[Dict[str, Any]]:
    """Build the normalized "planner" result section of a monitor payload.

    Args:
        content: Planner text output, passed through unchanged.
        tool_calls: Tool-call objects produced by the planner, or ``None``.
        prompt_tokens: Prompt token count, or ``None``.
        completion_tokens: Completion token count, or ``None``.
        total_tokens: Total token count, or ``None``.
        duration_ms: Planner duration in milliseconds, or ``None``.

    Returns:
        ``None`` when every argument is ``None``; otherwise a dict in which
        missing numeric values become ``0`` / ``0.0`` and missing tool calls
        become an empty list before serialization.
    """
    supplied = (
        content,
        tool_calls,
        prompt_tokens,
        completion_tokens,
        total_tokens,
        duration_ms,
    )
    if all(value is None for value in supplied):
        return None

    planner_block: Dict[str, Any] = {"content": content}
    planner_block["tool_calls"] = _serialize_tool_calls_from_objects(list(tool_calls or []))
    planner_block["prompt_tokens"] = int(prompt_tokens or 0)
    planner_block["completion_tokens"] = int(completion_tokens or 0)
    planner_block["total_tokens"] = int(total_tokens or 0)
    planner_block["duration_ms"] = float(duration_ms or 0.0)
    return planner_block
def _serialize_timing_gate_block(
    *,
    request_messages: Optional[List[Any]],
    selected_history_count: Optional[int],
    tool_count: Optional[int],
    action: Optional[str],
    content: Optional[str],
    tool_calls: Optional[List[Any]],
    tool_results: Optional[List[str]],
    prompt_tokens: Optional[int],
    completion_tokens: Optional[int],
    total_tokens: Optional[int],
    duration_ms: Optional[float],
) -> Optional[Dict[str, Any]]:
    """Build the normalized Timing Gate section of a monitor payload.

    All parameters are keyword-only. The first three describe the request and
    are delegated to ``_serialize_request_block``; the rest describe the
    result.

    Returns:
        ``None`` when every argument is ``None``; otherwise a dict with a
        ``"request"`` entry and a ``"result"`` entry. Missing numeric values
        become ``0`` / ``0.0`` and missing lists become empty lists.
    """
    supplied = (
        request_messages,
        selected_history_count,
        tool_count,
        action,
        content,
        tool_calls,
        tool_results,
        prompt_tokens,
        completion_tokens,
        total_tokens,
        duration_ms,
    )
    if all(value is None for value in supplied):
        return None

    request_section = _serialize_request_block(
        request_messages,
        selected_history_count,
        tool_count,
    )

    result_section: Dict[str, Any] = {"action": action, "content": content}
    result_section["tool_calls"] = _serialize_tool_calls_from_objects(list(tool_calls or []))
    result_section["tool_results"] = _normalize_payload_value(list(tool_results or []))
    result_section["prompt_tokens"] = int(prompt_tokens or 0)
    result_section["completion_tokens"] = int(completion_tokens or 0)
    result_section["total_tokens"] = int(total_tokens or 0)
    result_section["duration_ms"] = float(duration_ms or 0.0)

    return {"request": request_section, "result": result_section}
async def _broadcast(event: str, data: Dict[str, Any]) -> None:
"""通过统一 WebSocket 管理器向监控主题广播事件。"""
@@ -268,16 +365,27 @@ async def emit_planner_finalized(
*,
session_id: str,
cycle_id: int,
request_messages: List[Any],
selected_history_count: int,
tool_count: int,
timing_request_messages: Optional[List[Any]],
timing_selected_history_count: Optional[int],
timing_tool_count: Optional[int],
timing_action: Optional[str],
timing_content: Optional[str],
timing_tool_calls: Optional[List[Any]],
timing_tool_results: Optional[List[str]],
timing_prompt_tokens: Optional[int],
timing_completion_tokens: Optional[int],
timing_total_tokens: Optional[int],
timing_duration_ms: Optional[float],
planner_request_messages: Optional[List[Any]],
planner_selected_history_count: Optional[int],
planner_tool_count: Optional[int],
planner_content: Optional[str],
planner_tool_calls: List[Any],
prompt_tokens: int,
completion_tokens: int,
total_tokens: int,
duration_ms: float,
tools: List[Dict[str, Any]],
planner_tool_calls: Optional[List[Any]],
planner_prompt_tokens: Optional[int],
planner_completion_tokens: Optional[int],
planner_total_tokens: Optional[int],
planner_duration_ms: Optional[float],
tools: Optional[List[Dict[str, Any]]],
time_records: Dict[str, float],
agent_state: str,
) -> None:
@@ -287,20 +395,33 @@ async def emit_planner_finalized(
"session_id": session_id,
"cycle_id": cycle_id,
"timestamp": time.time(),
"request": {
"messages": _serialize_messages(request_messages),
"selected_history_count": selected_history_count,
"tool_count": tool_count,
},
"planner": {
"content": planner_content,
"tool_calls": _serialize_tool_calls_from_objects(planner_tool_calls),
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens,
"duration_ms": duration_ms,
},
"tools": _serialize_tool_results(tools),
"timing_gate": _serialize_timing_gate_block(
request_messages=timing_request_messages,
selected_history_count=timing_selected_history_count,
tool_count=timing_tool_count,
action=timing_action,
content=timing_content,
tool_calls=timing_tool_calls,
tool_results=timing_tool_results,
prompt_tokens=timing_prompt_tokens,
completion_tokens=timing_completion_tokens,
total_tokens=timing_total_tokens,
duration_ms=timing_duration_ms,
),
"request": _serialize_request_block(
planner_request_messages,
planner_selected_history_count,
planner_tool_count,
),
"planner": _serialize_planner_block(
planner_content,
planner_tool_calls,
planner_prompt_tokens,
planner_completion_tokens,
planner_total_tokens,
planner_duration_ms,
),
"tools": _serialize_tool_results(list(tools or [])),
"final_state": {
"time_records": _normalize_payload_value(time_records),
"agent_state": agent_state,

View File

@@ -347,6 +347,10 @@ class MaisakaReasoningEngine:
)
planner_started_at = 0.0
planner_duration_ms = 0.0
timing_duration_ms = 0.0
timing_action: Optional[str] = None
timing_response: Optional[ChatResponse] = None
timing_tool_results: Optional[list[str]] = None
response: Optional[ChatResponse] = None
tool_monitor_results: list[dict[str, Any]] = []
try:
@@ -458,23 +462,43 @@ class MaisakaReasoningEngine:
break
finally:
completed_cycle = self._end_cycle(cycle_detail)
if response is not None:
await emit_planner_finalized(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
request_messages=response.request_messages,
selected_history_count=response.selected_history_count,
tool_count=response.tool_count,
planner_content=response.content,
planner_tool_calls=response.tool_calls,
prompt_tokens=response.prompt_tokens,
completion_tokens=response.completion_tokens,
total_tokens=response.total_tokens,
duration_ms=planner_duration_ms,
tools=tool_monitor_results,
time_records=dict(completed_cycle.time_records),
agent_state=self._runtime._agent_state,
)
await emit_planner_finalized(
session_id=self._runtime.session_id,
cycle_id=cycle_detail.cycle_id,
timing_request_messages=(
timing_response.request_messages if timing_response is not None else None
),
timing_selected_history_count=(
timing_response.selected_history_count if timing_response is not None else None
),
timing_tool_count=timing_response.tool_count if timing_response is not None else None,
timing_action=timing_action,
timing_content=timing_response.content if timing_response is not None else None,
timing_tool_calls=timing_response.tool_calls if timing_response is not None else None,
timing_tool_results=timing_tool_results,
timing_prompt_tokens=timing_response.prompt_tokens if timing_response is not None else None,
timing_completion_tokens=(
timing_response.completion_tokens if timing_response is not None else None
),
timing_total_tokens=timing_response.total_tokens if timing_response is not None else None,
timing_duration_ms=timing_duration_ms if timing_response is not None else None,
planner_request_messages=response.request_messages if response is not None else None,
planner_selected_history_count=(
response.selected_history_count if response is not None else None
),
planner_tool_count=response.tool_count if response is not None else None,
planner_content=response.content if response is not None else None,
planner_tool_calls=response.tool_calls if response is not None else None,
planner_prompt_tokens=response.prompt_tokens if response is not None else None,
planner_completion_tokens=(
response.completion_tokens if response is not None else None
),
planner_total_tokens=response.total_tokens if response is not None else None,
planner_duration_ms=planner_duration_ms if response is not None else None,
tools=tool_monitor_results,
time_records=dict(completed_cycle.time_records),
agent_state=self._runtime._agent_state,
)
finally:
if self._runtime._agent_state == self._runtime._STATE_RUNNING:
self._runtime._agent_state = self._runtime._STATE_STOP

View File

@@ -1,5 +1,7 @@
"""Maisaka 非 CLI 运行时。"""
from collections import deque
from math import ceil
from typing import Any, Literal, Optional, Sequence
import asyncio
@@ -17,7 +19,7 @@ from src.chat.message_receive.message import SessionMessage
from src.chat.utils.utils import is_mentioned_bot_in_message
from src.common.data_models.mai_message_data_model import GroupInfo, UserInfo
from src.common.logger import get_logger
from src.common.utils.utils_config import ExpressionConfigUtils
from src.common.utils.utils_config import ChatConfigUtils, ExpressionConfigUtils
from src.config.config import global_config
from src.core.tooling import ToolRegistry
from src.learners.expression_learner import ExpressionLearner
@@ -77,9 +79,14 @@ class MaisakaHeartFlowChatting:
self._cycle_counter = 0
self._internal_loop_task: Optional[asyncio.Task] = None
self._message_turn_scheduled = False
self._deferred_message_turn_task: Optional[asyncio.Task[None]] = None
self._message_debounce_seconds = 1.0
self._message_debounce_required = False
self._message_received_at_by_id: dict[str, float] = {}
self._last_message_received_at = 0.0
self._talk_frequency_adjust = 1.0
self._reply_latency_measurement_started_at: Optional[float] = None
self._recent_reply_latencies: deque[tuple[float, float]] = deque()
self._wait_timeout_task: Optional[asyncio.Task[None]] = None
self._max_internal_rounds = MAX_INTERNAL_ROUNDS
self._max_context_size = max(1, int(global_config.chat.max_context_size))
@@ -132,6 +139,7 @@ class MaisakaHeartFlowChatting:
self._running = False
self._message_turn_scheduled = False
self._message_debounce_required = False
self._cancel_deferred_message_turn_task()
self._cancel_wait_timeout_task()
while not self._internal_turn_queue.empty():
_ = self._internal_turn_queue.get_nowait()
@@ -152,16 +160,19 @@ class MaisakaHeartFlowChatting:
logger.info(f"{self.log_prefix} Maisaka 运行时已停止")
def adjust_talk_frequency(self, frequency: float) -> None:
    """Set this session's reply-frequency multiplier and re-check scheduling.

    Args:
        frequency: New multiplier. It is converted with ``float`` and clamped
            to a minimum of ``0.01`` before being stored in
            ``_talk_frequency_adjust``.
    """
    self._talk_frequency_adjust = max(0.01, float(frequency))
    # The multiplier was just changed, so ask the scheduler to re-evaluate.
    self._schedule_message_turn()
async def register_message(self, message: SessionMessage) -> None:
"""缓存一条新消息并唤醒主循环。"""
if self._running:
self._ensure_background_tasks_running()
self._last_message_received_at = time.time()
received_at = time.time()
self._last_message_received_at = received_at
self._update_message_trigger_state(message)
self.message_cache.append(message)
self._message_received_at_by_id[message.message_id] = received_at
self._source_messages_by_id[message.message_id] = message
if self._agent_state == self._STATE_WAIT:
self._cancel_wait_timeout_task()
@@ -197,6 +208,93 @@ class MaisakaHeartFlowChatting:
if self._running:
self._schedule_message_turn()
def _get_effective_reply_frequency(self) -> float:
    """Return the reply frequency currently in effect for this session.

    The configured talk value for ``self.session_id`` is clamped to at least
    ``0.01``, multiplied by ``_talk_frequency_adjust``, and the product is
    clamped to at least ``0.01`` again.
    """
    configured_value = float(ChatConfigUtils.get_talk_value(self.session_id))
    base_frequency = max(0.01, configured_value)
    adjusted_frequency = base_frequency * self._talk_frequency_adjust
    return max(0.01, adjusted_frequency)
def _get_message_trigger_threshold(self) -> int:
"""根据回复频率折算出触发一轮循环所需的消息数。"""
effective_frequency = min(1.0, self._get_effective_reply_frequency())
return max(1, int(ceil(1.0 / effective_frequency)))
def _get_pending_message_count(self) -> int:
"""统计当前尚未进入内部循环的新消息数量。"""
pending_messages = self.message_cache[self._last_processed_index :]
if not pending_messages:
return 0
seen_message_ids: set[str] = set()
for message in pending_messages:
seen_message_ids.add(message.message_id)
return len(seen_message_ids)
def _prune_recent_reply_latencies(self, now: Optional[float] = None) -> None:
"""仅保留最近 10 分钟内的回复时长记录。"""
current_time = time.time() if now is None else now
expire_before = current_time - 600.0
while self._recent_reply_latencies and self._recent_reply_latencies[0][0] < expire_before:
self._recent_reply_latencies.popleft()
def _get_recent_average_reply_latency(self) -> Optional[float]:
"""获取最近 10 分钟平均消息回复时长。"""
self._prune_recent_reply_latencies()
if not self._recent_reply_latencies:
return None
total_duration = sum(duration for _, duration in self._recent_reply_latencies)
return total_duration / len(self._recent_reply_latencies)
def _record_reply_sent(self) -> None:
"""在成功发送 reply 后记录本轮消息回复时长。"""
self._clear_force_continue_until_reply()
if self._reply_latency_measurement_started_at is None:
return
reply_duration = max(0.0, time.time() - self._reply_latency_measurement_started_at)
self._reply_latency_measurement_started_at = None
self._recent_reply_latencies.append((time.time(), reply_duration))
self._prune_recent_reply_latencies()
logger.info(
f"{self.log_prefix} 已记录消息回复时长: {reply_duration:.2f}"
f"最近10分钟样本数={len(self._recent_reply_latencies)}"
)
def _should_trigger_message_turn_by_idle_compensation(
self,
*,
pending_count: int,
trigger_threshold: int,
) -> bool:
"""在新消息不足阈值时,按空窗时间折算补齐触发条件。"""
average_reply_latency = self._get_recent_average_reply_latency()
if average_reply_latency is None or average_reply_latency <= 0:
return False
idle_seconds = max(0.0, time.time() - self._last_message_received_at)
equivalent_message_count = pending_count + idle_seconds / average_reply_latency
return equivalent_message_count >= trigger_threshold
def _cancel_deferred_message_turn_task(self) -> None:
"""取消等待空窗补偿触发的延迟任务。"""
if self._deferred_message_turn_task is None:
return
self._deferred_message_turn_task.cancel()
self._deferred_message_turn_task = None
async def _schedule_deferred_message_turn(self, delay_seconds: float) -> None:
"""在预计满足空窗补偿条件时再次检查是否应触发循环。"""
try:
if delay_seconds > 0:
await asyncio.sleep(delay_seconds)
if not self._running:
return
self._schedule_message_turn()
except asyncio.CancelledError:
return
finally:
self._deferred_message_turn_task = None
def _update_message_trigger_state(self, message: SessionMessage) -> None:
"""补齐消息中的 @/提及 标记,并在命中时启用强制 continue。"""
@@ -356,8 +454,30 @@ class MaisakaHeartFlowChatting:
if not self._has_pending_messages() or self._message_turn_scheduled:
return
self._message_turn_scheduled = True
self._internal_turn_queue.put_nowait("message")
pending_count = self._get_pending_message_count()
if pending_count <= 0:
return
trigger_threshold = self._get_message_trigger_threshold()
if pending_count >= trigger_threshold or self._should_trigger_message_turn_by_idle_compensation(
pending_count=pending_count,
trigger_threshold=trigger_threshold,
):
self._cancel_deferred_message_turn_task()
self._message_turn_scheduled = True
self._internal_turn_queue.put_nowait("message")
return
average_reply_latency = self._get_recent_average_reply_latency()
if average_reply_latency is None or average_reply_latency <= 0:
return
idle_seconds = max(0.0, time.time() - self._last_message_received_at)
delay_seconds = max(0.0, (trigger_threshold - pending_count) * average_reply_latency - idle_seconds)
self._cancel_deferred_message_turn_task()
self._deferred_message_turn_task = asyncio.create_task(
self._schedule_deferred_message_turn(delay_seconds)
)
def _collect_pending_messages(self) -> list[SessionMessage]:
"""从消息缓存中收集一批尚未处理的消息。"""
@@ -380,6 +500,13 @@ class MaisakaHeartFlowChatting:
# f"{self.log_prefix} 已从消息缓存区[{start_index}:{self._last_processed_index}] "
# f"收集 {len(unique_messages)} 条新消息"
# )
if unique_messages and self._reply_latency_measurement_started_at is None:
self._reply_latency_measurement_started_at = min(
self._message_received_at_by_id.get(message.message_id, self._last_message_received_at)
for message in unique_messages
)
for message in unique_messages:
self._message_received_at_by_id.pop(message.message_id, None)
return unique_messages
async def _wait_for_message_quiet_period(self) -> None: