Refactor API response models and improve documentation

- Updated response model functions in emoji, expression, jargon, and person routers to include detailed docstrings.
- Enhanced the clarity of function signatures by specifying return types.
- Removed redundant comments and improved code readability.
- Added error handling and logging improvements in various endpoints.
- Deleted outdated code documentation file.
This commit is contained in:
DrSmoothl
2026-04-22 23:33:57 +08:00
parent 1716272b25
commit c0be64f268
7 changed files with 538 additions and 299 deletions

View File

@@ -1,14 +1,15 @@
"""人物信息管理 API 路由"""
import json
from datetime import datetime
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy import case
from sqlmodel import col, delete, select
import json
from src.common.database.database import get_db_session
from src.common.database.database_model import PersonInfo
from src.common.logger import get_logger
@@ -97,7 +98,14 @@ class BatchDeleteResponse(BaseModel):
def parse_group_nick_name(group_nick_name_str: Optional[str]) -> Optional[List[Dict[str, str]]]:
"""解析群昵称 JSON 字符串"""
"""解析群昵称 JSON 字符串
Args:
group_nick_name_str: 数据库中保存的群昵称 JSON 字符串。
Returns:
Optional[List[Dict[str, str]]]: 解析后的群昵称列表,解析失败时返回 None。
"""
if not group_nick_name_str:
return None
try:
@@ -107,7 +115,14 @@ def parse_group_nick_name(group_nick_name_str: Optional[str]) -> Optional[List[D
def person_to_response(person: PersonInfo) -> PersonInfoResponse:
""" PersonInfo 模型转换为响应对象"""
"""人物信息模型转换为响应对象
Args:
person: 数据库中的人物信息记录。
Returns:
PersonInfoResponse: WebUI 可直接序列化的人物信息。
"""
know_since = person.first_known_time.timestamp() if person.first_known_time else None
last_know = person.last_known_time.timestamp() if person.last_known_time else None
return PersonInfoResponse(
@@ -134,19 +149,18 @@ async def get_person_list(
search: Optional[str] = Query(None, description="搜索关键词"),
is_known: Optional[bool] = Query(None, description="是否已认识筛选"),
platform: Optional[str] = Query(None, description="平台筛选"),
):
"""
获取人物信息列表
) -> PersonListResponse:
"""获取人物信息列表。
Args:
page: 页码 (从 1 开始)
page_size: 每页数量 (1-100)
search: 搜索关键词 (匹配 person_name, nickname, user_id)
is_known: 是否已认识筛选
platform: 平台筛选
page: 页码从 1 开始
page_size: 每页数量,范围为 1-100
search: 搜索关键词,用于匹配人物名称、昵称和用户 ID。
is_known: 是否已认识筛选条件。
platform: 平台筛选条件。
Returns:
人物信息列表
PersonListResponse: 分页后的人物信息列表
"""
try:
# 构建查询
@@ -193,8 +207,7 @@ async def get_person_list(
if platform:
count_statement = count_statement.where(col(PersonInfo.platform) == platform)
total = len(session.exec(count_statement).all())
data = [person_to_response(person) for person in persons]
data = [person_to_response(person) for person in persons]
return PersonListResponse(success=True, total=total, page=page, page_size=page_size, data=data)
@@ -206,25 +219,26 @@ async def get_person_list(
@router.get("/{person_id}", response_model=PersonDetailResponse)
async def get_person_detail(person_id: str):
"""
获取人物详细信息
async def get_person_detail(person_id: str) -> PersonDetailResponse:
"""获取人物详细信息。
Args:
person_id: 人物唯一 ID
person_id: 人物唯一 ID
Returns:
人物详细信息
PersonDetailResponse: 指定人物详细信息
"""
try:
with get_db_session() as session:
statement = select(PersonInfo).where(col(PersonInfo.person_id) == person_id).limit(1)
person = session.exec(statement).first()
if not person:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {person_id} 的人物信息")
if not person:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {person_id} 的人物信息")
return PersonDetailResponse(success=True, data=person_to_response(person))
data = person_to_response(person)
return PersonDetailResponse(success=True, data=data)
except HTTPException:
raise
@@ -237,25 +251,17 @@ async def get_person_detail(person_id: str):
async def update_person(
person_id: str,
request: PersonUpdateRequest,
):
"""
增量更新人物信息(只更新提供的字段)
) -> PersonUpdateResponse:
"""增量更新人物信息。
Args:
person_id: 人物唯一 ID
request: 更新请求(只包含需要更新字段
person_id: 人物唯一 ID
request: 只包含需要更新字段的请求数据。
Returns:
更新结果
PersonUpdateResponse: 更新结果和更新后的人物信息。
"""
try:
with get_db_session() as session:
statement = select(PersonInfo).where(col(PersonInfo.person_id) == person_id).limit(1)
person = session.exec(statement).first()
if not person:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {person_id} 的人物信息")
# 只更新提供的字段
update_data = request.model_dump(exclude_unset=True)
@@ -270,17 +276,23 @@ async def update_person(
db_person = session.exec(select(PersonInfo).where(col(PersonInfo.person_id) == person_id).limit(1)).first()
if not db_person:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {person_id} 的人物信息")
for field, value in update_data.items():
if hasattr(db_person, field):
setattr(db_person, field, value)
if "person_name" in update_data:
db_person.person_name = update_data["person_name"]
if "name_reason" in update_data:
db_person.name_reason = update_data["name_reason"]
if "nickname" in update_data:
db_person.user_nickname = update_data["nickname"]
if "memory_points" in update_data:
db_person.memory_points = update_data["memory_points"]
if "is_known" in update_data:
db_person.is_known = update_data["is_known"]
db_person.last_known_time = update_data["last_known_time"]
session.add(db_person)
person = db_person
data = person_to_response(db_person)
logger.info(f"人物信息已更新: {person_id}, 字段: {list(update_data.keys())}")
return PersonUpdateResponse(
success=True, message=f"成功更新 {len(update_data)} 个字段", data=person_to_response(person)
)
return PersonUpdateResponse(success=True, message=f"成功更新 {len(update_data)} 个字段", data=data)
except HTTPException:
raise
@@ -290,29 +302,26 @@ async def update_person(
@router.delete("/{person_id}", response_model=PersonDeleteResponse)
async def delete_person(person_id: str):
"""
删除人物信息
async def delete_person(person_id: str) -> PersonDeleteResponse:
"""删除人物信息。
Args:
person_id: 人物唯一 ID
person_id: 人物唯一 ID
Returns:
删除结果
PersonDeleteResponse: 删除结果
"""
try:
with get_db_session() as session:
statement = select(PersonInfo).where(col(PersonInfo.person_id) == person_id).limit(1)
person = session.exec(statement).first()
if not person:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {person_id} 的人物信息")
if not person:
raise HTTPException(status_code=404, detail=f"未找到 ID 为 {person_id} 的人物信息")
# 记录删除信息
person_name = person.person_name or person.user_nickname or person.user_id
# 记录删除信息
person_name = person.person_name or person.user_nickname or person.user_id
# 执行删除
with get_db_session() as session:
session.exec(delete(PersonInfo).where(col(PersonInfo.person_id) == person_id))
logger.info(f"人物信息已删除: {person_id} ({person_name})")
@@ -327,12 +336,11 @@ async def delete_person(person_id: str):
@router.get("/stats/summary")
async def get_person_stats():
"""
获取人物信息统计数据
async def get_person_stats() -> Dict[str, Any]:
"""获取人物信息统计数据。
Returns:
统计数据
Dict[str, Any]: 人物总数、已认识数量和平台分布统计。
"""
try:
with get_db_session() as session:
@@ -359,15 +367,14 @@ async def get_person_stats():
@router.post("/batch/delete", response_model=BatchDeleteResponse)
async def batch_delete_persons(
request: BatchDeleteRequest,
):
"""
批量删除人物信息
) -> BatchDeleteResponse:
"""批量删除人物信息。
Args:
request: 包含person_ids列表的请求
request: 包含人物 ID 列表的请求
Returns:
批量删除结果
BatchDeleteResponse: 批量删除结果
"""
try:
if not request.person_ids: