feat: 优化表情包发送和 replyer

This commit is contained in:
SengokuCola
2026-04-04 01:28:37 +08:00
parent 84977aa07b
commit 5254d9ac8e
4 changed files with 231 additions and 42 deletions

View File

@@ -162,15 +162,37 @@ class MaisakaReplyGenerator:
return "\n".join(parts)
def _build_target_message_block(self, reply_message: Optional[SessionMessage]) -> str:
"""构建当前需要回复的目标消息摘要。"""
if reply_message is None:
return ""
user_info = reply_message.message_info.user_info
sender_name = user_info.user_cardname or user_info.user_nickname or user_info.user_id
target_message_id = reply_message.message_id.strip() if reply_message.message_id else "未知"
target_content = self._normalize_content((reply_message.processed_plain_text or "").strip(), limit=300)
if not target_content:
target_content = "[无可见文本内容]"
return (
"【本次回复目标】\n"
f"- 目标消息ID{target_message_id}\n"
f"- 发送者:{sender_name}\n"
f"- 消息内容:{target_content}\n"
"- 你这次要回复的就是这条目标消息,请结合整段上下文理解,但不要误把其他历史消息当成当前回复对象。"
)
def _build_prompt(
self,
chat_history: List[LLMContextMessage],
reply_message: Optional[SessionMessage],
reply_reason: str,
expression_habits: str = "",
) -> str:
"""构建 Maisaka replyer 提示词。"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
formatted_history = self._format_chat_history(chat_history)
target_message_block = self._build_target_message_block(reply_message)
try:
system_prompt = load_prompt(
@@ -191,6 +213,8 @@ class MaisakaReplyGenerator:
f"当前时间:{current_time}",
f"【聊天记录】\n{formatted_history}",
]
if target_message_block:
user_sections.append(target_message_block)
if extra_sections:
user_sections.append("\n\n".join(extra_sections))
user_sections.append(f"【回复信息参考】\n{reply_reason}")
@@ -362,6 +386,7 @@ class MaisakaReplyGenerator:
try:
prompt = self._build_prompt(
chat_history=filtered_history,
reply_message=reply_message,
reply_reason=reply_reason or "",
expression_habits=merged_expression_habits,
)

View File

@@ -126,13 +126,35 @@ class MaisakaReplyGenerator:
return segments
def _build_target_message_block(self, reply_message: Optional[SessionMessage]) -> str:
"""构建当前需要回复的目标消息摘要。"""
if reply_message is None:
return ""
user_info = reply_message.message_info.user_info
sender_name = user_info.user_cardname or user_info.user_nickname or user_info.user_id
target_message_id = reply_message.message_id.strip() if reply_message.message_id else "未知"
target_content = self._normalize_content((reply_message.processed_plain_text or "").strip(), limit=300)
if not target_content:
target_content = "[无可见文本内容]"
return (
"【本次回复目标】\n"
f"- 目标消息ID{target_message_id}\n"
f"- 发送者:{sender_name}\n"
f"- 消息内容:{target_content}\n"
"- 你这次要回复的就是这条目标消息,请结合整段上下文理解,但不要误把其他历史消息当成当前回复对象。"
)
def _build_system_prompt(
self,
reply_message: Optional[SessionMessage],
reply_reason: str,
expression_habits: str = "",
) -> str:
"""构建 Maisaka replyer 使用的系统提示词。"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
target_message_block = self._build_target_message_block(reply_message)
try:
system_prompt = load_prompt(
@@ -148,16 +170,26 @@ class MaisakaReplyGenerator:
extra_sections: List[str] = []
if expression_habits.strip():
extra_sections.append(expression_habits.strip())
if reply_reason.strip():
extra_sections.append(f"【回复信息参考】\n{reply_reason}")
if target_message_block:
extra_sections.append(target_message_block)
if not extra_sections:
return system_prompt
return f"{system_prompt}\n\n" + "\n\n".join(extra_sections)
def _build_reply_instruction(self) -> str:
def _build_reply_instruction(
self,
reply_message: Optional[SessionMessage],
reply_reason: str,
) -> str:
"""构建追加在上下文末尾的回复指令。"""
return "请基于以上逐条对话消息,自然地继续回复。直接输出你要说的话,不要额外解释。"
sections: List[str] = []
target_message_block = self._build_target_message_block(reply_message)
if target_message_block:
sections.append(target_message_block)
if reply_reason.strip():
sections.append(f"【回复信息参考】\n{reply_reason}")
sections.append("请基于以上逐条对话消息,自然地继续回复。直接输出你要说的话,不要额外解释。")
return "\n\n".join(sections)
def _build_multimodal_user_message(
self,
@@ -238,16 +270,21 @@ class MaisakaReplyGenerator:
def _build_request_messages(
self,
chat_history: List[LLMContextMessage],
reply_message: Optional[SessionMessage],
reply_reason: str,
expression_habits: str = "",
) -> List[Message]:
"""构建发给大模型的消息列表。"""
messages: List[Message] = []
system_prompt = self._build_system_prompt(
reply_message=reply_message,
reply_reason=reply_reason,
expression_habits=expression_habits,
)
instruction = self._build_reply_instruction()
instruction = self._build_reply_instruction(
reply_message=reply_message,
reply_reason=reply_reason,
)
messages.append(MessageBuilder().set_role(RoleType.System).add_text_content(system_prompt).build())
messages.extend(self._build_history_messages(chat_history))
@@ -433,6 +470,7 @@ class MaisakaReplyGenerator:
try:
request_messages = self._build_request_messages(
chat_history=filtered_history,
reply_message=reply_message,
reply_reason=reply_reason or "",
expression_habits=merged_expression_habits,
)

View File

@@ -1,22 +1,30 @@
"""send_emoji 内置工具。"""
from datetime import datetime
from io import BytesIO
from random import sample
from secrets import token_hex
from typing import Any, Dict, Optional
import asyncio
from PIL import Image as PILImage
from PIL import ImageDraw, ImageFont
from pydantic import BaseModel, Field as PydanticField
from src.chat.emoji_system.emoji_manager import emoji_manager
from src.chat.emoji_system.maisaka_tool import send_emoji_for_maisaka
from src.common.data_models.message_component_data_model import ImageComponent, MessageSequence, TextComponent
from src.common.data_models.image_data_model import MaiEmoji
from src.common.data_models.message_component_data_model import ImageComponent, MessageSequence, TextComponent
from src.common.logger import get_logger
from src.core.tooling import ToolExecutionContext, ToolExecutionResult, ToolInvocation, ToolSpec
from src.llm_models.payload_content.resp_format import RespFormat, RespFormatType
from src.maisaka.context_messages import LLMContextMessage, ReferenceMessage, ReferenceMessageType, SessionBackedMessage
from src.maisaka.context_messages import (
LLMContextMessage,
ReferenceMessage,
ReferenceMessageType,
SessionBackedMessage,
)
from .context import BuiltinToolRuntimeContext
@@ -24,16 +32,17 @@ logger = get_logger("maisaka_builtin_send_emoji")
_EMOJI_SUB_AGENT_CONTEXT_LIMIT = 12
_EMOJI_SUB_AGENT_MAX_TOKENS = 240
_EMOJI_SUB_AGENT_SAMPLE_SIZE = 20
_EMOJI_SUCCESS_MESSAGE = "???????"
_EMOJI_CANDIDATE_GROUP_COUNT = 3
_EMOJI_CANDIDATES_PER_GROUP = 5
_EMOJI_CANDIDATE_TILE_SIZE = 256
_EMOJI_SUCCESS_MESSAGE = "表情包发送成功"
class EmojiSelectionResult(BaseModel):
    """Structured selection result returned by the emoji sub-agent.

    BUG FIX: ``emoji_id`` was declared twice (diff residue of old and new
    versions); a single definition is kept. The legacy ``matched_emotion``
    and ``reason`` fields are retained with defaults for backward
    compatibility with any caller still reading them.
    """

    # ID of the chosen candidate message (one merged tile strip).
    emoji_id: str = PydanticField(default="", description="选中的候选消息 ID。")
    # 1-based position of the chosen image within that candidate message.
    emoji_index: int = PydanticField(default=1, description="该候选消息中第几张图片,从 1 开始计数")
    matched_emotion: str = PydanticField(default="", description="本次命中的情绪标签,可为空")
    reason: str = PydanticField(default="", description="简短选择理由。")
def get_tool_spec() -> ToolSpec:
@@ -57,14 +66,109 @@ def get_tool_spec() -> ToolSpec:
)
async def _build_emoji_candidate_message(emoji: MaiEmoji, candidate_id: str) -> SessionBackedMessage:
"""构建供子代理挑选的图片候选消息"""
async def _load_emoji_bytes(emoji: MaiEmoji) -> bytes:
"""读取单个表情包图片字节"""
image_bytes = await asyncio.to_thread(emoji.full_path.read_bytes)
return await asyncio.to_thread(emoji.full_path.read_bytes)
def _build_placeholder_tile(label: str, tile_size: int) -> PILImage.Image:
    """Render a neutral fallback tile used when an emoji image cannot be read.

    The tile is a light-gray square with *label* drawn centered in dark gray.
    """
    placeholder = PILImage.new("RGB", (tile_size, tile_size), color=(245, 245, 245))
    painter = ImageDraw.Draw(placeholder)
    default_font = ImageFont.load_default()
    # Measure the rendered label so it can be centered on the tile.
    left, top, right, bottom = painter.textbbox((0, 0), label, font=default_font)
    label_width = right - left
    label_height = bottom - top
    anchor = ((tile_size - label_width) / 2, (tile_size - label_height) / 2)
    painter.text(anchor, label, fill=(80, 80, 80), font=default_font)
    return placeholder
def _build_labeled_tile(image_bytes: bytes, index: int, tile_size: int) -> PILImage.Image:
    """Compose one candidate tile: the emoji centered on white with an index badge.

    Falls back to a plain numbered placeholder when *image_bytes* cannot be
    decoded as an image.
    """
    try:
        with PILImage.open(BytesIO(image_bytes)) as decoded:
            emoji_image = decoded.convert("RGBA")
    except Exception:
        # Unreadable image data — show a numbered placeholder instead.
        return _build_placeholder_tile(str(index), tile_size)

    # Shrink in place so the emoji fits inside the tile, then center it.
    emoji_image.thumbnail((tile_size, tile_size))
    tile = PILImage.new("RGBA", (tile_size, tile_size), color=(255, 255, 255, 255))
    paste_at = (
        (tile_size - emoji_image.width) // 2,
        (tile_size - emoji_image.height) // 2,
    )
    # Pass the image itself as mask so transparency is respected.
    tile.paste(emoji_image, paste_at, emoji_image)

    # Draw a dark rounded badge in the top-left corner holding the 1-based index.
    painter = ImageDraw.Draw(tile)
    badge_font = ImageFont.load_default()
    badge_size = 56
    badge_margin = 14
    badge_box = (
        badge_margin,
        badge_margin,
        badge_margin + badge_size,
        badge_margin + badge_size,
    )
    painter.rounded_rectangle(badge_box, radius=8, fill=(0, 0, 0, 180))

    label = str(index)
    left, top, right, bottom = painter.textbbox((0, 0), label, font=badge_font)
    label_width = right - left
    label_height = bottom - top
    painter.text(
        (
            badge_margin + (badge_size - label_width) / 2,
            # Nudge up by 1px for visual centering of the default font.
            badge_margin + (badge_size - label_height) / 2 - 1,
        ),
        label,
        fill=(255, 255, 255, 255),
        font=badge_font,
    )
    return tile
def _merge_emoji_tiles(image_bytes_list: list[bytes]) -> bytes:
    """Merge candidate emoji images into one horizontal PNG strip.

    Each entry becomes a fixed-size tile labeled with its 1-based index;
    tiles are laid out left to right with a small gap. (DOC FIX: the old
    docstring said exactly three images, but the function handles any
    number — callers pass ``_EMOJI_CANDIDATES_PER_GROUP`` per group.)

    Args:
        image_bytes_list: Raw image bytes, one entry per candidate emoji.

    Returns:
        PNG-encoded bytes of the merged strip.
    """
    tile_size = _EMOJI_CANDIDATE_TILE_SIZE
    gap = 12
    tiles = [
        _build_labeled_tile(image_bytes=image_bytes, index=index, tile_size=tile_size)
        for index, image_bytes in enumerate(image_bytes_list, start=1)
    ]
    # Total width: N tiles plus N-1 gaps (no trailing gap).
    canvas_width = tile_size * len(tiles) + gap * max(len(tiles) - 1, 0)
    canvas = PILImage.new("RGBA", (canvas_width, tile_size), color=(255, 255, 255, 255))
    current_x = 0
    for tile in tiles:
        # Paste with the tile as its own mask so alpha is respected.
        canvas.paste(tile, (current_x, 0), tile)
        current_x += tile_size + gap
    output = BytesIO()
    canvas.convert("RGB").save(output, format="PNG")
    return output.getvalue()
async def _build_emoji_candidate_message(
emojis: list[MaiEmoji],
candidate_id: str,
) -> SessionBackedMessage:
"""构建供子代理挑选的拼图候选消息。"""
image_bytes_list = await asyncio.gather(*[_load_emoji_bytes(emoji) for emoji in emojis])
merged_image_bytes = await asyncio.to_thread(_merge_emoji_tiles, list(image_bytes_list))
raw_message = MessageSequence(
[
TextComponent(f"ID: {candidate_id}"),
ImageComponent(binary_hash=str(emoji.file_hash or ""), binary_data=image_bytes),
ImageComponent(binary_hash="", binary_data=merged_image_bytes),
]
)
return SessionBackedMessage(
@@ -84,39 +188,45 @@ async def _select_emoji_with_sub_agent(
) -> tuple[MaiEmoji | None, str]:
"""通过临时子代理从候选表情包中选出一个结果。"""
del reasoning, context_texts, sample_size
available_emojis = list(emoji_manager.emojis)
if not available_emojis:
return None, ""
effective_sample_size = min(max(sample_size, 1), _EMOJI_SUB_AGENT_SAMPLE_SIZE, len(available_emojis))
sampled_emojis = sample(available_emojis, effective_sample_size)
total_candidate_count = min(
len(available_emojis),
_EMOJI_CANDIDATE_GROUP_COUNT * _EMOJI_CANDIDATES_PER_GROUP,
)
sampled_emojis = sample(available_emojis, total_candidate_count)
candidate_map: dict[str, MaiEmoji] = {}
candidate_map: dict[str, list[MaiEmoji]] = {}
candidate_messages: list[LLMContextMessage] = []
for emoji in sampled_emojis:
for group_index in range(0, len(sampled_emojis), _EMOJI_CANDIDATES_PER_GROUP):
emoji_group = sampled_emojis[group_index : group_index + _EMOJI_CANDIDATES_PER_GROUP]
if not emoji_group:
continue
candidate_id = token_hex(4)
while candidate_id in candidate_map:
candidate_id = token_hex(4)
candidate_map[candidate_id] = emoji
candidate_messages.append(await _build_emoji_candidate_message(emoji, candidate_id))
candidate_map[candidate_id] = emoji_group
candidate_messages.append(await _build_emoji_candidate_message(emoji_group, candidate_id))
context_text = "\n".join(context_texts[-5:]) if context_texts else "(暂无额外上下文)"
system_prompt = (
"你是 Maisaka 的临时表情包选择子代理。\n"
"你会收到一段群聊上下文,以及若干条候选表情包消息。每条候选消息里都有一个临时 ID\n"
"你的任务是根据上下文、当前语气和发送意图,从候选里选出最合适的一个表情包\n"
"必须只从候选消息中选择,不能编造新的 ID\n"
"你会收到群聊上下文,以及 3 条候选消息。每条候选消息都包含 5 张横向拼接的表情图\n"
"每条候选消息都有一个临时 ID图片左上角标有 1、2、3、4、5对应这条消息中的第 1 到第 5 张图\n"
"你的任务是根据上下文和当前语气,从候选中选出最合适的一张表情包\n"
"如果提供了 requested_emotion请优先考虑与其接近的候选如果没有完全匹配则选择最符合上下文语气的候选。\n"
"你必须返回一个 JSON 对象json object不要输出任何 JSON 之外的内容。\n"
'返回格式固定为:{"emoji_id":"候选ID","matched_emotion":"情绪标签","reason":"简短理由"}'
'返回格式固定为:{"emoji_id":"候选消息ID","emoji_index":1}'
)
prompt_message = ReferenceMessage(
content=(
f"[选择任务]\n"
f"requested_emotion: {requested_emotion or '未指定'}\n"
f"reasoning: {reasoning or '辅助表达当前语气和情绪'}\n"
f"recent_context:\n{context_text}\n"
'请只输出 JSON。'
"请只输出 JSON。"
),
timestamp=datetime.now(),
reference_type=ReferenceMessageType.TOOL_HINT,
@@ -140,20 +250,24 @@ async def _select_emoji_with_sub_agent(
except Exception as exc:
logger.warning(f"{tool_ctx.runtime.log_prefix} 表情包子代理结果解析失败,将回退到候选首项: {exc}")
fallback_emoji = sampled_emojis[0] if sampled_emojis else None
return fallback_emoji, requested_emotion
return fallback_emoji, ""
selected_emoji = candidate_map.get(selection.emoji_id.strip())
if selected_emoji is None:
selected_group = candidate_map.get(selection.emoji_id.strip())
if selected_group is None:
logger.warning(
f"{tool_ctx.runtime.log_prefix} 表情包子代理返回了无效 ID: {selection.emoji_id!r},将回退到候选首项"
)
fallback_emoji = sampled_emojis[0] if sampled_emojis else None
return fallback_emoji, requested_emotion
return fallback_emoji, ""
matched_emotion = selection.matched_emotion.strip()
if not matched_emotion:
matched_emotion = requested_emotion.strip()
return selected_emoji, matched_emotion
emoji_index = int(selection.emoji_index)
if emoji_index < 1 or emoji_index > len(selected_group):
logger.warning(
f"{tool_ctx.runtime.log_prefix} 表情包子代理返回了无效序号: {emoji_index!r},将回退到该组第 1 张"
)
emoji_index = 1
return selected_group[emoji_index - 1], ""
async def handle_tool(
@@ -207,9 +321,9 @@ async def handle_tool(
if send_result.success:
structured_result["message"] = _EMOJI_SUCCESS_MESSAGE
logger.info(
f"{tool_ctx.runtime.log_prefix} ??????? "
f"??={send_result.description!r} ????={send_result.emotions} "
f"????={emotion!r} ????={send_result.matched_emotion!r}"
f"{tool_ctx.runtime.log_prefix} 表情包发送成功 "
f"描述={send_result.description!r} 情绪标签={send_result.emotions} "
f"请求情绪={emotion!r} 命中情绪={send_result.matched_emotion!r}"
)
tool_ctx.append_sent_emoji_to_chat_history(
emoji_base64=send_result.emoji_base64,

View File

@@ -51,6 +51,18 @@ class MaisakaReasoningEngine:
self._runtime = runtime
self._last_reasoning_content: str = ""
@staticmethod
def _get_runtime_manager() -> Any:
    """Return the plugin runtime manager singleton.

    NOTE(review): the import is deferred to call time — presumably to
    avoid a circular dependency with the integration module; confirm
    before hoisting it to the top of the file.

    Returns:
        Any: The plugin runtime manager singleton instance.
    """
    from src.plugin_runtime.integration import get_plugin_runtime_manager

    return get_plugin_runtime_manager()
@property
def last_reasoning_content(self) -> str:
"""返回最近一轮思考文本。"""