feat:优化黑话附加

This commit is contained in:
SengokuCola
2025-12-02 15:43:46 +08:00
parent 8237f1a4c1
commit 138bd8ec70
3 changed files with 262 additions and 362 deletions

View File

@@ -8,7 +8,8 @@ import json
import re
from datetime import datetime
from typing import Tuple
from difflib import SequenceMatcher
from typing import List
from json_repair import repair_json
from src.common.logger import get_logger
@@ -16,101 +17,56 @@ from src.common.logger import get_logger
logger = get_logger("memory_utils")
def parse_md_json(json_text: str) -> list[str]:
"""从Markdown格式的内容中提取JSON对象和推理内容"""
json_objects = []
reasoning_content = ""
# 使用正则表达式查找```json包裹的JSON内容
json_pattern = r"```json\s*(.*?)\s*```"
matches = re.findall(json_pattern, json_text, re.DOTALL)
# 提取JSON之前的内容作为推理文本
if matches:
# 找到第一个```json的位置
first_json_pos = json_text.find("```json")
if first_json_pos > 0:
reasoning_content = json_text[:first_json_pos].strip()
# 清理推理内容中的注释标记
reasoning_content = re.sub(r"^//\s*", "", reasoning_content, flags=re.MULTILINE)
reasoning_content = reasoning_content.strip()
for match in matches:
try:
# 清理可能的注释和格式问题
json_str = re.sub(r"//.*?\n", "\n", match) # 移除单行注释
json_str = re.sub(r"/\*.*?\*/", "", json_str, flags=re.DOTALL) # 移除多行注释
if json_str := json_str.strip():
json_obj = json.loads(json_str)
if isinstance(json_obj, dict):
json_objects.append(json_obj)
elif isinstance(json_obj, list):
for item in json_obj:
if isinstance(item, dict):
json_objects.append(item)
except Exception as e:
logger.warning(f"解析JSON块失败: {e}, 块内容: {match[:100]}...")
continue
return json_objects, reasoning_content
def calculate_similarity(text1: str, text2: str) -> float:
"""
计算两个文本的相似度
def parse_questions_json(response: str) -> Tuple[List[str], List[str]]:
"""解析问题JSON返回概念列表和问题列表
Args:
text1: 第一个文本
text2: 第二个文本
response: LLM返回的响应
Returns:
float: 相似度分数 (0-1)
Tuple[List[str], List[str]]: (概念列表, 问题列表)
"""
try:
# 预处理文本
text1 = preprocess_text(text1)
text2 = preprocess_text(text2)
# 尝试提取JSON可能包含在```json代码块中
json_pattern = r"```json\s*(.*?)\s*```"
matches = re.findall(json_pattern, response, re.DOTALL)
# 使用SequenceMatcher计算相似度
similarity = SequenceMatcher(None, text1, text2).ratio()
if matches:
json_str = matches[0]
else:
# 尝试直接解析整个响应
json_str = response.strip()
# 如果其中一个文本包含另一个,提高相似度
if text1 in text2 or text2 in text1:
similarity = max(similarity, 0.8)
# 修复可能的JSON错误
repaired_json = repair_json(json_str)
return similarity
# 解析JSON
parsed = json.loads(repaired_json)
# 只支持新格式包含concepts和questions的对象
if not isinstance(parsed, dict):
logger.warning(f"解析的JSON不是对象格式: {parsed}")
return [], []
concepts_raw = parsed.get("concepts", [])
questions_raw = parsed.get("questions", [])
# 确保是列表
if not isinstance(concepts_raw, list):
concepts_raw = []
if not isinstance(questions_raw, list):
questions_raw = []
# 确保所有元素都是字符串
concepts = [c for c in concepts_raw if isinstance(c, str) and c.strip()]
questions = [q for q in questions_raw if isinstance(q, str) and q.strip()]
return concepts, questions
except Exception as e:
logger.error(f"计算相似度时出错: {e}")
return 0.0
def preprocess_text(text: str) -> str:
"""
预处理文本,提高匹配准确性
Args:
text: 原始文本
Returns:
str: 预处理后的文本
"""
try:
# 转换为小写
text = text.lower()
# 移除标点符号和特殊字符
text = re.sub(r"[^\w\s]", "", text)
# 移除多余空格
text = re.sub(r"\s+", " ", text).strip()
return text
except Exception as e:
logger.error(f"预处理文本时出错: {e}")
return text
logger.error(f"解析问题JSON失败: {e}, 响应内容: {response[:200]}...")
return [], []
def parse_datetime_to_timestamp(value: str) -> float:
"""
@@ -140,29 +96,3 @@ def parse_datetime_to_timestamp(value: str) -> float:
except Exception as e:
last_err = e
raise ValueError(f"无法解析时间: {value} ({last_err})")
def parse_time_range(time_range: str) -> Tuple[float, float]:
"""
解析时间范围字符串,返回开始和结束时间戳
Args:
time_range: 时间范围字符串,格式:"YYYY-MM-DD HH:MM:SS - YYYY-MM-DD HH:MM:SS"
Returns:
Tuple[float, float]: (开始时间戳, 结束时间戳)
"""
if " - " not in time_range:
raise ValueError(f"时间范围格式错误,应为 '开始时间 - 结束时间': {time_range}")
parts = time_range.split(" - ", 1)
if len(parts) != 2:
raise ValueError(f"时间范围格式错误: {time_range}")
start_str = parts[0].strip()
end_str = parts[1].strip()
start_timestamp = parse_datetime_to_timestamp(start_str)
end_timestamp = parse_datetime_to_timestamp(end_str)
return start_timestamp, end_timestamp