feat:增加了工作记忆

This commit is contained in:
SengokuCola
2025-05-16 16:13:12 +08:00
parent 7f3178c96c
commit 456def4f9c
24 changed files with 2650 additions and 102 deletions

View File

@@ -0,0 +1,172 @@
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.chat.heart_flow.observation.observation import Observation
from src.chat.models.utils_model import LLMRequest
from src.config.config import global_config
import time
import traceback
from src.common.logger_manager import get_logger
from src.individuality.individuality import Individuality
import random
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.chat.utils.json_utils import safe_json_dumps
from src.chat.message_receive.chat_stream import chat_manager
import difflib
from src.chat.person_info.relationship_manager import relationship_manager
from .base_processor import BaseProcessor
from src.chat.focus_chat.info.mind_info import MindInfo
from typing import List, Optional
from src.chat.heart_flow.observation.hfcloop_observation import HFCloopObservation
from typing import Dict
from src.chat.focus_chat.info.info_base import InfoBase
from src.chat.focus_chat.info.self_info import SelfInfo
logger = get_logger("processor")
def init_prompt():
indentify_prompt = """
你的名字是{bot_name},你是一个AI但是你伪装成了一个人类你的人格是{prompt_personality}
你对外的形象是一只橙色的鱼,头上有绿色的树叶,你用的头像也是这个。
{relation_prompt}
{memory_str}
现在是{time_now}你正在上网和qq群里的网友们聊天以下是正在进行的聊天内容
{chat_observe_info}
现在请你根据现有的信息,思考自我认同
1. 你是一个什么样的人,你和群里的人关系如何
2. 思考有没有人提到你,或者图片与你有关
3. 你的自我认同是否有助于你的回答,如果你需要自我相关的信息来帮你参与聊天,请输出,否则请输出十个字以内的简短自我认同
4. 一般情况下不用输出自我认同,只需要输出十几个字的简短自我认同就好,除非有明显需要自我认同的场景
"""
Prompt(indentify_prompt, "indentify_prompt")
class SelfProcessor(BaseProcessor):
log_prefix = "自我认同"
def __init__(self, subheartflow_id: str):
super().__init__()
self.subheartflow_id = subheartflow_id
self.llm_model = LLMRequest(
model=global_config.llm_sub_heartflow,
temperature=global_config.llm_sub_heartflow["temp"],
max_tokens=800,
request_type="self_identify",
)
name = chat_manager.get_stream_name(self.subheartflow_id)
self.log_prefix = f"[{name}] "
async def process_info(
self, observations: Optional[List[Observation]] = None, running_memorys: Optional[List[Dict]] = None, *infos
) -> List[InfoBase]:
"""处理信息对象
Args:
*infos: 可变数量的InfoBase类型的信息对象
Returns:
List[InfoBase]: 处理后的结构化信息列表
"""
self_info_str = await self.self_indentify(observations, running_memorys)
if self_info_str:
self_info = SelfInfo()
self_info.set_self_info(self_info_str)
else:
self_info = None
return None
return [self_info]
async def self_indentify(
self, observations: Optional[List[Observation]] = None, running_memorys: Optional[List[Dict]] = None
):
"""
在回复前进行思考,生成内心想法并收集工具调用结果
参数:
observations: 观察信息
返回:
如果return_prompt为False:
tuple: (current_mind, past_mind) 当前想法和过去的想法列表
如果return_prompt为True:
tuple: (current_mind, past_mind, prompt) 当前想法、过去的想法列表和使用的prompt
"""
memory_str = ""
if running_memorys:
memory_str = "以下是当前在聊天中,你回忆起的记忆:\n"
for running_memory in running_memorys:
memory_str += f"{running_memory['topic']}: {running_memory['content']}\n"
if observations is None:
observations = []
for observation in observations:
if isinstance(observation, ChattingObservation):
# 获取聊天元信息
is_group_chat = observation.is_group_chat
chat_target_info = observation.chat_target_info
chat_target_name = "对方" # 私聊默认名称
if not is_group_chat and chat_target_info:
# 优先使用person_name其次user_nickname最后回退到默认值
chat_target_name = (
chat_target_info.get("person_name") or chat_target_info.get("user_nickname") or chat_target_name
)
# 获取聊天内容
chat_observe_info = observation.get_observe_info()
person_list = observation.person_list
if isinstance(observation, HFCloopObservation):
hfcloop_observe_info = observation.get_observe_info()
individuality = Individuality.get_instance()
personality_block = individuality.get_prompt(x_person=2, level=2)
relation_prompt = ""
for person in person_list:
relation_prompt += await relationship_manager.build_relationship_info(person, is_id=True)
prompt = (await global_prompt_manager.get_prompt_async("indentify_prompt")).format(
bot_name=individuality.name,
prompt_personality=personality_block,
memory_str=memory_str,
relation_prompt=relation_prompt,
time_now=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
chat_observe_info=chat_observe_info,
)
content = ""
try:
content, _ = await self.llm_model.generate_response_async(prompt=prompt)
if not content:
logger.warning(f"{self.log_prefix} LLM返回空结果自我识别失败。")
except Exception as e:
# 处理总体异常
logger.error(f"{self.log_prefix} 执行LLM请求或处理响应时出错: {e}")
logger.error(traceback.format_exc())
content = "自我识别过程中出现错误"
if content == 'None':
content = ""
# 记录初步思考结果
logger.debug(f"{self.log_prefix} 自我识别prompt: \n{prompt}\n")
logger.info(f"{self.log_prefix} 自我识别结果: {content}")
return content
init_prompt()

