better:完善表情包管理插件

修复一些问题

Update emoji_api.py
This commit is contained in:
SengokuCola
2025-09-24 18:14:23 +08:00
parent 0964676bfb
commit 1c37dd46e8
2 changed files with 464 additions and 13 deletions

View File

@@ -28,7 +28,7 @@ class AddEmojiCommand(BaseCommand):
async def execute(self) -> Tuple[bool, str, bool]:
# 查找消息中的表情包
logger.info(f"查找消息中的表情包: {self.message.message_segment}")
# logger.info(f"查找消息中的表情包: {self.message.message_segment}")
emoji_base64_list = self.find_and_return_emoji_in_message(self.message.message_segment)
@@ -42,8 +42,8 @@ class AddEmojiCommand(BaseCommand):
for i, emoji_base64 in enumerate(emoji_base64_list):
try:
# 使用emoji_api注册表情包
result = await emoji_api.register_emoji(emoji_base64, filename=f"emoji_{i+1}")
# 使用emoji_api注册表情包让API自动生成唯一文件名
result = await emoji_api.register_emoji(emoji_base64)
if result["success"]:
success_count += 1
@@ -72,13 +72,46 @@ class AddEmojiCommand(BaseCommand):
summary_msg = f"表情包注册完成: 成功 {success_count} 个,失败 {fail_count} 个,共处理 {total_count}"
# 如果有结果详情,添加到返回消息中
details_msg = ""
if results:
details_msg = "\n" + "\n".join(results)
final_msg = summary_msg + details_msg
else:
final_msg = summary_msg
return success_count > 0, final_msg, success_count > 0
# 使用表达器重写回复
try:
from src.plugin_system.apis import generator_api
# 构建重写数据
rewrite_data = {
"raw_reply": summary_msg,
"reason": f"注册了表情包:{details_msg}\n",
}
# 调用表达器重写
result_status, data = await generator_api.rewrite_reply(
chat_stream=self.message.chat_stream,
reply_data=rewrite_data,
)
if result_status:
# 发送重写后的回复
for reply_seg in data.reply_set.reply_data:
send_data = reply_seg.content
await self.send_text(send_data)
return success_count > 0, final_msg, success_count > 0
else:
# 如果重写失败,发送原始消息
await self.send_text(final_msg)
return success_count > 0, final_msg, success_count > 0
except Exception as e:
# 如果表达器调用失败,发送原始消息
logger.error(f"[add_emoji] 表达器重写失败: {e}")
await self.send_text(final_msg)
return success_count > 0, final_msg, success_count > 0
def find_and_return_emoji_in_message(self, message_segments) -> List[str]:
emoji_base64_list = []
@@ -114,22 +147,205 @@ class ListEmojiCommand(BaseCommand):
command_description = "列表表情包"
# === 命令设置(必须填写)===
command_pattern = r"^/emoji list$" # 精确匹配 "/emoji list" 命令
command_pattern = r"^/emoji list(\s+\d+)?$" # 匹配 "/emoji list" 或 "/emoji list 数量"
async def execute(self) -> Tuple[bool, str, bool]:
"""执行列表表情包"""
from src.plugin_system.apis import emoji_api
import datetime
# 解析命令参数
import re
match = re.match(r"^/emoji list(?:\s+(\d+))?$", self.message.raw_message)
max_count = 10 # 默认显示10个
if match and match.group(1):
max_count = min(int(match.group(1)), 50) # 最多显示50个
# 获取当前时间
time_format: str = self.get_config("time.format", "%Y-%m-%d %H:%M:%S") # type: ignore
now = datetime.datetime.now()
time_str = now.strftime(time_format)
# 发送时间信息
message = f"⏰ 当前时间:{time_str}"
await self.send_text(message)
# 获取表情包信息
emoji_count = emoji_api.get_count()
emoji_info = emoji_api.get_info()
return True, f"显示了当前时间: {time_str}", True
# 构建返回消息
message_lines = [
f"📊 表情包统计信息 ({time_str})",
f"• 总数: {emoji_count} / {emoji_info['max_count']}",
f"• 可用: {emoji_info['available_emojis']}",
]
if emoji_count == 0:
message_lines.append("\n❌ 暂无表情包")
final_message = "\n".join(message_lines)
await self.send_text(final_message)
return True, final_message, True
# 获取所有表情包
all_emojis = await emoji_api.get_all()
if not all_emojis:
message_lines.append("\n❌ 无法获取表情包列表")
final_message = "\n".join(message_lines)
await self.send_text(final_message)
return False, final_message, True
# 显示前N个表情包
display_emojis = all_emojis[:max_count]
message_lines.append(f"\n📋 显示前 {len(display_emojis)} 个表情包:")
for i, (emoji_base64, description, emotion) in enumerate(display_emojis, 1):
# 截断过长的描述
short_desc = description[:50] + "..." if len(description) > 50 else description
message_lines.append(f"{i}. {short_desc} [{emotion}]")
# 如果还有更多表情包,显示总数
if len(all_emojis) > max_count:
message_lines.append(f"\n💡 还有 {len(all_emojis) - max_count} 个表情包未显示")
final_message = "\n".join(message_lines)
# 直接发送文本消息
await self.send_text(final_message)
return True, final_message, True
class DeleteEmojiCommand(BaseCommand):
command_name = "delete_emoji"
command_description = "删除表情包"
command_pattern = r".*/emoji delete.*"
async def execute(self) -> Tuple[bool, str, bool]:
# 查找消息中的表情包图片
logger.info(f"查找消息中的表情包用于删除: {self.message.message_segment}")
emoji_base64_list = self.find_and_return_emoji_in_message(self.message.message_segment)
if not emoji_base64_list:
return False, "未在消息中找到表情包或图片", False
# 删除找到的表情包
success_count = 0
fail_count = 0
results = []
for i, emoji_base64 in enumerate(emoji_base64_list):
try:
# 计算图片的哈希值来查找对应的表情包
import base64
import hashlib
# 确保base64字符串只包含ASCII字符
if isinstance(emoji_base64, str):
emoji_base64_clean = emoji_base64.encode("ascii", errors="ignore").decode("ascii")
else:
emoji_base64_clean = str(emoji_base64)
# 计算哈希值
image_bytes = base64.b64decode(emoji_base64_clean)
emoji_hash = hashlib.md5(image_bytes).hexdigest()
# 使用emoji_api删除表情包
result = await emoji_api.delete_emoji(emoji_hash)
if result["success"]:
success_count += 1
description = result.get("description", "未知描述")
count_before = result.get("count_before", 0)
count_after = result.get("count_after", 0)
emotions = result.get("emotions", [])
result_msg = f"表情包 {i+1} 删除成功"
if description:
result_msg += f"\n描述: {description}"
if emotions:
result_msg += f"\n情感标签: {', '.join(emotions)}"
result_msg += f"\n表情包数量: {count_before}{count_after}"
results.append(result_msg)
else:
fail_count += 1
error_msg = result.get("message", "删除失败")
results.append(f"表情包 {i+1} 删除失败: {error_msg}")
except Exception as e:
fail_count += 1
results.append(f"表情包 {i+1} 删除时发生错误: {str(e)}")
# 构建返回消息
total_count = success_count + fail_count
summary_msg = f"表情包删除完成: 成功 {success_count} 个,失败 {fail_count} 个,共处理 {total_count}"
# 如果有结果详情,添加到返回消息中
details_msg = ""
if results:
details_msg = "\n" + "\n".join(results)
final_msg = summary_msg + details_msg
else:
final_msg = summary_msg
# 使用表达器重写回复
try:
from src.plugin_system.apis import generator_api
# 构建重写数据
rewrite_data = {
"raw_reply": summary_msg,
"reason": f"删除了表情包:{details_msg}\n",
}
# 调用表达器重写
result_status, data = await generator_api.rewrite_reply(
chat_stream=self.message.chat_stream,
reply_data=rewrite_data,
)
if result_status:
# 发送重写后的回复
for reply_seg in data.reply_set.reply_data:
send_data = reply_seg.content
await self.send_text(send_data)
return success_count > 0, final_msg, success_count > 0
else:
# 如果重写失败,发送原始消息
await self.send_text(final_msg)
return success_count > 0, final_msg, success_count > 0
except Exception as e:
# 如果表达器调用失败,发送原始消息
logger.error(f"[delete_emoji] 表达器重写失败: {e}")
await self.send_text(final_msg)
return success_count > 0, final_msg, success_count > 0
def find_and_return_emoji_in_message(self, message_segments) -> List[str]:
emoji_base64_list = []
# 处理单个Seg对象的情况
if isinstance(message_segments, Seg):
if message_segments.type == "emoji":
emoji_base64_list.append(message_segments.data)
elif message_segments.type == "image":
# 假设图片数据是base64编码的
emoji_base64_list.append(message_segments.data)
elif message_segments.type == "seglist":
# 递归处理嵌套的Seg列表
emoji_base64_list.extend(self.find_and_return_emoji_in_message(message_segments.data))
return emoji_base64_list
# 处理Seg列表的情况
for seg in message_segments:
if seg.type == "emoji":
emoji_base64_list.append(seg.data)
elif seg.type == "image":
# 假设图片数据是base64编码的
emoji_base64_list.append(seg.data)
elif seg.type == "seglist":
# 递归处理嵌套的Seg列表
emoji_base64_list.extend(self.find_and_return_emoji_in_message(seg.data))
return emoji_base64_list
class RandomEmojis(BaseCommand):
@@ -183,4 +399,6 @@ class EmojiManagePlugin(BasePlugin):
return [
(RandomEmojis.get_command_info(), RandomEmojis),
(AddEmojiCommand.get_command_info(), AddEmojiCommand),
(ListEmojiCommand.get_command_info(), ListEmojiCommand),
(DeleteEmojiCommand.get_command_info(), DeleteEmojiCommand),
]