fix:修复无边界内存增长,对缓存内容进行裁切

This commit is contained in:
DawnARC
2026-05-08 17:09:34 +08:00
parent 04015b2055
commit c78125e6d4
3 changed files with 128 additions and 1 deletions

View File

@@ -1,4 +1,5 @@
import asyncio import asyncio
import time
import traceback import traceback
from typing import Dict from typing import Dict
@@ -9,6 +10,10 @@ from src.maisaka.runtime import MaisakaHeartFlowChatting
logger = get_logger("heartflow") logger = get_logger("heartflow")
# A runtime whose last recorded access is older than this many seconds
# (6 hours) is treated as idle and removed by the cleanup pass.
HEARTFLOW_RUNTIME_IDLE_TTL_SECONDS = 6 * 60 * 60
# Cap on live runtimes; the cleanup pass evicts least recently accessed
# sessions once the live count is above this value.
HEARTFLOW_RUNTIME_MAX_SESSIONS = 512
# Minimum spacing (5 minutes) between opportunistic cleanup passes; the
# spacing is ignored once the live count reaches the session cap.
HEARTFLOW_RUNTIME_CLEANUP_INTERVAL_SECONDS = 5 * 60
class HeartflowManager: class HeartflowManager:
"""管理 session 级别的 Maisaka 心流实例。""" """管理 session 级别的 Maisaka 心流实例。"""
@@ -16,25 +21,91 @@ class HeartflowManager:
def __init__(self) -> None: def __init__(self) -> None:
self.heartflow_chat_list: Dict[str, MaisakaHeartFlowChatting] = {} self.heartflow_chat_list: Dict[str, MaisakaHeartFlowChatting] = {}
self._chat_create_locks: Dict[str, asyncio.Lock] = {} self._chat_create_locks: Dict[str, asyncio.Lock] = {}
self._last_access_at: Dict[str, float] = {}
self._last_cleanup_at = 0.0
async def _stop_runtime(self, session_id: str, chat: MaisakaHeartFlowChatting) -> None:
    """Stop one runtime on a best-effort basis.

    Any ``Exception`` raised by ``chat.stop()`` is logged as a warning (with
    traceback) and swallowed, so one failing runtime does not abort the
    caller's cleanup loop.

    Args:
        session_id: Session the runtime belongs to; used only in the log line.
        chat: Runtime object whose ``stop()`` coroutine is awaited.
    """
    try:
        await chat.stop()
    except Exception as exc:
        # Build the message while still inside the handler so exc_info=True
        # picks up the active exception.
        failure_message = f"清理心流聊天 {session_id} 时停止 runtime 失败: {exc}"
        logger.warning(failure_message, exc_info=True)
def _prune_runtime_caches(self, chat: MaisakaHeartFlowChatting) -> None:
    """Invoke ``chat.prune_runtime_caches()`` when the runtime offers it.

    The hook is looked up dynamically; runtimes without the attribute, or
    with a non-callable attribute of that name, are left untouched.

    Args:
        chat: Runtime object that may expose a ``prune_runtime_caches`` hook.
    """
    prune_hook = getattr(chat, "prune_runtime_caches", None)
    if not callable(prune_hook):
        return
    prune_hook()
