feat:可以查看转发消息,新增复杂消息,修复表情包缓存,修改bot自己的消息回调msg_id

This commit is contained in:
SengokuCola
2026-04-03 20:55:49 +08:00
parent 5cdca2bbd4
commit ce580d1f8b
8 changed files with 460 additions and 52 deletions

View File

@@ -290,12 +290,68 @@ class EmojiManager:
if not emoji_bytes:
return None
if not wait_for_build:
await self.ensure_emoji_saved(emoji_bytes, emoji_hash=emoji_hash)
self._schedule_description_build(emoji_hash, emoji_bytes)
return None
# 找不到尝试构建
return await self._build_and_cache_emoji_description(emoji_hash, emoji_bytes)
async def ensure_emoji_saved(
self,
emoji_bytes: bytes,
*,
emoji_hash: Optional[str] = None,
) -> MaiEmoji:
"""先缓存表情包文件与数据库记录,确保后续可按 hash 回填。"""
hash_str = emoji_hash or hashlib.sha256(emoji_bytes).hexdigest()
try:
with get_db_session() as session:
statement = select(Images).filter_by(image_hash=hash_str, image_type=ImageType.EMOJI).limit(1)
if record := session.exec(statement).first():
record_path = Path(record.full_path)
if not record.no_file_flag and record_path.exists():
record.last_used_time = datetime.now()
record.query_count += 1
session.add(record)
return MaiEmoji.from_db_instance(record)
except Exception as e:
logger.error(f"缓存表情包前查询数据库时出错: {e}")
raise e
logger.info(f"表情包不存在于数据库中,准备缓存新表情包,哈希值: {hash_str}")
tmp_file_path = EMOJI_DIR / f"{hash_str}.tmp"
with tmp_file_path.open("wb") as file:
file.write(emoji_bytes)
emoji = MaiEmoji(full_path=tmp_file_path, image_bytes=emoji_bytes)
await emoji.calculate_hash_format()
try:
with get_db_session() as session:
statement = select(Images).filter_by(image_hash=emoji.file_hash, image_type=ImageType.EMOJI).limit(1)
if existing_record := session.exec(statement).first():
existing_record.full_path = str(emoji.full_path)
existing_record.no_file_flag = False
existing_record.is_banned = False
existing_record.last_used_time = datetime.now()
existing_record.query_count += 1
session.add(existing_record)
else:
image_record = emoji.to_db_instance()
image_record.is_registered = False
image_record.is_banned = False
image_record.no_file_flag = False
image_record.last_used_time = datetime.now()
image_record.query_count = 1
session.add(image_record)
except Exception as e:
logger.error(f"缓存表情包记录到数据库时出错: {e}")
raise e
return emoji
def _schedule_description_build(self, emoji_hash: str, emoji_bytes: bytes) -> None:
"""调度表情包描述后台构建任务。
@@ -342,48 +398,36 @@ class EmojiManager:
emoji_hash: str,
emoji_bytes: bytes,
) -> Optional[Tuple[str, List[str]]]:
"""构建并缓存表情包描述与情感标签。
Args:
emoji_hash: 表情包哈希值。
emoji_bytes: 表情包字节数据。
Returns:
Optional[Tuple[str, List[str]]]: 构建成功时返回描述和情感标签,否则返回 ``None``。
"""
logger.info(f"未找到哈希值为 {emoji_hash} 的表情包与其描述,尝试构建描述")
full_path = EMOJI_DIR / f"{emoji_hash}.png"
try:
full_path.write_bytes(emoji_bytes)
new_emoji = MaiEmoji(full_path=full_path, image_bytes=emoji_bytes)
await new_emoji.calculate_hash_format()
except Exception as exc:
logger.error(f"缓存表情包文件时出错: {exc}")
raise exc
"""Build and cache emoji description and emotion labels."""
logger.info(f"Start building cached emoji description, hash={emoji_hash}")
new_emoji = await self.ensure_emoji_saved(emoji_bytes, emoji_hash=emoji_hash)
success_desc, new_emoji = await self.build_emoji_description(new_emoji)
if not success_desc:
logger.error("构建表情包描述失败")
logger.error("Build emoji description failed")
return None
success_emotion, new_emoji = await self.build_emoji_emotion(new_emoji)
if not success_emotion:
logger.error("构建表情包情感标签失败")
logger.error("Build emoji emotion labels failed")
return None
with get_db_session() as session:
try:
image_record = new_emoji.to_db_instance()
image_record.is_registered = False
image_record.is_banned = False
image_record.register_time = datetime.now()
image_record.no_file_flag = True
session.add(image_record)
statement = select(Images).filter_by(image_hash=new_emoji.file_hash, image_type=ImageType.EMOJI).limit(1)
if image_record := session.exec(statement).first():
image_record.full_path = str(new_emoji.full_path)
image_record.description = new_emoji.description
image_record.emotion = ",".join(new_emoji.emotion) if new_emoji.emotion else None
image_record.no_file_flag = False
image_record.is_banned = False
session.add(image_record)
except Exception as exc:
logger.error(f"缓存表情包描述时出错: {exc}")
logger.error(f"Update cached emoji description failed: {exc}")
return new_emoji.description, new_emoji.emotion or []
def load_emojis_from_db(self) -> None:
"""
从数据库加载已注册的表情包
@@ -398,6 +442,8 @@ class EmojiManager:
for record in results:
if record.image_type != ImageType.EMOJI:
continue
if not record.is_registered:
continue
if record.no_file_flag:
continue
if record.is_banned:

