feat: 添加 Runner 日志桥,支持将 Runner 进程日志通过 IPC 批量发送到主进程

This commit is contained in:
DrSmoothl
2026-03-12 21:45:58 +08:00
parent d0b56abdab
commit b17948a495
7 changed files with 384 additions and 70 deletions

View File

@@ -1259,25 +1259,24 @@ class TestSupervisor:
assert supervisor.component_registry.get_component("plugin_a.handler") is not None
@pytest.mark.asyncio
async def test_attach_runner_output_tasks_drains_streams(self):
async def test_attach_stderr_drain_drains_stream(self):
"""_attach_stderr_drain 为 stderr 创建排空任务,读完后任务自动完成。"""
from src.plugin_runtime.host.supervisor import PluginSupervisor
supervisor = PluginSupervisor(plugin_dirs=[])
stdout = asyncio.StreamReader()
stdout.feed_data(b"hello stdout\n")
stdout.feed_eof()
stderr = asyncio.StreamReader()
stderr.feed_data(b"hello stderr\n")
stderr.feed_data(b"fatal startup error\n")
stderr.feed_eof()
process = SimpleNamespace(pid=99, stdout=stdout, stderr=stderr)
supervisor._attach_runner_output_tasks(process)
# stdout=None 模拟新架构(不再捕获 stdout
process = SimpleNamespace(pid=99, stdout=None, stderr=stderr)
supervisor._attach_stderr_drain(process)
# 给 drain task 足够时间消费完数据
await asyncio.sleep(0.05)
assert not supervisor._runner_output_tasks
assert supervisor._stderr_drain_task is None or supervisor._stderr_drain_task.done()
class TestIntegration:

View File

@@ -7,6 +7,7 @@
4. 优雅关停
"""
import logging as stdlib_logging
from typing import Any, Dict, List, Optional, Tuple
import asyncio
@@ -24,6 +25,7 @@ from src.plugin_runtime.host.workflow_executor import WorkflowExecutor, Workflow
from src.plugin_runtime.protocol.envelope import (
Envelope,
HealthPayload,
LogBatchPayload,
RegisterComponentsPayload,
ShutdownPayload,
)
@@ -33,6 +35,54 @@ from src.plugin_runtime.transport.factory import create_transport_server
logger = get_logger("plugin_runtime.host.supervisor")
# ─── 日志桥 ──────────────────────────────────────────────────────
class RunnerLogBridge:
    """Replays batched log records reported by the Runner process into the
    host-process loggers.

    Batches arrive as ``runner.log_batch`` IPC events. Every LogEntry is
    rebuilt as a genuine :class:`logging.LogRecord` and dispatched through
    ``logging.getLogger(entry.logger_name).handle(record)``, which routes it
    into the structlog handler chain already configured in the host process.
    """
    async def handle_log_batch(self, envelope: Envelope) -> Envelope:
        """IPC event handler: parse a log batch and replay it into host loggers.

        Args:
            envelope: IPC event envelope whose method is ``runner.log_batch``.

        Returns:
            A response envelope (ignored by the caller in event mode).
        """
        try:
            batch = LogBatchPayload.model_validate(envelope.payload)
        except Exception as exc:
            return envelope.make_error_response(ErrorCode.E_BAD_PAYLOAD.value, str(exc))
        for item in batch.entries:
            self._replay_entry(item)
        return envelope.make_response(
            payload={"accepted": True, "count": len(batch.entries)}
        )

    @staticmethod
    def _replay_entry(entry) -> None:
        """Rebuild one LogEntry as a LogRecord that mirrors the original and
        hand it to the named logger's handler chain."""
        record = stdlib_logging.LogRecord(
            name=entry.logger_name,
            level=entry.level,
            pathname="<runner>",
            lineno=0,
            msg=entry.message,
            args=(),
            exc_info=None,
        )
        # Restore the original creation time so timestamps reflect the Runner.
        record.created = entry.timestamp_ms / 1000.0
        record.msecs = entry.timestamp_ms % 1000
        if entry.exception_text:
            record.exc_text = entry.exception_text
        stdlib_logging.getLogger(entry.logger_name).handle(record)
