数据库的信息重构为dataclass

This commit is contained in:
UnCLAS-Prommer
2025-08-17 17:11:32 +08:00
parent d74beef4b4
commit 3481234d2b
18 changed files with 243 additions and 206 deletions

View File

@@ -7,9 +7,10 @@ from rich.traceback import install
from src.config.config import global_config
from src.common.message_repository import find_messages, count_messages
from src.common.data_models.database_data_model import DatabaseMessages
from src.common.database.database_model import ActionRecords
from src.common.database.database_model import Images
from src.person_info.person_info import Person,get_person_id
from src.person_info.person_info import Person, get_person_id
from src.chat.utils.utils import translate_timestamp_to_human_readable, assign_message_ids
install(extra_lines=3)
@@ -35,6 +36,7 @@ def replace_user_references_sync(
str: 处理后的内容字符串
"""
if name_resolver is None:
def default_resolver(platform: str, user_id: str) -> str:
# 检查是否是机器人自己
if replace_bot_name and user_id == global_config.bot.qq_account:
@@ -108,6 +110,7 @@ async def replace_user_references_async(
str: 处理后的内容字符串
"""
if name_resolver is None:
async def default_resolver(platform: str, user_id: str) -> str:
# 检查是否是机器人自己
if replace_bot_name and user_id == global_config.bot.qq_account:
@@ -161,9 +164,7 @@ async def replace_user_references_async(
return content
def get_raw_msg_by_timestamp(
timestamp_start: float, timestamp_end: float, limit: int = 0, limit_mode: str = "latest"
) -> List[Dict[str, Any]]:
def get_raw_msg_by_timestamp(timestamp_start: float, timestamp_end: float, limit: int = 0, limit_mode: str = "latest"):
"""
获取从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制
@@ -183,7 +184,7 @@ def get_raw_msg_by_timestamp_with_chat(
limit_mode: str = "latest",
filter_bot=False,
filter_command=False,
) -> List[Dict[str, Any]]:
) -> List[DatabaseMessages]:
"""获取在特定聊天从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制
limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest'
@@ -209,7 +210,7 @@ def get_raw_msg_by_timestamp_with_chat_inclusive(
limit: int = 0,
limit_mode: str = "latest",
filter_bot=False,
) -> List[Dict[str, Any]]:
) -> List[DatabaseMessages]:
"""获取在特定聊天从指定时间戳到指定时间戳的消息(包含边界),按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制
limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest'
@@ -218,7 +219,6 @@ def get_raw_msg_by_timestamp_with_chat_inclusive(
# 只有当 limit 为 0 时才应用外部 sort
sort_order = [("time", 1)] if limit == 0 else None
# 直接将 limit_mode 传递给 find_messages
return find_messages(
message_filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode, filter_bot=filter_bot
)
@@ -231,7 +231,7 @@ def get_raw_msg_by_timestamp_with_chat_users(
person_ids: List[str],
limit: int = 0,
limit_mode: str = "latest",
) -> List[Dict[str, Any]]:
) -> List[DatabaseMessages]:
"""获取某些特定用户在特定聊天从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制
limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest'
@@ -302,7 +302,7 @@ def get_actions_by_timestamp_with_chat_inclusive(
def get_raw_msg_by_timestamp_random(
timestamp_start: float, timestamp_end: float, limit: int = 0, limit_mode: str = "latest"
) -> List[Dict[str, Any]]:
) -> List[DatabaseMessages]:
"""
先在范围时间戳内随机选择一条消息取得消息的chat_id然后根据chat_id获取该聊天在指定时间戳范围内的消息
"""
@@ -312,15 +312,15 @@ def get_raw_msg_by_timestamp_random(
return []
# 随机选一条
msg = random.choice(all_msgs)
chat_id = msg["chat_id"]
timestamp_start = msg["time"]
chat_id = msg.chat_id
timestamp_start = msg.time
# 用 chat_id 获取该聊天在指定时间戳范围内的消息
return get_raw_msg_by_timestamp_with_chat(chat_id, timestamp_start, timestamp_end, limit, "earliest")
def get_raw_msg_by_timestamp_with_users(
timestamp_start: float, timestamp_end: float, person_ids: list, limit: int = 0, limit_mode: str = "latest"
) -> List[Dict[str, Any]]:
) -> List[DatabaseMessages]:
"""获取某些特定用户在 *所有聊天* 中从指定时间戳到指定时间戳的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制
limit_mode: 当 limit > 0 时生效。 'earliest' 表示获取最早的记录, 'latest' 表示获取最新的记录。默认为 'latest'
@@ -331,7 +331,7 @@ def get_raw_msg_by_timestamp_with_users(
return find_messages(message_filter=filter_query, sort=sort_order, limit=limit, limit_mode=limit_mode)
def get_raw_msg_before_timestamp(timestamp: float, limit: int = 0) -> List[Dict[str, Any]]:
def get_raw_msg_before_timestamp(timestamp: float, limit: int = 0) -> List[DatabaseMessages]:
"""获取指定时间戳之前的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制
"""
@@ -340,7 +340,7 @@ def get_raw_msg_before_timestamp(timestamp: float, limit: int = 0) -> List[Dict[
return find_messages(message_filter=filter_query, sort=sort_order, limit=limit)
def get_raw_msg_before_timestamp_with_chat(chat_id: str, timestamp: float, limit: int = 0) -> List[Dict[str, Any]]:
def get_raw_msg_before_timestamp_with_chat(chat_id: str, timestamp: float, limit: int = 0) -> List[DatabaseMessages]:
"""获取指定时间戳之前的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制
"""
@@ -349,7 +349,7 @@ def get_raw_msg_before_timestamp_with_chat(chat_id: str, timestamp: float, limit
return find_messages(message_filter=filter_query, sort=sort_order, limit=limit)
def get_raw_msg_before_timestamp_with_users(timestamp: float, person_ids: list, limit: int = 0) -> List[Dict[str, Any]]:
def get_raw_msg_before_timestamp_with_users(timestamp: float, person_ids: list, limit: int = 0) -> List[DatabaseMessages]:
"""获取指定时间戳之前的消息,按时间升序排序,返回消息列表
limit: 限制返回的消息数量0为不限制
"""

View File

@@ -3,13 +3,15 @@ import re
import string
import time
import jieba
import json
import ast
import numpy as np
from collections import Counter
from maim_message import UserInfo
from typing import Optional, Tuple, Dict, List, Any
from src.common.logger import get_logger
from src.common.data_models.info_data_model import TargetPersonInfo
from src.common.message_repository import find_messages, count_messages
from src.config.config import global_config, model_config
from src.chat.message_receive.message import MessageRecv
@@ -130,22 +132,29 @@ def get_recent_group_speaker(chat_stream_id: str, sender, limit: int = 12) -> li
return []
who_chat_in_group = []
for msg_db_data in recent_messages:
user_info = UserInfo.from_dict(
{
"platform": msg_db_data["user_platform"],
"user_id": msg_db_data["user_id"],
"user_nickname": msg_db_data["user_nickname"],
"user_cardname": msg_db_data.get("user_cardname", ""),
}
)
for db_msg in recent_messages:
# user_info = UserInfo.from_dict(
# {
# "platform": msg_db_data["user_platform"],
# "user_id": msg_db_data["user_id"],
# "user_nickname": msg_db_data["user_nickname"],
# "user_cardname": msg_db_data.get("user_cardname", ""),
# }
# )
# if (
# (user_info.platform, user_info.user_id) != sender
# and user_info.user_id != global_config.bot.qq_account
# and (user_info.platform, user_info.user_id, user_info.user_nickname) not in who_chat_in_group
# and len(who_chat_in_group) < 5
# ): # 排除重复排除消息发送者排除bot限制加载的关系数目
# who_chat_in_group.append((user_info.platform, user_info.user_id, user_info.user_nickname))
if (
(user_info.platform, user_info.user_id) != sender
and user_info.user_id != global_config.bot.qq_account
and (user_info.platform, user_info.user_id, user_info.user_nickname) not in who_chat_in_group
(db_msg.user_info.platform, db_msg.user_info.user_id) != sender
and db_msg.user_info.user_id != global_config.bot.qq_account
and (db_msg.user_info.platform, db_msg.user_info.user_id, db_msg.user_info.user_nickname) not in who_chat_in_group
and len(who_chat_in_group) < 5
): # 排除重复排除消息发送者排除bot限制加载的关系数目
who_chat_in_group.append((user_info.platform, user_info.user_id, user_info.user_nickname))
who_chat_in_group.append((db_msg.user_info.platform, db_msg.user_info.user_id, db_msg.user_info.user_nickname))
return who_chat_in_group
@@ -555,7 +564,7 @@ def count_messages_between(start_time: float, end_time: float, stream_id: str) -
# 获取消息内容计算总长度
messages = find_messages(message_filter=filter_query)
total_length = sum(len(msg.get("processed_plain_text", "")) for msg in messages)
total_length = sum(len(msg.processed_plain_text or "") for msg in messages)
return count, total_length
@@ -628,41 +637,34 @@ def get_chat_type_and_target_info(chat_id: str) -> Tuple[bool, Optional[Dict]]:
user_id: str = user_info.user_id # type: ignore
# Initialize target_info with basic info
target_info = {
"platform": platform,
"user_id": user_id,
"user_nickname": user_info.user_nickname,
"person_id": None,
"person_name": None,
}
target_info = TargetPersonInfo(
platform=platform,
user_id=user_id,
user_nickname=user_info.user_nickname, # type: ignore
person_id=None,
person_name=None
)
# Try to fetch person info
try:
# Assume get_person_id is sync (as per original code), keep using to_thread
person = Person(platform=platform, user_id=user_id)
if not person.is_known:
logger.warning(f"用户 {user_info.user_nickname} 尚未认识")
# 如果用户尚未认识则返回False和None
return False, None
person_id = person.person_id
person_name = None
if person_id:
# get_value is async, so await it directly
person_name = person.person_name
target_info["person_id"] = person_id
target_info["person_name"] = person_name
if person.person_id:
target_info.person_id = person.person_id
target_info.person_name = person.person_name
except Exception as person_e:
logger.warning(
f"获取 person_id 或 person_name 时出错 for {platform}:{user_id} in utils: {person_e}"
)
chat_target_info = target_info
chat_target_info = target_info.__dict__
else:
logger.warning(f"无法获取 chat_stream for {chat_id} in utils")
except Exception as e:
logger.error(f"获取聊天类型和目标信息时出错 for {chat_id}: {e}", exc_info=True)
# Keep defaults on error
return is_group_chat, chat_target_info
@@ -771,6 +773,7 @@ def assign_message_ids_flexible(
# # 结果: [{'id': 'ts123a1b', 'message': 'Hello'}, {'id': 'ts123c2d', 'message': 'World'}, {'id': 'ts123e3f', 'message': 'Test message'}]
def parse_keywords_string(keywords_input) -> list[str]:
# sourcery skip: use-contextlib-suppress
"""
统一的关键词解析函数,支持多种格式的关键词字符串解析
@@ -802,7 +805,6 @@ def parse_keywords_string(keywords_input) -> list[str]:
try:
# 尝试作为JSON对象解析支持 {"keywords": [...]} 格式)
import json
json_data = json.loads(keywords_str)
if isinstance(json_data, dict) and "keywords" in json_data:
keywords_list = json_data["keywords"]
@@ -816,7 +818,6 @@ def parse_keywords_string(keywords_input) -> list[str]:
try:
# 尝试使用 ast.literal_eval 解析支持Python字面量格式
import ast
parsed = ast.literal_eval(keywords_str)
if isinstance(parsed, list):
return [str(k).strip() for k in parsed if str(k).strip()]