feat: 添加插件身份绑定机制,防止伪造插件身份的 RPC 调用

This commit is contained in:
DrSmoothl
2026-03-13 15:40:14 +08:00
parent 44a9e9ecd7
commit f1e10b4054
3 changed files with 100 additions and 12 deletions

View File

@@ -313,6 +313,43 @@ class TestSDK:
assert plugin.ctx.llm is not None
assert plugin.ctx.config is not None
@pytest.mark.asyncio
async def test_runner_injected_context_binds_plugin_identity(self):
"""Runner 注入的上下文应忽略调用方伪造的 plugin_id。"""
from src.plugin_runtime.runner.runner_main import PluginRunner
class DummyRPCClient:
def __init__(self):
self.calls = []
async def send_request(self, method, plugin_id="", payload=None, timeout_ms=30000):
self.calls.append(
{
"method": method,
"plugin_id": plugin_id,
"payload": payload,
"timeout_ms": timeout_ms,
}
)
return SimpleNamespace(error=None, payload={"result": {"ok": True}})
class DummyPlugin:
def _set_context(self, ctx):
self.ctx = ctx
runner = PluginRunner(host_address="dummy", session_token="token", plugin_dirs=[])
runner._rpc_client = DummyRPCClient()
plugin = DummyPlugin()
runner._inject_context("owner_plugin", plugin)
plugin.ctx._plugin_id = "forged_plugin"
result = await plugin.ctx.call_capability("send.text", text="hello", stream_id="stream-1")
assert result == {"ok": True}
assert runner._rpc_client.calls[0]["plugin_id"] == "owner_plugin"
assert runner._rpc_client.calls[0]["method"] == "cap.request"
# ─── 端到端集成测试 ────────────────────────────────────────
@@ -1177,6 +1214,46 @@ class TestWorkflowExecutor:
await asyncio.sleep(0.1)
assert result.status == "completed"
@pytest.mark.asyncio
async def test_nonblocking_tasks_are_retained_until_completion(self):
"""execute 返回后non-blocking task 仍应保持强引用直到执行完成。"""
from src.plugin_runtime.host.component_registry import ComponentRegistry
from src.plugin_runtime.host.workflow_executor import WorkflowExecutor
reg = ComponentRegistry()
reg.register_component(
"observer",
"workflow_step",
"p1",
{
"stage": "post_process",
"priority": 0,
"blocking": False,
},
)
executor = WorkflowExecutor(reg)
started = asyncio.Event()
release = asyncio.Event()
async def mock_invoke(plugin_id, comp_name, args):
started.set()
await release.wait()
return {"hook_result": "continue"}
result, final_msg, _ = await executor.execute(mock_invoke, message={"plain_text": "original"})
await asyncio.sleep(0)
assert result.status == "completed"
assert final_msg["plain_text"] == "original"
assert started.is_set()
assert len(executor._background_tasks) == 1
release.set()
await asyncio.sleep(0)
await asyncio.sleep(0)
assert not executor._background_tasks
@pytest.mark.asyncio
async def test_command_routing(self):
"""PLAN 阶段内置命令路由"""