From 04f260e570070a792ff00906f5694877a001637e Mon Sep 17 00:00:00 2001 From: DrSmoothl <1787882683@qq.com> Date: Fri, 20 Mar 2026 01:15:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=AE=8C=E6=95=B4=E7=9A=84?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E4=B8=AD=E9=97=B4=E5=B1=82=E5=9C=B0=E5=9F=BA?= =?UTF-8?q?=EF=BC=8C=E6=9A=82=E6=9C=AA=E6=8E=A5=E5=85=A5=E5=AE=9E=E9=99=85?= =?UTF-8?q?=E7=9A=84=E6=B6=88=E6=81=AF=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/platform_io/__init__.py | 35 ++ src/platform_io/dedupe.py | 133 ++++++ src/platform_io/drivers/__init__.py | 11 + src/platform_io/drivers/base.py | 104 +++++ src/platform_io/drivers/legacy_driver.py | 61 +++ src/platform_io/drivers/plugin_driver.py | 64 +++ src/platform_io/manager.py | 529 +++++++++++++++++++++++ src/platform_io/outbound_tracker.py | 242 +++++++++++ src/platform_io/registry.py | 70 +++ src/platform_io/route_key_factory.py | 150 +++++++ src/platform_io/routing.py | 202 +++++++++ src/platform_io/types.py | 240 ++++++++++ 12 files changed, 1841 insertions(+) create mode 100644 src/platform_io/__init__.py create mode 100644 src/platform_io/dedupe.py create mode 100644 src/platform_io/drivers/__init__.py create mode 100644 src/platform_io/drivers/base.py create mode 100644 src/platform_io/drivers/legacy_driver.py create mode 100644 src/platform_io/drivers/plugin_driver.py create mode 100644 src/platform_io/manager.py create mode 100644 src/platform_io/outbound_tracker.py create mode 100644 src/platform_io/registry.py create mode 100644 src/platform_io/route_key_factory.py create mode 100644 src/platform_io/routing.py create mode 100644 src/platform_io/types.py diff --git a/src/platform_io/__init__.py b/src/platform_io/__init__.py new file mode 100644 index 00000000..380ecbb6 --- /dev/null +++ b/src/platform_io/__init__.py @@ -0,0 +1,35 @@ +"""导出 Platform IO 层的公开入口。 + +当前仍处于地基阶段,调用方应优先从这里导入共享类型和全局管理器, +而不是直接依赖更底层的私有子模块。 +""" + +from .manager import PlatformIOManager, get_platform_io_manager +from .route_key_factory import RouteKeyFactory +from .routing import RouteBindingConflictError, RouteTable +from .types import ( + DeliveryReceipt, + DeliveryStatus, + DriverDescriptor, + DriverKind, + InboundMessageEnvelope, + RouteBinding, + RouteKey, + RouteMode, +) + +__all__ = [ + "DeliveryReceipt", + "DeliveryStatus", + "DriverDescriptor", + "DriverKind", + "InboundMessageEnvelope", + "PlatformIOManager", + "RouteKeyFactory", + "RouteBinding", + "RouteBindingConflictError", + "RouteKey", + "RouteMode", + "RouteTable", + "get_platform_io_manager", +] diff --git a/src/platform_io/dedupe.py b/src/platform_io/dedupe.py new file mode 100644 index 00000000..4c5c55a2 --- /dev/null +++ b/src/platform_io/dedupe.py @@ -0,0 +1,133 @@ +"""提供 Platform IO 的轻量入站消息去重能力。 + +当前实现基于 ``dict + heapq``: +- ``dict`` 保存去重键到过期时间的映射 +- ``heapq`` 维护按过期时间排序的小顶堆 + +这样就不需要在每次检查时全表扫描,而是通过懒清理逐步弹出已经过期 +或已经失效的堆节点。 +""" + +from typing import Dict, List, Tuple + +import heapq +import time + + +class MessageDeduplicator: + """使用基于 TTL 的内存缓存进行入站消息去重。 + + 主要用于解决同一条外部消息被重复送入 Core 的问题,例如双路径并存、 + 适配器重试、重连或重复回调等场景。Broker 可以借助这个组件在进入 + Core 前先拦住重复投递,避免重复处理、重复回复和重复入库。 + + 当前实现使用 ``dict + heapq`` 维护过期时间: + - ``dict`` 负责 ``O(1)`` 级别的去重键查找 + - ``heapq`` 负责按过期时间顺序做懒清理 + + 这比“每次调用都全表扫描过期项”的实现更适合高吞吐消息场景。 + + Notes: + 复杂度说明如下,设 ``n`` 为当前缓存中的有效去重键数量: + + - 单次 ``mark_seen()`` 在常见路径下的时间复杂度接近 ``O(log n)`` + - 从长期摊还角度看,``mark_seen()`` 的时间复杂度也接近 ``O(log n)`` + - 如果某次调用恰好触发一批过期键的集中清理,则该次调用的最坏时间复杂度 + 可达到 ``O(k log n)``,其中 ``k`` 为本次被弹出或清理的键数量 + - 空间复杂度为 ``O(n)`` + """ + + def __init__(self, ttl_seconds: float = 300.0, max_entries: int = 10000) -> None: + """初始化去重器。 + + Args: + ttl_seconds: 每个去重键在缓存中的保留时长,单位为秒。 + max_entries: 缓存允许保留的最大有效键数量,超出后会触发 + 机会性淘汰。 + + Raises: + ValueError: 当 ``ttl_seconds`` 或 ``max_entries`` 非正数时抛出。 + """ + if ttl_seconds <= 0: + raise ValueError("ttl_seconds 必须大于 0") + if max_entries <= 0: + raise ValueError("max_entries 必须大于 0") + + self._ttl_seconds = ttl_seconds + self._max_entries = max_entries + self._expire_heap: List[Tuple[float, str]] = [] + self._seen: Dict[str, float] = {} + + def mark_seen(self, dedupe_key: str) -> bool: + """标记一条去重键已经出现过。 + + Args: + dedupe_key: 能稳定标识一条外部入站消息的去重键。 + + Returns: + bool: 若该键在当前 TTL 窗口内首次出现则返回 ``True``, + 否则返回 ``False``。 + + Notes: + 方法会先基于小顶堆做一次懒清理,再判断当前键是否仍在有效期内。 + 如果缓存已达到上限,则会优先淘汰“最早过期的仍然有效的键”。 + + 复杂度方面,常见路径下该方法接近 ``O(log n)``;如果恰好需要 + 集中清理一批过期键,则单次调用最坏可达到 ``O(k log n)``。 + """ + now = time.monotonic() + self._purge_expired(now) + + expires_at = self._seen.get(dedupe_key) + if expires_at is not None and expires_at > now: + return False + + if len(self._seen) >= self._max_entries: + self._evict_earliest_live() + + expires_at = now + self._ttl_seconds + self._seen[dedupe_key] = expires_at + heapq.heappush(self._expire_heap, (expires_at, dedupe_key)) + return True + + def clear(self) -> None: + """清空全部去重缓存。""" + self._expire_heap.clear() + self._seen.clear() + + def _purge_expired(self, now: float) -> None: + """从缓存中清理已经过期的去重键。 + + Args: + now: 当前单调时钟时间戳。 + + Notes: + 堆中可能存在旧版本节点。例如同一个 ``dedupe_key`` 被重新写入后, + 旧的过期时间节点仍会留在堆里。这里会通过和 ``dict`` 中当前值比对, + 跳过这类失效节点。 + """ + while self._expire_heap and self._expire_heap[0][0] <= now: + expires_at, dedupe_key = heapq.heappop(self._expire_heap) + current_expires_at = self._seen.get(dedupe_key) + if current_expires_at is None: + continue + if current_expires_at != expires_at: + continue + self._seen.pop(dedupe_key, None) + + def _evict_earliest_live(self) -> None: + """当缓存达到容量上限时,淘汰一条最早过期的有效键。 + + Notes: + 堆顶可能是已经过期或已失效的旧节点,因此这里同样需要循环弹出, + 直到找到一条当前仍然在 ``dict`` 中生效的键。 + """ + while self._expire_heap: + expires_at, dedupe_key = heapq.heappop(self._expire_heap) + current_expires_at = self._seen.get(dedupe_key) + if current_expires_at is None: + continue + if current_expires_at != expires_at: + continue + self._seen.pop(dedupe_key, None) + return diff --git a/src/platform_io/drivers/__init__.py b/src/platform_io/drivers/__init__.py new file mode 100644 index 00000000..b12120cf --- /dev/null +++ b/src/platform_io/drivers/__init__.py @@ -0,0 +1,11 @@ +"""导出 Platform IO 层的公开驱动类型。""" + +from .base import PlatformIODriver +from .legacy_driver import LegacyPlatformDriver +from .plugin_driver import PluginPlatformDriver + +__all__ = [ + "LegacyPlatformDriver", + "PlatformIODriver", + "PluginPlatformDriver", +] diff --git a/src/platform_io/drivers/base.py b/src/platform_io/drivers/base.py new file mode 100644 index 00000000..c6173d8c --- /dev/null +++ b/src/platform_io/drivers/base.py @@ -0,0 +1,104 @@ +"""定义 Platform IO 传输驱动的基础抽象协议。""" + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional + +from src.platform_io.types import DeliveryReceipt, DriverDescriptor, InboundMessageEnvelope, RouteKey + +if TYPE_CHECKING: + from src.chat.message_receive.message import SessionMessage + +InboundHandler = Callable[[InboundMessageEnvelope], Awaitable[bool]] + + +class PlatformIODriver(ABC): + """定义所有 Platform IO 驱动都必须实现的最小契约。 + + 当前实现故意保持接口很小,让中间层可以先落地,再逐步把 legacy + 与 plugin 路径的真实收发能力迁入这套协议之下。 + """ + + def __init__(self, descriptor: DriverDescriptor) -> None: + """使用驱动描述对象初始化驱动。 + + Args: + descriptor: 注册到 Broker 中的静态驱动元数据。 + """ + self._descriptor = descriptor + self._inbound_handler: Optional[InboundHandler] = None + + @property + def descriptor(self) -> DriverDescriptor: + """返回当前驱动的描述对象。 + + Returns: + DriverDescriptor: 当前驱动实例对应的描述对象。 + """ + return self._descriptor + + @property + def driver_id(self) -> str: + """返回驱动标识。 + + Returns: + str: 当前驱动的唯一 ID。 + """ + return self._descriptor.driver_id + + def set_inbound_handler(self, handler: InboundHandler) -> None: + """注册入站消息交回 Broker 的回调函数。 + + Args: + handler: 将规范化入站封装继续转发给 Broker 的异步回调。 + """ + self._inbound_handler = handler + + def clear_inbound_handler(self) -> None: + """清除当前注册的入站回调函数。""" + self._inbound_handler = None + + async def emit_inbound(self, envelope: InboundMessageEnvelope) -> bool: + """将一条入站封装转交给 Broker 回调。 + + Args: + envelope: 由驱动产出的规范化入站封装。 + + Returns: + bool: 若 Broker 接受该入站消息则返回 ``True``,否则返回 ``False``。 + """ + + if self._inbound_handler is None: + return False + return await self._inbound_handler(envelope) + + async def start(self) -> None: + """启动驱动生命周期。 + + 子类后续若需要初始化逻辑,可以覆盖这个钩子。 + """ + return None + + async def stop(self) -> None: + """停止驱动生命周期。 + + 子类后续若需要清理逻辑,可以覆盖这个钩子。 + """ + return None + + @abstractmethod + async def send_message( + self, + message: "SessionMessage", + route_key: RouteKey, + metadata: Optional[Dict[str, Any]] = None, + ) -> DeliveryReceipt: + """通过具体驱动发送一条消息。 + + Args: + message: 要投递的内部会话消息。 + route_key: Broker 为本次投递选中的路由键。 + metadata: 本次出站投递可选的 Broker 侧元数据。 + + Returns: + DeliveryReceipt: 规范化后的投递结果。 + """ diff --git a/src/platform_io/drivers/legacy_driver.py b/src/platform_io/drivers/legacy_driver.py new file mode 100644 index 00000000..bd74d8c7 --- /dev/null +++ b/src/platform_io/drivers/legacy_driver.py @@ -0,0 +1,61 @@ +"""提供 Platform IO 的 legacy 传输驱动骨架。""" + +from typing import TYPE_CHECKING, Any, Dict, Optional + +from src.platform_io.drivers.base import PlatformIODriver +from src.platform_io.types import DeliveryReceipt, DriverDescriptor, DriverKind, RouteKey + +if TYPE_CHECKING: + from src.chat.message_receive.message import SessionMessage + + +class LegacyPlatformDriver(PlatformIODriver): + """面向 ``maim_message`` 旧链路的 Platform IO 驱动骨架。""" + + def __init__( + self, + driver_id: str, + platform: str, + account_id: Optional[str] = None, + scope: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> None: + """初始化一个 legacy 驱动描述对象。 + + Args: + driver_id: Broker 内的唯一驱动 ID。 + platform: 该 legacy 适配器链路负责的平台。 + account_id: 可选的账号 ID 或 self ID。 + scope: 可选的额外路由作用域。 + metadata: 可选的额外驱动元数据。 + """ + descriptor = DriverDescriptor( + driver_id=driver_id, + kind=DriverKind.LEGACY, + platform=platform, + account_id=account_id, + scope=scope, + metadata=metadata or {}, + ) + super().__init__(descriptor) + + async def send_message( + self, + message: "SessionMessage", + route_key: RouteKey, + metadata: Optional[Dict[str, Any]] = None, + ) -> DeliveryReceipt: + """通过 legacy 传输路径发送消息。 + + Args: + message: 要投递的内部会话消息。 + route_key: Broker 为本次投递选择的路由键。 + metadata: 本次出站投递可选的 Broker 侧元数据。 + + Returns: + DeliveryReceipt: 由驱动返回的规范化回执。 + + Raises: + NotImplementedError: 当前仍处于骨架阶段,尚未真正接入旧发送链。 + """ + raise NotImplementedError("LegacyPlatformDriver 仅完成地基实现,尚未接入旧发送链") diff --git a/src/platform_io/drivers/plugin_driver.py b/src/platform_io/drivers/plugin_driver.py new file mode 100644 index 00000000..9c139309 --- /dev/null +++ b/src/platform_io/drivers/plugin_driver.py @@ -0,0 +1,64 @@ +"""提供 Platform IO 的 plugin 传输驱动骨架。""" + +from typing import TYPE_CHECKING, Any, Dict, Optional + +from src.platform_io.drivers.base import PlatformIODriver +from src.platform_io.types import DeliveryReceipt, DriverDescriptor, DriverKind, RouteKey + +if TYPE_CHECKING: + from src.chat.message_receive.message import SessionMessage + + +class PluginPlatformDriver(PlatformIODriver): + """面向 ``MessageGateway`` 插件链路的 Platform IO 驱动骨架。""" + + def __init__( + self, + driver_id: str, + platform: str, + account_id: Optional[str] = None, + scope: Optional[str] = None, + plugin_id: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> None: + """初始化一个 plugin 驱动描述对象。 + + Args: + driver_id: Broker 内的唯一驱动 ID。 + platform: 该 plugin 适配器链路负责的平台。 + account_id: 可选的账号 ID 或 self ID。 + scope: 可选的额外路由作用域。 + plugin_id: 拥有该适配器实现的插件 ID,可为空。 + metadata: 可选的额外驱动元数据。 + """ + descriptor = DriverDescriptor( + driver_id=driver_id, + kind=DriverKind.PLUGIN, + platform=platform, + account_id=account_id, + scope=scope, + plugin_id=plugin_id, + metadata=metadata or {}, + ) + super().__init__(descriptor) + + async def send_message( + self, + message: "SessionMessage", + route_key: RouteKey, + metadata: Optional[Dict[str, Any]] = None, + ) -> DeliveryReceipt: + """通过 plugin 传输路径发送消息。 + + Args: + message: 要投递的内部会话消息。 + route_key: Broker 为本次投递选择的路由键。 + metadata: 本次出站投递可选的 Broker 侧元数据。 + + Returns: + DeliveryReceipt: 由驱动返回的规范化回执。 + + Raises: + NotImplementedError: 当前仍处于骨架阶段,尚未真正接入 MessageGateway。 + """ + raise NotImplementedError("PluginPlatformDriver 仅完成地基实现,尚未接入 MessageGateway") diff --git a/src/platform_io/manager.py b/src/platform_io/manager.py new file mode 100644 index 00000000..6135a567 --- /dev/null +++ b/src/platform_io/manager.py @@ -0,0 +1,529 @@ +"""提供 Platform IO 层的中心 Broker 管理器。""" + +from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional + +import hashlib +import json + +from src.common.logger import get_logger +from src.platform_io.drivers.base import PlatformIODriver + +from .dedupe import MessageDeduplicator +from .outbound_tracker import OutboundTracker +from .route_key_factory import RouteKeyFactory +from .registry import DriverRegistry +from .routing import RouteTable +from .types import DeliveryReceipt, DeliveryStatus, InboundMessageEnvelope, RouteBinding, RouteKey + +if TYPE_CHECKING: + from src.chat.message_receive.message import SessionMessage + +logger = get_logger("platform_io.manager") + +InboundDispatcher = Callable[[InboundMessageEnvelope], Awaitable[None]] + + +class PlatformIOManager: + """统一协调双路径平台消息 IO 的路由、去重与状态跟踪。 + + 这个管理器预期会成为 legacy 适配器链路与 plugin 适配器链路之间的 + 唯一裁决点。当前地基阶段,它只提供共享状态和 Broker 侧契约,还没有 + 真正把生产流量切到新中间层。 + """ + + def __init__(self) -> None: + """初始化 Broker 管理器及其内存状态。""" + self._driver_registry = DriverRegistry() + self._route_table = RouteTable() + self._deduplicator = MessageDeduplicator() + self._outbound_tracker = OutboundTracker() + self._inbound_dispatcher: Optional[InboundDispatcher] = None + self._started = False + + @property + def is_started(self) -> bool: + """返回 Broker 当前是否已进入运行态。 + + Returns: + bool: 若 Broker 已启动则返回 ``True``。 + """ + return self._started + + async def start(self) -> None: + """启动 Broker,并依次启动当前已注册的全部驱动。 + + Raises: + Exception: 当某个驱动启动失败时,异常会继续上抛;已成功启动的驱动 + 会被自动回滚停止。 + """ + if self._started: + return + + started_drivers: list[PlatformIODriver] = [] + try: + for driver in self._driver_registry.list(): + await driver.start() + started_drivers.append(driver) + except Exception: + for driver in reversed(started_drivers): + try: + await driver.stop() + except Exception: + logger.exception("回滚驱动停止失败: driver_id=%s", driver.driver_id) + raise + + self._started = True + + async def stop(self) -> None: + """停止 Broker,并按逆序停止全部已注册驱动。 + + 停止完成后,会同步清空仅对当前运行周期有效的去重缓存和出站跟踪状态, + 避免下一次启动时继续沿用上一个运行周期的瞬时内存数据。 + + Raises: + RuntimeError: 当一个或多个驱动停止失败时抛出汇总异常。 + """ + if not self._started: + return + + stop_errors: list[str] = [] + for driver in reversed(self._driver_registry.list()): + try: + await driver.stop() + except Exception as exc: + stop_errors.append(f"{driver.driver_id}: {exc}") + logger.exception("驱动停止失败: driver_id=%s", driver.driver_id) + + self._started = False + self._deduplicator.clear() + self._outbound_tracker.clear() + if stop_errors: + raise RuntimeError(f"部分驱动停止失败: {'; '.join(stop_errors)}") + + async def add_driver(self, driver: PlatformIODriver) -> None: + """向运行中的 Broker 注册并启动一个驱动。 + + 如果 Broker 尚未启动,则该方法等价于 ``register_driver()``。 + + Args: + driver: 要添加的驱动实例。 + + Raises: + Exception: 当驱动启动失败时,注册会自动回滚,异常继续上抛。 + """ + self._register_driver_internal(driver) + if not self._started: + return + + try: + await driver.start() + except Exception: + self._unregister_driver_internal(driver.driver_id) + raise + + async def remove_driver(self, driver_id: str) -> Optional[PlatformIODriver]: + """从运行中的 Broker 停止并移除一个驱动。 + + 如果 Broker 尚未启动,则该方法等价于 ``unregister_driver()``。 + + Args: + driver_id: 要移除的驱动 ID。 + + Returns: + Optional[PlatformIODriver]: 若驱动存在,则返回被移除的驱动实例。 + + Raises: + Exception: 当 Broker 运行中且驱动停止失败时,异常会继续上抛。 + """ + if not self._started: + return self.unregister_driver(driver_id) + + driver = self._driver_registry.get(driver_id) + if driver is None: + return None + + await driver.stop() + return self._unregister_driver_internal(driver_id) + + @property + def driver_registry(self) -> DriverRegistry: + """返回管理器持有的驱动注册表。 + + Returns: + DriverRegistry: 用于保存全部已注册驱动的注册表。 + """ + return self._driver_registry + + @property + def route_table(self) -> RouteTable: + """返回管理器持有的路由绑定表。 + + Returns: + RouteTable: 用于归属解析的路由绑定表。 + """ + return self._route_table + + @property + def deduplicator(self) -> MessageDeduplicator: + """返回管理器持有的入站去重器。 + + Returns: + MessageDeduplicator: 用于抑制重复入站的去重器。 + """ + return self._deduplicator + + @property + def outbound_tracker(self) -> OutboundTracker: + """返回管理器持有的出站跟踪器。 + + Returns: + OutboundTracker: 用于记录出站 pending 状态与回执的跟踪器。 + """ + return self._outbound_tracker + + def set_inbound_dispatcher(self, dispatcher: InboundDispatcher) -> None: + """设置统一的入站分发回调。 + + Args: + dispatcher: 接收已通过 Broker 审核的入站封装,并继续送入 + Core 下一处理阶段的异步回调。 + """ + + self._inbound_dispatcher = dispatcher + + def clear_inbound_dispatcher(self) -> None: + """清除当前的入站分发回调。""" + self._inbound_dispatcher = None + + @property + def has_inbound_dispatcher(self) -> bool: + """返回当前是否已经配置入站分发回调。 + + Returns: + bool: 若已经配置入站分发回调则返回 ``True``。 + """ + return self._inbound_dispatcher is not None + + def register_driver(self, driver: PlatformIODriver) -> None: + """注册驱动,并把它的入站回调挂到 Broker。 + + Args: + driver: 要注册的驱动实例。 + + Raises: + RuntimeError: 当 Broker 已经处于运行态时抛出。此时应改用 + ``add_driver()`` 以保证驱动生命周期和注册状态一致。 + """ + if self._started: + raise RuntimeError("Broker 运行中不允许直接 register_driver,请改用 add_driver()") + + self._register_driver_internal(driver) + + def _register_driver_internal(self, driver: PlatformIODriver) -> None: + """执行不带运行态限制的内部驱动注册。 + + Args: + driver: 要注册的驱动实例。 + """ + driver.set_inbound_handler(self.accept_inbound) + self._driver_registry.register(driver) + + def unregister_driver(self, driver_id: str) -> Optional[PlatformIODriver]: + """从 Broker 注销一个驱动。 + + Args: + driver_id: 要移除的驱动 ID。 + + Returns: + Optional[PlatformIODriver]: 若驱动存在,则返回被移除的驱动实例。 + + Raises: + RuntimeError: 当 Broker 已经处于运行态时抛出。此时应改用 + ``remove_driver()``,避免驱动停止与路由解绑脱节。 + """ + if self._started: + raise RuntimeError("Broker 运行中不允许直接 unregister_driver,请改用 remove_driver()") + + return self._unregister_driver_internal(driver_id) + + def _unregister_driver_internal(self, driver_id: str) -> Optional[PlatformIODriver]: + """执行不带运行态限制的内部驱动注销。 + + Args: + driver_id: 要移除的驱动 ID。 + + Returns: + Optional[PlatformIODriver]: 若驱动存在,则返回被移除的驱动实例。 + """ + removed_driver = self._driver_registry.unregister(driver_id) + if removed_driver is None: + return None + + removed_driver.clear_inbound_handler() + self._route_table.remove_bindings_by_driver(driver_id) + return removed_driver + + def bind_route(self, binding: RouteBinding, *, replace: bool = False) -> None: + """为某个路由键绑定驱动。 + + Args: + binding: 要保存的路由绑定。 + replace: 是否允许替换已有的精确 active owner。 + + Raises: + ValueError: 当绑定引用了不存在的驱动,或者绑定与驱动描述不一致时抛出。 + """ + driver = self._driver_registry.get(binding.driver_id) + if driver is None: + raise ValueError(f"驱动 {binding.driver_id} 未注册,无法绑定路由") + + self._validate_binding_against_driver(binding, driver) + self._route_table.bind(binding, replace=replace) + + def unbind_route(self, route_key: RouteKey, driver_id: Optional[str] = None) -> None: + """移除一个或多个路由绑定。 + + Args: + route_key: 要移除绑定的路由键。 + driver_id: 可选的特定驱动 ID。 + """ + self._route_table.unbind(route_key, driver_id) + + def resolve_driver(self, route_key: RouteKey) -> Optional[PlatformIODriver]: + """解析某个路由键当前的 active 驱动。 + + Args: + route_key: 要解析的路由键。 + + Returns: + Optional[PlatformIODriver]: 若存在 active 驱动,则返回该驱动实例。 + """ + active_binding = self._route_table.get_active_binding(route_key) + if active_binding is None: + return None + return self._driver_registry.get(active_binding.driver_id) + + @staticmethod + def build_route_key_from_message(message: "SessionMessage") -> RouteKey: + """根据 ``SessionMessage`` 构造路由键。 + + Args: + message: 内部会话消息对象。 + + Returns: + RouteKey: 由消息内容提取出的规范化路由键。 + """ + return RouteKeyFactory.from_session_message(message) + + @staticmethod + def build_route_key_from_message_dict(message_dict: Dict[str, Any]) -> RouteKey: + """根据消息字典构造路由键。 + + Args: + message_dict: Host 与插件之间传输的消息字典。 + + Returns: + RouteKey: 由消息字典提取出的规范化路由键。 + """ + return RouteKeyFactory.from_message_dict(message_dict) + + async def accept_inbound(self, envelope: InboundMessageEnvelope) -> bool: + """处理一条由驱动上报的入站封装。 + + Args: + envelope: 由传输驱动产出的入站封装。 + + Returns: + bool: 若消息被接受并继续转发给入站分发器,则返回 ``True``, + 否则返回 ``False``。 + """ + + if not self._route_table.accepts_inbound(envelope.route_key, envelope.driver_id): + logger.info( + "忽略非 active owner 的入站消息: route=%s driver=%s", + envelope.route_key, + envelope.driver_id, + ) + return False + + if self._inbound_dispatcher is None: + logger.debug("PlatformIOManager 尚未配置 inbound dispatcher,暂不继续分发") + return False + + dedupe_key = self._build_inbound_dedupe_key(envelope) + if dedupe_key is not None: + if not self._deduplicator.mark_seen(dedupe_key): + logger.info("忽略重复入站消息: dedupe_key=%s", dedupe_key) + return False + + await self._inbound_dispatcher(envelope) + return True + + async def send_message( + self, + message: "SessionMessage", + route_key: RouteKey, + metadata: Optional[Dict[str, Any]] = None, + ) -> DeliveryReceipt: + """通过 Broker 选中的驱动发送一条消息。 + + Args: + message: 要投递的内部会话消息。 + route_key: 本次出站投递选择的路由键。 + metadata: 可选的额外 Broker 侧元数据。 + + Returns: + DeliveryReceipt: 规范化后的出站回执。若路由不存在、驱动缺失, + 或同一消息已存在未完成的出站跟踪,也会返回失败回执而不是抛异常。 + """ + + active_binding = self._route_table.get_active_binding(route_key) + if active_binding is None: + return DeliveryReceipt( + internal_message_id=message.message_id, + route_key=route_key, + status=DeliveryStatus.FAILED, + error="未找到 active 路由绑定", + ) + + driver = self._driver_registry.get(active_binding.driver_id) + if driver is None: + return DeliveryReceipt( + internal_message_id=message.message_id, + route_key=route_key, + status=DeliveryStatus.FAILED, + driver_id=active_binding.driver_id, + driver_kind=active_binding.driver_kind, + error="active 路由绑定对应的驱动不存在", + ) + + try: + self._outbound_tracker.begin_tracking( + internal_message_id=message.message_id, + route_key=route_key, + driver_id=driver.driver_id, + metadata=metadata, + ) + except ValueError as exc: + return DeliveryReceipt( + internal_message_id=message.message_id, + route_key=route_key, + status=DeliveryStatus.FAILED, + driver_id=driver.driver_id, + driver_kind=driver.descriptor.kind, + error=str(exc), + ) + + try: + receipt = await driver.send_message(message=message, route_key=route_key, metadata=metadata) + except Exception as exc: + receipt = DeliveryReceipt( + internal_message_id=message.message_id, + route_key=route_key, + status=DeliveryStatus.FAILED, + driver_id=driver.driver_id, + driver_kind=driver.descriptor.kind, + error=str(exc), + ) + + self._outbound_tracker.finish_tracking(receipt) + return receipt + + @staticmethod + def _build_inbound_dedupe_key(envelope: InboundMessageEnvelope) -> Optional[str]: + """构造用于入站抑制的去重键。 + + Args: + envelope: 当前正在处理的入站封装。 + + Returns: + Optional[str]: 若可以构造稳定去重键则返回该键,否则返回 ``None``。 + """ + raw_dedupe_key = envelope.dedupe_key or envelope.external_message_id + if raw_dedupe_key is None and envelope.session_message is not None: + raw_dedupe_key = envelope.session_message.message_id + if raw_dedupe_key is None and envelope.payload is not None: + raw_dedupe_key = PlatformIOManager._build_payload_fingerprint(envelope.payload) + if raw_dedupe_key is None: + return None + + normalized_dedupe_key = str(raw_dedupe_key).strip() + if not normalized_dedupe_key: + return None + + return f"{envelope.route_key.to_dedupe_scope()}:{normalized_dedupe_key}" + + @staticmethod + def _build_payload_fingerprint(payload: Dict[str, Any]) -> Optional[str]: + """根据消息载荷构造稳定指纹。 + + Args: + payload: 待构造指纹的原始载荷字典。 + + Returns: + Optional[str]: 若成功生成指纹则返回十六进制摘要,否则返回 ``None``。 + """ + try: + serialized_payload = json.dumps( + payload, + default=str, + ensure_ascii=True, + separators=(",", ":"), + sort_keys=True, + ) + except Exception: + return None + + return hashlib.sha256(serialized_payload.encode()).hexdigest() + + @staticmethod + def _validate_binding_against_driver(binding: RouteBinding, driver: PlatformIODriver) -> None: + """校验路由绑定与驱动描述是否一致。 + + Args: + binding: 待校验的路由绑定。 + driver: 被绑定的驱动实例。 + + Raises: + ValueError: 当绑定类型、平台或更细粒度路由维度与驱动描述冲突时抛出。 + """ + descriptor = driver.descriptor + if binding.driver_kind != descriptor.kind: + raise ValueError( + f"路由绑定的 driver_kind={binding.driver_kind} 与驱动 {driver.driver_id} 的类型 " + f"{descriptor.kind} 不一致" + ) + + if binding.route_key.platform != descriptor.platform: + raise ValueError( + f"路由绑定的平台 {binding.route_key.platform} 与驱动 {driver.driver_id} 的平台 " + f"{descriptor.platform} 不一致" + ) + + if descriptor.account_id is not None and binding.route_key.account_id not in (None, descriptor.account_id): + raise ValueError( + f"路由绑定的 account_id={binding.route_key.account_id} 与驱动 {driver.driver_id} 的 " + f"account_id={descriptor.account_id} 冲突" + ) + + if descriptor.scope is not None and binding.route_key.scope not in (None, descriptor.scope): + raise ValueError( + f"路由绑定的 scope={binding.route_key.scope} 与驱动 {driver.driver_id} 的 " + f"scope={descriptor.scope} 冲突" + ) + + +_platform_io_manager: Optional[PlatformIOManager] = None + + +def get_platform_io_manager() -> PlatformIOManager: + """返回全局 ``PlatformIOManager`` 单例。 + + Returns: + PlatformIOManager: 进程级共享的 Broker 管理器实例。 + """ + + global _platform_io_manager + if _platform_io_manager is None: + _platform_io_manager = PlatformIOManager() + return _platform_io_manager diff --git a/src/platform_io/outbound_tracker.py b/src/platform_io/outbound_tracker.py new file mode 100644 index 00000000..438aa566 --- /dev/null +++ b/src/platform_io/outbound_tracker.py @@ -0,0 +1,242 @@ +"""跟踪 Platform IO 层的出站投递状态。 + +当前实现基于两组 ``dict + heapq``: +- ``_pending`` 和 ``_pending_expire_heap`` 负责管理待完成的出站记录 +- ``_receipts_by_external_id`` 和 ``_receipt_expire_heap`` 负责管理已完成回执索引 + +这样就不需要在每次读写时全表扫描过期项,而是通过懒清理逐步弹出已经过期 +或已经失效的堆节点。 +""" + +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, Tuple + +import heapq +import time + +from .types import DeliveryReceipt, RouteKey + + +@dataclass(slots=True) +class PendingOutboundRecord: + """表示一条仍在等待完成的出站投递记录。 + + Attributes: + internal_message_id: 正在跟踪的内部 ``SessionMessage.message_id``。 + route_key: 该出站投递开始时使用的路由键。 + driver_id: 负责这次出站投递的驱动 ID。 + created_at: 开始跟踪时记录的单调时钟时间戳。 + expires_at: 该待完成记录预计过期的单调时钟时间戳。 + metadata: 与待完成记录一同保留的额外 Broker 侧元数据。 + """ + + internal_message_id: str + route_key: RouteKey + driver_id: str + created_at: float = field(default_factory=time.monotonic) + expires_at: float = 0.0 + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass(slots=True) +class StoredDeliveryReceipt: + """表示一条已完成并暂存的出站回执。 + + Attributes: + receipt: 规范化后的出站投递回执。 + stored_at: 回执被写入索引时记录的单调时钟时间戳。 + expires_at: 该回执索引预计过期的单调时钟时间戳。 + """ + + receipt: DeliveryReceipt + stored_at: float = field(default_factory=time.monotonic) + expires_at: float = 0.0 + + +class OutboundTracker: + """统一跟踪出站消息的 pending 状态与最终回执。 + + 主要用于解决出站消息在发送过程中“状态散落在不同路径里”的问题: + - 发送开始后,需要在最终回执返回前保留一份 pending 状态 + - 平台返回 ``external_message_id`` 后,需要保留一段时间的回执索引 + + 当前实现使用 ``dict + heapq`` 做 TTL 管理: + - ``dict`` 提供 ``O(1)`` 级别的主键查询 + - ``heapq`` 提供按过期时间排序的懒清理能力 + + 这比“每次 begin/finish/get 都全表扫描”的实现更适合高吞吐出站场景。 + + Notes: + 复杂度说明如下,设 ``p`` 为当前有效 pending 数量,``r`` 为当前有效回执数量: + + - ``begin_tracking()``、``finish_tracking()`` 的常见路径时间复杂度接近 + ``O(log p)`` 或 ``O(log r)`` + - ``get_pending()``、``get_receipt_by_external_id()`` 的查询本身是 ``O(1)`` + ,连同懒清理一起看,长期摊还复杂度接近 ``O(log n)`` + - 如果某次调用恰好触发一批过期节点的集中清理,则该次调用的最坏时间复杂度 + 可达到 ``O(k log n)``,其中 ``k`` 为本次被弹出的节点数量 + - 空间复杂度为 ``O(p + r)`` + """ + + def __init__(self, ttl_seconds: float = 1800.0) -> None: + """初始化出站跟踪器。 + + Args: + ttl_seconds: 待完成记录与按外部消息 ID 建立的回执索引保留时长, + 单位为秒。 + + Raises: + ValueError: 当 ``ttl_seconds`` 非正数时抛出。 + """ + if ttl_seconds <= 0: + raise ValueError("ttl_seconds 必须大于 0") + + self._ttl_seconds = ttl_seconds + self._pending: Dict[str, PendingOutboundRecord] = {} + self._pending_expire_heap: List[Tuple[float, str]] = [] + self._receipts_by_external_id: Dict[str, StoredDeliveryReceipt] = {} + self._receipt_expire_heap: List[Tuple[float, str]] = [] + + def begin_tracking( + self, + internal_message_id: str, + route_key: RouteKey, + driver_id: str, + metadata: Optional[Dict[str, Any]] = None, + ) -> PendingOutboundRecord: + """开始跟踪一次出站投递。 + + Args: + internal_message_id: 正在投递的内部消息 ID。 + route_key: 这次出站投递选择的路由键。 + driver_id: 负责本次投递的驱动 ID。 + metadata: 可选的额外元数据,会一并保存在待完成记录中。 + + Returns: + PendingOutboundRecord: 新创建的待完成记录。 + + Raises: + ValueError: 当同一个 ``internal_message_id`` 已经存在未完成记录时抛出。 + """ + now = time.monotonic() + self._cleanup_expired(now) + + if internal_message_id in self._pending: + raise ValueError(f"消息 {internal_message_id} 已存在未完成的出站跟踪记录") + + expires_at = now + self._ttl_seconds + record = PendingOutboundRecord( + internal_message_id=internal_message_id, + route_key=route_key, + driver_id=driver_id, + created_at=now, + expires_at=expires_at, + metadata=metadata or {}, + ) + self._pending[internal_message_id] = record + heapq.heappush(self._pending_expire_heap, (expires_at, internal_message_id)) + return record + + def finish_tracking(self, receipt: DeliveryReceipt) -> Optional[PendingOutboundRecord]: + """使用最终回执结束一条出站跟踪。 + + Args: + receipt: 规范化后的最终投递回执。 + + Returns: + Optional[PendingOutboundRecord]: 若此前存在待完成记录,则返回该记录。 + """ + now = time.monotonic() + self._cleanup_expired(now) + + pending_record = self._pending.pop(receipt.internal_message_id, None) + if receipt.external_message_id: + expires_at = now + self._ttl_seconds + self._receipts_by_external_id[receipt.external_message_id] = StoredDeliveryReceipt( + receipt=receipt, + stored_at=now, + expires_at=expires_at, + ) + heapq.heappush(self._receipt_expire_heap, (expires_at, receipt.external_message_id)) + return pending_record + + def get_pending(self, internal_message_id: str) -> Optional[PendingOutboundRecord]: + """根据内部消息 ID 查询待完成记录。 + + Args: + internal_message_id: 要查询的内部消息 ID。 + + Returns: + Optional[PendingOutboundRecord]: 若记录仍存在,则返回对应待完成记录。 + """ + self._cleanup_expired(time.monotonic()) + return self._pending.get(internal_message_id) + + def get_receipt_by_external_id(self, external_message_id: str) -> Optional[DeliveryReceipt]: + """根据外部平台消息 ID 查询已完成回执。 + + Args: + external_message_id: 要查询的平台侧消息 ID。 + + Returns: + Optional[DeliveryReceipt]: 若存在对应回执,则返回该回执。 + """ + self._cleanup_expired(time.monotonic()) + stored_receipt = self._receipts_by_external_id.get(external_message_id) + return stored_receipt.receipt if stored_receipt else None + + def clear(self) -> None: + """清空全部待完成记录与已保存回执。""" + self._pending.clear() + self._pending_expire_heap.clear() + self._receipts_by_external_id.clear() + self._receipt_expire_heap.clear() + + def _cleanup_expired(self, now: float) -> None: + """清理内存中已经过期的待完成记录与已保存回执。 + + Args: + now: 当前单调时钟时间戳。 + """ + self._cleanup_expired_pending(now) + self._cleanup_expired_receipts(now) + + def _cleanup_expired_pending(self, now: float) -> None: + """清理已经过期的待完成记录。 + + Args: + now: 当前单调时钟时间戳。 + + Notes: + 堆中可能存在已经失效的旧节点。例如某条记录提前 ``finish`` 后, + 它原本的过期节点仍可能留在堆里。这里会通过和 ``dict`` 中当前记录的 + ``expires_at`` 对比,跳过这类旧节点。 + """ + while self._pending_expire_heap and self._pending_expire_heap[0][0] <= now: + expires_at, internal_message_id = heapq.heappop(self._pending_expire_heap) + current_record = self._pending.get(internal_message_id) + if current_record is None: + continue + if current_record.expires_at != expires_at: + continue + self._pending.pop(internal_message_id, None) + + def _cleanup_expired_receipts(self, now: float) -> None: + """清理已经过期的回执索引。 + + Args: + now: 当前单调时钟时间戳。 + + Notes: + 同一个 ``external_message_id`` 在极端情况下可能被重复写入索引, + 因此这里同样需要通过 ``expires_at`` 和当前 ``dict`` 中的值比对, + 跳过已经失效的旧堆节点。 + """ + while self._receipt_expire_heap and self._receipt_expire_heap[0][0] <= now: + expires_at, external_message_id = heapq.heappop(self._receipt_expire_heap) + current_receipt = self._receipts_by_external_id.get(external_message_id) + if current_receipt is None: + continue + if current_receipt.expires_at != expires_at: + continue + self._receipts_by_external_id.pop(external_message_id, None) diff --git a/src/platform_io/registry.py b/src/platform_io/registry.py new file mode 100644 index 00000000..9ad8ea8a --- /dev/null +++ b/src/platform_io/registry.py @@ -0,0 +1,70 @@ +"""提供 Platform IO 的驱动注册与查询能力。""" + +from typing import Dict, List, Optional + +from src.platform_io.drivers.base import PlatformIODriver +from src.platform_io.types import DriverKind + + +class DriverRegistry: + """集中保存已注册的 Platform IO 驱动,并提供基础查询接口。""" + + def __init__(self) -> None: + """初始化一个空的驱动注册表。""" + self._drivers: Dict[str, PlatformIODriver] = {} + + def register(self, driver: PlatformIODriver) -> None: + """注册一个驱动实例。 + + Args: + driver: 要注册的驱动实例。 + + Raises: + ValueError: 当驱动 ID 已经存在时抛出。 + """ + if driver.driver_id in self._drivers: + raise ValueError(f"驱动 {driver.driver_id} 已注册") + self._drivers[driver.driver_id] = driver + + def unregister(self, driver_id: str) -> Optional[PlatformIODriver]: + """按驱动 ID 注销一个驱动。 + + Args: + driver_id: 要移除的驱动 ID。 + + Returns: + Optional[PlatformIODriver]: 若驱动存在,则返回被移除的驱动实例。 + """ + return self._drivers.pop(driver_id, None) + + def get(self, driver_id: str) -> Optional[PlatformIODriver]: + """按驱动 ID 获取驱动实例。 + + Args: + driver_id: 要查询的驱动 ID。 + + Returns: + Optional[PlatformIODriver]: 若存在匹配驱动,则返回该驱动实例。 + """ + return self._drivers.get(driver_id) + + def list(self, *, kind: Optional[DriverKind] = None, platform: Optional[str] = None) -> List[PlatformIODriver]: + """列出已注册驱动,并支持可选过滤。 + + Args: + kind: 可选的驱动类型过滤条件。 + platform: 可选的平台名称过滤条件。 + + Returns: + List[PlatformIODriver]: 符合过滤条件的驱动列表。 + """ + drivers = list(self._drivers.values()) + if kind is not None: + drivers = [driver for driver in drivers if driver.descriptor.kind == kind] + if platform is not None: + drivers = [driver for driver in drivers if driver.descriptor.platform == platform] + return drivers + + def clear(self) -> None: + """清空全部已注册驱动。""" + self._drivers.clear() diff --git a/src/platform_io/route_key_factory.py b/src/platform_io/route_key_factory.py new file mode 100644 index 00000000..05bac6e8 --- /dev/null +++ b/src/platform_io/route_key_factory.py @@ -0,0 +1,150 @@ +"""提供 Platform IO 路由键的统一提取与构造能力。 + +这层的目标不是直接接入具体消息链,而是先把“未来接线时用什么字段构造 +RouteKey”约定下来,避免 legacy 和 plugin 两条链路各自发明一套隐式规则。 +""" + +from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple + +from .types import RouteKey + +if TYPE_CHECKING: + from src.chat.message_receive.message import SessionMessage + + +class RouteKeyFactory: + """统一构造 ``RouteKey`` 的工厂。 + + 当前约定会优先从消息字典顶层、``message_info``、``additional_config`` 或传入 metadata 中提取 + 以下字段: + + - account_id: ``platform_io_account_id`` / ``account_id`` / ``self_id`` / ``bot_account`` + - scope: ``platform_io_scope`` / ``route_scope`` / ``adapter_scope`` / ``connection_id`` + + 这样即使上游主链暂时还没有正式的 ``self_id`` 字段,中间层也能先统一 + 约定提取口径,等具体消息链接入时直接复用。 + """ + + ACCOUNT_ID_KEYS = ( + "platform_io_account_id", + "account_id", + "self_id", + "bot_account", + ) + SCOPE_KEYS = ( + "platform_io_scope", + "route_scope", + "adapter_scope", + "connection_id", + ) + + @classmethod + def from_platform( + cls, + platform: str, + *, + account_id: Optional[str] = None, + scope: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None, + ) -> RouteKey: + """根据平台名和可选 metadata 构造 ``RouteKey``。 + + Args: + platform: 平台名称。 + account_id: 显式传入的账号 ID;若为空,则尝试从 metadata 提取。 + scope: 显式传入的路由作用域;若为空,则尝试从 metadata 提取。 + metadata: 可选的元数据字典。 + + Returns: + RouteKey: 构造出的规范化路由键。 + """ + extracted_account_id, extracted_scope = cls.extract_components(metadata) + return RouteKey( + platform=platform, + account_id=account_id or extracted_account_id, + scope=scope or extracted_scope, + ) + + @classmethod + def from_message_dict(cls, message_dict: Dict[str, Any]) -> RouteKey: + """从消息字典中提取 ``RouteKey``。 + + Args: + message_dict: Host 与插件之间传输的消息字典。 + + Returns: + RouteKey: 构造出的规范化路由键。 + + Raises: + ValueError: 当消息字典缺少有效 ``platform`` 字段时抛出。 + """ + platform = str(message_dict.get("platform") or "").strip() + if not platform: + raise ValueError("消息字典缺少有效的 platform 字段,无法构造 RouteKey") + + message_info = message_dict.get("message_info", {}) + additional_config = {} + if isinstance(message_info, dict): + raw_additional_config = message_info.get("additional_config", {}) + if isinstance(raw_additional_config, dict): + additional_config = raw_additional_config + + explicit_account_id, explicit_scope = cls.extract_components(message_dict) + message_info_account_id, message_info_scope = cls.extract_components(message_info) + metadata_account_id, metadata_scope = cls.extract_components(additional_config) + return RouteKey( + platform=platform, + account_id=explicit_account_id or message_info_account_id or metadata_account_id, + scope=explicit_scope or message_info_scope or metadata_scope, + ) + + @classmethod + def from_session_message(cls, message: "SessionMessage") -> RouteKey: + """从 ``SessionMessage`` 中提取 ``RouteKey``。 + + Args: + message: 内部会话消息对象。 + + Returns: + RouteKey: 构造出的规范化路由键。 + """ + additional_config = message.message_info.additional_config or {} + metadata = additional_config if isinstance(additional_config, dict) else {} + return cls.from_platform(message.platform, metadata=metadata) + + @classmethod + def extract_components(cls, mapping: Optional[Dict[str, Any]]) -> Tuple[Optional[str], Optional[str]]: + """从任意字典中提取 ``account_id`` 与 ``scope``。 + + Args: + mapping: 待提取的字典;若为空或不是字典,则返回空结果。 + + Returns: + Tuple[Optional[str], Optional[str]]: ``(account_id, scope)``。 + """ + if not mapping or not isinstance(mapping, dict): + return None, None + + account_id = cls._pick_string(mapping, cls.ACCOUNT_ID_KEYS) + scope = cls._pick_string(mapping, cls.SCOPE_KEYS) + return account_id, scope + + @staticmethod + def _pick_string(mapping: Dict[str, Any], keys: Tuple[str, ...]) -> Optional[str]: + """按优先级从字典里挑选第一个有效字符串。 + + Args: + mapping: 待查询的字典。 + keys: 按优先级排列的候选键名。 + + Returns: + Optional[str]: 第一个规范化后非空的字符串值;若不存在则返回 ``None``。 + """ + for key in keys: + value = mapping.get(key) + if value is None: + continue + normalized = str(value).strip() + if normalized: + return normalized + return None diff --git a/src/platform_io/routing.py b/src/platform_io/routing.py new file mode 100644 index 00000000..7f85bbfa --- /dev/null +++ b/src/platform_io/routing.py @@ -0,0 +1,202 @@ +"""提供 Platform IO 的路由绑定存储与归属解析能力。""" + +from typing import Dict, List, Optional + +from .types import RouteBinding, RouteKey, RouteMode + + +class RouteBindingConflictError(ValueError): + """当同一路由键出现多个 active owner 竞争时抛出。""" + + +class RouteTable: + """维护路由绑定并解析路由归属。 + + 这个表刻意保持轻量,只负责归属规则本身,不掺杂具体发送或接收逻辑。 + 它决定某个路由键当前由哪个驱动 active 接管,哪些驱动仅以 shadow + 方式旁路观测。 + """ + + def __init__(self) -> None: + """初始化一个空的路由绑定表。""" + self._bindings: Dict[RouteKey, Dict[str, RouteBinding]] = {} + + def bind(self, binding: RouteBinding, *, replace: bool = False) -> None: + """注册或更新一条路由绑定。 + + Args: + binding: 要注册的绑定对象。 + replace: 当精确路由键上已经存在 active owner 时,是否允许替换。 + + Raises: + RouteBindingConflictError: 当精确路由键上已存在其他 active owner, + 且 ``replace`` 为 ``False`` 时抛出。 + """ + + if binding.mode == RouteMode.DISABLED: + self.unbind(binding.route_key, binding.driver_id) + return + + if binding.mode == RouteMode.ACTIVE: + active_binding = self.get_active_binding(binding.route_key, exact_only=True) + if active_binding and active_binding.driver_id != binding.driver_id: + if not replace: + raise RouteBindingConflictError( + f"RouteKey {binding.route_key} 已由 {active_binding.driver_id} 接管," + f"拒绝绑定到 {binding.driver_id}" + ) + self.unbind(binding.route_key, active_binding.driver_id) + + self._bindings.setdefault(binding.route_key, {})[binding.driver_id] = binding + + def unbind(self, route_key: RouteKey, driver_id: Optional[str] = None) -> List[RouteBinding]: + """移除指定路由键上的绑定。 + + Args: + route_key: 要移除绑定的路由键。 + driver_id: 可选的特定驱动 ID;若为空,则移除该路由键上的全部绑定。 + + Returns: + List[RouteBinding]: 被移除的绑定列表。 + """ + + binding_map = self._bindings.get(route_key) + if not binding_map: + return [] + + if driver_id is None: + removed = list(binding_map.values()) + self._bindings.pop(route_key, None) + return removed + + removed_binding = binding_map.pop(driver_id, None) + if not binding_map: + self._bindings.pop(route_key, None) + return [removed_binding] if removed_binding else [] + + def remove_bindings_by_driver(self, driver_id: str) -> List[RouteBinding]: + """移除某个驱动在所有路由键上的绑定。 + + Args: + driver_id: 要移除绑定的驱动 ID。 + + Returns: + List[RouteBinding]: 被移除的绑定列表。 + """ + removed_bindings: List[RouteBinding] = [] + empty_route_keys: List[RouteKey] = [] + + for route_key, binding_map in self._bindings.items(): + removed_binding = binding_map.pop(driver_id, None) + if removed_binding is not None: + removed_bindings.append(removed_binding) + if not binding_map: + empty_route_keys.append(route_key) + + for route_key in empty_route_keys: + self._bindings.pop(route_key, None) + + return self._sort_bindings(removed_bindings) + + def list_bindings(self, route_key: Optional[RouteKey] = None) -> List[RouteBinding]: + """列出当前绑定。 + + Args: + route_key: 可选的路由键过滤条件;若为空,则返回全部路由键上的绑定。 + + Returns: + List[RouteBinding]: 按优先级降序排列的绑定列表。 + """ + + if route_key is None: + bindings: List[RouteBinding] = [] + for binding_map in self._bindings.values(): + bindings.extend(binding_map.values()) + return self._sort_bindings(bindings) + + binding_map = self._bindings.get(route_key, {}) + return self._sort_bindings(list(binding_map.values())) + + def get_active_binding(self, route_key: RouteKey, *, exact_only: bool = False) -> Optional[RouteBinding]: + """获取某个路由键当前生效的 active 绑定。 + + Args: + route_key: 要解析的路由键。 + exact_only: 是否只检查精确路由键而不做回退解析。 + + Returns: + Optional[RouteBinding]: 若存在 active owner,则返回对应绑定。 + """ + + candidate_keys = [route_key] if exact_only else route_key.resolution_order() + for candidate_key in candidate_keys: + binding_map = self._bindings.get(candidate_key, {}) + active_binding = self._pick_best_binding(binding_map, RouteMode.ACTIVE) + if active_binding is not None: + return active_binding + return None + + def get_shadow_bindings(self, route_key: RouteKey) -> List[RouteBinding]: + """获取某个精确路由键上的 shadow 绑定。 + + Args: + route_key: 要查看的路由键。 + + Returns: + List[RouteBinding]: 按优先级降序排列的 shadow 绑定列表。 + """ + binding_map = self._bindings.get(route_key, {}) + shadow_bindings = [binding for binding in binding_map.values() if binding.mode == RouteMode.SHADOW] + return self._sort_bindings(shadow_bindings) + + def accepts_inbound(self, route_key: RouteKey, driver_id: str) -> bool: + """判断某个驱动是否是当前允许入 Core 的 active owner。 + + Args: + route_key: 入站消息对应的路由键。 + driver_id: 希望将消息送入 Core 的驱动 ID。 + + Returns: + bool: 若该驱动是解析结果中的 active owner,则返回 ``True``。 + """ + + active_binding = self.get_active_binding(route_key) + return active_binding is not None and active_binding.driver_id == driver_id + + @staticmethod + def _sort_bindings(bindings: List[RouteBinding]) -> List[RouteBinding]: + """按优先级降序排列绑定列表。 + + Args: + bindings: 待排序的绑定列表。 + + Returns: + List[RouteBinding]: 排序后的绑定列表。 + """ + return sorted(bindings, key=lambda item: item.priority, reverse=True) + + @staticmethod + def _pick_best_binding( + binding_map: Dict[str, RouteBinding], + mode: RouteMode, + ) -> Optional[RouteBinding]: + """从绑定映射中挑选指定模式下优先级最高的一条绑定。 + + Args: + binding_map: 某个精确 ``RouteKey`` 对应的绑定映射。 + mode: 需要挑选的绑定模式。 + + Returns: + Optional[RouteBinding]: 若存在匹配模式的绑定,则返回优先级最高的一条。 + + Notes: + 这里使用单次线性扫描代替“先过滤成列表再排序”的做法,以减少 + 高频路由解析路径上的临时对象分配和排序开销。 + """ + best_binding: Optional[RouteBinding] = None + for binding in binding_map.values(): + if binding.mode != mode: + continue + if best_binding is None or binding.priority > best_binding.priority: + best_binding = binding + return best_binding diff --git a/src/platform_io/types.py b/src/platform_io/types.py new file mode 100644 index 00000000..c74dc246 --- /dev/null +++ b/src/platform_io/types.py @@ -0,0 +1,240 @@ +"""定义 Platform IO 中间层共享的核心类型。 + +本模块放置路由、驱动、入站与出站等规范化数据结构,供 Broker +层在 legacy 适配器链路和 plugin 适配器链路之间复用。 +""" + +from dataclasses import dataclass, field +from enum import Enum +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +if TYPE_CHECKING: + from src.chat.message_receive.message import SessionMessage + + +class DriverKind(str, Enum): + """底层收发驱动类型枚举。""" + + LEGACY = "legacy" + PLUGIN = "plugin" + + +class RouteMode(str, Enum): + """路由归属模式枚举。""" + + ACTIVE = "active" + SHADOW = "shadow" + DISABLED = "disabled" + + +class DeliveryStatus(str, Enum): + """统一出站回执状态枚举。""" + + PENDING = "pending" + SENT = "sent" + FAILED = "failed" + DROPPED = "dropped" + + +@dataclass(frozen=True, slots=True) +class RouteKey: + """用于 Platform IO 路由决策的唯一键。 + + 路由解析会按照“从最具体到最宽泛”的顺序进行回退,这样同一平台 + 后续就能自然支持按账号、自定义 scope 等更细粒度的归属控制。 + + Attributes: + platform: 平台名称,例如 ``qq``。 + account_id: 机器人账号 ID 或 self ID,用于区分同平台多身份。 + scope: 额外路由作用域,预留给未来的连接实例、租户或子通道等维度。 + """ + + platform: str + account_id: Optional[str] = None + scope: Optional[str] = None + + def __post_init__(self) -> None: + """规范化并校验路由键字段。 + + Raises: + ValueError: 当 ``platform`` 规范化后为空时抛出。 + """ + platform = str(self.platform).strip() + account_id = str(self.account_id).strip() if self.account_id is not None else None + scope = str(self.scope).strip() if self.scope is not None else None + + if not platform: + raise ValueError("RouteKey.platform 不能为空") + + object.__setattr__(self, "platform", platform) + object.__setattr__(self, "account_id", account_id or None) + object.__setattr__(self, "scope", scope or None) + + def resolution_order(self) -> List["RouteKey"]: + """返回从最具体到最宽泛的路由匹配顺序。 + + Returns: + List[RouteKey]: 按回退优先级排序的候选路由键列表。 + """ + + keys: List[RouteKey] = [self] + + if self.account_id is not None and self.scope is not None: + keys.append(RouteKey(platform=self.platform, account_id=self.account_id, scope=None)) + keys.append(RouteKey(platform=self.platform, account_id=None, scope=self.scope)) + elif self.account_id is not None: + keys.append(RouteKey(platform=self.platform, account_id=None, scope=None)) + elif self.scope is not None: + keys.append(RouteKey(platform=self.platform, account_id=None, scope=None)) + + default_key = RouteKey(platform=self.platform, account_id=None, scope=None) + if default_key not in keys: + keys.append(default_key) + + return keys + + def to_dedupe_scope(self) -> str: + """生成跨驱动共享的去重作用域字符串。 + + Returns: + str: 用于入站消息去重的稳定文本作用域键。 + """ + + account_id = self.account_id or "*" + scope = self.scope or "*" + return f"{self.platform}:{account_id}:{scope}" + + +@dataclass(frozen=True, slots=True) +class DriverDescriptor: + """描述一个已注册的 Platform IO 驱动。 + + Attributes: + driver_id: Broker 层内全局唯一的驱动标识。 + kind: 驱动实现类型,例如 legacy 或 plugin。 + platform: 驱动负责的平台名称。 + account_id: 可选的账号 ID 或 self ID。 + scope: 可选的额外路由作用域。 + plugin_id: 当驱动来自插件适配器时,对应的插件 ID。 + metadata: 预留给路由策略或观测能力的额外驱动元数据。 + """ + + driver_id: str + kind: DriverKind + platform: str + account_id: Optional[str] = None + scope: Optional[str] = None + plugin_id: Optional[str] = None + metadata: Dict[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + """规范化并校验驱动描述字段。 + + Raises: + ValueError: 当 ``driver_id`` 或 ``platform`` 规范化后为空时抛出。 + """ + driver_id = str(self.driver_id).strip() + platform = str(self.platform).strip() + plugin_id = str(self.plugin_id).strip() if self.plugin_id is not None else None + + if not driver_id: + raise ValueError("DriverDescriptor.driver_id 不能为空") + if not platform: + raise ValueError("DriverDescriptor.platform 不能为空") + + object.__setattr__(self, "driver_id", driver_id) + object.__setattr__(self, "platform", platform) + object.__setattr__(self, "plugin_id", plugin_id or None) + + @property + def route_key(self) -> RouteKey: + """构造该驱动默认代表的路由键。 + + Returns: + RouteKey: 当前驱动描述对应的规范化路由键。 + """ + return RouteKey(platform=self.platform, account_id=self.account_id, scope=self.scope) + + +@dataclass(frozen=True, slots=True) +class RouteBinding: + """表示一条从路由键到驱动的归属绑定关系。 + + Attributes: + route_key: 该绑定覆盖的路由键。 + driver_id: 拥有或旁路观察该路由的驱动 ID。 + driver_kind: 绑定驱动的类型。 + mode: 绑定模式,例如 active owner 或 shadow observer。 + priority: 当同模式下存在多条绑定时使用的相对优先级。 + metadata: 预留给未来路由策略的额外绑定元数据。 + """ + + route_key: RouteKey + driver_id: str + driver_kind: DriverKind + mode: RouteMode = RouteMode.ACTIVE + priority: int = 0 + metadata: Dict[str, Any] = field(default_factory=dict) + + def __post_init__(self) -> None: + """规范化并校验绑定字段。 + + Raises: + ValueError: 当 ``driver_id`` 规范化后为空时抛出。 + """ + driver_id = str(self.driver_id).strip() + if not driver_id: + raise ValueError("RouteBinding.driver_id 不能为空") + object.__setattr__(self, "driver_id", driver_id) + + +@dataclass(slots=True) +class InboundMessageEnvelope: + """封装一次由驱动产出的规范化入站消息。 + + Attributes: + route_key: 该入站消息解析出的路由键。 + driver_id: 产出该消息的驱动 ID。 + driver_kind: 产出该消息的驱动类型。 + external_message_id: 可选的平台侧消息 ID,用于去重。 + dedupe_key: 可选的显式去重键。当外部消息没有稳定 ``message_id`` 时, + 可由上游驱动提供消息指纹。若这里为空,中间层仍可能继续回退到 + ``session_message.message_id`` 或 ``payload`` 指纹。 + session_message: 可选的、已经完成规范化的 ``SessionMessage`` 对象。 + payload: 可选的原始字典载荷,供延迟转换或调试使用。 + metadata: 额外入站元数据,例如连接信息或追踪上下文。 + """ + + route_key: RouteKey + driver_id: str + driver_kind: DriverKind + external_message_id: Optional[str] = None + dedupe_key: Optional[str] = None + session_message: Optional["SessionMessage"] = None + payload: Optional[Dict[str, Any]] = None + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass(slots=True) +class DeliveryReceipt: + """表示一次出站投递尝试的统一结果。 + + Attributes: + internal_message_id: Broker 跟踪的内部 ``SessionMessage.message_id``。 + route_key: 本次投递使用的路由键。 + status: 规范化后的投递状态。 + driver_id: 实际处理该投递的驱动 ID,可为空。 + driver_kind: 实际处理该投递的驱动类型,可为空。 + external_message_id: 驱动或适配器返回的平台侧消息 ID,可为空。 + error: 投递失败时的错误信息,可为空。 + metadata: 预留给回执、时间戳或平台特有信息的额外元数据。 + """ + + internal_message_id: str + route_key: RouteKey + status: DeliveryStatus + driver_id: Optional[str] = None + driver_kind: Optional[DriverKind] = None + external_message_id: Optional[str] = None + error: Optional[str] = None + metadata: Dict[str, Any] = field(default_factory=dict)