炸 service 层 x 2,把能归类为现有重构好的模块的都归类过去

This commit is contained in:
DrSmoothl
2026-03-14 00:33:08 +08:00
parent 43c5b34623
commit 4bc9c5bf7e
12 changed files with 207 additions and 1000 deletions

View File

@@ -1,12 +1,38 @@
import random
from pathlib import Path
from typing import Any, Dict, List, Optional
import time
from src.chat.message_receive.chat_manager import BotChatSession, chat_manager
from src.common.data_models.image_data_model import MaiEmoji
from src.common.logger import get_logger
from src.common.utils.utils_image import ImageUtils
logger = get_logger("plugin_runtime.integration")
class RuntimeDataCapabilityMixin:
@staticmethod
def _serialize_emoji_payload(emoji: MaiEmoji) -> Optional[Dict[str, str]]:
emoji_base64 = ImageUtils.image_path_to_base64(str(emoji.full_path))
if not emoji_base64:
return None
matched_emotion = emoji.emotion[0] if emoji.emotion else ""
return {
"base64": emoji_base64,
"description": emoji.description,
"emotion": matched_emotion,
}
@staticmethod
def _build_emoji_temp_path() -> Path:
from src.chat.emoji_system.emoji_manager import EMOJI_DIR
EMOJI_DIR.mkdir(parents=True, exist_ok=True)
return EMOJI_DIR / f"emoji_cap_{int(time.time() * 1000000)}.png"
async def _cap_database_query(self, plugin_id: str, capability: str, args: Dict[str, Any]) -> Any:
from src.services import database_service as database_api
@@ -338,7 +364,7 @@ class RuntimeDataCapabilityMixin:
limit=args.get("limit", 0),
)
readable = message_api.build_readable_messages_to_str(
readable = message_api.build_readable_messages(
messages=messages,
replace_bot_name=args.get("replace_bot_name", True),
timestamp_mode=args.get("timestamp_mode", "relative"),
@@ -397,101 +423,173 @@ class RuntimeDataCapabilityMixin:
return {"success": False, "error": str(e)}
async def _cap_emoji_get_by_description(self, plugin_id: str, capability: str, args: Dict[str, Any]) -> Any:
from src.services import emoji_service as emoji_api
from src.chat.emoji_system.emoji_manager import emoji_manager
description: str = args.get("description", "")
if not description:
return {"success": False, "error": "缺少必要参数 description"}
try:
result = await emoji_api.get_by_description(description=description)
if result is None:
emoji = await emoji_manager.get_emoji_for_emotion(description)
if emoji is None:
return {"success": True, "emoji": None}
serialized = self._serialize_emoji_payload(emoji)
if serialized is None:
return {"success": True, "emoji": None}
emoji_base64, emoji_desc, matched_emotion = result
return {
"success": True,
"emoji": {
"base64": emoji_base64,
"description": emoji_desc,
"emotion": matched_emotion,
},
"emoji": serialized,
}
except Exception as e:
logger.error(f"[cap.emoji.get_by_description] 执行失败: {e}", exc_info=True)
return {"success": False, "error": str(e)}
async def _cap_emoji_get_random(self, plugin_id: str, capability: str, args: Dict[str, Any]) -> Any:
from src.services import emoji_service as emoji_api
from src.chat.emoji_system.emoji_manager import emoji_manager
count: int = args.get("count", 1)
try:
results = await emoji_api.get_random(count=count)
emojis = [{"base64": b64, "description": desc, "emotion": emo} for b64, desc, emo in results]
if count < 0:
return {"success": False, "error": "count 不能为负数"}
emojis_source = list(emoji_manager.emojis)
if count == 0 or not emojis_source:
return {"success": True, "emojis": []}
selected = random.sample(emojis_source, min(count, len(emojis_source)))
emojis: List[Dict[str, str]] = []
for emoji in selected:
emoji_manager.update_emoji_usage(emoji)
serialized = self._serialize_emoji_payload(emoji)
if serialized is not None:
if not serialized["emotion"]:
serialized["emotion"] = "随机表情"
emojis.append(serialized)
return {"success": True, "emojis": emojis}
except Exception as e:
logger.error(f"[cap.emoji.get_random] 执行失败: {e}", exc_info=True)
return {"success": False, "error": str(e)}
async def _cap_emoji_get_count(self, plugin_id: str, capability: str, args: Dict[str, Any]) -> Any:
from src.services import emoji_service as emoji_api
try:
return {"success": True, "count": emoji_api.get_count()}
from src.chat.emoji_system.emoji_manager import emoji_manager
return {"success": True, "count": len(emoji_manager.emojis)}
except Exception as e:
logger.error(f"[cap.emoji.get_count] 执行失败: {e}", exc_info=True)
return {"success": False, "error": str(e)}
async def _cap_emoji_get_emotions(self, plugin_id: str, capability: str, args: Dict[str, Any]) -> Any:
from src.services import emoji_service as emoji_api
try:
return {"success": True, "emotions": emoji_api.get_emotions()}
from src.chat.emoji_system.emoji_manager import emoji_manager
emotions = sorted({emotion for emoji in emoji_manager.emojis for emotion in emoji.emotion})
return {"success": True, "emotions": emotions}
except Exception as e:
logger.error(f"[cap.emoji.get_emotions] 执行失败: {e}", exc_info=True)
return {"success": False, "error": str(e)}
async def _cap_emoji_get_all(self, plugin_id: str, capability: str, args: Dict[str, Any]) -> Any:
from src.services import emoji_service as emoji_api
try:
results = await emoji_api.get_all()
emojis = [{"base64": b64, "description": desc, "emotion": emo} for b64, desc, emo in results] if results else []
from src.chat.emoji_system.emoji_manager import emoji_manager
emojis = []
for emoji in emoji_manager.emojis:
serialized = self._serialize_emoji_payload(emoji)
if serialized is not None:
if not serialized["emotion"]:
serialized["emotion"] = "随机表情"
emojis.append(serialized)
return {"success": True, "emojis": emojis}
except Exception as e:
logger.error(f"[cap.emoji.get_all] 执行失败: {e}", exc_info=True)
return {"success": False, "error": str(e)}
async def _cap_emoji_get_info(self, plugin_id: str, capability: str, args: Dict[str, Any]) -> Any:
from src.services import emoji_service as emoji_api
try:
return {"success": True, "info": emoji_api.get_info()}
from src.chat.emoji_system.emoji_manager import emoji_manager
from src.config.config import global_config
current_count = len(emoji_manager.emojis)
return {
"success": True,
"info": {
"current_count": current_count,
"max_count": global_config.emoji.max_reg_num,
"available_emojis": current_count,
},
}
except Exception as e:
logger.error(f"[cap.emoji.get_info] 执行失败: {e}", exc_info=True)
return {"success": False, "error": str(e)}
async def _cap_emoji_register(self, plugin_id: str, capability: str, args: Dict[str, Any]) -> Any:
from src.services import emoji_service as emoji_api
from src.chat.emoji_system.emoji_manager import emoji_manager
emoji_base64: str = args.get("emoji_base64", "")
if not emoji_base64:
return {"success": False, "error": "缺少必要参数 emoji_base64"}
try:
return await emoji_api.register_emoji(emoji_base64)
count_before = len(emoji_manager.emojis)
temp_file_path = self._build_emoji_temp_path()
if not ImageUtils.base64_to_image(emoji_base64, str(temp_file_path)):
return {"success": False, "message": "无法保存图片文件", "description": None, "emotions": None, "replaced": None, "hash": None}
register_success = await emoji_manager.register_emoji_by_filename(temp_file_path)
if not register_success:
if temp_file_path.exists():
temp_file_path.unlink(missing_ok=True)
return {
"success": False,
"message": "表情包注册失败,可能因为重复、格式不支持或审核未通过",
"description": None,
"emotions": None,
"replaced": None,
"hash": None,
}
count_after = len(emoji_manager.emojis)
replaced = count_after <= count_before
new_emoji = next(
(
item
for item in reversed(emoji_manager.emojis)
if temp_file_path.name == item.file_name or temp_file_path.name in str(item.full_path)
),
None,
)
return {
"success": True,
"message": f"表情包注册成功 {'(替换旧表情包)' if replaced else '(新增表情包)'}",
"description": None if new_emoji is None else new_emoji.description,
"emotions": None if new_emoji is None else new_emoji.emotion,
"replaced": replaced,
"hash": None if new_emoji is None else new_emoji.file_hash,
}
except Exception as e:
logger.error(f"[cap.emoji.register] 执行失败: {e}", exc_info=True)
return {"success": False, "error": str(e)}
async def _cap_emoji_delete(self, plugin_id: str, capability: str, args: Dict[str, Any]) -> Any:
from src.services import emoji_service as emoji_api
from src.chat.emoji_system.emoji_manager import emoji_manager
emoji_hash: str = args.get("emoji_hash", "")
if not emoji_hash:
return {"success": False, "error": "缺少必要参数 emoji_hash"}
try:
return await emoji_api.delete_emoji(emoji_hash)
emoji = emoji_manager.get_emoji_by_hash(emoji_hash)
if emoji is None:
return {"success": False, "message": f"未找到表情包: {emoji_hash}", "hash": emoji_hash}
success = emoji_manager.delete_emoji(emoji, not bool(emoji.description and emoji.description.strip()))
if not success:
return {"success": False, "message": f"删除表情包失败: {emoji_hash}", "hash": emoji_hash}
emoji_manager.emojis = [item for item in emoji_manager.emojis if item.file_hash != emoji_hash]
emoji_manager._emoji_num = len(emoji_manager.emojis)
return {"success": True, "message": f"成功删除表情包: {emoji_hash}", "hash": emoji_hash}
except Exception as e:
logger.error(f"[cap.emoji.delete] 执行失败: {e}", exc_info=True)
return {"success": False, "error": str(e)}