async def cleanup_idle_chats(self, *, now: float | None = None, exclude_session_ids: set[str] | None = None) -> None:
    """Stop and forget runtimes that are idle too long or exceed the session cap.

    The pass works in two phases:

    1. Every session whose last access is older than
       ``HEARTFLOW_RUNTIME_IDLE_TTL_SECONDS`` is scheduled for removal.
    2. If more than ``HEARTFLOW_RUNTIME_MAX_SESSIONS`` runtimes would still be
       alive afterwards, the least recently accessed ones are scheduled too,
       until the cap is met.

    Scheduled sessions are dropped from all bookkeeping dicts first and their
    runtime is stopped afterwards, one at a time.

    Args:
        now: Reference timestamp in ``time.time()`` units. Defaults to the
            current time.
        exclude_session_ids: Session ids that must not be removed by this
            pass (neither for idleness nor for capacity).
    """
    current_time = time.time() if now is None else now
    excluded_session_ids = exclude_session_ids or set()
    expire_before = current_time - HEARTFLOW_RUNTIME_IDLE_TTL_SECONDS

    # Phase 1: sessions idle for longer than the TTL.
    session_ids_to_remove = [
        session_id
        for session_id, accessed_at in self._last_access_at.items()
        if accessed_at < expire_before and session_id not in excluded_session_ids
    ]
    # Set mirror of the list so the membership tests below are O(1) instead
    # of rescanning the list once per live session.
    pending_removal = set(session_ids_to_remove)

    # Phase 2: enforce the session cap. Only idle ids that actually own a
    # live runtime reduce the live count; a timestamp without a runtime must
    # not hide an overflow.
    idle_live_count = sum(1 for session_id in pending_removal if session_id in self.heartflow_chat_list)
    active_count_after_idle = len(self.heartflow_chat_list) - idle_live_count
    if active_count_after_idle > HEARTFLOW_RUNTIME_MAX_SESSIONS:
        overflow_count = active_count_after_idle - HEARTFLOW_RUNTIME_MAX_SESSIONS
        # Oldest access first; sessions without a timestamp sort as oldest.
        eviction_candidates = sorted(
            (
                session_id
                for session_id in self.heartflow_chat_list
                if session_id not in pending_removal and session_id not in excluded_session_ids
            ),
            key=lambda session_id: self._last_access_at.get(session_id, 0.0),
        )
        session_ids_to_remove.extend(eviction_candidates[:overflow_count])

    # Record the pass before awaiting anything so that callers arriving while
    # runtimes are being stopped see the updated timestamp.
    self._last_cleanup_at = current_time

    removed_count = 0
    # The list holds no duplicates: phase 1 iterates dict keys and phase 2
    # skips everything already scheduled.
    for session_id in session_ids_to_remove:
        # Detach all bookkeeping before the await below, so the session is
        # already gone from the dicts while its runtime is stopping.
        chat = self.heartflow_chat_list.pop(session_id, None)
        self._chat_create_locks.pop(session_id, None)
        self._last_access_at.pop(session_id, None)
        if chat is None:
            # Timestamp without a runtime: nothing to stop.
            continue
        await self._stop_runtime(session_id, chat)
        removed_count += 1

    if removed_count > 0:
        logger.info(f"已清理空闲心流聊天: 数量={removed_count} 剩余={len(self.heartflow_chat_list)}")
async def _cleanup_idle_chats_if_due(self, *, now: float, exclude_session_id: str) -> None:
    """Run ``cleanup_idle_chats`` only when a pass is warranted.

    A pass is warranted when the cleanup interval has elapsed since the last
    pass, or when the number of live runtimes has reached the session cap.

    Args:
        now: Current timestamp in ``time.time()`` units.
        exclude_session_id: Session that must survive the pass (the one the
            caller is currently serving).
    """
    interval_elapsed = (now - self._last_cleanup_at) >= HEARTFLOW_RUNTIME_CLEANUP_INTERVAL_SECONDS
    # NOTE(review): this trigger fires at ">= cap" while cleanup_idle_chats
    # only evicts when the live count is "> cap", so a pass started exactly
    # at the cap may remove nothing -- confirm this is intended.
    at_capacity = len(self.heartflow_chat_list) >= HEARTFLOW_RUNTIME_MAX_SESSIONS
    if interval_elapsed or at_capacity:
        await self.cleanup_idle_chats(now=now, exclude_session_ids={exclude_session_id})