View File

@@ -466,8 +466,23 @@ class ChatBot:
return
mmc_message_id = message_data.get("echo")
actual_message_id = message_data.get("actual_id")
# TODO: Implement message ID update in new architecture
logger.debug(f"收到回送消息ID: {mmc_message_id} -> {actual_message_id}")
normalized_mmc_message_id = str(mmc_message_id or "").strip()
normalized_actual_message_id = str(actual_message_id or "").strip()
if not normalized_mmc_message_id or not normalized_actual_message_id:
return
updated = MessageUtils.update_message_id(
old_message_id=normalized_mmc_message_id,
new_message_id=normalized_actual_message_id,
)
if updated:
logger.debug(f"收到回送消息ID: {normalized_mmc_message_id} -> {normalized_actual_message_id}")
return
logger.debug(
"收到回送消息 ID但未找到可回填的本地消息: "
f"{normalized_mmc_message_id} -> {normalized_actual_message_id}"
)
async def message_process(self, message_data: Dict[str, Any]) -> None:
"""处理统一格式的入站消息字典。

View File

@@ -149,6 +149,57 @@ class MessageUtils:
db_message = message.to_db_instance()
session.add(db_message)
@staticmethod
def update_message_id(old_message_id: str, new_message_id: str) -> bool:
"""将已入库消息的临时 ID 回填为平台真实 ID。
Args:
old_message_id: 发送阶段生成的内部临时消息 ID。
new_message_id: 适配器回传的真实平台消息 ID。
Returns:
bool: 存在并成功更新目标消息时返回 ``True``,否则返回 ``False``。
"""
normalized_old_message_id = str(old_message_id).strip()
normalized_new_message_id = str(new_message_id).strip()
if not normalized_old_message_id or not normalized_new_message_id:
return False
if normalized_old_message_id == normalized_new_message_id:
return False
from src.common.database.database import get_db_session
from src.common.database.database_model import Messages
with get_db_session() as session:
existing_target = session.exec(
select(Messages).filter_by(message_id=normalized_new_message_id).limit(1)
).first()
if existing_target is not None:
logger.warning(
"消息 ID 回填时发现真实 ID 已存在,已跳过更新: "
f"{normalized_old_message_id} -> {normalized_new_message_id}"
)
return False
source_messages = session.exec(
select(Messages).filter_by(message_id=normalized_old_message_id)
).all()
if not source_messages:
return False
for source_message in source_messages:
source_message.message_id = normalized_new_message_id
session.add(source_message)
reply_target_messages = session.exec(
select(Messages).filter_by(reply_to=normalized_old_message_id)
).all()
for reply_target_message in reply_target_messages:
reply_target_message.reply_to = normalized_new_message_id
session.add(reply_target_message)
return True
@staticmethod
async def build_readable_message(
messages: List["SessionMessage"],

View File

@@ -17,6 +17,8 @@ from .reply import get_tool_spec as get_reply_tool_spec
from .reply import handle_tool as handle_reply_tool
from .send_emoji import get_tool_spec as get_send_emoji_tool_spec
from .send_emoji import handle_tool as handle_send_emoji_tool
from .view_complex_message import get_tool_spec as get_view_complex_message_tool_spec
from .view_complex_message import handle_tool as handle_view_complex_message_tool
from .wait import get_tool_spec as get_wait_tool_spec
from .wait import handle_tool as handle_wait_tool
@@ -29,6 +31,7 @@ def get_builtin_tool_specs() -> List[ToolSpec]:
return [
get_wait_tool_spec(),
get_reply_tool_spec(),
get_view_complex_message_tool_spec(),
get_query_jargon_tool_spec(),
get_no_reply_tool_spec(),
get_send_emoji_tool_spec(),
@@ -41,6 +44,7 @@ def get_all_builtin_tool_specs() -> List[ToolSpec]:
return [
get_wait_tool_spec(),
get_reply_tool_spec(),
get_view_complex_message_tool_spec(),
get_query_jargon_tool_spec(),
get_query_person_info_tool_spec(),
get_no_reply_tool_spec(),
@@ -68,4 +72,9 @@ def build_builtin_tool_handlers(tool_ctx: BuiltinToolRuntimeContext) -> Dict[str
),
"wait": lambda invocation, context=None: handle_wait_tool(tool_ctx, invocation, context),
"send_emoji": lambda invocation, context=None: handle_send_emoji_tool(tool_ctx, invocation, context),
"view_complex_message": lambda invocation, context=None: handle_view_complex_message_tool(
tool_ctx,
invocation,
context,
),
}

View File

@@ -5,6 +5,7 @@ from typing import Any, Dict, Optional
from src.chat.emoji_system.maisaka_tool import send_emoji_for_maisaka
from src.common.logger import get_logger
from src.core.tooling import ToolExecutionContext, ToolExecutionResult, ToolInvocation, ToolSpec
from src.maisaka.context_messages import LLMContextMessage
from .context import BuiltinToolRuntimeContext
@@ -42,9 +43,9 @@ async def handle_tool(
del context
emotion = str(invocation.arguments.get("emotion") or "").strip()
context_texts = [
message.get_history_text()
message.processed_plain_text.strip()
for message in tool_ctx.runtime._chat_history[-5:]
if message.get_history_text().strip()
if isinstance(message, LLMContextMessage) and message.processed_plain_text.strip()
]
structured_result: Dict[str, Any] = {
"success": False,

View File

@@ -0,0 +1,99 @@
"""view_complex_message 内置工具。"""
from typing import Optional
from src.common.logger import get_logger
from src.core.tooling import ToolExecutionContext, ToolExecutionResult, ToolInvocation, ToolSpec
from ..context_messages import build_full_complex_message_content, contains_complex_message
from .context import BuiltinToolRuntimeContext
logger = get_logger("maisaka_builtin_view_complex_message")
def get_tool_spec() -> ToolSpec:
"""获取 view_complex_message 工具声明。"""
return ToolSpec(
name="view_complex_message",
brief_description="根据 msg_id 查看复杂消息的完整内容,适用于 Prompt 中标记为 [消息类型]复杂消息 的消息。",
detailed_description=(
"参数说明:\n"
"- msg_idstring必填。要查看完整内容的目标消息编号。\n\n"
"当你在上下文中看到 [消息类型]复杂消息 时,可调用本工具查看对应转发消息的完整展开内容。"
),
parameters_schema={
"type": "object",
"properties": {
"msg_id": {
"type": "string",
"description": "要查看完整内容的目标消息编号。",
},
},
"required": ["msg_id"],
},
provider_name="maisaka_builtin",
provider_type="builtin",
)
async def handle_tool(
tool_ctx: BuiltinToolRuntimeContext,
invocation: ToolInvocation,
context: Optional[ToolExecutionContext] = None,
) -> ToolExecutionResult:
"""执行 view_complex_message 内置工具。"""
del context
target_message_id = str(invocation.arguments.get("msg_id") or "").strip()
if not target_message_id:
return tool_ctx.build_failure_result(
invocation.tool_name,
"查看复杂消息工具需要提供有效的 `msg_id` 参数。",
)
target_message = tool_ctx.runtime._source_messages_by_id.get(target_message_id)
if target_message is None:
return tool_ctx.build_failure_result(
invocation.tool_name,
f"未找到目标复杂消息msg_id={target_message_id}",
)
if not contains_complex_message(target_message.raw_message):
return tool_ctx.build_failure_result(
invocation.tool_name,
f"目标消息不是可展开查看的转发消息msg_id={target_message_id}",
)
logger.info(
f"{tool_ctx.runtime.log_prefix} 触发复杂消息查看工具,目标消息编号={target_message_id}"
)
try:
full_content = await build_full_complex_message_content(target_message)
except Exception as exc:
logger.exception(
f"{tool_ctx.runtime.log_prefix} 查看复杂消息时发生异常: 目标消息编号={target_message_id} 异常={exc}"
)
return tool_ctx.build_failure_result(
invocation.tool_name,
"查看复杂消息完整内容时发生异常。",
)
if not full_content:
return tool_ctx.build_failure_result(
invocation.tool_name,
f"复杂消息内容为空msg_id={target_message_id}",
)
return tool_ctx.build_success_result(
invocation.tool_name,
full_content,
structured_content={
"msg_id": target_message_id,
"message_type": "forward",
"full_content": full_content,
},
metadata={
"record_display_prompt": f"你查看了复杂消息 {target_message_id} 的完整内容。",
},
)

View File

@@ -5,22 +5,29 @@ from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from io import BytesIO
from typing import Optional
from typing import Optional, Sequence
import base64
from PIL import Image as PILImage
from src.chat.message_receive.message import SessionMessage
from src.common.data_models.message_component_data_model import (
AtComponent,
DictComponent,
EmojiComponent,
ForwardNodeComponent,
ImageComponent,
MessageSequence,
ReplyComponent,
StandardMessageComponents,
TextComponent,
VoiceComponent,
)
from src.llm_models.payload_content.message import Message, MessageBuilder, RoleType
from src.llm_models.payload_content.tool_option import ToolCall
FORWARD_PREVIEW_LIMIT = 4
def _guess_image_format(image_bytes: bytes) -> Optional[str]:
if not image_bytes:
@@ -71,6 +78,118 @@ def _append_reply_component(builder: MessageBuilder, component: ReplyComponent)
return True
def contains_complex_message(message_sequence: MessageSequence) -> bool:
"""判断消息序列中是否包含复杂消息组件。"""
return any(isinstance(component, ForwardNodeComponent) for component in message_sequence.components)
async def build_full_complex_message_content(message: SessionMessage) -> str:
"""构造复杂消息的完整文本内容。"""
if not message.processed_plain_text:
await message.process()
return (message.processed_plain_text or "").strip()
def _build_complex_message_prompt_text(message_sequence: MessageSequence) -> str:
"""将复杂消息转换为适合注入 Prompt 的摘要文本。"""
prompt_parts: list[str] = []
for component in message_sequence.components:
rendered_text = _render_component_for_prompt(component)
if rendered_text:
prompt_parts.append(rendered_text)
return "\n".join(part for part in prompt_parts if part).strip()
def _render_component_for_prompt(component: StandardMessageComponents) -> str:
"""将单个组件渲染为 Prompt 文本。"""
if isinstance(component, TextComponent):
return (component.text or "").strip()
if isinstance(component, ImageComponent):
return component.content.strip() if component.content else "[图片]"
if isinstance(component, EmojiComponent):
return component.content.strip() if component.content else "[表情包]"
if isinstance(component, VoiceComponent):
return component.content.strip() if component.content else "[语音消息]"
if isinstance(component, AtComponent):
target_name = component.target_user_cardname or component.target_user_nickname or component.target_user_id
return f"@{target_name}".strip()
if isinstance(component, ReplyComponent):
sender_name = (
component.target_message_sender_cardname
or component.target_message_sender_nickname
or component.target_message_sender_id
)
target_content = (component.target_message_content or "").strip()
if sender_name and target_content:
return f"[回复了{sender_name}的消息: {target_content}]"
if target_content:
return f"[回复消息: {target_content}]"
target_message_id = component.target_message_id.strip()
return f"[引用回复]({target_message_id})" if target_message_id else "[回复消息]"
if isinstance(component, ForwardNodeComponent):
return _build_forward_preview_block(component)
if isinstance(component, DictComponent):
raw_type = component.data.get("type") if isinstance(component.data, dict) else None
if isinstance(raw_type, str) and raw_type.strip():
return f"[{raw_type.strip()}消息]"
return "[复杂消息]"
return ""
def _build_forward_preview_block(component: ForwardNodeComponent) -> str:
"""构造转发消息的预览块。"""
preview_lines = ["[消息类型]复杂消息", "转发消息", f"预览前{FORWARD_PREVIEW_LIMIT}条:"]
preview_nodes = component.forward_components[:FORWARD_PREVIEW_LIMIT]
for node in preview_nodes:
sender_name = node.user_cardname or node.user_nickname or node.user_id or "未知用户"
content = _render_components_inline(node.content) or "[空消息]"
preview_lines.append(f"{sender_name}{content}")
total_count = len(component.forward_components)
if total_count > FORWARD_PREVIEW_LIMIT:
preview_lines.append("......")
preview_lines.append(f"{total_count}条,可以选择使用 view_complex_message 查看完整内容。")
return "\n".join(preview_lines).strip()
def _render_components_inline(components: Sequence[StandardMessageComponents]) -> str:
"""将组件序列压缩为单行预览文本。"""
rendered_parts: list[str] = []
for component in components:
if isinstance(component, ForwardNodeComponent):
rendered_parts.append("[转发消息]")
continue
rendered_text = _render_component_for_prompt(component)
normalized_text = _normalize_inline_text(rendered_text)
if normalized_text:
rendered_parts.append(normalized_text)
return " ".join(rendered_parts).strip()
def _normalize_inline_text(text: str) -> str:
"""将多行文本压缩为适合预览的一行。"""
return " ".join((text or "").split()).strip()
def _build_message_from_sequence(
role: RoleType,
message_sequence: MessageSequence,
@@ -209,6 +328,51 @@ class SessionBackedMessage(LLMContextMessage):
)
@dataclass(slots=True)
class ComplexSessionMessage(SessionBackedMessage):
"""复杂消息上下文消息。"""
prompt_text: str = ""
complex_message_type: str = "forward"
@property
def source(self) -> str:
return f"{self.source_kind}:{self.complex_message_type}"
def to_llm_message(self) -> Optional[Message]:
message_sequence = MessageSequence([TextComponent(self.prompt_text)])
return _build_message_from_sequence(
RoleType.User,
message_sequence,
self.prompt_text,
)
@classmethod
def from_session_message(
cls,
session_message: SessionMessage,
*,
planner_prefix: str,
visible_text: str,
source_kind: str = "user",
) -> Optional["ComplexSessionMessage"]:
"""从真实 SessionMessage 构造复杂消息上下文消息。"""
prompt_text = _build_complex_message_prompt_text(session_message.raw_message)
if not prompt_text:
return None
return cls(
raw_message=session_message.raw_message,
visible_text=visible_text,
timestamp=session_message.timestamp,
message_id=session_message.message_id,
original_message=session_message,
source_kind=source_kind,
prompt_text=f"{planner_prefix}{prompt_text}",
)
@dataclass(slots=True)
class ReferenceMessage(LLMContextMessage):
"""参考消息。"""

View File

@@ -24,9 +24,11 @@ from .builtin_tool import build_builtin_tool_handlers as build_split_builtin_too
from .builtin_tool.context import BuiltinToolRuntimeContext
from .context_messages import (
AssistantMessage,
ComplexSessionMessage,
LLMContextMessage,
SessionBackedMessage,
ToolResultMessage,
contains_complex_message,
)
from .message_adapter import (
build_visible_text_from_sequence,
@@ -220,26 +222,49 @@ class MaisakaReasoningEngine:
async def _ingest_messages(self, messages: list[SessionMessage]) -> None:
"""处理传入消息列表,将其转换为历史消息并加入聊天历史缓存。"""
for message in messages:
# 构建用户消息序列
user_sequence, visible_text = await self._build_message_sequence(message)
if not user_sequence.components:
history_message = await self._build_history_message(message)
if history_message is None:
continue
history_message = SessionBackedMessage.from_session_message(
message,
raw_message=user_sequence,
visible_text=visible_text,
source_kind="user",
)
self._insert_chat_history_message(history_message)
self._trim_chat_history()
async def _build_message_sequence(self, message: SessionMessage) -> tuple[MessageSequence, str]:
message_sequence = MessageSequence([])
async def _build_history_message(self, message: SessionMessage) -> Optional[LLMContextMessage]:
"""根据真实消息构造对应的上下文消息。"""
source_sequence = message.raw_message
visible_text = self._build_legacy_visible_text(message, source_sequence)
planner_prefix = build_planner_user_prefix_from_session_message(message)
if contains_complex_message(source_sequence):
return ComplexSessionMessage.from_session_message(
message,
planner_prefix=planner_prefix,
visible_text=visible_text,
source_kind="user",
)
user_sequence = await self._build_message_sequence(message, planner_prefix=planner_prefix)
if not user_sequence.components:
return None
return SessionBackedMessage.from_session_message(
message,
raw_message=user_sequence,
visible_text=visible_text,
source_kind="user",
)
async def _build_message_sequence(
self,
message: SessionMessage,
*,
planner_prefix: str,
) -> MessageSequence:
message_sequence = MessageSequence([])
appended_component = False
source_sequence = message.raw_message
planner_components = clone_message_sequence(source_sequence).components
if global_config.maisaka.direct_image_input:
await self._hydrate_visual_components(planner_components)
@@ -252,16 +277,14 @@ class MaisakaReasoningEngine:
message_sequence.components.append(component)
appended_component = True
legacy_visible_text = self._build_legacy_visible_text(message, source_sequence)
if not appended_component:
if not message.processed_plain_text:
await message.process()
content = (message.processed_plain_text or "").strip()
if content:
message_sequence.text(planner_prefix + content)
legacy_visible_text = self._build_legacy_visible_text_from_text(message, content)
return message_sequence, legacy_visible_text
return message_sequence
async def _hydrate_visual_components(self, planner_components: list[object]) -> None:
"""在 Maisaka 真正需要图片或表情时,按需回填二进制数据。"""
@@ -291,12 +314,6 @@ class MaisakaReasoningEngine:
legacy_sequence.components.append(component)
return build_visible_text_from_sequence(legacy_sequence).strip()
def _build_legacy_visible_text_from_text(self, message: SessionMessage, content: str) -> str:
user_info = message.message_info.user_info
speaker_name = user_info.user_cardname or user_info.user_nickname or user_info.user_id
visible_message_id = None if message.is_notify else message.message_id
return format_speaker_content(speaker_name, content, message.timestamp, visible_message_id).strip()
def _insert_chat_history_message(self, message: LLMContextMessage) -> int:
"""将消息按处理顺序追加到聊天历史末尾。"""
self._runtime._chat_history.append(message)
@@ -628,6 +645,12 @@ class MaisakaReasoningEngine:
return f"你查询了人物信息:{person_name}"
return "你查询了一次人物信息。"
if invocation.tool_name == "view_complex_message":
target_message_id = str(invocation.arguments.get("msg_id") or "").strip()
if target_message_id:
return f"你查看了复杂消息 {target_message_id} 的完整内容。"
return "你查看了一条复杂消息的完整内容。"
brief_description = ""
if tool_spec is not None:
brief_description = tool_spec.brief_description.strip()