diff --git a/pyproject.toml b/pyproject.toml index 0ba96250..a8a961b7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ dependencies = [ "openai>=1.95.0", "pandas>=2.3.1", "pillow>=11.3.0", + "playwright>=1.54.0", "pyarrow>=20.0.0", "pydantic>=2.11.7", "pypinyin>=0.54.0", diff --git a/pytests/test_html_render_service.py b/pytests/test_html_render_service.py new file mode 100644 index 00000000..3485ee5a --- /dev/null +++ b/pytests/test_html_render_service.py @@ -0,0 +1,194 @@ +"""HTML 浏览器渲染服务测试。""" + +from pathlib import Path +from typing import Any, Dict, List + +import pytest + +from src.config.official_configs import PluginRuntimeRenderConfig +from src.services import html_render_service as html_render_service_module +from src.services.html_render_service import HTMLRenderService, ManagedBrowserRecord + + +class _FakeChromium: + """用于模拟 Playwright Chromium 启动器的测试桩。""" + + def __init__(self, effects: List[Any]) -> None: + """初始化 Chromium 启动测试桩。 + + Args: + effects: 每次调用 ``launch`` 时依次返回或抛出的结果。 + """ + + self._effects: List[Any] = list(effects) + self.calls: List[Dict[str, Any]] = [] + + async def launch(self, **kwargs: Any) -> Any: + """模拟 Playwright Chromium 的启动过程。 + + Args: + **kwargs: 浏览器启动参数。 + + Returns: + Any: 预设的浏览器对象。 + + Raises: + Exception: 当预设结果为异常对象时抛出。 + """ + + self.calls.append(dict(kwargs)) + effect = self._effects.pop(0) + if isinstance(effect, Exception): + raise effect + return effect + + +class _FakePlaywright: + """用于模拟 Playwright 根对象的测试桩。""" + + def __init__(self, chromium: _FakeChromium) -> None: + """初始化 Playwright 测试桩。 + + Args: + chromium: Chromium 启动器测试桩。 + """ + + self.chromium = chromium + + +def _build_render_config(**kwargs: Any) -> PluginRuntimeRenderConfig: + """构造用于测试的浏览器渲染配置。 + + Args: + **kwargs: 需要覆盖的配置字段。 + + Returns: + PluginRuntimeRenderConfig: 测试使用的配置对象。 + """ + + payload: Dict[str, Any] = { + "auto_download_chromium": True, + "browser_install_root": "data/test-playwright-browsers", + } + payload.update(kwargs) + return PluginRuntimeRenderConfig(**payload) + + +@pytest.mark.asyncio +async def test_launch_browser_auto_downloads_chromium_when_missing(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + """未检测到可用浏览器时,应自动下载 Chromium 并记录状态。""" + + monkeypatch.setattr(html_render_service_module, "PROJECT_ROOT", tmp_path) + service = HTMLRenderService() + config = _build_render_config() + fake_browser = object() + fake_chromium = _FakeChromium( + [ + RuntimeError("browserType.launch: Executable doesn't exist at /tmp/chromium"), + fake_browser, + ] + ) + install_calls: List[str] = [] + + monkeypatch.setattr(service, "_resolve_executable_path", lambda _config: "") + + async def fake_install(_config: PluginRuntimeRenderConfig) -> None: + """模拟 Chromium 自动下载。 + + Args: + _config: 当前浏览器渲染配置。 + """ + + install_calls.append(_config.browser_install_root) + browsers_path = service._get_managed_browsers_path(_config) + (browsers_path / "chromium-1234").mkdir(parents=True, exist_ok=True) + + monkeypatch.setattr(service, "_install_chromium_browser", fake_install) + + browser = await service._launch_browser(_FakePlaywright(fake_chromium), config) + + assert browser is fake_browser + assert install_calls == ["data/test-playwright-browsers"] + assert len(fake_chromium.calls) == 2 + + browser_record = service._load_managed_browser_record() + assert browser_record is not None + assert browser_record.install_source == "auto_download" + assert browser_record.browser_name == "chromium" + + +@pytest.mark.asyncio +async def test_launch_browser_reuses_existing_managed_browser(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + """已存在 Playwright 托管浏览器时,不应重复下载。""" + + monkeypatch.setattr(html_render_service_module, "PROJECT_ROOT", tmp_path) + service = HTMLRenderService() + config = _build_render_config() + browsers_path = service._get_managed_browsers_path(config) + (browsers_path / "chrome-headless-shell-1234").mkdir(parents=True, exist_ok=True) + fake_browser = object() + fake_chromium = _FakeChromium([fake_browser]) + + monkeypatch.setattr(service, "_resolve_executable_path", lambda _config: "") + + async def fail_install(_config: PluginRuntimeRenderConfig) -> None: + """若被错误调用则立即失败。 + + Args: + _config: 当前浏览器渲染配置。 + + Raises: + AssertionError: 表示本测试不期望进入下载逻辑。 + """ + + raise AssertionError("不应触发自动下载") + + monkeypatch.setattr(service, "_install_chromium_browser", fail_install) + + browser = await service._launch_browser(_FakePlaywright(fake_chromium), config) + + assert browser is fake_browser + assert len(fake_chromium.calls) == 1 + + browser_record = service._load_managed_browser_record() + assert browser_record is not None + assert browser_record.install_source == "existing_cache" + assert browser_record.browsers_path == str(browsers_path) + + +@pytest.mark.asyncio +async def test_launch_browser_prefers_local_executable(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None: + """探测到本机浏览器时,应优先使用可执行文件路径启动。""" + + monkeypatch.setattr(html_render_service_module, "PROJECT_ROOT", tmp_path) + service = HTMLRenderService() + config = _build_render_config() + fake_browser = object() + fake_chromium = _FakeChromium([fake_browser]) + executable_path = "/usr/bin/google-chrome" + + monkeypatch.setattr(service, "_resolve_executable_path", lambda _config: executable_path) + + browser = await service._launch_browser(_FakePlaywright(fake_chromium), config) + + assert browser is fake_browser + assert len(fake_chromium.calls) == 1 + assert fake_chromium.calls[0]["executable_path"] == executable_path + assert service._load_managed_browser_record() is None + + +def test_managed_browser_record_roundtrip() -> None: + """托管浏览器记录应支持序列化与反序列化。""" + + record = ManagedBrowserRecord( + browser_name="chromium", + browsers_path="/tmp/playwright-browsers", + install_source="auto_download", + playwright_version="1.58.0", + recorded_at="2026-04-03T10:00:00+00:00", + last_verified_at="2026-04-03T10:00:01+00:00", + ) + + restored_record = ManagedBrowserRecord.from_dict(record.to_dict()) + + assert restored_record == record diff --git a/pytests/test_plugin_dependency_pipeline.py b/pytests/test_plugin_dependency_pipeline.py new file mode 100644 index 00000000..68fd1786 --- /dev/null +++ b/pytests/test_plugin_dependency_pipeline.py @@ -0,0 +1,225 @@ +"""插件依赖流水线测试。""" + +from pathlib import Path + +import json + +import pytest + +from src.plugin_runtime.dependency_pipeline import PluginDependencyPipeline + + +def _build_manifest( + plugin_id: str, + *, + dependencies: list[dict[str, str]] | None = None, +) -> dict[str, object]: + """构造测试用的 Manifest v2 数据。 + + Args: + plugin_id: 插件 ID。 + dependencies: 依赖声明列表。 + + Returns: + dict[str, object]: 可直接写入 ``_manifest.json`` 的字典。 + """ + + return { + "manifest_version": 2, + "version": "1.0.0", + "name": plugin_id, + "description": "测试插件", + "author": { + "name": "tester", + "url": "https://example.com/tester", + }, + "license": "MIT", + "urls": { + "repository": f"https://example.com/{plugin_id}", + }, + "host_application": { + "min_version": "1.0.0", + "max_version": "1.0.0", + }, + "sdk": { + "min_version": "2.0.0", + "max_version": "2.99.99", + }, + "dependencies": dependencies or [], + "capabilities": [], + "i18n": { + "default_locale": "zh-CN", + "supported_locales": ["zh-CN"], + }, + "id": plugin_id, + } + + +def _write_plugin( + plugin_root: Path, + plugin_name: str, + plugin_id: str, + *, + dependencies: list[dict[str, str]] | None = None, +) -> Path: + """在临时目录中写入一个测试插件。 + + Args: + plugin_root: 插件根目录。 + plugin_name: 插件目录名。 + plugin_id: 插件 ID。 + dependencies: Python 依赖声明列表。 + + Returns: + Path: 插件目录路径。 + """ + + plugin_dir = plugin_root / plugin_name + plugin_dir.mkdir(parents=True) + (plugin_dir / "plugin.py").write_text("def create_plugin():\n return object()\n", encoding="utf-8") + (plugin_dir / "_manifest.json").write_text( + json.dumps(_build_manifest(plugin_id, dependencies=dependencies)), + encoding="utf-8", + ) + return plugin_dir + + +def test_build_plan_blocks_plugin_conflicting_with_host_requirement(tmp_path: Path) -> None: + """与主程序依赖冲突的插件应被阻止加载。""" + + plugin_root = tmp_path / "plugins" + _write_plugin( + plugin_root, + "conflict_plugin", + "test.conflict-plugin", + dependencies=[ + { + "type": "python_package", + "name": "numpy", + "version_spec": "<1.0.0", + } + ], + ) + + pipeline = PluginDependencyPipeline(project_root=Path.cwd()) + plan = pipeline.build_plan([plugin_root]) + + assert "test.conflict-plugin" in plan.blocked_plugin_reasons + assert "主程序" in plan.blocked_plugin_reasons["test.conflict-plugin"] + assert plan.install_requirements == () + + +def test_build_plan_blocks_plugins_with_conflicting_python_dependencies(tmp_path: Path) -> None: + """插件之间出现 Python 包版本冲突时应同时阻止双方加载。""" + + plugin_root = tmp_path / "plugins" + _write_plugin( + plugin_root, + "plugin_a", + "test.plugin-a", + dependencies=[ + { + "type": "python_package", + "name": "demo-package", + "version_spec": "<2.0.0", + } + ], + ) + _write_plugin( + plugin_root, + "plugin_b", + "test.plugin-b", + dependencies=[ + { + "type": "python_package", + "name": "demo-package", + "version_spec": ">=3.0.0", + } + ], + ) + + pipeline = PluginDependencyPipeline(project_root=Path.cwd()) + plan = pipeline.build_plan([plugin_root]) + + assert "test.plugin-a" in plan.blocked_plugin_reasons + assert "test.plugin-b" in plan.blocked_plugin_reasons + assert "test.plugin-b" in plan.blocked_plugin_reasons["test.plugin-a"] + assert "test.plugin-a" in plan.blocked_plugin_reasons["test.plugin-b"] + + +def test_build_plan_collects_install_requirements_for_missing_packages( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + """未安装但无冲突的依赖应进入自动安装计划。""" + + plugin_root = tmp_path / "plugins" + _write_plugin( + plugin_root, + "plugin_a", + "test.plugin-a", + dependencies=[ + { + "type": "python_package", + "name": "demo-package", + "version_spec": ">=1.0.0,<2.0.0", + } + ], + ) + + pipeline = PluginDependencyPipeline(project_root=Path.cwd()) + monkeypatch.setattr( + pipeline._manifest_validator, + "get_installed_package_version", + lambda package_name: None if package_name == "demo-package" else "1.0.0", + ) + + plan = pipeline.build_plan([plugin_root]) + + assert plan.blocked_plugin_reasons == {} + assert len(plan.install_requirements) == 1 + assert plan.install_requirements[0].package_name == "demo-package" + assert plan.install_requirements[0].plugin_ids == ("test.plugin-a",) + assert plan.install_requirements[0].requirement_text == "demo-package>=1.0.0,<2.0.0" + + +@pytest.mark.asyncio +async def test_execute_blocks_plugins_when_auto_install_fails( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + """自动安装失败时,相关插件应被阻止加载。""" + + plugin_root = tmp_path / "plugins" + _write_plugin( + plugin_root, + "plugin_a", + "test.plugin-a", + dependencies=[ + { + "type": "python_package", + "name": "demo-package", + "version_spec": ">=1.0.0,<2.0.0", + } + ], + ) + + pipeline = PluginDependencyPipeline(project_root=Path.cwd()) + monkeypatch.setattr( + pipeline._manifest_validator, + "get_installed_package_version", + lambda package_name: None if package_name == "demo-package" else "1.0.0", + ) + + async def fake_install(_requirements) -> tuple[bool, str]: + """模拟依赖安装失败。""" + + return False, "network error" + + monkeypatch.setattr(pipeline, "_install_requirements", fake_install) + + result = await pipeline.execute([plugin_root]) + + assert result.environment_changed is False + assert "test.plugin-a" in result.blocked_plugin_reasons + assert "自动安装 Python 依赖失败" in result.blocked_plugin_reasons["test.plugin-a"] diff --git a/pytests/test_plugin_runtime.py b/pytests/test_plugin_runtime.py index aef14b6b..5c2706b6 100644 --- a/pytests/test_plugin_runtime.py +++ b/pytests/test_plugin_runtime.py @@ -3,6 +3,8 @@ 验证协议层、传输层、RPC 通信链路的正确性。 """ +# pyright: reportArgumentType=false, reportAttributeAccessIssue=false, reportCallIssue=false, reportIndexIssue=false, reportMissingImports=false, reportOptionalMemberAccess=false + from pathlib import Path from types import SimpleNamespace from typing import Any, Awaitable, Callable, Dict, List, Optional, Sequence @@ -2891,7 +2893,7 @@ class TestIntegration: assert instances[0].stopped is True @pytest.mark.asyncio - async def test_handle_plugin_source_changes_only_reload_matching_supervisor(self, monkeypatch, tmp_path): + async def test_handle_plugin_source_changes_restarts_supervisors_after_dependency_sync(self, monkeypatch, tmp_path): from src.config.file_watcher import FileChange from src.plugin_runtime import integration as integration_module import json @@ -2915,7 +2917,6 @@ class TestIntegration: def __init__(self, plugin_dirs, registered_plugins): self._plugin_dirs = plugin_dirs self._registered_plugins = registered_plugins - self.reload_reasons = [] self.config_updates = [] def get_loaded_plugin_ids(self): @@ -2924,9 +2925,6 @@ class TestIntegration: def get_loaded_plugin_versions(self): return {plugin_id: "1.0.0" for plugin_id in self._registered_plugins} - async def reload_plugins(self, plugin_ids=None, reason="manual", external_available_plugins=None): - self.reload_reasons.append((plugin_ids, reason, external_available_plugins or {})) - async def notify_plugin_config_updated(self, plugin_id, config_data, config_version=""): self.config_updates.append((plugin_id, config_data, config_version)) return True @@ -2935,27 +2933,37 @@ class TestIntegration: manager._started = True manager._builtin_supervisor = FakeSupervisor([builtin_root], {"test.alpha": object()}) manager._third_party_supervisor = FakeSupervisor([thirdparty_root], {"test.beta": object()}) + dependency_sync_calls = [] + restart_calls = [] + + async def fake_sync(plugin_dirs: Sequence[Path]) -> Any: + """记录依赖同步调用。""" + + dependency_sync_calls.append(list(plugin_dirs)) + return integration_module.DependencySyncState( + blocked_changed_plugin_ids={"test.beta"}, + environment_changed=False, + ) + + async def fake_restart(reason: str) -> bool: + """记录 Supervisor 重启调用。""" + + restart_calls.append(reason) + return True + + monkeypatch.setattr(manager, "_sync_plugin_dependencies", fake_sync) + monkeypatch.setattr(manager, "_restart_supervisors", fake_restart) changes = [ FileChange(change_type=1, path=beta_dir / "plugin.py"), ] - refresh_calls = [] - - def fake_refresh() -> None: - refresh_calls.append(True) - - manager._refresh_plugin_config_watch_subscriptions = fake_refresh - await manager._handle_plugin_source_changes(changes) - assert manager._builtin_supervisor.reload_reasons == [] - assert manager._third_party_supervisor.reload_reasons == [ - (["test.beta"], "file_watcher", {"test.alpha": "1.0.0"}) - ] + assert dependency_sync_calls == [[builtin_root, thirdparty_root]] + assert restart_calls == ["file_watcher_blocklist_changed"] assert manager._builtin_supervisor.config_updates == [] assert manager._third_party_supervisor.config_updates == [] - assert refresh_calls == [True] @pytest.mark.asyncio async def test_reload_plugins_globally_warns_and_skips_cross_supervisor_dependents(self, monkeypatch): diff --git a/pytests/test_plugin_runtime_render.py b/pytests/test_plugin_runtime_render.py new file mode 100644 index 00000000..f90dfad9 --- /dev/null +++ b/pytests/test_plugin_runtime_render.py @@ -0,0 +1,96 @@ +"""插件运行时浏览器渲染能力测试。""" + +from typing import Optional + +import pytest + +from src.plugin_runtime.integration import PluginRuntimeManager +from src.plugin_runtime.host.supervisor import PluginSupervisor +from src.services.html_render_service import HtmlRenderRequest, HtmlRenderResult + + +class _FakeRenderService: + """用于替代真实浏览器渲染服务的测试桩。""" + + def __init__(self) -> None: + """初始化测试桩。""" + + self.last_request: Optional[HtmlRenderRequest] = None + + async def render_html_to_png(self, request: HtmlRenderRequest) -> HtmlRenderResult: + """记录请求并返回固定的渲染结果。 + + Args: + request: 当前渲染请求。 + + Returns: + HtmlRenderResult: 固定的测试渲染结果。 + """ + + self.last_request = request + return HtmlRenderResult( + image_base64="ZmFrZS1pbWFnZQ==", + mime_type="image/png", + width=640, + height=480, + render_ms=12, + ) + + +def test_render_capability_is_registered() -> None: + """Host 注册能力时应包含 render.html2png。""" + + manager = PluginRuntimeManager() + supervisor = PluginSupervisor(plugin_dirs=[]) + + manager._register_capability_impls(supervisor) + + assert "render.html2png" in supervisor.capability_service.list_capabilities() + + +@pytest.mark.asyncio +async def test_render_capability_forwards_request(monkeypatch: pytest.MonkeyPatch) -> None: + """render.html2png 应将请求透传给浏览器渲染服务。""" + + from src.plugin_runtime.capabilities import render as render_capability_module + + fake_service = _FakeRenderService() + monkeypatch.setattr(render_capability_module, "get_html_render_service", lambda: fake_service) + + manager = PluginRuntimeManager() + result = await manager._cap_render_html2png( + "demo.plugin", + "render.html2png", + { + "html": "
hello
", + "selector": "#card", + "viewport": {"width": 1024, "height": 768}, + "device_scale_factor": 1.5, + "full_page": False, + "omit_background": True, + "wait_until": "networkidle", + "wait_for_selector": "#card", + "wait_for_timeout_ms": 150, + "timeout_ms": 3000, + "allow_network": True, + }, + ) + + assert result == { + "success": True, + "result": { + "image_base64": "ZmFrZS1pbWFnZQ==", + "mime_type": "image/png", + "width": 640, + "height": 480, + "render_ms": 12, + }, + } + assert fake_service.last_request is not None + assert fake_service.last_request.selector == "#card" + assert fake_service.last_request.viewport_width == 1024 + assert fake_service.last_request.viewport_height == 768 + assert fake_service.last_request.device_scale_factor == 1.5 + assert fake_service.last_request.omit_background is True + assert fake_service.last_request.wait_until == "networkidle" + assert fake_service.last_request.allow_network is True diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 392c4f18..10fa331a 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -1959,6 +1959,129 @@ class MCPConfig(ConfigBase): return super().model_post_init(context) +class PluginRuntimeRenderConfig(ConfigBase): + """插件运行时浏览器渲染配置。""" + + enabled: bool = Field( + default=True, + json_schema_extra={ + "x-widget": "switch", + "x-icon": "image", + }, + ) + """是否启用插件运行时浏览器渲染能力""" + + browser_ws_endpoint: str = Field( + default="", + json_schema_extra={ + "x-widget": "input", + "x-icon": "link", + }, + ) + """优先复用的现有 Chromium CDP 地址,可填写 ws/http 端点""" + + executable_path: str = Field( + default="", + json_schema_extra={ + "x-widget": "input", + "x-icon": "folder", + }, + ) + """浏览器可执行文件路径,留空时自动探测本机 Chrome/Chromium""" + + browser_install_root: str = Field( + default="data/playwright-browsers", + json_schema_extra={ + "x-widget": "input", + "x-icon": "hard-drive", + }, + ) + """Playwright 托管浏览器目录,自动下载 Chromium 时会复用该目录""" + + headless: bool = Field( + default=True, + json_schema_extra={ + "x-widget": "switch", + "x-icon": "monitor", + }, + ) + """是否以无头模式启动浏览器""" + + launch_args: list[str] = Field( + default_factory=lambda: [ + "--disable-gpu", + "--disable-dev-shm-usage", + "--disable-setuid-sandbox", + "--no-sandbox", + "--no-zygote", + ], + json_schema_extra={ + "x-widget": "custom", + "x-icon": "terminal", + }, + ) + """浏览器启动参数列表""" + + concurrency_limit: int = Field( + default=2, + ge=1, + json_schema_extra={ + "x-widget": "number", + "x-icon": "layers", + }, + ) + """同时允许进行的最大渲染任务数""" + + startup_timeout_sec: float = Field( + default=20.0, + gt=0, + json_schema_extra={ + "x-widget": "number", + "x-icon": "clock", + }, + ) + """浏览器连接或启动超时时间(秒)""" + + render_timeout_sec: float = Field( + default=15.0, + gt=0, + json_schema_extra={ + "x-widget": "number", + "x-icon": "timer", + }, + ) + """单次渲染默认超时时间(秒)""" + + auto_download_chromium: bool = Field( + default=True, + json_schema_extra={ + "x-widget": "switch", + "x-icon": "download", + }, + ) + """未检测到可用浏览器时,是否自动下载 Playwright Chromium""" + + download_connection_timeout_sec: float = Field( + default=120.0, + gt=0, + json_schema_extra={ + "x-widget": "number", + "x-icon": "cloud-lightning", + }, + ) + """自动下载 Chromium 时的连接超时时间(秒)""" + + restart_after_render_count: int = Field( + default=200, + ge=0, + json_schema_extra={ + "x-widget": "number", + "x-icon": "refresh-cw", + }, + ) + """累计渲染指定次数后自动重建本地浏览器,0 表示关闭该策略""" + + class PluginRuntimeConfig(ConfigBase): """插件运行时配置类""" @@ -2021,3 +2144,6 @@ class PluginRuntimeConfig(ConfigBase): 自定义 IPC Socket 路径(仅 Linux/macOS 生效) 留空则自动生成临时路径 """ + + render: PluginRuntimeRenderConfig = Field(default_factory=PluginRuntimeRenderConfig) + """浏览器渲染能力配置""" diff --git a/src/plugin_runtime/__init__.py b/src/plugin_runtime/__init__.py index 7f2d789f..85c9c3f7 100644 --- a/src/plugin_runtime/__init__.py +++ b/src/plugin_runtime/__init__.py @@ -20,5 +20,8 @@ ENV_HOST_VERSION = "MAIBOT_HOST_VERSION" ENV_EXTERNAL_PLUGIN_IDS = "MAIBOT_EXTERNAL_PLUGIN_IDS" """Runner 启动时可视为已满足的外部插件依赖版本映射(JSON 对象)""" +ENV_BLOCKED_PLUGIN_REASONS = "MAIBOT_BLOCKED_PLUGIN_REASONS" +"""Runner 启动时收到的拒绝加载插件原因映射(JSON 对象)""" + ENV_GLOBAL_CONFIG_SNAPSHOT = "MAIBOT_GLOBAL_CONFIG_SNAPSHOT" """Runner 启动时注入的全局配置快照(JSON 对象)""" diff --git a/src/plugin_runtime/capabilities/__init__.py b/src/plugin_runtime/capabilities/__init__.py index 3ade3886..9ee8b5f4 100644 --- a/src/plugin_runtime/capabilities/__init__.py +++ b/src/plugin_runtime/capabilities/__init__.py @@ -1,9 +1,11 @@ from .components import RuntimeComponentCapabilityMixin from .core import RuntimeCoreCapabilityMixin from .data import RuntimeDataCapabilityMixin +from .render import RuntimeRenderCapabilityMixin __all__ = [ "RuntimeComponentCapabilityMixin", "RuntimeCoreCapabilityMixin", "RuntimeDataCapabilityMixin", + "RuntimeRenderCapabilityMixin", ] diff --git a/src/plugin_runtime/capabilities/registry.py b/src/plugin_runtime/capabilities/registry.py index a4aed9bd..2925a2b3 100644 --- a/src/plugin_runtime/capabilities/registry.py +++ b/src/plugin_runtime/capabilities/registry.py @@ -91,4 +91,5 @@ def register_capability_impls(manager: "PluginRuntimeManager", supervisor: Plugi _register("component.reload_plugin", manager._cap_component_reload_plugin) _register("knowledge.search", manager._cap_knowledge_search) + _register("render.html2png", manager._cap_render_html2png) logger.debug("已注册全部主程序能力实现") diff --git a/src/plugin_runtime/capabilities/render.py b/src/plugin_runtime/capabilities/render.py new file mode 100644 index 00000000..0e6086e6 --- /dev/null +++ b/src/plugin_runtime/capabilities/render.py @@ -0,0 +1,121 @@ +"""插件运行时的浏览器渲染能力。""" + +from typing import Any, Dict + +from src.common.logger import get_logger +from src.services.html_render_service import HtmlRenderRequest, get_html_render_service + +logger = get_logger("plugin_runtime.integration") + + +class RuntimeRenderCapabilityMixin: + """插件运行时的浏览器渲染能力混入。""" + + @staticmethod + def _coerce_int(value: Any, default: int) -> int: + """将任意值尽量转换为整数。 + + Args: + value: 原始输入值。 + default: 转换失败时返回的默认值。 + + Returns: + int: 规范化后的整数结果。 + """ + + try: + return int(value) + except (TypeError, ValueError): + return default + + @staticmethod + def _coerce_float(value: Any, default: float) -> float: + """将任意值尽量转换为浮点数。 + + Args: + value: 原始输入值。 + default: 转换失败时返回的默认值。 + + Returns: + float: 规范化后的浮点结果。 + """ + + try: + return float(value) + except (TypeError, ValueError): + return default + + @staticmethod + def _coerce_bool(value: Any, default: bool = False) -> bool: + """将任意值转换为布尔值。 + + Args: + value: 原始输入值。 + default: 输入为空时返回的默认值。 + + Returns: + bool: 规范化后的布尔结果。 + """ + + if value is None: + return default + if isinstance(value, str): + normalized_value = value.strip().lower() + if normalized_value in {"0", "false", "no", "off"}: + return False + if normalized_value in {"1", "true", "yes", "on"}: + return True + return bool(value) + + def _build_html_render_request(self, args: Dict[str, Any]) -> HtmlRenderRequest: + """根据 capability 调用参数构造渲染请求。 + + Args: + args: capability 调用参数。 + + Returns: + HtmlRenderRequest: 结构化后的渲染请求。 + """ + + viewport = args.get("viewport", {}) + viewport_width = 900 + viewport_height = 500 + if isinstance(viewport, dict): + viewport_width = self._coerce_int(viewport.get("width"), viewport_width) + viewport_height = self._coerce_int(viewport.get("height"), viewport_height) + + return HtmlRenderRequest( + html=str(args.get("html", "") or ""), + selector=str(args.get("selector", "body") or "body"), + viewport_width=viewport_width, + viewport_height=viewport_height, + device_scale_factor=self._coerce_float(args.get("device_scale_factor"), 2.0), + full_page=self._coerce_bool(args.get("full_page"), False), + omit_background=self._coerce_bool(args.get("omit_background"), False), + wait_until=str(args.get("wait_until", "load") or "load"), + wait_for_selector=str(args.get("wait_for_selector", "") or ""), + wait_for_timeout_ms=self._coerce_int(args.get("wait_for_timeout_ms"), 0), + timeout_ms=self._coerce_int(args.get("timeout_ms"), 0), + allow_network=self._coerce_bool(args.get("allow_network"), False), + ) + + async def _cap_render_html2png(self, plugin_id: str, capability: str, args: Dict[str, Any]) -> Any: + """将 HTML 内容渲染为 PNG 图片。 + + Args: + plugin_id: 调用该能力的插件 ID。 + capability: 当前能力名称。 + args: 能力调用参数。 + + Returns: + Any: 标准化后的能力返回结构。 + """ + + del plugin_id, capability + try: + request = self._build_html_render_request(args) + result = await get_html_render_service().render_html_to_png(request) + return {"success": True, "result": result.to_payload()} + except Exception as exc: + logger.error(f"[cap.render.html2png] 执行失败: {exc}", exc_info=True) + return {"success": False, "error": str(exc)} diff --git a/src/plugin_runtime/dependency_pipeline.py b/src/plugin_runtime/dependency_pipeline.py new file mode 100644 index 00000000..4acbd5ef --- /dev/null +++ b/src/plugin_runtime/dependency_pipeline.py @@ -0,0 +1,441 @@ +"""插件 Python 依赖流水线。 + +负责在 Host 侧统一完成以下工作: +1. 扫描插件 Manifest; +2. 检测插件与主程序、插件与插件之间的 Python 依赖冲突; +3. 为可加载插件自动安装缺失的 Python 依赖; +4. 产出最终的拒绝加载列表,供运行时使用。 +""" + +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, Iterable, List, Optional, Sequence, Tuple + +import asyncio +import shutil +import subprocess +import sys + +from packaging.utils import canonicalize_name + +from src.common.logger import get_logger +from src.plugin_runtime.runner.manifest_validator import ManifestValidator, PluginManifest + + +logger = get_logger("plugin_runtime.dependency_pipeline") + + +@dataclass(frozen=True) +class PackageDependencyUsage: + """记录单个插件对某个 Python 包的依赖声明。""" + + package_name: str + plugin_id: str + version_spec: str + + +@dataclass(frozen=True) +class CombinedPackageRequirement: + """表示一个已经合并后的 Python 包安装需求。""" + + package_name: str + plugin_ids: Tuple[str, ...] + requirement_text: str + version_spec: str + + +@dataclass(frozen=True) +class DependencyPipelinePlan: + """表示一次依赖分析后得到的计划。""" + + blocked_plugin_reasons: Dict[str, str] + install_requirements: Tuple[CombinedPackageRequirement, ...] + + +@dataclass(frozen=True) +class DependencyPipelineResult: + """表示一次依赖流水线执行后的结果。""" + + blocked_plugin_reasons: Dict[str, str] + environment_changed: bool + install_requirements: Tuple[CombinedPackageRequirement, ...] + + +class PluginDependencyPipeline: + """插件依赖流水线。 + + 该类不负责插件启停,只负责对插件目录进行依赖分析,并在必要时 + 使用 ``uv`` 为可加载插件补齐缺失的 Python 依赖。 + """ + + def __init__(self, project_root: Optional[Path] = None) -> None: + """初始化依赖流水线。 + + Args: + project_root: 项目根目录;留空时自动推断。 + """ + + self._project_root: Path = project_root or Path(__file__).resolve().parents[2] + self._manifest_validator: ManifestValidator = ManifestValidator( + project_root=self._project_root, + validate_python_package_dependencies=False, + ) + + async def execute(self, plugin_dirs: Iterable[Path]) -> DependencyPipelineResult: + """执行完整的依赖分析与自动安装流程。 + + Args: + plugin_dirs: 需要扫描的插件根目录集合。 + + Returns: + DependencyPipelineResult: 最终的阻止加载结果与环境变更状态。 + """ + + plan = self.build_plan(plugin_dirs) + if not plan.install_requirements: + return DependencyPipelineResult( + blocked_plugin_reasons=dict(plan.blocked_plugin_reasons), + environment_changed=False, + install_requirements=plan.install_requirements, + ) + + install_succeeded, error_message = await self._install_requirements(plan.install_requirements) + if install_succeeded: + return DependencyPipelineResult( + blocked_plugin_reasons=dict(plan.blocked_plugin_reasons), + environment_changed=True, + install_requirements=plan.install_requirements, + ) + + blocked_plugin_reasons = dict(plan.blocked_plugin_reasons) + affected_plugin_ids = sorted( + { + plugin_id + for requirement in plan.install_requirements + for plugin_id in requirement.plugin_ids + } + ) + for plugin_id in affected_plugin_ids: + self._append_block_reason( + blocked_plugin_reasons, + plugin_id, + f"自动安装 Python 依赖失败: {error_message}", + ) + + return DependencyPipelineResult( + blocked_plugin_reasons=blocked_plugin_reasons, + environment_changed=False, + install_requirements=plan.install_requirements, + ) + + def build_plan(self, plugin_dirs: Iterable[Path]) -> DependencyPipelinePlan: + """构建依赖分析计划。 + + Args: + plugin_dirs: 需要扫描的插件根目录集合。 + + Returns: + DependencyPipelinePlan: 分析后的阻止加载列表与安装计划。 + """ + + manifests = self._collect_manifests(plugin_dirs) + blocked_plugin_reasons = self._detect_host_conflicts(manifests) + plugin_conflict_reasons = self._detect_plugin_conflicts(manifests, blocked_plugin_reasons) + for plugin_id, reason in plugin_conflict_reasons.items(): + self._append_block_reason(blocked_plugin_reasons, plugin_id, reason) + + install_requirements = self._build_install_requirements(manifests, blocked_plugin_reasons) + return DependencyPipelinePlan( + blocked_plugin_reasons=blocked_plugin_reasons, + install_requirements=install_requirements, + ) + + def _collect_manifests(self, plugin_dirs: Iterable[Path]) -> Dict[str, PluginManifest]: + """收集所有可成功解析的插件 Manifest。 + + Args: + plugin_dirs: 需要扫描的插件根目录集合。 + + Returns: + Dict[str, PluginManifest]: 以插件 ID 为键的 Manifest 映射。 + """ + + manifests: Dict[str, PluginManifest] = {} + for _plugin_path, manifest in self._manifest_validator.iter_plugin_manifests(plugin_dirs): + manifests[manifest.id] = manifest + return manifests + + def _detect_host_conflicts(self, manifests: Dict[str, PluginManifest]) -> Dict[str, str]: + """检测插件与主程序依赖之间的冲突。 + + Args: + manifests: 当前已解析到的插件 Manifest 映射。 + + Returns: + Dict[str, str]: 需要被阻止加载的插件及原因。 + """ + + host_requirements = self._manifest_validator.load_host_dependency_requirements() + blocked_plugin_reasons: Dict[str, str] = {} + + for manifest in manifests.values(): + for dependency in manifest.python_package_dependencies: + package_specifier = self._manifest_validator.build_specifier_set(dependency.version_spec) + if package_specifier is None: + self._append_block_reason( + blocked_plugin_reasons, + manifest.id, + f"Python 包依赖声明无效: {dependency.name}{dependency.version_spec}", + ) + continue + + normalized_package_name = canonicalize_name(dependency.name) + host_requirement = host_requirements.get(normalized_package_name) + if host_requirement is None: + continue + + if self._manifest_validator.requirements_may_overlap( + host_requirement.specifier, + package_specifier, + ): + continue + + host_specifier_text = str(host_requirement.specifier or "") or "任意版本" + self._append_block_reason( + blocked_plugin_reasons, + manifest.id, + ( + f"Python 包依赖与主程序冲突: {dependency.name} 需要 " + f"{dependency.version_spec},主程序约束为 {host_specifier_text}" + ), + ) + + return blocked_plugin_reasons + + def _detect_plugin_conflicts( + self, + manifests: Dict[str, PluginManifest], + blocked_plugin_reasons: Dict[str, str], + ) -> Dict[str, str]: + """检测插件之间的 Python 依赖冲突。 + + Args: + manifests: 当前已解析到的插件 Manifest 映射。 + blocked_plugin_reasons: 已经因为其他原因被阻止加载的插件。 + + Returns: + Dict[str, str]: 新增的插件冲突原因映射。 + """ + + blocked_by_plugin_conflicts: Dict[str, str] = {} + dependency_usages = self._collect_package_usages(manifests, blocked_plugin_reasons) + + for _package_name, usages in dependency_usages.items(): + display_package_name = usages[0].package_name + for index, left_usage in enumerate(usages): + for right_usage in usages[index + 1 :]: + left_specifier = self._manifest_validator.build_specifier_set(left_usage.version_spec) + right_specifier = self._manifest_validator.build_specifier_set(right_usage.version_spec) + if left_specifier is None or right_specifier is None: + continue + + if self._manifest_validator.requirements_may_overlap(left_specifier, right_specifier): + continue + + left_reason = ( + f"Python 包依赖冲突: 与插件 {right_usage.plugin_id} 在 {display_package_name} 上的约束不兼容 " + f"({left_usage.version_spec} vs {right_usage.version_spec})" + ) + right_reason = ( + f"Python 包依赖冲突: 与插件 {left_usage.plugin_id} 在 {display_package_name} 上的约束不兼容 " + f"({right_usage.version_spec} vs {left_usage.version_spec})" + ) + self._append_block_reason(blocked_by_plugin_conflicts, left_usage.plugin_id, left_reason) + self._append_block_reason(blocked_by_plugin_conflicts, right_usage.plugin_id, right_reason) + + return blocked_by_plugin_conflicts + + def _collect_package_usages( + self, + manifests: Dict[str, PluginManifest], + blocked_plugin_reasons: Dict[str, str], + ) -> Dict[str, List[PackageDependencyUsage]]: + """收集所有未被阻止加载插件的包依赖声明。 + + Args: + manifests: 当前已解析到的插件 Manifest 映射。 + blocked_plugin_reasons: 已经被阻止加载的插件及原因。 + + Returns: + Dict[str, List[PackageDependencyUsage]]: 按规范化包名分组后的依赖声明。 + """ + + dependency_usages: Dict[str, List[PackageDependencyUsage]] = {} + for manifest in manifests.values(): + if manifest.id in blocked_plugin_reasons: + continue + + for dependency in manifest.python_package_dependencies: + normalized_package_name = canonicalize_name(dependency.name) + dependency_usages.setdefault(normalized_package_name, []).append( + PackageDependencyUsage( + package_name=dependency.name, + plugin_id=manifest.id, + version_spec=dependency.version_spec, + ) + ) + + return dependency_usages + + def _build_install_requirements( + self, + manifests: Dict[str, PluginManifest], + blocked_plugin_reasons: Dict[str, str], + ) -> Tuple[CombinedPackageRequirement, ...]: + """构建需要安装到当前环境的 Python 包需求列表。 + + Args: + manifests: 当前已解析到的插件 Manifest 映射。 + blocked_plugin_reasons: 已经被阻止加载的插件及原因。 + + Returns: + Tuple[CombinedPackageRequirement, ...]: 需要安装或调整版本的依赖列表。 + """ + + combined_requirements: List[CombinedPackageRequirement] = [] + dependency_usages = self._collect_package_usages(manifests, blocked_plugin_reasons) + + for usages in dependency_usages.values(): + merged_specifier_text = self._merge_specifier_texts([usage.version_spec for usage in usages]) + package_name = usages[0].package_name + requirement_text = f"{package_name}{merged_specifier_text}" + installed_version = self._manifest_validator.get_installed_package_version(package_name) + if installed_version is not None and self._manifest_validator.version_matches_specifier( + installed_version, + merged_specifier_text, + ): + continue + + combined_requirements.append( + CombinedPackageRequirement( + package_name=package_name, + plugin_ids=tuple(sorted({usage.plugin_id for usage in usages})), + requirement_text=requirement_text, + version_spec=merged_specifier_text, + ) + ) + + return tuple(sorted(combined_requirements, key=lambda requirement: canonicalize_name(requirement.package_name))) + + @staticmethod + def _merge_specifier_texts(specifier_texts: Sequence[str]) -> str: + """合并多个版本约束文本。 + + Args: + specifier_texts: 需要合并的版本约束文本序列。 + + Returns: + str: 合并后的版本约束文本。 + """ + + merged_parts: List[str] = [] + for specifier_text in specifier_texts: + for part in str(specifier_text or "").split(","): + normalized_part = part.strip() + if not normalized_part or normalized_part in merged_parts: + continue + merged_parts.append(normalized_part) + return f"{','.join(merged_parts)}" if merged_parts else "" + + async def _install_requirements(self, requirements: Sequence[CombinedPackageRequirement]) -> Tuple[bool, str]: + """安装指定的 Python 包需求列表。 + + Args: + requirements: 需要安装的依赖列表。 + + Returns: + Tuple[bool, str]: 安装是否成功,以及错误摘要。 + """ + + requirement_texts = [requirement.requirement_text for requirement in requirements] + if not requirement_texts: + return True, "" + + logger.info(f"开始自动安装插件 Python 依赖: {', '.join(requirement_texts)}") + command = self._build_install_command(requirement_texts) + + try: + completed_process = await asyncio.to_thread( + subprocess.run, + command, + capture_output=True, + check=False, + cwd=self._project_root, + text=True, + ) + except Exception as exc: + return False, str(exc) + + if completed_process.returncode == 0: + logger.info("插件 Python 依赖自动安装完成") + return True, "" + + output = self._summarize_install_error(completed_process.stdout, completed_process.stderr) + return False, output or f"命令执行失败,退出码 {completed_process.returncode}" + + @staticmethod + def _build_install_command(requirement_texts: Sequence[str]) -> List[str]: + """构造依赖安装命令。 + + Args: + requirement_texts: 待安装的依赖文本序列。 + + Returns: + List[str]: 适用于 ``subprocess.run`` 的命令参数列表。 + """ + + if shutil.which("uv"): + return ["uv", "pip", "install", "--python", sys.executable, *requirement_texts] + return [sys.executable, "-m", "pip", "install", *requirement_texts] + + @staticmethod + def _summarize_install_error(stdout: str, stderr: str) -> str: + """提炼安装失败输出。 + + Args: + stdout: 标准输出内容。 + stderr: 标准错误内容。 + + Returns: + str: 简短的错误摘要。 + """ + + merged_output = "\n".join(part.strip() for part in (stderr, stdout) if part and part.strip()).strip() + if not merged_output: + return "" + lines = [line.strip() for line in merged_output.splitlines() if line.strip()] + return " | ".join(lines[-5:]) + + @staticmethod + def _append_block_reason( + blocked_plugin_reasons: Dict[str, str], + plugin_id: str, + reason: str, + ) -> None: + """向阻止加载映射中追加原因。 + + Args: + blocked_plugin_reasons: 待更新的阻止加载映射。 + plugin_id: 目标插件 ID。 + reason: 需要追加的原因文本。 + """ + + existing_reason = blocked_plugin_reasons.get(plugin_id) + if existing_reason is None: + blocked_plugin_reasons[plugin_id] = reason + return + + existing_parts = [part.strip() for part in existing_reason.split(";") if part.strip()] + if reason in existing_parts: + return + blocked_plugin_reasons[plugin_id] = f"{existing_reason};{reason}" diff --git a/src/plugin_runtime/host/supervisor.py b/src/plugin_runtime/host/supervisor.py index bc0fca85..1864b454 100644 --- a/src/plugin_runtime/host/supervisor.py +++ b/src/plugin_runtime/host/supervisor.py @@ -14,6 +14,7 @@ from src.platform_io import DriverKind, InboundMessageEnvelope, RouteBinding, Ro from src.platform_io.drivers import PluginPlatformDriver from src.platform_io.route_key_factory import RouteKeyFactory from src.plugin_runtime import ( + ENV_BLOCKED_PLUGIN_REASONS, ENV_EXTERNAL_PLUGIN_IDS, ENV_GLOBAL_CONFIG_SNAPSHOT, ENV_HOST_VERSION, @@ -131,6 +132,7 @@ class PluginRunnerSupervisor: self._registered_plugins: Dict[str, RegisterPluginPayload] = {} self._message_gateway_states: Dict[str, Dict[str, _MessageGatewayRuntimeState]] = {} self._external_available_plugins: Dict[str, str] = {} + self._blocked_plugin_reasons: Dict[str, str] = {} self._runner_ready_events: asyncio.Event = asyncio.Event() self._runner_ready_payloads: RunnerReadyPayload = RunnerReadyPayload() self._health_task: Optional[asyncio.Task[None]] = None @@ -211,6 +213,19 @@ class PluginRunnerSupervisor: """ return {plugin_id: registration.plugin_version for plugin_id, registration in self._registered_plugins.items()} + def set_blocked_plugin_reasons(self, blocked_plugin_reasons: Dict[str, str]) -> None: + """设置当前 Runner 启动时应拒绝加载的插件列表。 + + Args: + blocked_plugin_reasons: 需要拒绝加载的插件及原因映射。 + """ + + self._blocked_plugin_reasons = { + str(plugin_id or "").strip(): str(reason or "").strip() + for plugin_id, reason in blocked_plugin_reasons.items() + if str(plugin_id or "").strip() and str(reason or "").strip() + } + @staticmethod def _normalize_reload_plugin_ids(plugin_ids: Optional[List[str] | str]) -> List[str]: """规范化批量重载入参。 @@ -1303,6 +1318,7 @@ class PluginRunnerSupervisor: global_config_snapshot = config_manager.get_global_config().model_dump(mode="json") global_config_snapshot["model"] = config_manager.get_model_config().model_dump(mode="json") return { + ENV_BLOCKED_PLUGIN_REASONS: json.dumps(self._blocked_plugin_reasons, ensure_ascii=False), ENV_EXTERNAL_PLUGIN_IDS: json.dumps(self._external_available_plugins, ensure_ascii=False), ENV_GLOBAL_CONFIG_SNAPSHOT: json.dumps(global_config_snapshot, ensure_ascii=False), ENV_HOST_VERSION: PROTOCOL_VERSION, diff --git a/src/plugin_runtime/integration.py b/src/plugin_runtime/integration.py index 39d899c3..62ecb5cb 100644 --- a/src/plugin_runtime/integration.py +++ b/src/plugin_runtime/integration.py @@ -8,6 +8,7 @@ 5. 提供统一的能力实现注册接口,使插件可以调用主程序功能 """ +from dataclasses import dataclass from pathlib import Path from typing import ( TYPE_CHECKING, @@ -33,13 +34,15 @@ from src.common.logger import get_logger from src.config.config import config_manager from src.config.file_watcher import FileChange, FileWatcher from src.platform_io import DeliveryBatch, InboundMessageEnvelope, get_platform_io_manager -from src.plugin_runtime.hook_catalog import register_builtin_hook_specs from src.plugin_runtime.capabilities import ( RuntimeComponentCapabilityMixin, RuntimeCoreCapabilityMixin, RuntimeDataCapabilityMixin, + RuntimeRenderCapabilityMixin, ) from src.plugin_runtime.capabilities.registry import register_capability_impls +from src.plugin_runtime.dependency_pipeline import PluginDependencyPipeline +from src.plugin_runtime.hook_catalog import register_builtin_hook_specs from src.plugin_runtime.host.hook_dispatcher import HookDispatchResult, HookDispatcher from src.plugin_runtime.host.hook_spec_registry import HookSpec, HookSpecRegistry from src.plugin_runtime.host.message_utils import MessageDict, PluginMessageUtils @@ -67,10 +70,19 @@ _EVENT_TYPE_MAP: Dict[str, str] = { } +@dataclass(frozen=True) +class DependencySyncState: + """表示一次插件依赖同步后的状态。""" + + blocked_changed_plugin_ids: Set[str] + environment_changed: bool + + class PluginRuntimeManager( RuntimeCoreCapabilityMixin, RuntimeDataCapabilityMixin, RuntimeComponentCapabilityMixin, + RuntimeRenderCapabilityMixin, ): """插件运行时管理器(单例) @@ -88,7 +100,9 @@ class PluginRuntimeManager( self._plugin_source_watcher_subscription_id: Optional[str] = None self._plugin_config_watcher_subscriptions: Dict[str, Tuple[Path, str]] = {} self._plugin_path_cache: Dict[str, Path] = {} - self._manifest_validator: ManifestValidator = ManifestValidator() + self._manifest_validator: ManifestValidator = ManifestValidator(validate_python_package_dependencies=False) + self._plugin_dependency_pipeline: PluginDependencyPipeline = PluginDependencyPipeline() + self._blocked_plugin_reasons: Dict[str, str] = {} self._config_reload_callback: Callable[[Sequence[str]], Awaitable[None]] = self._handle_main_config_reload self._config_reload_callback_registered: bool = False self._hook_spec_registry: HookSpecRegistry = HookSpecRegistry() @@ -131,7 +145,7 @@ class PluginRuntimeManager( @classmethod def _discover_plugin_dependency_map(cls, plugin_dirs: Iterable[Path]) -> Dict[str, List[str]]: """扫描指定插件目录集合,返回 ``plugin_id -> dependencies`` 映射。""" - validator = ManifestValidator() + validator = ManifestValidator(validate_python_package_dependencies=False) return validator.build_plugin_dependency_map(plugin_dirs) @classmethod @@ -191,6 +205,206 @@ class PluginRuntimeManager( } return supervisor_cls(**supported_kwargs) + def _resolve_runtime_plugin_dirs(self) -> Tuple[List[Path], List[Path]]: + """解析当前运行时应管理的插件根目录。 + + Returns: + Tuple[List[Path], List[Path]]: 内置插件目录列表与第三方插件目录列表。 + """ + + return self._get_builtin_plugin_dirs(), self._get_third_party_plugin_dirs() + + @staticmethod + def _resolve_supervisor_socket_paths() -> Tuple[Optional[str], Optional[str]]: + """解析内置与第三方 Supervisor 的 IPC 地址。 + + Returns: + Tuple[Optional[str], Optional[str]]: 内置 Runner 与第三方 Runner 的 socket 地址。 + """ + + runtime_config = config_manager.get_global_config().plugin_runtime + socket_path_base = runtime_config.ipc_socket_path or None + builtin_socket = f"{socket_path_base}-builtin" if socket_path_base else None + third_party_socket = f"{socket_path_base}-third_party" if socket_path_base else None + return builtin_socket, third_party_socket + + def _apply_blocked_plugin_reasons_to_supervisors(self) -> None: + """将当前阻止加载插件列表同步到全部 Supervisor。""" + + for supervisor in self.supervisors: + set_blocked_plugin_reasons = getattr(supervisor, "set_blocked_plugin_reasons", None) + if callable(set_blocked_plugin_reasons): + set_blocked_plugin_reasons(self._blocked_plugin_reasons) + + def _set_blocked_plugin_reasons(self, blocked_plugin_reasons: Dict[str, str]) -> Set[str]: + """更新 Host 侧维护的阻止加载插件列表。 + + Args: + blocked_plugin_reasons: 最新的阻止加载插件及原因映射。 + + Returns: + Set[str]: 本次发生状态变化的插件 ID 集合。 + """ + + normalized_reasons = { + str(plugin_id or "").strip(): str(reason or "").strip() + for plugin_id, reason in blocked_plugin_reasons.items() + if str(plugin_id or "").strip() and str(reason or "").strip() + } + changed_plugin_ids = { + plugin_id + for plugin_id in set(self._blocked_plugin_reasons) | set(normalized_reasons) + if self._blocked_plugin_reasons.get(plugin_id) != normalized_reasons.get(plugin_id) + } + self._blocked_plugin_reasons = normalized_reasons + self._apply_blocked_plugin_reasons_to_supervisors() + return changed_plugin_ids + + async def _sync_plugin_dependencies(self, plugin_dirs: Sequence[Path]) -> DependencySyncState: + """执行插件依赖同步,并刷新阻止加载插件列表。 + + Args: + plugin_dirs: 当前需要参与分析的插件根目录列表。 + + Returns: + DependencySyncState: 同步后的环境变更状态与阻止列表变化集合。 + """ + + result = await self._plugin_dependency_pipeline.execute(plugin_dirs) + changed_plugin_ids = self._set_blocked_plugin_reasons(result.blocked_plugin_reasons) + return DependencySyncState( + blocked_changed_plugin_ids=changed_plugin_ids, + environment_changed=result.environment_changed, + ) + + def _build_supervisors(self, builtin_dirs: Sequence[Path], third_party_dirs: Sequence[Path]) -> None: + """根据目录列表创建当前运行时所需的 Supervisor。 + + Args: + builtin_dirs: 内置插件目录列表。 + third_party_dirs: 第三方插件目录列表。 + """ + + from src.plugin_runtime.host.supervisor import PluginSupervisor + + builtin_socket, third_party_socket = self._resolve_supervisor_socket_paths() + self._builtin_supervisor = None + self._third_party_supervisor = None + + if builtin_dirs: + builtin_supervisor = self._instantiate_supervisor( + PluginSupervisor, + plugin_dirs=list(builtin_dirs), + group_name="builtin", + hook_spec_registry=self._hook_spec_registry, + socket_path=builtin_socket, + ) + self._builtin_supervisor = builtin_supervisor + self._register_capability_impls(builtin_supervisor) + + if third_party_dirs: + third_party_supervisor = self._instantiate_supervisor( + PluginSupervisor, + plugin_dirs=list(third_party_dirs), + group_name="third_party", + hook_spec_registry=self._hook_spec_registry, + socket_path=third_party_socket, + ) + self._third_party_supervisor = third_party_supervisor + self._register_capability_impls(third_party_supervisor) + + self._apply_blocked_plugin_reasons_to_supervisors() + + async def _start_supervisors( + self, + builtin_dirs: Sequence[Path], + third_party_dirs: Sequence[Path], + ) -> List["PluginSupervisor"]: + """按依赖顺序启动当前已创建的 Supervisor。 + + Args: + builtin_dirs: 内置插件目录列表。 + third_party_dirs: 第三方插件目录列表。 + + Returns: + List[PluginSupervisor]: 成功启动的 Supervisor 列表。 + """ + + started_supervisors: List["PluginSupervisor"] = [] + supervisor_groups: Dict[str, Optional["PluginSupervisor"]] = { + "builtin": self._builtin_supervisor, + "third_party": self._third_party_supervisor, + } + start_order = self._build_group_start_order(builtin_dirs, third_party_dirs) + + try: + for group_name in start_order: + supervisor = supervisor_groups.get(group_name) + if supervisor is None: + continue + + external_plugin_versions = { + plugin_id: plugin_version + for started_supervisor in started_supervisors + for plugin_id, plugin_version in started_supervisor.get_loaded_plugin_versions().items() + } + supervisor.set_external_available_plugins(external_plugin_versions) + set_blocked_plugin_reasons = getattr(supervisor, "set_blocked_plugin_reasons", None) + if callable(set_blocked_plugin_reasons): + set_blocked_plugin_reasons(self._blocked_plugin_reasons) + await supervisor.start() + started_supervisors.append(supervisor) + except Exception: + await asyncio.gather(*(supervisor.stop() for supervisor in started_supervisors), return_exceptions=True) + raise + + return started_supervisors + + async def _stop_supervisors(self) -> None: + """停止当前全部 Supervisor。""" + + supervisors = self.supervisors + if not supervisors: + return + + await asyncio.gather(*(supervisor.stop() for supervisor in supervisors), return_exceptions=True) + self._builtin_supervisor = None + self._third_party_supervisor = None + + async def _restart_supervisors(self, reason: str) -> bool: + """重启当前全部 Supervisor。 + + Args: + reason: 本次重启的原因。 + + Returns: + bool: 是否重启成功。 + """ + + builtin_dirs, third_party_dirs = self._resolve_runtime_plugin_dirs() + if duplicate_plugin_ids := self._find_duplicate_plugin_ids(builtin_dirs + third_party_dirs): + details = "; ".join( + f"{plugin_id}: {', '.join(str(path) for path in paths)}" + for plugin_id, paths in sorted(duplicate_plugin_ids.items()) + ) + logger.error(f"检测到重复插件 ID,拒绝执行 Supervisor 重启: {details}") + return False + + logger.info(f"开始重启插件运行时 Supervisor: {reason}") + await self._stop_supervisors() + self._build_supervisors(builtin_dirs, third_party_dirs) + + try: + await self._start_supervisors(builtin_dirs, third_party_dirs) + except Exception as exc: + logger.error(f"重启插件运行时 Supervisor 失败: {exc}", exc_info=True) + await self._stop_supervisors() + return False + + self._refresh_plugin_config_watch_subscriptions() + logger.info(f"插件运行时 Supervisor 已重启完成: {reason}") + return True + # ─── 生命周期 ───────────────────────────────────────────── async def start(self) -> None: @@ -204,10 +418,7 @@ class PluginRuntimeManager( logger.info("插件运行时已在配置中禁用,跳过启动") return - from src.plugin_runtime.host.supervisor import PluginSupervisor - - builtin_dirs = self._get_builtin_plugin_dirs() - third_party_dirs = self._get_third_party_plugin_dirs() + builtin_dirs, third_party_dirs = self._resolve_runtime_plugin_dirs() if duplicate_plugin_ids := self._find_duplicate_plugin_ids(builtin_dirs + third_party_dirs): details = "; ".join( @@ -221,61 +432,19 @@ class PluginRuntimeManager( logger.info("未找到任何插件目录,跳过插件运行时启动") return + dependency_sync_state = await self._sync_plugin_dependencies(builtin_dirs + third_party_dirs) + if dependency_sync_state.environment_changed: + logger.info("插件依赖流水线已更新当前 Python 环境,启动时将直接加载最新环境") + self.ensure_builtin_hook_specs_registered() platform_io_manager = get_platform_io_manager() + self._build_supervisors(builtin_dirs, third_party_dirs) - # 从配置读取自定义 IPC socket 路径(留空则自动生成) - socket_path_base = _cfg.ipc_socket_path or None - - # 当用户指定了自定义路径时,为两个 Supervisor 添加后缀以避免 UDS 冲突 - builtin_socket = f"{socket_path_base}-builtin" if socket_path_base else None - third_party_socket = f"{socket_path_base}-third_party" if socket_path_base else None - - # 创建两个 Supervisor,各自拥有独立的 socket / Runner 子进程 - if builtin_dirs: - self._builtin_supervisor = self._instantiate_supervisor( - PluginSupervisor, - plugin_dirs=builtin_dirs, - group_name="builtin", - hook_spec_registry=self._hook_spec_registry, - socket_path=builtin_socket, - ) - self._register_capability_impls(self._builtin_supervisor) - - if third_party_dirs: - self._third_party_supervisor = self._instantiate_supervisor( - PluginSupervisor, - plugin_dirs=third_party_dirs, - group_name="third_party", - hook_spec_registry=self._hook_spec_registry, - socket_path=third_party_socket, - ) - self._register_capability_impls(self._third_party_supervisor) - - started_supervisors: List[PluginSupervisor] = [] + started_supervisors: List["PluginSupervisor"] = [] try: platform_io_manager.set_inbound_dispatcher(self._dispatch_platform_inbound) await platform_io_manager.ensure_send_pipeline_ready() - - supervisor_groups: Dict[str, Optional[PluginSupervisor]] = { - "builtin": self._builtin_supervisor, - "third_party": self._third_party_supervisor, - } - start_order = self._build_group_start_order(builtin_dirs, third_party_dirs) - - for group_name in start_order: - supervisor = supervisor_groups.get(group_name) - if supervisor is None: - continue - - external_plugin_versions = { - plugin_id: plugin_version - for started_supervisor in started_supervisors - for plugin_id, plugin_version in started_supervisor.get_loaded_plugin_versions().items() - } - supervisor.set_external_available_plugins(external_plugin_versions) - await supervisor.start() - started_supervisors.append(supervisor) + started_supervisors = await self._start_supervisors(builtin_dirs, third_party_dirs) await self._start_plugin_file_watcher() config_manager.register_reload_callback(self._config_reload_callback) @@ -529,6 +698,21 @@ class PluginRuntimeManager( if not normalized_plugin_ids: return True + blocked_plugin_ids = [plugin_id for plugin_id in normalized_plugin_ids if plugin_id in self._blocked_plugin_reasons] + if blocked_plugin_ids: + logger.warning( + "以下插件当前被依赖流水线阻止加载,已拒绝重载请求: " + + ", ".join( + f"{plugin_id} ({self._blocked_plugin_reasons[plugin_id]})" + for plugin_id in sorted(blocked_plugin_ids) + ) + ) + normalized_plugin_ids = [ + plugin_id for plugin_id in normalized_plugin_ids if plugin_id not in self._blocked_plugin_reasons + ] + if not normalized_plugin_ids: + return False + dependency_map = self._build_registered_dependency_map() supervisor_by_plugin = self._build_registered_supervisor_map() supervisor_roots: Dict["PluginSupervisor", List[str]] = {} @@ -909,6 +1093,12 @@ class PluginRuntimeManager( normalized_plugin_id = str(plugin_id or "").strip() if not normalized_plugin_id: return False + if normalized_plugin_id in self._blocked_plugin_reasons: + logger.warning( + f"插件 {normalized_plugin_id} 当前被依赖流水线阻止加载: " + f"{self._blocked_plugin_reasons[normalized_plugin_id]}" + ) + return False try: registered_supervisor = self._get_supervisor_for_plugin(normalized_plugin_id) @@ -933,7 +1123,7 @@ class PluginRuntimeManager( def _find_duplicate_plugin_ids(cls, plugin_dirs: List[Path]) -> Dict[str, List[Path]]: """扫描插件目录,找出被多个目录重复声明的插件 ID。""" plugin_locations: Dict[str, List[Path]] = {} - validator = ManifestValidator() + validator = ManifestValidator(validate_python_package_dependencies=False) for plugin_path, manifest in validator.iter_plugin_manifests(plugin_dirs): plugin_locations.setdefault(manifest.id, []).append(plugin_path) @@ -1190,7 +1380,8 @@ class PluginRuntimeManager( if not self._started or not changes: return - if duplicate_plugin_ids := self._find_duplicate_plugin_ids(list(self._iter_plugin_dirs())): + plugin_dirs = list(self._iter_plugin_dirs()) + if duplicate_plugin_ids := self._find_duplicate_plugin_ids(plugin_dirs): details = "; ".join( f"{plugin_id}: {', '.join(str(path) for path in paths)}" for plugin_id, paths in sorted(duplicate_plugin_ids.items()) @@ -1198,21 +1389,24 @@ class PluginRuntimeManager( logger.error(f"检测到重复插件 ID,跳过本次插件热重载: {details}") return - changed_plugin_ids: List[str] = [] - changed_paths = [change.path.resolve() for change in changes] + relevant_source_changes = [ + change.path.resolve() + for change in changes + if change.path.name in {"plugin.py", "_manifest.json"} or change.path.suffix == ".py" + ] + if not relevant_source_changes: + return - for supervisor in self.supervisors: - for path in changed_paths: - plugin_id = self._match_plugin_id_for_supervisor(supervisor, path) - if plugin_id is None: - continue - if path.name in {"plugin.py", "_manifest.json"} or path.suffix == ".py": - if plugin_id not in changed_plugin_ids: - changed_plugin_ids.append(plugin_id) + dependency_sync_state = await self._sync_plugin_dependencies(plugin_dirs) + restart_reason = "file_watcher" + if dependency_sync_state.environment_changed: + restart_reason = "file_watcher_dependency_install" + elif dependency_sync_state.blocked_changed_plugin_ids: + restart_reason = "file_watcher_blocklist_changed" - if changed_plugin_ids: - await self.reload_plugins_globally(changed_plugin_ids, reason="file_watcher") - self._refresh_plugin_config_watch_subscriptions() + restarted = await self._restart_supervisors(restart_reason) + if not restarted: + logger.warning(f"插件源码变更后重启 Supervisor 失败: {restart_reason}") @staticmethod def _plugin_dir_matches(path: Path, plugin_dir: Path) -> bool: diff --git a/src/plugin_runtime/runner/manifest_validator.py b/src/plugin_runtime/runner/manifest_validator.py index 33c2b1e5..92e97d14 100644 --- a/src/plugin_runtime/runner/manifest_validator.py +++ b/src/plugin_runtime/runner/manifest_validator.py @@ -609,6 +609,7 @@ class ManifestValidator: host_version: str = "", sdk_version: str = "", project_root: Optional[Path] = None, + validate_python_package_dependencies: bool = True, ) -> None: """初始化 Manifest 校验器。 @@ -616,10 +617,12 @@ class ManifestValidator: host_version: 当前 Host 版本号;留空时自动从主程序 ``pyproject.toml`` 读取。 sdk_version: 当前 SDK 版本号;留空时自动从运行环境中探测。 project_root: 项目根目录;留空时自动推断。 + validate_python_package_dependencies: 是否校验 Python 包依赖与当前环境的关系。 """ self._project_root: Path = project_root or self._resolve_project_root() self._host_version: str = host_version or self._detect_default_host_version(self._project_root) self._sdk_version: str = sdk_version or self._detect_default_sdk_version(self._project_root) + self._validate_python_package_dependencies: bool = validate_python_package_dependencies self.errors: List[str] = [] self.warnings: List[str] = [] @@ -823,9 +826,10 @@ class ManifestValidator: if not sdk_ok: self.errors.append(f"SDK 版本不兼容: {sdk_message} (当前 SDK: {self._sdk_version})") - self._validate_python_package_dependencies(manifest) + if self._validate_python_package_dependencies: + self._validate_python_package_dependencies_against_runtime(manifest) - def _validate_python_package_dependencies(self, manifest: PluginManifest) -> None: + def _validate_python_package_dependencies_against_runtime(self, manifest: PluginManifest) -> None: """校验 Python 包依赖与主程序运行环境是否冲突。 Args: @@ -865,6 +869,68 @@ class ManifestValidator: f"主程序依赖约束为 {host_specifier or '任意版本'}" ) + def load_host_dependency_requirements(self) -> Dict[str, Requirement]: + """读取主程序在 ``pyproject.toml`` 中声明的依赖约束。 + + Returns: + Dict[str, Requirement]: 以规范化包名为键的依赖约束映射。 + """ + + return self._load_host_dependency_requirements(self._project_root) + + def get_installed_package_version(self, package_name: str) -> Optional[str]: + """查询当前运行环境中指定包的安装版本。 + + Args: + package_name: 需要查询的包名。 + + Returns: + Optional[str]: 已安装版本号;未安装时返回 ``None``。 + """ + + return self._get_installed_package_version(package_name) + + @staticmethod + def build_specifier_set(version_spec: str) -> Optional[SpecifierSet]: + """将版本约束文本转换为 ``SpecifierSet``。 + + Args: + version_spec: 原始版本约束文本。 + + Returns: + Optional[SpecifierSet]: 转换成功时返回约束对象,否则返回 ``None``。 + """ + + return ManifestValidator._build_specifier_set(version_spec) + + @staticmethod + def version_matches_specifier(version: str, version_spec: str) -> bool: + """判断版本号是否满足给定约束。 + + Args: + version: 待判断的版本号。 + version_spec: 版本约束表达式。 + + Returns: + bool: 是否满足约束。 + """ + + return ManifestValidator._version_matches_specifier(version, version_spec) + + @classmethod + def requirements_may_overlap(cls, left: SpecifierSet, right: SpecifierSet) -> bool: + """判断两个版本约束是否可能存在交集。 + + Args: + left: 左侧版本约束。 + right: 右侧版本约束。 + + Returns: + bool: 若两者可能同时满足则返回 ``True``。 + """ + + return cls._requirements_may_overlap(left, right) + def _log_errors(self) -> None: """输出当前累计的 Manifest 校验错误。""" for error_message in self.errors: diff --git a/src/plugin_runtime/runner/plugin_loader.py b/src/plugin_runtime/runner/plugin_loader.py index 6e85714b..491c51bd 100644 --- a/src/plugin_runtime/runner/plugin_loader.py +++ b/src/plugin_runtime/runner/plugin_loader.py @@ -75,6 +75,35 @@ class PluginLoader: self._failed_plugins: Dict[str, str] = {} self._manifest_validator = ManifestValidator(host_version=host_version) self._compat_hook_installed = False + self._blocked_plugin_reasons: Dict[str, str] = {} + + def set_blocked_plugin_reasons(self, blocked_plugin_reasons: Optional[Dict[str, str]] = None) -> None: + """更新当前加载器持有的拒绝加载插件列表。 + + Args: + blocked_plugin_reasons: 需要拒绝加载的插件及原因映射。 + """ + + self._blocked_plugin_reasons = { + str(plugin_id or "").strip(): str(reason or "").strip() + for plugin_id, reason in (blocked_plugin_reasons or {}).items() + if str(plugin_id or "").strip() and str(reason or "").strip() + } + + def get_blocked_plugin_reason(self, plugin_id: str) -> Optional[str]: + """返回指定插件当前的拒绝加载原因。 + + Args: + plugin_id: 目标插件 ID。 + + Returns: + Optional[str]: 若插件被阻止加载则返回原因,否则返回 ``None``。 + """ + + normalized_plugin_id = str(plugin_id or "").strip() + if not normalized_plugin_id: + return None + return self._blocked_plugin_reasons.get(normalized_plugin_id) def discover_and_load( self, @@ -156,6 +185,11 @@ class PluginLoader: return None plugin_id = manifest.id + if blocked_reason := self.get_blocked_plugin_reason(plugin_id): + self._failed_plugins[plugin_id] = blocked_reason + logger.warning(f"插件 {plugin_id} 已被 Host 依赖流水线阻止加载: {blocked_reason}") + return None + return plugin_id, (plugin_dir, manifest, plugin_path) def _record_duplicate_candidates(self, duplicate_candidates: Dict[str, List[Path]]) -> None: diff --git a/src/plugin_runtime/runner/runner_main.py b/src/plugin_runtime/runner/runner_main.py index cc910bd8..751a7967 100644 --- a/src/plugin_runtime/runner/runner_main.py +++ b/src/plugin_runtime/runner/runner_main.py @@ -29,6 +29,7 @@ import tomlkit from src.common.logger import get_console_handler, get_logger, initialize_logging from src.plugin_runtime import ( + ENV_BLOCKED_PLUGIN_REASONS, ENV_EXTERNAL_PLUGIN_IDS, ENV_HOST_VERSION, ENV_IPC_ADDRESS, @@ -196,6 +197,7 @@ class PluginRunner: session_token: str, plugin_dirs: List[str], external_available_plugins: Optional[Dict[str, str]] = None, + blocked_plugin_reasons: Optional[Dict[str, str]] = None, ) -> None: """初始化 Runner。 @@ -204,6 +206,7 @@ class PluginRunner: session_token: 握手用会话令牌。 plugin_dirs: 当前 Runner 负责扫描的插件目录列表。 external_available_plugins: 视为已满足的外部依赖插件版本映射。 + blocked_plugin_reasons: 需要拒绝加载的插件及原因映射。 """ self._host_address: str = host_address self._session_token: str = session_token @@ -213,9 +216,15 @@ class PluginRunner: for plugin_id, plugin_version in (external_available_plugins or {}).items() if str(plugin_id or "").strip() and str(plugin_version or "").strip() } + self._blocked_plugin_reasons: Dict[str, str] = { + str(plugin_id or "").strip(): str(reason or "").strip() + for plugin_id, reason in (blocked_plugin_reasons or {}).items() + if str(plugin_id or "").strip() and str(reason or "").strip() + } self._rpc_client: RPCClient = RPCClient(host_address, session_token) self._loader: PluginLoader = PluginLoader(host_version=os.getenv(ENV_HOST_VERSION, "")) + self._loader.set_blocked_plugin_reasons(self._blocked_plugin_reasons) self._start_time: float = time.monotonic() self._shutting_down: bool = False self._reload_lock: asyncio.Lock = asyncio.Lock() @@ -1639,6 +1648,7 @@ class PluginRunner: async def _async_main() -> None: """异步主入口""" + blocked_plugin_reasons_raw = os.environ.get(ENV_BLOCKED_PLUGIN_REASONS, "") host_address = os.environ.pop(ENV_IPC_ADDRESS, "") external_plugin_ids_raw = os.environ.get(ENV_EXTERNAL_PLUGIN_IDS, "") session_token = os.environ.pop(ENV_SESSION_TOKEN, "") @@ -1658,13 +1668,30 @@ async def _async_main() -> None: logger.warning("外部依赖插件版本映射格式非法,已回退为空映射") external_plugin_ids = {} + try: + blocked_plugin_reasons = json.loads(blocked_plugin_reasons_raw) if blocked_plugin_reasons_raw else {} + except json.JSONDecodeError: + logger.warning("解析阻止加载插件原因映射失败,已回退为空映射") + blocked_plugin_reasons = {} + if not isinstance(blocked_plugin_reasons, dict): + logger.warning("阻止加载插件原因映射格式非法,已回退为空映射") + blocked_plugin_reasons = {} + + runner_kwargs: Dict[str, Any] = { + "external_available_plugins": { + str(plugin_id): str(plugin_version) for plugin_id, plugin_version in external_plugin_ids.items() + } + } + if blocked_plugin_reasons: + runner_kwargs["blocked_plugin_reasons"] = { + str(plugin_id): str(reason) for plugin_id, reason in blocked_plugin_reasons.items() + } + runner = PluginRunner( host_address, session_token, plugin_dirs, - external_available_plugins={ - str(plugin_id): str(plugin_version) for plugin_id, plugin_version in external_plugin_ids.items() - }, + **runner_kwargs, ) # 注册信号处理 diff --git a/src/services/html_render_service.py b/src/services/html_render_service.py new file mode 100644 index 00000000..6d564ef4 --- /dev/null +++ b/src/services/html_render_service.py @@ -0,0 +1,937 @@ +"""HTML 浏览器渲染服务。 + +负责在 Host 侧复用已有浏览器,并将 HTML 内容渲染为 PNG 图片。 +""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timezone +from importlib import metadata +from io import BytesIO +from pathlib import Path +from typing import Any, Dict, Literal, Optional, Tuple, cast +from urllib.parse import urlparse + +import asyncio +import base64 +import contextlib +import functools +import json +import os +import shutil +import sys +import time + +from src.common.logger import PROJECT_ROOT, get_logger +from src.config.config import config_manager +from src.config.official_configs import PluginRuntimeRenderConfig + +logger = get_logger("services.html_render_service") + +_NETWORK_ALLOW_SCHEMES = frozenset({"about", "blob", "data", "file"}) +_WINDOWS_BROWSER_PATHS = ( + Path("C:/Program Files/Google/Chrome/Application/chrome.exe"), + Path("C:/Program Files (x86)/Google/Chrome/Application/chrome.exe"), + Path("C:/Program Files/Microsoft/Edge/Application/msedge.exe"), + Path("C:/Program Files (x86)/Microsoft/Edge/Application/msedge.exe"), +) +_MACOS_BROWSER_PATHS = ( + Path("/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"), + Path("/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge"), +) +_UNIX_BROWSER_NAMES = ( + "chromium", + "chromium-browser", + "google-chrome", + "google-chrome-stable", + "microsoft-edge", + "msedge", +) +_PLAYWRIGHT_MANAGED_BROWSER_PREFIXES = ("chromium-", "chrome-", "chrome-headless-shell-") + + +@dataclass(slots=True) +class HtmlRenderRequest: + """描述一次 HTML 转 PNG 请求。""" + + html: str + selector: str = "body" + viewport_width: int = 900 + viewport_height: int = 500 + device_scale_factor: float = 2.0 + full_page: bool = False + omit_background: bool = False + wait_until: str = "load" + wait_for_selector: str = "" + wait_for_timeout_ms: int = 0 + timeout_ms: int = 10000 + allow_network: bool = False + + +@dataclass(slots=True) +class HtmlRenderResult: + """描述一次 HTML 转 PNG 的输出结果。""" + + image_base64: str + mime_type: str + width: int + height: int + render_ms: int + + def to_payload(self) -> Dict[str, Any]: + """将结果序列化为能力层返回结构。 + + Returns: + Dict[str, Any]: 可直接返回给插件运行时的结构化数据。 + """ + + return { + "image_base64": self.image_base64, + "mime_type": self.mime_type, + "width": self.width, + "height": self.height, + "render_ms": self.render_ms, + } + + +@dataclass(slots=True) +class ManagedBrowserRecord: + """记录 Playwright 托管浏览器的本地状态。""" + + browser_name: str + browsers_path: str + install_source: Literal["auto_download", "existing_cache"] + playwright_version: str + recorded_at: str + last_verified_at: str + + def to_dict(self) -> Dict[str, str]: + """将浏览器记录转换为可持久化字典。 + + Returns: + Dict[str, str]: 可写入 JSON 文件的字典结构。 + """ + + return { + "browser_name": self.browser_name, + "browsers_path": self.browsers_path, + "install_source": self.install_source, + "playwright_version": self.playwright_version, + "recorded_at": self.recorded_at, + "last_verified_at": self.last_verified_at, + } + + @classmethod + def from_dict(cls, payload: Dict[str, Any]) -> Optional["ManagedBrowserRecord"]: + """从字典中恢复浏览器状态记录。 + + Args: + payload: 原始字典数据。 + + Returns: + Optional[ManagedBrowserRecord]: 解析成功时返回记录对象,否则返回 ``None``。 + """ + + browser_name = str(payload.get("browser_name", "") or "").strip() + browsers_path = str(payload.get("browsers_path", "") or "").strip() + install_source = str(payload.get("install_source", "") or "").strip() + playwright_version = str(payload.get("playwright_version", "") or "").strip() + recorded_at = str(payload.get("recorded_at", "") or "").strip() + last_verified_at = str(payload.get("last_verified_at", "") or "").strip() + if not all([browser_name, browsers_path, install_source, playwright_version, recorded_at, last_verified_at]): + return None + if install_source not in {"auto_download", "existing_cache"}: + return None + validated_install_source = cast(Literal["auto_download", "existing_cache"], install_source) + return cls( + browser_name=browser_name, + browsers_path=browsers_path, + install_source=validated_install_source, + playwright_version=playwright_version, + recorded_at=recorded_at, + last_verified_at=last_verified_at, + ) + + +class HTMLRenderService: + """HTML 浏览器渲染服务。""" + + def __init__(self) -> None: + """初始化渲染服务。""" + + self._browser: Any = None + self._browser_lock: asyncio.Lock = asyncio.Lock() + self._connected_via_cdp: bool = False + self._playwright: Any = None + self._render_count: int = 0 + self._render_semaphore: Optional[asyncio.Semaphore] = None + self._render_semaphore_limit: int = 0 + + def _get_render_config(self) -> PluginRuntimeRenderConfig: + """读取当前插件运行时的浏览器渲染配置。 + + Returns: + PluginRuntimeRenderConfig: 当前生效的浏览器渲染配置。 + """ + + return config_manager.get_global_config().plugin_runtime.render + + def _get_render_semaphore(self) -> asyncio.Semaphore: + """根据当前配置返回渲染并发信号量。 + + Returns: + asyncio.Semaphore: 控制并发的信号量对象。 + """ + + config = self._get_render_config() + limit = max(1, int(config.concurrency_limit)) + if self._render_semaphore is None or self._render_semaphore_limit != limit: + self._render_semaphore = asyncio.Semaphore(limit) + self._render_semaphore_limit = limit + return self._render_semaphore + + async def render_html_to_png(self, request: HtmlRenderRequest) -> HtmlRenderResult: + """将 HTML 内容渲染为 PNG 图片。 + + Args: + request: 本次渲染请求。 + + Returns: + HtmlRenderResult: 渲染结果。 + + Raises: + RuntimeError: 浏览器能力被禁用、Playwright 不可用或浏览器启动失败时抛出。 + ValueError: 请求参数非法时抛出。 + """ + + config = self._get_render_config() + if not config.enabled: + raise RuntimeError("插件运行时浏览器渲染能力已禁用") + + normalized_request = self._normalize_request(request, config) + semaphore = self._get_render_semaphore() + async with semaphore: + start_time = time.perf_counter() + browser = await self._ensure_browser(config) + context: Any = None + try: + context = await browser.new_context( + device_scale_factor=normalized_request.device_scale_factor, + locale="zh-CN", + viewport={ + "width": normalized_request.viewport_width, + "height": normalized_request.viewport_height, + }, + ) + page = await context.new_page() + await self._configure_page(page, normalized_request) + image_bytes = await self._capture_image(page, normalized_request) + width, height = self._measure_image_size(image_bytes) + self._render_count += 1 + await self._maybe_restart_browser(config) + return HtmlRenderResult( + image_base64=base64.b64encode(image_bytes).decode("utf-8"), + mime_type="image/png", + width=width, + height=height, + render_ms=int((time.perf_counter() - start_time) * 1000), + ) + except Exception: + await self.reset_browser(restart_playwright=False) + raise + finally: + if context is not None: + with contextlib.suppress(Exception): + await context.close() + + async def reset_browser(self, restart_playwright: bool = False) -> None: + """关闭当前缓存的浏览器实例。 + + Args: + restart_playwright: 是否同时关闭 Playwright 运行时。 + """ + + async with self._browser_lock: + await self._close_browser_unlocked(restart_playwright=restart_playwright) + + async def _close_browser_unlocked(self, restart_playwright: bool = False) -> None: + """在已持有锁的情况下关闭浏览器与 Playwright。 + + Args: + restart_playwright: 是否同时关闭 Playwright 运行时。 + """ + + if self._browser is not None: + with contextlib.suppress(Exception): + await self._browser.close() + self._browser = None + self._connected_via_cdp = False + if restart_playwright and self._playwright is not None: + with contextlib.suppress(Exception): + await self._playwright.stop() + self._playwright = None + + async def _ensure_browser(self, config: PluginRuntimeRenderConfig) -> Any: + """获取可复用的浏览器实例。 + + Args: + config: 当前浏览器渲染配置。 + + Returns: + Any: Playwright Browser 对象。 + + Raises: + RuntimeError: 当无法连接或启动浏览器时抛出。 + """ + + async with self._browser_lock: + if self._is_browser_connected(self._browser): + logger.debug("HTML 渲染服务复用进程内缓存浏览器实例") + return self._browser + + await self._close_browser_unlocked(restart_playwright=False) + self._prepare_playwright_environment(config) + playwright = await self._ensure_playwright() + browser = await self._connect_to_existing_browser(playwright, config) + if browser is None: + browser = await self._launch_browser(playwright, config) + self._connected_via_cdp = False + else: + self._connected_via_cdp = True + + self._browser = browser + self._bind_browser_events(browser) + return browser + + async def _ensure_playwright(self) -> Any: + """懒加载并启动 Playwright 运行时。 + + Returns: + Any: 已启动的 Playwright 对象。 + + Raises: + RuntimeError: 当前环境未安装 Playwright 时抛出。 + """ + + if self._playwright is not None: + return self._playwright + + try: + from playwright.async_api import async_playwright + except ImportError as exc: + raise RuntimeError( + "当前环境未安装 Python Playwright,请先在宿主环境安装 `playwright` 依赖。" + ) from exc + + self._playwright = await async_playwright().start() + return self._playwright + + @staticmethod + def _is_browser_connected(browser: Any) -> bool: + """判断浏览器对象当前是否仍然可用。 + + Args: + browser: 待检查的浏览器对象。 + + Returns: + bool: 若浏览器仍连接,则返回 ``True``。 + """ + + if browser is None: + return False + try: + return bool(browser.is_connected()) + except Exception: + return False + + async def _connect_to_existing_browser(self, playwright: Any, config: PluginRuntimeRenderConfig) -> Any: + """优先连接外部已有的 Chromium 浏览器。 + + Args: + playwright: 已启动的 Playwright 对象。 + config: 当前浏览器渲染配置。 + + Returns: + Any: 连接成功时返回 Browser;否则返回 ``None``。 + """ + + if not config.browser_ws_endpoint.strip(): + return None + + try: + timeout_ms = int(config.startup_timeout_sec * 1000) + logger.info( + "HTML 渲染服务准备连接现有浏览器: " + f"endpoint={config.browser_ws_endpoint.strip()}, timeout_ms={timeout_ms}" + ) + browser = await playwright.chromium.connect_over_cdp( + config.browser_ws_endpoint.strip(), + timeout=timeout_ms, + ) + logger.info("HTML 渲染服务已连接到现有浏览器") + return browser + except Exception as exc: + logger.warning(f"连接现有浏览器失败,将回退为本地启动: {exc}") + return None + + async def _launch_browser(self, playwright: Any, config: PluginRuntimeRenderConfig) -> Any: + """启动本地 Chromium 浏览器。 + + Args: + playwright: 已启动的 Playwright 对象。 + config: 当前浏览器渲染配置。 + + Returns: + Any: 新启动的 Browser 对象。 + + Raises: + RuntimeError: 浏览器启动失败时抛出。 + """ + + launch_options = self._build_launch_options(config) + logger.info( + "HTML 渲染服务准备启动浏览器: " + f"source={'system' if 'executable_path' in launch_options else 'managed'}, " + f"headless={bool(launch_options.get('headless'))}, " + f"timeout_ms={int(launch_options.get('timeout', 0))}" + ) + try: + browser = await playwright.chromium.launch(**launch_options) + if "executable_path" in launch_options: + logger.info(f"HTML 渲染服务已启动本机浏览器: executable_path={launch_options['executable_path']}") + else: + self._update_managed_browser_record(config, install_source="existing_cache") + logger.info("HTML 渲染服务已启动 Playwright 托管浏览器") + return browser + except Exception as exc: + if self._should_auto_download_browser(exc, launch_options, config): + logger.warning(f"HTML 渲染服务未找到可用浏览器,将尝试自动下载 Chromium: {exc}") + await self._install_chromium_browser(config) + retry_browser = await playwright.chromium.launch(**launch_options) + self._update_managed_browser_record(config, install_source="auto_download") + logger.info("HTML 渲染服务已自动下载并启动 Chromium") + return retry_browser + raise RuntimeError(f"启动本地浏览器失败: {exc}") from exc + + def _bind_browser_events(self, browser: Any) -> None: + """为浏览器绑定断线回调。 + + Args: + browser: 需要绑定事件的浏览器对象。 + """ + + try: + browser.on("disconnected", self._handle_browser_disconnected) + except Exception: + return + + def _handle_browser_disconnected(self, *_args: Any) -> None: + """处理浏览器断线事件。 + + Args: + *_args: 浏览器断线事件透传的参数。 + """ + + self._browser = None + self._connected_via_cdp = False + logger.warning("HTML 渲染浏览器已断开,将在下次请求时重新建立连接") + + def _build_launch_options(self, config: PluginRuntimeRenderConfig) -> Dict[str, Any]: + """构造本地浏览器启动参数。 + + Args: + config: 当前浏览器渲染配置。 + + Returns: + Dict[str, Any]: 可直接传给 Playwright 的启动参数。 + """ + + launch_options: Dict[str, Any] = { + "args": list(config.launch_args), + "headless": bool(config.headless), + "timeout": int(config.startup_timeout_sec * 1000), + } + executable_path = self._resolve_executable_path(config) + if executable_path: + launch_options["executable_path"] = executable_path + return launch_options + + @staticmethod + def _should_auto_download_browser( + exc: Exception, + launch_options: Dict[str, Any], + config: PluginRuntimeRenderConfig, + ) -> bool: + """判断当前启动错误是否适合自动下载 Chromium 后重试。 + + Args: + exc: 浏览器启动异常。 + launch_options: 本次启动参数。 + config: 当前浏览器渲染配置。 + + Returns: + bool: 若应自动下载后重试,则返回 ``True``。 + """ + + if "executable_path" in launch_options: + logger.debug("当前启动参数已指定本机浏览器路径,不进入自动下载分支") + return False + if not config.auto_download_chromium: + logger.warning("HTML 渲染服务未检测到可用浏览器,且已禁用自动下载 Chromium") + return False + error_text = str(exc).lower() + should_download = "executable doesn't exist" in error_text or "browser executable" in error_text + if not should_download: + logger.warning(f"浏览器启动失败,但错误不属于可自动下载恢复的类型: {exc}") + return should_download + + def _resolve_executable_path(self, config: PluginRuntimeRenderConfig) -> str: + """解析实际应使用的浏览器可执行文件路径。 + + Args: + config: 当前浏览器渲染配置。 + + Returns: + str: 命中的浏览器可执行文件路径;未命中时返回空字符串。 + """ + + configured_path = config.executable_path.strip() + if configured_path: + path = Path(configured_path).expanduser() + if path.exists(): + logger.info(f"HTML 渲染服务使用配置指定的浏览器路径: {path}") + return str(path) + logger.warning(f"配置的浏览器路径不存在,将尝试自动探测: {configured_path}") + + detected_path = self._detect_local_browser_executable() + if detected_path: + logger.info(f"HTML 渲染服务自动探测到本机浏览器: {detected_path}") + else: + logger.info("HTML 渲染服务未探测到本机浏览器,将尝试使用 Playwright 托管浏览器") + return detected_path + + def _prepare_playwright_environment(self, config: PluginRuntimeRenderConfig) -> Path: + """准备 Playwright 运行所需的共享浏览器目录环境变量。 + + Args: + config: 当前浏览器渲染配置。 + + Returns: + Path: Playwright 浏览器缓存目录。 + """ + + browsers_path = self._get_managed_browsers_path(config) + browsers_path.mkdir(parents=True, exist_ok=True) + os.environ["PLAYWRIGHT_BROWSERS_PATH"] = str(browsers_path) + logger.debug(f"HTML 渲染服务使用 Playwright 浏览器目录: {browsers_path}") + return browsers_path + + def _get_managed_browsers_path(self, config: PluginRuntimeRenderConfig) -> Path: + """获取 Playwright 托管浏览器目录。 + + Args: + config: 当前浏览器渲染配置。 + + Returns: + Path: 托管浏览器目录的绝对路径。 + """ + + configured_path = config.browser_install_root.strip() + if not configured_path: + return (PROJECT_ROOT / "data" / "playwright-browsers").resolve() + candidate_path = Path(configured_path).expanduser() + if candidate_path.is_absolute(): + return candidate_path.resolve() + return (PROJECT_ROOT / candidate_path).resolve() + + def _get_browser_state_path(self) -> Path: + """获取托管浏览器状态文件路径。 + + Returns: + Path: 浏览器状态文件路径。 + """ + + return (PROJECT_ROOT / "data" / "plugin_runtime" / "html_render_browser_state.json").resolve() + + def _load_managed_browser_record(self) -> Optional[ManagedBrowserRecord]: + """读取最近一次成功使用的托管浏览器记录。 + + Returns: + Optional[ManagedBrowserRecord]: 解析成功时返回记录对象,否则返回 ``None``。 + """ + + state_path = self._get_browser_state_path() + if not state_path.exists(): + return None + + try: + raw_payload = json.loads(state_path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + logger.warning(f"HTML 渲染浏览器状态文件读取失败,将忽略并继续: {state_path}") + return None + if not isinstance(raw_payload, dict): + logger.warning(f"HTML 渲染浏览器状态文件格式无效,将忽略并继续: {state_path}") + return None + browser_record = ManagedBrowserRecord.from_dict(raw_payload) + if browser_record is not None: + logger.debug( + "HTML 渲染服务已加载浏览器状态记录: " + f"source={browser_record.install_source}, path={browser_record.browsers_path}, " + f"verified_at={browser_record.last_verified_at}" + ) + return browser_record + + def _save_managed_browser_record(self, record: ManagedBrowserRecord) -> None: + """保存托管浏览器记录。 + + Args: + record: 待保存的浏览器记录。 + """ + + state_path = self._get_browser_state_path() + state_path.parent.mkdir(parents=True, exist_ok=True) + state_path.write_text( + json.dumps(record.to_dict(), ensure_ascii=False, indent=2), + encoding="utf-8", + ) + logger.info( + "HTML 渲染服务已写入浏览器状态记录: " + f"path={state_path}, source={record.install_source}, browsers_path={record.browsers_path}" + ) + + def _update_managed_browser_record( + self, + config: PluginRuntimeRenderConfig, + install_source: Literal["auto_download", "existing_cache"], + ) -> None: + """更新托管 Chromium 的使用记录。 + + Args: + config: 当前浏览器渲染配置。 + install_source: 本次记录的浏览器来源。 + """ + + browsers_path = self._get_managed_browsers_path(config) + if not self._has_managed_browser_artifact(browsers_path): + return + + now_iso = datetime.now(timezone.utc).isoformat() + existing_record = self._load_managed_browser_record() + recorded_at = now_iso + if existing_record is not None and existing_record.browsers_path == str(browsers_path): + recorded_at = existing_record.recorded_at + + self._save_managed_browser_record( + ManagedBrowserRecord( + browser_name="chromium", + browsers_path=str(browsers_path), + install_source=install_source, + playwright_version=self._get_playwright_version(), + recorded_at=recorded_at, + last_verified_at=now_iso, + ) + ) + logger.info( + "HTML 渲染服务已更新托管浏览器记录: " + f"source={install_source}, browsers_path={browsers_path}, last_verified_at={now_iso}" + ) + + async def _install_chromium_browser(self, config: PluginRuntimeRenderConfig) -> None: + """自动下载 Playwright Chromium 浏览器。 + + Args: + config: 当前浏览器渲染配置。 + + Raises: + RuntimeError: 下载失败时抛出。 + """ + + browsers_path = self._prepare_playwright_environment(config) + logger.warning( + "HTML 渲染服务开始自动下载 Chromium: " + f"target_dir={browsers_path}, timeout_sec={config.download_connection_timeout_sec}" + ) + env = os.environ.copy() + env["PLAYWRIGHT_BROWSERS_PATH"] = str(browsers_path) + env["PLAYWRIGHT_DOWNLOAD_CONNECTION_TIMEOUT"] = str(int(config.download_connection_timeout_sec * 1000)) + process = await asyncio.create_subprocess_exec( + sys.executable, + "-m", + "playwright", + "install", + "chromium", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env, + ) + stdout_bytes, stderr_bytes = await process.communicate() + if process.returncode != 0: + stderr_text = stderr_bytes.decode("utf-8", errors="ignore").strip() + stdout_text = stdout_bytes.decode("utf-8", errors="ignore").strip() + error_detail = stderr_text or stdout_text or f"退出码 {process.returncode}" + raise RuntimeError(f"自动下载 Chromium 失败: {error_detail}") + + if not self._has_managed_browser_artifact(browsers_path): + raise RuntimeError("Chromium 下载完成后未检测到可用浏览器文件") + logger.info(f"HTML 渲染服务自动下载 Chromium 完成: target_dir={browsers_path}") + + @staticmethod + def _get_playwright_version() -> str: + """读取当前环境中的 Playwright 版本号。 + + Returns: + str: Playwright 版本字符串;读取失败时返回 ``unknown``。 + """ + + try: + return metadata.version("playwright") + except metadata.PackageNotFoundError: + return "unknown" + + @staticmethod + def _has_managed_browser_artifact(browsers_path: Path) -> bool: + """检查共享目录中是否存在可用的 Playwright 托管浏览器。 + + Args: + browsers_path: Playwright 浏览器目录。 + + Returns: + bool: 若检测到 Chromium/Chrome 相关浏览器文件夹,则返回 ``True``。 + """ + + if not browsers_path.exists(): + return False + for child_path in browsers_path.iterdir(): + if not child_path.is_dir(): + continue + if child_path.name.startswith(_PLAYWRIGHT_MANAGED_BROWSER_PREFIXES): + return True + return False + + def _detect_local_browser_executable(self) -> str: + """自动探测当前宿主系统中的可复用浏览器路径。 + + Returns: + str: 命中的浏览器可执行文件路径;未命中时返回空字符串。 + """ + + for browser_name in _UNIX_BROWSER_NAMES: + resolved_path = shutil.which(browser_name) + if resolved_path: + return resolved_path + + for candidate_path in self._get_candidate_executable_paths(): + if candidate_path.exists(): + return str(candidate_path) + return "" + + @staticmethod + def _get_candidate_executable_paths() -> Tuple[Path, ...]: + """返回当前平台常见浏览器路径候选集合。 + + Returns: + Tuple[Path, ...]: 可能存在浏览器可执行文件的路径列表。 + """ + + if sys.platform.startswith("win"): + return _WINDOWS_BROWSER_PATHS + if sys.platform == "darwin": + return _MACOS_BROWSER_PATHS + return () + + async def _configure_page(self, page: Any, request: HtmlRenderRequest) -> None: + """为页面设置超时、网络策略并写入 HTML。 + + Args: + page: Playwright 页面对象。 + request: 当前渲染请求。 + """ + + page.set_default_timeout(request.timeout_ms) + await page.route( + "**/*", + functools.partial(self._handle_network_route, allow_network=request.allow_network), + ) + await page.set_content( + request.html, + timeout=request.timeout_ms, + wait_until=request.wait_until, + ) + if request.wait_for_selector: + await page.locator(request.wait_for_selector).first.wait_for( + state="attached", + timeout=request.timeout_ms, + ) + if request.wait_for_timeout_ms > 0: + await page.wait_for_timeout(request.wait_for_timeout_ms) + + async def _handle_network_route(self, route: Any, allow_network: bool) -> None: + """处理页面资源请求的网络准入策略。 + + Args: + route: Playwright 路由对象。 + allow_network: 是否允许页面访问外部网络资源。 + """ + + request_url = str(route.request.url) + if allow_network or self._is_network_request_allowed(request_url): + await route.continue_() + return + await route.abort() + + @staticmethod + def _is_network_request_allowed(request_url: str) -> bool: + """判断某个资源 URL 是否属于本地安全资源。 + + Args: + request_url: 待判断的资源地址。 + + Returns: + bool: 若请求可在无网络模式下放行,则返回 ``True``。 + """ + + if not request_url: + return False + parsed_url = urlparse(request_url) + return parsed_url.scheme in _NETWORK_ALLOW_SCHEMES + + async def _capture_image(self, page: Any, request: HtmlRenderRequest) -> bytes: + """从页面或目标元素中截取 PNG 图片。 + + Args: + page: Playwright 页面对象。 + request: 当前渲染请求。 + + Returns: + bytes: PNG 二进制内容。 + + Raises: + RuntimeError: 目标元素不存在或截图结果为空时抛出。 + """ + + if request.full_page and request.selector == "body": + image_bytes = await page.screenshot( + full_page=True, + omit_background=request.omit_background, + timeout=request.timeout_ms, + type="png", + ) + else: + locator = page.locator(request.selector).first + await locator.wait_for(state="visible", timeout=request.timeout_ms) + image_bytes = await locator.screenshot( + omit_background=request.omit_background, + timeout=request.timeout_ms, + type="png", + ) + + if not image_bytes: + raise RuntimeError("浏览器截图结果为空") + return image_bytes + + @staticmethod + def _measure_image_size(image_bytes: bytes) -> Tuple[int, int]: + """读取 PNG 图片的真实像素尺寸。 + + Args: + image_bytes: PNG 图片二进制内容。 + + Returns: + Tuple[int, int]: 图片宽高像素值。 + """ + + from PIL import Image + + with Image.open(BytesIO(image_bytes)) as image: + return int(image.width), int(image.height) + + async def _maybe_restart_browser(self, config: PluginRuntimeRenderConfig) -> None: + """按策略决定是否重建本地浏览器实例。 + + Args: + config: 当前浏览器渲染配置。 + """ + + restart_after = int(config.restart_after_render_count) + if restart_after <= 0 or self._connected_via_cdp: + return + if self._render_count % restart_after != 0: + return + await self.reset_browser(restart_playwright=False) + logger.info("HTML 渲染服务已按累计次数策略重建本地浏览器") + + @staticmethod + def _normalize_request( + request: HtmlRenderRequest, + config: PluginRuntimeRenderConfig, + ) -> HtmlRenderRequest: + """规范化并补齐 HTML 渲染请求。 + + Args: + request: 原始渲染请求。 + config: 当前浏览器渲染配置。 + + Returns: + HtmlRenderRequest: 规范化后的请求对象。 + + Raises: + ValueError: 请求缺少必要字段或取值非法时抛出。 + """ + + html = request.html.strip() + if not html: + raise ValueError("缺少必要参数 html") + + selector = request.selector.strip() or "body" + wait_until = HTMLRenderService._normalize_wait_until(request.wait_until) + timeout_ms = request.timeout_ms + if timeout_ms <= 0: + timeout_ms = int(config.render_timeout_sec * 1000) + + return HtmlRenderRequest( + html=html, + selector=selector, + viewport_width=max(1, int(request.viewport_width)), + viewport_height=max(1, int(request.viewport_height)), + device_scale_factor=max(1.0, float(request.device_scale_factor)), + full_page=bool(request.full_page), + omit_background=bool(request.omit_background), + wait_until=wait_until, + wait_for_selector=request.wait_for_selector.strip(), + wait_for_timeout_ms=max(0, int(request.wait_for_timeout_ms)), + timeout_ms=max(1, int(timeout_ms)), + allow_network=bool(request.allow_network), + ) + + @staticmethod + def _normalize_wait_until(wait_until: str) -> str: + """规范化页面等待阶段参数。 + + Args: + wait_until: 原始等待阶段字符串。 + + Returns: + str: Playwright 支持的等待阶段值。 + """ + + normalized_wait_until = wait_until.strip().lower() + if normalized_wait_until in {"commit", "domcontentloaded", "load", "networkidle"}: + return normalized_wait_until + return "load" + + +_html_render_service: Optional[HTMLRenderService] = None + + +def get_html_render_service() -> HTMLRenderService: + """获取 HTML 浏览器渲染服务单例。 + + Returns: + HTMLRenderService: 全局唯一的浏览器渲染服务实例。 + """ + + global _html_render_service + if _html_render_service is None: + _html_render_service = HTMLRenderService() + return _html_render_service