feat:记忆系统重出江湖,移除了即时记忆和定期记忆

This commit is contained in:
SengokuCola
2025-08-27 22:18:22 +08:00
parent 01197cb2b7
commit 6d3e9fd3d4
14 changed files with 481 additions and 486 deletions

View File

@@ -18,6 +18,7 @@ from src.config.config import global_config, model_config
from src.common.data_models.database_data_model import DatabaseMessages
from src.common.database.database_model import GraphNodes, GraphEdges # Peewee Models导入
from src.common.logger import get_logger
from src.chat.utils.utils import cut_key_words
from src.chat.utils.chat_message_builder import (
build_readable_messages,
get_raw_msg_by_timestamp_with_chat_inclusive,
@@ -98,19 +99,23 @@ class MemoryGraph:
current_weight = self.G.nodes[concept].get("weight", 0.0)
self.G.nodes[concept]["weight"] = current_weight + 1.0
logger.debug(f"节点 {concept} 记忆整合成功,权重增加到 {current_weight + 1.0}")
logger.info(f"节点 {concept} 记忆内容已更新:{integrated_memory}")
except Exception as e:
logger.error(f"LLM整合记忆失败: {e}")
# 降级到简单连接
new_memory_str = f"{existing_memory} | {memory}"
self.G.nodes[concept]["memory_items"] = new_memory_str
logger.info(f"节点 {concept} 记忆内容已简单拼接并更新:{new_memory_str}")
else:
new_memory_str = str(memory)
self.G.nodes[concept]["memory_items"] = new_memory_str
logger.info(f"节点 {concept} 记忆内容已直接更新:{new_memory_str}")
else:
self.G.nodes[concept]["memory_items"] = str(memory)
# 如果节点存在但没有memory_items,说明是第一次添加memory,设置created_time
if "created_time" not in self.G.nodes[concept]:
self.G.nodes[concept]["created_time"] = current_time
logger.info(f"节点 {concept} 创建新记忆:{str(memory)}")
# 更新最后修改时间
self.G.nodes[concept]["last_modified"] = current_time
else:
@@ -122,6 +127,7 @@ class MemoryGraph:
created_time=current_time, # 添加创建时间
last_modified=current_time,
) # 添加最后修改时间
logger.info(f"新节点 {concept} 已添加,记忆内容已写入:{str(memory)}")
def get_dot(self, concept):
# 检查节点是否存在于图中
@@ -402,9 +408,7 @@ class Hippocampus:
text_length = len(text)
topic_num: int | list[int] = 0
words = jieba.cut(text)
keywords_lite = [word for word in words if len(word) > 1]
keywords_lite = list(set(keywords_lite))
keywords_lite = cut_key_words(text)
if keywords_lite:
logger.debug(f"提取关键词极简版: {keywords_lite}")
@@ -1159,6 +1163,131 @@ class ParahippocampalGyrus:
return compressed_memory, similar_topics_dict
def get_similar_topics_from_keywords(
self,
keywords: list[str] | str,
top_k: int = 3,
threshold: float = 0.7,
) -> dict[str, list[tuple[str, float]]]:
"""基于输入的关键词,返回每个关键词对应的相似主题列表。
Args:
keywords: 关键词列表或以逗号/空格/顿号分隔的字符串。
top_k: 每个关键词返回的相似主题数量上限。
threshold: 相似度阈值,低于该值的主题将被过滤。
Returns:
dict[str, list[tuple[str, float]]]: {keyword: [(topic, similarity), ...]}
"""
# 规范化输入为列表[str]
if isinstance(keywords, str):
# 支持中英文逗号、顿号、空格分隔
parts = (
keywords.replace("", ",").replace("", ",").replace(" ", ",").strip(", ")
)
keyword_list = [p.strip() for p in parts.split(",") if p.strip()]
else:
keyword_list = [k.strip() for k in keywords if isinstance(k, str) and k.strip()]
if not keyword_list:
return {}
existing_topics = list(self.memory_graph.G.nodes())
result: dict[str, list[tuple[str, float]]] = {}
for kw in keyword_list:
kw_words = set(jieba.cut(kw))
similar_topics: list[tuple[str, float]] = []
for topic in existing_topics:
topic_words = set(jieba.cut(topic))
all_words = kw_words | topic_words
if not all_words:
continue
v1 = [1 if w in kw_words else 0 for w in all_words]
v2 = [1 if w in topic_words else 0 for w in all_words]
sim = cosine_similarity(v1, v2)
if sim >= threshold:
similar_topics.append((topic, sim))
similar_topics.sort(key=lambda x: x[1], reverse=True)
result[kw] = similar_topics[:top_k]
return result
async def add_memory_with_similar(
self,
memory_item: str,
similar_topics_dict: dict[str, list[tuple[str, float]]],
) -> bool:
"""将单条记忆内容与相似主题写入记忆网络并同步数据库。
按 build_memory_for_chat 的方式:为 similar_topics_dict 的每个键作为主题添加节点内容,
并与其相似主题建立连接,连接强度为 int(similarity * 10)。
Args:
memory_item: 记忆内容字符串,将作为每个主题节点的 memory_items。
similar_topics_dict: {topic: [(similar_topic, similarity), ...]}
Returns:
bool: 是否成功执行添加与同步。
"""
try:
if not memory_item or not isinstance(memory_item, str):
return False
if not similar_topics_dict or not isinstance(similar_topics_dict, dict):
return False
current_time = time.time()
# 为每个主题写入节点
for topic, similar_list in similar_topics_dict.items():
if not topic or not isinstance(topic, str):
continue
await self.hippocampus.memory_graph.add_dot(topic, memory_item, self.hippocampus)
# 连接相似主题
if isinstance(similar_list, list):
for item in similar_list:
try:
similar_topic, similarity = item
except Exception:
continue
if not isinstance(similar_topic, str):
continue
if topic == similar_topic:
continue
# 强度按 build_memory_for_chat 的规则
strength = int(max(0.0, float(similarity)) * 10) if similarity is not None else 0
if strength <= 0:
continue
# 确保相似主题节点存在如果没有也可以只建立边networkx会创建节点但需初始化属性
if similar_topic not in self.memory_graph.G:
# 创建一个空的相似主题节点避免悬空边memory_items 为空字符串
self.memory_graph.G.add_node(
similar_topic,
memory_items="",
weight=1.0,
created_time=current_time,
last_modified=current_time,
)
self.memory_graph.G.add_edge(
topic,
similar_topic,
strength=strength,
created_time=current_time,
last_modified=current_time,
)
# 同步数据库
await self.hippocampus.entorhinal_cortex.sync_memory_to_db()
return True
except Exception as e:
logger.error(f"添加记忆节点失败: {e}")
return False
async def operation_forget_topic(self, percentage=0.005):
start_time = time.time()
logger.info("[遗忘] 开始检查数据库...")
@@ -1325,7 +1454,6 @@ class HippocampusManager:
logger.info(f"""
--------------------------------
记忆系统参数配置:
构建频率: {global_config.memory.memory_build_frequency}秒|压缩率: {global_config.memory.memory_compress_rate}
遗忘间隔: {global_config.memory.forget_memory_interval}秒|遗忘比例: {global_config.memory.memory_forget_percentage}|遗忘: {global_config.memory.memory_forget_time}小时之后
记忆图统计信息: 节点数量: {node_count}, 连接数量: {edge_count}
--------------------------------""") # noqa: E501
@@ -1343,61 +1471,6 @@ class HippocampusManager:
raise RuntimeError("HippocampusManager 尚未初始化,请先调用 initialize 方法")
return await self._hippocampus.parahippocampal_gyrus.operation_forget_topic(percentage)
async def build_memory_for_chat(self, chat_id: str):
"""为指定chat_id构建记忆在heartFC_chat.py中调用"""
if not self._initialized:
raise RuntimeError("HippocampusManager 尚未初始化,请先调用 initialize 方法")
try:
# 检查是否需要构建记忆
logger.info(f"{chat_id} 构建记忆")
if memory_segment_manager.check_and_build_memory_for_chat(chat_id):
logger.info(f"{chat_id} 构建记忆,需要构建记忆")
messages = memory_segment_manager.get_messages_for_memory_build(chat_id, 50)
build_probability = 0.3 * global_config.memory.memory_build_frequency
if messages and random.random() < build_probability:
logger.info(f"{chat_id} 构建记忆,消息数量: {len(messages)}")
# 调用记忆压缩和构建
(
compressed_memory,
similar_topics_dict,
) = await self._hippocampus.parahippocampal_gyrus.memory_compress(
messages, global_config.memory.memory_compress_rate
)
# 添加记忆节点
current_time = time.time()
for topic, memory in compressed_memory:
await self._hippocampus.memory_graph.add_dot(topic, memory, self._hippocampus)
# 连接相似主题
if topic in similar_topics_dict:
similar_topics = similar_topics_dict[topic]
for similar_topic, similarity in similar_topics:
if topic != similar_topic:
strength = int(similarity * 10)
self._hippocampus.memory_graph.G.add_edge(
topic,
similar_topic,
strength=strength,
created_time=current_time,
last_modified=current_time,
)
# 同步到数据库
await self._hippocampus.entorhinal_cortex.sync_memory_to_db()
logger.info(f"{chat_id} 构建记忆完成")
return True
except Exception as e:
logger.error(f"{chat_id} 构建记忆失败: {e}")
return False
return False
async def get_memory_from_topic(
self, valid_keywords: list[str], max_memory_num: int = 3, max_memory_length: int = 2, max_depth: int = 3
) -> list:
@@ -1441,89 +1514,3 @@ class HippocampusManager:
# 创建全局实例
hippocampus_manager = HippocampusManager()
# 在Hippocampus类中添加新的记忆构建管理器
class MemoryBuilder:
"""记忆构建器
为每个chat_id维护消息缓存和触发机制类似ExpressionLearner
"""
def __init__(self, chat_id: str):
self.chat_id = chat_id
self.last_update_time: float = time.time()
self.last_processed_time: float = 0.0
def should_trigger_memory_build(self) -> bool:
# sourcery skip: assign-if-exp, boolean-if-exp-identity, reintroduce-else
"""检查是否应该触发记忆构建"""
current_time = time.time()
# 检查时间间隔
time_diff = current_time - self.last_update_time
if time_diff < 600 / global_config.memory.memory_build_frequency:
return False
# 检查消息数量
recent_messages = get_raw_msg_by_timestamp_with_chat_inclusive(
chat_id=self.chat_id,
timestamp_start=self.last_update_time,
timestamp_end=current_time,
)
logger.info(f"最近消息数量: {len(recent_messages)},间隔时间: {time_diff}")
if not recent_messages or len(recent_messages) < 30 / global_config.memory.memory_build_frequency:
return False
return True
def get_messages_for_memory_build(self, threshold: int = 25) -> List[DatabaseMessages]:
"""获取用于记忆构建的消息"""
current_time = time.time()
messages = get_raw_msg_by_timestamp_with_chat_inclusive(
chat_id=self.chat_id,
timestamp_start=self.last_update_time,
timestamp_end=current_time,
limit=threshold,
)
if messages:
# 更新最后处理时间
self.last_processed_time = current_time
self.last_update_time = current_time
return messages or []
class MemorySegmentManager:
"""记忆段管理器
管理所有chat_id的MemoryBuilder实例自动检查和触发记忆构建
"""
def __init__(self):
self.builders: Dict[str, MemoryBuilder] = {}
def get_or_create_builder(self, chat_id: str) -> MemoryBuilder:
"""获取或创建指定chat_id的MemoryBuilder"""
if chat_id not in self.builders:
self.builders[chat_id] = MemoryBuilder(chat_id)
return self.builders[chat_id]
def check_and_build_memory_for_chat(self, chat_id: str) -> bool:
"""检查指定chat_id是否需要构建记忆如果需要则返回True"""
builder = self.get_or_create_builder(chat_id)
return builder.should_trigger_memory_build()
def get_messages_for_memory_build(self, chat_id: str, threshold: int = 25) -> List[DatabaseMessages]:
"""获取指定chat_id用于记忆构建的消息"""
if chat_id not in self.builders:
return []
return self.builders[chat_id].get_messages_for_memory_build(threshold)
# 创建全局实例
memory_segment_manager = MemorySegmentManager()