View File

@@ -4,15 +4,15 @@ from src.config.config import global_config
import time
from src.common.logger_manager import get_logger
from src.individuality.individuality import Individuality
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.tools.tool_use import ToolUser
from src.chat.utils.json_utils import process_llm_tool_calls
from src.chat.person_info.relationship_manager import relationship_manager
from .base_processor import BaseProcessor
from typing import List, Optional, Dict
from src.chat.heart_flow.observation.observation import Observation
from src.chat.heart_flow.observation.working_observation import WorkingObservation
from src.chat.focus_chat.info.structured_info import StructuredInfo
from src.chat.heart_flow.observation.structure_observation import StructureObservation
logger = get_logger("processor")
@@ -24,9 +24,6 @@ def init_prompt():
tool_executor_prompt = """
你是一个专门执行工具的助手。你的名字是{bot_name}。现在是{time_now}
你要在群聊中扮演以下角色:
{prompt_personality}
你当前的额外信息:
{memory_str}
@@ -70,6 +67,8 @@ class ToolProcessor(BaseProcessor):
list: 处理后的结构化信息列表
"""
working_infos = []
if observations:
for observation in observations:
if isinstance(observation, ChattingObservation):
@@ -77,7 +76,7 @@ class ToolProcessor(BaseProcessor):
# 更新WorkingObservation中的结构化信息
for observation in observations:
if isinstance(observation, WorkingObservation):
if isinstance(observation, StructureObservation):
for structured_info in result:
logger.debug(f"{self.log_prefix} 更新WorkingObservation中的结构化信息: {structured_info}")
observation.add_structured_info(structured_info)
@@ -86,8 +85,9 @@ class ToolProcessor(BaseProcessor):
logger.debug(f"{self.log_prefix} 获取更新后WorkingObservation中的结构化信息: {working_infos}")
structured_info = StructuredInfo()
for working_info in working_infos:
structured_info.set_info(working_info.get("type"), working_info.get("content"))
if working_infos:
for working_info in working_infos:
structured_info.set_info(working_info.get("type"), working_info.get("content"))
return [structured_info]
@@ -148,7 +148,7 @@ class ToolProcessor(BaseProcessor):
# chat_target_name=chat_target_name,
is_group_chat=is_group_chat,
# relation_prompt=relation_prompt,
prompt_personality=prompt_personality,
# prompt_personality=prompt_personality,
# mood_info=mood_info,
bot_name=individuality.name,
time_now=time_now,

View File

