feat(plugin-system): add workflow pipeline and cross-plugin service registry

This commit is contained in:
DrSmoothl
2026-02-20 19:24:05 +08:00
parent 6bcd7cbebb
commit 6fcc53a22b
16 changed files with 1036 additions and 19 deletions

View File

@@ -1,9 +1,9 @@
import os
import traceback
from typing import Dict, List, Optional, Tuple, Type, Any
from importlib.util import spec_from_file_location, module_from_spec
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple, Type
from collections import deque
import os
import traceback
from src.common.logger import get_logger
@@ -11,6 +11,7 @@ from src.plugin_system.base.plugin_base import PluginBase
from src.plugin_system.base.component_types import ComponentType
from src.plugin_system.utils.manifest_utils import VersionComparator
from .component_registry import component_registry
from .plugin_service_registry import plugin_service_registry
logger = get_logger("plugin_manager")
@@ -70,10 +71,36 @@ class PluginManager:
logger.debug(f"插件模块加载完成 - 成功: {total_loaded_modules}, 失败: {total_failed_modules}")
# 第二阶段前:根据依赖关系决定插件加载顺序
dependency_graph, missing_dependencies = self._build_plugin_dependency_graph()
sorted_plugins, cycle_plugins = self._resolve_plugin_load_order(dependency_graph)
total_registered = 0
total_failed_registration = 0
for plugin_name in self.plugin_classes.keys():
# 先处理缺失依赖
for plugin_name, missing in missing_dependencies.items():
if not missing:
continue
missing_dep_names = ", ".join(sorted(missing))
self.failed_plugins[plugin_name] = f"缺少依赖插件: {missing_dep_names}"
logger.error(f"❌ 插件加载失败: {plugin_name} - 缺少依赖插件: {missing_dep_names}")
total_failed_registration += 1
# 再处理循环依赖
for plugin_name in sorted(cycle_plugins):
if plugin_name in missing_dependencies and missing_dependencies[plugin_name]:
continue
self.failed_plugins[plugin_name] = "检测到循环依赖"
logger.error(f"❌ 插件加载失败: {plugin_name} - 检测到循环依赖")
total_failed_registration += 1
# 最后按拓扑序加载可加载插件
for plugin_name in sorted_plugins:
if plugin_name in cycle_plugins:
continue
if plugin_name in missing_dependencies and missing_dependencies[plugin_name]:
continue
load_status, count = self.load_registered_plugin_classes(plugin_name)
if load_status:
total_registered += 1
@@ -165,6 +192,7 @@ class PluginManager:
for component in plugin_info.components:
success &= await component_registry.remove_component(component.name, component.component_type, plugin_name)
success &= component_registry.remove_plugin_registry(plugin_name)
plugin_service_registry.remove_services_by_plugin(plugin_name)
del self.loaded_plugins[plugin_name]
return success
@@ -313,6 +341,89 @@ class PluginManager:
self.failed_plugins[module_name] = error_msg
return False
# == 依赖解析与加载顺序 ==
def _extract_declared_dependencies(self, plugin_name: str, plugin_class: Type[PluginBase]) -> Set[str]:
"""提取插件声明的依赖。
兼容声明格式:
- list[str]
- list[dict]其中dict至少包含name键
"""
dependencies: Set[str] = set()
raw_dependencies = getattr(plugin_class, "dependencies", [])
# 兼容错误声明
if isinstance(raw_dependencies, property):
logger.warning(f"插件 {plugin_name} 的 dependencies 未声明为类属性,将按无依赖处理")
return dependencies
if not isinstance(raw_dependencies, list):
logger.warning(f"插件 {plugin_name} 的 dependencies 不是列表,将按无依赖处理")
return dependencies
for dependency in raw_dependencies:
dependency_name = ""
if isinstance(dependency, str):
dependency_name = dependency.strip()
elif isinstance(dependency, dict):
dependency_name = str(dependency.get("name", "")).strip()
else:
logger.warning(f"插件 {plugin_name} 包含不支持的依赖声明类型: {type(dependency)}")
continue
if not dependency_name:
continue
if dependency_name == plugin_name:
logger.warning(f"插件 {plugin_name} 声明了对自身的依赖,已忽略")
continue
dependencies.add(dependency_name)
return dependencies
def _build_plugin_dependency_graph(self) -> Tuple[Dict[str, Set[str]], Dict[str, Set[str]]]:
"""构建依赖图并返回缺失依赖映射。"""
plugin_names = set(self.plugin_classes.keys())
dependency_graph: Dict[str, Set[str]] = {name: set() for name in plugin_names}
missing_dependencies: Dict[str, Set[str]] = {name: set() for name in plugin_names}
for plugin_name, plugin_class in self.plugin_classes.items():
declared_dependencies = self._extract_declared_dependencies(plugin_name, plugin_class)
for dependency_name in declared_dependencies:
if dependency_name in plugin_names:
dependency_graph[plugin_name].add(dependency_name)
else:
missing_dependencies[plugin_name].add(dependency_name)
return dependency_graph, missing_dependencies
def _resolve_plugin_load_order(self, dependency_graph: Dict[str, Set[str]]) -> Tuple[List[str], Set[str]]:
"""根据依赖图计算加载顺序,并检测循环依赖。"""
indegree: Dict[str, int] = {plugin_name: len(dependencies) for plugin_name, dependencies in dependency_graph.items()}
reverse_graph: Dict[str, Set[str]] = {plugin_name: set() for plugin_name in dependency_graph}
for plugin_name, dependencies in dependency_graph.items():
for dependency_name in dependencies:
reverse_graph[dependency_name].add(plugin_name)
zero_indegree_queue = deque(sorted([name for name, degree in indegree.items() if degree == 0]))
load_order: List[str] = []
while zero_indegree_queue:
current_plugin = zero_indegree_queue.popleft()
load_order.append(current_plugin)
for dependent_plugin in sorted(reverse_graph[current_plugin]):
indegree[dependent_plugin] -= 1
if indegree[dependent_plugin] == 0:
zero_indegree_queue.append(dependent_plugin)
cycle_plugins = {name for name, degree in indegree.items() if degree > 0}
if cycle_plugins:
logger.error(f"检测到循环依赖插件: {', '.join(sorted(cycle_plugins))}")
return load_order, cycle_plugins
# == 兼容性检查 ==
def _check_plugin_version_compatibility(self, plugin_name: str, manifest_data: Dict[str, Any]) -> Tuple[bool, str]: