feat:添加术语推断和术语理解

This commit is contained in:
SengokuCola
2025-11-07 02:21:51 +08:00
parent 69a6116b2a
commit a8f4863d2f
6 changed files with 686 additions and 28 deletions

View File

@@ -1,5 +1,6 @@
import time
import json
import asyncio
from typing import List
from json_repair import repair_json
@@ -26,20 +27,22 @@ def _init_prompt() -> None:
请从上面这段聊天内容中提取"可能是黑话"的候选项(黑话/俚语/网络缩写/口头禅)。
- 必须为对话中真实出现过的短词或短语
- 必须是你无法理解含义的词语,或者出现频率较高的词语
- 必须是这几种类别之一:英文或中文缩写、中文拼音短语、字母数字混合、意义不明但频繁的词
- 请不要选择有明确含义,或者含义清晰的词
- 必须是这几种类别之一:英文或中文缩写、中文拼音短语、字母数字混合
- 排除:人名、@、明显的表情/图片占位、纯标点、常规功能词(如的、了、呢、啊等)
- 每个词条长度建议 2-8 个字符(不强制),尽量短小
- 合并重复项,去重
分类规则:
- p拼音缩写由字母或字母和汉字构成的疑似拼音简写词例如nb、yyds、xswl
分类规则,type必须根据规则填写
- p拼音缩写由字母或字母和汉字构成的用汉语拼音简写词,或汉语拼音首字母的简写词例如nb、yyds、xswl
- c中文缩写中文词语的缩写用几个汉字概括一个词汇或含义例如社死、内卷
- e英文缩写英文词语的缩写用英文字母概括一个词汇或含义例如CPU、GPU、API
- x谐音梗谐音梗用谐音词概括一个词汇或含义例如好似难崩
以 JSON 数组输出,元素为对象(严格按以下结构):
[
{{"content": "词条", "raw_content": "包含该词条的完整句子", "type": "p"}},
{{"content": "词条2", "raw_content": "包含该词条的完整句子", "type": "c"}}
{{"content": "词条", "raw_content": "包含该词条的完整对话原文", "type": "p"}},
{{"content": "词条2", "raw_content": "包含该词条的完整对话原文", "type": "c"}}
]
现在请输出:
@@ -47,7 +50,107 @@ def _init_prompt() -> None:
Prompt(prompt_str, "extract_jargon_prompt")
def _init_inference_prompts() -> None:
"""初始化含义推断相关的prompt"""
# Prompt 1: 基于raw_content和content推断
prompt1_str = """
**词条内容**
{content}
**词条出现的上下文raw_content**
{raw_content_list}
请根据以上词条内容和上下文,推断这个词条的含义。
- 如果这是一个黑话、俚语或网络用语,请推断其含义和翻译
- 如果含义明确(常规词汇),也请说明
以 JSON 格式输出:
{{
"meaning": "含义说明",
"translation": "翻译或解释"
}}
"""
Prompt(prompt1_str, "jargon_inference_with_context_prompt")
# Prompt 2: 仅基于content推断
prompt2_str = """
**词条内容**
{content}
请仅根据这个词条本身,推断其含义。
- 如果这是一个黑话、俚语或网络用语,请推断其含义和翻译
- 如果含义明确(常规词汇),也请说明
以 JSON 格式输出:
{{
"meaning": "含义说明",
"translation": "翻译或解释"
}}
"""
Prompt(prompt2_str, "jargon_inference_content_only_prompt")
# Prompt 3: 比较两个推断结果
prompt3_str = """
**推断结果1基于上下文**
{inference1}
**推断结果2仅基于词条**
{inference2}
请比较这两个推断结果,判断它们是否相同或类似。
- 如果两个推断结果的"含义"相同或类似,说明这个词条不是黑话(含义明确)
- 如果两个推断结果有差异,说明这个词条可能是黑话(需要上下文才能理解)
以 JSON 格式输出:
{{
"is_similar": true/false,
"reason": "判断理由"
}}
"""
Prompt(prompt3_str, "jargon_compare_inference_prompt")
_init_prompt()
_init_inference_prompts()
def _should_infer_meaning(jargon_obj: Jargon) -> bool:
"""
判断是否需要进行含义推断
在 count 达到 5, 10, 20, 40, 60, 100 时进行推断
并且count必须大于last_inference_count避免重启后重复判定
如果is_complete为True不再进行推断
"""
# 如果已完成所有推断,不再推断
if jargon_obj.is_complete:
return False
count = jargon_obj.count or 0
last_inference = jargon_obj.last_inference_count or 0
# 阈值列表5, 10, 20, 40, 60, 100
thresholds = [5, 10, 20, 40, 60, 100]
if count < thresholds[0]:
return False
# 如果count没有超过上次判定值不需要判定
if count <= last_inference:
return False
# 找到第一个大于last_inference的阈值
next_threshold = None
for threshold in thresholds:
if threshold > last_inference:
next_threshold = threshold
break
# 如果没有找到下一个阈值说明已经超过100不应该再推断
if next_threshold is None:
return False
# 检查count是否达到或超过这个阈值
return count >= next_threshold
class JargonMiner:
@@ -63,6 +166,162 @@ class JargonMiner:
request_type="jargon.extract",
)
async def _infer_meaning_by_id(self, jargon_id: int) -> None:
"""通过ID加载对象并推断"""
try:
jargon_obj = Jargon.get_by_id(jargon_id)
# 再次检查is_complete因为可能在异步任务执行时已被标记为完成
if jargon_obj.is_complete:
logger.debug(f"jargon {jargon_obj.content} 已完成所有推断,跳过")
return
await self.infer_meaning(jargon_obj)
except Exception as e:
logger.error(f"通过ID推断jargon失败: {e}")
async def infer_meaning(self, jargon_obj: Jargon) -> None:
"""
对jargon进行含义推断
"""
try:
content = jargon_obj.content
raw_content_str = jargon_obj.raw_content or ""
# 解析raw_content列表
raw_content_list = []
if raw_content_str:
try:
raw_content_list = json.loads(raw_content_str) if isinstance(raw_content_str, str) else raw_content_str
if not isinstance(raw_content_list, list):
raw_content_list = [raw_content_list] if raw_content_list else []
except (json.JSONDecodeError, TypeError):
raw_content_list = [raw_content_str] if raw_content_str else []
if not raw_content_list:
logger.warning(f"jargon {content} 没有raw_content跳过推断")
return
# 步骤1: 基于raw_content和content推断
raw_content_text = "\n".join(raw_content_list)
prompt1 = await global_prompt_manager.format_prompt(
"jargon_inference_with_context_prompt",
content=content,
raw_content_list=raw_content_text,
)
response1, _ = await self.llm.generate_response_async(prompt1, temperature=0.3)
if not response1:
logger.warning(f"jargon {content} 推断1失败无响应")
return
# 解析推断1结果
inference1 = None
try:
resp1 = response1.strip()
if resp1.startswith("{") and resp1.endswith("}"):
inference1 = json.loads(resp1)
else:
repaired = repair_json(resp1)
inference1 = json.loads(repaired) if isinstance(repaired, str) else repaired
if not isinstance(inference1, dict):
logger.warning(f"jargon {content} 推断1结果格式错误")
return
except Exception as e:
logger.error(f"jargon {content} 推断1解析失败: {e}")
return
# 步骤2: 仅基于content推断
prompt2 = await global_prompt_manager.format_prompt(
"jargon_inference_content_only_prompt",
content=content,
)
response2, _ = await self.llm.generate_response_async(prompt2, temperature=0.3)
if not response2:
logger.warning(f"jargon {content} 推断2失败无响应")
return
# 解析推断2结果
inference2 = None
try:
resp2 = response2.strip()
if resp2.startswith("{") and resp2.endswith("}"):
inference2 = json.loads(resp2)
else:
repaired = repair_json(resp2)
inference2 = json.loads(repaired) if isinstance(repaired, str) else repaired
if not isinstance(inference2, dict):
logger.warning(f"jargon {content} 推断2结果格式错误")
return
except Exception as e:
logger.error(f"jargon {content} 推断2解析失败: {e}")
return
logger.info(f"jargon {content} 推断2提示词: {prompt2}")
logger.info(f"jargon {content} 推断2结果: {response2}")
# logger.info(f"jargon {content} 推断2结果: {inference2}")
logger.info(f"jargon {content} 推断1提示词: {prompt1}")
logger.info(f"jargon {content} 推断1结果: {response1}")
# logger.info(f"jargon {content} 推断1结果: {inference1}")
# 步骤3: 比较两个推断结果
prompt3 = await global_prompt_manager.format_prompt(
"jargon_compare_inference_prompt",
inference1=json.dumps(inference1, ensure_ascii=False),
inference2=json.dumps(inference2, ensure_ascii=False),
)
logger.info(f"jargon {content} 比较提示词: {prompt3}")
response3, _ = await self.llm.generate_response_async(prompt3, temperature=0.3)
if not response3:
logger.warning(f"jargon {content} 比较失败:无响应")
return
# 解析比较结果
comparison = None
try:
resp3 = response3.strip()
if resp3.startswith("{") and resp3.endswith("}"):
comparison = json.loads(resp3)
else:
repaired = repair_json(resp3)
comparison = json.loads(repaired) if isinstance(repaired, str) else repaired
if not isinstance(comparison, dict):
logger.warning(f"jargon {content} 比较结果格式错误")
return
except Exception as e:
logger.error(f"jargon {content} 比较解析失败: {e}")
return
# 判断是否为黑话
is_similar = comparison.get("is_similar", False)
is_jargon = not is_similar # 如果相似,说明不是黑话;如果有差异,说明是黑话
# 更新数据库记录
jargon_obj.is_jargon = is_jargon
if is_jargon:
# 是黑话使用推断1的结果基于上下文更准确
jargon_obj.meaning = inference1.get("meaning", "")
jargon_obj.translation = inference1.get("translation", "")
else:
# 不是黑话也记录含义使用推断2的结果因为含义明确
jargon_obj.meaning = inference2.get("meaning", "")
jargon_obj.translation = inference2.get("translation", "")
# 更新最后一次判定的count值避免重启后重复判定
jargon_obj.last_inference_count = jargon_obj.count or 0
# 如果count>=100标记为完成不再进行推断
if (jargon_obj.count or 0) >= 100:
jargon_obj.is_complete = True
jargon_obj.save()
logger.info(f"jargon {content} 推断完成: is_jargon={is_jargon}, meaning={jargon_obj.meaning}, last_inference_count={jargon_obj.last_inference_count}, is_complete={jargon_obj.is_complete}")
except Exception as e:
logger.error(f"jargon推断失败: {e}")
import traceback
traceback.print_exc()
def should_trigger(self) -> bool:
# 冷却时间检查
if time.time() - self.last_learning_time < self.min_learning_interval:
@@ -85,11 +344,15 @@ class JargonMiner:
if not chat_stream:
return
# 记录本次提取的时间窗口,避免重复提取
extraction_start_time = self.last_learning_time
extraction_end_time = time.time()
# 拉取学习窗口内的消息
messages = get_raw_msg_by_timestamp_with_chat_inclusive(
chat_id=self.chat_id,
timestamp_start=self.last_learning_time,
timestamp_end=time.time(),
timestamp_start=extraction_start_time,
timestamp_end=extraction_end_time,
limit=20,
)
if not messages:
@@ -135,17 +398,27 @@ class JargonMiner:
if not isinstance(item, dict):
continue
content = str(item.get("content", "")).strip()
raw_content = str(item.get("raw_content", "")).strip()
raw_content_value = item.get("raw_content", "")
# 处理raw_content可能是字符串或列表
raw_content_list = []
if isinstance(raw_content_value, list):
raw_content_list = [str(rc).strip() for rc in raw_content_value if str(rc).strip()]
elif isinstance(raw_content_value, str):
raw_content_str = raw_content_value.strip()
if raw_content_str:
raw_content_list = [raw_content_str]
type_str = str(item.get("type", "")).strip().lower()
# 验证type是否为有效值
if type_str not in ["p", "c", "e"]:
type_str = "p" # 默认值
if content:
if content and raw_content_list:
entries.append({
"content": content,
"raw_content": raw_content,
"raw_content": raw_content_list,
"type": type_str
})
except Exception as e:
@@ -167,14 +440,20 @@ class JargonMiner:
saved = 0
updated = 0
merged = 0
for entry in uniq_entries:
content = entry["content"]
raw_content = entry["raw_content"]
raw_content_list = entry["raw_content"] # 已经是列表
type_str = entry["type"]
try:
# 步骤1: 检查同chat_id的记录默认纳入global项目
# 查询条件chat_id匹配 OR (is_global为True且content匹配)
query = (
Jargon.select()
.where((Jargon.chat_id == self.chat_id) & (Jargon.content == content))
.where(
((Jargon.chat_id == self.chat_id) | Jargon.is_global) &
(Jargon.content == content)
)
)
if query.exists():
obj = query.get()
@@ -182,30 +461,134 @@ class JargonMiner:
obj.count = (obj.count or 0) + 1
except Exception:
obj.count = 1
# 更新raw_content和type如果为空或需要更新
if raw_content and not obj.raw_content:
obj.raw_content = raw_content
# 合并raw_content列表:读取现有列表,追加新值,去重
existing_raw_content = []
if obj.raw_content:
try:
existing_raw_content = json.loads(obj.raw_content) if isinstance(obj.raw_content, str) else obj.raw_content
if not isinstance(existing_raw_content, list):
existing_raw_content = [existing_raw_content] if existing_raw_content else []
except (json.JSONDecodeError, TypeError):
existing_raw_content = [obj.raw_content] if obj.raw_content else []
# 合并并去重
merged_list = list(dict.fromkeys(existing_raw_content + raw_content_list))
obj.raw_content = json.dumps(merged_list, ensure_ascii=False)
# 更新type如果为空
if type_str and not obj.type:
obj.type = type_str
obj.save()
# 检查是否需要推断(达到阈值且超过上次判定值)
if _should_infer_meaning(obj):
# 异步触发推断,不阻塞主流程
# 重新加载对象以确保数据最新
jargon_id = obj.id
asyncio.create_task(self._infer_meaning_by_id(jargon_id))
updated += 1
else:
Jargon.create(
content=content,
raw_content=raw_content,
type=type_str,
chat_id=self.chat_id,
is_global=False,
count=1
# 步骤2: 同chat_id没有找到检查所有chat_id中是否有相同content的记录
# 查询所有非global的记录global的已经在步骤1检查过了
all_content_query = (
Jargon.select()
.where(
(Jargon.content == content) &
(~Jargon.is_global)
)
)
saved += 1
all_matching = list(all_content_query)
# 如果找到3个或更多相同content的记录合并它们
if len(all_matching) >= 3:
# 找到3个或更多已有记录合并它们新条目也会被包含在合并中
total_count = sum((obj.count or 0) for obj in all_matching) + 1 # +1 是因为当前新条目
# 合并所有raw_content列表
all_raw_content = []
for obj in all_matching:
if obj.raw_content:
try:
obj_raw = json.loads(obj.raw_content) if isinstance(obj.raw_content, str) else obj.raw_content
if not isinstance(obj_raw, list):
obj_raw = [obj_raw] if obj_raw else []
all_raw_content.extend(obj_raw)
except (json.JSONDecodeError, TypeError):
if obj.raw_content:
all_raw_content.append(obj.raw_content)
# 添加当前新条目的raw_content
all_raw_content.extend(raw_content_list)
# 去重
merged_raw_content = list(dict.fromkeys(all_raw_content))
# 合并type优先使用非空的值
merged_type = type_str
for obj in all_matching:
if obj.type and not merged_type:
merged_type = obj.type
break
# 合并其他字段:优先使用已有值
merged_meaning = None
merged_translation = None
merged_is_jargon = None
merged_last_inference_count = None
merged_is_complete = False
for obj in all_matching:
if obj.meaning and not merged_meaning:
merged_meaning = obj.meaning
if obj.translation and not merged_translation:
merged_translation = obj.translation
if obj.is_jargon is not None and merged_is_jargon is None:
merged_is_jargon = obj.is_jargon
if obj.last_inference_count is not None and merged_last_inference_count is None:
merged_last_inference_count = obj.last_inference_count
if obj.is_complete:
merged_is_complete = True
# 删除旧的记录
for obj in all_matching:
obj.delete_instance()
# 创建新的global记录
Jargon.create(
content=content,
raw_content=json.dumps(merged_raw_content, ensure_ascii=False),
type=merged_type,
chat_id="global",
is_global=True,
count=total_count,
meaning=merged_meaning,
translation=merged_translation,
is_jargon=merged_is_jargon,
last_inference_count=merged_last_inference_count,
is_complete=merged_is_complete
)
merged += 1
logger.info(f"合并jargon为global: content={content}, 合并了{len(all_matching)}条已有记录+1条新记录{len(all_matching)+1}总count={total_count}")
else:
# 找到少于3个已有记录正常创建新记录
Jargon.create(
content=content,
raw_content=json.dumps(raw_content_list, ensure_ascii=False),
type=type_str,
chat_id=self.chat_id,
is_global=False,
count=1
)
saved += 1
except Exception as e:
logger.error(f"保存jargon失败: chat_id={self.chat_id}, content={content}, err={e}")
continue
if saved or updated:
logger.info(f"jargon写入: 新增 {saved} 条,更新 {updated}chat_id={self.chat_id}")
self.last_learning_time = time.time()
if saved or updated or merged:
logger.info(f"jargon写入: 新增 {saved} 条,更新 {updated}合并为global {merged}chat_id={self.chat_id}")
# 更新为本次提取的结束时间,确保不会重复提取相同的消息窗口
self.last_learning_time = extraction_end_time
except Exception as e:
logger.error(f"JargonMiner 运行失败: {e}")