@@ -0,0 +1,247 @@
from src.chat.heart_flow.observation.chatting_observation import ChattingObservation
from src.chat.heart_flow.observation.observation import Observation
from src.chat.models.utils_model import LLMRequest
from src.config.config import global_config
import time
import traceback
from src.common.logger_manager import get_logger
from src.individuality.individuality import Individuality
import random
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.chat.utils.json_utils import safe_json_dumps
from src.chat.message_receive.chat_stream import chat_manager
import difflib
from src.chat.person_info.relationship_manager import relationship_manager
from .base_processor import BaseProcessor
from src.chat.focus_chat.info.mind_info import MindInfo
from typing import List, Optional
from src.chat.heart_flow.observation.hfcloop_observation import HFCloopObservation
from src.chat.heart_flow.observation.working_observation import WorkingMemoryObservation
from src.chat.focus_chat.working_memory.working_memory import WorkingMemory
from typing import Dict
from src.chat.focus_chat.info.info_base import InfoBase
from json_repair import repair_json
from src.chat.focus_chat.info.workingmemory_info import WorkingMemoryInfo
import asyncio
import json
logger = get_logger("processor")
def init_prompt():
memory_proces_prompt = """
你的名字是{bot_name}
现在是{time_now}你正在上网和qq群里的网友们聊天以下是正在进行的聊天内容
{chat_observe_info}
以下是你已经总结的记忆你可以调取这些记忆来帮助你聊天不要一次调取太多记忆最多调取3个左右记忆
{memory_str}
观察聊天内容和已经总结的记忆,思考是否有新内容需要总结成记忆,如果有,就输出 true否则输出 false
如果当前聊天记录的内容已经被总结千万不要总结新记忆输出false
如果已经总结的记忆包含了当前聊天记录的内容千万不要总结新记忆输出false
如果已经总结的记忆摘要,包含了当前聊天记录的内容千万不要总结新记忆输出false
如果有相近的记忆请合并记忆输出merge_memory格式为[["id1", "id2"], ["id3", "id4"],...]你可以进行多组合并但是每组合并只能有两个记忆id不要输出其他内容
请根据聊天内容选择你需要调取的记忆并考虑是否添加新记忆以JSON格式输出格式如下
```json
{{
"selected_memory_ids": ["id1", "id2", ...],
"new_memory": "true" or "false",
"merge_memory": [["id1", "id2"], ["id3", "id4"],...]
}}
```
"""
Prompt(memory_proces_prompt, "prompt_memory_proces")
class WorkingMemoryProcessor(BaseProcessor):
log_prefix = "工作记忆"
def __init__(self, subheartflow_id: str):
super().__init__()
self.subheartflow_id = subheartflow_id
self.llm_model = LLMRequest(
model=global_config.llm_sub_heartflow,
temperature=global_config.llm_sub_heartflow["temp"],
max_tokens=800,
request_type="working_memory",
)
name = chat_manager.get_stream_name(self.subheartflow_id)
self.log_prefix = f"[{name}] "
async def process_info(
self, observations: Optional[List[Observation]] = None, running_memorys: Optional[List[Dict]] = None, *infos
) -> List[InfoBase]:
"""处理信息对象
Args:
*infos: 可变数量的InfoBase类型的信息对象
Returns:
List[InfoBase]: 处理后的结构化信息列表
"""
working_memory = None
chat_info = ""
try:
for observation in observations:
if isinstance(observation, WorkingMemoryObservation):
working_memory = observation.get_observe_info()
working_memory_obs = observation
if isinstance(observation, ChattingObservation):
chat_info = observation.get_observe_info()
# chat_info_truncate = observation.talking_message_str_truncate
if not working_memory:
logger.warning(f"{self.log_prefix} 没有找到工作记忆对象")
mind_info = MindInfo()
return [mind_info]
except Exception as e:
logger.error(f"{self.log_prefix} 处理观察时出错: {e}")
logger.error(traceback.format_exc())
return []
all_memory = working_memory.get_all_memories()
memory_prompts = []
for memory in all_memory:
memory_content = memory.data
memory_summary = memory.summary
memory_id = memory.id
memory_brief = memory_summary.get("brief")
memory_detailed = memory_summary.get("detailed")
memory_keypoints = memory_summary.get("keypoints")
memory_events = memory_summary.get("events")
memory_single_prompt = f"记忆id:{memory_id},记忆摘要:{memory_brief}\n"
memory_prompts.append(memory_single_prompt)
memory_choose_str = "".join(memory_prompts)
# 使用提示模板进行处理
prompt = (await global_prompt_manager.get_prompt_async("prompt_memory_proces")).format(
bot_name=global_config.BOT_NICKNAME,
time_now=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
chat_observe_info=chat_info,
memory_str=memory_choose_str
)
# 调用LLM处理记忆
content = ""
try:
logger.debug(f"{self.log_prefix} 处理工作记忆的prompt: {prompt}")
content, _ = await self.llm_model.generate_response_async(prompt=prompt)
if not content:
logger.warning(f"{self.log_prefix} LLM返回空结果处理工作记忆失败。")
except Exception as e:
logger.error(f"{self.log_prefix} 执行LLM请求或处理响应时出错: {e}")
logger.error(traceback.format_exc())
# 解析LLM返回的JSON
try:
result = repair_json(content)
if isinstance(result, str):
result = json.loads(result)
if not isinstance(result, dict):
logger.error(f"{self.log_prefix} 解析LLM返回的JSON失败结果不是字典类型: {type(result)}")
return []
selected_memory_ids = result.get("selected_memory_ids", [])
new_memory = result.get("new_memory", "")
merge_memory = result.get("merge_memory", [])
except Exception as e:
logger.error(f"{self.log_prefix} 解析LLM返回的JSON失败: {e}")
logger.error(traceback.format_exc())
return []
logger.debug(f"{self.log_prefix} 解析LLM返回的JSON成功: {result}")
# 根据selected_memory_ids调取记忆
memory_str = ""
if selected_memory_ids:
for memory_id in selected_memory_ids:
memory = await working_memory.retrieve_memory(memory_id)
if memory:
memory_content = memory.data
memory_summary = memory.summary
memory_id = memory.id
memory_brief = memory_summary.get("brief")
memory_detailed = memory_summary.get("detailed")
memory_keypoints = memory_summary.get("keypoints")
memory_events = memory_summary.get("events")
for keypoint in memory_keypoints:
memory_str += f"记忆要点:{keypoint}\n"
for event in memory_events:
memory_str += f"记忆事件:{event}\n"
# memory_str += f"记忆摘要:{memory_detailed}\n"
# memory_str += f"记忆主题:{memory_brief}\n"
working_memory_info = WorkingMemoryInfo()
if memory_str:
working_memory_info.add_working_memory(memory_str)
logger.debug(f"{self.log_prefix} 取得工作记忆: {memory_str}")
else:
logger.warning(f"{self.log_prefix} 没有找到工作记忆")
# 根据聊天内容添加新记忆
if new_memory:
# 使用异步方式添加新记忆,不阻塞主流程
logger.debug(f"{self.log_prefix} {new_memory}新记忆: ")
asyncio.create_task(self.add_memory_async(working_memory, chat_info))
if merge_memory:
for merge_pairs in merge_memory:
memory1 = await working_memory.retrieve_memory(merge_pairs[0])
memory2 = await working_memory.retrieve_memory(merge_pairs[1])
if memory1 and memory2:
memory_str = f"记忆id:{memory1.id},记忆摘要:{memory1.summary.get('brief')}\n"
memory_str += f"记忆id:{memory2.id},记忆摘要:{memory2.summary.get('brief')}\n"
asyncio.create_task(self.merge_memory_async(working_memory, merge_pairs[0], merge_pairs[1]))
return [working_memory_info]
async def add_memory_async(self, working_memory: WorkingMemory, content: str):
"""异步添加记忆,不阻塞主流程
Args:
working_memory: 工作记忆对象
content: 记忆内容
"""
try:
await working_memory.add_memory(content=content, from_source="chat_text")
logger.debug(f"{self.log_prefix} 异步添加新记忆成功: {content[:30]}...")
except Exception as e:
logger.error(f"{self.log_prefix} 异步添加新记忆失败: {e}")
logger.error(traceback.format_exc())
async def merge_memory_async(self, working_memory: WorkingMemory, memory_id1: str, memory_id2: str):
"""异步合并记忆,不阻塞主流程
Args:
working_memory: 工作记忆对象
memory_str: 记忆内容
"""
try:
merged_memory = await working_memory.merge_memory(memory_id1, memory_id2)
logger.debug(f"{self.log_prefix} 异步合并记忆成功: {memory_id1}{memory_id2}...")
logger.debug(f"{self.log_prefix} 合并后的记忆梗概: {merged_memory.summary.get('brief')}")
logger.debug(f"{self.log_prefix} 合并后的记忆详情: {merged_memory.summary.get('detailed')}")
logger.debug(f"{self.log_prefix} 合并后的记忆要点: {merged_memory.summary.get('keypoints')}")
logger.debug(f"{self.log_prefix} 合并后的记忆事件: {merged_memory.summary.get('events')}")
except Exception as e:
logger.error(f"{self.log_prefix} 异步合并记忆失败: {e}")
logger.error(traceback.format_exc())
init_prompt()