feat: Enhance authentication mechanism to support token retrieval from both Cookie and Header

- Added a new auth module to manage authentication-related functions.
- Updated existing routes in expression_routes, person_routes, plugin_routes, and routes to utilize the new authentication methods.
- Implemented CORS middleware in webui_server for development environment support.
- Introduced functions to set and clear authentication cookies.
- Enhanced token verification to prioritize Cookie over Header for improved security and flexibility.
This commit is contained in:
墨梓柒
2025-11-30 15:53:39 +08:00
parent fdc0a87c31
commit c790dcb705
7 changed files with 429 additions and 148 deletions

View File

@@ -1,10 +1,11 @@
"""WebUI API 路由"""
from fastapi import APIRouter, HTTPException, Header
from fastapi import APIRouter, HTTPException, Header, Response, Request, Cookie
from pydantic import BaseModel, Field
from typing import Optional
from src.common.logger import get_logger
from .token_manager import get_token_manager
from .auth import set_auth_cookie, clear_auth_cookie
from .config_routes import router as config_router
from .statistics_routes import router as statistics_router
from .person_routes import router as person_router
@@ -51,6 +52,7 @@ class TokenVerifyResponse(BaseModel):
valid: bool = Field(..., description="Token 是否有效")
message: str = Field(..., description="验证结果消息")
is_first_setup: bool = Field(False, description="是否为首次设置")
class TokenUpdateRequest(BaseModel):
@@ -102,22 +104,27 @@ async def health_check():
@router.post("/auth/verify", response_model=TokenVerifyResponse)
async def verify_token(request: TokenVerifyRequest):
async def verify_token(request: TokenVerifyRequest, response: Response):
"""
验证访问令牌
验证访问令牌,验证成功后设置 HttpOnly Cookie
Args:
request: 包含 token 的验证请求
response: FastAPI Response 对象
Returns:
验证结果
验证结果(包含首次配置状态)
"""
try:
token_manager = get_token_manager()
is_valid = token_manager.verify_token(request.token)
if is_valid:
return TokenVerifyResponse(valid=True, message="Token 验证成功")
# 设置 HttpOnly Cookie
set_auth_cookie(response, request.token)
# 同时返回首次配置状态,避免额外请求
is_first_setup = token_manager.is_first_setup()
return TokenVerifyResponse(valid=True, message="Token 验证成功", is_first_setup=is_first_setup)
else:
return TokenVerifyResponse(valid=False, message="Token 无效或已过期")
except Exception as e:
@@ -125,24 +132,86 @@ async def verify_token(request: TokenVerifyRequest):
raise HTTPException(status_code=500, detail="Token 验证失败") from e
@router.post("/auth/logout")
async def logout(response: Response):
"""
登出并清除认证 Cookie
Args:
response: FastAPI Response 对象
Returns:
登出结果
"""
clear_auth_cookie(response)
return {"success": True, "message": "已成功登出"}
@router.get("/auth/check")
async def check_auth_status(
request: Request,
maibot_session: Optional[str] = Cookie(None),
authorization: Optional[str] = Header(None),
):
"""
检查当前认证状态(用于前端判断是否已登录)
Returns:
认证状态
"""
try:
token = None
# 优先从 Cookie 获取
if maibot_session:
token = maibot_session
# 其次从 Header 获取
elif authorization and authorization.startswith("Bearer "):
token = authorization.replace("Bearer ", "")
if not token:
return {"authenticated": False}
token_manager = get_token_manager()
if token_manager.verify_token(token):
return {"authenticated": True}
else:
return {"authenticated": False}
except Exception:
return {"authenticated": False}
@router.post("/auth/update", response_model=TokenUpdateResponse)
async def update_token(request: TokenUpdateRequest, authorization: Optional[str] = Header(None)):
async def update_token(
request: TokenUpdateRequest,
response: Response,
req: Request,
maibot_session: Optional[str] = Cookie(None),
authorization: Optional[str] = Header(None),
):
"""
更新访问令牌(需要当前有效的 token
Args:
request: 包含新 token 的更新请求
response: FastAPI Response 对象
maibot_session: Cookie 中的 token
authorization: Authorization header (Bearer token)
Returns:
更新结果
"""
try:
# 验证当前 token
if not authorization or not authorization.startswith("Bearer "):
# 验证当前 token(优先 Cookie其次 Header
current_token = None
if maibot_session:
current_token = maibot_session
elif authorization and authorization.startswith("Bearer "):
current_token = authorization.replace("Bearer ", "")
if not current_token:
raise HTTPException(status_code=401, detail="未提供有效的认证信息")
current_token = authorization.replace("Bearer ", "")
token_manager = get_token_manager()
if not token_manager.verify_token(current_token):
@@ -150,6 +219,10 @@ async def update_token(request: TokenUpdateRequest, authorization: Optional[str]
# 更新 token
success, message = token_manager.update_token(request.new_token)
# 如果更新成功,更新 Cookie
if success:
set_auth_cookie(response, request.new_token)
return TokenUpdateResponse(success=success, message=message)
except HTTPException:
@@ -160,22 +233,34 @@ async def update_token(request: TokenUpdateRequest, authorization: Optional[str]
@router.post("/auth/regenerate", response_model=TokenRegenerateResponse)
async def regenerate_token(authorization: Optional[str] = Header(None)):
async def regenerate_token(
response: Response,
request: Request,
maibot_session: Optional[str] = Cookie(None),
authorization: Optional[str] = Header(None),
):
"""
重新生成访问令牌(需要当前有效的 token
Args:
response: FastAPI Response 对象
maibot_session: Cookie 中的 token
authorization: Authorization header (Bearer token)
Returns:
新生成的 token
"""
try:
# 验证当前 token
if not authorization or not authorization.startswith("Bearer "):
raise HTTPException(status_code=401, detail="未提供有效的认证信息")
# 验证当前 token(优先 Cookie其次 Header
current_token = None
if maibot_session:
current_token = maibot_session
elif authorization and authorization.startswith("Bearer "):
current_token = authorization.replace("Bearer ", "")
current_token = authorization.replace("Bearer ", "")
if not current_token:
raise HTTPException(status_code=401, detail="未提供有效的认证信息")
token_manager = get_token_manager()
if not token_manager.verify_token(current_token):
@@ -183,6 +268,9 @@ async def regenerate_token(authorization: Optional[str] = Header(None)):
# 重新生成 token
new_token = token_manager.regenerate_token()
# 更新 Cookie
set_auth_cookie(response, new_token)
return TokenRegenerateResponse(success=True, token=new_token, message="Token 已重新生成")
except HTTPException:
@@ -193,22 +281,32 @@ async def regenerate_token(authorization: Optional[str] = Header(None)):
@router.get("/setup/status", response_model=FirstSetupStatusResponse)
async def get_setup_status(authorization: Optional[str] = Header(None)):
async def get_setup_status(
request: Request,
maibot_session: Optional[str] = Cookie(None),
authorization: Optional[str] = Header(None),
):
"""
获取首次配置状态
Args:
maibot_session: Cookie 中的 token
authorization: Authorization header (Bearer token)
Returns:
首次配置状态
"""
try:
# 验证 token
if not authorization or not authorization.startswith("Bearer "):
# 验证 token(优先 Cookie其次 Header
current_token = None
if maibot_session:
current_token = maibot_session
elif authorization and authorization.startswith("Bearer "):
current_token = authorization.replace("Bearer ", "")
if not current_token:
raise HTTPException(status_code=401, detail="未提供有效的认证信息")
current_token = authorization.replace("Bearer ", "")
token_manager = get_token_manager()
if not token_manager.verify_token(current_token):
@@ -226,22 +324,32 @@ async def get_setup_status(authorization: Optional[str] = Header(None)):
@router.post("/setup/complete", response_model=CompleteSetupResponse)
async def complete_setup(authorization: Optional[str] = Header(None)):
async def complete_setup(
request: Request,
maibot_session: Optional[str] = Cookie(None),
authorization: Optional[str] = Header(None),
):
"""
标记首次配置完成
Args:
maibot_session: Cookie 中的 token
authorization: Authorization header (Bearer token)
Returns:
完成结果
"""
try:
# 验证 token
if not authorization or not authorization.startswith("Bearer "):
# 验证 token(优先 Cookie其次 Header
current_token = None
if maibot_session:
current_token = maibot_session
elif authorization and authorization.startswith("Bearer "):
current_token = authorization.replace("Bearer ", "")
if not current_token:
raise HTTPException(status_code=401, detail="未提供有效的认证信息")
current_token = authorization.replace("Bearer ", "")
token_manager = get_token_manager()
if not token_manager.verify_token(current_token):
@@ -259,22 +367,32 @@ async def complete_setup(authorization: Optional[str] = Header(None)):
@router.post("/setup/reset", response_model=ResetSetupResponse)
async def reset_setup(authorization: Optional[str] = Header(None)):
async def reset_setup(
request: Request,
maibot_session: Optional[str] = Cookie(None),
authorization: Optional[str] = Header(None),
):
"""
重置首次配置状态,允许重新进入配置向导
Args:
maibot_session: Cookie 中的 token
authorization: Authorization header (Bearer token)
Returns:
重置结果
"""
try:
# 验证 token
if not authorization or not authorization.startswith("Bearer "):
# 验证 token(优先 Cookie其次 Header
current_token = None
if maibot_session:
current_token = maibot_session
elif authorization and authorization.startswith("Bearer "):
current_token = authorization.replace("Bearer ", "")
if not current_token:
raise HTTPException(status_code=401, detail="未提供有效的认证信息")
current_token = authorization.replace("Bearer ", "")
token_manager = get_token_manager()
if not token_manager.verify_token(current_token):