feat:添加规划器和回复器监控API路由

This commit is contained in:
墨梓柒
2025-12-23 23:13:05 +08:00
parent 839a42578c
commit ade25d4717
3 changed files with 539 additions and 0 deletions

279
src/webui/api/planner.py Normal file
View File

@@ -0,0 +1,279 @@
"""
规划器监控API
提供规划器日志数据的查询接口
性能优化:
1. 聊天摘要只统计文件数量和最新时间戳,不读取文件内容
2. 日志列表使用文件名解析时间戳,只在需要时读取完整内容
3. 详情按需加载
"""
import json
from pathlib import Path
from typing import List, Dict, Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
router = APIRouter(prefix="/api/planner", tags=["planner"])
# 规划器日志目录
PLAN_LOG_DIR = Path("logs/plan")
class ChatSummary(BaseModel):
"""聊天摘要 - 轻量级,不读取文件内容"""
chat_id: str
plan_count: int
latest_timestamp: float
latest_filename: str
class PlanLogSummary(BaseModel):
"""规划日志摘要"""
chat_id: str
timestamp: float
filename: str
action_count: int
total_plan_ms: float
llm_duration_ms: float
reasoning_preview: str
class PlanLogDetail(BaseModel):
"""规划日志详情"""
type: str
chat_id: str
timestamp: float
prompt: str
reasoning: str
raw_output: str
actions: List[Dict]
timing: Dict
extra: Optional[Dict] = None
class PlannerOverview(BaseModel):
"""规划器总览 - 轻量级统计"""
total_chats: int
total_plans: int
chats: List[ChatSummary]
class PaginatedChatLogs(BaseModel):
"""分页的聊天日志列表"""
data: List[PlanLogSummary]
total: int
page: int
page_size: int
chat_id: str
def parse_timestamp_from_filename(filename: str) -> float:
"""从文件名解析时间戳: 1766497488220_af92bdb1.json -> 1766497488.220"""
try:
timestamp_str = filename.split('_')[0]
# 时间戳是毫秒级,需要转换为秒
return float(timestamp_str) / 1000
except (ValueError, IndexError):
return 0
@router.get("/overview", response_model=PlannerOverview)
async def get_planner_overview():
"""
获取规划器总览 - 轻量级接口
只统计文件数量,不读取文件内容
"""
if not PLAN_LOG_DIR.exists():
return PlannerOverview(total_chats=0, total_plans=0, chats=[])
chats = []
total_plans = 0
for chat_dir in PLAN_LOG_DIR.iterdir():
if not chat_dir.is_dir():
continue
# 只统计json文件数量
json_files = list(chat_dir.glob("*.json"))
plan_count = len(json_files)
total_plans += plan_count
if plan_count == 0:
continue
# 从文件名获取最新时间戳
latest_file = max(json_files, key=lambda f: parse_timestamp_from_filename(f.name))
latest_timestamp = parse_timestamp_from_filename(latest_file.name)
chats.append(ChatSummary(
chat_id=chat_dir.name,
plan_count=plan_count,
latest_timestamp=latest_timestamp,
latest_filename=latest_file.name
))
# 按最新时间戳排序
chats.sort(key=lambda x: x.latest_timestamp, reverse=True)
return PlannerOverview(
total_chats=len(chats),
total_plans=total_plans,
chats=chats
)
@router.get("/chat/{chat_id}/logs", response_model=PaginatedChatLogs)
async def get_chat_plan_logs(
chat_id: str,
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100)
):
"""
获取指定聊天的规划日志列表(分页)
需要读取文件内容获取摘要信息
"""
chat_dir = PLAN_LOG_DIR / chat_id
if not chat_dir.exists():
return PaginatedChatLogs(
data=[], total=0, page=page, page_size=page_size, chat_id=chat_id
)
# 先获取所有文件并按时间戳排序
json_files = list(chat_dir.glob("*.json"))
json_files.sort(key=lambda f: parse_timestamp_from_filename(f.name), reverse=True)
total = len(json_files)
# 分页 - 只读取当前页的文件
offset = (page - 1) * page_size
page_files = json_files[offset:offset + page_size]
logs = []
for log_file in page_files:
try:
with open(log_file, 'r', encoding='utf-8') as f:
data = json.load(f)
reasoning = data.get('reasoning', '')
logs.append(PlanLogSummary(
chat_id=data.get('chat_id', chat_id),
timestamp=data.get('timestamp', parse_timestamp_from_filename(log_file.name)),
filename=log_file.name,
action_count=len(data.get('actions', [])),
total_plan_ms=data.get('timing', {}).get('total_plan_ms', 0),
llm_duration_ms=data.get('timing', {}).get('llm_duration_ms', 0),
reasoning_preview=reasoning[:100] if reasoning else ''
))
except Exception:
# 文件读取失败时使用文件名信息
logs.append(PlanLogSummary(
chat_id=chat_id,
timestamp=parse_timestamp_from_filename(log_file.name),
filename=log_file.name,
action_count=0,
total_plan_ms=0,
llm_duration_ms=0,
reasoning_preview='[读取失败]'
))
return PaginatedChatLogs(
data=logs,
total=total,
page=page,
page_size=page_size,
chat_id=chat_id
)
@router.get("/log/{chat_id}/{filename}", response_model=PlanLogDetail)
async def get_log_detail(chat_id: str, filename: str):
"""获取规划日志详情 - 按需加载完整内容"""
log_file = PLAN_LOG_DIR / chat_id / filename
if not log_file.exists():
raise HTTPException(status_code=404, detail="日志文件不存在")
try:
with open(log_file, 'r', encoding='utf-8') as f:
data = json.load(f)
return PlanLogDetail(**data)
except Exception as e:
raise HTTPException(status_code=500, detail=f"读取日志失败: {str(e)}")
# ========== 兼容旧接口 ==========
@router.get("/stats")
async def get_planner_stats():
"""获取规划器统计信息 - 兼容旧接口"""
overview = await get_planner_overview()
# 获取最近10条计划的摘要
recent_plans = []
for chat in overview.chats[:5]: # 从最近5个聊天中获取
try:
chat_logs = await get_chat_plan_logs(chat.chat_id, page=1, page_size=2)
recent_plans.extend(chat_logs.data)
except Exception:
continue
# 按时间排序取前10
recent_plans.sort(key=lambda x: x.timestamp, reverse=True)
recent_plans = recent_plans[:10]
return {
"total_chats": overview.total_chats,
"total_plans": overview.total_plans,
"avg_plan_time_ms": 0,
"avg_llm_time_ms": 0,
"recent_plans": recent_plans
}
@router.get("/chats")
async def get_chat_list():
"""获取所有聊天ID列表 - 兼容旧接口"""
overview = await get_planner_overview()
return [chat.chat_id for chat in overview.chats]
@router.get("/all-logs")
async def get_all_logs(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100)
):
"""获取所有规划日志 - 兼容旧接口"""
if not PLAN_LOG_DIR.exists():
return {"data": [], "total": 0, "page": page, "page_size": page_size}
# 收集所有文件
all_files = []
for chat_dir in PLAN_LOG_DIR.iterdir():
if chat_dir.is_dir():
for log_file in chat_dir.glob("*.json"):
all_files.append((chat_dir.name, log_file))
# 按时间戳排序
all_files.sort(key=lambda x: parse_timestamp_from_filename(x[1].name), reverse=True)
total = len(all_files)
offset = (page - 1) * page_size
page_files = all_files[offset:offset + page_size]
logs = []
for chat_id, log_file in page_files:
try:
with open(log_file, 'r', encoding='utf-8') as f:
data = json.load(f)
reasoning = data.get('reasoning', '')
logs.append({
"chat_id": data.get('chat_id', chat_id),
"timestamp": data.get('timestamp', parse_timestamp_from_filename(log_file.name)),
"filename": log_file.name,
"action_count": len(data.get('actions', [])),
"total_plan_ms": data.get('timing', {}).get('total_plan_ms', 0),
"llm_duration_ms": data.get('timing', {}).get('llm_duration_ms', 0),
"reasoning_preview": reasoning[:100] if reasoning else ''
})
except Exception:
continue
return {"data": logs, "total": total, "page": page, "page_size": page_size}

