Merge branch 'r-dev' of github.com:Mai-with-u/MaiBot into r-dev

This commit is contained in:
UnCLAS-Prommer
2026-03-15 23:53:59 +08:00
3 changed files with 115 additions and 27 deletions

View File

@@ -6,6 +6,7 @@
from types import SimpleNamespace from types import SimpleNamespace
import asyncio import asyncio
import json
import os import os
import sys import sys
@@ -871,6 +872,63 @@ class TestDependencyResolution:
order, failed = loader._resolve_dependencies(candidates) order, failed = loader._resolve_dependencies(candidates)
assert len(failed) >= 1 # 至少一个循环插件被标记 assert len(failed) >= 1 # 至少一个循环插件被标记
def test_loader_supports_package_imports_inside_create_plugin(self, tmp_path):
from src.plugin_runtime.runner.plugin_loader import PluginLoader
plugin_root = tmp_path / "plugins"
plugin_root.mkdir()
plugin_dir = plugin_root / "grok_search_plugin"
plugin_dir.mkdir()
(plugin_dir / "_manifest.json").write_text(
json.dumps(
{
"name": "grok_search_plugin",
"version": "1.0.0",
"description": "demo",
"author": "tester",
}
),
encoding="utf-8",
)
(plugin_dir / "__init__.py").write_text("VALUE = 1\n", encoding="utf-8")
(plugin_dir / "services.py").write_text("def answer():\n return 42\n", encoding="utf-8")
(plugin_dir / "plugin.py").write_text(
"class DemoPlugin:\n"
" pass\n\n"
"def create_plugin():\n"
" from grok_search_plugin.services import answer\n"
" plugin = DemoPlugin()\n"
" plugin.answer = answer\n"
" return plugin\n",
encoding="utf-8",
)
loader = PluginLoader()
loaded = loader.discover_and_load([str(plugin_root)])
assert [meta.plugin_id for meta in loaded] == ["grok_search_plugin"]
assert loader.failed_plugins == {}
assert loaded[0].instance.answer() == 42
def test_isolate_sys_path_preserves_plugin_dirs(self):
from src.plugin_runtime.runner import runner_main
plugin_root = os.path.normpath("/tmp/maibot-plugin-root")
original_path = list(sys.path)
original_meta_path = list(sys.meta_path)
try:
if plugin_root in sys.path:
sys.path.remove(plugin_root)
runner_main._isolate_sys_path([plugin_root])
assert plugin_root in sys.path
finally:
sys.path[:] = original_path
sys.meta_path[:] = original_meta_path
# ─── Host-side ComponentRegistry 测试 ────────────────────── # ─── Host-side ComponentRegistry 测试 ──────────────────────

View File

@@ -9,6 +9,7 @@
from collections import deque from collections import deque
from typing import Any, Dict, List, Optional, Set, Tuple from typing import Any, Dict, List, Optional, Set, Tuple
import contextlib
import importlib import importlib
import importlib.util import importlib.util
import json import json
@@ -239,6 +240,9 @@ class PluginLoader:
module = importlib.util.module_from_spec(spec) module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module sys.modules[module_name] = module
plugin_parent_dir = os.path.normpath(os.path.dirname(plugin_dir))
with self._temporary_sys_path_entry(plugin_parent_dir):
spec.loader.exec_module(module) spec.loader.exec_module(module)
# 优先使用新版 create_plugin 工厂函数 # 优先使用新版 create_plugin 工厂函数
@@ -269,6 +273,27 @@ class PluginLoader:
logger.error(f"插件 {plugin_id} 缺少 create_plugin 工厂函数且未检测到旧版 BasePlugin") logger.error(f"插件 {plugin_id} 缺少 create_plugin 工厂函数且未检测到旧版 BasePlugin")
return None return None
@staticmethod
@contextlib.contextmanager
def _temporary_sys_path_entry(path: str):
"""临时将路径放入 sys.path 头部,并在离开作用域后恢复。"""
if not path:
yield
return
normalized_path = os.path.normpath(path)
existing_paths = {os.path.normpath(entry) for entry in sys.path}
inserted = normalized_path not in existing_paths
if inserted:
sys.path.insert(0, normalized_path)
try:
yield
finally:
if inserted:
with contextlib.suppress(ValueError):
sys.path.remove(normalized_path)
# ──── 旧版插件兼容 ──────────────────────────────────────── # ──── 旧版插件兼容 ────────────────────────────────────────
def _ensure_compat_hook(self) -> None: def _ensure_compat_hook(self) -> None:

View File

@@ -627,14 +627,19 @@ def _isolate_sys_path(plugin_dirs: List[str]) -> None:
allowed.add(p) allowed.add(p)
# 添加插件目录 # 添加插件目录
for d in plugin_dirs: plugin_dir_paths = [os.path.normpath(d) for d in plugin_dirs]
allowed.add(os.path.normpath(d)) for d in plugin_dir_paths:
allowed.add(d)
# 添加项目根目录(使得 src.plugin_runtime / src.common 可导入) # 添加项目根目录(使得 src.plugin_runtime / src.common 可导入)
runtime_root = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", "..", "..")) runtime_root = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", "..", ".."))
allowed.add(runtime_root) allowed.add(runtime_root)
sys.path[:] = [p for p in sys.path if p in allowed] preserved_paths = [p for p in sys.path if p in allowed]
for extra_path in [*plugin_dir_paths, runtime_root]:
if extra_path not in preserved_paths:
preserved_paths.append(extra_path)
sys.path[:] = preserved_paths
# 安装 import 钩子,阻止插件导入主程序核心模块 # 安装 import 钩子,阻止插件导入主程序核心模块
# 仅允许 src.plugin_runtime 和 src.common拒绝其他 src.* 子包 # 仅允许 src.plugin_runtime 和 src.common拒绝其他 src.* 子包