feat:将记忆配置项添加到配置文件

This commit is contained in:
SengokuCola
2025-10-08 18:45:06 +08:00
parent e0a5cd5922
commit 16ae212adc
6 changed files with 253 additions and 282 deletions

View File

@@ -926,202 +926,6 @@ async def build_anonymous_messages(messages: List[DatabaseMessages]) -> str:
return formatted_string
def build_readable_messages_anonymized(
messages: List[DatabaseMessages],
timestamp_mode: str = "relative",
show_actions: bool = False,
show_pic: bool = True,
replace_bot_name: bool = True,
remove_emoji_stickers: bool = False,
) -> Tuple[str, Dict[str, str]]:
"""
仿照 build_readable_messages构建匿名化的可读消息
- 所有用户名替换为 用户A、用户B、...、用户Z、用户AA、用户AB ...
- 内容中的 回复<aaa:bbb> 与 @<aaa:bbb> 也替换为匿名名
Returns:
formatted_string: 格式化后的聊天记录字符串
mapping: 原始显示用户名 -> 匿名名 的映射表
"""
if not messages:
return "", {}
# 生成匿名标签A..Z, AA..AZ, BA.. 等
def alphabet_labels() -> Iterable[str]:
import string
letters = string.ascii_uppercase
# 单字母
for ch in letters:
yield ch
# 多字母(简单生成两位,若需要可继续扩展)
for a in letters:
for b in letters:
yield f"{a}{b}"
label_iter = alphabet_labels()
user_to_label: Dict[Tuple[str, str], str] = {}
name_mapping: Dict[str, str] = {}
def get_display_name(platform: str, user_id: str, user_nickname: str, user_cardname: Optional[str]) -> str:
person = Person(platform=platform, user_id=user_id)
return person.person_name or f"{user_nickname}" or (f"昵称:{user_cardname}" if user_cardname else "某人")
def get_anon_name(platform: str, user_id: str, user_nickname: str, user_cardname: Optional[str]) -> str:
key = (platform or "", user_id or "")
# 机器人处理:若需要替换机器人名称,则直接返回 昵称(你)
if replace_bot_name and user_id == global_config.bot.qq_account:
anon = f"{global_config.bot.nickname}(你)"
original_display = get_display_name(platform, user_id, user_nickname, user_cardname)
if original_display not in name_mapping:
name_mapping[original_display] = anon
return anon
if key not in user_to_label:
user_to_label[key] = f"用户{next(label_iter)}"
anon = user_to_label[key]
# 记录原始显示名到匿名名(可能重复显示名时后写覆盖)
original_display = get_display_name(platform, user_id, user_nickname, user_cardname)
if original_display not in name_mapping:
name_mapping[original_display] = anon
return anon
# 如果启用移除表情包,先过滤消息
if remove_emoji_stickers:
filtered_messages = []
for msg in messages:
# 获取消息内容
content = msg.display_message or msg.processed_plain_text or ""
# 移除表情包
emoji_pattern = r"\[表情包:[^\]]+\]"
content = re.sub(emoji_pattern, "", content)
# 如果移除表情包后内容不为空,则保留消息
if content.strip():
filtered_messages.append(msg)
messages = filtered_messages
# 将 DatabaseMessages 转换为可处理结构,并可选拼入动作
copy_messages: List[MessageAndActionModel] = []
for msg in messages:
if remove_emoji_stickers:
# 创建 MessageAndActionModel 但移除表情包
model = MessageAndActionModel.from_DatabaseMessages(msg)
# 移除表情包
if model.display_message:
model.display_message = re.sub(r"\[表情包:[^\]]+\]", "", model.display_message)
if model.processed_plain_text:
model.processed_plain_text = re.sub(r"\[表情包:[^\]]+\]", "", model.processed_plain_text)
copy_messages.append(model)
else:
copy_messages.append(MessageAndActionModel.from_DatabaseMessages(msg))
if show_actions and copy_messages:
min_time = min(msg.time or 0 for msg in copy_messages)
max_time = max(msg.time or 0 for msg in copy_messages)
chat_id = messages[0].chat_id if messages else None
actions_in_range = (
ActionRecords.select()
.where((ActionRecords.time >= min_time) & (ActionRecords.time <= max_time) & (ActionRecords.chat_id == chat_id))
.order_by(ActionRecords.time)
)
action_after_latest = (
ActionRecords.select()
.where((ActionRecords.time > max_time) & (ActionRecords.chat_id == chat_id))
.order_by(ActionRecords.time)
.limit(1)
)
actions: List[ActionRecords] = list(actions_in_range) + list(action_after_latest)
for action in actions:
if action.action_build_into_prompt:
action_msg = MessageAndActionModel(
time=float(action.time), # type: ignore
user_id=global_config.bot.qq_account,
user_platform=global_config.bot.platform,
user_nickname=global_config.bot.nickname,
user_cardname="",
processed_plain_text=f"{action.action_prompt_display}",
display_message=f"{action.action_prompt_display}",
chat_info_platform=str(action.chat_info_platform),
is_action_record=True,
action_name=str(action.action_name),
)
copy_messages.append(action_msg)
copy_messages.sort(key=lambda x: x.time or 0)
# 图片替换帮助
def process_pic_ids(content: Optional[str]) -> str:
if content is None:
return ""
pic_pattern = r"\[picid:([^\]]+)\]"
def replace_pic_id(_m: re.Match) -> str:
return "[图片]" if show_pic else ""
return re.sub(pic_pattern, replace_pic_id, content)
# 内容引用替换的 resolver将 <aaa:bbb> / @<aaa:bbb> 中的 bbb 映射为匿名名
def anon_name_resolver(platform: str, user_id: str) -> str:
try:
# 与主流程一致处理机器人名字
if replace_bot_name and user_id == global_config.bot.qq_account:
return f"{global_config.bot.nickname}(你)"
return get_anon_name(platform, user_id, "", None)
except Exception:
return "用户?"
# 构建结果
detailed: List[Tuple[float, str, str, bool]] = []
for m in copy_messages:
if m.is_action_record:
content = process_pic_ids(m.display_message)
detailed.append((m.time or 0.0, "", content, True))
continue
platform = m.user_platform
user_id = m.user_id
user_nickname = m.user_nickname
user_cardname = m.user_cardname
content = m.display_message or m.processed_plain_text or ""
content = process_pic_ids(content)
anon_name = get_anon_name(platform, user_id, user_nickname, user_cardname)
try:
content = replace_user_references(content, platform, anon_name_resolver, replace_bot_name=False)
except Exception:
pass
detailed.append((m.time or 0.0, anon_name, content, False))
if not detailed:
return "", name_mapping
detailed.sort(key=lambda x: x[0])
output_lines: List[str] = []
for ts, name, content, is_action in detailed:
readable_time = translate_timestamp_to_human_readable(ts, mode=timestamp_mode)
if is_action:
output_lines.append(f"{readable_time}, {content}")
else:
output_lines.append(f"{readable_time}, {name}: {content}")
output_lines.append("\n")
formatted_string = "".join(output_lines).strip()
# 最后对完整字符串再按映射表做一次替换,处理正文里直接出现的原始昵称
if name_mapping:
for original_name, anon_name in sorted(name_mapping.items(), key=lambda x: len(x[0]), reverse=True):
if original_name:
formatted_string = formatted_string.replace(original_name, anon_name)
return formatted_string, name_mapping
async def get_person_id_list(messages: List[Dict[str, Any]]) -> List[str]:
"""