refcator: 重命名policy为authorization;移除envelope的generation(runner不再重载);

This commit is contained in:
UnCLAS-Prommer
2026-03-17 01:30:31 +08:00
committed by DrSmoothl
parent e1b2ecb5b1
commit 49b620219d
4 changed files with 69 additions and 108 deletions

View File

@@ -0,0 +1,61 @@
"""授权管理器
负责管理插件的能力授权以及校验
每个插件在 manifest 中声明能力需求Host 启动时签发能力令牌。
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple
@dataclass
class CapabilityPermissionToken:
"""能力令牌"""
plugin_id: str
capabilities: Set[str] = field(default_factory=set)
class AuthorizationManager:
"""授权管理器
管理所有插件的能力令牌,提供授权校验。
"""
def __init__(self) -> None:
self._permission_tokens: Dict[str, CapabilityPermissionToken] = {}
def register_plugin(self, plugin_id: str, capabilities: List[str]) -> CapabilityPermissionToken:
"""为插件签发能力令牌"""
token = CapabilityPermissionToken(plugin_id=plugin_id, capabilities=set(capabilities))
self._permission_tokens[plugin_id] = token
return token
def revoke_permission_token(self, plugin_id: str):
"""移除插件的能力令牌。"""
self._permission_tokens.pop(plugin_id, None)
def clear(self) -> None:
"""清空所有能力令牌。"""
self._permission_tokens.clear()
def check_capability(self, plugin_id: str, capability: str) -> Tuple[bool, str]:
"""检查插件是否有权调用某项能力
Returns:
return (bool, str): (是否有此能力, 原因)
"""
token = self._permission_tokens.get(plugin_id)
if not token:
return False, f"插件 {plugin_id} 未注册能力令牌"
if capability not in token.capabilities:
return False, f"插件 {plugin_id} 未获授权能力: {capability}"
return True, ""
def get_token(self, plugin_id: str) -> Optional[CapabilityPermissionToken]:
"""获取插件的能力令牌"""
return self._permission_tokens.get(plugin_id)
def list_plugins(self) -> List[str]:
"""列出所有已注册的插件"""
return list(self._permission_tokens.keys())

View File

@@ -4,10 +4,9 @@ Host 端实现的能力服务,处理来自插件的 cap.* 请求。
每个能力方法被注册到 RPC Server接收 Runner 转发的请求并执行实际操作。
"""
from typing import Any, Awaitable, Callable, Dict, List
from typing import Any, Callable, Dict, List, Coroutine, TYPE_CHECKING
from src.common.logger import get_logger
from src.plugin_runtime.host.policy_engine import PolicyEngine
from src.plugin_runtime.protocol.envelope import (
CapabilityRequestPayload,
CapabilityResponsePayload,
@@ -15,10 +14,13 @@ from src.plugin_runtime.protocol.envelope import (
)
from src.plugin_runtime.protocol.errors import ErrorCode, RPCError
if TYPE_CHECKING:
from src.plugin_runtime.host.authorization import AuthorizationManager
logger = get_logger("plugin_runtime.host.capability_service")
# 能力实现函数类型: (plugin_id, capability, args) -> result
CapabilityImpl = Callable[[str, str, Dict[str, Any]], Awaitable[Any]]
CapabilityImpl = Callable[[str, str, Dict[str, Any]], Coroutine[Any, Any, Any]]
class CapabilityService:
@@ -31,8 +33,8 @@ class CapabilityService:
4. 执行实际操作并返回结果
"""
def __init__(self, policy_engine: PolicyEngine) -> None:
self._policy = policy_engine
def __init__(self, authorization: "AuthorizationManager") -> None:
self._authorization = authorization
# capability_name -> implementation
self._implementations: Dict[str, CapabilityImpl] = {}
@@ -65,7 +67,7 @@ class CapabilityService:
capability = req.capability
# 1. 权限校验
allowed, reason = self._policy.check_capability(plugin_id, capability, envelope.generation)
allowed, reason = self._authorization.check_capability(plugin_id, capability)
if not allowed:
error_code = (
ErrorCode.E_GENERATION_MISMATCH if "generation 不匹配" in reason else ErrorCode.E_CAPABILITY_DENIED

View File

@@ -1,97 +0,0 @@
"""策略引擎
负责能力授权校验。
每个插件在 manifest 中声明能力需求Host 启动时签发能力令牌。
"""
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set, Tuple
@dataclass
class CapabilityToken:
"""能力令牌"""
plugin_id: str
generation: int
capabilities: Set[str] = field(default_factory=set)
class PolicyEngine:
"""策略引擎
管理所有插件的能力令牌,提供授权校验。
"""
def __init__(self) -> None:
self._tokens: Dict[str, Dict[int, CapabilityToken]] = {}
def register_plugin(
self,
plugin_id: str,
generation: int,
capabilities: List[str],
) -> CapabilityToken:
"""为插件签发能力令牌"""
token = CapabilityToken(
plugin_id=plugin_id,
generation=generation,
capabilities=set(capabilities),
)
self._tokens.setdefault(plugin_id, {})[generation] = token
return token
def revoke_plugin(self, plugin_id: str, generation: Optional[int] = None) -> None:
"""撤销插件的能力令牌。"""
if generation is None:
self._tokens.pop(plugin_id, None)
return
generations = self._tokens.get(plugin_id)
if generations is None:
return
generations.pop(generation, None)
if not generations:
self._tokens.pop(plugin_id, None)
def clear(self) -> None:
"""清空所有能力令牌。"""
self._tokens.clear()
def check_capability(self, plugin_id: str, capability: str, generation: Optional[int] = None) -> Tuple[bool, str]:
"""检查插件是否有权调用某项能力
Returns:
(allowed, reason)
"""
generations = self._tokens.get(plugin_id)
if not generations:
return False, f"插件 {plugin_id} 未注册能力令牌"
if generation is None:
token = generations[max(generations)]
else:
token = generations.get(generation)
if token is None:
active_generation = max(generations)
return False, f"插件 {plugin_id} generation 不匹配: {generation} != {active_generation}"
if capability not in token.capabilities:
return False, f"插件 {plugin_id} 未获授权能力: {capability}"
if generation is not None and token.generation != generation:
return False, f"插件 {plugin_id} generation 不匹配: {generation} != {token.generation}"
return True, ""
def get_token(self, plugin_id: str) -> Optional[CapabilityToken]:
"""获取插件的能力令牌"""
generations = self._tokens.get(plugin_id)
if not generations:
return None
return generations[max(generations)]
def list_plugins(self) -> List[str]:
"""列出所有已注册的插件"""
return list(self._tokens.keys())

View File

@@ -65,8 +65,6 @@ class Envelope(BaseModel):
"""发送时间戳 (ms)"""
timeout_ms: int = Field(default=30000, description="相对超时 (ms)")
"""相对超时 (ms)"""
generation: int = Field(default=0, description="Runner generation 编号")
"""Runner generation 编号"""
payload: Dict[str, Any] = Field(default_factory=dict, description="业务数据")
"""业务数据"""
error: Optional[Dict[str, Any]] = Field(default=None, description="错误信息 (仅 response)")
@@ -91,7 +89,6 @@ class Envelope(BaseModel):
message_type=MessageType.RESPONSE,
method=self.method,
plugin_id=self.plugin_id,
generation=self.generation,
payload=payload or {},
error=error,
)
@@ -126,8 +123,6 @@ class HelloResponsePayload(BaseModel):
"""是否接受连接"""
host_version: str = Field(default="", description="Host 版本号")
"""Host 版本号"""
assigned_generation: int = Field(default=0, description="分配的 generation 编号")
"""分配的 generation 编号"""
reason: str = Field(default="", description="拒绝原因 (若 accepted=False)")
"""拒绝原因 (若 `accepted`=`False`)"""