From baae2df154d62e0943b46940a7d6bf15c4e9037e Mon Sep 17 00:00:00 2001 From: DrSmoothl <1787882683@qq.com> Date: Fri, 6 Mar 2026 16:37:56 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=9B=86=E6=88=90=E6=96=B0=E7=89=88?= =?UTF-8?q?=E6=9C=AC=E6=8F=92=E4=BB=B6=E8=BF=90=E8=A1=8C=E6=97=B6=EF=BC=8C?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E4=BA=8B=E4=BB=B6=E6=A1=A5=E6=8E=A5=E4=B8=8E?= =?UTF-8?q?=E5=91=BD=E4=BB=A4=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- bot.py | 4 + src/chat/message_receive/bot.py | 55 +++- src/main.py | 9 + src/plugin_runtime/host/capability_service.py | 4 +- src/plugin_runtime/host/component_registry.py | 4 +- src/plugin_runtime/host/event_dispatcher.py | 4 +- src/plugin_runtime/host/rpc_server.py | 4 +- src/plugin_runtime/host/supervisor.py | 4 +- src/plugin_runtime/host/workflow_executor.py | 4 +- src/plugin_runtime/integration.py | 255 ++++++++++++++++++ .../runner/manifest_validator.py | 5 +- src/plugin_runtime/runner/plugin_loader.py | 4 +- src/plugin_runtime/runner/rpc_client.py | 4 +- src/plugin_runtime/runner/runner_main.py | 9 +- src/plugin_system/core/events_manager.py | 41 +++ 15 files changed, 385 insertions(+), 25 deletions(-) create mode 100644 src/plugin_runtime/integration.py diff --git a/bot.py b/bot.py index 77465c40..e182c1eb 100644 --- a/bot.py +++ b/bot.py @@ -201,6 +201,10 @@ async def graceful_shutdown(): # sourcery skip: use-named-expression # 触发 ON_STOP 事件 await events_manager.handle_mai_events(event_type=EventType.ON_STOP) + # 停止新版本插件运行时 + from src.plugin_runtime.integration import get_plugin_runtime_manager + await get_plugin_runtime_manager().stop() + # 停止所有异步任务 await async_task_manager.stop_and_wait_all_tasks() diff --git a/src/chat/message_receive/bot.py b/src/chat/message_receive/bot.py index 9cbb3cf9..e0b91d31 100644 --- a/src/chat/message_receive/bot.py +++ b/src/chat/message_receive/bot.py @@ -117,13 +117,66 @@ class ChatBot: # 命令出错时,根据命令的拦截设置决定是否继续处理消息 return True, str(e), False # 出错时继续处理消息 - # 没有找到命令,继续处理消息 + # 没有找到旧系统命令,尝试新版本插件运行时 + new_cmd_result = await self._process_new_runtime_command(message) + if new_cmd_result is not None: + return new_cmd_result + + # 新旧系统都没找到命令,继续处理消息 return False, None, True except Exception as e: logger.error(f"处理命令时出错: {e}") return False, None, True # 出错时继续处理消息 + async def _process_new_runtime_command(self, message: SessionMessage): + """尝试在新版本插件运行时中查找并执行命令 + + Returns: + (found, response, continue_processing) 三元组, + 或 None 表示新运行时中也未找到匹配命令。 + """ + from src.plugin_runtime.integration import get_plugin_runtime_manager + + prm = get_plugin_runtime_manager() + if not prm.is_running or prm.supervisor is None: + return None + + matched = prm.supervisor.component_registry.find_command_by_text(message.processed_plain_text) + if matched is None: + return None + + message.is_command = True + logger.info(f"[新运行时] 匹配命令: {matched.full_name}") + + try: + resp = await prm.supervisor.invoke_plugin( + method="plugin.invoke_command", + plugin_id=matched.plugin_id, + component_name=matched.name, + args={ + "text": message.processed_plain_text, + "stream_id": message.session_id or "", + }, + timeout_ms=30000, + ) + + payload = resp.payload + success = payload.get("success", False) + result = payload.get("result", "") + intercept = bool(matched.metadata.get("intercept_message_level", 0)) + + if success: + logger.info(f"[新运行时] 命令执行成功: {matched.full_name}") + else: + logger.warning(f"[新运行时] 命令执行失败: {matched.full_name} - {result}") + + return True, result, not intercept + + except Exception as e: + logger.error(f"[新运行时] 执行命令 {matched.full_name} 异常: {e}", exc_info=True) + return True, str(e), True + async def handle_notice_message(self, message: MessageRecv): if message.message_info.message_id == "notice": message.is_notify = True diff --git a/src/main.py b/src/main.py index df2d069a..29b00682 100644 --- a/src/main.py +++ b/src/main.py @@ -21,6 +21,9 @@ from rich.traceback import install # 导入新的插件管理器 from src.plugin_system.core.plugin_manager import plugin_manager +# 导入新版本插件运行时 +from src.plugin_runtime.integration import get_plugin_runtime_manager + # 导入消息API和traceback模块 from src.common.message_server import get_global_api from src.dream.dream_agent import start_dream_scheduler @@ -108,6 +111,9 @@ class MainSystem: # 加载所有actions,包括默认的和插件的 plugin_manager.load_all_plugins() + # 启动新版本插件运行时(与旧系统并行运行) + await get_plugin_runtime_manager().start() + # 初始化表情管理器 emoji_manager.load_emojis_from_db() logger.info("表情包管理器初始化成功") @@ -131,6 +137,9 @@ class MainSystem: from src.plugin_system.base.component_types import EventType await events_manager.handle_mai_events(event_type=EventType.ON_START) + + # 桥接 ON_START 事件到新版本插件运行时 + await get_plugin_runtime_manager().bridge_event("on_start") # logger.info("已触发 ON_START 事件") try: init_time = int(1000 * (time.time() - init_start_time)) diff --git a/src/plugin_runtime/host/capability_service.py b/src/plugin_runtime/host/capability_service.py index f937d0da..8e7b77ba 100644 --- a/src/plugin_runtime/host/capability_service.py +++ b/src/plugin_runtime/host/capability_service.py @@ -6,7 +6,7 @@ Host 端实现的能力服务,处理来自插件的 cap.* 请求。 from typing import Any, Callable, Awaitable -import logging +from src.common.logger import get_logger from src.plugin_runtime.protocol.envelope import ( CapabilityRequestPayload, @@ -16,7 +16,7 @@ from src.plugin_runtime.protocol.envelope import ( from src.plugin_runtime.protocol.errors import ErrorCode, RPCError from src.plugin_runtime.host.policy_engine import PolicyEngine -logger = logging.getLogger("plugin_runtime.host.capability_service") +logger = get_logger("plugin_runtime.host.capability_service") # 能力实现函数类型: (plugin_id, capability, args) -> result CapabilityImpl = Callable[[str, str, dict[str, Any]], Awaitable[Any]] diff --git a/src/plugin_runtime/host/component_registry.py b/src/plugin_runtime/host/component_registry.py index a4793020..74dba956 100644 --- a/src/plugin_runtime/host/component_registry.py +++ b/src/plugin_runtime/host/component_registry.py @@ -11,10 +11,10 @@ from typing import Any -import logging +from src.common.logger import get_logger import re -logger = logging.getLogger("plugin_runtime.host.component_registry") +logger = get_logger("plugin_runtime.host.component_registry") class RegisteredComponent: diff --git a/src/plugin_runtime/host/event_dispatcher.py b/src/plugin_runtime/host/event_dispatcher.py index db117daf..29fa85d3 100644 --- a/src/plugin_runtime/host/event_dispatcher.py +++ b/src/plugin_runtime/host/event_dispatcher.py @@ -10,11 +10,11 @@ from typing import Any, Awaitable, Callable import asyncio -import logging +from src.common.logger import get_logger from src.plugin_runtime.host.component_registry import ComponentRegistry, RegisteredComponent -logger = logging.getLogger("plugin_runtime.host.event_dispatcher") +logger = get_logger("plugin_runtime.host.event_dispatcher") # invoke_fn 类型: async (plugin_id, component_name, args) -> response_payload dict InvokeFn = Callable[[str, str, dict[str, Any]], Awaitable[dict[str, Any]]] diff --git a/src/plugin_runtime/host/rpc_server.py b/src/plugin_runtime/host/rpc_server.py index 2bc7fea7..7c601492 100644 --- a/src/plugin_runtime/host/rpc_server.py +++ b/src/plugin_runtime/host/rpc_server.py @@ -10,9 +10,9 @@ from typing import Any, Callable, Awaitable import asyncio -import logging import secrets +from src.common.logger import get_logger from src.plugin_runtime.protocol.codec import Codec, MsgPackCodec from src.plugin_runtime.protocol.envelope import ( PROTOCOL_VERSION, @@ -27,7 +27,7 @@ from src.plugin_runtime.protocol.envelope import ( from src.plugin_runtime.protocol.errors import ErrorCode, RPCError from src.plugin_runtime.transport.base import Connection, TransportServer -logger = logging.getLogger("plugin_runtime.host.rpc_server") +logger = get_logger("plugin_runtime.host.rpc_server") # RPC 方法处理器类型 MethodHandler = Callable[[Envelope], Awaitable[Envelope]] diff --git a/src/plugin_runtime/host/supervisor.py b/src/plugin_runtime/host/supervisor.py index 041b3a68..122dc8ee 100644 --- a/src/plugin_runtime/host/supervisor.py +++ b/src/plugin_runtime/host/supervisor.py @@ -10,10 +10,10 @@ from typing import Any import asyncio -import logging import os import sys +from src.common.logger import get_logger from src.plugin_runtime.host.capability_service import CapabilityService from src.plugin_runtime.host.component_registry import ComponentRegistry from src.plugin_runtime.host.event_dispatcher import EventDispatcher @@ -29,7 +29,7 @@ from src.plugin_runtime.protocol.envelope import ( from src.plugin_runtime.protocol.errors import ErrorCode, RPCError from src.plugin_runtime.transport.factory import create_transport_server -logger = logging.getLogger("plugin_runtime.host.supervisor") +logger = get_logger("plugin_runtime.host.supervisor") class PluginSupervisor: diff --git a/src/plugin_runtime/host/workflow_executor.py b/src/plugin_runtime/host/workflow_executor.py index 01eb3f3b..3f943935 100644 --- a/src/plugin_runtime/host/workflow_executor.py +++ b/src/plugin_runtime/host/workflow_executor.py @@ -20,13 +20,13 @@ from typing import Any, Awaitable, Callable import asyncio -import logging import time import uuid +from src.common.logger import get_logger from src.plugin_runtime.host.component_registry import ComponentRegistry, RegisteredComponent -logger = logging.getLogger("plugin_runtime.host.workflow_executor") +logger = get_logger("plugin_runtime.host.workflow_executor") # 阶段顺序 STAGE_SEQUENCE: list[str] = [ diff --git a/src/plugin_runtime/integration.py b/src/plugin_runtime/integration.py new file mode 100644 index 00000000..111f72d5 --- /dev/null +++ b/src/plugin_runtime/integration.py @@ -0,0 +1,255 @@ +"""新版本插件运行时与主程序的集成层 + +提供 PluginRuntimeManager 单例,负责: +1. 管理 PluginSupervisor 的生命周期(启动 / 停止) +2. 将旧系统的 EventType 桥接到新运行时的 event dispatch +3. 将新运行时注册的 command 合并到旧系统的命令查找流程 +4. 提供统一的能力实现注册接口,使新插件可以调用主程序功能 + +在过渡期内,新旧插件系统共存: +- 旧插件继续通过 plugin_manager / component_registry 加载和执行 +- 新插件通过 PluginSupervisor + Runner 子进程加载和执行 +- 事件和命令在两套系统间桥接 +""" + +from __future__ import annotations + +from typing import Any + +import os + +from src.common.logger import get_logger + +logger = get_logger("plugin_runtime.integration") + +# 旧系统 EventType -> 新系统 event_type 字符串映射 +_EVENT_TYPE_MAP: dict[str, str] = { + "on_start": "on_start", + "on_stop": "on_stop", + "on_message_pre_process": "on_message_pre_process", + "on_message": "on_message", + "on_plan": "on_plan", + "post_llm": "post_llm", + "after_llm": "after_llm", + "post_send_pre_process": "post_send_pre_process", + "post_send": "post_send", + "after_send": "after_send", +} + + +class PluginRuntimeManager: + """新版本插件运行时管理器(单例) + + 作为主程序与 PluginSupervisor 之间的桥梁。 + """ + + def __init__(self) -> None: + from src.plugin_runtime.host.supervisor import PluginSupervisor + + self._supervisor: PluginSupervisor | None = None + self._started: bool = False + + def _get_plugin_dirs(self) -> list[str]: + """获取新版本插件目录列表 + + 新版本插件放在 plugins/ 目录中,与旧版本共存。 + 只有包含 _manifest.json 的插件目录会被新 Runner 加载。 + """ + dirs: list[str] = [] + for candidate in ("plugins",): + abs_path: str = os.path.abspath(candidate) + if os.path.isdir(abs_path): + dirs.append(abs_path) + return dirs + + async def start(self) -> None: + """启动新版本插件运行时 + + 应在 plugin_manager.load_all_plugins() 之后调用。 + """ + if self._started: + logger.warning("PluginRuntimeManager 已在运行中,跳过重复启动") + return + + from src.plugin_runtime.host.supervisor import PluginSupervisor + + plugin_dirs: list[str] = self._get_plugin_dirs() + if not plugin_dirs: + logger.info("未找到插件目录,跳过新版本插件运行时启动") + return + + self._supervisor = PluginSupervisor(plugin_dirs=plugin_dirs) + + # 注册主程序提供的能力实现 + self._register_capability_impls() + + try: + await self._supervisor.start() + self._started = True + logger.info(f"新版本插件运行时已启动,监控目录: {plugin_dirs}") + except Exception as e: + logger.error(f"新版本插件运行时启动失败: {e}", exc_info=True) + self._supervisor = None + + async def stop(self) -> None: + """停止新版本插件运行时""" + if not self._started or self._supervisor is None: + return + + try: + await self._supervisor.stop() + logger.info("新版本插件运行时已停止") + except Exception as e: + logger.error(f"新版本插件运行时停止失败: {e}", exc_info=True) + finally: + self._started = False + self._supervisor = None + + @property + def is_running(self) -> bool: + return self._started + + @property + def supervisor(self) -> Any: + """获取底层 Supervisor(供高级用途)""" + return self._supervisor + + # ─── 事件桥接 ────────────────────────────────────────────── + + async def bridge_event( + self, + event_type_value: str, + message_dict: dict[str, Any] | None = None, + extra_args: dict[str, Any] | None = None, + ) -> tuple[bool, dict[str, Any] | None]: + """将旧系统事件转发到新版本插件运行时 + + Args: + event_type_value: 旧 EventType 的 .value(如 "on_message") + message_dict: 序列化后的消息字典(MaiMessages 转 dict) + extra_args: 额外参数 + + Returns: + (continue_flag, modified_message_dict) + """ + if not self._started or self._supervisor is None: + return True, None + + new_event_type: str = _EVENT_TYPE_MAP.get(event_type_value, event_type_value) + + try: + return await self._supervisor.dispatch_event( + event_type=new_event_type, + message=message_dict, + extra_args=extra_args, + ) + except Exception as e: + logger.error(f"桥接事件 {new_event_type} 到新运行时失败: {e}", exc_info=True) + return True, None + + # ─── 命令桥接 ────────────────────────────────────────────── + + def find_command_by_text(self, text: str) -> dict[str, Any] | None: + """在新版本插件运行时的 ComponentRegistry 中查找命令 + + Returns: + 匹配结果字典 {"component": RegisteredComponent, "match": re.Match} + 或 None + """ + if not self._started or self._supervisor is None: + return None + + return self._supervisor.component_registry.find_command_by_text(text) + + # ─── 能力实现注册 ────────────────────────────────────────── + + def _register_capability_impls(self) -> None: + """注册主程序提供的能力实现 + + 新版本插件通过 cap.request RPC 请求能力调用, + Host 端的 CapabilityService 需要真正的能力实现来处理这些请求。 + 这里注册主程序中可用的功能接口。 + """ + if self._supervisor is None: + return + + cap_service = self._supervisor.capability_service + + # 注册 send.* 能力 + cap_service.register_capability("send.text", self._cap_send_text) + cap_service.register_capability("send.emoji", self._cap_send_emoji) + cap_service.register_capability("send.image", self._cap_send_image) + + # 注册 llm.* 能力 + cap_service.register_capability("llm.generate", self._cap_llm_generate) + + # 注册 config.* 能力 + cap_service.register_capability("config.get", self._cap_config_get) + + logger.debug("已注册主程序能力实现") + + # ─── 能力实现 ────────────────────────────────────────────── + + @staticmethod + async def _cap_send_text(plugin_id: str, capability: str, args: dict[str, Any]) -> Any: + """发送文本消息能力实现 + + 注意: chat_stream 模块已被移除,send.text 能力暂不可用, + 待新的消息发送接口稳定后再接入。 + """ + return {"success": False, "error": "send.text 尚未接入(chat_stream 已移除)"} + + @staticmethod + async def _cap_send_emoji(plugin_id: str, capability: str, args: dict[str, Any]) -> Any: + """发送表情能力实现""" + return {"success": False, "error": "send.emoji 尚未实现"} + + @staticmethod + async def _cap_send_image(plugin_id: str, capability: str, args: dict[str, Any]) -> Any: + """发送图片能力实现""" + return {"success": False, "error": "send.image 尚未实现"} + + @staticmethod + async def _cap_llm_generate(plugin_id: str, capability: str, args: dict[str, Any]) -> Any: + """LLM 生成能力实现""" + return {"success": False, "error": "llm.generate 尚未完全接入,请使用旧系统的 LLM API"} + + @staticmethod + async def _cap_config_get(plugin_id: str, capability: str, args: dict[str, Any]) -> Any: + """配置读取能力实现""" + from src.plugin_system.core import component_registry as old_registry + + plugin_name: str = args.get("plugin_name", plugin_id) + key: str = args.get("key", "") + + try: + config = old_registry.get_plugin_config(plugin_name) + if config is None: + return {"success": False, "value": None, "error": f"未找到插件 {plugin_name} 的配置"} + + if key: + parts = key.split(".") + value: Any = config + for part in parts: + if isinstance(value, dict): + value = value.get(part) + else: + return {"success": False, "value": None, "error": f"配置路径无效: {key}"} + return {"success": True, "value": value} + + return {"success": True, "value": config} + except Exception as e: + return {"success": False, "value": None, "error": str(e)} + + +# ─── 单例 ────────────────────────────────────────────────── + +_manager: PluginRuntimeManager | None = None + + +def get_plugin_runtime_manager() -> PluginRuntimeManager: + """获取 PluginRuntimeManager 全局单例""" + global _manager + if _manager is None: + _manager = PluginRuntimeManager() + return _manager diff --git a/src/plugin_runtime/runner/manifest_validator.py b/src/plugin_runtime/runner/manifest_validator.py index 0df0c74d..7419dc21 100644 --- a/src/plugin_runtime/runner/manifest_validator.py +++ b/src/plugin_runtime/runner/manifest_validator.py @@ -6,10 +6,11 @@ from typing import Any -import logging import re -logger = logging.getLogger("plugin_runtime.runner.manifest_validator") +from src.common.logger import get_logger + +logger = get_logger("plugin_runtime.runner.manifest_validator") class VersionComparator: diff --git a/src/plugin_runtime/runner/plugin_loader.py b/src/plugin_runtime/runner/plugin_loader.py index 26d8f16f..8978944d 100644 --- a/src/plugin_runtime/runner/plugin_loader.py +++ b/src/plugin_runtime/runner/plugin_loader.py @@ -11,13 +11,13 @@ from typing import Any import importlib import importlib.util import json -import logging import os import sys +from src.common.logger import get_logger from src.plugin_runtime.runner.manifest_validator import ManifestValidator -logger = logging.getLogger("plugin_runtime.runner.plugin_loader") +logger = get_logger("plugin_runtime.runner.plugin_loader") class PluginMeta: diff --git a/src/plugin_runtime/runner/rpc_client.py b/src/plugin_runtime/runner/rpc_client.py index 0e525e34..76581e4b 100644 --- a/src/plugin_runtime/runner/rpc_client.py +++ b/src/plugin_runtime/runner/rpc_client.py @@ -12,9 +12,9 @@ from typing import Any, Callable, Awaitable import asyncio import contextlib -import logging import uuid +from src.common.logger import get_logger from src.plugin_runtime.protocol.codec import Codec, MsgPackCodec from src.plugin_runtime.protocol.envelope import ( Envelope, @@ -27,7 +27,7 @@ from src.plugin_runtime.protocol.errors import ErrorCode, RPCError from src.plugin_runtime.transport.base import Connection from src.plugin_runtime.transport.factory import create_transport_client -logger = logging.getLogger("plugin_runtime.runner.rpc_client") +logger = get_logger("plugin_runtime.runner.rpc_client") # RPC 方法处理器类型 MethodHandler = Callable[[Envelope], Awaitable[Envelope]] diff --git a/src/plugin_runtime/runner/runner_main.py b/src/plugin_runtime/runner/runner_main.py index 4972129f..c4fe84f9 100644 --- a/src/plugin_runtime/runner/runner_main.py +++ b/src/plugin_runtime/runner/runner_main.py @@ -12,12 +12,12 @@ import asyncio import contextlib import inspect -import logging import os import signal import sys import time +from src.common.logger import get_logger, initialize_logging from src.plugin_runtime.protocol.envelope import ( ComponentDeclaration, Envelope, @@ -30,7 +30,7 @@ from src.plugin_runtime.protocol.errors import ErrorCode from src.plugin_runtime.runner.plugin_loader import PluginLoader, PluginMeta from src.plugin_runtime.runner.rpc_client import RPCClient -logger = logging.getLogger("plugin_runtime.runner.main") +logger = get_logger("plugin_runtime.runner.main") class PluginRunner: @@ -347,10 +347,7 @@ async def _async_main() -> None: def main() -> None: """进程入口(python -m src.plugin_runtime.runner.runner_main)""" - logging.basicConfig( - level=logging.INFO, - format="%(asctime)s [%(name)s] %(levelname)s: %(message)s", - ) + initialize_logging(verbose=False) asyncio.run(_async_main()) diff --git a/src/plugin_system/core/events_manager.py b/src/plugin_system/core/events_manager.py index a81cc462..be435848 100644 --- a/src/plugin_system/core/events_manager.py +++ b/src/plugin_system/core/events_manager.py @@ -125,6 +125,11 @@ class EventsManager: # 异步执行,不阻塞 self._dispatch_handler_task(handler, event_type, transformed_message) + # 桥接到新版本插件运行时 + continue_flag, modified_message = await self._bridge_to_new_runtime( + event_type, continue_flag, modified_message or transformed_message + ) + return continue_flag, modified_message async def handle_workflow_message( @@ -329,6 +334,42 @@ class EventsManager: additional_data={"response_is_processed": True}, ) + async def _bridge_to_new_runtime( + self, + event_type: EventType | str, + continue_flag: bool, + message: Optional[MaiMessages], + ) -> Tuple[bool, Optional[MaiMessages]]: + """将事件桥接到新版本插件运行时 + + 如果旧 handler 已经 abort(continue_flag=False),直接跳过。 + """ + if not continue_flag: + return continue_flag, message + + try: + from src.plugin_runtime.integration import get_plugin_runtime_manager + + prm = get_plugin_runtime_manager() + if not prm.is_running: + return continue_flag, message + + event_value = event_type.value if isinstance(event_type, EventType) else str(event_type) + message_dict = message.to_dict() if message and hasattr(message, "to_dict") else None + + new_continue, new_msg_dict = await prm.bridge_event( + event_type_value=event_value, + message_dict=message_dict, + ) + # 新运行时返回 abort 则合并 + if not new_continue: + continue_flag = False + + except Exception as e: + logger.warning(f"桥接事件到新运行时失败: {e}") + + return continue_flag, message + def _prepare_message( self, event_type: EventType | str,