214 lines
8.8 KiB
Python
214 lines
8.8 KiB
Python
"""表情包管理插件 — 新 SDK 版本
|
||
|
||
通过 /emoji 命令管理表情包的添加、列表和删除。
|
||
"""
|
||
|
||
import base64
|
||
import datetime
|
||
import hashlib
|
||
import re
|
||
|
||
from maibot_sdk import MaiBotPlugin, Command
|
||
|
||
|
||
class EmojiManagePlugin(MaiBotPlugin):
|
||
"""表情包管理插件"""
|
||
|
||
# ===== 工具方法 =====
|
||
|
||
@staticmethod
|
||
def _extract_emoji_base64(segments) -> list[str]:
|
||
"""从消息 segments 中提取 emoji/image 的 base64 数据。
|
||
|
||
segments 可以是 dict 列表或 Seg 对象列表(兼容两种格式)。
|
||
"""
|
||
results: list[str] = []
|
||
if not segments:
|
||
return results
|
||
|
||
if isinstance(segments, dict):
|
||
seg_type = segments.get("type", "")
|
||
if seg_type in ("emoji", "image"):
|
||
data = segments.get("data", "")
|
||
if data:
|
||
results.append(data)
|
||
elif seg_type == "seglist":
|
||
for child in segments.get("data", []):
|
||
results.extend(EmojiManagePlugin._extract_emoji_base64(child))
|
||
return results
|
||
|
||
# 如果有 .type 属性(Seg 对象)
|
||
if hasattr(segments, "type"):
|
||
seg_type = getattr(segments, "type", "")
|
||
if seg_type in ("emoji", "image"):
|
||
results.append(getattr(segments, "data", ""))
|
||
elif seg_type == "seglist":
|
||
for child in getattr(segments, "data", []):
|
||
results.extend(EmojiManagePlugin._extract_emoji_base64(child))
|
||
return results
|
||
|
||
# 列表
|
||
for seg in segments:
|
||
results.extend(EmojiManagePlugin._extract_emoji_base64(seg))
|
||
return results
|
||
|
||
# ===== Command 组件 =====
|
||
|
||
@Command("add_emoji", description="添加表情包", pattern=r".*/emoji add.*")
|
||
async def handle_add_emoji(self, stream_id: str = "", message_segments=None, **kwargs):
|
||
"""添加表情包"""
|
||
emoji_base64_list = self._extract_emoji_base64(message_segments)
|
||
if not emoji_base64_list:
|
||
await self.ctx.send.text("未在消息中找到表情包或图片", stream_id)
|
||
return False, "未在消息中找到表情包或图片", False
|
||
|
||
success_count = 0
|
||
fail_count = 0
|
||
results = []
|
||
|
||
for i, emoji_b64 in enumerate(emoji_base64_list):
|
||
result = await self.ctx.emoji.register_emoji(emoji_b64)
|
||
if isinstance(result, dict) and result.get("success"):
|
||
success_count += 1
|
||
desc = result.get("description", "未知描述")
|
||
emotions = result.get("emotions", [])
|
||
replaced = result.get("replaced", False)
|
||
msg = f"表情包 {i + 1} 注册成功{'(替换旧表情包)' if replaced else '(新增表情包)'}"
|
||
if desc:
|
||
msg += f"\n描述: {desc}"
|
||
if emotions:
|
||
msg += f"\n情感标签: {', '.join(emotions)}"
|
||
results.append(msg)
|
||
else:
|
||
fail_count += 1
|
||
err = result.get("message", "注册失败") if isinstance(result, dict) else "注册失败"
|
||
results.append(f"表情包 {i + 1} 注册失败: {err}")
|
||
|
||
total = success_count + fail_count
|
||
summary = f"表情包注册完成: 成功 {success_count} 个,失败 {fail_count} 个,共处理 {total} 个"
|
||
if results:
|
||
summary += "\n" + "\n".join(results)
|
||
|
||
await self.ctx.send.text(summary, stream_id)
|
||
return success_count > 0, summary, success_count > 0
|
||
|
||
@Command("emoji_list", description="列表表情包", pattern=r"^/emoji list(\s+\d+)?$")
|
||
async def handle_list_emoji(self, stream_id: str = "", raw_message: str = "", **kwargs):
|
||
"""列出表情包"""
|
||
max_count = 10
|
||
match = re.match(r"^/emoji list(?:\s+(\d+))?$", raw_message)
|
||
if match and match.group(1):
|
||
max_count = min(int(match.group(1)), 50)
|
||
|
||
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
count_result = await self.ctx.emoji.get_count()
|
||
emoji_count = count_result if isinstance(count_result, int) else 0
|
||
|
||
info_result = await self.ctx.emoji.get_info()
|
||
max_emoji = info_result.get("max_count", 0) if isinstance(info_result, dict) else 0
|
||
available = info_result.get("available_emojis", 0) if isinstance(info_result, dict) else 0
|
||
|
||
lines = [
|
||
f"📊 表情包统计信息 ({now})",
|
||
f"• 总数: {emoji_count} / {max_emoji}",
|
||
f"• 可用: {available}",
|
||
]
|
||
|
||
if emoji_count == 0:
|
||
lines.append("\n❌ 暂无表情包")
|
||
await self.ctx.send.text("\n".join(lines), stream_id)
|
||
return True, "\n".join(lines), True
|
||
|
||
all_result = await self.ctx.emoji.get_all()
|
||
all_emojis = all_result if isinstance(all_result, list) else []
|
||
if not all_emojis:
|
||
lines.append("\n❌ 无法获取表情包列表")
|
||
await self.ctx.send.text("\n".join(lines), stream_id)
|
||
return False, "\n".join(lines), True
|
||
|
||
display = all_emojis[:max_count]
|
||
lines.append(f"\n📋 显示前 {len(display)} 个表情包:")
|
||
for i, emoji in enumerate(display, 1):
|
||
if isinstance(emoji, (list, tuple)) and len(emoji) >= 3:
|
||
_, desc, emotion = emoji[0], emoji[1], emoji[2]
|
||
elif isinstance(emoji, dict):
|
||
desc = emoji.get("description", "")
|
||
emotion = emoji.get("emotion", "")
|
||
else:
|
||
desc, emotion = str(emoji), ""
|
||
short_desc = desc[:50] + "..." if len(desc) > 50 else desc
|
||
lines.append(f"{i}. {short_desc} [{emotion}]")
|
||
|
||
if len(all_emojis) > max_count:
|
||
lines.append(f"\n💡 还有 {len(all_emojis) - max_count} 个表情包未显示")
|
||
|
||
final = "\n".join(lines)
|
||
await self.ctx.send.text(final, stream_id)
|
||
return True, final, True
|
||
|
||
@Command("delete_emoji", description="删除表情包", pattern=r".*/emoji delete.*")
|
||
async def handle_delete_emoji(self, stream_id: str = "", message_segments=None, **kwargs):
|
||
"""删除表情包"""
|
||
emoji_base64_list = self._extract_emoji_base64(message_segments)
|
||
if not emoji_base64_list:
|
||
await self.ctx.send.text("未在消息中找到表情包或图片", stream_id)
|
||
return False, "未找到表情包", False
|
||
|
||
success_count = 0
|
||
fail_count = 0
|
||
results = []
|
||
|
||
for i, emoji_b64 in enumerate(emoji_base64_list):
|
||
# 计算哈希
|
||
if isinstance(emoji_b64, str):
|
||
clean = emoji_b64.encode("ascii", errors="ignore").decode("ascii")
|
||
else:
|
||
clean = str(emoji_b64)
|
||
image_bytes = base64.b64decode(clean)
|
||
emoji_hash = hashlib.md5(image_bytes).hexdigest() # noqa: S324
|
||
|
||
result = await self.ctx.emoji.delete_emoji(emoji_hash)
|
||
if isinstance(result, dict) and result.get("success"):
|
||
success_count += 1
|
||
desc = result.get("description", "未知描述")
|
||
emotions = result.get("emotions", [])
|
||
before = result.get("count_before", 0)
|
||
after = result.get("count_after", 0)
|
||
msg = f"表情包 {i + 1} 删除成功"
|
||
if desc:
|
||
msg += f"\n描述: {desc}"
|
||
if emotions:
|
||
msg += f"\n情感标签: {', '.join(emotions)}"
|
||
msg += f"\n表情包数量: {before} → {after}"
|
||
results.append(msg)
|
||
else:
|
||
fail_count += 1
|
||
err = result.get("message", "删除失败") if isinstance(result, dict) else "删除失败"
|
||
results.append(f"表情包 {i + 1} 删除失败: {err}")
|
||
|
||
total = success_count + fail_count
|
||
summary = f"表情包删除完成: 成功 {success_count} 个,失败 {fail_count} 个,共处理 {total} 个"
|
||
if results:
|
||
summary += "\n" + "\n".join(results)
|
||
|
||
await self.ctx.send.text(summary, stream_id)
|
||
return success_count > 0, summary, success_count > 0
|
||
|
||
@Command("random_emojis", description="发送多张随机表情包", pattern=r"^/random_emojis$")
|
||
async def handle_random_emojis(self, stream_id: str = "", **kwargs):
|
||
"""发送多张随机表情包"""
|
||
emojis = await self.ctx.emoji.get_random(5)
|
||
if not emojis:
|
||
return False, "未找到表情包", False
|
||
messages = [
|
||
{"user_id": "0", "nickname": "神秘用户", "segments": [{"type": "image", "content": e.get("base64", "")}]}
|
||
for e in emojis
|
||
]
|
||
await self.ctx.send.forward(messages, stream_id)
|
||
return True, "已发送随机表情包", True
|
||
|
||
|
||
def create_plugin():
|
||
return EmojiManagePlugin()
|