class PluginSupervisor:
"""插件 Supervisor
@@ -76,9 +126,13 @@ class PluginSupervisor:
# 后台任务
self._health_task: Optional[asyncio.Task] = None
self._runner_output_tasks: List[asyncio.Task] = []
# Runner stderr 流排空任务(仅保留 stderr用于 IPC 建立前的启动日志倒空、致命错误输出等场景)
self._stderr_drain_task: Optional[asyncio.Task] = None
self._running = False
# Runner 日志桥(将 Runner 上报的批量日志重放到主进程 Logger
self._log_bridge: RunnerLogBridge = RunnerLogBridge()
# 注册内部 RPC 方法
self._register_internal_methods()
@@ -266,6 +320,8 @@ class PluginSupervisor:
self._rpc_server.register_method("cap.request", self._capability_service.handle_capability_request)
# 插件注册
self._rpc_server.register_method("plugin.register_components", self._handle_register_components)
# Runner 日志批量上报
self._rpc_server.register_method("runner.log_batch", self._log_bridge.handle_log_batch)
async def _handle_register_components(self, envelope: Envelope) -> Envelope:
"""处理插件组件注册请求"""
@@ -319,11 +375,13 @@ class PluginSupervisor:
self._runner_process = await asyncio.create_subprocess_exec(
sys.executable, "-m", runner_module,
env=env,
stdout=asyncio.subprocess.PIPE,
# stdout 不捕获:Runner 的日志均通过 IPC 传输(RunnerIPCLogHandler)
stdout=None,
# stderr 捕获为 PIPE仅用于 IPC 建立前的进程级致命错误输出
stderr=asyncio.subprocess.PIPE,
)
self._attach_runner_output_tasks(self._runner_process)
self._attach_stderr_drain(self._runner_process)
self._runner_generation = self._rpc_server.runner_generation
logger.info(f"Runner 子进程已启动: pid={self._runner_process.pid}, generation={self._runner_generation}")
@@ -357,7 +415,7 @@ class PluginSupervisor:
self._runner_process.kill()
await self._runner_process.wait()
await self._cleanup_runner_output_tasks()
await self._cleanup_stderr_drain()
async def _health_check_loop(self) -> None:
"""周期性健康检查 + 崩溃自动重启"""
@@ -437,30 +495,40 @@ class PluginSupervisor:
components=[c.model_dump() for c in reg.components],
)
def _attach_runner_output_tasks(self, process: asyncio.subprocess.Process) -> None:
"""为 Runner 输出流创建排空任务,避免 PIPE 填满阻塞子进程。"""
streams = (
(process.stdout, "stdout"),
(process.stderr, "stderr"),
)
for stream, stream_name in streams:
if stream is None:
continue
task = asyncio.create_task(self._drain_runner_stream(stream, stream_name, process.pid))
self._runner_output_tasks.append(task)
task.add_done_callback(
lambda done_task: self._runner_output_tasks.remove(done_task)
if done_task in self._runner_output_tasks
else None
)
def _attach_stderr_drain(self, process: asyncio.subprocess.Process) -> None:
    """Create a drain task for the Runner's stderr, capturing process-level
    output emitted before the IPC channel is established.

    What stderr typically carries:
      - early startup logs (before the handshake completes)
      - process-level fatal errors (ImportError, SyntaxError, ...)
      - the last output before an abnormal process exit

    After a successful handshake, all plugin logs travel over IPC via
    RunnerIPCLogHandler instead.
    """
    if process.stderr is None:
        return
    task = asyncio.create_task(
        self._drain_runner_stderr(process.stderr, process.pid),
        name=f"runner_stderr_drain:{process.pid}",
    )
    self._stderr_drain_task = task
    # Clear the slot when the task finishes, but only if it is still the
    # current drain task (a restart may have installed a newer one).
    task.add_done_callback(
        lambda done_task: None
        if self._stderr_drain_task is not done_task
        else setattr(self, "_stderr_drain_task", None)
    )
async def _drain_runner_stderr(
self,
stream: asyncio.StreamReader,
stream_name: str,
pid: int,
) -> None:
"""持续消费 Runner 输出,避免 PIPE 回压导致子进程阻塞"""
"""持续读取 Runner stderr 并转发到 Host Logger(防止 PIPE 填满阻塞子进程)。
Args:
stream: Runner 子进程的 stderr 流。
pid: 子进程 PID仅用于日志上下文。
"""
try:
while True:
line = await stream.readline()
@@ -468,22 +536,25 @@ class PluginSupervisor:
break
message = line.decode(errors="replace").rstrip()
if message:
logger.debug(f"[runner:{pid}:{stream_name}] {message}")
# 将 stderr 输出以 WARNING 级展示:
# 如果 Runner 正常运行,此流应当无输出;
# 有输出说明进程级错误发生,需要出现在主进程日志中
logger.warning(f"[runner:{pid}:stderr] {message}")
except asyncio.CancelledError:
raise
except Exception as e:
logger.debug(f"读取 Runner {stream_name} 失败: {e}")
except Exception as exc:
logger.debug(f"读取 Runner stderr 失败 (pid={pid}): {exc}")
async def _cleanup_runner_output_tasks(self) -> None:
"""等待并清理 Runner 输出任务。"""
tasks = list(self._runner_output_tasks)
self._runner_output_tasks.clear()
for task in tasks:
if not task.done():
task.cancel()
if tasks:
with contextlib.suppress(Exception):
await asyncio.gather(*tasks, return_exceptions=True)
async def _cleanup_stderr_drain(self) -> None:
    """Cancel the stderr drain task, if any, and wait for it to finish."""
    pending, self._stderr_drain_task = self._stderr_drain_task, None
    if pending is None:
        return
    if not pending.done():
        pending.cancel()
    # gather(return_exceptions=True) absorbs the CancelledError outcome;
    # suppress() guards against any residual await-time surprises.
    with contextlib.suppress(Exception):
        await asyncio.gather(pending, return_exceptions=True)
@staticmethod
async def _terminate_process(

View File

@@ -270,8 +270,9 @@ class PluginRuntimeManager:
# ── knowledge.* ───────────────────────────────────
cap_service.register_capability("knowledge.search", self._cap_knowledge_search)
# ── logging.* ─────────────────────────────────────
cap_service.register_capability("logging.log", self._cap_logging_log)
# 注意:logging.* 能力已移除——Runner 端通过 RunnerIPCLogHandler 将 stdlib
# logging 日志批量发送到 Host由 RunnerLogBridge 重放到主进程 Logger
# 不再需要单独的 logging.log RPC 能力。
logger.debug("已注册全部主程序能力实现")
@@ -1520,26 +1521,6 @@ class PluginRuntimeManager:
logger.error(f"[cap.knowledge.search] 执行失败: {e}", exc_info=True)
return {"success": False, "error": str(e)}
# ═════════════════════════════════════════════════════════
# logging.* 能力实现
# ═════════════════════════════════════════════════════════
@staticmethod
async def _cap_logging_log(plugin_id: str, capability: str, args: Dict[str, Any]) -> Any:
"""插件日志记录
args: level?, message
"""
level: str = args.get("level", "info").lower()
message: str = args.get("message", "")
if not message:
return {"success": False, "error": "缺少必要参数 message"}
plugin_logger = get_logger(f"plugin.{plugin_id}")
log_fn = getattr(plugin_logger, level, plugin_logger.info)
log_fn(message)
return {"success": True}
# ─── 单例 ──────────────────────────────────────────────────

View File

@@ -9,6 +9,7 @@ from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional
import time
import logging as stdlib_logging
# ─── 协议常量 ──────────────────────────────────────────────────────
@@ -184,3 +185,42 @@ class ShutdownPayload(BaseModel):
"""plugin.shutdown / plugin.prepare_shutdown payload"""
reason: str = Field(default="normal", description="关停原因")
drain_timeout_ms: int = Field(default=5000, description="排空超时(ms)")
# ─── 日志传输 ──────────────────────────────────────────────────────
class LogEntry(BaseModel):
    """A single log record (Runner → Host wire format)."""
    # Unix-epoch milliseconds; the bridge divides by 1000 to restore
    # LogRecord.created on the host side.
    timestamp_ms: int = Field(
        description="日志时间戳(Unix epoch 毫秒)",
    )
    # stdlib numeric level, passed straight into logging.LogRecord(level=...).
    level: int = Field(
        description=(
            "stdlib logging 整数级别:"
            " 10=DEBUG, 20=INFO, 30=WARNING, 40=ERROR, 50=CRITICAL"
        ),
    )
    logger_name: str = Field(
        description="Logger 名称,如 plugin.my_plugin.submodule",
    )
    # Fully formatted message (Formatter output, exc_info text included).
    message: str = Field(
        description="经 Formatter 格式化后的完整日志消息(含 exc_info 文本)",
    )
    # Raw exception text for structured consumers; already embedded in message.
    exception_text: str = Field(
        default="",
        description="原始异常摘要(exc_text,供结构化消费;已嵌入 message 中)",
    )
    @property
    def levelname(self) -> str:
        """Return the stdlib logging level name for ``level`` (e.g. 'INFO')."""
        return stdlib_logging.getLevelName(self.level)
class LogBatchPayload(BaseModel):
    """Payload of the ``runner.log_batch`` event: a batch of log records
    pushed from the Runner to the Host."""
    entries: List[LogEntry] = Field(
        description="本批次日志记录列表,按时间升序排列",
    )

View File

@@ -0,0 +1,160 @@
"""Runner 端 IPC 日志 Handler
将 Runner 进程内所有 stdlib logging 日志通过 IPC 批量发送到 Host
Host 端将其重放到主进程的 Logger以 plugin.<name> 为名)中,从而
统一在主进程的结构化日志体系中显示插件日志。
架构:
Runner 进程
└── logging.root ← RunnerIPCLogHandler本文件
└── emit() 非阻塞入缓冲 → 后台刷新协程批量发送
└── rpc_client.send_event("runner.log_batch", ...)
└── IPC socket → Host
└── RunnerLogBridge.handle_log_batch()
└── logging.getLogger("plugin.<name>").handle(record)
设计原则:
- emit() 必须是非阻塞的,不得在热路径上 await 任何 IPC 调用
- 使用 collections.deque(maxlen=QUEUE_MAX) 作为有界环形缓冲:
满时最旧条目自动被覆盖(不区分级别,为实现简单接受此折损)
- CPython 的 deque.append / deque.popleft 在 GIL 保护下是线程安全的,
适合单消费后台协程 + 多生产线程的使用场景
- 后台刷新协程每 FLUSH_INTERVAL_SEC 秒或 FLUSH_BATCH_SIZE 条后批量发送
- IPC 发送失败时静默忽略stderr fallback 由 supervisor 的 drain task 覆盖
"""
from __future__ import annotations
import asyncio
import collections
import contextlib
import logging
from typing import TYPE_CHECKING, List, Optional
from src.plugin_runtime.protocol.envelope import LogBatchPayload, LogEntry
if TYPE_CHECKING:
from src.plugin_runtime.runner.rpc_client import RPCClient
class RunnerIPCLogHandler(logging.Handler):
    """Forward every stdlib logging record in the Runner process to the Host
    over IPC, in batches.

    Typical usage::

        handler = RunnerIPCLogHandler()
        handler.start(rpc_client, asyncio.get_running_loop())
        logging.root.addHandler(handler)
        # ... process runs ...
        logging.root.removeHandler(handler)
        await handler.stop()
    """
    #: Maximum buffered entries; beyond this the oldest entries are silently
    #: dropped (deque(maxlen) behavior).
    QUEUE_MAX: int = 200
    #: Sleep interval of the background flush loop (seconds).
    FLUSH_INTERVAL_SEC: float = 0.1
    #: Maximum number of log entries carried by a single send_event call.
    FLUSH_BATCH_SIZE: int = 20
    def __init__(self) -> None:
        super().__init__()
        # deque(maxlen=N): append/popleft are thread-safe under the CPython
        # GIL — fits the many-producer-threads / one-consumer-task setup.
        self._buffer: collections.deque[LogEntry] = collections.deque(maxlen=self.QUEUE_MAX)
        self._rpc_client: Optional[RPCClient] = None
        self._flush_task: Optional[asyncio.Task[None]] = None
    # ─── public API ────────────────────────────────────────────────
    def start(self, rpc_client: RPCClient, loop: asyncio.AbstractEventLoop) -> None:
        """Start the background flush task. Call inside the event loop after
        the handshake has completed.

        Args:
            rpc_client: An RPCClient whose handshake has completed.
            loop: The currently running asyncio event loop.
        """
        self._rpc_client = rpc_client
        self._flush_task = loop.create_task(
            self._flush_loop(),
            name="RunnerIPCLogHandler._flush_loop",
        )
    async def stop(self) -> None:
        """Stop the flush task and send out any logs left in the buffer.

        Should be called after ``logging.root.removeHandler(handler)`` so that
        emit() cannot append new entries while stop() drains the buffer.
        """
        if self._flush_task is not None:
            self._flush_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._flush_task
            self._flush_task = None
        # Final full flush, batch by batch, until the buffer is empty.
        await self._flush_remaining()
    # ─── logging.Handler interface ─────────────────────────────────
    def emit(self, record: logging.LogRecord) -> None:
        """Serialize one LogRecord into the buffer (synchronous, never blocks).

        When the buffer is full, deque drops the oldest entry (FIFO overflow).
        Errors go through ``self.handleError(record)``; nothing is raised.
        """
        try:
            # format() renders exc_info and caches the text on record.exc_text.
            msg = self.format(record)
            entry = LogEntry(
                timestamp_ms=int(record.created * 1000),
                level=record.levelno,
                logger_name=record.name,
                message=msg,
                exception_text=record.exc_text or "",
            )
            self._buffer.append(entry)
        except Exception:
            self.handleError(record)
    # ─── internals ─────────────────────────────────────────────────
    async def _flush_loop(self) -> None:
        """Background batch-flush loop — wakes every FLUSH_INTERVAL_SEC seconds."""
        while True:
            try:
                await asyncio.sleep(self.FLUSH_INTERVAL_SEC)
                await self._flush_batch(self.FLUSH_BATCH_SIZE)
            except asyncio.CancelledError:
                break
            except Exception:
                # Swallow all sender-side errors: logging about a logging
                # failure here could recurse back into this very handler.
                pass
    async def _flush_batch(self, max_count: int) -> None:
        """Pop up to max_count entries from the buffer and send one IPC batch.

        Args:
            max_count: Upper bound on the number of entries sent this call.
        """
        if not self._buffer or self._rpc_client is None:
            return
        entries: List[LogEntry] = []
        while self._buffer and len(entries) < max_count:
            entries.append(self._buffer.popleft())
        if not entries:
            return
        # Ignore IPC send failures (process exiting, connection dropped, ...).
        with contextlib.suppress(Exception):
            await self._rpc_client.send_event(
                "runner.log_batch",
                payload=LogBatchPayload(entries=entries).model_dump(),
            )
    async def _flush_remaining(self) -> None:
        """Send every entry left in the buffer, batch by batch.

        Bug fix: if no RPC client was ever attached (stop() called before
        start(), or start() never called) while the buffer holds entries,
        _flush_batch is a no-op and the original ``while self._buffer`` loop
        would spin forever. Discard the buffer in that case — there is
        nowhere to send it.
        """
        if self._rpc_client is None:
            self._buffer.clear()
            return
        while self._buffer:
            await self._flush_batch(self.FLUSH_BATCH_SIZE)

View File

@@ -186,6 +186,34 @@ class RPCClient:
# ─── 内部方法 ──────────────────────────────────────────────
async def send_event(
    self,
    method: str,
    plugin_id: str = "",
    payload: Optional[Dict[str, Any]] = None,
) -> None:
    """Send a one-way, fire-and-forget event to the Host (no response awaited).

    Args:
        method: RPC method name, e.g. "runner.log_batch".
        plugin_id: Target plugin id (empty means a Runner-level message).
        payload: Event data.
    """
    if not self.is_connected:
        return
    event = Envelope(
        request_id=self._id_gen.next(),
        message_type=MessageType.EVENT,
        method=method,
        plugin_id=plugin_id,
        generation=self._generation,
        payload=payload or {},
    )
    await self._connection.send_frame(self._codec.encode_envelope(event))
async def _recv_loop(self) -> None:
"""消息接收主循环"""
while self._running and self._connection and not self._connection.is_closed:

View File

@@ -9,7 +9,8 @@
6. 转发插件的能力调用到 Host
"""
from typing import List
import logging as stdlib_logging
from typing import List, Optional
import asyncio
import contextlib
@@ -29,6 +30,7 @@ from src.plugin_runtime.protocol.envelope import (
RegisterComponentsPayload,
)
from src.plugin_runtime.protocol.errors import ErrorCode
from src.plugin_runtime.runner.log_handler import RunnerIPCLogHandler
from src.plugin_runtime.runner.plugin_loader import PluginLoader, PluginMeta
from src.plugin_runtime.runner.rpc_client import RPCClient
@@ -56,6 +58,9 @@ class PluginRunner:
self._start_time: float = time.monotonic()
self._shutting_down: bool = False
# IPC 日志 Handler握手成功后安装将所有 stdlib logging 转发到 Host
self._log_handler: Optional[RunnerIPCLogHandler] = None
async def run(self) -> None:
"""Runner 主入口"""
# 1. 连接 Host
@@ -65,7 +70,10 @@ class PluginRunner:
logger.error("握手失败,退出")
return
# 2. 注册方法处理器
# 2. 握手成功后立即安装 IPC 日志 Handler接管所有 Runner 端日志
self._install_log_handler()
# 3. 注册方法处理器
self._register_handlers()
# 3. 加载插件
@@ -97,10 +105,37 @@ class PluginRunner:
while not self._shutting_down:
await asyncio.sleep(1.0)
# 6. 断开连接
# 6. 卸载 IPC 日志 Handler 并刷空剩余缓冲,然后断开连接
logger.info("Runner 开始关停")
await self._uninstall_log_handler()
await self._rpc_client.disconnect()
logger.info("Runner 已退出")
def _install_log_handler(self) -> None:
    """Install RunnerIPCLogHandler on ``logging.root`` once the handshake
    has completed.

    After installation, every stdlib logging call in the Runner process
    (including structlog pass-through) is forwarded to the Host over IPC,
    where RunnerLogBridge replays it into the main-process loggers.
    """
    loop = asyncio.get_running_loop()
    handler = RunnerIPCLogHandler()
    # Start the background flush task before exposing the handler to the
    # root logger, then remember it for _uninstall_log_handler().
    handler.start(self._rpc_client, loop)
    stdlib_logging.root.addHandler(handler)
    self._log_handler = handler
    logger.debug("RunnerIPCLogHandler \u5df2\u5b89\u88c3\uff0c\u63d2\u4ef6\u65e5\u5fd7\u5c06\u901a\u8fc7 IPC \u8f6c\u53d1\u5230\u4e3b\u8fdb\u7a0b")
async def _uninstall_log_handler(self) -> None:
    """Remove the handler from ``logging.root`` and flush its buffer.

    Must be called before disconnect() so the final batch of logs can
    still be sent over the live connection.
    """
    if self._log_handler is None:
        return
    stdlib_logging.root.removeHandler(self._log_handler)
    await self._log_handler.stop()
    self._log_handler = None
    logger.debug("RunnerIPCLogHandler \u5df2\u5378\u8f7d")
def _register_handlers(self) -> None:
"""注册方法处理器"""
self._rpc_client.register_method("plugin.invoke_command", self._handle_invoke)