From 702316ae575b63de860f2376a35ae6b0f026d814 Mon Sep 17 00:00:00 2001 From: Losita <2810873701@qq.com> Date: Tue, 12 May 2026 22:02:15 +0800 Subject: [PATCH] fix(plugin-runtime): surface detailed send failures for plugin tools --- pytests/test_send_service.py | 16 ++ src/plugin_runtime/capabilities/core.py | 16 +- src/plugin_runtime/runner/runner_main.py | 57 ++++- src/services/send_service.py | 300 +++++++++++++++++++++++ 4 files changed, 380 insertions(+), 9 deletions(-) diff --git a/pytests/test_send_service.py b/pytests/test_send_service.py index afad8dd6..23d51eb6 100644 --- a/pytests/test_send_service.py +++ b/pytests/test_send_service.py @@ -284,6 +284,22 @@ async def test_text_to_stream_returns_false_when_platform_io_fails(monkeypatch: assert len(fake_manager.sent_messages) == 1 +@pytest.mark.asyncio +async def test_custom_to_stream_detailed_raises_stream_not_found(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + send_service._chat_manager, + "get_session_by_session_id", + lambda stream_id: None, + ) + + with pytest.raises(send_service.SendServiceError, match="未找到聊天流: group_chat"): + await send_service.custom_to_stream_detailed( + message_type="poke", + content={"qq_id": "2810873701"}, + stream_id="group_chat", + ) + + @pytest.mark.asyncio async def test_private_outbound_message_preserves_bot_sender_and_receiver_user( monkeypatch: pytest.MonkeyPatch, diff --git a/src/plugin_runtime/capabilities/core.py b/src/plugin_runtime/capabilities/core.py index f445b4fc..f5bcdc4e 100644 --- a/src/plugin_runtime/capabilities/core.py +++ b/src/plugin_runtime/capabilities/core.py @@ -81,7 +81,7 @@ class RuntimeCoreCapabilityMixin: return {"success": False, "error": "缺少必要参数 text 或 stream_id"} try: - result = await send_api.text_to_stream( + await send_api.text_to_stream_with_message_detailed( text=text, stream_id=stream_id, typing=bool(args.get("typing", False)), @@ -90,7 +90,7 @@ class RuntimeCoreCapabilityMixin: sync_to_maisaka_history=sync_to_maisaka_history, maisaka_source_kind=maisaka_source_kind, ) - return {"success": result} + return {"success": True} except Exception as exc: logger.error(f"[cap.send.text] 执行失败: {exc}", exc_info=True) return {"success": False, "error": str(exc)} @@ -117,14 +117,14 @@ class RuntimeCoreCapabilityMixin: return {"success": False, "error": "缺少必要参数 emoji_base64 或 stream_id"} try: - result = await send_api.emoji_to_stream( + await send_api.emoji_to_stream_with_message_detailed( emoji_base64=emoji_base64, stream_id=stream_id, storage_message=bool(args.get("storage_message", True)), sync_to_maisaka_history=sync_to_maisaka_history, maisaka_source_kind=maisaka_source_kind, ) - return {"success": result} + return {"success": True} except Exception as exc: logger.error(f"[cap.send.emoji] 执行失败: {exc}", exc_info=True) return {"success": False, "error": str(exc)} @@ -185,7 +185,7 @@ class RuntimeCoreCapabilityMixin: return {"success": False, "error": "缺少必要参数 command 或 stream_id"} try: - result = await send_api.custom_to_stream( + await send_api.custom_to_stream_detailed( message_type="command", content=command, stream_id=stream_id, @@ -194,7 +194,7 @@ class RuntimeCoreCapabilityMixin: sync_to_maisaka_history=sync_to_maisaka_history, maisaka_source_kind=maisaka_source_kind, ) - return {"success": result} + return {"success": True} except Exception as exc: logger.error(f"[cap.send.command] 执行失败: {exc}", exc_info=True) return {"success": False, "error": str(exc)} @@ -224,7 +224,7 @@ class RuntimeCoreCapabilityMixin: return {"success": False, "error": "缺少必要参数 message_type 或 stream_id"} try: - result = await send_api.custom_to_stream( + await send_api.custom_to_stream_detailed( message_type=message_type, content=content, stream_id=stream_id, @@ -234,7 +234,7 @@ class RuntimeCoreCapabilityMixin: sync_to_maisaka_history=sync_to_maisaka_history, maisaka_source_kind=maisaka_source_kind, ) - return {"success": result} + return {"success": True} except Exception as exc: logger.error(f"[cap.send.custom] 执行失败: {exc}", exc_info=True) return {"success": False, "error": str(exc)} diff --git a/src/plugin_runtime/runner/runner_main.py b/src/plugin_runtime/runner/runner_main.py index b53cab8e..38696b62 100644 --- a/src/plugin_runtime/runner/runner_main.py +++ b/src/plugin_runtime/runner/runner_main.py @@ -88,6 +88,22 @@ _RAISE_ON_FAILED_CAPABILITIES = frozenset( ) +class _PluginLogCaptureHandler(stdlib_logging.Handler): + """捕获插件调用期间的 warning/error 日志。""" + + def __init__(self) -> None: + super().__init__(level=stdlib_logging.WARNING) + self.messages: List[str] = [] + + def emit(self, record: stdlib_logging.LogRecord) -> None: + try: + message = record.getMessage().strip() + except Exception: + message = "" + if message: + self.messages.append(message) + + class _ContextAwarePlugin(Protocol): """支持注入运行时上下文的插件协议。 @@ -548,6 +564,36 @@ class PluginRunner: cast(_ContextAwarePlugin, instance)._set_context(ctx) logger.debug(f"已为插件 {plugin_id} 注入 PluginContext") + @staticmethod + def _enrich_failed_invoke_result(result: Any, log_messages: List[str]) -> Any: + """将插件工具失败结果补充为包含最近错误详情的结构。""" + if not isinstance(result, dict) or result.get("success", True) is not False: + return result + + detail = "" + for item in reversed(log_messages): + normalized = str(item or "").strip() + if normalized: + detail = normalized + break + if not detail: + return result + + current_error = str(result.get("error") or "").strip() + current_message = str(result.get("message") or result.get("content") or "").strip() + + if not current_error: + result["error"] = detail + elif detail not in current_error: + result["error"] = f"{current_error};{detail}" + + if current_message: + if detail not in current_message: + result["message"] = f"{current_message}(原因:{detail})" + else: + result["message"] = detail + return result + def _apply_plugin_config(self, meta: PluginMeta, config_data: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """在 Runner 侧为插件实例注入当前插件配置。 @@ -1659,6 +1705,9 @@ class PluginRunner: component_name = invoke.component_name handler_method = self._resolve_component_handler(meta, component_name) + plugin_logger = stdlib_logging.getLogger(f"plugin.{plugin_id}") + capture_handler = _PluginLogCaptureHandler() + plugin_logger.addHandler(capture_handler) # 回退: 旧版 LegacyPluginAdapter 通过 invoke_component 统一桥接 if (handler_method is None or not callable(handler_method)) and hasattr(meta.instance, "invoke_component"): @@ -1683,12 +1732,18 @@ class PluginRunner: if inspect.iscoroutinefunction(handler_method) else handler_method(**invoke.args) ) + result = self._enrich_failed_invoke_result(result, capture_handler.messages) resp_payload = InvokeResultPayload(success=True, result=result) return envelope.make_response(payload=resp_payload.model_dump()) except Exception as e: logger.error(f"插件 {plugin_id} 组件 {component_name} 执行异常: {e}", exc_info=True) - resp_payload = InvokeResultPayload(success=False, result=str(e)) + error_text = str(e).strip() + if not error_text and capture_handler.messages: + error_text = capture_handler.messages[-1] + resp_payload = InvokeResultPayload(success=False, result=error_text) return envelope.make_response(payload=resp_payload.model_dump()) + finally: + plugin_logger.removeHandler(capture_handler) async def _handle_llm_provider_invoke(self, envelope: Envelope) -> Envelope: """处理 LLM Provider 调用请求。 diff --git a/src/services/send_service.py b/src/services/send_service.py index bd2e986d..130550ca 100644 --- a/src/services/send_service.py +++ b/src/services/send_service.py @@ -48,6 +48,10 @@ from src.plugin_runtime.host.hook_spec_registry import HookSpec, HookSpecRegistr logger = get_logger("send_service") +class SendServiceError(RuntimeError): + """发送服务错误。""" + + def register_send_service_hook_specs(registry: HookSpecRegistry) -> List[HookSpec]: """注册发送服务内置 Hook 规格。 @@ -581,6 +585,34 @@ def _build_outbound_session_message( return outbound_message +def _build_outbound_session_message_detailed( + message_sequence: MessageSequence, + stream_id: str, + processed_plain_text: str = "", + reply_message: Optional[MaiMessage] = None, + selected_expressions: Optional[List[int]] = None, +) -> SessionMessage: + """根据目标会话构建待发送消息,失败时抛出带详情的异常。""" + outbound_message = _build_outbound_session_message( + message_sequence=message_sequence, + stream_id=stream_id, + processed_plain_text=processed_plain_text, + reply_message=reply_message, + selected_expressions=selected_expressions, + ) + if outbound_message is not None: + return outbound_message + + target_stream = _chat_manager.get_session_by_session_id(stream_id) + if target_stream is None: + raise SendServiceError(f"未找到聊天流: {stream_id}") + + if not get_bot_account(target_stream.platform): + raise SendServiceError(f"平台 {target_stream.platform} 未配置机器人账号,无法发送消息") + + raise SendServiceError("构建出站消息失败") + + def _ensure_reply_component(message: SessionMessage, reply_message_id: str) -> None: """为消息补充回复组件。 @@ -743,6 +775,18 @@ def _log_platform_io_failures(delivery_batch: DeliveryBatch) -> None: logger.warning(f"[SendService] Platform IO 发送失败: platform={delivery_batch.route_key.platform} {failed_details}") +def _build_platform_io_failure_message(delivery_batch: DeliveryBatch) -> str: + """构造 Platform IO 发送失败的紧凑错误信息。""" + failed_details = [ + str(receipt.error or "").strip() + for receipt in delivery_batch.failed_receipts + if str(receipt.error or "").strip() + ] + if failed_details: + return "; ".join(failed_details) + return "未命中任何发送路由" + + async def _send_via_platform_io( message: SessionMessage, *, @@ -884,6 +928,117 @@ async def send_session_message_with_message( return sent_message +async def send_session_message_with_message_detailed( + message: SessionMessage, + *, + typing: bool = False, + set_reply: bool = False, + reply_message_id: Optional[str] = None, + storage_message: bool = True, + show_log: bool = True, + sync_to_maisaka_history: bool = False, + maisaka_source_kind: str = "outbound_send", +) -> SessionMessage: + """统一发送一条内部消息,失败时抛出带详情的异常。""" + if not message.message_id: + logger.error("[SendService] 消息缺少 message_id,无法发送") + raise SendServiceError("消息缺少 message_id,无法发送") + + try: + before_send_result, message = await _invoke_send_hook( + "send_service.before_send", + message, + typing=typing, + set_reply=set_reply, + reply_message_id=reply_message_id, + storage_message=storage_message, + show_log=show_log, + ) + if before_send_result.aborted: + logger.info(f"[SendService] 消息 {message.message_id} 在发送前被 Hook 中止") + raise SendServiceError("消息在发送前被 Hook 中止") + + before_kwargs = before_send_result.kwargs + typing = _coerce_bool(before_kwargs.get("typing"), typing) + set_reply = _coerce_bool(before_kwargs.get("set_reply"), set_reply) + storage_message = _coerce_bool(before_kwargs.get("storage_message"), storage_message) + show_log = _coerce_bool(before_kwargs.get("show_log"), show_log) + raw_reply_message_id = before_kwargs.get("reply_message_id", reply_message_id) + reply_message_id = None if raw_reply_message_id in {None, ""} else str(raw_reply_message_id) + + platform_io_manager = get_platform_io_manager() + try: + await platform_io_manager.ensure_send_pipeline_ready() + except Exception as exc: + logger.error(f"[SendService] 准备 Platform IO 发送管线失败: {exc}") + logger.debug(traceback.format_exc()) + raise SendServiceError(f"准备 Platform IO 发送管线失败: {exc}") from exc + + try: + route_key = platform_io_manager.build_route_key_from_message(message) + except Exception as exc: + logger.warning(f"[SendService] 根据消息构造 Platform IO 路由键失败: {exc}") + raise SendServiceError(f"根据消息构造 Platform IO 路由键失败: {exc}") from exc + + try: + await _prepare_message_for_platform_io( + message, + typing=typing, + set_reply=set_reply, + reply_message_id=reply_message_id, + ) + delivery_batch = await platform_io_manager.send_message( + message, + route_key, + metadata={"show_log": False}, + ) + except Exception as exc: + logger.error(f"[SendService] Platform IO 发送异常: {exc}") + logger.debug(traceback.format_exc()) + raise SendServiceError(f"Platform IO 发送异常: {exc}") from exc + + sent = bool(delivery_batch.has_success) + if sent: + await _apply_successful_delivery_receipt(message, delivery_batch) + await _dispatch_adapter_callbacks(delivery_batch) + await _invoke_send_hook( + "send_service.after_send", + message, + sent=sent, + typing=typing, + set_reply=set_reply, + reply_message_id=reply_message_id, + storage_message=storage_message, + show_log=show_log, + ) + + if delivery_batch.has_success: + if storage_message: + _store_sent_message(message) + await _notify_memory_automation_on_message_sent(message) + if show_log: + successful_driver_ids = [ + receipt.driver_id or "unknown" + for receipt in delivery_batch.sent_receipts + ] + logger.info( + f"[SendService] 已通过 Platform IO 将消息发往平台 '{route_key.platform}' " + f"(drivers: {', '.join(successful_driver_ids)}) " + f"message={_build_outbound_log_preview(message)}" + ) + if sync_to_maisaka_history: + _sync_sent_message_to_maisaka_history( + message, + source_kind=str(maisaka_source_kind or "outbound_send"), + ) + return message + + _log_platform_io_failures(delivery_batch) + raise SendServiceError(_build_platform_io_failure_message(delivery_batch)) + except SendServiceError: + raise + + async def send_session_message( message: SessionMessage, *, @@ -1054,6 +1209,76 @@ async def _send_to_target_with_message( return None +async def _send_to_target_with_message_detailed( + message_sequence: MessageSequence, + stream_id: str, + processed_plain_text: str = "", + typing: bool = False, + set_reply: bool = False, + reply_message: Optional[MaiMessage] = None, + storage_message: bool = True, + show_log: bool = True, + selected_expressions: Optional[List[int]] = None, + sync_to_maisaka_history: bool = False, + maisaka_source_kind: str = "outbound_send", +) -> SessionMessage: + """向指定目标构建并发送消息,失败时抛出带详情的异常。""" + try: + if set_reply and reply_message is None: + logger.warning("[SendService] 使用引用回复,但未提供回复消息") + raise SendServiceError("使用引用回复,但未提供回复消息") + + if show_log: + logger.debug(f"[SendService] 发送{_describe_message_sequence(message_sequence)}消息到 {stream_id}") + + outbound_message = _build_outbound_session_message_detailed( + message_sequence=message_sequence, + stream_id=stream_id, + processed_plain_text=processed_plain_text, + reply_message=reply_message, + selected_expressions=selected_expressions, + ) + + after_build_result, outbound_message = await _invoke_send_hook( + "send_service.after_build_message", + outbound_message, + stream_id=stream_id, + processed_plain_text=processed_plain_text, + typing=typing, + set_reply=set_reply, + storage_message=storage_message, + show_log=show_log, + ) + if after_build_result.aborted: + logger.info(f"[SendService] 消息 {outbound_message.message_id} 在构建后被 Hook 中止") + raise SendServiceError(f"消息 {outbound_message.message_id} 在构建后被 Hook 中止") + + after_build_kwargs = after_build_result.kwargs + typing = _coerce_bool(after_build_kwargs.get("typing"), typing) + set_reply = _coerce_bool(after_build_kwargs.get("set_reply"), set_reply) + storage_message = _coerce_bool(after_build_kwargs.get("storage_message"), storage_message) + show_log = _coerce_bool(after_build_kwargs.get("show_log"), show_log) + + sent_message = await send_session_message_with_message_detailed( + outbound_message, + typing=typing, + set_reply=set_reply, + reply_message_id=reply_message.message_id if reply_message is not None else None, + storage_message=storage_message, + show_log=show_log, + sync_to_maisaka_history=sync_to_maisaka_history, + maisaka_source_kind=maisaka_source_kind, + ) + logger.debug(f"[SendService] 成功发送消息到 {stream_id}") + return sent_message + except SendServiceError: + raise + except Exception as exc: + logger.error(f"[SendService] 发送消息时出错: {exc}") + logger.debug(traceback.format_exc()) + raise SendServiceError(f"发送消息时出错: {exc}") from exc + + async def text_to_stream_with_message( text: str, stream_id: str, @@ -1079,6 +1304,31 @@ async def text_to_stream_with_message( ) +async def text_to_stream_with_message_detailed( + text: str, + stream_id: str, + typing: bool = False, + set_reply: bool = False, + reply_message: Optional[MaiMessage] = None, + storage_message: bool = True, + selected_expressions: Optional[List[int]] = None, + sync_to_maisaka_history: bool = False, + maisaka_source_kind: str = "outbound_send", +) -> SessionMessage: + """向指定流发送文本消息,失败时抛出带详情的异常。""" + return await _send_to_target_with_message_detailed( + message_sequence=MessageSequence(components=[TextComponent(text=text)]), + stream_id=stream_id, + typing=typing, + set_reply=set_reply, + reply_message=reply_message, + storage_message=storage_message, + selected_expressions=selected_expressions, + sync_to_maisaka_history=sync_to_maisaka_history, + maisaka_source_kind=maisaka_source_kind, + ) + + async def text_to_stream( text: str, stream_id: str, @@ -1142,6 +1392,28 @@ async def emoji_to_stream_with_message( ) +async def emoji_to_stream_with_message_detailed( + emoji_base64: str, + stream_id: str, + storage_message: bool = True, + set_reply: bool = False, + reply_message: Optional[MaiMessage] = None, + sync_to_maisaka_history: bool = False, + maisaka_source_kind: str = "outbound_send", +) -> SessionMessage: + """向指定流发送表情消息,失败时抛出带详情的异常。""" + return await _send_to_target_with_message_detailed( + message_sequence=_build_message_sequence_from_custom_message("emoji", emoji_base64), + stream_id=stream_id, + typing=False, + storage_message=storage_message, + set_reply=set_reply, + reply_message=reply_message, + sync_to_maisaka_history=sync_to_maisaka_history, + maisaka_source_kind=maisaka_source_kind, + ) + + async def emoji_to_stream( emoji_base64: str, stream_id: str, @@ -1253,6 +1525,34 @@ async def custom_to_stream( ) +async def custom_to_stream_detailed( + message_type: str, + content: str | Dict[str, Any], + stream_id: str, + processed_plain_text: str = "", + typing: bool = False, + reply_message: Optional[MaiMessage] = None, + set_reply: bool = False, + storage_message: bool = True, + show_log: bool = True, + sync_to_maisaka_history: bool = False, + maisaka_source_kind: str = "outbound_send", +) -> SessionMessage: + """向指定流发送自定义类型消息,失败时抛出带详情的异常。""" + return await _send_to_target_with_message_detailed( + message_sequence=_build_message_sequence_from_custom_message(message_type, content), + stream_id=stream_id, + processed_plain_text=processed_plain_text, + typing=typing, + reply_message=reply_message, + set_reply=set_reply, + storage_message=storage_message, + show_log=show_log, + sync_to_maisaka_history=sync_to_maisaka_history, + maisaka_source_kind=maisaka_source_kind, + ) + + async def custom_reply_set_to_stream( reply_set: MessageSequence, stream_id: str,