"""NapCat 事件路由协调器。""" from __future__ import annotations from dataclasses import dataclass from typing import Any, Callable, Dict, Mapping, Optional, Protocol import asyncio from ..config import NapCatPluginSettings from ..constants import DEFAULT_HISTORY_RECOVERY_BATCH_SIZE, DEFAULT_HISTORY_RECOVERY_CHECKPOINT_LIMIT from ..services import NapCatChatCheckpoint from ..types import NapCatPayloadDict from .bundle import NapCatRuntimeBundle class _GatewayCapabilityProtocol(Protocol): """插件网关能力协议。""" async def route_message( self, gateway_name: str, message: Dict[str, Any], *, route_metadata: Optional[Dict[str, Any]] = None, external_message_id: str = "", dedupe_key: str = "", ) -> bool: """向 Host 注入一条消息。""" ... @dataclass(frozen=True) class _NapCatChatIdentity: """描述一条 NapCat 消息所属的会话身份。""" chat_type: str chat_id: str class NapCatEventRouter: """协调 NapCat 运行时组件处理各类平台事件。""" def __init__( self, gateway_capability: _GatewayCapabilityProtocol, logger: Any, gateway_name: str, load_settings: Callable[[], NapCatPluginSettings], ) -> None: """初始化事件路由器。 Args: gateway_capability: SDK 提供的消息网关能力对象。 logger: 插件日志对象。 gateway_name: 当前消息网关名称。 load_settings: 返回当前生效插件配置的回调。 """ self._gateway_capability = gateway_capability self._logger = logger self._gateway_name = gateway_name self._load_settings = load_settings self._runtime: Optional[NapCatRuntimeBundle] = None self._recovery_task: Optional[asyncio.Task[None]] = None def bind_runtime(self, runtime: NapCatRuntimeBundle) -> None: """绑定当前路由器使用的运行时依赖。 Args: runtime: 已初始化的运行时组件集合。 """ self._runtime = runtime def reset_caches(self) -> None: """重置与路由相关的短期缓存。""" runtime = self._runtime if runtime is None: return self._cancel_recovery_task() runtime.official_bot_guard.clear_cache() async def handle_transport_payload(self, payload: NapCatPayloadDict) -> None: """处理来自传输层的非 echo 载荷。 Args: payload: NapCat 推送的原始事件数据。 """ post_type = str(payload.get("post_type") or "").strip() if post_type == "message": await self.handle_inbound_message(payload) return if post_type == "notice": await self.handle_notice_event(payload) return if post_type == "meta_event": await self.handle_meta_event(payload) async def handle_inbound_message(self, payload: NapCatPayloadDict) -> bool: """处理单条 NapCat 入站消息并注入 Host。 Args: payload: NapCat / OneBot 推送的原始消息事件。 """ runtime = self._require_runtime() settings = self._load_settings() self_id = str(payload.get("self_id") or "").strip() if self_id: await runtime.runtime_state.report_connected(self_id, settings.napcat_server) sender = payload.get("sender", {}) if not isinstance(sender, Mapping): sender = {} sender_user_id = str(payload.get("user_id") or sender.get("user_id") or "").strip() if not sender_user_id: return False group_id = str(payload.get("group_id") or "").strip() if self_id and sender_user_id == self_id and settings.filters.ignore_self_message: return False if not runtime.chat_filter.is_inbound_chat_allowed(sender_user_id, group_id, settings.chat): return False if await runtime.official_bot_guard.should_reject( sender_user_id=sender_user_id, group_id=group_id, ban_qq_bot=settings.chat.ban_qq_bot, ): return False try: message_dict = await runtime.inbound_codec.build_message_dict(payload, self_id, sender_user_id, sender) except ValueError as exc: self._logger.warning(f"NapCat 入站消息格式不受支持,已丢弃: {exc}") return False route_metadata = self._build_route_metadata(self_id, settings.napcat_server.connection_id) external_message_id = str(payload.get("message_id") or "").strip() accepted = await self._gateway_capability.route_message( gateway_name=self._gateway_name, message=message_dict, route_metadata=route_metadata, external_message_id=external_message_id, dedupe_key=external_message_id, ) if not accepted: self._logger.debug(f"Host 丢弃了 NapCat 入站消息: {external_message_id or '无消息 ID'}") return False await self._record_inbound_checkpoint( payload=payload, self_id=self_id, external_message_id=external_message_id or str(message_dict.get("message_id") or "").strip(), scope=settings.napcat_server.connection_id, ) return True async def handle_notice_event(self, payload: NapCatPayloadDict) -> None: """处理 NapCat ``notice`` 事件并注入 Host。 Args: payload: NapCat 推送的通知事件。 """ runtime = self._require_runtime() settings = self._load_settings() self_id = str(payload.get("self_id") or "").strip() if self_id: await runtime.runtime_state.report_connected(self_id, settings.napcat_server) await runtime.ban_tracker.record_notice(payload) await self.route_notice_payload(payload, self_id, settings.napcat_server.connection_id) async def route_notice_payload( self, payload: NapCatPayloadDict, self_id: str, connection_id: str, ) -> None: """将单条通知载荷转换并注入 Host。 Args: payload: NapCat 通知载荷。 self_id: 当前机器人账号 ID。 connection_id: 当前连接标识。 """ runtime = self._require_runtime() message_dict = await runtime.notice_codec.build_notice_message_dict(payload) if message_dict is None: return route_metadata = self._build_route_metadata(self_id, connection_id) external_message_id = str(payload.get("message_id") or "").strip() dedupe_key = runtime.notice_codec.build_notice_dedupe_key(payload) or "" accepted = await self._gateway_capability.route_message( gateway_name=self._gateway_name, message=message_dict, route_metadata=route_metadata, external_message_id=external_message_id, dedupe_key=dedupe_key, ) if not accepted: self._logger.debug(f"Host 丢弃了 NapCat 通知事件: {external_message_id or dedupe_key or '无消息 ID'}") async def emit_natural_lift_notice(self, payload: NapCatPayloadDict) -> None: """注入一条由适配器合成的自然解除禁言通知。 Args: payload: 合成后的 NapCat 通知载荷。 """ settings = self._load_settings() self_id = str(payload.get("self_id") or "").strip() await self.route_notice_payload(payload, self_id, settings.napcat_server.connection_id) async def handle_meta_event(self, payload: NapCatPayloadDict) -> None: """处理 NapCat ``meta_event`` 事件。 Args: payload: NapCat 推送的元事件。 """ runtime = self._require_runtime() settings = self._load_settings() meta_event_type = str(payload.get("meta_event_type") or "").strip() self_id = str(payload.get("self_id") or "").strip() should_report_connected = False if meta_event_type == "lifecycle": should_report_connected = str(payload.get("sub_type") or "").strip() == "connect" elif meta_event_type == "heartbeat": status = payload.get("status", {}) if not isinstance(status, Mapping): status = {} should_report_connected = bool(status.get("online", False)) and bool(status.get("good", False)) if self_id and should_report_connected: await runtime.runtime_state.report_connected(self_id, settings.napcat_server) elif meta_event_type == "heartbeat" and not should_report_connected: await runtime.runtime_state.report_disconnected() await runtime.heartbeat_monitor.observe_meta_event(payload, settings.napcat_server.heartbeat_interval) await runtime.notice_codec.handle_meta_event(payload) async def bootstrap_adapter_runtime_state(self) -> None: """在连接建立后主动获取账号信息并激活消息网关路由。""" runtime = self._require_runtime() settings = self._load_settings() max_attempts = 3 last_error: Optional[Exception] = None for attempt in range(1, max_attempts + 1): try: login_info = await runtime.query_service.get_login_info() self_id = self._extract_self_id_from_login_response(login_info) await runtime.runtime_state.report_connected(self_id, settings.napcat_server) await runtime.heartbeat_monitor.start(self_id, settings.napcat_server.heartbeat_interval) await runtime.ban_tracker.start() await runtime.history_recovery_store.load() self._schedule_history_recovery(self_id=self_id, scope=settings.napcat_server.connection_id) return except asyncio.CancelledError: raise except Exception as exc: last_error = exc self._logger.warning(f"NapCat 消息网关获取登录信息失败,第 {attempt}/{max_attempts} 次重试: {exc}") if attempt < max_attempts: await asyncio.sleep(1.0) if last_error is not None: self._logger.error(f"NapCat 消息网关未能完成路由激活,连接将保持只接收状态: {last_error}") async def handle_transport_disconnected(self) -> None: """处理传输层断开事件。""" runtime = self._require_runtime() await runtime.heartbeat_monitor.stop() await runtime.ban_tracker.stop() self.reset_caches() await runtime.runtime_state.report_disconnected() async def handle_heartbeat_timeout(self, self_id: str) -> None: """处理 NapCat 心跳长时间未更新的情况。 Args: self_id: 当前机器人账号 ID。 """ runtime = self._require_runtime() if self_id: self._logger.warning(f"NapCat Bot {self_id} 心跳超时,暂时将消息网关标记为未就绪") else: self._logger.warning("NapCat 心跳超时,暂时将消息网关标记为未就绪") await runtime.runtime_state.report_disconnected() def _require_runtime(self) -> NapCatRuntimeBundle: """返回当前已绑定的运行时依赖。 Returns: NapCatRuntimeBundle: 已初始化的运行时依赖。 Raises: RuntimeError: 当运行时尚未绑定时抛出。 """ runtime = self._runtime if runtime is None: raise RuntimeError("NapCat 运行时尚未初始化") return runtime def _schedule_history_recovery(self, self_id: str, scope: str) -> None: """在连接恢复后调度一次历史补拉任务。""" self._cancel_recovery_task() runtime = self._runtime if runtime is None: return self._recovery_task = asyncio.create_task( self._recover_recent_history(self_id=self_id, scope=scope), name="napcat_adapter.history_recovery", ) def _cancel_recovery_task(self) -> None: """取消当前仍在运行的历史补拉任务。""" recovery_task = self._recovery_task self._recovery_task = None if recovery_task is not None and not recovery_task.done(): recovery_task.cancel() async def _recover_recent_history(self, *, self_id: str, scope: str) -> None: """按 checkpoint 列表逐个尝试补拉断线期间遗漏的消息。""" runtime = self._require_runtime() checkpoints = await runtime.history_recovery_store.list_checkpoints( self_id, scope=scope, limit=DEFAULT_HISTORY_RECOVERY_CHECKPOINT_LIMIT, ) if not checkpoints: return recovered_count = 0 for checkpoint in checkpoints: recovered_count += await self._recover_chat_history_from_checkpoint( self_id=self_id, scope=scope, checkpoint=checkpoint, ) if recovered_count > 0: self._logger.info(f"NapCat 历史补拉完成,共补回 {recovered_count} 条消息") async def _recover_chat_history_from_checkpoint( self, *, self_id: str, scope: str, checkpoint: NapCatChatCheckpoint, ) -> int: """针对单个会话执行一次小批量历史补拉。""" runtime = self._require_runtime() history_messages = await self._query_history_messages(checkpoint, limit=DEFAULT_HISTORY_RECOVERY_BATCH_SIZE) if not history_messages: return 0 ordered_messages = sorted( history_messages, key=lambda item: ( self._extract_message_timestamp(item), self._extract_message_seq(item), str(item.get("message_id") or "").strip(), ), ) recovered_count = 0 for history_payload in ordered_messages: external_message_id = str(history_payload.get("message_id") or "").strip() if not external_message_id: continue if external_message_id == checkpoint.last_message_id: continue if await runtime.history_recovery_store.has_recovered_message_seen( account_id=self_id, scope=scope, chat_type=checkpoint.chat_type, chat_id=checkpoint.chat_id, external_message_id=external_message_id, ): continue if not self._is_message_after_checkpoint(history_payload, checkpoint): continue accepted = await self._reinject_history_payload(history_payload, self_id=self_id) if not accepted: continue await runtime.history_recovery_store.mark_recovered_message_seen( account_id=self_id, scope=scope, chat_type=checkpoint.chat_type, chat_id=checkpoint.chat_id, external_message_id=external_message_id, ) recovered_count += 1 return recovered_count async def _query_history_messages( self, checkpoint: NapCatChatCheckpoint, *, limit: int, ) -> list[NapCatPayloadDict]: """查询某个会话在 checkpoint 之后的一小批历史消息。""" runtime = self._require_runtime() payload_collections: list[list[NapCatPayloadDict]] = [] if checkpoint.last_message_seq is not None: payload_collections.append( await self._fetch_history_messages( chat_type=checkpoint.chat_type, chat_id=checkpoint.chat_id, message_seq=checkpoint.last_message_seq, limit=limit, ) ) payload_collections.append( await self._fetch_history_messages( chat_type=checkpoint.chat_type, chat_id=checkpoint.chat_id, message_seq=None, limit=limit, ) ) merged_payloads: list[NapCatPayloadDict] = [] seen_message_ids: set[str] = set() for payloads in payload_collections: for payload in payloads: external_message_id = str(payload.get("message_id") or "").strip() dedupe_key = external_message_id or repr(sorted(payload.items())) if dedupe_key in seen_message_ids: continue seen_message_ids.add(dedupe_key) merged_payloads.append(payload) return merged_payloads async def _fetch_history_messages( self, *, chat_type: str, chat_id: str, message_seq: int | None, limit: int, ) -> list[NapCatPayloadDict]: """调用查询服务获取一批历史消息。""" runtime = self._require_runtime() if chat_type == "group": history_payloads = await runtime.query_service.get_group_message_history( chat_id, message_seq=message_seq, count=limit, reverse_order=False, ) elif chat_type == "private": history_payloads = await runtime.query_service.get_friend_message_history( chat_id, message_seq=message_seq, count=limit, reverse_order=False, ) else: return [] if history_payloads is None: return [] return [dict(payload) for payload in history_payloads if isinstance(payload, Mapping)] async def _reinject_history_payload(self, payload: NapCatPayloadDict, *, self_id: str) -> bool: """将补拉到的历史消息重新送回实时入站路径。""" try: normalized_payload = dict(payload) if self_id and not str(normalized_payload.get("self_id") or "").strip(): normalized_payload["self_id"] = self_id return await self.handle_inbound_message(normalized_payload) except asyncio.CancelledError: raise except Exception as exc: external_message_id = str(payload.get("message_id") or "").strip() or "unknown" self._logger.warning(f"NapCat 历史消息补拉注入失败: message_id={external_message_id} error={exc}") return False async def _record_inbound_checkpoint( self, *, payload: NapCatPayloadDict, self_id: str, external_message_id: str, scope: str, ) -> None: """在消息被 Host 接受后更新该会话的最新 checkpoint。""" runtime = self._require_runtime() chat_identity = self._extract_chat_identity(payload) if chat_identity is None: return await runtime.history_recovery_store.record_checkpoint( account_id=self_id, scope=scope, chat_type=chat_identity.chat_type, chat_id=chat_identity.chat_id, message_id=external_message_id, message_time=self._extract_message_timestamp(payload), message_seq=self._extract_message_seq(payload), ) @staticmethod def _extract_chat_identity(payload: Mapping[str, Any]) -> _NapCatChatIdentity | None: """从 NapCat 载荷中提取会话身份。""" group_id = str(payload.get("group_id") or "").strip() user_id = str(payload.get("user_id") or "").strip() if group_id: return _NapCatChatIdentity(chat_type="group", chat_id=group_id) if user_id: return _NapCatChatIdentity(chat_type="private", chat_id=user_id) return None @staticmethod def _extract_message_seq(payload: Mapping[str, Any]) -> int | None: """从 NapCat 载荷中提取历史接口可复用的消息序号。""" for field_name in ("message_seq", "messageSeq", "msg_seq"): raw_value = payload.get(field_name) if raw_value is None or str(raw_value).strip() == "": continue try: return int(raw_value) except (TypeError, ValueError): continue return None @staticmethod def _extract_message_timestamp(payload: Mapping[str, Any]) -> float: """从 NapCat 载荷中提取消息时间戳。""" raw_timestamp = payload.get("time") if isinstance(raw_timestamp, (int, float)): return float(raw_timestamp) return 0.0 @classmethod def _is_message_after_checkpoint( cls, payload: Mapping[str, Any], checkpoint: NapCatChatCheckpoint, ) -> bool: """判断历史消息是否位于 checkpoint 之后。""" payload_message_id = str(payload.get("message_id") or "").strip() if payload_message_id == checkpoint.last_message_id: return False payload_message_seq = cls._extract_message_seq(payload) if payload_message_seq is not None and checkpoint.last_message_seq is not None: return payload_message_seq > checkpoint.last_message_seq payload_timestamp = cls._extract_message_timestamp(payload) if payload_timestamp != checkpoint.last_message_time: return payload_timestamp > checkpoint.last_message_time return True @staticmethod def _build_route_metadata(self_id: str, connection_id: str) -> Dict[str, Any]: """构造注入 Host 时使用的路由元数据。 Args: self_id: 当前机器人账号 ID。 connection_id: 当前连接标识。 Returns: Dict[str, Any]: 路由元数据字典。 """ route_metadata: Dict[str, Any] = {} if self_id: route_metadata["self_id"] = self_id if connection_id: route_metadata["connection_id"] = connection_id return route_metadata @staticmethod def _extract_self_id_from_login_response(response: Optional[Dict[str, Any]]) -> str: """从 ``get_login_info`` 查询结果中提取当前账号 ID。 Args: response: NapCat 返回的登录信息字典。 Returns: str: 规范化后的账号 ID 字符串。 Raises: ValueError: 当响应中缺少有效账号 ID 时抛出。 """ if not isinstance(response, Mapping): raise ValueError("get_login_info 响应缺少 data 字段") self_id = str(response.get("user_id") or "").strip() if not self_id: raise ValueError("get_login_info 响应缺少有效的 user_id") return self_id