252
src/webui/api/replier.py Normal file
View File

@@ -0,0 +1,252 @@
"""
回复器监控API
提供回复器日志数据的查询接口
性能优化:
1. 聊天摘要只统计文件数量和最新时间戳,不读取文件内容
2. 日志列表使用文件名解析时间戳,只在需要时读取完整内容
3. 详情按需加载
"""
import json
from pathlib import Path
from typing import List, Dict, Optional
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel
router = APIRouter(prefix="/api/replier", tags=["replier"])
# 回复器日志目录
REPLY_LOG_DIR = Path("logs/reply")
class ReplierChatSummary(BaseModel):
"""聊天摘要 - 轻量级,不读取文件内容"""
chat_id: str
reply_count: int
latest_timestamp: float
latest_filename: str
class ReplyLogSummary(BaseModel):
"""回复日志摘要"""
chat_id: str
timestamp: float
filename: str
model: str
success: bool
llm_ms: float
overall_ms: float
output_preview: str
class ReplyLogDetail(BaseModel):
"""回复日志详情"""
type: str
chat_id: str
timestamp: float
prompt: str
output: str
processed_output: List[str]
model: str
reasoning: str
think_level: int
timing: Dict
error: Optional[str] = None
success: bool
class ReplierOverview(BaseModel):
"""回复器总览 - 轻量级统计"""
total_chats: int
total_replies: int
chats: List[ReplierChatSummary]
class PaginatedReplyLogs(BaseModel):
"""分页的回复日志列表"""
data: List[ReplyLogSummary]
total: int
page: int
page_size: int
chat_id: str
def parse_timestamp_from_filename(filename: str) -> float:
"""从文件名解析时间戳: 1766497488220_af92bdb1.json -> 1766497488.220"""
try:
timestamp_str = filename.split('_')[0]
# 时间戳是毫秒级,需要转换为秒
return float(timestamp_str) / 1000
except (ValueError, IndexError):
return 0
@router.get("/overview", response_model=ReplierOverview)
async def get_replier_overview():
"""
获取回复器总览 - 轻量级接口
只统计文件数量,不读取文件内容
"""
if not REPLY_LOG_DIR.exists():
return ReplierOverview(total_chats=0, total_replies=0, chats=[])
chats = []
total_replies = 0
for chat_dir in REPLY_LOG_DIR.iterdir():
if not chat_dir.is_dir():
continue
# 只统计json文件数量
json_files = list(chat_dir.glob("*.json"))
reply_count = len(json_files)
total_replies += reply_count
if reply_count == 0:
continue
# 从文件名获取最新时间戳
latest_file = max(json_files, key=lambda f: parse_timestamp_from_filename(f.name))
latest_timestamp = parse_timestamp_from_filename(latest_file.name)
chats.append(ReplierChatSummary(
chat_id=chat_dir.name,
reply_count=reply_count,
latest_timestamp=latest_timestamp,
latest_filename=latest_file.name
))
# 按最新时间戳排序
chats.sort(key=lambda x: x.latest_timestamp, reverse=True)
return ReplierOverview(
total_chats=len(chats),
total_replies=total_replies,
chats=chats
)
@router.get("/chat/{chat_id}/logs", response_model=PaginatedReplyLogs)
async def get_chat_reply_logs(
chat_id: str,
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100)
):
"""
获取指定聊天的回复日志列表(分页)
需要读取文件内容获取摘要信息
"""
chat_dir = REPLY_LOG_DIR / chat_id
if not chat_dir.exists():
return PaginatedReplyLogs(
data=[], total=0, page=page, page_size=page_size, chat_id=chat_id
)
# 先获取所有文件并按时间戳排序
json_files = list(chat_dir.glob("*.json"))
json_files.sort(key=lambda f: parse_timestamp_from_filename(f.name), reverse=True)
total = len(json_files)
# 分页 - 只读取当前页的文件
offset = (page - 1) * page_size
page_files = json_files[offset:offset + page_size]
logs = []
for log_file in page_files:
try:
with open(log_file, 'r', encoding='utf-8') as f:
data = json.load(f)
output = data.get('output', '')
logs.append(ReplyLogSummary(
chat_id=data.get('chat_id', chat_id),
timestamp=data.get('timestamp', parse_timestamp_from_filename(log_file.name)),
filename=log_file.name,
model=data.get('model', ''),
success=data.get('success', True),
llm_ms=data.get('timing', {}).get('llm_ms', 0),
overall_ms=data.get('timing', {}).get('overall_ms', 0),
output_preview=output[:100] if output else ''
))
except Exception:
# 文件读取失败时使用文件名信息
logs.append(ReplyLogSummary(
chat_id=chat_id,
timestamp=parse_timestamp_from_filename(log_file.name),
filename=log_file.name,
model='',
success=False,
llm_ms=0,
overall_ms=0,
output_preview='[读取失败]'
))
return PaginatedReplyLogs(
data=logs,
total=total,
page=page,
page_size=page_size,
chat_id=chat_id
)
@router.get("/log/{chat_id}/{filename}", response_model=ReplyLogDetail)
async def get_reply_log_detail(chat_id: str, filename: str):
"""获取回复日志详情 - 按需加载完整内容"""
log_file = REPLY_LOG_DIR / chat_id / filename
if not log_file.exists():
raise HTTPException(status_code=404, detail="日志文件不存在")
try:
with open(log_file, 'r', encoding='utf-8') as f:
data = json.load(f)
return ReplyLogDetail(
type=data.get('type', 'reply'),
chat_id=data.get('chat_id', chat_id),
timestamp=data.get('timestamp', 0),
prompt=data.get('prompt', ''),
output=data.get('output', ''),
processed_output=data.get('processed_output', []),
model=data.get('model', ''),
reasoning=data.get('reasoning', ''),
think_level=data.get('think_level', 0),
timing=data.get('timing', {}),
error=data.get('error'),
success=data.get('success', True)
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"读取日志失败: {str(e)}")
# ========== 兼容接口 ==========
@router.get("/stats")
async def get_replier_stats():
"""获取回复器统计信息"""
overview = await get_replier_overview()
# 获取最近10条回复的摘要
recent_replies = []
for chat in overview.chats[:5]: # 从最近5个聊天中获取
try:
chat_logs = await get_chat_reply_logs(chat.chat_id, page=1, page_size=2)
recent_replies.extend(chat_logs.data)
except Exception:
continue
# 按时间排序取前10
recent_replies.sort(key=lambda x: x.timestamp, reverse=True)
recent_replies = recent_replies[:10]
return {
"total_chats": overview.total_chats,
"total_replies": overview.total_replies,
"recent_replies": recent_replies
}
@router.get("/chats")
async def get_replier_chat_list():
"""获取所有聊天ID列表"""
overview = await get_replier_overview()
return [chat.chat_id for chat in overview.chats]

View File

@@ -168,12 +168,20 @@ class WebUIServer:
# 导入本地聊天室路由
from src.webui.chat_routes import router as chat_router
# 导入规划器监控路由
from src.webui.api.planner import router as planner_router
# 导入回复器监控路由
from src.webui.api.replier import router as replier_router
# 注册路由
self.app.include_router(webui_router)
self.app.include_router(logs_router)
self.app.include_router(knowledge_router)
self.app.include_router(chat_router)
self.app.include_router(planner_router)
self.app.include_router(replier_router)
logger.info("✅ WebUI API 路由已注册")
except Exception as e: