198 lines
6.2 KiB
Python
198 lines
6.2 KiB
Python
import json
|
||
from typing import List, Dict, Optional, Any
|
||
|
||
from src.common.logger import get_logger
|
||
from src.common.database.database_model import Jargon
|
||
from src.config.config import global_config
|
||
from src.chat.utils.chat_message_builder import (
|
||
build_readable_messages,
|
||
build_readable_messages_with_id,
|
||
)
|
||
from src.chat.utils.utils import parse_platform_accounts
|
||
|
||
|
||
logger = get_logger("jargon")
|
||
|
||
|
||
def parse_chat_id_list(chat_id_value: Any) -> List[List[Any]]:
|
||
"""
|
||
解析chat_id字段,兼容旧格式(字符串)和新格式(JSON列表)
|
||
|
||
Args:
|
||
chat_id_value: 可能是字符串(旧格式)或JSON字符串(新格式)
|
||
|
||
Returns:
|
||
List[List[Any]]: 格式为 [[chat_id, count], ...] 的列表
|
||
"""
|
||
if not chat_id_value:
|
||
return []
|
||
|
||
# 如果是字符串,尝试解析为JSON
|
||
if isinstance(chat_id_value, str):
|
||
# 尝试解析JSON
|
||
try:
|
||
parsed = json.loads(chat_id_value)
|
||
if isinstance(parsed, list):
|
||
# 新格式:已经是列表
|
||
return parsed
|
||
elif isinstance(parsed, str):
|
||
# 解析后还是字符串,说明是旧格式
|
||
return [[parsed, 1]]
|
||
else:
|
||
# 其他类型,当作旧格式处理
|
||
return [[str(chat_id_value), 1]]
|
||
except (json.JSONDecodeError, TypeError):
|
||
# 解析失败,当作旧格式(纯字符串)
|
||
return [[str(chat_id_value), 1]]
|
||
elif isinstance(chat_id_value, list):
|
||
# 已经是列表格式
|
||
return chat_id_value
|
||
else:
|
||
# 其他类型,转换为旧格式
|
||
return [[str(chat_id_value), 1]]
|
||
|
||
|
||
def update_chat_id_list(chat_id_list: List[List[Any]], target_chat_id: str, increment: int = 1) -> List[List[Any]]:
|
||
"""
|
||
更新chat_id列表,如果target_chat_id已存在则增加计数,否则添加新条目
|
||
|
||
Args:
|
||
chat_id_list: 当前的chat_id列表,格式为 [[chat_id, count], ...]
|
||
target_chat_id: 要更新或添加的chat_id
|
||
increment: 增加的计数,默认为1
|
||
|
||
Returns:
|
||
List[List[Any]]: 更新后的chat_id列表
|
||
"""
|
||
# 查找是否已存在该chat_id
|
||
found = False
|
||
for item in chat_id_list:
|
||
if isinstance(item, list) and len(item) >= 1 and str(item[0]) == str(target_chat_id):
|
||
# 找到匹配的chat_id,增加计数
|
||
if len(item) >= 2:
|
||
item[1] = (item[1] if isinstance(item[1], (int, float)) else 0) + increment
|
||
else:
|
||
item.append(increment)
|
||
found = True
|
||
break
|
||
|
||
if not found:
|
||
# 未找到,添加新条目
|
||
chat_id_list.append([target_chat_id, increment])
|
||
|
||
return chat_id_list
|
||
|
||
|
||
def chat_id_list_contains(chat_id_list: List[List[Any]], target_chat_id: str) -> bool:
|
||
"""
|
||
检查chat_id列表中是否包含指定的chat_id
|
||
|
||
Args:
|
||
chat_id_list: chat_id列表,格式为 [[chat_id, count], ...]
|
||
target_chat_id: 要查找的chat_id
|
||
|
||
Returns:
|
||
bool: 如果包含则返回True
|
||
"""
|
||
for item in chat_id_list:
|
||
if isinstance(item, list) and len(item) >= 1 and str(item[0]) == str(target_chat_id):
|
||
return True
|
||
return False
|
||
|
||
|
||
def contains_bot_self_name(content: str) -> bool:
|
||
"""
|
||
判断词条是否包含机器人的昵称或别名
|
||
"""
|
||
if not content:
|
||
return False
|
||
|
||
bot_config = getattr(global_config, "bot", None)
|
||
if not bot_config:
|
||
return False
|
||
|
||
target = content.strip().lower()
|
||
nickname = str(getattr(bot_config, "nickname", "") or "").strip().lower()
|
||
alias_names = [str(alias or "").strip().lower() for alias in getattr(bot_config, "alias_names", []) or []]
|
||
|
||
candidates = [name for name in [nickname, *alias_names] if name]
|
||
|
||
return any(name in target for name in candidates if target)
|
||
|
||
|
||
def build_context_paragraph(messages: List[Any], center_index: int) -> Optional[str]:
|
||
"""
|
||
构建包含中心消息上下文的段落(前3条+后3条),使用标准的 readable builder 输出
|
||
"""
|
||
if not messages or center_index < 0 or center_index >= len(messages):
|
||
return None
|
||
|
||
context_start = max(0, center_index - 3)
|
||
context_end = min(len(messages), center_index + 1 + 3)
|
||
context_messages = messages[context_start:context_end]
|
||
|
||
if not context_messages:
|
||
return None
|
||
|
||
try:
|
||
paragraph = build_readable_messages(
|
||
messages=context_messages,
|
||
replace_bot_name=True,
|
||
timestamp_mode="relative",
|
||
read_mark=0.0,
|
||
truncate=False,
|
||
show_actions=False,
|
||
show_pic=True,
|
||
message_id_list=None,
|
||
remove_emoji_stickers=False,
|
||
pic_single=True,
|
||
)
|
||
except Exception as e:
|
||
logger.warning(f"构建上下文段落失败: {e}")
|
||
return None
|
||
|
||
paragraph = paragraph.strip()
|
||
return paragraph or None
|
||
|
||
|
||
def is_bot_message(msg: Any) -> bool:
|
||
"""判断消息是否来自机器人自身"""
|
||
if msg is None:
|
||
return False
|
||
|
||
bot_config = getattr(global_config, "bot", None)
|
||
if not bot_config:
|
||
return False
|
||
|
||
platform = (
|
||
str(getattr(msg, "user_platform", "") or getattr(getattr(msg, "user_info", None), "platform", "") or "")
|
||
.strip()
|
||
.lower()
|
||
)
|
||
user_id = str(getattr(msg, "user_id", "") or getattr(getattr(msg, "user_info", None), "user_id", "") or "").strip()
|
||
|
||
if not platform or not user_id:
|
||
return False
|
||
|
||
platform_accounts = {}
|
||
try:
|
||
platform_accounts = parse_platform_accounts(getattr(bot_config, "platforms", []) or [])
|
||
except Exception:
|
||
platform_accounts = {}
|
||
|
||
bot_accounts: Dict[str, str] = {}
|
||
qq_account = str(getattr(bot_config, "qq_account", "") or "").strip()
|
||
if qq_account:
|
||
bot_accounts["qq"] = qq_account
|
||
|
||
telegram_account = str(getattr(bot_config, "telegram_account", "") or "").strip()
|
||
if telegram_account:
|
||
bot_accounts["telegram"] = telegram_account
|
||
|
||
for plat, account in platform_accounts.items():
|
||
if account and plat not in bot_accounts:
|
||
bot_accounts[plat] = account
|
||
|
||
bot_account = bot_accounts.get(platform)
|
||
return bool(bot_account and user_id == bot_account)
|