feat:情绪可开关,默认关
This commit is contained in:
@@ -207,343 +207,6 @@ class Hippocampus:
|
||||
|
||||
|
||||
|
||||
|
||||
def get_memory_from_keyword(self, keyword: str, max_depth: int = 2) -> list:
|
||||
"""从关键词获取相关记忆。
|
||||
|
||||
Args:
|
||||
keyword (str): 关键词
|
||||
max_depth (int, optional): 记忆检索深度,默认为2。1表示只获取直接相关的记忆,2表示获取间接相关的记忆。
|
||||
|
||||
Returns:
|
||||
list: 记忆列表,每个元素是一个元组 (topic, memory_content, similarity)
|
||||
- topic: str, 记忆主题
|
||||
- memory_content: str, 该主题下的完整记忆内容
|
||||
- similarity: float, 与关键词的相似度
|
||||
"""
|
||||
if not keyword:
|
||||
return []
|
||||
|
||||
# 获取所有节点
|
||||
all_nodes = list(self.memory_graph.G.nodes())
|
||||
memories = []
|
||||
|
||||
# 计算关键词的词集合
|
||||
keyword_words = set(jieba.cut(keyword))
|
||||
|
||||
# 遍历所有节点,计算相似度
|
||||
for node in all_nodes:
|
||||
node_words = set(jieba.cut(node))
|
||||
all_words = keyword_words | node_words
|
||||
v1 = [1 if word in keyword_words else 0 for word in all_words]
|
||||
v2 = [1 if word in node_words else 0 for word in all_words]
|
||||
similarity = cosine_similarity(v1, v2)
|
||||
|
||||
# 如果相似度超过阈值,获取该节点的记忆
|
||||
if similarity >= 0.3: # 可以调整这个阈值
|
||||
node_data = self.memory_graph.G.nodes[node]
|
||||
# 直接使用完整的记忆内容
|
||||
if memory_items := node_data.get("memory_items", ""):
|
||||
memories.append((node, memory_items, similarity))
|
||||
|
||||
# 按相似度降序排序
|
||||
memories.sort(key=lambda x: x[2], reverse=True)
|
||||
return memories
|
||||
|
||||
async def get_keywords_from_text(self, text: str) -> Tuple[List[str], List]:
|
||||
"""从文本中提取关键词。
|
||||
|
||||
Args:
|
||||
text (str): 输入文本
|
||||
fast_retrieval (bool, optional): 是否使用快速检索。默认为False。
|
||||
如果为True,使用jieba分词提取关键词,速度更快但可能不够准确。
|
||||
如果为False,使用LLM提取关键词,速度较慢但更准确。
|
||||
"""
|
||||
if not text:
|
||||
return [], []
|
||||
|
||||
# 使用LLM提取关键词 - 根据详细文本长度分布优化topic_num计算
|
||||
text_length = len(text)
|
||||
topic_num: int | list[int] = 0
|
||||
|
||||
keywords_lite = cut_key_words(text)
|
||||
if keywords_lite:
|
||||
logger.debug(f"提取关键词极简版: {keywords_lite}")
|
||||
|
||||
if text_length <= 12:
|
||||
topic_num = [1, 3] # 6-10字符: 1个关键词 (27.18%的文本)
|
||||
elif text_length <= 20:
|
||||
topic_num = [2, 4] # 11-20字符: 2个关键词 (22.76%的文本)
|
||||
elif text_length <= 30:
|
||||
topic_num = [3, 5] # 21-30字符: 3个关键词 (10.33%的文本)
|
||||
elif text_length <= 50:
|
||||
topic_num = [4, 5] # 31-50字符: 4个关键词 (9.79%的文本)
|
||||
else:
|
||||
topic_num = 5 # 51+字符: 5个关键词 (其余长文本)
|
||||
|
||||
topics_response, _ = await self.model_small.generate_response_async(self.find_topic_llm(text, topic_num))
|
||||
|
||||
# 提取关键词
|
||||
keywords = re.findall(r"<([^>]+)>", topics_response)
|
||||
if not keywords:
|
||||
keywords = []
|
||||
else:
|
||||
keywords = [
|
||||
keyword.strip()
|
||||
for keyword in ",".join(keywords).replace(",", ",").replace("、", ",").replace(" ", ",").split(",")
|
||||
if keyword.strip()
|
||||
]
|
||||
|
||||
if keywords:
|
||||
logger.debug(f"提取关键词: {keywords}")
|
||||
|
||||
return keywords, keywords_lite
|
||||
|
||||
async def get_memory_from_topic(
|
||||
self,
|
||||
keywords: list[str],
|
||||
max_memory_num: int = 3,
|
||||
max_memory_length: int = 2,
|
||||
max_depth: int = 3,
|
||||
) -> list:
|
||||
"""从文本中提取关键词并获取相关记忆。
|
||||
|
||||
Args:
|
||||
keywords (list): 输入文本
|
||||
max_memory_num (int, optional): 返回的记忆条目数量上限。默认为3,表示最多返回3条与输入文本相关度最高的记忆。
|
||||
max_memory_length (int, optional): 每个主题最多返回的记忆条目数量。默认为2,表示每个主题最多返回2条相似度最高的记忆。
|
||||
max_depth (int, optional): 记忆检索深度。默认为3。值越大,检索范围越广,可以获取更多间接相关的记忆,但速度会变慢。
|
||||
|
||||
Returns:
|
||||
list: 记忆列表,每个元素是一个元组 (topic, memory_content)
|
||||
- topic: str, 记忆主题
|
||||
- memory_content: str, 该主题下的完整记忆内容
|
||||
"""
|
||||
if not keywords:
|
||||
return []
|
||||
|
||||
logger.info(f"提取的关键词: {', '.join(keywords)}")
|
||||
|
||||
# 过滤掉不存在于记忆图中的关键词
|
||||
valid_keywords = [keyword for keyword in keywords if keyword in self.memory_graph.G]
|
||||
if not valid_keywords:
|
||||
logger.debug("没有找到有效的关键词节点")
|
||||
return []
|
||||
|
||||
logger.debug(f"有效的关键词: {', '.join(valid_keywords)}")
|
||||
|
||||
# 从每个关键词获取记忆
|
||||
activate_map = {} # 存储每个词的累计激活值
|
||||
|
||||
# 对每个关键词进行扩散式检索
|
||||
for keyword in valid_keywords:
|
||||
logger.debug(f"开始以关键词 '{keyword}' 为中心进行扩散检索 (最大深度: {max_depth}):")
|
||||
# 初始化激活值
|
||||
activation_values = {keyword: 1.0}
|
||||
# 记录已访问的节点
|
||||
visited_nodes = {keyword}
|
||||
# 待处理的节点队列,每个元素是(节点, 激活值, 当前深度)
|
||||
nodes_to_process = [(keyword, 1.0, 0)]
|
||||
|
||||
while nodes_to_process:
|
||||
current_node, current_activation, current_depth = nodes_to_process.pop(0)
|
||||
|
||||
# 如果激活值小于0或超过最大深度,停止扩散
|
||||
if current_activation <= 0 or current_depth >= max_depth:
|
||||
continue
|
||||
|
||||
# 获取当前节点的所有邻居
|
||||
neighbors = list(self.memory_graph.G.neighbors(current_node))
|
||||
|
||||
for neighbor in neighbors:
|
||||
if neighbor in visited_nodes:
|
||||
continue
|
||||
|
||||
# 获取连接强度
|
||||
edge_data = self.memory_graph.G[current_node][neighbor]
|
||||
strength = edge_data.get("strength", 1)
|
||||
|
||||
# 计算新的激活值
|
||||
new_activation = current_activation - (1 / strength)
|
||||
|
||||
if new_activation > 0:
|
||||
activation_values[neighbor] = new_activation
|
||||
visited_nodes.add(neighbor)
|
||||
nodes_to_process.append((neighbor, new_activation, current_depth + 1))
|
||||
# logger.debug(
|
||||
# f"节点 '{neighbor}' 被激活,激活值: {new_activation:.2f} (通过 '{current_node}' 连接,强度: {strength}, 深度: {current_depth + 1})"
|
||||
# ) # noqa: E501
|
||||
|
||||
# 更新激活映射
|
||||
for node, activation_value in activation_values.items():
|
||||
if activation_value > 0:
|
||||
if node in activate_map:
|
||||
activate_map[node] += activation_value
|
||||
else:
|
||||
activate_map[node] = activation_value
|
||||
|
||||
# 基于激活值平方的独立概率选择
|
||||
remember_map = {}
|
||||
# logger.info("基于激活值平方的归一化选择:")
|
||||
|
||||
# 计算所有激活值的平方和
|
||||
total_squared_activation = sum(activation**2 for activation in activate_map.values())
|
||||
if total_squared_activation > 0:
|
||||
# 计算归一化的激活值
|
||||
normalized_activations = {
|
||||
node: (activation**2) / total_squared_activation for node, activation in activate_map.items()
|
||||
}
|
||||
|
||||
# 按归一化激活值排序并选择前max_memory_num个
|
||||
sorted_nodes = sorted(normalized_activations.items(), key=lambda x: x[1], reverse=True)[:max_memory_num]
|
||||
|
||||
# 将选中的节点添加到remember_map
|
||||
for node, normalized_activation in sorted_nodes:
|
||||
remember_map[node] = activate_map[node] # 使用原始激活值
|
||||
logger.debug(
|
||||
f"节点 '{node}' (归一化激活值: {normalized_activation:.2f}, 激活值: {activate_map[node]:.2f})"
|
||||
)
|
||||
else:
|
||||
logger.info("没有有效的激活值")
|
||||
|
||||
# 从选中的节点中提取记忆
|
||||
all_memories = []
|
||||
# logger.info("开始从选中的节点中提取记忆:")
|
||||
for node, activation in remember_map.items():
|
||||
logger.debug(f"处理节点 '{node}' (激活值: {activation:.2f}):")
|
||||
node_data = self.memory_graph.G.nodes[node]
|
||||
if memory_items := node_data.get("memory_items", ""):
|
||||
logger.debug("节点包含完整记忆")
|
||||
# 计算记忆与关键词的相似度
|
||||
memory_words = set(jieba.cut(memory_items))
|
||||
text_words = set(keywords)
|
||||
if all_words := memory_words | text_words:
|
||||
# 计算相似度(虽然这里没有使用,但保持逻辑一致性)
|
||||
v1 = [1 if word in memory_words else 0 for word in all_words]
|
||||
v2 = [1 if word in text_words else 0 for word in all_words]
|
||||
_ = cosine_similarity(v1, v2) # 计算但不使用,用_表示
|
||||
|
||||
# 添加完整记忆到结果中
|
||||
all_memories.append((node, memory_items, activation))
|
||||
else:
|
||||
logger.info("节点没有记忆")
|
||||
|
||||
# 去重(基于记忆内容)
|
||||
logger.debug("开始记忆去重:")
|
||||
seen_memories = set()
|
||||
unique_memories = []
|
||||
for topic, memory_items, activation_value in all_memories:
|
||||
# memory_items现在是完整的字符串格式
|
||||
memory = memory_items or ""
|
||||
if memory not in seen_memories:
|
||||
seen_memories.add(memory)
|
||||
unique_memories.append((topic, memory_items, activation_value))
|
||||
logger.debug(f"保留记忆: {memory} (来自节点: {topic}, 激活值: {activation_value:.2f})")
|
||||
else:
|
||||
logger.debug(f"跳过重复记忆: {memory} (来自节点: {topic})")
|
||||
|
||||
# 转换为(关键词, 记忆)格式
|
||||
result = []
|
||||
for topic, memory_items, _ in unique_memories:
|
||||
# memory_items现在是完整的字符串格式
|
||||
memory = memory_items or ""
|
||||
result.append((topic, memory))
|
||||
logger.debug(f"选中记忆: {memory} (来自节点: {topic})")
|
||||
|
||||
return result
|
||||
|
||||
async def get_activate_from_text(
|
||||
self, text: str, max_depth: int = 3, fast_retrieval: bool = False
|
||||
) -> tuple[float, list[str], list[str]]:
|
||||
"""从文本中提取关键词并获取相关记忆。
|
||||
|
||||
Args:
|
||||
text (str): 输入文本
|
||||
max_depth (int, optional): 记忆检索深度。默认为2。
|
||||
fast_retrieval (bool, optional): 是否使用快速检索。默认为False。
|
||||
如果为True,使用jieba分词和TF-IDF提取关键词,速度更快但可能不够准确。
|
||||
如果为False,使用LLM提取关键词,速度较慢但更准确。
|
||||
|
||||
Returns:
|
||||
float: 激活节点数与总节点数的比值
|
||||
list[str]: 有效的关键词
|
||||
"""
|
||||
keywords, keywords_lite = await self.get_keywords_from_text(text)
|
||||
|
||||
# 过滤掉不存在于记忆图中的关键词
|
||||
valid_keywords = [keyword for keyword in keywords if keyword in self.memory_graph.G]
|
||||
if not valid_keywords:
|
||||
# logger.info("没有找到有效的关键词节点")
|
||||
return 0, keywords, keywords_lite
|
||||
|
||||
logger.debug(f"有效的关键词: {', '.join(valid_keywords)}")
|
||||
|
||||
# 从每个关键词获取记忆
|
||||
activate_map = {} # 存储每个词的累计激活值
|
||||
|
||||
# 对每个关键词进行扩散式检索
|
||||
for keyword in valid_keywords:
|
||||
logger.debug(f"开始以关键词 '{keyword}' 为中心进行扩散检索 (最大深度: {max_depth}):")
|
||||
# 初始化激活值
|
||||
activation_values = {keyword: 1.5}
|
||||
# 记录已访问的节点
|
||||
visited_nodes = {keyword}
|
||||
# 待处理的节点队列,每个元素是(节点, 激活值, 当前深度)
|
||||
nodes_to_process = [(keyword, 1.0, 0)]
|
||||
|
||||
while nodes_to_process:
|
||||
current_node, current_activation, current_depth = nodes_to_process.pop(0)
|
||||
|
||||
# 如果激活值小于0或超过最大深度,停止扩散
|
||||
if current_activation <= 0 or current_depth >= max_depth:
|
||||
continue
|
||||
|
||||
# 获取当前节点的所有邻居
|
||||
neighbors = list(self.memory_graph.G.neighbors(current_node))
|
||||
|
||||
for neighbor in neighbors:
|
||||
if neighbor in visited_nodes:
|
||||
continue
|
||||
|
||||
# 获取连接强度
|
||||
edge_data = self.memory_graph.G[current_node][neighbor]
|
||||
strength = edge_data.get("strength", 1)
|
||||
|
||||
# 计算新的激活值
|
||||
new_activation = current_activation - (1 / strength)
|
||||
|
||||
if new_activation > 0:
|
||||
activation_values[neighbor] = new_activation
|
||||
visited_nodes.add(neighbor)
|
||||
nodes_to_process.append((neighbor, new_activation, current_depth + 1))
|
||||
# logger.debug(
|
||||
# f"节点 '{neighbor}' 被激活,激活值: {new_activation:.2f} (通过 '{current_node}' 连接,强度: {strength}, 深度: {current_depth + 1})") # noqa: E501
|
||||
|
||||
# 更新激活映射
|
||||
for node, activation_value in activation_values.items():
|
||||
if activation_value > 0:
|
||||
if node in activate_map:
|
||||
activate_map[node] += activation_value
|
||||
else:
|
||||
activate_map[node] = activation_value
|
||||
|
||||
# 输出激活映射
|
||||
# logger.info("激活映射统计:")
|
||||
# for node, total_activation in sorted(activate_map.items(), key=lambda x: x[1], reverse=True):
|
||||
# logger.info(f"节点 '{node}': 累计激活值 = {total_activation:.2f}")
|
||||
|
||||
# 计算激活节点数与总节点数的比值
|
||||
total_activation = sum(activate_map.values())
|
||||
# logger.debug(f"总激活值: {total_activation:.2f}")
|
||||
total_nodes = len(self.memory_graph.G.nodes())
|
||||
# activated_nodes = len(activate_map)
|
||||
activation_ratio = total_activation / total_nodes if total_nodes > 0 else 0
|
||||
activation_ratio = activation_ratio * 50
|
||||
logger.debug(f"总激活值: {total_activation:.2f}, 总节点数: {total_nodes}, 激活: {activation_ratio}")
|
||||
|
||||
return activation_ratio, keywords, keywords_lite
|
||||
|
||||
|
||||
# 负责海马体与其他部分的交互
|
||||
class EntorhinalCortex:
|
||||
def __init__(self, hippocampus: Hippocampus):
|
||||
@@ -905,11 +568,6 @@ class ParahippocampalGyrus:
|
||||
self.memory_graph = hippocampus.memory_graph
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class HippocampusManager:
|
||||
def __init__(self):
|
||||
self._hippocampus: Hippocampus = None # type: ignore
|
||||
@@ -942,41 +600,6 @@ class HippocampusManager:
|
||||
raise RuntimeError("HippocampusManager 尚未初始化,请先调用 initialize 方法")
|
||||
return self._hippocampus
|
||||
|
||||
|
||||
async def get_memory_from_topic(
|
||||
self, valid_keywords: list[str], max_memory_num: int = 3, max_memory_length: int = 2, max_depth: int = 3
|
||||
) -> list:
|
||||
"""从文本中获取相关记忆的公共接口"""
|
||||
if not self._initialized:
|
||||
raise RuntimeError("HippocampusManager 尚未初始化,请先调用 initialize 方法")
|
||||
try:
|
||||
response = await self._hippocampus.get_memory_from_topic(
|
||||
valid_keywords, max_memory_num, max_memory_length, max_depth
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"文本激活记忆失败: {e}")
|
||||
response = []
|
||||
return response
|
||||
|
||||
async def get_activate_from_text(
|
||||
self, text: str, max_depth: int = 3, fast_retrieval: bool = False
|
||||
) -> tuple[float, list[str], list[str]]:
|
||||
"""从文本中获取激活值的公共接口"""
|
||||
if not self._initialized:
|
||||
raise RuntimeError("HippocampusManager 尚未初始化,请先调用 initialize 方法")
|
||||
try:
|
||||
return await self._hippocampus.get_activate_from_text(text, max_depth, fast_retrieval)
|
||||
except Exception as e:
|
||||
logger.error(f"文本产生激活值失败: {e}")
|
||||
logger.error(traceback.format_exc())
|
||||
return 0.0, [], []
|
||||
|
||||
def get_memory_from_keyword(self, keyword: str, max_depth: int = 2) -> list:
|
||||
"""从关键词获取相关记忆的公共接口"""
|
||||
if not self._initialized:
|
||||
raise RuntimeError("HippocampusManager 尚未初始化,请先调用 initialize 方法")
|
||||
return self._hippocampus.get_memory_from_keyword(keyword, max_depth)
|
||||
|
||||
def get_all_node_names(self) -> list:
|
||||
"""获取所有节点名称的公共接口"""
|
||||
if not self._initialized:
|
||||
|
||||
Reference in New Issue
Block a user