feat: 添加组件启停目标解析功能,支持全局唯一短名,避免跨 Supervisor 误操作

This commit is contained in:
DrSmoothl
2026-03-13 16:15:08 +08:00
parent e9badbc307
commit 29f4d05a87
2 changed files with 105 additions and 28 deletions

View File

@@ -1696,6 +1696,66 @@ class TestSupervisor:
class TestIntegration:
"""运行时集成层启动/清理测试"""
@pytest.mark.asyncio
async def test_component_enable_rejects_ambiguous_short_name(self, monkeypatch):
from src.plugin_runtime import integration as integration_module
from src.plugin_runtime.host.component_registry import ComponentRegistry
class FakeSupervisor:
def __init__(self, plugin_id: str):
self.component_registry = ComponentRegistry()
self.component_registry.register_component(
name="shared",
component_type="tool",
plugin_id=plugin_id,
metadata={},
)
class FakeManager:
def __init__(self):
self.supervisors = [FakeSupervisor("plugin_a"), FakeSupervisor("plugin_b")]
monkeypatch.setattr(integration_module, "get_plugin_runtime_manager", lambda: FakeManager())
result = await integration_module.PluginRuntimeManager._cap_component_enable(
"plugin_a",
"component.enable",
{"name": "shared", "component_type": "tool", "scope": "global", "stream_id": ""},
)
assert result["success"] is False
assert "组件名不唯一" in result["error"]
@pytest.mark.asyncio
async def test_component_disable_rejects_non_global_scope(self, monkeypatch):
from src.plugin_runtime import integration as integration_module
from src.plugin_runtime.host.component_registry import ComponentRegistry
class FakeSupervisor:
def __init__(self):
self.component_registry = ComponentRegistry()
self.component_registry.register_component(
name="handler",
component_type="tool",
plugin_id="plugin_a",
metadata={},
)
class FakeManager:
def __init__(self):
self.supervisors = [FakeSupervisor()]
monkeypatch.setattr(integration_module, "get_plugin_runtime_manager", lambda: FakeManager())
result = await integration_module.PluginRuntimeManager._cap_component_disable(
"plugin_a",
"component.disable",
{"name": "plugin_a.handler", "component_type": "tool", "scope": "stream", "stream_id": "s1"},
)
assert result["success"] is False
assert "仅支持全局组件禁用" in result["error"]
@pytest.mark.asyncio
async def test_start_cleans_up_started_supervisors_on_failure(self, monkeypatch):
from src.plugin_runtime import integration as integration_module
@@ -1740,8 +1800,6 @@ class TestIntegration:
@pytest.mark.asyncio
async def test_handle_plugin_file_changes_routes_reload_and_config_update(self, monkeypatch, tmp_path):
from pathlib import Path
from src.config.file_watcher import FileChange
from src.plugin_runtime import integration as integration_module

View File

@@ -1652,6 +1652,31 @@ class PluginRuntimeManager:
plugins.extend(sv._registered_plugins.keys())
return {"success": True, "plugins": plugins}
@staticmethod
def _resolve_component_toggle_target(name: str, component_type: str) -> tuple[Optional[Any], Optional[str]]:
"""解析组件启停目标。
支持全名 plugin_id.component_name短名仅在全局唯一时允许
否则返回歧义错误,避免跨 Supervisor 误操作。
"""
mgr = get_plugin_runtime_manager()
short_name_matches: List[Any] = []
for sv in mgr.supervisors:
comp = sv.component_registry.get_component(name)
if comp is not None and comp.component_type == component_type:
return comp, None
for candidate in sv.component_registry.get_components_by_type(component_type, enabled_only=False):
if candidate.name == name:
short_name_matches.append(candidate)
if len(short_name_matches) == 1:
return short_name_matches[0], None
if len(short_name_matches) > 1:
return None, f"组件名不唯一: {name} ({component_type}),请使用完整名 plugin_id.component_name"
return None, f"未找到组件: {name} ({component_type})"
@staticmethod
async def _cap_component_enable(plugin_id: str, capability: str, args: Dict[str, Any]) -> Any:
"""启用组件
@@ -1660,23 +1685,19 @@ class PluginRuntimeManager:
"""
name: str = args.get("name", "")
component_type: str = args.get("component_type", "")
scope: str = args.get("scope", "global")
stream_id: str = args.get("stream_id", "")
if not name or not component_type:
return {"success": False, "error": "缺少必要参数 name 或 component_type"}
if scope != "global" or stream_id:
return {"success": False, "error": "当前仅支持全局组件启用,不支持 scope/stream_id 定位"}
# TODO: scope 和 stream_id 参数尚未实现,当前均为全局启用
mgr = get_plugin_runtime_manager()
for sv in mgr.supervisors:
# 先尝试按全名查找plugin_id.component_name
comp = sv.component_registry.get_component(name)
if comp is not None and comp.component_type == component_type:
comp.enabled = True
return {"success": True}
# 回退:按短名 + 类型在该类型索引中搜索
for c in sv.component_registry.get_components_by_type(component_type, enabled_only=False):
if c.name == name:
c.enabled = True
return {"success": True}
return {"success": False, "error": f"未找到组件: {name} ({component_type})"}
comp, error = PluginRuntimeManager._resolve_component_toggle_target(name, component_type)
if comp is None:
return {"success": False, "error": error or f"未找到组件: {name} ({component_type})"}
comp.enabled = True
return {"success": True}
@staticmethod
async def _cap_component_disable(plugin_id: str, capability: str, args: Dict[str, Any]) -> Any:
@@ -1686,21 +1707,19 @@ class PluginRuntimeManager:
"""
name: str = args.get("name", "")
component_type: str = args.get("component_type", "")
scope: str = args.get("scope", "global")
stream_id: str = args.get("stream_id", "")
if not name or not component_type:
return {"success": False, "error": "缺少必要参数 name 或 component_type"}
if scope != "global" or stream_id:
return {"success": False, "error": "当前仅支持全局组件禁用,不支持 scope/stream_id 定位"}
# TODO: scope 和 stream_id 参数尚未实现,当前均为全局禁用
mgr = get_plugin_runtime_manager()
for sv in mgr.supervisors:
comp = sv.component_registry.get_component(name)
if comp is not None and comp.component_type == component_type:
comp.enabled = False
return {"success": True}
for c in sv.component_registry.get_components_by_type(component_type, enabled_only=False):
if c.name == name:
c.enabled = False
return {"success": True}
return {"success": False, "error": f"未找到组件: {name} ({component_type})"}
comp, error = PluginRuntimeManager._resolve_component_toggle_target(name, component_type)
if comp is None:
return {"success": False, "error": error or f"未找到组件: {name} ({component_type})"}
comp.enabled = False
return {"success": True}
@staticmethod
async def _cap_component_load_plugin(plugin_id: str, capability: str, args: Dict[str, Any]) -> Any: