fix:修复并发导致的重复表达学习问题

This commit is contained in:
SengokuCola
2025-12-02 12:47:54 +08:00
parent ec90951539
commit c562ebe97a
3 changed files with 364 additions and 323 deletions

View File

@@ -182,6 +182,9 @@ class JargonMiner:
self.stream_name = stream_name if stream_name else self.chat_id
self.cache_limit = 100
self.cache: OrderedDict[str, None] = OrderedDict()
# 黑话提取锁,防止并发执行
self._extraction_lock = asyncio.Lock()
def _add_to_cache(self, content: str) -> None:
"""将提取到的黑话加入缓存保持LRU语义"""
@@ -436,261 +439,265 @@ class JargonMiner:
return bool(recent_messages and len(recent_messages) >= self.min_messages_for_learning)
async def run_once(self) -> None:
try:
if not self.should_trigger():
return
chat_stream = get_chat_manager().get_stream(self.chat_id)
if not chat_stream:
return
# 记录本次提取的时间窗口,避免重复提取
extraction_start_time = self.last_learning_time
extraction_end_time = time.time()
# 拉取学习窗口内的消息
messages = get_raw_msg_by_timestamp_with_chat_inclusive(
chat_id=self.chat_id,
timestamp_start=extraction_start_time,
timestamp_end=extraction_end_time,
limit=20,
)
if not messages:
return
# 按时间排序,确保编号与上下文一致
messages = sorted(messages, key=lambda msg: msg.time or 0)
chat_str, message_id_list = build_readable_messages_with_id(
messages=messages,
replace_bot_name=True,
timestamp_mode="relative",
truncate=False,
show_actions=False,
show_pic=True,
pic_single=True,
)
if not chat_str.strip():
return
msg_id_to_index: Dict[str, int] = {}
for idx, (msg_id, _msg) in enumerate(message_id_list or []):
if not msg_id:
continue
msg_id_to_index[msg_id] = idx
if not msg_id_to_index:
logger.warning("未能生成消息ID映射跳过本次提取")
return
prompt: str = await global_prompt_manager.format_prompt(
"extract_jargon_prompt",
bot_name=global_config.bot.nickname,
chat_str=chat_str,
)
response, _ = await self.llm.generate_response_async(prompt, temperature=0.2)
if not response:
return
if global_config.debug.show_jargon_prompt:
logger.info(f"jargon提取提示词: {prompt}")
logger.info(f"jargon提取结果: {response}")
# 解析为JSON
entries: List[dict] = []
# 使用异步锁防止并发执行
async with self._extraction_lock:
try:
resp = response.strip()
parsed = None
if resp.startswith("[") and resp.endswith("]"):
parsed = json.loads(resp)
else:
repaired = repair_json(resp)
if isinstance(repaired, str):
parsed = json.loads(repaired)
else:
parsed = repaired
if isinstance(parsed, dict):
parsed = [parsed]
if not isinstance(parsed, list):
# 在锁内检查,避免并发触发
if not self.should_trigger():
return
for item in parsed:
if not isinstance(item, dict):
continue
chat_stream = get_chat_manager().get_stream(self.chat_id)
if not chat_stream:
return
content = str(item.get("content", "")).strip()
msg_id_value = item.get("msg_id")
if not content:
continue
if contains_bot_self_name(content):
logger.info(f"解析阶段跳过包含机器人昵称/别名的词条: {content}")
continue
msg_id_str = str(msg_id_value or "").strip()
if not msg_id_str:
logger.warning(f"解析jargon失败msg_id缺失content={content}")
continue
msg_index = msg_id_to_index.get(msg_id_str)
if msg_index is None:
logger.warning(f"解析jargon失败msg_id未找到content={content}, msg_id={msg_id_str}")
continue
target_msg = messages[msg_index]
if is_bot_message(target_msg):
logger.info(f"解析阶段跳过引用机器人自身消息的词条: content={content}, msg_id={msg_id_str}")
continue
context_paragraph = build_context_paragraph(messages, msg_index)
if not context_paragraph:
logger.warning(f"解析jargon失败上下文为空content={content}, msg_id={msg_id_str}")
continue
entries.append({"content": content, "raw_content": [context_paragraph]})
cached_entries = self._collect_cached_entries(messages)
if cached_entries:
entries.extend(cached_entries)
except Exception as e:
logger.error(f"解析jargon JSON失败: {e}; 原始: {response}")
return
if not entries:
return
# 去重并合并raw_content按 content 聚合)
merged_entries: OrderedDict[str, Dict[str, List[str]]] = OrderedDict()
for entry in entries:
content_key = entry["content"]
raw_list = entry.get("raw_content", []) or []
if content_key in merged_entries:
merged_entries[content_key]["raw_content"].extend(raw_list)
else:
merged_entries[content_key] = {
"content": content_key,
"raw_content": list(raw_list),
}
uniq_entries = []
for merged_entry in merged_entries.values():
raw_content_list = merged_entry["raw_content"]
if raw_content_list:
merged_entry["raw_content"] = list(dict.fromkeys(raw_content_list))
uniq_entries.append(merged_entry)
saved = 0
updated = 0
for entry in uniq_entries:
content = entry["content"]
raw_content_list = entry["raw_content"] # 已经是列表
try:
# 查询所有content匹配的记录
query = Jargon.select().where(Jargon.content == content)
# 查找匹配的记录
matched_obj = None
for obj in query:
if global_config.jargon.all_global:
# 开启all_global所有content匹配的记录都可以
matched_obj = obj
break
else:
# 关闭all_global需要检查chat_id列表是否包含目标chat_id
chat_id_list = parse_chat_id_list(obj.chat_id)
if chat_id_list_contains(chat_id_list, self.chat_id):
matched_obj = obj
break
if matched_obj:
obj = matched_obj
try:
obj.count = (obj.count or 0) + 1
except Exception:
obj.count = 1
# 合并raw_content列表读取现有列表追加新值去重
existing_raw_content = []
if obj.raw_content:
try:
existing_raw_content = (
json.loads(obj.raw_content) if isinstance(obj.raw_content, str) else obj.raw_content
)
if not isinstance(existing_raw_content, list):
existing_raw_content = [existing_raw_content] if existing_raw_content else []
except (json.JSONDecodeError, TypeError):
existing_raw_content = [obj.raw_content] if obj.raw_content else []
# 合并并去重
merged_list = list(dict.fromkeys(existing_raw_content + raw_content_list))
obj.raw_content = json.dumps(merged_list, ensure_ascii=False)
# 更新chat_id列表增加当前chat_id的计数
chat_id_list = parse_chat_id_list(obj.chat_id)
updated_chat_id_list = update_chat_id_list(chat_id_list, self.chat_id, increment=1)
obj.chat_id = json.dumps(updated_chat_id_list, ensure_ascii=False)
# 开启all_global时确保记录标记为is_global=True
if global_config.jargon.all_global:
obj.is_global = True
# 关闭all_global时保持原有is_global不变不修改
obj.save()
# 检查是否需要推断(达到阈值且超过上次判定值)
if _should_infer_meaning(obj):
# 异步触发推断,不阻塞主流程
# 重新加载对象以确保数据最新
jargon_id = obj.id
asyncio.create_task(self._infer_meaning_by_id(jargon_id))
updated += 1
else:
# 没找到匹配记录,创建新记录
if global_config.jargon.all_global:
# 开启all_global新记录默认为is_global=True
is_global_new = True
else:
# 关闭all_global新记录is_global=False
is_global_new = False
# 使用新格式创建chat_id列表[[chat_id, count]]
chat_id_list = [[self.chat_id, 1]]
chat_id_json = json.dumps(chat_id_list, ensure_ascii=False)
Jargon.create(
content=content,
raw_content=json.dumps(raw_content_list, ensure_ascii=False),
chat_id=chat_id_json,
is_global=is_global_new,
count=1,
)
saved += 1
except Exception as e:
logger.error(f"保存jargon失败: chat_id={self.chat_id}, content={content}, err={e}")
continue
finally:
self._add_to_cache(content)
# 固定输出提取的jargon结果格式化为可读形式只要有提取结果就输出
if uniq_entries:
# 收集所有提取的jargon内容
jargon_list = [entry["content"] for entry in uniq_entries]
jargon_str = ",".join(jargon_list)
# 输出格式化的结果使用logger.info会自动应用jargon模块的颜色
logger.info(f"[{self.stream_name}]疑似黑话: {jargon_str}")
# 更新为本次提取的结束时间,确保不会重复提取相同的消息窗口
# 记录本次提取的时间窗口,避免重复提取
extraction_start_time = self.last_learning_time
extraction_end_time = time.time()
# 立即更新学习时间,防止并发触发
self.last_learning_time = extraction_end_time
if saved or updated:
logger.info(f"jargon写入: 新增 {saved} 条,更新 {updated}chat_id={self.chat_id}")
except Exception as e:
logger.error(f"JargonMiner 运行失败: {e}")
# 拉取学习窗口内的消息
messages = get_raw_msg_by_timestamp_with_chat_inclusive(
chat_id=self.chat_id,
timestamp_start=extraction_start_time,
timestamp_end=extraction_end_time,
limit=20,
)
if not messages:
return
# 按时间排序,确保编号与上下文一致
messages = sorted(messages, key=lambda msg: msg.time or 0)
chat_str, message_id_list = build_readable_messages_with_id(
messages=messages,
replace_bot_name=True,
timestamp_mode="relative",
truncate=False,
show_actions=False,
show_pic=True,
pic_single=True,
)
if not chat_str.strip():
return
msg_id_to_index: Dict[str, int] = {}
for idx, (msg_id, _msg) in enumerate(message_id_list or []):
if not msg_id:
continue
msg_id_to_index[msg_id] = idx
if not msg_id_to_index:
logger.warning("未能生成消息ID映射跳过本次提取")
return
prompt: str = await global_prompt_manager.format_prompt(
"extract_jargon_prompt",
bot_name=global_config.bot.nickname,
chat_str=chat_str,
)
response, _ = await self.llm.generate_response_async(prompt, temperature=0.2)
if not response:
return
if global_config.debug.show_jargon_prompt:
logger.info(f"jargon提取提示词: {prompt}")
logger.info(f"jargon提取结果: {response}")
# 解析为JSON
entries: List[dict] = []
try:
resp = response.strip()
parsed = None
if resp.startswith("[") and resp.endswith("]"):
parsed = json.loads(resp)
else:
repaired = repair_json(resp)
if isinstance(repaired, str):
parsed = json.loads(repaired)
else:
parsed = repaired
if isinstance(parsed, dict):
parsed = [parsed]
if not isinstance(parsed, list):
return
for item in parsed:
if not isinstance(item, dict):
continue
content = str(item.get("content", "")).strip()
msg_id_value = item.get("msg_id")
if not content:
continue
if contains_bot_self_name(content):
logger.info(f"解析阶段跳过包含机器人昵称/别名的词条: {content}")
continue
msg_id_str = str(msg_id_value or "").strip()
if not msg_id_str:
logger.warning(f"解析jargon失败msg_id缺失content={content}")
continue
msg_index = msg_id_to_index.get(msg_id_str)
if msg_index is None:
logger.warning(f"解析jargon失败msg_id未找到content={content}, msg_id={msg_id_str}")
continue
target_msg = messages[msg_index]
if is_bot_message(target_msg):
logger.info(f"解析阶段跳过引用机器人自身消息的词条: content={content}, msg_id={msg_id_str}")
continue
context_paragraph = build_context_paragraph(messages, msg_index)
if not context_paragraph:
logger.warning(f"解析jargon失败上下文为空content={content}, msg_id={msg_id_str}")
continue
entries.append({"content": content, "raw_content": [context_paragraph]})
cached_entries = self._collect_cached_entries(messages)
if cached_entries:
entries.extend(cached_entries)
except Exception as e:
logger.error(f"解析jargon JSON失败: {e}; 原始: {response}")
return
if not entries:
return
# 去重并合并raw_content按 content 聚合)
merged_entries: OrderedDict[str, Dict[str, List[str]]] = OrderedDict()
for entry in entries:
content_key = entry["content"]
raw_list = entry.get("raw_content", []) or []
if content_key in merged_entries:
merged_entries[content_key]["raw_content"].extend(raw_list)
else:
merged_entries[content_key] = {
"content": content_key,
"raw_content": list(raw_list),
}
uniq_entries = []
for merged_entry in merged_entries.values():
raw_content_list = merged_entry["raw_content"]
if raw_content_list:
merged_entry["raw_content"] = list(dict.fromkeys(raw_content_list))
uniq_entries.append(merged_entry)
saved = 0
updated = 0
for entry in uniq_entries:
content = entry["content"]
raw_content_list = entry["raw_content"] # 已经是列表
try:
# 查询所有content匹配的记录
query = Jargon.select().where(Jargon.content == content)
# 查找匹配的记录
matched_obj = None
for obj in query:
if global_config.jargon.all_global:
# 开启all_global所有content匹配的记录都可以
matched_obj = obj
break
else:
# 关闭all_global需要检查chat_id列表是否包含目标chat_id
chat_id_list = parse_chat_id_list(obj.chat_id)
if chat_id_list_contains(chat_id_list, self.chat_id):
matched_obj = obj
break
if matched_obj:
obj = matched_obj
try:
obj.count = (obj.count or 0) + 1
except Exception:
obj.count = 1
# 合并raw_content列表读取现有列表追加新值去重
existing_raw_content = []
if obj.raw_content:
try:
existing_raw_content = (
json.loads(obj.raw_content) if isinstance(obj.raw_content, str) else obj.raw_content
)
if not isinstance(existing_raw_content, list):
existing_raw_content = [existing_raw_content] if existing_raw_content else []
except (json.JSONDecodeError, TypeError):
existing_raw_content = [obj.raw_content] if obj.raw_content else []
# 合并并去重
merged_list = list(dict.fromkeys(existing_raw_content + raw_content_list))
obj.raw_content = json.dumps(merged_list, ensure_ascii=False)
# 更新chat_id列表增加当前chat_id的计数
chat_id_list = parse_chat_id_list(obj.chat_id)
updated_chat_id_list = update_chat_id_list(chat_id_list, self.chat_id, increment=1)
obj.chat_id = json.dumps(updated_chat_id_list, ensure_ascii=False)
# 开启all_global时确保记录标记为is_global=True
if global_config.jargon.all_global:
obj.is_global = True
# 关闭all_global时保持原有is_global不变不修改
obj.save()
# 检查是否需要推断(达到阈值且超过上次判定值)
if _should_infer_meaning(obj):
# 异步触发推断,不阻塞主流程
# 重新加载对象以确保数据最新
jargon_id = obj.id
asyncio.create_task(self._infer_meaning_by_id(jargon_id))
updated += 1
else:
# 没找到匹配记录,创建新记录
if global_config.jargon.all_global:
# 开启all_global新记录默认为is_global=True
is_global_new = True
else:
# 关闭all_global新记录is_global=False
is_global_new = False
# 使用新格式创建chat_id列表[[chat_id, count]]
chat_id_list = [[self.chat_id, 1]]
chat_id_json = json.dumps(chat_id_list, ensure_ascii=False)
Jargon.create(
content=content,
raw_content=json.dumps(raw_content_list, ensure_ascii=False),
chat_id=chat_id_json,
is_global=is_global_new,
count=1,
)
saved += 1
except Exception as e:
logger.error(f"保存jargon失败: chat_id={self.chat_id}, content={content}, err={e}")
continue
finally:
self._add_to_cache(content)
# 固定输出提取的jargon结果格式化为可读形式只要有提取结果就输出
if uniq_entries:
# 收集所有提取的jargon内容
jargon_list = [entry["content"] for entry in uniq_entries]
jargon_str = ",".join(jargon_list)
# 输出格式化的结果使用logger.info会自动应用jargon模块的颜色
logger.info(f"[{self.stream_name}]疑似黑话: {jargon_str}")
if saved or updated:
logger.info(f"jargon写入: 新增 {saved} 条,更新 {updated}chat_id={self.chat_id}")
except Exception as e:
logger.error(f"JargonMiner 运行失败: {e}")
# 即使失败也保持时间戳更新,避免频繁重试
class JargonMinerManager: