diff --git a/pytests/test_plugin_runtime.py b/pytests/test_plugin_runtime.py index aafd3d0f..823777f3 100644 --- a/pytests/test_plugin_runtime.py +++ b/pytests/test_plugin_runtime.py @@ -1259,25 +1259,24 @@ class TestSupervisor: assert supervisor.component_registry.get_component("plugin_a.handler") is not None @pytest.mark.asyncio - async def test_attach_runner_output_tasks_drains_streams(self): + async def test_attach_stderr_drain_drains_stream(self): + """_attach_stderr_drain 为 stderr 创建排空任务,读完后任务自动完成。""" from src.plugin_runtime.host.supervisor import PluginSupervisor supervisor = PluginSupervisor(plugin_dirs=[]) - stdout = asyncio.StreamReader() - stdout.feed_data(b"hello stdout\n") - stdout.feed_eof() - stderr = asyncio.StreamReader() - stderr.feed_data(b"hello stderr\n") + stderr.feed_data(b"fatal startup error\n") stderr.feed_eof() - process = SimpleNamespace(pid=99, stdout=stdout, stderr=stderr) - supervisor._attach_runner_output_tasks(process) + # stdout=None 模拟新架构(不再捕获 stdout) + process = SimpleNamespace(pid=99, stdout=None, stderr=stderr) + supervisor._attach_stderr_drain(process) + # 给 drain task 足够时间消费完数据 await asyncio.sleep(0.05) - assert not supervisor._runner_output_tasks + assert supervisor._stderr_drain_task is None or supervisor._stderr_drain_task.done() class TestIntegration: diff --git a/src/plugin_runtime/host/supervisor.py b/src/plugin_runtime/host/supervisor.py index 5f9f78e6..d5d13b28 100644 --- a/src/plugin_runtime/host/supervisor.py +++ b/src/plugin_runtime/host/supervisor.py @@ -7,6 +7,7 @@ 4. 优雅关停 """ +import logging as stdlib_logging from typing import Any, Dict, List, Optional, Tuple import asyncio @@ -24,6 +25,7 @@ from src.plugin_runtime.host.workflow_executor import WorkflowExecutor, Workflow from src.plugin_runtime.protocol.envelope import ( Envelope, HealthPayload, + LogBatchPayload, RegisterComponentsPayload, ShutdownPayload, ) @@ -33,6 +35,54 @@ from src.plugin_runtime.transport.factory import create_transport_server logger = get_logger("plugin_runtime.host.supervisor") +# ─── 日志桥 ────────────────────────────────────────────────────── + +class RunnerLogBridge: + """将 Runner 进程上报的批量日志重放到主进程的 Logger 中。 + + Runner 通过 ``runner.log_batch`` IPC 事件批量到达。 + 每条 LogEntry 被重建为一个真实的 :class:`logging.LogRecord` 并直接 + 调用 ``logging.getLogger(entry.logger_name).handle(record)``, + 从而接入主进程已配置好的 structlog Handler 链。 + """ + + async def handle_log_batch(self, envelope: Envelope) -> Envelope: + """IPC 事件处理器:解析批量日志并重放到主进程 Logger。 + + Args: + envelope: 方法名为 ``runner.log_batch`` 的 IPC 事件信封。 + + Returns: + 空响应信封(事件模式下将被忽略)。 + """ + try: + batch = LogBatchPayload.model_validate(envelope.payload) + except Exception as exc: + return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(exc)) + + for entry in batch.entries: + # 重建一个与原始日志尽量相符的 LogRecord + record = stdlib_logging.LogRecord( + name=entry.logger_name, + level=entry.level, + pathname="", + lineno=0, + msg=entry.message, + args=(), + exc_info=None, + ) + record.created = entry.timestamp_ms / 1000.0 + record.msecs = entry.timestamp_ms % 1000 + if entry.exception_text: + record.exc_text = entry.exception_text + + stdlib_logging.getLogger(entry.logger_name).handle(record) + + return envelope.make_response( + payload={"accepted": True, "count": len(batch.entries)} + ) + + class PluginSupervisor: """插件 Supervisor @@ -76,9 +126,13 @@ class PluginSupervisor: # 后台任务 self._health_task: Optional[asyncio.Task] = None - self._runner_output_tasks: List[asyncio.Task] = [] + # Runner stderr 流排空任务(仅保留 stderr,用于 IPC 建立前的启动日志倒空、致命错误输出等场景) + self._stderr_drain_task: Optional[asyncio.Task] = None self._running = False + # Runner 日志桥(将 Runner 上报的批量日志重放到主进程 Logger) + self._log_bridge: RunnerLogBridge = RunnerLogBridge() + # 注册内部 RPC 方法 self._register_internal_methods() @@ -266,6 +320,8 @@ class PluginSupervisor: self._rpc_server.register_method("cap.request", self._capability_service.handle_capability_request) # 插件注册 self._rpc_server.register_method("plugin.register_components", self._handle_register_components) + # Runner 日志批量上报 + self._rpc_server.register_method("runner.log_batch", self._log_bridge.handle_log_batch) async def _handle_register_components(self, envelope: Envelope) -> Envelope: """处理插件组件注册请求""" @@ -319,11 +375,13 @@ class PluginSupervisor: self._runner_process = await asyncio.create_subprocess_exec( sys.executable, "-m", runner_module, env=env, - stdout=asyncio.subprocess.PIPE, + # stdout 不捕获:Runner 的日志均通过 IPC 传㛹(RunnerIPCLogHandler) + stdout=None, + # stderr 捕获为 PIPE,仅用于 IPC 建立前的进程级致命错误输出 stderr=asyncio.subprocess.PIPE, ) - self._attach_runner_output_tasks(self._runner_process) + self._attach_stderr_drain(self._runner_process) self._runner_generation = self._rpc_server.runner_generation logger.info(f"Runner 子进程已启动: pid={self._runner_process.pid}, generation={self._runner_generation}") @@ -357,7 +415,7 @@ class PluginSupervisor: self._runner_process.kill() await self._runner_process.wait() - await self._cleanup_runner_output_tasks() + await self._cleanup_stderr_drain() async def _health_check_loop(self) -> None: """周期性健康检查 + 崩溃自动重启""" @@ -437,30 +495,40 @@ class PluginSupervisor: components=[c.model_dump() for c in reg.components], ) - def _attach_runner_output_tasks(self, process: asyncio.subprocess.Process) -> None: - """为 Runner 输出流创建排空任务,避免 PIPE 填满阻塞子进程。""" - streams = ( - (process.stdout, "stdout"), - (process.stderr, "stderr"), - ) - for stream, stream_name in streams: - if stream is None: - continue - task = asyncio.create_task(self._drain_runner_stream(stream, stream_name, process.pid)) - self._runner_output_tasks.append(task) - task.add_done_callback( - lambda done_task: self._runner_output_tasks.remove(done_task) - if done_task in self._runner_output_tasks - else None - ) + def _attach_stderr_drain(self, process: asyncio.subprocess.Process) -> None: + """为 Runner stderr 创建排空任务,捕获 IPC 建立前的进程级错误输出。 - async def _drain_runner_stream( + stderr 中的内容通常是: + - Runner 启动早期(握手完成之前)的日志 + - 进程级致命错误(ImportError、SyntaxError等) + - 异常进程退出前的最后输出 + + 握手成功后,插件的所有日志均经由 RunnerIPCLogHandler 通过 IPC 传输。 + """ + if process.stderr is None: + return + task = asyncio.create_task( + self._drain_runner_stderr(process.stderr, process.pid), + name=f"runner_stderr_drain:{process.pid}", + ) + self._stderr_drain_task = task + task.add_done_callback( + lambda done_task: None + if self._stderr_drain_task is not done_task + else setattr(self, "_stderr_drain_task", None) + ) + + async def _drain_runner_stderr( self, stream: asyncio.StreamReader, - stream_name: str, pid: int, ) -> None: - """持续消费 Runner 输出,避免 PIPE 回压导致子进程阻塞。""" + """持续读取 Runner stderr 并转发到 Host Logger,防止 PIPE 锡死子进程。 + + Args: + stream: Runner 子进程的 stderr 流。 + pid: 子进程 PID,仅用于日志上下文。 + """ try: while True: line = await stream.readline() @@ -468,22 +536,25 @@ class PluginSupervisor: break message = line.decode(errors="replace").rstrip() if message: - logger.debug(f"[runner:{pid}:{stream_name}] {message}") + # 将 stderr 输出以 WARNING 级展示: + # 如果 Runner 正常运行,此流应当无输出; + # 有输出说明进程级错误发生,需要出现在主进程日志中 + logger.warning(f"[runner:{pid}:stderr] {message}") except asyncio.CancelledError: raise - except Exception as e: - logger.debug(f"读取 Runner {stream_name} 失败: {e}") + except Exception as exc: + logger.debug(f"读取 Runner stderr 失败 (pid={pid}): {exc}") - async def _cleanup_runner_output_tasks(self) -> None: - """等待并清理 Runner 输出任务。""" - tasks = list(self._runner_output_tasks) - self._runner_output_tasks.clear() - for task in tasks: - if not task.done(): - task.cancel() - if tasks: - with contextlib.suppress(Exception): - await asyncio.gather(*tasks, return_exceptions=True) + async def _cleanup_stderr_drain(self) -> None: + """等待并取消 stderr 排空任务。""" + if self._stderr_drain_task is None: + return + task = self._stderr_drain_task + self._stderr_drain_task = None + if not task.done(): + task.cancel() + with contextlib.suppress(Exception): + await asyncio.gather(task, return_exceptions=True) @staticmethod async def _terminate_process( diff --git a/src/plugin_runtime/integration.py b/src/plugin_runtime/integration.py index 0c1b2268..3d031cef 100644 --- a/src/plugin_runtime/integration.py +++ b/src/plugin_runtime/integration.py @@ -270,8 +270,9 @@ class PluginRuntimeManager: # ── knowledge.* ─────────────────────────────────── cap_service.register_capability("knowledge.search", self._cap_knowledge_search) - # ── logging.* ───────────────────────────────────── - cap_service.register_capability("logging.log", self._cap_logging_log) + # 注意:logging.* 能力已移除——Runner 端通过 RunnerIPCLogHandler 将 stdlib + # logging 日志批量发送到 Host,由 RunnerLogBridge 重放到主进程 Logger, + # 不再需要单独的 logging.log RPC 能力。 logger.debug("已注册全部主程序能力实现") @@ -1520,26 +1521,6 @@ class PluginRuntimeManager: logger.error(f"[cap.knowledge.search] 执行失败: {e}", exc_info=True) return {"success": False, "error": str(e)} - # ═════════════════════════════════════════════════════════ - # logging.* 能力实现 - # ═════════════════════════════════════════════════════════ - - @staticmethod - async def _cap_logging_log(plugin_id: str, capability: str, args: Dict[str, Any]) -> Any: - """插件日志记录 - - args: level?, message - """ - level: str = args.get("level", "info").lower() - message: str = args.get("message", "") - if not message: - return {"success": False, "error": "缺少必要参数 message"} - - plugin_logger = get_logger(f"plugin.{plugin_id}") - log_fn = getattr(plugin_logger, level, plugin_logger.info) - log_fn(message) - return {"success": True} - # ─── 单例 ────────────────────────────────────────────────── diff --git a/src/plugin_runtime/protocol/envelope.py b/src/plugin_runtime/protocol/envelope.py index 6eabca33..40d720f6 100644 --- a/src/plugin_runtime/protocol/envelope.py +++ b/src/plugin_runtime/protocol/envelope.py @@ -9,6 +9,7 @@ from pydantic import BaseModel, Field from typing import Any, Dict, List, Optional import time +import logging as stdlib_logging # ─── 协议常量 ────────────────────────────────────────────────────── @@ -184,3 +185,42 @@ class ShutdownPayload(BaseModel): """plugin.shutdown / plugin.prepare_shutdown payload""" reason: str = Field(default="normal", description="关停原因") drain_timeout_ms: int = Field(default=5000, description="排空超时(ms)") + + +# ─── 日志传输 ────────────────────────────────────────────────────── + +class LogEntry(BaseModel): + """单条日志记录(Runner → Host 传输格式)""" + + timestamp_ms: int = Field( + description="日志时间戳,Unix epoch 毫秒", + ) + level: int = Field( + description=( + "stdlib logging 整数级别:" + " 10=DEBUG, 20=INFO, 30=WARNING, 40=ERROR, 50=CRITICAL" + ), + ) + logger_name: str = Field( + description="Logger 名称,如 plugin.my_plugin.submodule", + ) + message: str = Field( + description="经 Formatter 格式化后的完整日志消息(含 exc_info 文本)", + ) + exception_text: str = Field( + default="", + description="原始异常摘要(exc_text),供结构化消费;已嵌入 message 中", + ) + + @property + def levelname(self) -> str: + """返回对应的 stdlib logging 级别名称(如 'INFO')。""" + return stdlib_logging.getLevelName(self.level) + + +class LogBatchPayload(BaseModel): + """runner.log_batch 事件 payload:Runner 端向 Host 批量推送日志记录""" + + entries: List[LogEntry] = Field( + description="本批次日志记录列表,按时间升序排列", + ) diff --git a/src/plugin_runtime/runner/log_handler.py b/src/plugin_runtime/runner/log_handler.py new file mode 100644 index 00000000..26c8a02d --- /dev/null +++ b/src/plugin_runtime/runner/log_handler.py @@ -0,0 +1,160 @@ +"""Runner 端 IPC 日志 Handler + +将 Runner 进程内所有 stdlib logging 日志通过 IPC 批量发送到 Host, +Host 端将其重放到主进程的 Logger(以 plugin. 为名)中,从而 +统一在主进程的结构化日志体系中显示插件日志。 + +架构: + Runner 进程 + └── logging.root ← RunnerIPCLogHandler(本文件) + └── emit() 非阻塞入缓冲 → 后台刷新协程批量发送 + └── rpc_client.send_event("runner.log_batch", ...) + └── IPC socket → Host + └── RunnerLogBridge.handle_log_batch() + └── logging.getLogger("plugin.").handle(record) + +设计原则: +- emit() 必须是非阻塞的,不得在热路径上 await 任何 IPC 调用 +- 使用 collections.deque(maxlen=QUEUE_MAX) 作为有界环形缓冲: + 满时最旧条目自动被覆盖(不区分级别,为实现简单接受此折损) +- CPython 的 deque.append / deque.popleft 在 GIL 保护下是线程安全的, + 适合单消费后台协程 + 多生产线程的使用场景 +- 后台刷新协程每 FLUSH_INTERVAL_SEC 秒或 FLUSH_BATCH_SIZE 条后批量发送 +- IPC 发送失败时静默忽略;stderr fallback 由 supervisor 的 drain task 覆盖 +""" +from __future__ import annotations + +import asyncio +import collections +import contextlib +import logging +from typing import TYPE_CHECKING, List, Optional + +from src.plugin_runtime.protocol.envelope import LogBatchPayload, LogEntry + +if TYPE_CHECKING: + from src.plugin_runtime.runner.rpc_client import RPCClient + + +class RunnerIPCLogHandler(logging.Handler): + """将 Runner 进程内所有日志通过 IPC 批量转发到 Host 主进程。 + + 典型用法:: + + handler = RunnerIPCLogHandler() + handler.start(rpc_client, asyncio.get_running_loop()) + logging.root.addHandler(handler) + # ... 进程运行 ... + logging.root.removeHandler(handler) + await handler.stop() + """ + + #: 日志缓冲最大条数;超出后最旧的条目将被静默丢弃(deque(maxlen) 行为) + QUEUE_MAX: int = 200 + + #: 后台刷新循环的休眠间隔(秒) + FLUSH_INTERVAL_SEC: float = 0.1 + + #: 每次 send_event 携带的最大日志条数 + FLUSH_BATCH_SIZE: int = 20 + + def __init__(self) -> None: + super().__init__() + # deque(maxlen=N): append/popleft 在 CPython GIL 保护下线程安全 + self._buffer: collections.deque[LogEntry] = collections.deque(maxlen=self.QUEUE_MAX) + self._rpc_client: Optional[RPCClient] = None + self._flush_task: Optional[asyncio.Task[None]] = None + + # ─── 公开 API ────────────────────────────────────────────────── + + def start(self, rpc_client: RPCClient, loop: asyncio.AbstractEventLoop) -> None: + """握手完成后、在事件循环内调用,启动后台刷新任务。 + + Args: + rpc_client: 已完成握手的 RPCClient 实例。 + loop: 当前运行的 asyncio 事件循环。 + """ + self._rpc_client = rpc_client + self._flush_task = loop.create_task( + self._flush_loop(), + name="RunnerIPCLogHandler._flush_loop", + ) + + async def stop(self) -> None: + """停止刷新任务并将缓冲中剩余日志全部发送出去。 + + 应在 ``logging.root.removeHandler(handler)`` 之后调用, + 确保 emit() 不会在 stop() 执行期间向已消耗的缓冲写入新条目。 + """ + if self._flush_task is not None: + self._flush_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._flush_task + self._flush_task = None + + # 关闭前全量刷新,分多批次直到缓冲清空 + await self._flush_remaining() + + # ─── logging.Handler 接口 ────────────────────────────────────── + + def emit(self, record: logging.LogRecord) -> None: + """将一条 LogRecord 序列化后放入缓冲(同步,永不阻塞)。 + + 缓冲已满时,deque 自动从左侧丢弃最旧条目(FIFO 溢出)。 + 异常通过 ``self.handleError(record)`` 写到 stderr,不引发。 + """ + try: + # format() 触发 exc_info 格式化并将结果缓存到 record.exc_text + msg = self.format(record) + entry = LogEntry( + timestamp_ms=int(record.created * 1000), + level=record.levelno, + logger_name=record.name, + message=msg, + exception_text=record.exc_text or "", + ) + self._buffer.append(entry) + except Exception: + self.handleError(record) + + # ─── 内部方法 ────────────────────────────────────────────────── + + async def _flush_loop(self) -> None: + """后台批量刷新循环——每 FLUSH_INTERVAL_SEC 秒醒来一次。""" + while True: + try: + await asyncio.sleep(self.FLUSH_INTERVAL_SEC) + await self._flush_batch(self.FLUSH_BATCH_SIZE) + except asyncio.CancelledError: + break + except Exception: + # 任何发送侧错误都静默忽略,避免向 logging 写入导致嵌套循环 + pass + + async def _flush_batch(self, max_count: int) -> None: + """从缓冲中取出最多 max_count 条日志并通过 IPC 发送一个批次。 + + Args: + max_count: 本次最多发送的条目数。 + """ + if not self._buffer or self._rpc_client is None: + return + + entries: List[LogEntry] = [] + while self._buffer and len(entries) < max_count: + entries.append(self._buffer.popleft()) + + if not entries: + return + + # IPC 发送失败时静默忽略(进程退出、网络断开等场景) + with contextlib.suppress(Exception): + await self._rpc_client.send_event( + "runner.log_batch", + payload=LogBatchPayload(entries=entries).model_dump(), + ) + + async def _flush_remaining(self) -> None: + """将缓冲中剩余的所有条目分批全部发送。""" + while self._buffer: + await self._flush_batch(self.FLUSH_BATCH_SIZE) diff --git a/src/plugin_runtime/runner/rpc_client.py b/src/plugin_runtime/runner/rpc_client.py index 46355071..34e0b4da 100644 --- a/src/plugin_runtime/runner/rpc_client.py +++ b/src/plugin_runtime/runner/rpc_client.py @@ -186,6 +186,34 @@ class RPCClient: # ─── 内部方法 ────────────────────────────────────────────── + async def send_event( + self, + method: str, + plugin_id: str = "", + payload: Optional[Dict[str, Any]] = None, + ) -> None: + """向 Host 发送单向事件(fire-and-forget,不等待响应)。 + + Args: + method: RPC 方法名,如 "runner.log_batch"。 + plugin_id: 目标插件 ID(可为空,表示 Runner 级消息)。 + payload: 事件数据。 + """ + if not self.is_connected: + return + + request_id = self._id_gen.next() + envelope = Envelope( + request_id=request_id, + message_type=MessageType.EVENT, + method=method, + plugin_id=plugin_id, + generation=self._generation, + payload=payload or {}, + ) + data = self._codec.encode_envelope(envelope) + await self._connection.send_frame(data) + async def _recv_loop(self) -> None: """消息接收主循环""" while self._running and self._connection and not self._connection.is_closed: diff --git a/src/plugin_runtime/runner/runner_main.py b/src/plugin_runtime/runner/runner_main.py index 8bb0c588..7a0229ee 100644 --- a/src/plugin_runtime/runner/runner_main.py +++ b/src/plugin_runtime/runner/runner_main.py @@ -9,7 +9,8 @@ 6. 转发插件的能力调用到 Host """ -from typing import List +import logging as stdlib_logging +from typing import List, Optional import asyncio import contextlib @@ -29,6 +30,7 @@ from src.plugin_runtime.protocol.envelope import ( RegisterComponentsPayload, ) from src.plugin_runtime.protocol.errors import ErrorCode +from src.plugin_runtime.runner.log_handler import RunnerIPCLogHandler from src.plugin_runtime.runner.plugin_loader import PluginLoader, PluginMeta from src.plugin_runtime.runner.rpc_client import RPCClient @@ -56,6 +58,9 @@ class PluginRunner: self._start_time: float = time.monotonic() self._shutting_down: bool = False + # IPC 日志 Handler:握手成功后安装,将所有 stdlib logging 转发到 Host + self._log_handler: Optional[RunnerIPCLogHandler] = None + async def run(self) -> None: """Runner 主入口""" # 1. 连接 Host @@ -65,7 +70,10 @@ class PluginRunner: logger.error("握手失败,退出") return - # 2. 注册方法处理器 + # 2. 握手成功后立即安装 IPC 日志 Handler,接管所有 Runner 端日志 + self._install_log_handler() + + # 3. 注册方法处理器 self._register_handlers() # 3. 加载插件 @@ -97,10 +105,37 @@ class PluginRunner: while not self._shutting_down: await asyncio.sleep(1.0) - # 6. 断开连接 + # 6. 卸载 IPC 日志 Handler 并刷空剩余缓冲,然后断开连接 + logger.info("Runner 开始关停") + await self._uninstall_log_handler() await self._rpc_client.disconnect() logger.info("Runner 已退出") + def _install_log_handler(self) -> None: + """握手完成后将 RunnerIPCLogHandler 安装到 logging.root。 + + 安装后,Runner 进程内所有 stdlib logging 调用(含 structlog 透传的) + 均会通过 IPC 转发到 Host,由 Host 的 RunnerLogBridge 重放到主进程 Logger。 + """ + loop = asyncio.get_running_loop() + handler = RunnerIPCLogHandler() + handler.start(self._rpc_client, loop) + stdlib_logging.root.addHandler(handler) + self._log_handler = handler + logger.debug("RunnerIPCLogHandler \u5df2\u5b89\u88c3\uff0c\u63d2\u4ef6\u65e5\u5fd7\u5c06\u901a\u8fc7 IPC \u8f6c\u53d1\u5230\u4e3b\u8fdb\u7a0b") + + async def _uninstall_log_handler(self) -> None: + """关停前从 logging.root 移除 Handler 并刷空缓冲。 + + 必须在 disconnect() 之前调用,确保最后一批日志能正常发送。 + """ + if self._log_handler is None: + return + stdlib_logging.root.removeHandler(self._log_handler) + await self._log_handler.stop() + self._log_handler = None + logger.debug("RunnerIPCLogHandler \u5df2\u5378\u8f7d") + def _register_handlers(self) -> None: """注册方法处理器""" self._rpc_client.register_method("plugin.invoke_command", self._handle_invoke)