async def get_or_create_heartflow_chat(self, session_id: str) -> MaisakaHeartFlowChatting: async def get_or_create_heartflow_chat(self, session_id: str) -> MaisakaHeartFlowChatting:
"""获取或创建指定会话对应的 Maisaka runtime。""" """获取或创建指定会话对应的 Maisaka runtime。"""
try: try:
current_time = time.time()
if chat := self.heartflow_chat_list.get(session_id): if chat := self.heartflow_chat_list.get(session_id):
self._last_access_at[session_id] = current_time
self._prune_runtime_caches(chat)
await self._cleanup_idle_chats_if_due(now=current_time, exclude_session_id=session_id)
return chat return chat
create_lock = self._chat_create_locks.setdefault(session_id, asyncio.Lock()) create_lock = self._chat_create_locks.setdefault(session_id, asyncio.Lock())
async with create_lock: async with create_lock:
current_time = time.time()
if chat := self.heartflow_chat_list.get(session_id): if chat := self.heartflow_chat_list.get(session_id):
self._last_access_at[session_id] = current_time
self._prune_runtime_caches(chat)
await self._cleanup_idle_chats_if_due(now=current_time, exclude_session_id=session_id)
return chat return chat
chat_session = chat_manager.get_session_by_session_id(session_id) chat_session = chat_manager.get_session_by_session_id(session_id)
if not chat_session: if not chat_session:
raise ValueError(f"未找到 session_id={session_id} 对应的聊天流") raise ValueError(f"未找到 session_id={session_id} 对应的聊天流")
await self._cleanup_idle_chats_if_due(now=current_time, exclude_session_id=session_id)
new_chat = MaisakaHeartFlowChatting(session_id=session_id) new_chat = MaisakaHeartFlowChatting(session_id=session_id)
await new_chat.start() await new_chat.start()
self.heartflow_chat_list[session_id] = new_chat self.heartflow_chat_list[session_id] = new_chat
self._last_access_at[session_id] = current_time
return new_chat return new_chat
except Exception as exc: except Exception as exc:
logger.error(f"创建心流聊天 {session_id} 失败: {exc}", exc_info=True) logger.error(f"创建心流聊天 {session_id} 失败: {exc}", exc_info=True)

View File

@@ -941,6 +941,7 @@ class MaisakaReasoningEngine:
"""结束并记录一轮 Maisaka 思考循环。""" """结束并记录一轮 Maisaka 思考循环。"""
cycle_detail.end_time = time.time() cycle_detail.end_time = time.time()
self._runtime.history_loop.append(cycle_detail) self._runtime.history_loop.append(cycle_detail)
self._runtime.prune_runtime_caches()
self._post_process_chat_history_after_cycle() self._post_process_chat_history_after_cycle()
timer_strings = [ timer_strings = [

View File

@@ -56,6 +56,9 @@ from .tool_provider import MaisakaBuiltinToolProvider
logger = get_logger("maisaka_runtime") logger = get_logger("maisaka_runtime")
MAX_INTERNAL_ROUNDS = 10 MAX_INTERNAL_ROUNDS = 10
# Lower bound for the per-runtime message cache limit; also the fallback
# limit used by the pruning code when no per-instance limit is set.
MESSAGE_CACHE_MIN_RETAINED = 200
# The message cache limit is the configured context size times this factor
# (never below MESSAGE_CACHE_MIN_RETAINED).
MESSAGE_CACHE_CONTEXT_MULTIPLIER = 4
# Maximum number of cycle records kept in `history_loop`; older entries are
# dropped from the front.
HISTORY_LOOP_MAX_RETAINED = 256
class MaisakaHeartFlowChatting: class MaisakaHeartFlowChatting:
@@ -82,7 +85,7 @@ class MaisakaHeartFlowChatting:
self._chat_history: list[LLMContextMessage] = [] self._chat_history: list[LLMContextMessage] = []
self.history_loop: list[CycleDetail] = [] self.history_loop: list[CycleDetail] = []
# Keep all original messages for batching and later learning. # Keep recent original messages for batching, tools, and later learning.
self.message_cache: list[SessionMessage] = [] self.message_cache: list[SessionMessage] = []
self._last_processed_index = 0 self._last_processed_index = 0
self._internal_turn_queue: asyncio.Queue[Literal["message", "timeout"]] = asyncio.Queue() self._internal_turn_queue: asyncio.Queue[Literal["message", "timeout"]] = asyncio.Queue()
@@ -111,6 +114,10 @@ class MaisakaHeartFlowChatting:
else global_config.chat.max_private_context_size else global_config.chat.max_private_context_size
) )
self._max_context_size = max(1, int(configured_context_size)) self._max_context_size = max(1, int(configured_context_size))
self._message_cache_max_size = max(
MESSAGE_CACHE_MIN_RETAINED,
self._max_context_size * MESSAGE_CACHE_CONTEXT_MULTIPLIER,
)
self._agent_state: Literal["running", "wait", "stop"] = self._STATE_STOP self._agent_state: Literal["running", "wait", "stop"] = self._STATE_STOP
self._pending_wait_tool_call_id: Optional[str] = None self._pending_wait_tool_call_id: Optional[str] = None
self._force_next_timing_continue = False self._force_next_timing_continue = False
@@ -940,6 +947,54 @@ class MaisakaHeartFlowChatting:
def _has_pending_messages(self) -> bool: def _has_pending_messages(self) -> bool:
return self._last_processed_index < len(self.message_cache) return self._last_processed_index < len(self.message_cache)
def _get_expression_learner_processed_index(self) -> int:
    """Return the expression learner's processed index as a non-negative int.

    Falls back to this runtime's own ``_last_processed_index`` when the
    learner has no ``_last_processed_index`` attribute or when its value
    cannot be converted with ``int()``.
    """
    learner = self._expression_learner
    raw_index = getattr(learner, "_last_processed_index", self._last_processed_index)
    try:
        normalized_index = int(raw_index)
    except (TypeError, ValueError):
        return self._last_processed_index
    # Negative values are clamped to the start of the cache.
    return max(0, normalized_index)
