diff --git a/pytests/test_plugin_runtime.py b/pytests/test_plugin_runtime.py index 76dbe316..3de238ee 100644 --- a/pytests/test_plugin_runtime.py +++ b/pytests/test_plugin_runtime.py @@ -582,11 +582,13 @@ class TestComponentRegistry: match = reg.find_command_by_text("/help me") assert match is not None - assert match.name == "help" + comp, groups = match + assert comp.name == "help" match = reg.find_command_by_text("/echo hello") assert match is not None - assert match.name == "echo" + comp, groups = match + assert comp.name == "echo" match = reg.find_command_by_text("no match") assert match is None diff --git a/src/plugin_runtime/host/supervisor.py b/src/plugin_runtime/host/supervisor.py index e9d3b726..6e1c22fe 100644 --- a/src/plugin_runtime/host/supervisor.py +++ b/src/plugin_runtime/host/supervisor.py @@ -209,11 +209,22 @@ class PluginSupervisor: return result if isinstance(result, dict) else {} raise RuntimeError(payload.get("result", "workflow step invoke failed")) + async def _command_invoke(plugin_id: str, component_name: str, args: Dict[str, Any]) -> Dict[str, Any]: + """命令走 plugin.invoke_command,保留原始返回值结构。""" + resp = await self.invoke_plugin( + method="plugin.invoke_command", + plugin_id=plugin_id, + component_name=component_name, + args=args, + ) + return resp.payload + return await self._workflow_executor.execute( invoke_fn=_invoke, message=message, stream_id=stream_id, context=context, + command_invoke_fn=_command_invoke, ) async def start(self) -> None: diff --git a/src/plugin_runtime/host/workflow_executor.py b/src/plugin_runtime/host/workflow_executor.py index ffc4e995..316f6c10 100644 --- a/src/plugin_runtime/host/workflow_executor.py +++ b/src/plugin_runtime/host/workflow_executor.py @@ -118,9 +118,15 @@ class WorkflowExecutor: message: Optional[Dict[str, Any]] = None, stream_id: Optional[str] = None, context: Optional[WorkflowContext] = None, + command_invoke_fn: Optional[InvokeFn] = None, ) -> Tuple[WorkflowResult, Optional[Dict[str, Any]], WorkflowContext]: """执行 workflow pipeline。 + Args: + invoke_fn: 用于 workflow_step 的回调 + command_invoke_fn: 用于 command 的回调(走 plugin.invoke_command), + 未传则复用 invoke_fn + Returns: (result, final_message, context) """ @@ -136,7 +142,7 @@ class WorkflowExecutor: # PLAN 阶段: 先做 Command 路由 if stage == "plan" and current_message: cmd_result = await self._route_command( - invoke_fn, current_message, ctx + command_invoke_fn or invoke_fn, current_message, ctx ) if cmd_result is not None: # 命令匹配成功,跳过 PLAN 阶段的 hook,直接存结果进 stage_outputs @@ -377,10 +383,12 @@ class WorkflowExecutor: if not plain_text: return None - matched = self._registry.find_command_by_text(plain_text) - if matched is None: + match_result = self._registry.find_command_by_text(plain_text) + if match_result is None: return None + matched, matched_groups = match_result + ctx.matched_command = matched.full_name logger.info(f"[{ctx.trace_id}] 命令匹配: {matched.full_name}") @@ -389,6 +397,7 @@ class WorkflowExecutor: "text": plain_text, "message": message, "trace_id": ctx.trace_id, + "matched_groups": matched_groups, }) except Exception as e: logger.error(f"[{ctx.trace_id}] 命令 {matched.full_name} 执行失败: {e}", exc_info=True) diff --git a/src/plugin_runtime/integration.py b/src/plugin_runtime/integration.py index fd29fa06..8842fa31 100644 --- a/src/plugin_runtime/integration.py +++ b/src/plugin_runtime/integration.py @@ -1060,7 +1060,7 @@ class PluginRuntimeManager: async def _cap_message_count_new(plugin_id: str, capability: str, args: Dict[str, Any]) -> Any: """统计新消息数量 - args: chat_id, start_time?, end_time? + args: chat_id, since? | start_time?, end_time? """ from src.services import message_service as message_api @@ -1069,9 +1069,12 @@ class PluginRuntimeManager: return {"success": False, "error": "缺少必要参数 chat_id"} try: + # 兼容 SDK 传 since 和直接传 start_time 两种方式 + since = args.get("since") + start_time = float(since) if since is not None else float(args.get("start_time", 0.0)) count = message_api.count_new_messages( chat_id=chat_id, - start_time=float(args.get("start_time", 0.0)), + start_time=start_time, end_time=args.get("end_time"), ) return {"success": True, "count": count} @@ -1083,21 +1086,29 @@ class PluginRuntimeManager: async def _cap_message_build_readable(plugin_id: str, capability: str, args: Dict[str, Any]) -> Any: """将消息列表构建成可读字符串 - args: chat_id, start_time, end_time, limit?, replace_bot_name?, timestamp_mode? + 支持两种调用方式: + 1. SDK 方式: 传入 messages(已查询的消息列表) + 2. 直接方式: 传入 chat_id + start_time + end_time(Host 端查询) + + args: messages? | chat_id?, start_time?, end_time?, limit?, replace_bot_name?, timestamp_mode? """ from src.services import message_service as message_api - chat_id: str = args.get("chat_id", "") - if not chat_id: - return {"success": False, "error": "缺少必要参数 chat_id"} - try: - messages = message_api.get_messages_by_time_in_chat( - chat_id=chat_id, - start_time=float(args.get("start_time", 0.0)), - end_time=float(args.get("end_time", 0.0)), - limit=args.get("limit", 0), - ) + # 优先使用调用方已提供的消息列表 + messages = args.get("messages") + if messages is None: + # 回退到 chat_id + 时间范围查询 + chat_id: str = args.get("chat_id", "") + if not chat_id: + return {"success": False, "error": "缺少必要参数: messages 或 chat_id"} + messages = message_api.get_messages_by_time_in_chat( + chat_id=chat_id, + start_time=float(args.get("start_time", 0.0)), + end_time=float(args.get("end_time", 0.0)), + limit=args.get("limit", 0), + ) + readable = message_api.build_readable_messages_to_str( messages=messages, replace_bot_name=args.get("replace_bot_name", True),