feat: add plugin dependency pipeline and HTML rendering service

- Implemented a new dependency pipeline for plugins to manage Python package dependencies, including conflict detection and automatic installation of missing dependencies.
- Introduced an HTML rendering service that utilizes existing browsers to render HTML content as PNG images, with support for various configurations and error handling.
This commit is contained in:
DrSmoothl
2026-04-03 01:48:23 +08:00
parent fbc2fba6ff
commit 4ec06ece56
17 changed files with 2585 additions and 93 deletions

View File

@@ -26,6 +26,7 @@ dependencies = [
"openai>=1.95.0", "openai>=1.95.0",
"pandas>=2.3.1", "pandas>=2.3.1",
"pillow>=11.3.0", "pillow>=11.3.0",
"playwright>=1.54.0",
"pyarrow>=20.0.0", "pyarrow>=20.0.0",
"pydantic>=2.11.7", "pydantic>=2.11.7",
"pypinyin>=0.54.0", "pypinyin>=0.54.0",

View File

@@ -0,0 +1,194 @@
"""HTML 浏览器渲染服务测试。"""
from pathlib import Path
from typing import Any, Dict, List
import pytest
from src.config.official_configs import PluginRuntimeRenderConfig
from src.services import html_render_service as html_render_service_module
from src.services.html_render_service import HTMLRenderService, ManagedBrowserRecord
class _FakeChromium:
"""用于模拟 Playwright Chromium 启动器的测试桩。"""
def __init__(self, effects: List[Any]) -> None:
"""初始化 Chromium 启动测试桩。
Args:
effects: 每次调用 ``launch`` 时依次返回或抛出的结果。
"""
self._effects: List[Any] = list(effects)
self.calls: List[Dict[str, Any]] = []
async def launch(self, **kwargs: Any) -> Any:
"""模拟 Playwright Chromium 的启动过程。
Args:
**kwargs: 浏览器启动参数。
Returns:
Any: 预设的浏览器对象。
Raises:
Exception: 当预设结果为异常对象时抛出。
"""
self.calls.append(dict(kwargs))
effect = self._effects.pop(0)
if isinstance(effect, Exception):
raise effect
return effect
class _FakePlaywright:
"""用于模拟 Playwright 根对象的测试桩。"""
def __init__(self, chromium: _FakeChromium) -> None:
"""初始化 Playwright 测试桩。
Args:
chromium: Chromium 启动器测试桩。
"""
self.chromium = chromium
def _build_render_config(**kwargs: Any) -> PluginRuntimeRenderConfig:
"""构造用于测试的浏览器渲染配置。
Args:
**kwargs: 需要覆盖的配置字段。
Returns:
PluginRuntimeRenderConfig: 测试使用的配置对象。
"""
payload: Dict[str, Any] = {
"auto_download_chromium": True,
"browser_install_root": "data/test-playwright-browsers",
}
payload.update(kwargs)
return PluginRuntimeRenderConfig(**payload)
@pytest.mark.asyncio
async def test_launch_browser_auto_downloads_chromium_when_missing(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""未检测到可用浏览器时,应自动下载 Chromium 并记录状态。"""
monkeypatch.setattr(html_render_service_module, "PROJECT_ROOT", tmp_path)
service = HTMLRenderService()
config = _build_render_config()
fake_browser = object()
fake_chromium = _FakeChromium(
[
RuntimeError("browserType.launch: Executable doesn't exist at /tmp/chromium"),
fake_browser,
]
)
install_calls: List[str] = []
monkeypatch.setattr(service, "_resolve_executable_path", lambda _config: "")
async def fake_install(_config: PluginRuntimeRenderConfig) -> None:
"""模拟 Chromium 自动下载。
Args:
_config: 当前浏览器渲染配置。
"""
install_calls.append(_config.browser_install_root)
browsers_path = service._get_managed_browsers_path(_config)
(browsers_path / "chromium-1234").mkdir(parents=True, exist_ok=True)
monkeypatch.setattr(service, "_install_chromium_browser", fake_install)
browser = await service._launch_browser(_FakePlaywright(fake_chromium), config)
assert browser is fake_browser
assert install_calls == ["data/test-playwright-browsers"]
assert len(fake_chromium.calls) == 2
browser_record = service._load_managed_browser_record()
assert browser_record is not None
assert browser_record.install_source == "auto_download"
assert browser_record.browser_name == "chromium"
@pytest.mark.asyncio
async def test_launch_browser_reuses_existing_managed_browser(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""已存在 Playwright 托管浏览器时,不应重复下载。"""
monkeypatch.setattr(html_render_service_module, "PROJECT_ROOT", tmp_path)
service = HTMLRenderService()
config = _build_render_config()
browsers_path = service._get_managed_browsers_path(config)
(browsers_path / "chrome-headless-shell-1234").mkdir(parents=True, exist_ok=True)
fake_browser = object()
fake_chromium = _FakeChromium([fake_browser])
monkeypatch.setattr(service, "_resolve_executable_path", lambda _config: "")
async def fail_install(_config: PluginRuntimeRenderConfig) -> None:
"""若被错误调用则立即失败。
Args:
_config: 当前浏览器渲染配置。
Raises:
AssertionError: 表示本测试不期望进入下载逻辑。
"""
raise AssertionError("不应触发自动下载")
monkeypatch.setattr(service, "_install_chromium_browser", fail_install)
browser = await service._launch_browser(_FakePlaywright(fake_chromium), config)
assert browser is fake_browser
assert len(fake_chromium.calls) == 1
browser_record = service._load_managed_browser_record()
assert browser_record is not None
assert browser_record.install_source == "existing_cache"
assert browser_record.browsers_path == str(browsers_path)
@pytest.mark.asyncio
async def test_launch_browser_prefers_local_executable(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
"""探测到本机浏览器时,应优先使用可执行文件路径启动。"""
monkeypatch.setattr(html_render_service_module, "PROJECT_ROOT", tmp_path)
service = HTMLRenderService()
config = _build_render_config()
fake_browser = object()
fake_chromium = _FakeChromium([fake_browser])
executable_path = "/usr/bin/google-chrome"
monkeypatch.setattr(service, "_resolve_executable_path", lambda _config: executable_path)
browser = await service._launch_browser(_FakePlaywright(fake_chromium), config)
assert browser is fake_browser
assert len(fake_chromium.calls) == 1
assert fake_chromium.calls[0]["executable_path"] == executable_path
assert service._load_managed_browser_record() is None
def test_managed_browser_record_roundtrip() -> None:
"""托管浏览器记录应支持序列化与反序列化。"""
record = ManagedBrowserRecord(
browser_name="chromium",
browsers_path="/tmp/playwright-browsers",
install_source="auto_download",
playwright_version="1.58.0",
recorded_at="2026-04-03T10:00:00+00:00",
last_verified_at="2026-04-03T10:00:01+00:00",
)
restored_record = ManagedBrowserRecord.from_dict(record.to_dict())
assert restored_record == record

View File

@@ -0,0 +1,225 @@
"""插件依赖流水线测试。"""
from pathlib import Path
import json
import pytest
from src.plugin_runtime.dependency_pipeline import PluginDependencyPipeline
def _build_manifest(
plugin_id: str,
*,
dependencies: list[dict[str, str]] | None = None,
) -> dict[str, object]:
"""构造测试用的 Manifest v2 数据。
Args:
plugin_id: 插件 ID。
dependencies: 依赖声明列表。
Returns:
dict[str, object]: 可直接写入 ``_manifest.json`` 的字典。
"""
return {
"manifest_version": 2,
"version": "1.0.0",
"name": plugin_id,
"description": "测试插件",
"author": {
"name": "tester",
"url": "https://example.com/tester",
},
"license": "MIT",
"urls": {
"repository": f"https://example.com/{plugin_id}",
},
"host_application": {
"min_version": "1.0.0",
"max_version": "1.0.0",
},
"sdk": {
"min_version": "2.0.0",
"max_version": "2.99.99",
},
"dependencies": dependencies or [],
"capabilities": [],
"i18n": {
"default_locale": "zh-CN",
"supported_locales": ["zh-CN"],
},
"id": plugin_id,
}
def _write_plugin(
plugin_root: Path,
plugin_name: str,
plugin_id: str,
*,
dependencies: list[dict[str, str]] | None = None,
) -> Path:
"""在临时目录中写入一个测试插件。
Args:
plugin_root: 插件根目录。
plugin_name: 插件目录名。
plugin_id: 插件 ID。
dependencies: Python 依赖声明列表。
Returns:
Path: 插件目录路径。
"""
plugin_dir = plugin_root / plugin_name
plugin_dir.mkdir(parents=True)
(plugin_dir / "plugin.py").write_text("def create_plugin():\n return object()\n", encoding="utf-8")
(plugin_dir / "_manifest.json").write_text(
json.dumps(_build_manifest(plugin_id, dependencies=dependencies)),
encoding="utf-8",
)
return plugin_dir
def test_build_plan_blocks_plugin_conflicting_with_host_requirement(tmp_path: Path) -> None:
"""与主程序依赖冲突的插件应被阻止加载。"""
plugin_root = tmp_path / "plugins"
_write_plugin(
plugin_root,
"conflict_plugin",
"test.conflict-plugin",
dependencies=[
{
"type": "python_package",
"name": "numpy",
"version_spec": "<1.0.0",
}
],
)
pipeline = PluginDependencyPipeline(project_root=Path.cwd())
plan = pipeline.build_plan([plugin_root])
assert "test.conflict-plugin" in plan.blocked_plugin_reasons
assert "主程序" in plan.blocked_plugin_reasons["test.conflict-plugin"]
assert plan.install_requirements == ()
def test_build_plan_blocks_plugins_with_conflicting_python_dependencies(tmp_path: Path) -> None:
"""插件之间出现 Python 包版本冲突时应同时阻止双方加载。"""
plugin_root = tmp_path / "plugins"
_write_plugin(
plugin_root,
"plugin_a",
"test.plugin-a",
dependencies=[
{
"type": "python_package",
"name": "demo-package",
"version_spec": "<2.0.0",
}
],
)
_write_plugin(
plugin_root,
"plugin_b",
"test.plugin-b",
dependencies=[
{
"type": "python_package",
"name": "demo-package",
"version_spec": ">=3.0.0",
}
],
)
pipeline = PluginDependencyPipeline(project_root=Path.cwd())
plan = pipeline.build_plan([plugin_root])
assert "test.plugin-a" in plan.blocked_plugin_reasons
assert "test.plugin-b" in plan.blocked_plugin_reasons
assert "test.plugin-b" in plan.blocked_plugin_reasons["test.plugin-a"]
assert "test.plugin-a" in plan.blocked_plugin_reasons["test.plugin-b"]
def test_build_plan_collects_install_requirements_for_missing_packages(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
"""未安装但无冲突的依赖应进入自动安装计划。"""
plugin_root = tmp_path / "plugins"
_write_plugin(
plugin_root,
"plugin_a",
"test.plugin-a",
dependencies=[
{
"type": "python_package",
"name": "demo-package",
"version_spec": ">=1.0.0,<2.0.0",
}
],
)
pipeline = PluginDependencyPipeline(project_root=Path.cwd())
monkeypatch.setattr(
pipeline._manifest_validator,
"get_installed_package_version",
lambda package_name: None if package_name == "demo-package" else "1.0.0",
)
plan = pipeline.build_plan([plugin_root])
assert plan.blocked_plugin_reasons == {}
assert len(plan.install_requirements) == 1
assert plan.install_requirements[0].package_name == "demo-package"
assert plan.install_requirements[0].plugin_ids == ("test.plugin-a",)
assert plan.install_requirements[0].requirement_text == "demo-package>=1.0.0,<2.0.0"
@pytest.mark.asyncio
async def test_execute_blocks_plugins_when_auto_install_fails(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
"""自动安装失败时,相关插件应被阻止加载。"""
plugin_root = tmp_path / "plugins"
_write_plugin(
plugin_root,
"plugin_a",
"test.plugin-a",
dependencies=[
{
"type": "python_package",
"name": "demo-package",
"version_spec": ">=1.0.0,<2.0.0",
}
],
)
pipeline = PluginDependencyPipeline(project_root=Path.cwd())
monkeypatch.setattr(
pipeline._manifest_validator,
"get_installed_package_version",
lambda package_name: None if package_name == "demo-package" else "1.0.0",
)
async def fake_install(_requirements) -> tuple[bool, str]:
"""模拟依赖安装失败。"""
return False, "network error"
monkeypatch.setattr(pipeline, "_install_requirements", fake_install)
result = await pipeline.execute([plugin_root])
assert result.environment_changed is False
assert "test.plugin-a" in result.blocked_plugin_reasons
assert "自动安装 Python 依赖失败" in result.blocked_plugin_reasons["test.plugin-a"]

View File

@@ -3,6 +3,8 @@
验证协议层、传输层、RPC 通信链路的正确性。 验证协议层、传输层、RPC 通信链路的正确性。
""" """
# pyright: reportArgumentType=false, reportAttributeAccessIssue=false, reportCallIssue=false, reportIndexIssue=false, reportMissingImports=false, reportOptionalMemberAccess=false
from pathlib import Path from pathlib import Path
from types import SimpleNamespace from types import SimpleNamespace
from typing import Any, Awaitable, Callable, Dict, List, Optional, Sequence from typing import Any, Awaitable, Callable, Dict, List, Optional, Sequence
@@ -2891,7 +2893,7 @@ class TestIntegration:
assert instances[0].stopped is True assert instances[0].stopped is True
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_handle_plugin_source_changes_only_reload_matching_supervisor(self, monkeypatch, tmp_path): async def test_handle_plugin_source_changes_restarts_supervisors_after_dependency_sync(self, monkeypatch, tmp_path):
from src.config.file_watcher import FileChange from src.config.file_watcher import FileChange
from src.plugin_runtime import integration as integration_module from src.plugin_runtime import integration as integration_module
import json import json
@@ -2915,7 +2917,6 @@ class TestIntegration:
def __init__(self, plugin_dirs, registered_plugins): def __init__(self, plugin_dirs, registered_plugins):
self._plugin_dirs = plugin_dirs self._plugin_dirs = plugin_dirs
self._registered_plugins = registered_plugins self._registered_plugins = registered_plugins
self.reload_reasons = []
self.config_updates = [] self.config_updates = []
def get_loaded_plugin_ids(self): def get_loaded_plugin_ids(self):
@@ -2924,9 +2925,6 @@ class TestIntegration:
def get_loaded_plugin_versions(self): def get_loaded_plugin_versions(self):
return {plugin_id: "1.0.0" for plugin_id in self._registered_plugins} return {plugin_id: "1.0.0" for plugin_id in self._registered_plugins}
async def reload_plugins(self, plugin_ids=None, reason="manual", external_available_plugins=None):
self.reload_reasons.append((plugin_ids, reason, external_available_plugins or {}))
async def notify_plugin_config_updated(self, plugin_id, config_data, config_version=""): async def notify_plugin_config_updated(self, plugin_id, config_data, config_version=""):
self.config_updates.append((plugin_id, config_data, config_version)) self.config_updates.append((plugin_id, config_data, config_version))
return True return True
@@ -2935,27 +2933,37 @@ class TestIntegration:
manager._started = True manager._started = True
manager._builtin_supervisor = FakeSupervisor([builtin_root], {"test.alpha": object()}) manager._builtin_supervisor = FakeSupervisor([builtin_root], {"test.alpha": object()})
manager._third_party_supervisor = FakeSupervisor([thirdparty_root], {"test.beta": object()}) manager._third_party_supervisor = FakeSupervisor([thirdparty_root], {"test.beta": object()})
dependency_sync_calls = []
restart_calls = []
async def fake_sync(plugin_dirs: Sequence[Path]) -> Any:
"""记录依赖同步调用。"""
dependency_sync_calls.append(list(plugin_dirs))
return integration_module.DependencySyncState(
blocked_changed_plugin_ids={"test.beta"},
environment_changed=False,
)
async def fake_restart(reason: str) -> bool:
"""记录 Supervisor 重启调用。"""
restart_calls.append(reason)
return True
monkeypatch.setattr(manager, "_sync_plugin_dependencies", fake_sync)
monkeypatch.setattr(manager, "_restart_supervisors", fake_restart)
changes = [ changes = [
FileChange(change_type=1, path=beta_dir / "plugin.py"), FileChange(change_type=1, path=beta_dir / "plugin.py"),
] ]
refresh_calls = []
def fake_refresh() -> None:
refresh_calls.append(True)
manager._refresh_plugin_config_watch_subscriptions = fake_refresh
await manager._handle_plugin_source_changes(changes) await manager._handle_plugin_source_changes(changes)
assert manager._builtin_supervisor.reload_reasons == [] assert dependency_sync_calls == [[builtin_root, thirdparty_root]]
assert manager._third_party_supervisor.reload_reasons == [ assert restart_calls == ["file_watcher_blocklist_changed"]
(["test.beta"], "file_watcher", {"test.alpha": "1.0.0"})
]
assert manager._builtin_supervisor.config_updates == [] assert manager._builtin_supervisor.config_updates == []
assert manager._third_party_supervisor.config_updates == [] assert manager._third_party_supervisor.config_updates == []
assert refresh_calls == [True]
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_reload_plugins_globally_warns_and_skips_cross_supervisor_dependents(self, monkeypatch): async def test_reload_plugins_globally_warns_and_skips_cross_supervisor_dependents(self, monkeypatch):

View File

@@ -0,0 +1,96 @@
"""插件运行时浏览器渲染能力测试。"""
from typing import Optional
import pytest
from src.plugin_runtime.integration import PluginRuntimeManager
from src.plugin_runtime.host.supervisor import PluginSupervisor
from src.services.html_render_service import HtmlRenderRequest, HtmlRenderResult
class _FakeRenderService:
"""用于替代真实浏览器渲染服务的测试桩。"""
def __init__(self) -> None:
"""初始化测试桩。"""
self.last_request: Optional[HtmlRenderRequest] = None
async def render_html_to_png(self, request: HtmlRenderRequest) -> HtmlRenderResult:
"""记录请求并返回固定的渲染结果。
Args:
request: 当前渲染请求。
Returns:
HtmlRenderResult: 固定的测试渲染结果。
"""
self.last_request = request
return HtmlRenderResult(
image_base64="ZmFrZS1pbWFnZQ==",
mime_type="image/png",
width=640,
height=480,
render_ms=12,
)
def test_render_capability_is_registered() -> None:
"""Host 注册能力时应包含 render.html2png。"""
manager = PluginRuntimeManager()
supervisor = PluginSupervisor(plugin_dirs=[])
manager._register_capability_impls(supervisor)
assert "render.html2png" in supervisor.capability_service.list_capabilities()
@pytest.mark.asyncio
async def test_render_capability_forwards_request(monkeypatch: pytest.MonkeyPatch) -> None:
"""render.html2png 应将请求透传给浏览器渲染服务。"""
from src.plugin_runtime.capabilities import render as render_capability_module
fake_service = _FakeRenderService()
monkeypatch.setattr(render_capability_module, "get_html_render_service", lambda: fake_service)
manager = PluginRuntimeManager()
result = await manager._cap_render_html2png(
"demo.plugin",
"render.html2png",
{
"html": "<body><div id='card'>hello</div></body>",
"selector": "#card",
"viewport": {"width": 1024, "height": 768},
"device_scale_factor": 1.5,
"full_page": False,
"omit_background": True,
"wait_until": "networkidle",
"wait_for_selector": "#card",
"wait_for_timeout_ms": 150,
"timeout_ms": 3000,
"allow_network": True,
},
)
assert result == {
"success": True,
"result": {
"image_base64": "ZmFrZS1pbWFnZQ==",
"mime_type": "image/png",
"width": 640,
"height": 480,
"render_ms": 12,
},
}
assert fake_service.last_request is not None
assert fake_service.last_request.selector == "#card"
assert fake_service.last_request.viewport_width == 1024
assert fake_service.last_request.viewport_height == 768
assert fake_service.last_request.device_scale_factor == 1.5
assert fake_service.last_request.omit_background is True
assert fake_service.last_request.wait_until == "networkidle"
assert fake_service.last_request.allow_network is True

View File

@@ -1959,6 +1959,129 @@ class MCPConfig(ConfigBase):
return super().model_post_init(context) return super().model_post_init(context)
class PluginRuntimeRenderConfig(ConfigBase):
"""插件运行时浏览器渲染配置。"""
enabled: bool = Field(
default=True,
json_schema_extra={
"x-widget": "switch",
"x-icon": "image",
},
)
"""是否启用插件运行时浏览器渲染能力"""
browser_ws_endpoint: str = Field(
default="",
json_schema_extra={
"x-widget": "input",
"x-icon": "link",
},
)
"""优先复用的现有 Chromium CDP 地址,可填写 ws/http 端点"""
executable_path: str = Field(
default="",
json_schema_extra={
"x-widget": "input",
"x-icon": "folder",
},
)
"""浏览器可执行文件路径,留空时自动探测本机 Chrome/Chromium"""
browser_install_root: str = Field(
default="data/playwright-browsers",
json_schema_extra={
"x-widget": "input",
"x-icon": "hard-drive",
},
)
"""Playwright 托管浏览器目录,自动下载 Chromium 时会复用该目录"""
headless: bool = Field(
default=True,
json_schema_extra={
"x-widget": "switch",
"x-icon": "monitor",
},
)
"""是否以无头模式启动浏览器"""
launch_args: list[str] = Field(
default_factory=lambda: [
"--disable-gpu",
"--disable-dev-shm-usage",
"--disable-setuid-sandbox",
"--no-sandbox",
"--no-zygote",
],
json_schema_extra={
"x-widget": "custom",
"x-icon": "terminal",
},
)
"""浏览器启动参数列表"""
concurrency_limit: int = Field(
default=2,
ge=1,
json_schema_extra={
"x-widget": "number",
"x-icon": "layers",
},
)
"""同时允许进行的最大渲染任务数"""
startup_timeout_sec: float = Field(
default=20.0,
gt=0,
json_schema_extra={
"x-widget": "number",
"x-icon": "clock",
},
)
"""浏览器连接或启动超时时间(秒)"""
render_timeout_sec: float = Field(
default=15.0,
gt=0,
json_schema_extra={
"x-widget": "number",
"x-icon": "timer",
},
)
"""单次渲染默认超时时间(秒)"""
auto_download_chromium: bool = Field(
default=True,
json_schema_extra={
"x-widget": "switch",
"x-icon": "download",
},
)
"""未检测到可用浏览器时,是否自动下载 Playwright Chromium"""
download_connection_timeout_sec: float = Field(
default=120.0,
gt=0,
json_schema_extra={
"x-widget": "number",
"x-icon": "cloud-lightning",
},
)
"""自动下载 Chromium 时的连接超时时间(秒)"""
restart_after_render_count: int = Field(
default=200,
ge=0,
json_schema_extra={
"x-widget": "number",
"x-icon": "refresh-cw",
},
)
"""累计渲染指定次数后自动重建本地浏览器0 表示关闭该策略"""
class PluginRuntimeConfig(ConfigBase): class PluginRuntimeConfig(ConfigBase):
"""插件运行时配置类""" """插件运行时配置类"""
@@ -2021,3 +2144,6 @@ class PluginRuntimeConfig(ConfigBase):
自定义 IPC Socket 路径(仅 Linux/macOS 生效) 自定义 IPC Socket 路径(仅 Linux/macOS 生效)
留空则自动生成临时路径 留空则自动生成临时路径
""" """
render: PluginRuntimeRenderConfig = Field(default_factory=PluginRuntimeRenderConfig)
"""浏览器渲染能力配置"""

View File

@@ -20,5 +20,8 @@ ENV_HOST_VERSION = "MAIBOT_HOST_VERSION"
ENV_EXTERNAL_PLUGIN_IDS = "MAIBOT_EXTERNAL_PLUGIN_IDS" ENV_EXTERNAL_PLUGIN_IDS = "MAIBOT_EXTERNAL_PLUGIN_IDS"
"""Runner 启动时可视为已满足的外部插件依赖版本映射JSON 对象)""" """Runner 启动时可视为已满足的外部插件依赖版本映射JSON 对象)"""
ENV_BLOCKED_PLUGIN_REASONS = "MAIBOT_BLOCKED_PLUGIN_REASONS"
"""Runner 启动时收到的拒绝加载插件原因映射JSON 对象)"""
ENV_GLOBAL_CONFIG_SNAPSHOT = "MAIBOT_GLOBAL_CONFIG_SNAPSHOT" ENV_GLOBAL_CONFIG_SNAPSHOT = "MAIBOT_GLOBAL_CONFIG_SNAPSHOT"
"""Runner 启动时注入的全局配置快照JSON 对象)""" """Runner 启动时注入的全局配置快照JSON 对象)"""

View File

@@ -1,9 +1,11 @@
from .components import RuntimeComponentCapabilityMixin from .components import RuntimeComponentCapabilityMixin
from .core import RuntimeCoreCapabilityMixin from .core import RuntimeCoreCapabilityMixin
from .data import RuntimeDataCapabilityMixin from .data import RuntimeDataCapabilityMixin
from .render import RuntimeRenderCapabilityMixin
__all__ = [ __all__ = [
"RuntimeComponentCapabilityMixin", "RuntimeComponentCapabilityMixin",
"RuntimeCoreCapabilityMixin", "RuntimeCoreCapabilityMixin",
"RuntimeDataCapabilityMixin", "RuntimeDataCapabilityMixin",
"RuntimeRenderCapabilityMixin",
] ]

View File

@@ -91,4 +91,5 @@ def register_capability_impls(manager: "PluginRuntimeManager", supervisor: Plugi
_register("component.reload_plugin", manager._cap_component_reload_plugin) _register("component.reload_plugin", manager._cap_component_reload_plugin)
_register("knowledge.search", manager._cap_knowledge_search) _register("knowledge.search", manager._cap_knowledge_search)
_register("render.html2png", manager._cap_render_html2png)
logger.debug("已注册全部主程序能力实现") logger.debug("已注册全部主程序能力实现")

View File

@@ -0,0 +1,121 @@
"""插件运行时的浏览器渲染能力。"""
from typing import Any, Dict
from src.common.logger import get_logger
from src.services.html_render_service import HtmlRenderRequest, get_html_render_service
logger = get_logger("plugin_runtime.integration")
class RuntimeRenderCapabilityMixin:
"""插件运行时的浏览器渲染能力混入。"""
@staticmethod
def _coerce_int(value: Any, default: int) -> int:
"""将任意值尽量转换为整数。
Args:
value: 原始输入值。
default: 转换失败时返回的默认值。
Returns:
int: 规范化后的整数结果。
"""
try:
return int(value)
except (TypeError, ValueError):
return default
@staticmethod
def _coerce_float(value: Any, default: float) -> float:
"""将任意值尽量转换为浮点数。
Args:
value: 原始输入值。
default: 转换失败时返回的默认值。
Returns:
float: 规范化后的浮点结果。
"""
try:
return float(value)
except (TypeError, ValueError):
return default
@staticmethod
def _coerce_bool(value: Any, default: bool = False) -> bool:
"""将任意值转换为布尔值。
Args:
value: 原始输入值。
default: 输入为空时返回的默认值。
Returns:
bool: 规范化后的布尔结果。
"""
if value is None:
return default
if isinstance(value, str):
normalized_value = value.strip().lower()
if normalized_value in {"0", "false", "no", "off"}:
return False
if normalized_value in {"1", "true", "yes", "on"}:
return True
return bool(value)
def _build_html_render_request(self, args: Dict[str, Any]) -> HtmlRenderRequest:
"""根据 capability 调用参数构造渲染请求。
Args:
args: capability 调用参数。
Returns:
HtmlRenderRequest: 结构化后的渲染请求。
"""
viewport = args.get("viewport", {})
viewport_width = 900
viewport_height = 500
if isinstance(viewport, dict):
viewport_width = self._coerce_int(viewport.get("width"), viewport_width)
viewport_height = self._coerce_int(viewport.get("height"), viewport_height)
return HtmlRenderRequest(
html=str(args.get("html", "") or ""),
selector=str(args.get("selector", "body") or "body"),
viewport_width=viewport_width,
viewport_height=viewport_height,
device_scale_factor=self._coerce_float(args.get("device_scale_factor"), 2.0),
full_page=self._coerce_bool(args.get("full_page"), False),
omit_background=self._coerce_bool(args.get("omit_background"), False),
wait_until=str(args.get("wait_until", "load") or "load"),
wait_for_selector=str(args.get("wait_for_selector", "") or ""),
wait_for_timeout_ms=self._coerce_int(args.get("wait_for_timeout_ms"), 0),
timeout_ms=self._coerce_int(args.get("timeout_ms"), 0),
allow_network=self._coerce_bool(args.get("allow_network"), False),
)
async def _cap_render_html2png(self, plugin_id: str, capability: str, args: Dict[str, Any]) -> Any:
"""将 HTML 内容渲染为 PNG 图片。
Args:
plugin_id: 调用该能力的插件 ID。
capability: 当前能力名称。
args: 能力调用参数。
Returns:
Any: 标准化后的能力返回结构。
"""
del plugin_id, capability
try:
request = self._build_html_render_request(args)
result = await get_html_render_service().render_html_to_png(request)
return {"success": True, "result": result.to_payload()}
except Exception as exc:
logger.error(f"[cap.render.html2png] 执行失败: {exc}", exc_info=True)
return {"success": False, "error": str(exc)}

View File

@@ -0,0 +1,441 @@
"""插件 Python 依赖流水线。
负责在 Host 侧统一完成以下工作:
1. 扫描插件 Manifest
2. 检测插件与主程序、插件与插件之间的 Python 依赖冲突;
3. 为可加载插件自动安装缺失的 Python 依赖;
4. 产出最终的拒绝加载列表,供运行时使用。
"""
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple
import asyncio
import shutil
import subprocess
import sys
from packaging.utils import canonicalize_name
from src.common.logger import get_logger
from src.plugin_runtime.runner.manifest_validator import ManifestValidator, PluginManifest
logger = get_logger("plugin_runtime.dependency_pipeline")
@dataclass(frozen=True)
class PackageDependencyUsage:
"""记录单个插件对某个 Python 包的依赖声明。"""
package_name: str
plugin_id: str
version_spec: str
@dataclass(frozen=True)
class CombinedPackageRequirement:
"""表示一个已经合并后的 Python 包安装需求。"""
package_name: str
plugin_ids: Tuple[str, ...]
requirement_text: str
version_spec: str
@dataclass(frozen=True)
class DependencyPipelinePlan:
"""表示一次依赖分析后得到的计划。"""
blocked_plugin_reasons: Dict[str, str]
install_requirements: Tuple[CombinedPackageRequirement, ...]
@dataclass(frozen=True)
class DependencyPipelineResult:
"""表示一次依赖流水线执行后的结果。"""
blocked_plugin_reasons: Dict[str, str]
environment_changed: bool
install_requirements: Tuple[CombinedPackageRequirement, ...]
class PluginDependencyPipeline:
"""插件依赖流水线。
该类不负责插件启停,只负责对插件目录进行依赖分析,并在必要时
使用 ``uv`` 为可加载插件补齐缺失的 Python 依赖。
"""
def __init__(self, project_root: Optional[Path] = None) -> None:
"""初始化依赖流水线。
Args:
project_root: 项目根目录;留空时自动推断。
"""
self._project_root: Path = project_root or Path(__file__).resolve().parents[2]
self._manifest_validator: ManifestValidator = ManifestValidator(
project_root=self._project_root,
validate_python_package_dependencies=False,
)
async def execute(self, plugin_dirs: Iterable[Path]) -> DependencyPipelineResult:
"""执行完整的依赖分析与自动安装流程。
Args:
plugin_dirs: 需要扫描的插件根目录集合。
Returns:
DependencyPipelineResult: 最终的阻止加载结果与环境变更状态。
"""
plan = self.build_plan(plugin_dirs)
if not plan.install_requirements:
return DependencyPipelineResult(
blocked_plugin_reasons=dict(plan.blocked_plugin_reasons),
environment_changed=False,
install_requirements=plan.install_requirements,
)
install_succeeded, error_message = await self._install_requirements(plan.install_requirements)
if install_succeeded:
return DependencyPipelineResult(
blocked_plugin_reasons=dict(plan.blocked_plugin_reasons),
environment_changed=True,
install_requirements=plan.install_requirements,
)
blocked_plugin_reasons = dict(plan.blocked_plugin_reasons)
affected_plugin_ids = sorted(
{
plugin_id
for requirement in plan.install_requirements
for plugin_id in requirement.plugin_ids
}
)
for plugin_id in affected_plugin_ids:
self._append_block_reason(
blocked_plugin_reasons,
plugin_id,
f"自动安装 Python 依赖失败: {error_message}",
)
return DependencyPipelineResult(
blocked_plugin_reasons=blocked_plugin_reasons,
environment_changed=False,
install_requirements=plan.install_requirements,
)
def build_plan(self, plugin_dirs: Iterable[Path]) -> DependencyPipelinePlan:
"""构建依赖分析计划。
Args:
plugin_dirs: 需要扫描的插件根目录集合。
Returns:
DependencyPipelinePlan: 分析后的阻止加载列表与安装计划。
"""
manifests = self._collect_manifests(plugin_dirs)
blocked_plugin_reasons = self._detect_host_conflicts(manifests)
plugin_conflict_reasons = self._detect_plugin_conflicts(manifests, blocked_plugin_reasons)
for plugin_id, reason in plugin_conflict_reasons.items():
self._append_block_reason(blocked_plugin_reasons, plugin_id, reason)
install_requirements = self._build_install_requirements(manifests, blocked_plugin_reasons)
return DependencyPipelinePlan(
blocked_plugin_reasons=blocked_plugin_reasons,
install_requirements=install_requirements,
)
def _collect_manifests(self, plugin_dirs: Iterable[Path]) -> Dict[str, PluginManifest]:
"""收集所有可成功解析的插件 Manifest。
Args:
plugin_dirs: 需要扫描的插件根目录集合。
Returns:
Dict[str, PluginManifest]: 以插件 ID 为键的 Manifest 映射。
"""
manifests: Dict[str, PluginManifest] = {}
for _plugin_path, manifest in self._manifest_validator.iter_plugin_manifests(plugin_dirs):
manifests[manifest.id] = manifest
return manifests
def _detect_host_conflicts(self, manifests: Dict[str, PluginManifest]) -> Dict[str, str]:
"""检测插件与主程序依赖之间的冲突。
Args:
manifests: 当前已解析到的插件 Manifest 映射。
Returns:
Dict[str, str]: 需要被阻止加载的插件及原因。
"""
host_requirements = self._manifest_validator.load_host_dependency_requirements()
blocked_plugin_reasons: Dict[str, str] = {}
for manifest in manifests.values():
for dependency in manifest.python_package_dependencies:
package_specifier = self._manifest_validator.build_specifier_set(dependency.version_spec)
if package_specifier is None:
self._append_block_reason(
blocked_plugin_reasons,
manifest.id,
f"Python 包依赖声明无效: {dependency.name}{dependency.version_spec}",
)
continue
normalized_package_name = canonicalize_name(dependency.name)
host_requirement = host_requirements.get(normalized_package_name)
if host_requirement is None:
continue
if self._manifest_validator.requirements_may_overlap(
host_requirement.specifier,
package_specifier,
):
continue
host_specifier_text = str(host_requirement.specifier or "") or "任意版本"
self._append_block_reason(
blocked_plugin_reasons,
manifest.id,
(
f"Python 包依赖与主程序冲突: {dependency.name} 需要 "
f"{dependency.version_spec},主程序约束为 {host_specifier_text}"
),
)
return blocked_plugin_reasons
def _detect_plugin_conflicts(
self,
manifests: Dict[str, PluginManifest],
blocked_plugin_reasons: Dict[str, str],
) -> Dict[str, str]:
"""检测插件之间的 Python 依赖冲突。
Args:
manifests: 当前已解析到的插件 Manifest 映射。
blocked_plugin_reasons: 已经因为其他原因被阻止加载的插件。
Returns:
Dict[str, str]: 新增的插件冲突原因映射。
"""
blocked_by_plugin_conflicts: Dict[str, str] = {}
dependency_usages = self._collect_package_usages(manifests, blocked_plugin_reasons)
for _package_name, usages in dependency_usages.items():
display_package_name = usages[0].package_name
for index, left_usage in enumerate(usages):
for right_usage in usages[index + 1 :]:
left_specifier = self._manifest_validator.build_specifier_set(left_usage.version_spec)
right_specifier = self._manifest_validator.build_specifier_set(right_usage.version_spec)
if left_specifier is None or right_specifier is None:
continue
if self._manifest_validator.requirements_may_overlap(left_specifier, right_specifier):
continue
left_reason = (
f"Python 包依赖冲突: 与插件 {right_usage.plugin_id}{display_package_name} 上的约束不兼容 "
f"({left_usage.version_spec} vs {right_usage.version_spec})"
)
right_reason = (
f"Python 包依赖冲突: 与插件 {left_usage.plugin_id}{display_package_name} 上的约束不兼容 "
f"({right_usage.version_spec} vs {left_usage.version_spec})"
)
self._append_block_reason(blocked_by_plugin_conflicts, left_usage.plugin_id, left_reason)
self._append_block_reason(blocked_by_plugin_conflicts, right_usage.plugin_id, right_reason)
return blocked_by_plugin_conflicts
def _collect_package_usages(
self,
manifests: Dict[str, PluginManifest],
blocked_plugin_reasons: Dict[str, str],
) -> Dict[str, List[PackageDependencyUsage]]:
"""收集所有未被阻止加载插件的包依赖声明。
Args:
manifests: 当前已解析到的插件 Manifest 映射。
blocked_plugin_reasons: 已经被阻止加载的插件及原因。
Returns:
Dict[str, List[PackageDependencyUsage]]: 按规范化包名分组后的依赖声明。
"""
dependency_usages: Dict[str, List[PackageDependencyUsage]] = {}
for manifest in manifests.values():
if manifest.id in blocked_plugin_reasons:
continue
for dependency in manifest.python_package_dependencies:
normalized_package_name = canonicalize_name(dependency.name)
dependency_usages.setdefault(normalized_package_name, []).append(
PackageDependencyUsage(
package_name=dependency.name,
plugin_id=manifest.id,
version_spec=dependency.version_spec,
)
)
return dependency_usages
def _build_install_requirements(
self,
manifests: Dict[str, PluginManifest],
blocked_plugin_reasons: Dict[str, str],
) -> Tuple[CombinedPackageRequirement, ...]:
"""构建需要安装到当前环境的 Python 包需求列表。
Args:
manifests: 当前已解析到的插件 Manifest 映射。
blocked_plugin_reasons: 已经被阻止加载的插件及原因。
Returns:
Tuple[CombinedPackageRequirement, ...]: 需要安装或调整版本的依赖列表。
"""
combined_requirements: List[CombinedPackageRequirement] = []
dependency_usages = self._collect_package_usages(manifests, blocked_plugin_reasons)
for usages in dependency_usages.values():
merged_specifier_text = self._merge_specifier_texts([usage.version_spec for usage in usages])
package_name = usages[0].package_name
requirement_text = f"{package_name}{merged_specifier_text}"
installed_version = self._manifest_validator.get_installed_package_version(package_name)
if installed_version is not None and self._manifest_validator.version_matches_specifier(
installed_version,
merged_specifier_text,
):
continue
combined_requirements.append(
CombinedPackageRequirement(
package_name=package_name,
plugin_ids=tuple(sorted({usage.plugin_id for usage in usages})),
requirement_text=requirement_text,
version_spec=merged_specifier_text,
)
)
return tuple(sorted(combined_requirements, key=lambda requirement: canonicalize_name(requirement.package_name)))
@staticmethod
def _merge_specifier_texts(specifier_texts: Sequence[str]) -> str:
"""合并多个版本约束文本。
Args:
specifier_texts: 需要合并的版本约束文本序列。
Returns:
str: 合并后的版本约束文本。
"""
merged_parts: List[str] = []
for specifier_text in specifier_texts:
for part in str(specifier_text or "").split(","):
normalized_part = part.strip()
if not normalized_part or normalized_part in merged_parts:
continue
merged_parts.append(normalized_part)
return f"{','.join(merged_parts)}" if merged_parts else ""
async def _install_requirements(self, requirements: Sequence[CombinedPackageRequirement]) -> Tuple[bool, str]:
"""安装指定的 Python 包需求列表。
Args:
requirements: 需要安装的依赖列表。
Returns:
Tuple[bool, str]: 安装是否成功,以及错误摘要。
"""
requirement_texts = [requirement.requirement_text for requirement in requirements]
if not requirement_texts:
return True, ""
logger.info(f"开始自动安装插件 Python 依赖: {', '.join(requirement_texts)}")
command = self._build_install_command(requirement_texts)
try:
completed_process = await asyncio.to_thread(
subprocess.run,
command,
capture_output=True,
check=False,
cwd=self._project_root,
text=True,
)
except Exception as exc:
return False, str(exc)
if completed_process.returncode == 0:
logger.info("插件 Python 依赖自动安装完成")
return True, ""
output = self._summarize_install_error(completed_process.stdout, completed_process.stderr)
return False, output or f"命令执行失败,退出码 {completed_process.returncode}"
@staticmethod
def _build_install_command(requirement_texts: Sequence[str]) -> List[str]:
"""构造依赖安装命令。
Args:
requirement_texts: 待安装的依赖文本序列。
Returns:
List[str]: 适用于 ``subprocess.run`` 的命令参数列表。
"""
if shutil.which("uv"):
return ["uv", "pip", "install", "--python", sys.executable, *requirement_texts]
return [sys.executable, "-m", "pip", "install", *requirement_texts]
@staticmethod
def _summarize_install_error(stdout: str, stderr: str) -> str:
"""提炼安装失败输出。
Args:
stdout: 标准输出内容。
stderr: 标准错误内容。
Returns:
str: 简短的错误摘要。
"""
merged_output = "\n".join(part.strip() for part in (stderr, stdout) if part and part.strip()).strip()
if not merged_output:
return ""
lines = [line.strip() for line in merged_output.splitlines() if line.strip()]
return " | ".join(lines[-5:])
@staticmethod
def _append_block_reason(
blocked_plugin_reasons: Dict[str, str],
plugin_id: str,
reason: str,
) -> None:
"""向阻止加载映射中追加原因。
Args:
blocked_plugin_reasons: 待更新的阻止加载映射。
plugin_id: 目标插件 ID。
reason: 需要追加的原因文本。
"""
existing_reason = blocked_plugin_reasons.get(plugin_id)
if existing_reason is None:
blocked_plugin_reasons[plugin_id] = reason
return
existing_parts = [part.strip() for part in existing_reason.split("") if part.strip()]
if reason in existing_parts:
return
blocked_plugin_reasons[plugin_id] = f"{existing_reason}{reason}"

View File

@@ -14,6 +14,7 @@ from src.platform_io import DriverKind, InboundMessageEnvelope, RouteBinding, Ro
from src.platform_io.drivers import PluginPlatformDriver from src.platform_io.drivers import PluginPlatformDriver
from src.platform_io.route_key_factory import RouteKeyFactory from src.platform_io.route_key_factory import RouteKeyFactory
from src.plugin_runtime import ( from src.plugin_runtime import (
ENV_BLOCKED_PLUGIN_REASONS,
ENV_EXTERNAL_PLUGIN_IDS, ENV_EXTERNAL_PLUGIN_IDS,
ENV_GLOBAL_CONFIG_SNAPSHOT, ENV_GLOBAL_CONFIG_SNAPSHOT,
ENV_HOST_VERSION, ENV_HOST_VERSION,
@@ -131,6 +132,7 @@ class PluginRunnerSupervisor:
self._registered_plugins: Dict[str, RegisterPluginPayload] = {} self._registered_plugins: Dict[str, RegisterPluginPayload] = {}
self._message_gateway_states: Dict[str, Dict[str, _MessageGatewayRuntimeState]] = {} self._message_gateway_states: Dict[str, Dict[str, _MessageGatewayRuntimeState]] = {}
self._external_available_plugins: Dict[str, str] = {} self._external_available_plugins: Dict[str, str] = {}
self._blocked_plugin_reasons: Dict[str, str] = {}
self._runner_ready_events: asyncio.Event = asyncio.Event() self._runner_ready_events: asyncio.Event = asyncio.Event()
self._runner_ready_payloads: RunnerReadyPayload = RunnerReadyPayload() self._runner_ready_payloads: RunnerReadyPayload = RunnerReadyPayload()
self._health_task: Optional[asyncio.Task[None]] = None self._health_task: Optional[asyncio.Task[None]] = None
@@ -211,6 +213,19 @@ class PluginRunnerSupervisor:
""" """
return {plugin_id: registration.plugin_version for plugin_id, registration in self._registered_plugins.items()} return {plugin_id: registration.plugin_version for plugin_id, registration in self._registered_plugins.items()}
def set_blocked_plugin_reasons(self, blocked_plugin_reasons: Dict[str, str]) -> None:
"""设置当前 Runner 启动时应拒绝加载的插件列表。
Args:
blocked_plugin_reasons: 需要拒绝加载的插件及原因映射。
"""
self._blocked_plugin_reasons = {
str(plugin_id or "").strip(): str(reason or "").strip()
for plugin_id, reason in blocked_plugin_reasons.items()
if str(plugin_id or "").strip() and str(reason or "").strip()
}
@staticmethod @staticmethod
def _normalize_reload_plugin_ids(plugin_ids: Optional[List[str] | str]) -> List[str]: def _normalize_reload_plugin_ids(plugin_ids: Optional[List[str] | str]) -> List[str]:
"""规范化批量重载入参。 """规范化批量重载入参。
@@ -1303,6 +1318,7 @@ class PluginRunnerSupervisor:
global_config_snapshot = config_manager.get_global_config().model_dump(mode="json") global_config_snapshot = config_manager.get_global_config().model_dump(mode="json")
global_config_snapshot["model"] = config_manager.get_model_config().model_dump(mode="json") global_config_snapshot["model"] = config_manager.get_model_config().model_dump(mode="json")
return { return {
ENV_BLOCKED_PLUGIN_REASONS: json.dumps(self._blocked_plugin_reasons, ensure_ascii=False),
ENV_EXTERNAL_PLUGIN_IDS: json.dumps(self._external_available_plugins, ensure_ascii=False), ENV_EXTERNAL_PLUGIN_IDS: json.dumps(self._external_available_plugins, ensure_ascii=False),
ENV_GLOBAL_CONFIG_SNAPSHOT: json.dumps(global_config_snapshot, ensure_ascii=False), ENV_GLOBAL_CONFIG_SNAPSHOT: json.dumps(global_config_snapshot, ensure_ascii=False),
ENV_HOST_VERSION: PROTOCOL_VERSION, ENV_HOST_VERSION: PROTOCOL_VERSION,

View File

@@ -8,6 +8,7 @@
5. 提供统一的能力实现注册接口,使插件可以调用主程序功能 5. 提供统一的能力实现注册接口,使插件可以调用主程序功能
""" """
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import ( from typing import (
TYPE_CHECKING, TYPE_CHECKING,
@@ -33,13 +34,15 @@ from src.common.logger import get_logger
from src.config.config import config_manager from src.config.config import config_manager
from src.config.file_watcher import FileChange, FileWatcher from src.config.file_watcher import FileChange, FileWatcher
from src.platform_io import DeliveryBatch, InboundMessageEnvelope, get_platform_io_manager from src.platform_io import DeliveryBatch, InboundMessageEnvelope, get_platform_io_manager
from src.plugin_runtime.hook_catalog import register_builtin_hook_specs
from src.plugin_runtime.capabilities import ( from src.plugin_runtime.capabilities import (
RuntimeComponentCapabilityMixin, RuntimeComponentCapabilityMixin,
RuntimeCoreCapabilityMixin, RuntimeCoreCapabilityMixin,
RuntimeDataCapabilityMixin, RuntimeDataCapabilityMixin,
RuntimeRenderCapabilityMixin,
) )
from src.plugin_runtime.capabilities.registry import register_capability_impls from src.plugin_runtime.capabilities.registry import register_capability_impls
from src.plugin_runtime.dependency_pipeline import PluginDependencyPipeline
from src.plugin_runtime.hook_catalog import register_builtin_hook_specs
from src.plugin_runtime.host.hook_dispatcher import HookDispatchResult, HookDispatcher from src.plugin_runtime.host.hook_dispatcher import HookDispatchResult, HookDispatcher
from src.plugin_runtime.host.hook_spec_registry import HookSpec, HookSpecRegistry from src.plugin_runtime.host.hook_spec_registry import HookSpec, HookSpecRegistry
from src.plugin_runtime.host.message_utils import MessageDict, PluginMessageUtils from src.plugin_runtime.host.message_utils import MessageDict, PluginMessageUtils
@@ -67,10 +70,19 @@ _EVENT_TYPE_MAP: Dict[str, str] = {
} }
@dataclass(frozen=True)
class DependencySyncState:
"""表示一次插件依赖同步后的状态。"""
blocked_changed_plugin_ids: Set[str]
environment_changed: bool
class PluginRuntimeManager( class PluginRuntimeManager(
RuntimeCoreCapabilityMixin, RuntimeCoreCapabilityMixin,
RuntimeDataCapabilityMixin, RuntimeDataCapabilityMixin,
RuntimeComponentCapabilityMixin, RuntimeComponentCapabilityMixin,
RuntimeRenderCapabilityMixin,
): ):
"""插件运行时管理器(单例) """插件运行时管理器(单例)
@@ -88,7 +100,9 @@ class PluginRuntimeManager(
self._plugin_source_watcher_subscription_id: Optional[str] = None self._plugin_source_watcher_subscription_id: Optional[str] = None
self._plugin_config_watcher_subscriptions: Dict[str, Tuple[Path, str]] = {} self._plugin_config_watcher_subscriptions: Dict[str, Tuple[Path, str]] = {}
self._plugin_path_cache: Dict[str, Path] = {} self._plugin_path_cache: Dict[str, Path] = {}
self._manifest_validator: ManifestValidator = ManifestValidator() self._manifest_validator: ManifestValidator = ManifestValidator(validate_python_package_dependencies=False)
self._plugin_dependency_pipeline: PluginDependencyPipeline = PluginDependencyPipeline()
self._blocked_plugin_reasons: Dict[str, str] = {}
self._config_reload_callback: Callable[[Sequence[str]], Awaitable[None]] = self._handle_main_config_reload self._config_reload_callback: Callable[[Sequence[str]], Awaitable[None]] = self._handle_main_config_reload
self._config_reload_callback_registered: bool = False self._config_reload_callback_registered: bool = False
self._hook_spec_registry: HookSpecRegistry = HookSpecRegistry() self._hook_spec_registry: HookSpecRegistry = HookSpecRegistry()
@@ -131,7 +145,7 @@ class PluginRuntimeManager(
@classmethod @classmethod
def _discover_plugin_dependency_map(cls, plugin_dirs: Iterable[Path]) -> Dict[str, List[str]]: def _discover_plugin_dependency_map(cls, plugin_dirs: Iterable[Path]) -> Dict[str, List[str]]:
"""扫描指定插件目录集合,返回 ``plugin_id -> dependencies`` 映射。""" """扫描指定插件目录集合,返回 ``plugin_id -> dependencies`` 映射。"""
validator = ManifestValidator() validator = ManifestValidator(validate_python_package_dependencies=False)
return validator.build_plugin_dependency_map(plugin_dirs) return validator.build_plugin_dependency_map(plugin_dirs)
@classmethod @classmethod
@@ -191,6 +205,206 @@ class PluginRuntimeManager(
} }
return supervisor_cls(**supported_kwargs) return supervisor_cls(**supported_kwargs)
def _resolve_runtime_plugin_dirs(self) -> Tuple[List[Path], List[Path]]:
"""解析当前运行时应管理的插件根目录。
Returns:
Tuple[List[Path], List[Path]]: 内置插件目录列表与第三方插件目录列表。
"""
return self._get_builtin_plugin_dirs(), self._get_third_party_plugin_dirs()
@staticmethod
def _resolve_supervisor_socket_paths() -> Tuple[Optional[str], Optional[str]]:
"""解析内置与第三方 Supervisor 的 IPC 地址。
Returns:
Tuple[Optional[str], Optional[str]]: 内置 Runner 与第三方 Runner 的 socket 地址。
"""
runtime_config = config_manager.get_global_config().plugin_runtime
socket_path_base = runtime_config.ipc_socket_path or None
builtin_socket = f"{socket_path_base}-builtin" if socket_path_base else None
third_party_socket = f"{socket_path_base}-third_party" if socket_path_base else None
return builtin_socket, third_party_socket
def _apply_blocked_plugin_reasons_to_supervisors(self) -> None:
"""将当前阻止加载插件列表同步到全部 Supervisor。"""
for supervisor in self.supervisors:
set_blocked_plugin_reasons = getattr(supervisor, "set_blocked_plugin_reasons", None)
if callable(set_blocked_plugin_reasons):
set_blocked_plugin_reasons(self._blocked_plugin_reasons)
def _set_blocked_plugin_reasons(self, blocked_plugin_reasons: Dict[str, str]) -> Set[str]:
"""更新 Host 侧维护的阻止加载插件列表。
Args:
blocked_plugin_reasons: 最新的阻止加载插件及原因映射。
Returns:
Set[str]: 本次发生状态变化的插件 ID 集合。
"""
normalized_reasons = {
str(plugin_id or "").strip(): str(reason or "").strip()
for plugin_id, reason in blocked_plugin_reasons.items()
if str(plugin_id or "").strip() and str(reason or "").strip()
}
changed_plugin_ids = {
plugin_id
for plugin_id in set(self._blocked_plugin_reasons) | set(normalized_reasons)
if self._blocked_plugin_reasons.get(plugin_id) != normalized_reasons.get(plugin_id)
}
self._blocked_plugin_reasons = normalized_reasons
self._apply_blocked_plugin_reasons_to_supervisors()
return changed_plugin_ids
async def _sync_plugin_dependencies(self, plugin_dirs: Sequence[Path]) -> DependencySyncState:
"""执行插件依赖同步,并刷新阻止加载插件列表。
Args:
plugin_dirs: 当前需要参与分析的插件根目录列表。
Returns:
DependencySyncState: 同步后的环境变更状态与阻止列表变化集合。
"""
result = await self._plugin_dependency_pipeline.execute(plugin_dirs)
changed_plugin_ids = self._set_blocked_plugin_reasons(result.blocked_plugin_reasons)
return DependencySyncState(
blocked_changed_plugin_ids=changed_plugin_ids,
environment_changed=result.environment_changed,
)
def _build_supervisors(self, builtin_dirs: Sequence[Path], third_party_dirs: Sequence[Path]) -> None:
"""根据目录列表创建当前运行时所需的 Supervisor。
Args:
builtin_dirs: 内置插件目录列表。
third_party_dirs: 第三方插件目录列表。
"""
from src.plugin_runtime.host.supervisor import PluginSupervisor
builtin_socket, third_party_socket = self._resolve_supervisor_socket_paths()
self._builtin_supervisor = None
self._third_party_supervisor = None
if builtin_dirs:
builtin_supervisor = self._instantiate_supervisor(
PluginSupervisor,
plugin_dirs=list(builtin_dirs),
group_name="builtin",
hook_spec_registry=self._hook_spec_registry,
socket_path=builtin_socket,
)
self._builtin_supervisor = builtin_supervisor
self._register_capability_impls(builtin_supervisor)
if third_party_dirs:
third_party_supervisor = self._instantiate_supervisor(
PluginSupervisor,
plugin_dirs=list(third_party_dirs),
group_name="third_party",
hook_spec_registry=self._hook_spec_registry,
socket_path=third_party_socket,
)
self._third_party_supervisor = third_party_supervisor
self._register_capability_impls(third_party_supervisor)
self._apply_blocked_plugin_reasons_to_supervisors()
async def _start_supervisors(
self,
builtin_dirs: Sequence[Path],
third_party_dirs: Sequence[Path],
) -> List["PluginSupervisor"]:
"""按依赖顺序启动当前已创建的 Supervisor。
Args:
builtin_dirs: 内置插件目录列表。
third_party_dirs: 第三方插件目录列表。
Returns:
List[PluginSupervisor]: 成功启动的 Supervisor 列表。
"""
started_supervisors: List["PluginSupervisor"] = []
supervisor_groups: Dict[str, Optional["PluginSupervisor"]] = {
"builtin": self._builtin_supervisor,
"third_party": self._third_party_supervisor,
}
start_order = self._build_group_start_order(builtin_dirs, third_party_dirs)
try:
for group_name in start_order:
supervisor = supervisor_groups.get(group_name)
if supervisor is None:
continue
external_plugin_versions = {
plugin_id: plugin_version
for started_supervisor in started_supervisors
for plugin_id, plugin_version in started_supervisor.get_loaded_plugin_versions().items()
}
supervisor.set_external_available_plugins(external_plugin_versions)
set_blocked_plugin_reasons = getattr(supervisor, "set_blocked_plugin_reasons", None)
if callable(set_blocked_plugin_reasons):
set_blocked_plugin_reasons(self._blocked_plugin_reasons)
await supervisor.start()
started_supervisors.append(supervisor)
except Exception:
await asyncio.gather(*(supervisor.stop() for supervisor in started_supervisors), return_exceptions=True)
raise
return started_supervisors
async def _stop_supervisors(self) -> None:
"""停止当前全部 Supervisor。"""
supervisors = self.supervisors
if not supervisors:
return
await asyncio.gather(*(supervisor.stop() for supervisor in supervisors), return_exceptions=True)
self._builtin_supervisor = None
self._third_party_supervisor = None
async def _restart_supervisors(self, reason: str) -> bool:
"""重启当前全部 Supervisor。
Args:
reason: 本次重启的原因。
Returns:
bool: 是否重启成功。
"""
builtin_dirs, third_party_dirs = self._resolve_runtime_plugin_dirs()
if duplicate_plugin_ids := self._find_duplicate_plugin_ids(builtin_dirs + third_party_dirs):
details = "; ".join(
f"{plugin_id}: {', '.join(str(path) for path in paths)}"
for plugin_id, paths in sorted(duplicate_plugin_ids.items())
)
logger.error(f"检测到重复插件 ID拒绝执行 Supervisor 重启: {details}")
return False
logger.info(f"开始重启插件运行时 Supervisor: {reason}")
await self._stop_supervisors()
self._build_supervisors(builtin_dirs, third_party_dirs)
try:
await self._start_supervisors(builtin_dirs, third_party_dirs)
except Exception as exc:
logger.error(f"重启插件运行时 Supervisor 失败: {exc}", exc_info=True)
await self._stop_supervisors()
return False
self._refresh_plugin_config_watch_subscriptions()
logger.info(f"插件运行时 Supervisor 已重启完成: {reason}")
return True
# ─── 生命周期 ───────────────────────────────────────────── # ─── 生命周期 ─────────────────────────────────────────────
async def start(self) -> None: async def start(self) -> None:
@@ -204,10 +418,7 @@ class PluginRuntimeManager(
logger.info("插件运行时已在配置中禁用,跳过启动") logger.info("插件运行时已在配置中禁用,跳过启动")
return return
from src.plugin_runtime.host.supervisor import PluginSupervisor builtin_dirs, third_party_dirs = self._resolve_runtime_plugin_dirs()
builtin_dirs = self._get_builtin_plugin_dirs()
third_party_dirs = self._get_third_party_plugin_dirs()
if duplicate_plugin_ids := self._find_duplicate_plugin_ids(builtin_dirs + third_party_dirs): if duplicate_plugin_ids := self._find_duplicate_plugin_ids(builtin_dirs + third_party_dirs):
details = "; ".join( details = "; ".join(
@@ -221,61 +432,19 @@ class PluginRuntimeManager(
logger.info("未找到任何插件目录,跳过插件运行时启动") logger.info("未找到任何插件目录,跳过插件运行时启动")
return return
dependency_sync_state = await self._sync_plugin_dependencies(builtin_dirs + third_party_dirs)
if dependency_sync_state.environment_changed:
logger.info("插件依赖流水线已更新当前 Python 环境,启动时将直接加载最新环境")
self.ensure_builtin_hook_specs_registered() self.ensure_builtin_hook_specs_registered()
platform_io_manager = get_platform_io_manager() platform_io_manager = get_platform_io_manager()
self._build_supervisors(builtin_dirs, third_party_dirs)
# 从配置读取自定义 IPC socket 路径(留空则自动生成) started_supervisors: List["PluginSupervisor"] = []
socket_path_base = _cfg.ipc_socket_path or None
# 当用户指定了自定义路径时,为两个 Supervisor 添加后缀以避免 UDS 冲突
builtin_socket = f"{socket_path_base}-builtin" if socket_path_base else None
third_party_socket = f"{socket_path_base}-third_party" if socket_path_base else None
# 创建两个 Supervisor各自拥有独立的 socket / Runner 子进程
if builtin_dirs:
self._builtin_supervisor = self._instantiate_supervisor(
PluginSupervisor,
plugin_dirs=builtin_dirs,
group_name="builtin",
hook_spec_registry=self._hook_spec_registry,
socket_path=builtin_socket,
)
self._register_capability_impls(self._builtin_supervisor)
if third_party_dirs:
self._third_party_supervisor = self._instantiate_supervisor(
PluginSupervisor,
plugin_dirs=third_party_dirs,
group_name="third_party",
hook_spec_registry=self._hook_spec_registry,
socket_path=third_party_socket,
)
self._register_capability_impls(self._third_party_supervisor)
started_supervisors: List[PluginSupervisor] = []
try: try:
platform_io_manager.set_inbound_dispatcher(self._dispatch_platform_inbound) platform_io_manager.set_inbound_dispatcher(self._dispatch_platform_inbound)
await platform_io_manager.ensure_send_pipeline_ready() await platform_io_manager.ensure_send_pipeline_ready()
started_supervisors = await self._start_supervisors(builtin_dirs, third_party_dirs)
supervisor_groups: Dict[str, Optional[PluginSupervisor]] = {
"builtin": self._builtin_supervisor,
"third_party": self._third_party_supervisor,
}
start_order = self._build_group_start_order(builtin_dirs, third_party_dirs)
for group_name in start_order:
supervisor = supervisor_groups.get(group_name)
if supervisor is None:
continue
external_plugin_versions = {
plugin_id: plugin_version
for started_supervisor in started_supervisors
for plugin_id, plugin_version in started_supervisor.get_loaded_plugin_versions().items()
}
supervisor.set_external_available_plugins(external_plugin_versions)
await supervisor.start()
started_supervisors.append(supervisor)
await self._start_plugin_file_watcher() await self._start_plugin_file_watcher()
config_manager.register_reload_callback(self._config_reload_callback) config_manager.register_reload_callback(self._config_reload_callback)
@@ -529,6 +698,21 @@ class PluginRuntimeManager(
if not normalized_plugin_ids: if not normalized_plugin_ids:
return True return True
blocked_plugin_ids = [plugin_id for plugin_id in normalized_plugin_ids if plugin_id in self._blocked_plugin_reasons]
if blocked_plugin_ids:
logger.warning(
"以下插件当前被依赖流水线阻止加载,已拒绝重载请求: "
+ ", ".join(
f"{plugin_id} ({self._blocked_plugin_reasons[plugin_id]})"
for plugin_id in sorted(blocked_plugin_ids)
)
)
normalized_plugin_ids = [
plugin_id for plugin_id in normalized_plugin_ids if plugin_id not in self._blocked_plugin_reasons
]
if not normalized_plugin_ids:
return False
dependency_map = self._build_registered_dependency_map() dependency_map = self._build_registered_dependency_map()
supervisor_by_plugin = self._build_registered_supervisor_map() supervisor_by_plugin = self._build_registered_supervisor_map()
supervisor_roots: Dict["PluginSupervisor", List[str]] = {} supervisor_roots: Dict["PluginSupervisor", List[str]] = {}
@@ -909,6 +1093,12 @@ class PluginRuntimeManager(
normalized_plugin_id = str(plugin_id or "").strip() normalized_plugin_id = str(plugin_id or "").strip()
if not normalized_plugin_id: if not normalized_plugin_id:
return False return False
if normalized_plugin_id in self._blocked_plugin_reasons:
logger.warning(
f"插件 {normalized_plugin_id} 当前被依赖流水线阻止加载: "
f"{self._blocked_plugin_reasons[normalized_plugin_id]}"
)
return False
try: try:
registered_supervisor = self._get_supervisor_for_plugin(normalized_plugin_id) registered_supervisor = self._get_supervisor_for_plugin(normalized_plugin_id)
@@ -933,7 +1123,7 @@ class PluginRuntimeManager(
def _find_duplicate_plugin_ids(cls, plugin_dirs: List[Path]) -> Dict[str, List[Path]]: def _find_duplicate_plugin_ids(cls, plugin_dirs: List[Path]) -> Dict[str, List[Path]]:
"""扫描插件目录,找出被多个目录重复声明的插件 ID。""" """扫描插件目录,找出被多个目录重复声明的插件 ID。"""
plugin_locations: Dict[str, List[Path]] = {} plugin_locations: Dict[str, List[Path]] = {}
validator = ManifestValidator() validator = ManifestValidator(validate_python_package_dependencies=False)
for plugin_path, manifest in validator.iter_plugin_manifests(plugin_dirs): for plugin_path, manifest in validator.iter_plugin_manifests(plugin_dirs):
plugin_locations.setdefault(manifest.id, []).append(plugin_path) plugin_locations.setdefault(manifest.id, []).append(plugin_path)
@@ -1190,7 +1380,8 @@ class PluginRuntimeManager(
if not self._started or not changes: if not self._started or not changes:
return return
if duplicate_plugin_ids := self._find_duplicate_plugin_ids(list(self._iter_plugin_dirs())): plugin_dirs = list(self._iter_plugin_dirs())
if duplicate_plugin_ids := self._find_duplicate_plugin_ids(plugin_dirs):
details = "; ".join( details = "; ".join(
f"{plugin_id}: {', '.join(str(path) for path in paths)}" f"{plugin_id}: {', '.join(str(path) for path in paths)}"
for plugin_id, paths in sorted(duplicate_plugin_ids.items()) for plugin_id, paths in sorted(duplicate_plugin_ids.items())
@@ -1198,21 +1389,24 @@ class PluginRuntimeManager(
logger.error(f"检测到重复插件 ID跳过本次插件热重载: {details}") logger.error(f"检测到重复插件 ID跳过本次插件热重载: {details}")
return return
changed_plugin_ids: List[str] = [] relevant_source_changes = [
changed_paths = [change.path.resolve() for change in changes] change.path.resolve()
for change in changes
if change.path.name in {"plugin.py", "_manifest.json"} or change.path.suffix == ".py"
]
if not relevant_source_changes:
return
for supervisor in self.supervisors: dependency_sync_state = await self._sync_plugin_dependencies(plugin_dirs)
for path in changed_paths: restart_reason = "file_watcher"
plugin_id = self._match_plugin_id_for_supervisor(supervisor, path) if dependency_sync_state.environment_changed:
if plugin_id is None: restart_reason = "file_watcher_dependency_install"
continue elif dependency_sync_state.blocked_changed_plugin_ids:
if path.name in {"plugin.py", "_manifest.json"} or path.suffix == ".py": restart_reason = "file_watcher_blocklist_changed"
if plugin_id not in changed_plugin_ids:
changed_plugin_ids.append(plugin_id)
if changed_plugin_ids: restarted = await self._restart_supervisors(restart_reason)
await self.reload_plugins_globally(changed_plugin_ids, reason="file_watcher") if not restarted:
self._refresh_plugin_config_watch_subscriptions() logger.warning(f"插件源码变更后重启 Supervisor 失败: {restart_reason}")
@staticmethod @staticmethod
def _plugin_dir_matches(path: Path, plugin_dir: Path) -> bool: def _plugin_dir_matches(path: Path, plugin_dir: Path) -> bool:

View File

@@ -609,6 +609,7 @@ class ManifestValidator:
host_version: str = "", host_version: str = "",
sdk_version: str = "", sdk_version: str = "",
project_root: Optional[Path] = None, project_root: Optional[Path] = None,
validate_python_package_dependencies: bool = True,
) -> None: ) -> None:
"""初始化 Manifest 校验器。 """初始化 Manifest 校验器。
@@ -616,10 +617,12 @@ class ManifestValidator:
host_version: 当前 Host 版本号;留空时自动从主程序 ``pyproject.toml`` 读取。 host_version: 当前 Host 版本号;留空时自动从主程序 ``pyproject.toml`` 读取。
sdk_version: 当前 SDK 版本号;留空时自动从运行环境中探测。 sdk_version: 当前 SDK 版本号;留空时自动从运行环境中探测。
project_root: 项目根目录;留空时自动推断。 project_root: 项目根目录;留空时自动推断。
validate_python_package_dependencies: 是否校验 Python 包依赖与当前环境的关系。
""" """
self._project_root: Path = project_root or self._resolve_project_root() self._project_root: Path = project_root or self._resolve_project_root()
self._host_version: str = host_version or self._detect_default_host_version(self._project_root) self._host_version: str = host_version or self._detect_default_host_version(self._project_root)
self._sdk_version: str = sdk_version or self._detect_default_sdk_version(self._project_root) self._sdk_version: str = sdk_version or self._detect_default_sdk_version(self._project_root)
self._validate_python_package_dependencies: bool = validate_python_package_dependencies
self.errors: List[str] = [] self.errors: List[str] = []
self.warnings: List[str] = [] self.warnings: List[str] = []
@@ -823,9 +826,10 @@ class ManifestValidator:
if not sdk_ok: if not sdk_ok:
self.errors.append(f"SDK 版本不兼容: {sdk_message} (当前 SDK: {self._sdk_version})") self.errors.append(f"SDK 版本不兼容: {sdk_message} (当前 SDK: {self._sdk_version})")
self._validate_python_package_dependencies(manifest) if self._validate_python_package_dependencies:
self._validate_python_package_dependencies_against_runtime(manifest)
def _validate_python_package_dependencies(self, manifest: PluginManifest) -> None: def _validate_python_package_dependencies_against_runtime(self, manifest: PluginManifest) -> None:
"""校验 Python 包依赖与主程序运行环境是否冲突。 """校验 Python 包依赖与主程序运行环境是否冲突。
Args: Args:
@@ -865,6 +869,68 @@ class ManifestValidator:
f"主程序依赖约束为 {host_specifier or '任意版本'}" f"主程序依赖约束为 {host_specifier or '任意版本'}"
) )
def load_host_dependency_requirements(self) -> Dict[str, Requirement]:
"""读取主程序在 ``pyproject.toml`` 中声明的依赖约束。
Returns:
Dict[str, Requirement]: 以规范化包名为键的依赖约束映射。
"""
return self._load_host_dependency_requirements(self._project_root)
def get_installed_package_version(self, package_name: str) -> Optional[str]:
"""查询当前运行环境中指定包的安装版本。
Args:
package_name: 需要查询的包名。
Returns:
Optional[str]: 已安装版本号;未安装时返回 ``None``。
"""
return self._get_installed_package_version(package_name)
@staticmethod
def build_specifier_set(version_spec: str) -> Optional[SpecifierSet]:
"""将版本约束文本转换为 ``SpecifierSet``。
Args:
version_spec: 原始版本约束文本。
Returns:
Optional[SpecifierSet]: 转换成功时返回约束对象,否则返回 ``None``。
"""
return ManifestValidator._build_specifier_set(version_spec)
@staticmethod
def version_matches_specifier(version: str, version_spec: str) -> bool:
"""判断版本号是否满足给定约束。
Args:
version: 待判断的版本号。
version_spec: 版本约束表达式。
Returns:
bool: 是否满足约束。
"""
return ManifestValidator._version_matches_specifier(version, version_spec)
@classmethod
def requirements_may_overlap(cls, left: SpecifierSet, right: SpecifierSet) -> bool:
"""判断两个版本约束是否可能存在交集。
Args:
left: 左侧版本约束。
right: 右侧版本约束。
Returns:
bool: 若两者可能同时满足则返回 ``True``。
"""
return cls._requirements_may_overlap(left, right)
def _log_errors(self) -> None: def _log_errors(self) -> None:
"""输出当前累计的 Manifest 校验错误。""" """输出当前累计的 Manifest 校验错误。"""
for error_message in self.errors: for error_message in self.errors:

View File

@@ -75,6 +75,35 @@ class PluginLoader:
self._failed_plugins: Dict[str, str] = {} self._failed_plugins: Dict[str, str] = {}
self._manifest_validator = ManifestValidator(host_version=host_version) self._manifest_validator = ManifestValidator(host_version=host_version)
self._compat_hook_installed = False self._compat_hook_installed = False
self._blocked_plugin_reasons: Dict[str, str] = {}
def set_blocked_plugin_reasons(self, blocked_plugin_reasons: Optional[Dict[str, str]] = None) -> None:
"""更新当前加载器持有的拒绝加载插件列表。
Args:
blocked_plugin_reasons: 需要拒绝加载的插件及原因映射。
"""
self._blocked_plugin_reasons = {
str(plugin_id or "").strip(): str(reason or "").strip()
for plugin_id, reason in (blocked_plugin_reasons or {}).items()
if str(plugin_id or "").strip() and str(reason or "").strip()
}
def get_blocked_plugin_reason(self, plugin_id: str) -> Optional[str]:
"""返回指定插件当前的拒绝加载原因。
Args:
plugin_id: 目标插件 ID。
Returns:
Optional[str]: 若插件被阻止加载则返回原因,否则返回 ``None``。
"""
normalized_plugin_id = str(plugin_id or "").strip()
if not normalized_plugin_id:
return None
return self._blocked_plugin_reasons.get(normalized_plugin_id)
def discover_and_load( def discover_and_load(
self, self,
@@ -156,6 +185,11 @@ class PluginLoader:
return None return None
plugin_id = manifest.id plugin_id = manifest.id
if blocked_reason := self.get_blocked_plugin_reason(plugin_id):
self._failed_plugins[plugin_id] = blocked_reason
logger.warning(f"插件 {plugin_id} 已被 Host 依赖流水线阻止加载: {blocked_reason}")
return None
return plugin_id, (plugin_dir, manifest, plugin_path) return plugin_id, (plugin_dir, manifest, plugin_path)
def _record_duplicate_candidates(self, duplicate_candidates: Dict[str, List[Path]]) -> None: def _record_duplicate_candidates(self, duplicate_candidates: Dict[str, List[Path]]) -> None:

View File

@@ -29,6 +29,7 @@ import tomlkit
from src.common.logger import get_console_handler, get_logger, initialize_logging from src.common.logger import get_console_handler, get_logger, initialize_logging
from src.plugin_runtime import ( from src.plugin_runtime import (
ENV_BLOCKED_PLUGIN_REASONS,
ENV_EXTERNAL_PLUGIN_IDS, ENV_EXTERNAL_PLUGIN_IDS,
ENV_HOST_VERSION, ENV_HOST_VERSION,
ENV_IPC_ADDRESS, ENV_IPC_ADDRESS,
@@ -196,6 +197,7 @@ class PluginRunner:
session_token: str, session_token: str,
plugin_dirs: List[str], plugin_dirs: List[str],
external_available_plugins: Optional[Dict[str, str]] = None, external_available_plugins: Optional[Dict[str, str]] = None,
blocked_plugin_reasons: Optional[Dict[str, str]] = None,
) -> None: ) -> None:
"""初始化 Runner。 """初始化 Runner。
@@ -204,6 +206,7 @@ class PluginRunner:
session_token: 握手用会话令牌。 session_token: 握手用会话令牌。
plugin_dirs: 当前 Runner 负责扫描的插件目录列表。 plugin_dirs: 当前 Runner 负责扫描的插件目录列表。
external_available_plugins: 视为已满足的外部依赖插件版本映射。 external_available_plugins: 视为已满足的外部依赖插件版本映射。
blocked_plugin_reasons: 需要拒绝加载的插件及原因映射。
""" """
self._host_address: str = host_address self._host_address: str = host_address
self._session_token: str = session_token self._session_token: str = session_token
@@ -213,9 +216,15 @@ class PluginRunner:
for plugin_id, plugin_version in (external_available_plugins or {}).items() for plugin_id, plugin_version in (external_available_plugins or {}).items()
if str(plugin_id or "").strip() and str(plugin_version or "").strip() if str(plugin_id or "").strip() and str(plugin_version or "").strip()
} }
self._blocked_plugin_reasons: Dict[str, str] = {
str(plugin_id or "").strip(): str(reason or "").strip()
for plugin_id, reason in (blocked_plugin_reasons or {}).items()
if str(plugin_id or "").strip() and str(reason or "").strip()
}
self._rpc_client: RPCClient = RPCClient(host_address, session_token) self._rpc_client: RPCClient = RPCClient(host_address, session_token)
self._loader: PluginLoader = PluginLoader(host_version=os.getenv(ENV_HOST_VERSION, "")) self._loader: PluginLoader = PluginLoader(host_version=os.getenv(ENV_HOST_VERSION, ""))
self._loader.set_blocked_plugin_reasons(self._blocked_plugin_reasons)
self._start_time: float = time.monotonic() self._start_time: float = time.monotonic()
self._shutting_down: bool = False self._shutting_down: bool = False
self._reload_lock: asyncio.Lock = asyncio.Lock() self._reload_lock: asyncio.Lock = asyncio.Lock()
@@ -1639,6 +1648,7 @@ class PluginRunner:
async def _async_main() -> None: async def _async_main() -> None:
"""异步主入口""" """异步主入口"""
blocked_plugin_reasons_raw = os.environ.get(ENV_BLOCKED_PLUGIN_REASONS, "")
host_address = os.environ.pop(ENV_IPC_ADDRESS, "") host_address = os.environ.pop(ENV_IPC_ADDRESS, "")
external_plugin_ids_raw = os.environ.get(ENV_EXTERNAL_PLUGIN_IDS, "") external_plugin_ids_raw = os.environ.get(ENV_EXTERNAL_PLUGIN_IDS, "")
session_token = os.environ.pop(ENV_SESSION_TOKEN, "") session_token = os.environ.pop(ENV_SESSION_TOKEN, "")
@@ -1658,13 +1668,30 @@ async def _async_main() -> None:
logger.warning("外部依赖插件版本映射格式非法,已回退为空映射") logger.warning("外部依赖插件版本映射格式非法,已回退为空映射")
external_plugin_ids = {} external_plugin_ids = {}
try:
blocked_plugin_reasons = json.loads(blocked_plugin_reasons_raw) if blocked_plugin_reasons_raw else {}
except json.JSONDecodeError:
logger.warning("解析阻止加载插件原因映射失败,已回退为空映射")
blocked_plugin_reasons = {}
if not isinstance(blocked_plugin_reasons, dict):
logger.warning("阻止加载插件原因映射格式非法,已回退为空映射")
blocked_plugin_reasons = {}
runner_kwargs: Dict[str, Any] = {
"external_available_plugins": {
str(plugin_id): str(plugin_version) for plugin_id, plugin_version in external_plugin_ids.items()
}
}
if blocked_plugin_reasons:
runner_kwargs["blocked_plugin_reasons"] = {
str(plugin_id): str(reason) for plugin_id, reason in blocked_plugin_reasons.items()
}
runner = PluginRunner( runner = PluginRunner(
host_address, host_address,
session_token, session_token,
plugin_dirs, plugin_dirs,
external_available_plugins={ **runner_kwargs,
str(plugin_id): str(plugin_version) for plugin_id, plugin_version in external_plugin_ids.items()
},
) )
# 注册信号处理 # 注册信号处理

View File

@@ -0,0 +1,937 @@
"""HTML 浏览器渲染服务。
负责在 Host 侧复用已有浏览器,并将 HTML 内容渲染为 PNG 图片。
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from importlib import metadata
from io import BytesIO
from pathlib import Path
from typing import Any, Dict, Literal, Optional, Tuple, cast
from urllib.parse import urlparse
import asyncio
import base64
import contextlib
import functools
import json
import os
import shutil
import sys
import time
from src.common.logger import PROJECT_ROOT, get_logger
from src.config.config import config_manager
from src.config.official_configs import PluginRuntimeRenderConfig
logger = get_logger("services.html_render_service")
_NETWORK_ALLOW_SCHEMES = frozenset({"about", "blob", "data", "file"})
_WINDOWS_BROWSER_PATHS = (
Path("C:/Program Files/Google/Chrome/Application/chrome.exe"),
Path("C:/Program Files (x86)/Google/Chrome/Application/chrome.exe"),
Path("C:/Program Files/Microsoft/Edge/Application/msedge.exe"),
Path("C:/Program Files (x86)/Microsoft/Edge/Application/msedge.exe"),
)
_MACOS_BROWSER_PATHS = (
Path("/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"),
Path("/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge"),
)
_UNIX_BROWSER_NAMES = (
"chromium",
"chromium-browser",
"google-chrome",
"google-chrome-stable",
"microsoft-edge",
"msedge",
)
_PLAYWRIGHT_MANAGED_BROWSER_PREFIXES = ("chromium-", "chrome-", "chrome-headless-shell-")
@dataclass(slots=True)
class HtmlRenderRequest:
"""描述一次 HTML 转 PNG 请求。"""
html: str
selector: str = "body"
viewport_width: int = 900
viewport_height: int = 500
device_scale_factor: float = 2.0
full_page: bool = False
omit_background: bool = False
wait_until: str = "load"
wait_for_selector: str = ""
wait_for_timeout_ms: int = 0
timeout_ms: int = 10000
allow_network: bool = False
@dataclass(slots=True)
class HtmlRenderResult:
"""描述一次 HTML 转 PNG 的输出结果。"""
image_base64: str
mime_type: str
width: int
height: int
render_ms: int
def to_payload(self) -> Dict[str, Any]:
"""将结果序列化为能力层返回结构。
Returns:
Dict[str, Any]: 可直接返回给插件运行时的结构化数据。
"""
return {
"image_base64": self.image_base64,
"mime_type": self.mime_type,
"width": self.width,
"height": self.height,
"render_ms": self.render_ms,
}
@dataclass(slots=True)
class ManagedBrowserRecord:
"""记录 Playwright 托管浏览器的本地状态。"""
browser_name: str
browsers_path: str
install_source: Literal["auto_download", "existing_cache"]
playwright_version: str
recorded_at: str
last_verified_at: str
def to_dict(self) -> Dict[str, str]:
"""将浏览器记录转换为可持久化字典。
Returns:
Dict[str, str]: 可写入 JSON 文件的字典结构。
"""
return {
"browser_name": self.browser_name,
"browsers_path": self.browsers_path,
"install_source": self.install_source,
"playwright_version": self.playwright_version,
"recorded_at": self.recorded_at,
"last_verified_at": self.last_verified_at,
}
@classmethod
def from_dict(cls, payload: Dict[str, Any]) -> Optional["ManagedBrowserRecord"]:
"""从字典中恢复浏览器状态记录。
Args:
payload: 原始字典数据。
Returns:
Optional[ManagedBrowserRecord]: 解析成功时返回记录对象,否则返回 ``None``。
"""
browser_name = str(payload.get("browser_name", "") or "").strip()
browsers_path = str(payload.get("browsers_path", "") or "").strip()
install_source = str(payload.get("install_source", "") or "").strip()
playwright_version = str(payload.get("playwright_version", "") or "").strip()
recorded_at = str(payload.get("recorded_at", "") or "").strip()
last_verified_at = str(payload.get("last_verified_at", "") or "").strip()
if not all([browser_name, browsers_path, install_source, playwright_version, recorded_at, last_verified_at]):
return None
if install_source not in {"auto_download", "existing_cache"}:
return None
validated_install_source = cast(Literal["auto_download", "existing_cache"], install_source)
return cls(
browser_name=browser_name,
browsers_path=browsers_path,
install_source=validated_install_source,
playwright_version=playwright_version,
recorded_at=recorded_at,
last_verified_at=last_verified_at,
)
class HTMLRenderService:
"""HTML 浏览器渲染服务。"""
def __init__(self) -> None:
"""初始化渲染服务。"""
self._browser: Any = None
self._browser_lock: asyncio.Lock = asyncio.Lock()
self._connected_via_cdp: bool = False
self._playwright: Any = None
self._render_count: int = 0
self._render_semaphore: Optional[asyncio.Semaphore] = None
self._render_semaphore_limit: int = 0
def _get_render_config(self) -> PluginRuntimeRenderConfig:
"""读取当前插件运行时的浏览器渲染配置。
Returns:
PluginRuntimeRenderConfig: 当前生效的浏览器渲染配置。
"""
return config_manager.get_global_config().plugin_runtime.render
def _get_render_semaphore(self) -> asyncio.Semaphore:
"""根据当前配置返回渲染并发信号量。
Returns:
asyncio.Semaphore: 控制并发的信号量对象。
"""
config = self._get_render_config()
limit = max(1, int(config.concurrency_limit))
if self._render_semaphore is None or self._render_semaphore_limit != limit:
self._render_semaphore = asyncio.Semaphore(limit)
self._render_semaphore_limit = limit
return self._render_semaphore
async def render_html_to_png(self, request: HtmlRenderRequest) -> HtmlRenderResult:
"""将 HTML 内容渲染为 PNG 图片。
Args:
request: 本次渲染请求。
Returns:
HtmlRenderResult: 渲染结果。
Raises:
RuntimeError: 浏览器能力被禁用、Playwright 不可用或浏览器启动失败时抛出。
ValueError: 请求参数非法时抛出。
"""
config = self._get_render_config()
if not config.enabled:
raise RuntimeError("插件运行时浏览器渲染能力已禁用")
normalized_request = self._normalize_request(request, config)
semaphore = self._get_render_semaphore()
async with semaphore:
start_time = time.perf_counter()
browser = await self._ensure_browser(config)
context: Any = None
try:
context = await browser.new_context(
device_scale_factor=normalized_request.device_scale_factor,
locale="zh-CN",
viewport={
"width": normalized_request.viewport_width,
"height": normalized_request.viewport_height,
},
)
page = await context.new_page()
await self._configure_page(page, normalized_request)
image_bytes = await self._capture_image(page, normalized_request)
width, height = self._measure_image_size(image_bytes)
self._render_count += 1
await self._maybe_restart_browser(config)
return HtmlRenderResult(
image_base64=base64.b64encode(image_bytes).decode("utf-8"),
mime_type="image/png",
width=width,
height=height,
render_ms=int((time.perf_counter() - start_time) * 1000),
)
except Exception:
await self.reset_browser(restart_playwright=False)
raise
finally:
if context is not None:
with contextlib.suppress(Exception):
await context.close()
async def reset_browser(self, restart_playwright: bool = False) -> None:
"""关闭当前缓存的浏览器实例。
Args:
restart_playwright: 是否同时关闭 Playwright 运行时。
"""
async with self._browser_lock:
await self._close_browser_unlocked(restart_playwright=restart_playwright)
async def _close_browser_unlocked(self, restart_playwright: bool = False) -> None:
"""在已持有锁的情况下关闭浏览器与 Playwright。
Args:
restart_playwright: 是否同时关闭 Playwright 运行时。
"""
if self._browser is not None:
with contextlib.suppress(Exception):
await self._browser.close()
self._browser = None
self._connected_via_cdp = False
if restart_playwright and self._playwright is not None:
with contextlib.suppress(Exception):
await self._playwright.stop()
self._playwright = None
async def _ensure_browser(self, config: PluginRuntimeRenderConfig) -> Any:
"""获取可复用的浏览器实例。
Args:
config: 当前浏览器渲染配置。
Returns:
Any: Playwright Browser 对象。
Raises:
RuntimeError: 当无法连接或启动浏览器时抛出。
"""
async with self._browser_lock:
if self._is_browser_connected(self._browser):
logger.debug("HTML 渲染服务复用进程内缓存浏览器实例")
return self._browser
await self._close_browser_unlocked(restart_playwright=False)
self._prepare_playwright_environment(config)
playwright = await self._ensure_playwright()
browser = await self._connect_to_existing_browser(playwright, config)
if browser is None:
browser = await self._launch_browser(playwright, config)
self._connected_via_cdp = False
else:
self._connected_via_cdp = True
self._browser = browser
self._bind_browser_events(browser)
return browser
async def _ensure_playwright(self) -> Any:
"""懒加载并启动 Playwright 运行时。
Returns:
Any: 已启动的 Playwright 对象。
Raises:
RuntimeError: 当前环境未安装 Playwright 时抛出。
"""
if self._playwright is not None:
return self._playwright
try:
from playwright.async_api import async_playwright
except ImportError as exc:
raise RuntimeError(
"当前环境未安装 Python Playwright请先在宿主环境安装 `playwright` 依赖。"
) from exc
self._playwright = await async_playwright().start()
return self._playwright
@staticmethod
def _is_browser_connected(browser: Any) -> bool:
"""判断浏览器对象当前是否仍然可用。
Args:
browser: 待检查的浏览器对象。
Returns:
bool: 若浏览器仍连接,则返回 ``True``。
"""
if browser is None:
return False
try:
return bool(browser.is_connected())
except Exception:
return False
async def _connect_to_existing_browser(self, playwright: Any, config: PluginRuntimeRenderConfig) -> Any:
"""优先连接外部已有的 Chromium 浏览器。
Args:
playwright: 已启动的 Playwright 对象。
config: 当前浏览器渲染配置。
Returns:
Any: 连接成功时返回 Browser否则返回 ``None``。
"""
if not config.browser_ws_endpoint.strip():
return None
try:
timeout_ms = int(config.startup_timeout_sec * 1000)
logger.info(
"HTML 渲染服务准备连接现有浏览器: "
f"endpoint={config.browser_ws_endpoint.strip()}, timeout_ms={timeout_ms}"
)
browser = await playwright.chromium.connect_over_cdp(
config.browser_ws_endpoint.strip(),
timeout=timeout_ms,
)
logger.info("HTML 渲染服务已连接到现有浏览器")
return browser
except Exception as exc:
logger.warning(f"连接现有浏览器失败,将回退为本地启动: {exc}")
return None
async def _launch_browser(self, playwright: Any, config: PluginRuntimeRenderConfig) -> Any:
"""启动本地 Chromium 浏览器。
Args:
playwright: 已启动的 Playwright 对象。
config: 当前浏览器渲染配置。
Returns:
Any: 新启动的 Browser 对象。
Raises:
RuntimeError: 浏览器启动失败时抛出。
"""
launch_options = self._build_launch_options(config)
logger.info(
"HTML 渲染服务准备启动浏览器: "
f"source={'system' if 'executable_path' in launch_options else 'managed'}, "
f"headless={bool(launch_options.get('headless'))}, "
f"timeout_ms={int(launch_options.get('timeout', 0))}"
)
try:
browser = await playwright.chromium.launch(**launch_options)
if "executable_path" in launch_options:
logger.info(f"HTML 渲染服务已启动本机浏览器: executable_path={launch_options['executable_path']}")
else:
self._update_managed_browser_record(config, install_source="existing_cache")
logger.info("HTML 渲染服务已启动 Playwright 托管浏览器")
return browser
except Exception as exc:
if self._should_auto_download_browser(exc, launch_options, config):
logger.warning(f"HTML 渲染服务未找到可用浏览器,将尝试自动下载 Chromium: {exc}")
await self._install_chromium_browser(config)
retry_browser = await playwright.chromium.launch(**launch_options)
self._update_managed_browser_record(config, install_source="auto_download")
logger.info("HTML 渲染服务已自动下载并启动 Chromium")
return retry_browser
raise RuntimeError(f"启动本地浏览器失败: {exc}") from exc
def _bind_browser_events(self, browser: Any) -> None:
"""为浏览器绑定断线回调。
Args:
browser: 需要绑定事件的浏览器对象。
"""
try:
browser.on("disconnected", self._handle_browser_disconnected)
except Exception:
return
def _handle_browser_disconnected(self, *_args: Any) -> None:
"""处理浏览器断线事件。
Args:
*_args: 浏览器断线事件透传的参数。
"""
self._browser = None
self._connected_via_cdp = False
logger.warning("HTML 渲染浏览器已断开,将在下次请求时重新建立连接")
def _build_launch_options(self, config: PluginRuntimeRenderConfig) -> Dict[str, Any]:
"""构造本地浏览器启动参数。
Args:
config: 当前浏览器渲染配置。
Returns:
Dict[str, Any]: 可直接传给 Playwright 的启动参数。
"""
launch_options: Dict[str, Any] = {
"args": list(config.launch_args),
"headless": bool(config.headless),
"timeout": int(config.startup_timeout_sec * 1000),
}
executable_path = self._resolve_executable_path(config)
if executable_path:
launch_options["executable_path"] = executable_path
return launch_options
@staticmethod
def _should_auto_download_browser(
exc: Exception,
launch_options: Dict[str, Any],
config: PluginRuntimeRenderConfig,
) -> bool:
"""判断当前启动错误是否适合自动下载 Chromium 后重试。
Args:
exc: 浏览器启动异常。
launch_options: 本次启动参数。
config: 当前浏览器渲染配置。
Returns:
bool: 若应自动下载后重试,则返回 ``True``。
"""
if "executable_path" in launch_options:
logger.debug("当前启动参数已指定本机浏览器路径,不进入自动下载分支")
return False
if not config.auto_download_chromium:
logger.warning("HTML 渲染服务未检测到可用浏览器,且已禁用自动下载 Chromium")
return False
error_text = str(exc).lower()
should_download = "executable doesn't exist" in error_text or "browser executable" in error_text
if not should_download:
logger.warning(f"浏览器启动失败,但错误不属于可自动下载恢复的类型: {exc}")
return should_download
def _resolve_executable_path(self, config: PluginRuntimeRenderConfig) -> str:
"""解析实际应使用的浏览器可执行文件路径。
Args:
config: 当前浏览器渲染配置。
Returns:
str: 命中的浏览器可执行文件路径;未命中时返回空字符串。
"""
configured_path = config.executable_path.strip()
if configured_path:
path = Path(configured_path).expanduser()
if path.exists():
logger.info(f"HTML 渲染服务使用配置指定的浏览器路径: {path}")
return str(path)
logger.warning(f"配置的浏览器路径不存在,将尝试自动探测: {configured_path}")
detected_path = self._detect_local_browser_executable()
if detected_path:
logger.info(f"HTML 渲染服务自动探测到本机浏览器: {detected_path}")
else:
logger.info("HTML 渲染服务未探测到本机浏览器,将尝试使用 Playwright 托管浏览器")
return detected_path
def _prepare_playwright_environment(self, config: PluginRuntimeRenderConfig) -> Path:
"""准备 Playwright 运行所需的共享浏览器目录环境变量。
Args:
config: 当前浏览器渲染配置。
Returns:
Path: Playwright 浏览器缓存目录。
"""
browsers_path = self._get_managed_browsers_path(config)
browsers_path.mkdir(parents=True, exist_ok=True)
os.environ["PLAYWRIGHT_BROWSERS_PATH"] = str(browsers_path)
logger.debug(f"HTML 渲染服务使用 Playwright 浏览器目录: {browsers_path}")
return browsers_path
def _get_managed_browsers_path(self, config: PluginRuntimeRenderConfig) -> Path:
"""获取 Playwright 托管浏览器目录。
Args:
config: 当前浏览器渲染配置。
Returns:
Path: 托管浏览器目录的绝对路径。
"""
configured_path = config.browser_install_root.strip()
if not configured_path:
return (PROJECT_ROOT / "data" / "playwright-browsers").resolve()
candidate_path = Path(configured_path).expanduser()
if candidate_path.is_absolute():
return candidate_path.resolve()
return (PROJECT_ROOT / candidate_path).resolve()
def _get_browser_state_path(self) -> Path:
"""获取托管浏览器状态文件路径。
Returns:
Path: 浏览器状态文件路径。
"""
return (PROJECT_ROOT / "data" / "plugin_runtime" / "html_render_browser_state.json").resolve()
def _load_managed_browser_record(self) -> Optional[ManagedBrowserRecord]:
"""读取最近一次成功使用的托管浏览器记录。
Returns:
Optional[ManagedBrowserRecord]: 解析成功时返回记录对象,否则返回 ``None``。
"""
state_path = self._get_browser_state_path()
if not state_path.exists():
return None
try:
raw_payload = json.loads(state_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
logger.warning(f"HTML 渲染浏览器状态文件读取失败,将忽略并继续: {state_path}")
return None
if not isinstance(raw_payload, dict):
logger.warning(f"HTML 渲染浏览器状态文件格式无效,将忽略并继续: {state_path}")
return None
browser_record = ManagedBrowserRecord.from_dict(raw_payload)
if browser_record is not None:
logger.debug(
"HTML 渲染服务已加载浏览器状态记录: "
f"source={browser_record.install_source}, path={browser_record.browsers_path}, "
f"verified_at={browser_record.last_verified_at}"
)
return browser_record
def _save_managed_browser_record(self, record: ManagedBrowserRecord) -> None:
"""保存托管浏览器记录。
Args:
record: 待保存的浏览器记录。
"""
state_path = self._get_browser_state_path()
state_path.parent.mkdir(parents=True, exist_ok=True)
state_path.write_text(
json.dumps(record.to_dict(), ensure_ascii=False, indent=2),
encoding="utf-8",
)
logger.info(
"HTML 渲染服务已写入浏览器状态记录: "
f"path={state_path}, source={record.install_source}, browsers_path={record.browsers_path}"
)
def _update_managed_browser_record(
self,
config: PluginRuntimeRenderConfig,
install_source: Literal["auto_download", "existing_cache"],
) -> None:
"""更新托管 Chromium 的使用记录。
Args:
config: 当前浏览器渲染配置。
install_source: 本次记录的浏览器来源。
"""
browsers_path = self._get_managed_browsers_path(config)
if not self._has_managed_browser_artifact(browsers_path):
return
now_iso = datetime.now(timezone.utc).isoformat()
existing_record = self._load_managed_browser_record()
recorded_at = now_iso
if existing_record is not None and existing_record.browsers_path == str(browsers_path):
recorded_at = existing_record.recorded_at
self._save_managed_browser_record(
ManagedBrowserRecord(
browser_name="chromium",
browsers_path=str(browsers_path),
install_source=install_source,
playwright_version=self._get_playwright_version(),
recorded_at=recorded_at,
last_verified_at=now_iso,
)
)
logger.info(
"HTML 渲染服务已更新托管浏览器记录: "
f"source={install_source}, browsers_path={browsers_path}, last_verified_at={now_iso}"
)
async def _install_chromium_browser(self, config: PluginRuntimeRenderConfig) -> None:
"""自动下载 Playwright Chromium 浏览器。
Args:
config: 当前浏览器渲染配置。
Raises:
RuntimeError: 下载失败时抛出。
"""
browsers_path = self._prepare_playwright_environment(config)
logger.warning(
"HTML 渲染服务开始自动下载 Chromium: "
f"target_dir={browsers_path}, timeout_sec={config.download_connection_timeout_sec}"
)
env = os.environ.copy()
env["PLAYWRIGHT_BROWSERS_PATH"] = str(browsers_path)
env["PLAYWRIGHT_DOWNLOAD_CONNECTION_TIMEOUT"] = str(int(config.download_connection_timeout_sec * 1000))
process = await asyncio.create_subprocess_exec(
sys.executable,
"-m",
"playwright",
"install",
"chromium",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
stdout_bytes, stderr_bytes = await process.communicate()
if process.returncode != 0:
stderr_text = stderr_bytes.decode("utf-8", errors="ignore").strip()
stdout_text = stdout_bytes.decode("utf-8", errors="ignore").strip()
error_detail = stderr_text or stdout_text or f"退出码 {process.returncode}"
raise RuntimeError(f"自动下载 Chromium 失败: {error_detail}")
if not self._has_managed_browser_artifact(browsers_path):
raise RuntimeError("Chromium 下载完成后未检测到可用浏览器文件")
logger.info(f"HTML 渲染服务自动下载 Chromium 完成: target_dir={browsers_path}")
@staticmethod
def _get_playwright_version() -> str:
"""读取当前环境中的 Playwright 版本号。
Returns:
str: Playwright 版本字符串;读取失败时返回 ``unknown``。
"""
try:
return metadata.version("playwright")
except metadata.PackageNotFoundError:
return "unknown"
@staticmethod
def _has_managed_browser_artifact(browsers_path: Path) -> bool:
"""检查共享目录中是否存在可用的 Playwright 托管浏览器。
Args:
browsers_path: Playwright 浏览器目录。
Returns:
bool: 若检测到 Chromium/Chrome 相关浏览器文件夹,则返回 ``True``。
"""
if not browsers_path.exists():
return False
for child_path in browsers_path.iterdir():
if not child_path.is_dir():
continue
if child_path.name.startswith(_PLAYWRIGHT_MANAGED_BROWSER_PREFIXES):
return True
return False
def _detect_local_browser_executable(self) -> str:
"""自动探测当前宿主系统中的可复用浏览器路径。
Returns:
str: 命中的浏览器可执行文件路径;未命中时返回空字符串。
"""
for browser_name in _UNIX_BROWSER_NAMES:
resolved_path = shutil.which(browser_name)
if resolved_path:
return resolved_path
for candidate_path in self._get_candidate_executable_paths():
if candidate_path.exists():
return str(candidate_path)
return ""
@staticmethod
def _get_candidate_executable_paths() -> Tuple[Path, ...]:
"""返回当前平台常见浏览器路径候选集合。
Returns:
Tuple[Path, ...]: 可能存在浏览器可执行文件的路径列表。
"""
if sys.platform.startswith("win"):
return _WINDOWS_BROWSER_PATHS
if sys.platform == "darwin":
return _MACOS_BROWSER_PATHS
return ()
async def _configure_page(self, page: Any, request: HtmlRenderRequest) -> None:
"""为页面设置超时、网络策略并写入 HTML。
Args:
page: Playwright 页面对象。
request: 当前渲染请求。
"""
page.set_default_timeout(request.timeout_ms)
await page.route(
"**/*",
functools.partial(self._handle_network_route, allow_network=request.allow_network),
)
await page.set_content(
request.html,
timeout=request.timeout_ms,
wait_until=request.wait_until,
)
if request.wait_for_selector:
await page.locator(request.wait_for_selector).first.wait_for(
state="attached",
timeout=request.timeout_ms,
)
if request.wait_for_timeout_ms > 0:
await page.wait_for_timeout(request.wait_for_timeout_ms)
async def _handle_network_route(self, route: Any, allow_network: bool) -> None:
"""处理页面资源请求的网络准入策略。
Args:
route: Playwright 路由对象。
allow_network: 是否允许页面访问外部网络资源。
"""
request_url = str(route.request.url)
if allow_network or self._is_network_request_allowed(request_url):
await route.continue_()
return
await route.abort()
@staticmethod
def _is_network_request_allowed(request_url: str) -> bool:
"""判断某个资源 URL 是否属于本地安全资源。
Args:
request_url: 待判断的资源地址。
Returns:
bool: 若请求可在无网络模式下放行,则返回 ``True``。
"""
if not request_url:
return False
parsed_url = urlparse(request_url)
return parsed_url.scheme in _NETWORK_ALLOW_SCHEMES
async def _capture_image(self, page: Any, request: HtmlRenderRequest) -> bytes:
"""从页面或目标元素中截取 PNG 图片。
Args:
page: Playwright 页面对象。
request: 当前渲染请求。
Returns:
bytes: PNG 二进制内容。
Raises:
RuntimeError: 目标元素不存在或截图结果为空时抛出。
"""
if request.full_page and request.selector == "body":
image_bytes = await page.screenshot(
full_page=True,
omit_background=request.omit_background,
timeout=request.timeout_ms,
type="png",
)
else:
locator = page.locator(request.selector).first
await locator.wait_for(state="visible", timeout=request.timeout_ms)
image_bytes = await locator.screenshot(
omit_background=request.omit_background,
timeout=request.timeout_ms,
type="png",
)
if not image_bytes:
raise RuntimeError("浏览器截图结果为空")
return image_bytes
@staticmethod
def _measure_image_size(image_bytes: bytes) -> Tuple[int, int]:
"""读取 PNG 图片的真实像素尺寸。
Args:
image_bytes: PNG 图片二进制内容。
Returns:
Tuple[int, int]: 图片宽高像素值。
"""
from PIL import Image
with Image.open(BytesIO(image_bytes)) as image:
return int(image.width), int(image.height)
async def _maybe_restart_browser(self, config: PluginRuntimeRenderConfig) -> None:
"""按策略决定是否重建本地浏览器实例。
Args:
config: 当前浏览器渲染配置。
"""
restart_after = int(config.restart_after_render_count)
if restart_after <= 0 or self._connected_via_cdp:
return
if self._render_count % restart_after != 0:
return
await self.reset_browser(restart_playwright=False)
logger.info("HTML 渲染服务已按累计次数策略重建本地浏览器")
@staticmethod
def _normalize_request(
request: HtmlRenderRequest,
config: PluginRuntimeRenderConfig,
) -> HtmlRenderRequest:
"""规范化并补齐 HTML 渲染请求。
Args:
request: 原始渲染请求。
config: 当前浏览器渲染配置。
Returns:
HtmlRenderRequest: 规范化后的请求对象。
Raises:
ValueError: 请求缺少必要字段或取值非法时抛出。
"""
html = request.html.strip()
if not html:
raise ValueError("缺少必要参数 html")
selector = request.selector.strip() or "body"
wait_until = HTMLRenderService._normalize_wait_until(request.wait_until)
timeout_ms = request.timeout_ms
if timeout_ms <= 0:
timeout_ms = int(config.render_timeout_sec * 1000)
return HtmlRenderRequest(
html=html,
selector=selector,
viewport_width=max(1, int(request.viewport_width)),
viewport_height=max(1, int(request.viewport_height)),
device_scale_factor=max(1.0, float(request.device_scale_factor)),
full_page=bool(request.full_page),
omit_background=bool(request.omit_background),
wait_until=wait_until,
wait_for_selector=request.wait_for_selector.strip(),
wait_for_timeout_ms=max(0, int(request.wait_for_timeout_ms)),
timeout_ms=max(1, int(timeout_ms)),
allow_network=bool(request.allow_network),
)
@staticmethod
def _normalize_wait_until(wait_until: str) -> str:
"""规范化页面等待阶段参数。
Args:
wait_until: 原始等待阶段字符串。
Returns:
str: Playwright 支持的等待阶段值。
"""
normalized_wait_until = wait_until.strip().lower()
if normalized_wait_until in {"commit", "domcontentloaded", "load", "networkidle"}:
return normalized_wait_until
return "load"
_html_render_service: Optional[HTMLRenderService] = None
def get_html_render_service() -> HTMLRenderService:
"""获取 HTML 浏览器渲染服务单例。
Returns:
HTMLRenderService: 全局唯一的浏览器渲染服务实例。
"""
global _html_render_service
if _html_render_service is None:
_html_render_service = HTMLRenderService()
return _html_render_service