Files
mai-bot/plugins/emoji_manage_plugin/plugin.py

217 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""表情包管理插件 — 新 SDK 版本
通过 /emoji 命令管理表情包的添加、列表和删除。
"""
import base64
import datetime
import hashlib
import re
from maibot_sdk import MaiBotPlugin, Command
class EmojiManagePlugin(MaiBotPlugin):
"""表情包管理插件"""
# ===== 工具方法 =====
@staticmethod
def _extract_emoji_base64(segments) -> list[str]:
"""从消息 segments 中提取 emoji/image 的 base64 数据。
segments 可以是 dict 列表或 Seg 对象列表(兼容两种格式)。
"""
results: list[str] = []
if not segments:
return results
if isinstance(segments, dict):
seg_type = segments.get("type", "")
if seg_type in ("emoji", "image"):
data = segments.get("data", "")
if data:
results.append(data)
elif seg_type == "seglist":
for child in segments.get("data", []):
results.extend(EmojiManagePlugin._extract_emoji_base64(child))
return results
# 如果有 .type 属性Seg 对象)
if hasattr(segments, "type"):
seg_type = getattr(segments, "type", "")
if seg_type in ("emoji", "image"):
results.append(getattr(segments, "data", ""))
elif seg_type == "seglist":
for child in getattr(segments, "data", []):
results.extend(EmojiManagePlugin._extract_emoji_base64(child))
return results
# 列表
for seg in segments:
results.extend(EmojiManagePlugin._extract_emoji_base64(seg))
return results
# ===== Command 组件 =====
@Command("add_emoji", description="添加表情包", pattern=r".*/emoji add.*")
async def handle_add_emoji(self, stream_id: str = "", message_segments=None, **kwargs):
"""添加表情包"""
emoji_base64_list = self._extract_emoji_base64(message_segments)
if not emoji_base64_list:
await self.ctx.send.text("未在消息中找到表情包或图片", stream_id)
return False, "未在消息中找到表情包或图片", False
success_count = 0
fail_count = 0
results = []
for i, emoji_b64 in enumerate(emoji_base64_list):
result = await self.ctx.emoji.register_emoji(emoji_b64)
if isinstance(result, dict) and result.get("success"):
success_count += 1
desc = result.get("description", "未知描述")
emotions = result.get("emotions", [])
replaced = result.get("replaced", False)
msg = f"表情包 {i + 1} 注册成功{'(替换旧表情包)' if replaced else '(新增表情包)'}"
if desc:
msg += f"\n描述: {desc}"
if emotions:
msg += f"\n情感标签: {', '.join(emotions)}"
results.append(msg)
else:
fail_count += 1
err = result.get("message", "注册失败") if isinstance(result, dict) else "注册失败"
results.append(f"表情包 {i + 1} 注册失败: {err}")
total = success_count + fail_count
summary = f"表情包注册完成: 成功 {success_count} 个,失败 {fail_count} 个,共处理 {total}"
if results:
summary += "\n" + "\n".join(results)
await self.ctx.send.text(summary, stream_id)
return success_count > 0, summary, success_count > 0
@Command("emoji_list", description="列表表情包", pattern=r"^/emoji list(\s+\d+)?$")
async def handle_list_emoji(self, stream_id: str = "", raw_message: str = "", **kwargs):
"""列出表情包"""
max_count = 10
match = re.match(r"^/emoji list(?:\s+(\d+))?$", raw_message)
if match and match.group(1):
max_count = min(int(match.group(1)), 50)
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
count_result = await self.ctx.emoji.get_count()
emoji_count = count_result if isinstance(count_result, int) else 0
info_result = await self.ctx.emoji.get_info()
max_emoji = info_result.get("max_count", 0) if isinstance(info_result, dict) else 0
available = info_result.get("available_emojis", 0) if isinstance(info_result, dict) else 0
lines = [
f"📊 表情包统计信息 ({now})",
f"• 总数: {emoji_count} / {max_emoji}",
f"• 可用: {available}",
]
if emoji_count == 0:
lines.append("\n❌ 暂无表情包")
await self.ctx.send.text("\n".join(lines), stream_id)
return True, "\n".join(lines), True
all_result = await self.ctx.emoji.get_all()
all_emojis = all_result if isinstance(all_result, list) else []
if not all_emojis:
lines.append("\n❌ 无法获取表情包列表")
await self.ctx.send.text("\n".join(lines), stream_id)
return False, "\n".join(lines), True
display = all_emojis[:max_count]
lines.append(f"\n📋 显示前 {len(display)} 个表情包:")
for i, emoji in enumerate(display, 1):
if isinstance(emoji, (list, tuple)) and len(emoji) >= 3:
_, desc, emotion = emoji[0], emoji[1], emoji[2]
elif isinstance(emoji, dict):
desc = emoji.get("description", "")
emotion = emoji.get("emotion", "")
else:
desc, emotion = str(emoji), ""
short_desc = desc[:50] + "..." if len(desc) > 50 else desc
lines.append(f"{i}. {short_desc} [{emotion}]")
if len(all_emojis) > max_count:
lines.append(f"\n💡 还有 {len(all_emojis) - max_count} 个表情包未显示")
final = "\n".join(lines)
await self.ctx.send.text(final, stream_id)
return True, final, True
@Command("delete_emoji", description="删除表情包", pattern=r".*/emoji delete.*")
async def handle_delete_emoji(self, stream_id: str = "", message_segments=None, **kwargs):
"""删除表情包"""
emoji_base64_list = self._extract_emoji_base64(message_segments)
if not emoji_base64_list:
await self.ctx.send.text("未在消息中找到表情包或图片", stream_id)
return False, "未找到表情包", False
success_count = 0
fail_count = 0
results = []
for i, emoji_b64 in enumerate(emoji_base64_list):
# 计算哈希
if isinstance(emoji_b64, str):
clean = emoji_b64.encode("ascii", errors="ignore").decode("ascii")
else:
clean = str(emoji_b64)
image_bytes = base64.b64decode(clean)
emoji_hash = hashlib.md5(image_bytes).hexdigest() # noqa: S324
result = await self.ctx.emoji.delete_emoji(emoji_hash)
if isinstance(result, dict) and result.get("success"):
success_count += 1
desc = result.get("description", "未知描述")
emotions = result.get("emotions", [])
before = result.get("count_before", 0)
after = result.get("count_after", 0)
msg = f"表情包 {i + 1} 删除成功"
if desc:
msg += f"\n描述: {desc}"
if emotions:
msg += f"\n情感标签: {', '.join(emotions)}"
msg += f"\n表情包数量: {before}{after}"
results.append(msg)
else:
fail_count += 1
err = result.get("message", "删除失败") if isinstance(result, dict) else "删除失败"
results.append(f"表情包 {i + 1} 删除失败: {err}")
total = success_count + fail_count
summary = f"表情包删除完成: 成功 {success_count} 个,失败 {fail_count} 个,共处理 {total}"
if results:
summary += "\n" + "\n".join(results)
await self.ctx.send.text(summary, stream_id)
return success_count > 0, summary, success_count > 0
@Command("random_emojis", description="发送多张随机表情包", pattern=r"^/random_emojis$")
async def handle_random_emojis(self, stream_id: str = "", **kwargs):
"""发送多张随机表情包"""
result = await self.ctx.emoji.get_random(5)
if not result or not result.get("success"):
return False, "未找到表情包", False
emojis = result.get("emojis", [])
if not emojis:
return False, "未找到表情包", False
messages = [
{"user_id": "0", "nickname": "神秘用户", "segments": [{"type": "image", "content": e.get("base64", "")}]}
for e in emojis
]
await self.ctx.send.forward(messages, stream_id)
return True, "已发送随机表情包", True
def create_plugin():
return EmojiManagePlugin()