feat:添加数据提取脚本

This commit is contained in:
SengokuCola
2025-09-29 20:28:26 +08:00
parent 0683f56e23
commit 741e123496
2 changed files with 495 additions and 1 deletions

View File

@@ -2,7 +2,7 @@ import time
import random
import re
from typing import List, Dict, Any, Tuple, Optional, Callable
from typing import List, Dict, Any, Tuple, Optional, Callable, Iterable
from rich.traceback import install
from src.config.config import global_config
@@ -895,6 +895,173 @@ async def build_anonymous_messages(messages: List[DatabaseMessages]) -> str:
return formatted_string
def build_readable_messages_anonymized(
messages: List[DatabaseMessages],
timestamp_mode: str = "relative",
show_actions: bool = False,
show_pic: bool = True,
replace_bot_name: bool = True,
) -> Tuple[str, Dict[str, str]]:
"""
仿照 build_readable_messages构建匿名化的可读消息
- 所有用户名替换为 用户A、用户B、...、用户Z、用户AA、用户AB ...
- 内容中的 回复<aaa:bbb> 与 @<aaa:bbb> 也替换为匿名名
Returns:
formatted_string: 格式化后的聊天记录字符串
mapping: 原始显示用户名 -> 匿名名 的映射表
"""
if not messages:
return "", {}
# 生成匿名标签A..Z, AA..AZ, BA.. 等
def alphabet_labels() -> Iterable[str]:
import string
letters = string.ascii_uppercase
# 单字母
for ch in letters:
yield ch
# 多字母(简单生成两位,若需要可继续扩展)
for a in letters:
for b in letters:
yield f"{a}{b}"
label_iter = alphabet_labels()
user_to_label: Dict[Tuple[str, str], str] = {}
name_mapping: Dict[str, str] = {}
def get_display_name(platform: str, user_id: str, user_nickname: str, user_cardname: Optional[str]) -> str:
person = Person(platform=platform, user_id=user_id)
return person.person_name or f"{user_nickname}" or (f"昵称:{user_cardname}" if user_cardname else "某人")
def get_anon_name(platform: str, user_id: str, user_nickname: str, user_cardname: Optional[str]) -> str:
key = (platform or "", user_id or "")
# 机器人处理:若需要替换机器人名称,则直接返回 昵称(你)
if replace_bot_name and user_id == global_config.bot.qq_account:
anon = f"{global_config.bot.nickname}(你)"
original_display = get_display_name(platform, user_id, user_nickname, user_cardname)
if original_display not in name_mapping:
name_mapping[original_display] = anon
return anon
if key not in user_to_label:
user_to_label[key] = f"用户{next(label_iter)}"
anon = user_to_label[key]
# 记录原始显示名到匿名名(可能重复显示名时后写覆盖)
original_display = get_display_name(platform, user_id, user_nickname, user_cardname)
if original_display not in name_mapping:
name_mapping[original_display] = anon
return anon
# 将 DatabaseMessages 转换为可处理结构,并可选拼入动作
copy_messages: List[MessageAndActionModel] = [MessageAndActionModel.from_DatabaseMessages(msg) for msg in messages]
if show_actions and copy_messages:
min_time = min(msg.time or 0 for msg in copy_messages)
max_time = max(msg.time or 0 for msg in copy_messages)
chat_id = messages[0].chat_id if messages else None
actions_in_range = (
ActionRecords.select()
.where((ActionRecords.time >= min_time) & (ActionRecords.time <= max_time) & (ActionRecords.chat_id == chat_id))
.order_by(ActionRecords.time)
)
action_after_latest = (
ActionRecords.select()
.where((ActionRecords.time > max_time) & (ActionRecords.chat_id == chat_id))
.order_by(ActionRecords.time)
.limit(1)
)
actions: List[ActionRecords] = list(actions_in_range) + list(action_after_latest)
for action in actions:
if action.action_build_into_prompt:
action_msg = MessageAndActionModel(
time=float(action.time), # type: ignore
user_id=global_config.bot.qq_account,
user_platform=global_config.bot.platform,
user_nickname=global_config.bot.nickname,
user_cardname="",
processed_plain_text=f"{action.action_prompt_display}",
display_message=f"{action.action_prompt_display}",
chat_info_platform=str(action.chat_info_platform),
is_action_record=True,
action_name=str(action.action_name),
)
copy_messages.append(action_msg)
copy_messages.sort(key=lambda x: x.time or 0)
# 图片替换帮助
def process_pic_ids(content: Optional[str]) -> str:
if content is None:
return ""
pic_pattern = r"\[picid:([^\]]+)\]"
def replace_pic_id(_m: re.Match) -> str:
return "[图片]" if show_pic else ""
return re.sub(pic_pattern, replace_pic_id, content)
# 内容引用替换的 resolver将 <aaa:bbb> / @<aaa:bbb> 中的 bbb 映射为匿名名
def anon_name_resolver(platform: str, user_id: str) -> str:
try:
# 与主流程一致处理机器人名字
if replace_bot_name and user_id == global_config.bot.qq_account:
return f"{global_config.bot.nickname}(你)"
return get_anon_name(platform, user_id, "", None)
except Exception:
return "用户?"
# 构建结果
detailed: List[Tuple[float, str, str, bool]] = []
for m in copy_messages:
if m.is_action_record:
content = process_pic_ids(m.display_message)
detailed.append((m.time or 0.0, "", content, True))
continue
platform = m.user_platform
user_id = m.user_id
user_nickname = m.user_nickname
user_cardname = m.user_cardname
content = m.display_message or m.processed_plain_text or ""
content = process_pic_ids(content)
anon_name = get_anon_name(platform, user_id, user_nickname, user_cardname)
try:
content = replace_user_references(content, platform, anon_name_resolver, replace_bot_name=False)
except Exception:
pass
detailed.append((m.time or 0.0, anon_name, content, False))
if not detailed:
return "", name_mapping
detailed.sort(key=lambda x: x[0])
output_lines: List[str] = []
for ts, name, content, is_action in detailed:
readable_time = translate_timestamp_to_human_readable(ts, mode=timestamp_mode)
if is_action:
output_lines.append(f"{readable_time}, {content}")
else:
output_lines.append(f"{readable_time}, {name}: {content}")
output_lines.append("\n")
formatted_string = "".join(output_lines).strip()
# 最后对完整字符串再按映射表做一次替换,处理正文里直接出现的原始昵称
if name_mapping:
for original_name, anon_name in sorted(name_mapping.items(), key=lambda x: len(x[0]), reverse=True):
if original_name:
formatted_string = formatted_string.replace(original_name, anon_name)
return formatted_string, name_mapping
async def get_person_id_list(messages: List[Dict[str, Any]]) -> List[str]:
"""
从消息列表中提取不重复的 person_id 列表 (忽略机器人自身)。