def _adjust_expression_learner_processed_index(self, removed_count: int) -> None:
    """Shift the expression learner's processed index after a front trim.

    Does nothing when the learner has no ``_last_processed_index`` attribute.
    Otherwise the index is reduced by ``removed_count`` and clamped at zero.

    Args:
        removed_count: Number of entries removed from the front of the
            message cache.
    """
    learner = self._expression_learner
    if hasattr(learner, "_last_processed_index"):
        shifted_index = self._get_expression_learner_processed_index() - removed_count
        learner._last_processed_index = max(0, shifted_index)
def _prune_processed_message_cache(self) -> None:
    """Drop the oldest already-processed messages once the cache is over its limit.

    Only entries in front of ``self._last_processed_index`` are eligible, so
    pending (unprocessed) messages are never removed. After trimming, this
    runtime's index and the expression learner's index are shifted by the
    number of removed entries, and per-message side tables are purged for
    ids that no longer occur in the cache.

    NOTE(review): the removal boundary does not take the expression learner's
    own index into account; the learner's index is only shifted and clamped
    at zero. Confirm that dropping messages the learner may not have consumed
    yet is acceptable.
    """
    max_size = max(1, int(getattr(self, "_message_cache_max_size", MESSAGE_CACHE_MIN_RETAINED)))
    overflow_count = len(self.message_cache) - max_size
    # Bounded by the processed prefix: never cut into pending messages.
    removable_count = min(overflow_count, self._last_processed_index)
    if removable_count <= 0:
        return

    dropped_ids = {message.message_id for message in self.message_cache[:removable_count]}
    del self.message_cache[:removable_count]

    # Indices into the cache move left by the number of removed entries.
    self._last_processed_index = max(0, self._last_processed_index - removable_count)
    self._adjust_expression_learner_processed_index(removable_count)

    # An id may still occur later in the cache; keep its side-table entries
    # in that case and purge only ids that are gone completely.
    still_cached_ids = {message.message_id for message in self.message_cache}
    for message_id in dropped_ids - still_cached_ids:
        self._message_received_at_by_id.pop(message_id, None)
        self._source_messages_by_id.pop(message_id, None)

    logger.debug(
        f"{self.log_prefix} 已裁剪 Maisaka 消息缓存: "
        f"移除={removable_count} 剩余={len(self.message_cache)} 上限={max_size}"
    )
def prune_runtime_caches(self) -> None:
    """Bound the in-memory histories held by this runtime.

    Trims the processed part of the message cache, then keeps only the most
    recent ``HISTORY_LOOP_MAX_RETAINED`` entries of ``history_loop``.
    """
    self._prune_processed_message_cache()

    excess_cycles = len(self.history_loop) - HISTORY_LOOP_MAX_RETAINED
    if excess_cycles > 0:
        # Oldest cycle records sit at the front of the list.
        del self.history_loop[:excess_cycles]
def _schedule_message_turn(self) -> None: def _schedule_message_turn(self) -> None:
"""为当前待处理消息安排一次内部 turn。""" """为当前待处理消息安排一次内部 turn。"""
if self._agent_state == self._STATE_WAIT: if self._agent_state == self._STATE_WAIT: