better:优化做梦表现

This commit is contained in:
SengokuCola
2025-12-04 20:00:20 +08:00
parent 0399f878f0
commit add29efefc
12 changed files with 893 additions and 380 deletions

View File

@@ -1,58 +1,70 @@
import asyncio import asyncio
import random import random
import time import time
import json
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
from peewee import fn
from src.common.logger import get_logger from src.common.logger import get_logger
from src.config.config import global_config, model_config from src.config.config import global_config, model_config
from src.common.database.database_model import ChatHistory from src.common.database.database_model import ChatHistory, Jargon
from src.chat.utils.prompt_builder import Prompt, global_prompt_manager from src.chat.utils.prompt_builder import Prompt, global_prompt_manager
from src.llm_models.payload_content.message import MessageBuilder, RoleType, Message from src.llm_models.payload_content.message import MessageBuilder, RoleType, Message
from src.plugin_system.apis import llm_api from src.plugin_system.apis import llm_api
from src.llm_models.utils_model import LLMRequest from src.dream.dream_generator import generate_dream_summary
# dream 工具工厂函数
from src.dream.tools.search_chat_history_tool import make_search_chat_history
from src.dream.tools.get_chat_history_detail_tool import make_get_chat_history_detail
from src.dream.tools.delete_chat_history_tool import make_delete_chat_history
from src.dream.tools.create_chat_history_tool import make_create_chat_history
from src.dream.tools.update_chat_history_tool import make_update_chat_history
from src.dream.tools.finish_maintenance_tool import make_finish_maintenance
from src.dream.tools.search_jargon_tool import make_search_jargon
from src.dream.tools.delete_jargon_tool import make_delete_jargon
from src.dream.tools.update_jargon_tool import make_update_jargon
logger = get_logger("dream_agent") logger = get_logger("dream_agent")
# 初始化 utils 模型用于生成梦境总结
_dream_summary_model: Optional[LLMRequest] = None
def _get_dream_summary_model() -> LLMRequest:
"""获取用于生成梦境总结的 utils 模型实例"""
global _dream_summary_model
if _dream_summary_model is None:
_dream_summary_model = LLMRequest(
model_set=model_config.model_task_config.utils,
request_type="dream.summary",
)
return _dream_summary_model
def init_dream_prompts() -> None: def init_dream_prompts() -> None:
"""初始化 dream agent 的提示词""" """初始化 dream agent 的提示词"""
Prompt( Prompt(
""" """
你的名字是{bot_name},你现在处于"梦境维护模式dream agent" 你的名字是{bot_name},你现在处于"梦境维护模式dream agent"
你可以自由地在 ChatHistory 库中探索、整理、合并和删改记录,以帮助自己在未来更好地回忆和理解对话历史。 你可以自由地在 ChatHistory 库中探索、整理、创建和删改记录,以帮助自己在未来更好地回忆和理解对话历史。
本轮要维护的聊天ID{chat_id} 本轮要维护的聊天ID{chat_id}
本轮随机选中的起始记忆 ID{start_memory_id} 本轮随机选中的起始记忆 ID{start_memory_id}
请优先以这条起始记忆为切入点,先理解它的内容与上下文,再决定如何在其附近进行合并、重写或删除等整理操作;如果起始记忆为空,则由你自行选择合适的切入点。 请优先以这条起始记忆为切入点,先理解它的内容与上下文,再决定如何在其附近进行创建新概括、重写或删除等整理操作;如果起始记忆为空,则由你自行选择合适的切入点。
你可以使用的工具包括: 你可以使用的工具包括:
- list_chat_histories浏览该 chat_id 下的历史概括列表 **ChatHistory 维护工具:**
- search_chat_history根据关键词或参与人搜索该 chat_id 下的历史记忆概括列表
- get_chat_history_detail查看某条概括的详细内容 - get_chat_history_detail查看某条概括的详细内容
- merge_chat_histories把多条内容高度相似或应当归为一类的记录合并到一条 - create_chat_history根据整理后的理解创建一条新的 ChatHistory 概括记录(主题、概括、关键词、关键信息等)
- update_chat_history在不改变事实的前提下重写或精炼主题、概括、关键词、关键信息 - update_chat_history在不改变事实的前提下重写或精炼主题、概括、关键词、关键信息
- delete_chat_history删除明显噪声、错误或无意义的记录(要格外谨慎) - delete_chat_history删除明显冗余、噪声、错误或无意义的记录,或者非常有时效性的信息,或者无太多有用信息的日常互动。
- finish_maintenance当你认为当前 chat_id 下的维护工作已经完成,没有更多需要整理的内容时,调用此工具来结束本次运行 你也可以先用 create_chat_history 创建一条新的综合概括,再对旧的冗余记录执行多次 delete_chat_history 来完成“合并”效果。
**Jargon黑话维护工具只读禁止修改**
- search_jargon根据一个或多个关键词搜索Jargon 记录,通常是含义不明确的词条或者特殊的缩写
**通用工具:**
- finish_maintenance当你认为当前维护工作已经完成没有更多需要整理的内容时调用此工具来结束本次运行
**工作目标** **工作目标**
- 发现冗余、重复或高度相似的记录,并进行合并或删除; - 发现冗余、重复或高度相似的记录,并进行合并或删除;
- 发现主题/概括过于含糊、啰嗦或缺少关键信息的记录,进行重写和精简; - 发现主题/概括过于含糊、啰嗦或缺少关键信息的记录,进行重写和精简;
- summary要尽可能保持有用的信息
- 尽量保持信息的真实与可用性,不要凭空捏造事实。 - 尽量保持信息的真实与可用性,不要凭空捏造事实。
**合并准则**
- 你可以新建一个记录,然后删除旧记录来实现合并。
- 如果两个或多个记录的主题相似,内容是对主题不同方面的信息或讨论,且信息量较少,则可以合并为一条记录。
- 如果两个记录冲突,可以根据逻辑保留一个或者进行整合,也可以采取更新的记录,删除旧的记录
**轮次信息** **轮次信息**
- 本次维护最多执行 {max_iterations} - 本次维护最多执行 {max_iterations}
- 每轮开始时,系统会告知你当前是第几轮,还剩多少轮 - 每轮开始时,系统会告知你当前是第几轮,还剩多少轮
@@ -69,23 +81,6 @@ def init_dream_prompts() -> None:
name="dream_react_head_prompt", name="dream_react_head_prompt",
) )
Prompt(
"""
你刚刚完成了一次对聊天记录的记忆整理工作。以下是整理过程的摘要:
整理过程:
{conversation_text}
请将这次整理涉及的相关信息改写为一个富有诗意和想象力的"梦境",请你仅使用具体的记忆的内容,而不是整理过程编写。
要求:
1. 使用第一人称视角
2. 叙述直白,不要复杂修辞,口语化
3. 保持诗意和想象力,自由编写
4. 长度控制在200-800字
5. 用中文输出
请直接输出梦境内容,不要添加其他说明:
""",
name="dream_summary_prompt",
)
class DreamTool: class DreamTool:
@@ -113,11 +108,12 @@ class DreamToolRegistry:
self.tools: Dict[str, DreamTool] = {} self.tools: Dict[str, DreamTool] = {}
def register_tool(self, tool: DreamTool) -> None: def register_tool(self, tool: DreamTool) -> None:
if tool.name in self.tools: """
logger.debug(f"dream 工具 {tool.name} 已存在,跳过重复注册") 注册或更新 dream 工具。
return 注意dream agent 每个 chat_id 会重新初始化工具,这里允许覆盖已有同名工具。
"""
self.tools[tool.name] = tool self.tools[tool.name] = tool
logger.info(f"注册 dream 工具: {tool.name}") logger.info(f"注册/更新 dream 工具: {tool.name}")
def get_tool(self, name: str) -> Optional[DreamTool]: def get_tool(self, name: str) -> Optional[DreamTool]:
return self.tools.get(name) return self.tools.get(name)
@@ -133,211 +129,38 @@ def get_dream_tool_registry() -> DreamToolRegistry:
return _dream_tool_registry return _dream_tool_registry
def init_dream_tools() -> None: def init_dream_tools(chat_id: str) -> None:
"""注册 dream agent 可用的 ChatHistory 相关工具""" """注册 dream agent 可用的 ChatHistory / Jargon 相关工具(限定在当前 chat_id 作用域内)"""
from src.plugin_system.apis import database_api
async def list_chat_histories(chat_id: str, limit: int = 50) -> str:
"""列出某个 chat_id 下的 chat_history 概览"""
try:
logger.info(f"[dream][tool] 调用 list_chat_histories(chat_id={chat_id}, limit={limit})")
query = (
ChatHistory.select()
.where(ChatHistory.chat_id == chat_id)
.order_by(ChatHistory.start_time.asc())
.limit(max(10, min(limit, 200)))
)
records = list(query)
if not records:
msg = f"chat_id={chat_id} 在 ChatHistory 中暂时没有记录。"
logger.info(f"[dream][tool] list_chat_histories 无记录: {msg}")
return msg
lines: List[str] = []
for r in records:
lines.append(
f"ID={r.id} | 时间范围={r.start_time:.0f}-{r.end_time:.0f} | "
f"主题={r.theme or ''} | 被检索次数={r.count} | 被遗忘次数={r.forget_times}"
)
result = "\n".join(lines)
logger.info(
f"[dream][tool] list_chat_histories 返回 {len(records)} 条记录,预览: {result[:200].replace(chr(10), ' ')}"
)
return result
except Exception as e:
logger.error(f"list_chat_histories 失败: {e}")
return f"list_chat_histories 执行失败: {e}"
async def get_chat_history_detail(memory_id: int) -> str:
"""获取单条 chat_history 的完整内容"""
try:
logger.info(f"[dream][tool] 调用 get_chat_history_detail(memory_id={memory_id})")
record = ChatHistory.get_or_none(ChatHistory.id == memory_id)
if not record:
msg = f"未找到 ID={memory_id} 的 ChatHistory 记录。"
logger.info(f"[dream][tool] get_chat_history_detail 未找到记录: {msg}")
return msg
result = (
f"ID={record.id}\n"
f"chat_id={record.chat_id}\n"
f"时间范围={record.start_time}-{record.end_time}\n"
f"主题={record.theme}\n"
f"关键词={record.keywords}\n"
f"参与者={record.participants}\n"
f"概括={record.summary}\n"
f"关键信息={record.key_point}\n"
f"原文:\n{record.original_text}"
)
logger.info(
f"[dream][tool] get_chat_history_detail 成功,原文长度={len(record.original_text or '')},预览: {result[:200].replace(chr(10), ' ')}"
)
return result
except Exception as e:
logger.error(f"get_chat_history_detail 失败: {e}")
return f"get_chat_history_detail 执行失败: {e}"
async def delete_chat_history(memory_id: int) -> str:
"""删除一条 chat_history 记录"""
try:
logger.info(f"[dream][tool] 调用 delete_chat_history(memory_id={memory_id})")
record = ChatHistory.get_or_none(ChatHistory.id == memory_id)
if not record:
msg = f"未找到 ID={memory_id} 的 ChatHistory 记录,无法删除。"
logger.info(f"[dream][tool] delete_chat_history 未找到记录: {msg}")
return msg
rows = ChatHistory.delete().where(ChatHistory.id == memory_id).execute()
msg = f"已删除 ID={memory_id} 的 ChatHistory 记录,受影响行数={rows}"
logger.info(f"[dream][tool] delete_chat_history 完成: {msg}")
return msg
except Exception as e:
logger.error(f"delete_chat_history 失败: {e}")
return f"delete_chat_history 执行失败: {e}"
async def update_chat_history(
memory_id: int,
theme: Optional[str] = None,
summary: Optional[str] = None,
keywords: Optional[str] = None,
key_point: Optional[str] = None,
) -> str:
"""按字段更新 chat_history字符串字段要求 JSON 的字段须传入已序列化的字符串)"""
try:
logger.info(
f"[dream][tool] 调用 update_chat_history(memory_id={memory_id}, "
f"theme={bool(theme)}, summary={bool(summary)}, keywords={bool(keywords)}, key_point={bool(key_point)})"
)
record = ChatHistory.get_or_none(ChatHistory.id == memory_id)
if not record:
msg = f"未找到 ID={memory_id} 的 ChatHistory 记录,无法更新。"
logger.info(f"[dream][tool] update_chat_history 未找到记录: {msg}")
return msg
data: Dict[str, Any] = {}
if theme is not None:
data["theme"] = theme
if summary is not None:
data["summary"] = summary
if keywords is not None:
data["keywords"] = keywords
if key_point is not None:
data["key_point"] = key_point
if not data:
return "未提供任何需要更新的字段。"
await database_api.db_save(ChatHistory, data=data, key_field="id", key_value=memory_id)
msg = f"已更新 ChatHistory 记录 ID={memory_id},更新字段={list(data.keys())}"
logger.info(f"[dream][tool] update_chat_history 完成: {msg}")
return msg
except Exception as e:
logger.error(f"update_chat_history 失败: {e}")
return f"update_chat_history 执行失败: {e}"
async def merge_chat_histories(target_id: int, from_ids: List[int] | str) -> str:
"""将多条 chat_history 合并到 target_id合并文本与统计字段删除 from_ids
from_ids 可以是整数列表,也可以是逗号/空格分隔的字符串,由本函数负责解析。
"""
try:
logger.info(f"[dream][tool] 调用 merge_chat_histories(target_id={target_id}, from_ids={from_ids})")
# 兼容字符串形式的 from_ids方便 LLM 传参
if isinstance(from_ids, str):
raw_parts = [p.strip() for p in from_ids.replace("", ",").replace(" ", ",").split(",") if p.strip()]
parsed_ids: List[int] = []
for part in raw_parts:
try:
parsed_ids.append(int(part))
except ValueError:
continue
from_ids = parsed_ids
if target_id in from_ids:
from_ids = [i for i in from_ids if i != target_id]
if not from_ids:
msg = "from_ids 为空或只包含 target_id本次不执行合并。"
logger.info(f"[dream][tool] merge_chat_histories 参数不足: {msg}")
return msg
target = ChatHistory.get_or_none(ChatHistory.id == target_id)
if not target:
msg = f"未找到合并目标 ID={target_id} 的记录。"
logger.info(f"[dream][tool] merge_chat_histories 失败: {msg}")
return msg
others = list(ChatHistory.select().where(ChatHistory.id.in_(from_ids)))
if not others:
msg = f"未找到需要合并的来源记录: {from_ids}"
logger.info(f"[dream][tool] merge_chat_histories 无来源记录: {msg}")
return msg
# 合并原文与统计字段(简单拼接)
original_parts = [target.original_text or ""]
for r in others:
original_parts.append(r.original_text or "")
target.original_text = "\n\n--- 合并分隔线 ---\n\n".join(p for p in original_parts if p)
target.count = (target.count or 0) + sum(r.count or 0 for r in others)
target.forget_times = (target.forget_times or 0) + sum(r.forget_times or 0 for r in others)
target.save()
deleted = ChatHistory.delete().where(ChatHistory.id.in_(from_ids)).execute()
msg = (
f"已将 ChatHistory {from_ids} 合并到 {target_id}"
f"合并后原文长度={len(target.original_text or '')},删除记录数={deleted}"
)
logger.info(f"[dream][tool] merge_chat_histories 完成: {msg}")
return msg
except Exception as e:
logger.error(f"merge_chat_histories 失败: {e}")
return f"merge_chat_histories 执行失败: {e}"
async def finish_maintenance(reason: Optional[str] = None) -> str:
"""结束本次 dream 维护任务。当你认为当前 chat_id 下的维护工作已经完成,没有更多需要整理的内容时,调用此工具来结束本次运行。"""
reason_text = f",原因:{reason}" if reason else ""
msg = f"DREAM_MAINTENANCE_COMPLETE{reason_text}"
logger.info(f"[dream][tool] 调用 finish_maintenance结束本次维护{reason_text}")
return msg
# 参数元组格式与 BaseTool 一致: (name, ToolParamType, desc, required, enum_values)
from src.llm_models.payload_content.tool_option import ToolParamType from src.llm_models.payload_content.tool_option import ToolParamType
# 通过工厂函数生成绑定当前 chat_id 的工具实现
search_chat_history = make_search_chat_history(chat_id)
get_chat_history_detail = make_get_chat_history_detail(chat_id)
delete_chat_history = make_delete_chat_history(chat_id)
create_chat_history = make_create_chat_history(chat_id)
update_chat_history = make_update_chat_history(chat_id)
finish_maintenance = make_finish_maintenance(chat_id)
search_jargon = make_search_jargon(chat_id)
delete_jargon = make_delete_jargon(chat_id)
update_jargon = make_update_jargon(chat_id)
_dream_tool_registry.register_tool( _dream_tool_registry.register_tool(
DreamTool( DreamTool(
"list_chat_histories", "search_chat_history",
"列出某个 chat_id 下的 ChatHistory 概览,便于全局理解该聊天的历史记忆。", "根据关键词或参与人查询当前 chat_id 下的 ChatHistory 概览,便于快速定位相关记忆。",
[ [
("chat_id", ToolParamType.STRING, "需要浏览的 chat_id", True, None), ("keyword", ToolParamType.STRING, "关键词(可选,支持多个关键词,可用空格、逗号等分隔)", False, None),
("limit", ToolParamType.INTEGER, "最多返回多少条记录,默认 50最大 200", False, None), ("participant", ToolParamType.STRING, "参与人昵称(可选)", False, None),
], ],
list_chat_histories, search_chat_history,
) )
) )
_dream_tool_registry.register_tool( _dream_tool_registry.register_tool(
DreamTool( DreamTool(
"get_chat_history_detail", "get_chat_history_detail",
"根据 memory_id 获取单条 ChatHistory 的详细内容,包含原文与所有字段", "根据 memory_id 获取单条 ChatHistory 的详细内容,包含主题、概括、关键词、关键信息等字段(不包含原文)",
[ [
("memory_id", ToolParamType.INTEGER, "ChatHistory 主键 ID。", True, None), ("memory_id", ToolParamType.INTEGER, "ChatHistory 主键 ID。", True, None),
], ],
@@ -373,19 +196,17 @@ def init_dream_tools() -> None:
_dream_tool_registry.register_tool( _dream_tool_registry.register_tool(
DreamTool( DreamTool(
"merge_chat_histories", "create_chat_history",
"将多条 ChatHistory 记录合并到一条中,可用于去重或把同一主题的多段记录整合在一起", "根据整理后的理解创建一条新的 ChatHistory 概括记录(主题、概括、关键词、关键信息等)",
[ [
("target_id", ToolParamType.INTEGER, "合并的目标记录 ID", True, None), ("theme", ToolParamType.STRING, "新的主题标题(必填)", True, None),
( ("summary", ToolParamType.STRING, "新的概括内容(必填)。", True, None),
"from_ids", ("keywords", ToolParamType.STRING, "新的关键词 JSON 字符串,如 ['关键词1','关键词2'](必填)。", True, None),
ToolParamType.STRING, ("key_point", ToolParamType.STRING, "新的关键信息 JSON 字符串,如 ['要点1','要点2'](必填)。", True, None),
"需要被合并并删除的来源记录 ID 列表,可以是用逗号/空格分隔的字符串,如 '12, 13, 15'", ("start_time", ToolParamType.STRING, "起始时间戳Unix 时间,必填)。", True, None),
True, ("end_time", ToolParamType.STRING, "结束时间戳Unix 时间,必填)。", True, None),
None,
),
], ],
merge_chat_histories, create_chat_history,
) )
) )
@@ -400,6 +221,19 @@ def init_dream_tools() -> None:
) )
) )
# ==================== Jargon 维护工具 ====================
# 注册 Jargon 工具
_dream_tool_registry.register_tool(
DreamTool(
"search_jargon",
"根据一个或多个关键词搜索当前 chat_id 相关的 Jargon 记录概览(只包含 is_jargon=True含全局 Jargon便于快速理解黑话库。",
[
("keyword", ToolParamType.STRING, "按一个或多个关键词搜索内容/含义/推断结果(必填)。", True, None),
],
search_jargon,
)
)
async def run_dream_agent_once( async def run_dream_agent_once(
chat_id: str, chat_id: str,
@@ -416,8 +250,8 @@ async def run_dream_agent_once(
start_ts = time.time() start_ts = time.time()
logger.info(f"[dream] 开始对 chat_id={chat_id} 进行 dream 维护,最多迭代 {max_iterations}") logger.info(f"[dream] 开始对 chat_id={chat_id} 进行 dream 维护,最多迭代 {max_iterations}")
# 初始化工具 # 初始化工具(作用域限定在当前 chat_id
init_dream_tools() init_dream_tools(chat_id)
tool_registry = get_dream_tool_registry() tool_registry = get_dream_tool_registry()
tool_defs = tool_registry.get_tool_definitions() tool_defs = tool_registry.get_tool_definitions()
@@ -436,6 +270,53 @@ async def run_dream_agent_once(
conversation_messages: List[Message] = [] conversation_messages: List[Message] = []
# 如果提供了起始记忆 ID则在对话正式开始前先把这条记忆的详细信息放入上下文
# 避免 LLM 还需要额外调用一次 get_chat_history_detail 才能看到起始记忆内容。
if start_memory_id is not None:
try:
record = ChatHistory.get_or_none(ChatHistory.id == start_memory_id)
if record:
start_time_str = (
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(record.start_time))
if record.start_time
else "未知"
)
end_time_str = (
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(record.end_time))
if record.end_time
else "未知"
)
detail_text = (
f"ID={record.id}\n"
f"chat_id={record.chat_id}\n"
f"时间范围={start_time_str}{end_time_str}\n"
f"主题={record.theme or ''}\n"
f"关键词={record.keywords or ''}\n"
f"参与者={record.participants or ''}\n"
f"概括={record.summary or ''}\n"
f"关键信息={record.key_point or ''}"
)
logger.debug(
f"[dream] 预加载起始记忆详情 memory_id={start_memory_id}"
f"预览: {detail_text[:200].replace(chr(10), ' ')}"
)
start_detail_builder = MessageBuilder()
start_detail_builder.set_role(RoleType.User)
start_detail_builder.add_text_content(
"【起始记忆详情】以下是本轮随机/指定的起始记忆的详细信息,供你在整理时优先参考:\n\n"
+ detail_text
)
conversation_messages.append(start_detail_builder.build())
else:
logger.warning(
f"[dream] 提供的 start_memory_id={start_memory_id} 未找到对应 ChatHistory 记录,"
"将不预加载起始记忆详情。"
)
except Exception as e:
logger.error(f"[dream] 预加载起始记忆详情失败 start_memory_id={start_memory_id}: {e}")
# 注意message_factory 必须是同步函数,返回消息列表(不能是 async/coroutine # 注意message_factory 必须是同步函数,返回消息列表(不能是 async/coroutine
def message_factory( def message_factory(
_client, _client,
@@ -475,10 +356,10 @@ async def run_dream_agent_once(
logger.error(f"[dream] 第 {iteration} 轮 LLM 调用失败: {response}") logger.error(f"[dream] 第 {iteration} 轮 LLM 调用失败: {response}")
break break
# 先输出「思考」内容,再输出工具调用信息 # 先输出「思考」内容,再输出工具调用信息(思考文本较长,仅在 debug 下输出)
thought_log = reasoning_content or (response[:300] if response else "") thought_log = reasoning_content or (response[:300] if response else "")
if thought_log: if thought_log:
logger.info(f"[dream] 第 {iteration} 轮思考内容: {thought_log}") logger.debug(f"[dream] 第 {iteration} 轮思考内容: {thought_log}")
logger.info( logger.info(
f"[dream] 第 {iteration} 轮响应,模型={model_name},工具调用数={len(tool_calls) if tool_calls else 0}" f"[dream] 第 {iteration} 轮响应,模型={model_name},工具调用数={len(tool_calls) if tool_calls else 0}"
@@ -524,7 +405,7 @@ async def run_dream_agent_once(
async def _run_single(t: DreamTool, p: Dict[str, Any], call_id: str, it: int): async def _run_single(t: DreamTool, p: Dict[str, Any], call_id: str, it: int):
try: try:
result = await t.execute(**p) result = await t.execute(**p)
logger.info(f"[dream] 第 {it} 轮 工具 {t.name} 执行完成") logger.debug(f"[dream] 第 {it} 轮 工具 {t.name} 执行完成")
return call_id, result return call_id, result
except Exception as e: except Exception as e:
logger.error(f"[dream] 工具 {t.name} 执行失败: {e}") logger.error(f"[dream] 工具 {t.name} 执行失败: {e}")
@@ -554,137 +435,32 @@ async def run_dream_agent_once(
logger.info(f"[dream] 对 chat_id={chat_id} 的 dream 维护结束,共迭代 {iteration} 轮,耗时 {cost:.1f}") logger.info(f"[dream] 对 chat_id={chat_id} 的 dream 维护结束,共迭代 {iteration} 轮,耗时 {cost:.1f}")
# 生成梦境总结 # 生成梦境总结
await _generate_dream_summary(chat_id, conversation_messages, iteration, cost) await generate_dream_summary(chat_id, conversation_messages, iteration, cost)
async def _generate_dream_summary(
chat_id: str,
conversation_messages: List[Message],
total_iterations: int,
time_cost: float,
) -> None:
"""生成梦境总结并输出到日志"""
try:
import json
# 第一步:建立工具调用结果映射 (call_id -> result)
tool_results_map: Dict[str, str] = {}
for msg in conversation_messages:
if msg.role == RoleType.Tool and msg.tool_call_id:
content = ""
if msg.content:
if isinstance(msg.content, list) and msg.content:
content = msg.content[0].text if hasattr(msg.content[0], "text") else str(msg.content[0])
else:
content = str(msg.content)
tool_results_map[msg.tool_call_id] = content
# 第二步:详细记录所有工具调用操作和结果到日志
tool_call_count = 0
logger.info(f"[dream][工具调用详情] 开始记录 chat_id={chat_id} 的所有工具调用操作:")
for msg in conversation_messages:
if msg.role == RoleType.Assistant and msg.tool_calls:
tool_call_count += 1
# 提取思考内容
thought_content = ""
if msg.content:
if isinstance(msg.content, list) and msg.content:
thought_content = msg.content[0].text if hasattr(msg.content[0], "text") else str(msg.content[0])
else:
thought_content = str(msg.content)
logger.info(f"[dream][工具调用详情] === 第 {tool_call_count} 组工具调用 ===")
if thought_content:
logger.info(f"[dream][工具调用详情] 思考内容:{thought_content[:500]}{'...' if len(thought_content) > 500 else ''}")
# 记录每个工具调用的详细信息
for idx, tool_call in enumerate(msg.tool_calls, 1):
tool_name = tool_call.func_name
tool_args = tool_call.args or {}
tool_call_id = tool_call.call_id
tool_result = tool_results_map.get(tool_call_id, "未找到执行结果")
# 格式化参数
try:
args_str = json.dumps(tool_args, ensure_ascii=False, indent=2) if tool_args else "无参数"
except Exception:
args_str = str(tool_args)
logger.info(f"[dream][工具调用详情] --- 工具 {idx}: {tool_name} ---")
logger.info(f"[dream][工具调用详情] 调用参数:\n{args_str}")
logger.info(f"[dream][工具调用详情] 执行结果:\n{tool_result}")
logger.info(f"[dream][工具调用详情] {'-' * 60}")
logger.info(f"[dream][工具调用详情] 共记录了 {tool_call_count} 组工具调用操作")
# 第三步:构建对话历史摘要(用于生成梦境)
conversation_summary = []
for msg in conversation_messages:
role = msg.role.value if hasattr(msg.role, "value") else str(msg.role)
content = ""
if msg.content:
content = msg.content[0].text if isinstance(msg.content, list) and msg.content else str(msg.content)
if role == "user" and "轮次信息" in content:
# 跳过轮次信息消息
continue
if role == "assistant":
# 只保留思考内容,简化工具调用信息
if content:
# 截取前500字符避免过长
content_preview = content[:500] + ("..." if len(content) > 500 else "")
conversation_summary.append(f"[{role}] {content_preview}")
elif role == "tool":
# 工具结果,只保留关键信息
if content:
# 截取前300字符
content_preview = content[:300] + ("..." if len(content) > 300 else "")
conversation_summary.append(f"[工具执行] {content_preview}")
conversation_text = "\n".join(conversation_summary[-20:]) # 只保留最后20条消息
# 使用 Prompt 管理器格式化梦境生成 prompt
dream_prompt = await global_prompt_manager.format_prompt(
"dream_summary_prompt",
chat_id=chat_id,
total_iterations=total_iterations,
time_cost=time_cost,
conversation_text=conversation_text,
)
# 调用 utils 模型生成梦境
summary_model = _get_dream_summary_model()
dream_content, (reasoning, model_name, _) = await summary_model.generate_response_async(
dream_prompt,
max_tokens=512,
temperature=0.8,
)
if dream_content:
logger.info(f"[dream][梦境总结] 对 chat_id={chat_id} 的整理过程梦境:\n{dream_content}")
else:
logger.warning("[dream][梦境总结] 未能生成梦境总结")
except Exception as e:
logger.error(f"[dream][梦境总结] 生成梦境总结失败: {e}", exc_info=True)
def _pick_random_chat_id() -> Optional[str]: def _pick_random_chat_id() -> Optional[str]:
"""从 ChatHistory 中随机选择一个 chat_id用于 dream agent 本次维护""" """从 ChatHistory 中随机选择一个 chat_id用于 dream agent 本次维护
规则:
- 只在 chat_id 所属的 ChatHistory 记录数 >= 10 时才会参与随机选择;
- 记录数不足 10 的 chat_id 将被跳过,不会触发做梦 react。
"""
try: try:
# 统计每个 chat_id 的记录数,只保留记录数 >= 10 的 chat_id
rows = ( rows = (
ChatHistory.select(ChatHistory.chat_id) ChatHistory.select(ChatHistory.chat_id, fn.COUNT(ChatHistory.id).alias("cnt"))
.distinct() .group_by(ChatHistory.chat_id)
.having(fn.COUNT(ChatHistory.id) >= 10)
.order_by(ChatHistory.chat_id) .order_by(ChatHistory.chat_id)
.limit(200) .limit(200)
) )
ids = [r.chat_id for r in rows] eligible_ids = [r.chat_id for r in rows]
if not ids: if not eligible_ids:
logger.warning("[dream] ChatHistory 中暂无可用 chat_id本轮 dream 任务跳过。") logger.warning("[dream] ChatHistory 中暂无满足条件(记录数 >= 10 chat_id本轮 dream 任务跳过。")
return None return None
return random.choice(ids) chosen = random.choice(eligible_ids)
logger.info(f"[dream] 从 {len(eligible_ids)} 个满足条件的 chat_id 中随机选择:{chosen}")
return chosen
except Exception as e: except Exception as e:
logger.error(f"[dream] 随机选择 chat_id 失败: {e}") logger.error(f"[dream] 随机选择 chat_id 失败: {e}")
return None return None
@@ -731,16 +507,19 @@ async def run_dream_cycle_once() -> None:
async def start_dream_scheduler( async def start_dream_scheduler(
first_delay_seconds: int = 60, first_delay_seconds: Optional[int] = None,
interval_seconds: Optional[int] = None, interval_seconds: Optional[int] = None,
stop_event: Optional[asyncio.Event] = None, stop_event: Optional[asyncio.Event] = None,
) -> None: ) -> None:
""" """
dream 调度器: dream 调度器:
- 程序启动后先等待 first_delay_seconds默认 60s - 程序启动后先等待 first_delay_seconds如果为 None则使用配置文件中的值默认 60s
- 然后每隔 interval_seconds如果为 None则使用配置文件中的值默认 30 分钟)运行一次 dream agent 周期 - 然后每隔 interval_seconds如果为 None则使用配置文件中的值默认 30 分钟)运行一次 dream agent 周期
- 如果提供 stop_event则在 stop_event 被 set() 后优雅退出循环 - 如果提供 stop_event则在 stop_event 被 set() 后优雅退出循环
""" """
if first_delay_seconds is None:
first_delay_seconds = global_config.dream.first_delay_seconds
if interval_seconds is None: if interval_seconds is None:
interval_seconds = global_config.dream.interval_minutes * 60 interval_seconds = global_config.dream.interval_minutes * 60

View File

@@ -0,0 +1,198 @@
import random
from typing import List, Optional
from src.common.logger import get_logger
from src.config.config import model_config
from src.chat.utils.prompt_builder import Prompt
from src.llm_models.payload_content.message import RoleType, Message
from src.llm_models.utils_model import LLMRequest
logger = get_logger("dream_generator")
# 初始化 utils 模型用于生成梦境总结
_dream_summary_model: Optional[LLMRequest] = None
# 梦境风格列表21种
DREAM_STYLES = [
"保持诗意和想象力,自由编写",
"诗意朦胧,如薄雾笼罩的清晨",
"奇幻冒险,充满未知与探索",
"温暖怀旧,带着时光的痕迹",
"神秘悬疑,暗藏深意",
"浪漫唯美,如诗如画",
"科幻未来,科技与想象交织",
"自然清新,如山林间的微风",
"深沉哲思,引人深思",
"轻松幽默,充满趣味",
"悲伤忧郁,带着淡淡哀愁",
"激昂热烈,充满活力",
"宁静平和,如湖面般平静",
"荒诞离奇,打破常规",
"细腻温柔,如春风拂面",
"壮阔宏大,气势磅礴",
"简约纯粹,返璞归真",
"复杂多变,层次丰富",
"梦幻迷离,虚实难辨",
"现实写意,贴近生活",
"抽象概念,超越具象",
]
def get_random_dream_styles(count: int = 2) -> List[str]:
"""从梦境风格列表中随机选择指定数量的风格"""
return random.sample(DREAM_STYLES, min(count, len(DREAM_STYLES)))
def get_dream_summary_model() -> LLMRequest:
"""获取用于生成梦境总结的 utils 模型实例"""
global _dream_summary_model
if _dream_summary_model is None:
_dream_summary_model = LLMRequest(
model_set=model_config.model_task_config.utils,
request_type="dream.summary",
)
return _dream_summary_model
def init_dream_summary_prompt() -> None:
"""初始化梦境总结的提示词"""
Prompt(
"""
你刚刚完成了一次对聊天记录的记忆整理工作。以下是整理过程的摘要:
整理过程:
{conversation_text}
请将这次整理涉及的相关信息改写为一个富有诗意和想象力的"梦境",请你仅使用具体的记忆的内容,而不是整理过程编写。
要求:
1. 使用第一人称视角
2. 叙述直白,不要复杂修辞,口语化
3. 长度控制在200-800字
4. 用中文输出
梦境风格:
{dream_styles}
请直接输出梦境内容,不要添加其他说明:
""",
name="dream_summary_prompt",
)
async def generate_dream_summary(
chat_id: str,
conversation_messages: List[Message],
total_iterations: int,
time_cost: float,
) -> None:
"""生成梦境总结并输出到日志"""
try:
import json
from src.chat.utils.prompt_builder import global_prompt_manager
# 第一步:建立工具调用结果映射 (call_id -> result)
tool_results_map: dict[str, str] = {}
for msg in conversation_messages:
if msg.role == RoleType.Tool and msg.tool_call_id:
content = ""
if msg.content:
if isinstance(msg.content, list) and msg.content:
content = msg.content[0].text if hasattr(msg.content[0], "text") else str(msg.content[0])
else:
content = str(msg.content)
tool_results_map[msg.tool_call_id] = content
# 第二步:详细记录所有工具调用操作和结果到日志
tool_call_count = 0
logger.info(f"[dream][工具调用详情] 开始记录 chat_id={chat_id} 的所有工具调用操作:")
for msg in conversation_messages:
if msg.role == RoleType.Assistant and msg.tool_calls:
tool_call_count += 1
# 提取思考内容
thought_content = ""
if msg.content:
if isinstance(msg.content, list) and msg.content:
thought_content = msg.content[0].text if hasattr(msg.content[0], "text") else str(msg.content[0])
else:
thought_content = str(msg.content)
logger.info(f"[dream][工具调用详情] === 第 {tool_call_count} 组工具调用 ===")
if thought_content:
logger.info(f"[dream][工具调用详情] 思考内容:{thought_content[:500]}{'...' if len(thought_content) > 500 else ''}")
# 记录每个工具调用的详细信息
for idx, tool_call in enumerate(msg.tool_calls, 1):
tool_name = tool_call.func_name
tool_args = tool_call.args or {}
tool_call_id = tool_call.call_id
tool_result = tool_results_map.get(tool_call_id, "未找到执行结果")
# 格式化参数
try:
args_str = json.dumps(tool_args, ensure_ascii=False, indent=2) if tool_args else "无参数"
except Exception:
args_str = str(tool_args)
logger.info(f"[dream][工具调用详情] --- 工具 {idx}: {tool_name} ---")
logger.info(f"[dream][工具调用详情] 调用参数:\n{args_str}")
logger.info(f"[dream][工具调用详情] 执行结果:\n{tool_result}")
logger.info(f"[dream][工具调用详情] {'-' * 60}")
logger.info(f"[dream][工具调用详情] 共记录了 {tool_call_count} 组工具调用操作")
# 第三步:构建对话历史摘要(用于生成梦境)
conversation_summary = []
for msg in conversation_messages:
role = msg.role.value if hasattr(msg.role, "value") else str(msg.role)
content = ""
if msg.content:
content = msg.content[0].text if isinstance(msg.content, list) and msg.content else str(msg.content)
if role == "user" and "轮次信息" in content:
# 跳过轮次信息消息
continue
if role == "assistant":
# 只保留思考内容,简化工具调用信息
if content:
# 截取前500字符避免过长
content_preview = content[:500] + ("..." if len(content) > 500 else "")
conversation_summary.append(f"[{role}] {content_preview}")
elif role == "tool":
# 工具结果,只保留关键信息
if content:
# 截取前300字符
content_preview = content[:300] + ("..." if len(content) > 300 else "")
conversation_summary.append(f"[工具执行] {content_preview}")
conversation_text = "\n".join(conversation_summary[-20:]) # 只保留最后20条消息
# 随机选择2个梦境风格
selected_styles = get_random_dream_styles(2)
dream_styles_text = "\n".join([f"{i+1}. {style}" for i, style in enumerate(selected_styles)])
# 使用 Prompt 管理器格式化梦境生成 prompt
dream_prompt = await global_prompt_manager.format_prompt(
"dream_summary_prompt",
chat_id=chat_id,
total_iterations=total_iterations,
time_cost=time_cost,
conversation_text=conversation_text,
dream_styles=dream_styles_text,
)
# 调用 utils 模型生成梦境
summary_model = get_dream_summary_model()
dream_content, (reasoning, model_name, _) = await summary_model.generate_response_async(
dream_prompt,
max_tokens=512,
temperature=0.8,
)
if dream_content:
logger.info(f"[dream][梦境总结] 对 chat_id={chat_id} 的整理过程梦境:\n{dream_content}")
else:
logger.warning("[dream][梦境总结] 未能生成梦境总结")
except Exception as e:
logger.error(f"[dream][梦境总结] 生成梦境总结失败: {e}", exc_info=True)
init_dream_summary_prompt()

View File

@@ -0,0 +1,9 @@
"""
dream agent 工具实现模块。
每个工具的具体实现放在独立文件中,通过 make_xxx(chat_id) 工厂函数
生成绑定到特定 chat_id 的协程函数,由 dream_agent.init_dream_tools 统一注册。
"""

View File

@@ -0,0 +1,65 @@
import time
from src.common.logger import get_logger
from src.common.database.database_model import ChatHistory
logger = get_logger("dream_agent")
def make_create_chat_history(chat_id: str):
async def create_chat_history(
theme: str,
summary: str,
keywords: str,
key_point: str,
start_time: float,
end_time: float,
) -> str:
"""创建一条新的 ChatHistory 概括记录(用于整理/合并后的新记忆)"""
try:
logger.info(
f"[dream][tool] 调用 create_chat_history("
f"theme={bool(theme)}, summary={bool(summary)}, "
f"keywords={bool(keywords)}, key_point={bool(key_point)}, "
f"start_time={start_time}, end_time={end_time}) (chat_id={chat_id})"
)
now_ts = time.time()
# 将传入的 start_time/end_time如果有解析为时间戳否则回退为当前时间
def _parse_ts(value, default):
if value is None:
return default
try:
return float(value)
except (TypeError, ValueError):
return default
start_ts = _parse_ts(start_time, now_ts)
end_ts = _parse_ts(end_time, now_ts)
record = ChatHistory.create(
chat_id=chat_id,
theme=theme,
summary=summary,
keywords=keywords,
key_point=key_point,
# 对于由 dream 整理产生的新概括,时间范围优先使用工具提供的时间,否则使用当前时间占位
start_time=start_ts,
end_time=end_ts,
)
msg = (
f"已创建新的 ChatHistory 记录ID={record.id}"
f"theme={record.theme or ''}summary={'' if record.summary else ''}"
)
logger.info(f"[dream][tool] create_chat_history 完成: {msg}")
return msg
except Exception as e:
logger.error(f"create_chat_history 失败: {e}")
return f"create_chat_history 执行失败: {e}"
return create_chat_history

View File

@@ -0,0 +1,28 @@
from src.common.logger import get_logger
from src.common.database.database_model import ChatHistory
logger = get_logger("dream_agent")
def make_delete_chat_history(chat_id: str): # chat_id 目前未直接使用,预留以备扩展
async def delete_chat_history(memory_id: int) -> str:
"""删除一条 chat_history 记录"""
try:
logger.info(f"[dream][tool] 调用 delete_chat_history(memory_id={memory_id})")
record = ChatHistory.get_or_none(ChatHistory.id == memory_id)
if not record:
msg = f"未找到 ID={memory_id} 的 ChatHistory 记录,无法删除。"
logger.info(f"[dream][tool] delete_chat_history 未找到记录: {msg}")
return msg
rows = ChatHistory.delete().where(ChatHistory.id == memory_id).execute()
msg = f"已删除 ID={memory_id} 的 ChatHistory 记录,受影响行数={rows}"
logger.info(f"[dream][tool] delete_chat_history 完成: {msg}")
return msg
except Exception as e:
logger.error(f"delete_chat_history 失败: {e}")
return f"delete_chat_history 执行失败: {e}"
return delete_chat_history

View File

@@ -0,0 +1,28 @@
from src.common.logger import get_logger
from src.common.database.database_model import Jargon
logger = get_logger("dream_agent")
def make_delete_jargon(chat_id: str): # chat_id 目前未直接使用,预留以备扩展
async def delete_jargon(jargon_id: int) -> str:
"""删除一条 Jargon 记录"""
try:
logger.info(f"[dream][tool] 调用 delete_jargon(jargon_id={jargon_id})")
record = Jargon.get_or_none(Jargon.id == jargon_id)
if not record:
msg = f"未找到 ID={jargon_id} 的 Jargon 记录,无法删除。"
logger.info(f"[dream][tool] delete_jargon 未找到记录: {msg}")
return msg
rows = Jargon.delete().where(Jargon.id == jargon_id).execute()
msg = f"已删除 ID={jargon_id} 的 Jargon 记录(内容:{record.content}),受影响行数={rows}"
logger.info(f"[dream][tool] delete_jargon 完成: {msg}")
return msg
except Exception as e:
logger.error(f"delete_jargon 失败: {e}")
return f"delete_jargon 执行失败: {e}"
return delete_jargon

View File

@@ -0,0 +1,19 @@
from typing import Optional
from src.common.logger import get_logger
logger = get_logger("dream_agent")
def make_finish_maintenance(chat_id: str): # chat_id 目前未直接使用,预留以备扩展
async def finish_maintenance(reason: Optional[str] = None) -> str:
"""结束本次 dream 维护任务。当你认为当前 chat_id 下的维护工作已经完成,没有更多需要整理的内容时,调用此工具来结束本次运行。"""
reason_text = f",原因:{reason}" if reason else ""
msg = f"DREAM_MAINTENANCE_COMPLETE{reason_text}"
logger.info(f"[dream][tool] 调用 finish_maintenance结束本次维护{reason_text}")
return msg
return finish_maintenance

View File

@@ -0,0 +1,54 @@
import time
from typing import Optional
from src.common.logger import get_logger
from src.common.database.database_model import ChatHistory
logger = get_logger("dream_agent")
def make_get_chat_history_detail(chat_id: str): # chat_id 目前未直接使用,预留以备扩展
async def get_chat_history_detail(memory_id: int) -> str:
"""获取单条 chat_history 的完整内容"""
try:
logger.info(f"[dream][tool] 调用 get_chat_history_detail(memory_id={memory_id})")
record = ChatHistory.get_or_none(ChatHistory.id == memory_id)
if not record:
msg = f"未找到 ID={memory_id} 的 ChatHistory 记录。"
logger.info(f"[dream][tool] get_chat_history_detail 未找到记录: {msg}")
return msg
# 将时间戳转换为可读时间格式
start_time_str = (
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(record.start_time))
if record.start_time
else "未知"
)
end_time_str = (
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(record.end_time))
if record.end_time
else "未知"
)
result = (
f"ID={record.id}\n"
# f"chat_id={record.chat_id}\n"
f"时间范围={start_time_str}{end_time_str}\n"
f"主题={record.theme or ''}\n"
f"关键词={record.keywords or ''}\n"
f"参与者={record.participants or ''}\n"
f"概括={record.summary or ''}\n"
f"关键信息={record.key_point or ''}"
)
logger.debug(
f"[dream][tool] get_chat_history_detail 成功,预览: {result[:200].replace(chr(10), ' ')}"
)
return result
except Exception as e:
logger.error(f"get_chat_history_detail 失败: {e}")
return f"get_chat_history_detail 执行失败: {e}"
return get_chat_history_detail

View File

@@ -0,0 +1,225 @@
import json
from typing import List, Optional
from src.common.logger import get_logger
from src.common.database.database_model import ChatHistory
from src.chat.utils.utils import parse_keywords_string
logger = get_logger("dream_agent")
def make_search_chat_history(chat_id: str):
async def search_chat_history(
keyword: Optional[str] = None,
participant: Optional[str] = None,
) -> str:
"""根据关键词或参与人查询记忆返回匹配的记忆id、记忆标题theme和关键词keywordsdream 维护专用版本)"""
try:
# 检查参数
if not keyword and not participant:
return "未指定查询参数需要提供keyword或participant之一"
logger.info(
f"[dream][tool] 调用 search_chat_history(keyword={keyword}, participant={participant}) "
f"(作用域 chat_id={chat_id})"
)
# 构建查询条件
query = ChatHistory.select().where(ChatHistory.chat_id == chat_id)
# 执行查询(按时间倒序,最近的在前)
records = list(query.order_by(ChatHistory.start_time.desc()).limit(50))
filtered_records: List[ChatHistory] = []
for record in records:
participant_matched = True # 如果没有participant条件默认为True
keyword_matched = True # 如果没有keyword条件默认为True
# 检查参与人匹配
if participant:
participant_matched = False
participants_list: List[str] = []
if record.participants:
try:
participants_data = (
json.loads(record.participants)
if isinstance(record.participants, str)
else record.participants
)
if isinstance(participants_data, list):
participants_list = [str(p).lower() for p in participants_data]
except (json.JSONDecodeError, TypeError, ValueError):
pass
participant_lower = participant.lower().strip()
if participant_lower and any(participant_lower in p for p in participants_list):
participant_matched = True
# 检查关键词匹配
if keyword:
keyword_matched = False
# 解析多个关键词(支持空格、逗号等分隔符)
keywords_list = parse_keywords_string(keyword)
if not keywords_list:
keywords_list = [keyword.strip()] if keyword.strip() else []
# 转换为小写以便匹配
keywords_lower = [kw.lower() for kw in keywords_list if kw.strip()]
if keywords_lower:
# 在theme、keywords、summary、original_text中搜索
theme = (record.theme or "").lower()
summary = (record.summary or "").lower()
original_text = (record.original_text or "").lower()
# 解析record中的keywords JSON
record_keywords_list: List[str] = []
if record.keywords:
try:
keywords_data = (
json.loads(record.keywords)
if isinstance(record.keywords, str)
else record.keywords
)
if isinstance(keywords_data, list):
record_keywords_list = [str(k).lower() for k in keywords_data]
except (json.JSONDecodeError, TypeError, ValueError):
pass
# 有容错的全匹配:如果关键词数量>2允许n-1个关键词匹配否则必须全部匹配
matched_count = 0
for kw in keywords_lower:
kw_matched = (
kw in theme
or kw in summary
or kw in original_text
or any(kw in k for k in record_keywords_list)
)
if kw_matched:
matched_count += 1
# 计算需要匹配的关键词数量
total_keywords = len(keywords_lower)
if total_keywords > 2:
# 关键词数量>2允许n-1个关键词匹配
required_matches = total_keywords - 1
else:
# 关键词数量<=2必须全部匹配
required_matches = total_keywords
keyword_matched = matched_count >= required_matches
# 两者都匹配如果同时有participant和keyword需要两者都匹配如果只有一个条件只需要该条件匹配
matched = participant_matched and keyword_matched
if matched:
filtered_records.append(record)
if not filtered_records:
if keyword and participant:
keywords_str = "".join(parse_keywords_string(keyword) if keyword else [])
return f"未找到包含关键词'{keywords_str}'且参与人包含'{participant}'的聊天记录"
elif keyword:
keywords_list = parse_keywords_string(keyword)
keywords_str = "".join(keywords_list)
if len(keywords_list) > 2:
required_count = len(keywords_list) - 1
return (
f"未找到包含至少{required_count}个关键词(共{len(keywords_list)}个)'{keywords_str}'的聊天记录"
)
else:
return f"未找到包含所有关键词'{keywords_str}'的聊天记录"
elif participant:
return f"未找到参与人包含'{participant}'的聊天记录"
else:
return "未找到相关聊天记录"
# 如果匹配结果超过20条不返回具体记录只返回提示和所有相关关键词
if len(filtered_records) > 20:
all_keywords_set = set()
for record in filtered_records:
if record.keywords:
try:
keywords_data = (
json.loads(record.keywords)
if isinstance(record.keywords, str)
else record.keywords
)
if isinstance(keywords_data, list):
for k in keywords_data:
k_str = str(k).strip()
if k_str:
all_keywords_set.add(k_str)
except (json.JSONDecodeError, TypeError, ValueError):
continue
search_label = keyword or participant or "当前条件"
if all_keywords_set:
keywords_str = "".join(sorted(all_keywords_set))
response_text = (
f"包含“{search_label}”的结果过多,请尝试更多关键词精确查找\n\n"
f"有关\"{search_label}\"的关键词:\n"
f"{keywords_str}"
)
else:
response_text = (
f"包含“{search_label}”的结果过多,请尝试更多关键词精确查找\n\n"
f"有关\"{search_label}\"的关键词信息为空"
)
logger.info(
f"[dream][tool] search_chat_history 匹配结果超过20条返回关键词汇总提示总数={len(filtered_records)}"
)
return response_text
# 构建结果文本返回id、theme和keywords最多20条
results: List[str] = []
for record in filtered_records[:20]:
result_parts: List[str] = []
# 记忆ID
result_parts.append(f"记忆ID{record.id}")
# 主题
if record.theme:
result_parts.append(f"主题:{record.theme}")
else:
result_parts.append("主题:(无)")
# 关键词
if record.keywords:
try:
keywords_data = (
json.loads(record.keywords)
if isinstance(record.keywords, str)
else record.keywords
)
if isinstance(keywords_data, list) and keywords_data:
keywords_str = "".join([str(k) for k in keywords_data])
result_parts.append(f"关键词:{keywords_str}")
else:
result_parts.append("关键词:(无)")
except (json.JSONDecodeError, TypeError, ValueError):
result_parts.append("关键词:(无)")
else:
result_parts.append("关键词:(无)")
results.append("\n".join(result_parts))
if not results:
return "未找到相关聊天记录"
response_text = "\n\n---\n\n".join(results)
logger.info(f"[dream][tool] search_chat_history 返回 {len(filtered_records)} 条匹配记录")
return response_text
except Exception as e:
logger.error(f"search_chat_history 失败: {e}")
return f"search_chat_history 执行失败: {e}"
return search_chat_history

View File

@@ -0,0 +1,54 @@
from typing import Any, Dict, Optional
from src.common.logger import get_logger
from src.common.database.database_model import ChatHistory
from src.plugin_system.apis import database_api
logger = get_logger("dream_agent")
def make_update_chat_history(chat_id: str): # chat_id 目前未直接使用,预留以备扩展
async def update_chat_history(
memory_id: int,
theme: Optional[str] = None,
summary: Optional[str] = None,
keywords: Optional[str] = None,
key_point: Optional[str] = None,
) -> str:
"""按字段更新 chat_history字符串字段要求 JSON 的字段须传入已序列化的字符串)"""
try:
logger.info(
f"[dream][tool] 调用 update_chat_history(memory_id={memory_id}, "
f"theme={bool(theme)}, summary={bool(summary)}, keywords={bool(keywords)}, key_point={bool(key_point)})"
)
record = ChatHistory.get_or_none(ChatHistory.id == memory_id)
if not record:
msg = f"未找到 ID={memory_id} 的 ChatHistory 记录,无法更新。"
logger.info(f"[dream][tool] update_chat_history 未找到记录: {msg}")
return msg
data: Dict[str, Any] = {}
if theme is not None:
data["theme"] = theme
if summary is not None:
data["summary"] = summary
if keywords is not None:
data["keywords"] = keywords
if key_point is not None:
data["key_point"] = key_point
if not data:
return "未提供任何需要更新的字段。"
await database_api.db_save(ChatHistory, data=data, key_field="id", key_value=memory_id)
msg = f"已更新 ChatHistory 记录 ID={memory_id},更新字段={list(data.keys())}"
logger.info(f"[dream][tool] update_chat_history 完成: {msg}")
return msg
except Exception as e:
logger.error(f"update_chat_history 失败: {e}")
return f"update_chat_history 执行失败: {e}"
return update_chat_history

View File

@@ -0,0 +1,54 @@
from typing import Any, Dict, Optional
from src.common.logger import get_logger
from src.common.database.database_model import Jargon
from src.plugin_system.apis import database_api
logger = get_logger("dream_agent")
def make_update_jargon(chat_id: str): # chat_id 目前未直接使用,预留以备扩展
async def update_jargon(
jargon_id: int,
meaning: Optional[str] = None,
is_global: Optional[bool] = None,
is_jargon: Optional[bool] = None,
content: Optional[str] = None,
) -> str:
"""按字段更新 Jargon 记录,可用于修正含义、调整全局性、标记是否为黑话等"""
try:
logger.info(
f"[dream][tool] 调用 update_jargon(jargon_id={jargon_id}, "
f"meaning={bool(meaning)}, is_global={is_global}, is_jargon={is_jargon}, content={bool(content)})"
)
record = Jargon.get_or_none(Jargon.id == jargon_id)
if not record:
msg = f"未找到 ID={jargon_id} 的 Jargon 记录,无法更新。"
logger.info(f"[dream][tool] update_jargon 未找到记录: {msg}")
return msg
data: Dict[str, Any] = {}
if meaning is not None:
data["meaning"] = meaning
if is_global is not None:
data["is_global"] = is_global
if is_jargon is not None:
data["is_jargon"] = is_jargon
if content is not None:
data["content"] = content
if not data:
return "未提供任何需要更新的字段。"
await database_api.db_save(Jargon, data=data, key_field="id", key_value=jargon_id)
msg = f"已更新 Jargon 记录 ID={jargon_id},更新字段={list(data.keys())}"
logger.info(f"[dream][tool] update_jargon 完成: {msg}")
return msg
except Exception as e:
logger.error(f"update_jargon 失败: {e}")
return f"update_jargon 执行失败: {e}"
return update_jargon

View File

@@ -28,10 +28,10 @@ class MemoryForgetTask(AsyncTask):
# logger.info("[记忆遗忘] 开始遗忘检查...") # logger.info("[记忆遗忘] 开始遗忘检查...")
# 执行4个阶段的遗忘检查 # 执行4个阶段的遗忘检查
await self._forget_stage_1(current_time) # await self._forget_stage_1(current_time)
await self._forget_stage_2(current_time) # await self._forget_stage_2(current_time)
await self._forget_stage_3(current_time) # await self._forget_stage_3(current_time)
await self._forget_stage_4(current_time) # await self._forget_stage_4(current_time)
# logger.info("[记忆遗忘] 遗忘检查完成") # logger.info("[记忆遗忘] 遗忘检查完成")
except Exception as e: except Exception as e: