fix:图片存储问题
This commit is contained in:
@@ -42,4 +42,7 @@
|
|||||||
如果你需要改动配置文件,不需要修改实际的bot_config.toml或者model_config.toml,只需要修改配置文件模版,并新增一个版本号即可,也不必要为配置改动创建测试文件。
|
如果你需要改动配置文件,不需要修改实际的bot_config.toml或者model_config.toml,只需要修改配置文件模版,并新增一个版本号即可,也不必要为配置改动创建测试文件。
|
||||||
|
|
||||||
# 关于webui修改
|
# 关于webui修改
|
||||||
不要修改dashboard下的内容,因为这部分内容由另一个仓库build
|
不要修改dashboard下的内容,因为这部分内容由另一个仓库build
|
||||||
|
|
||||||
|
# maibot插件开发文档
|
||||||
|
https://github.com/Mai-with-u/maibot-plugin-sdk/blob/main/docs/guide.md
|
||||||
@@ -1,44 +0,0 @@
|
|||||||
{
|
|
||||||
"manifest_version": 2,
|
|
||||||
"version": "2.0.0",
|
|
||||||
"name": "BetterEmoji",
|
|
||||||
"description": "更好的表情包管理插件",
|
|
||||||
"author": {
|
|
||||||
"name": "SengokuCola",
|
|
||||||
"url": "https://github.com/SengokuCola"
|
|
||||||
},
|
|
||||||
"license": "GPL-v3.0-or-later",
|
|
||||||
"urls": {
|
|
||||||
"repository": "https://github.com/SengokuCola/BetterEmoji",
|
|
||||||
"homepage": "https://github.com/SengokuCola/BetterEmoji",
|
|
||||||
"documentation": "https://github.com/SengokuCola/BetterEmoji",
|
|
||||||
"issues": "https://github.com/SengokuCola/BetterEmoji/issues"
|
|
||||||
},
|
|
||||||
"host_application": {
|
|
||||||
"min_version": "1.0.0",
|
|
||||||
"max_version": "1.0.0"
|
|
||||||
},
|
|
||||||
"sdk": {
|
|
||||||
"min_version": "2.0.0",
|
|
||||||
"max_version": "2.99.99"
|
|
||||||
},
|
|
||||||
"dependencies": [],
|
|
||||||
"capabilities": [
|
|
||||||
"emoji.get_random",
|
|
||||||
"emoji.get_count",
|
|
||||||
"emoji.get_info",
|
|
||||||
"emoji.get_all",
|
|
||||||
"emoji.register_emoji",
|
|
||||||
"emoji.delete_emoji",
|
|
||||||
"send.text",
|
|
||||||
"send.forward"
|
|
||||||
],
|
|
||||||
"i18n": {
|
|
||||||
"default_locale": "zh-CN",
|
|
||||||
"locales_path": "_locales",
|
|
||||||
"supported_locales": [
|
|
||||||
"zh-CN"
|
|
||||||
]
|
|
||||||
},
|
|
||||||
"id": "sengokucola.betteremoji"
|
|
||||||
}
|
|
||||||
@@ -1,238 +0,0 @@
|
|||||||
"""表情包管理插件 — 新 SDK 版本
|
|
||||||
|
|
||||||
通过 /emoji 命令管理表情包的添加、列表和删除。
|
|
||||||
"""
|
|
||||||
|
|
||||||
from maibot_sdk import Command, MaiBotPlugin
|
|
||||||
|
|
||||||
import base64
|
|
||||||
import datetime
|
|
||||||
import hashlib
|
|
||||||
import re
|
|
||||||
|
|
||||||
|
|
||||||
class EmojiManagePlugin(MaiBotPlugin):
|
|
||||||
"""表情包管理插件"""
|
|
||||||
|
|
||||||
async def on_load(self) -> None:
|
|
||||||
"""处理插件加载。"""
|
|
||||||
|
|
||||||
async def on_unload(self) -> None:
|
|
||||||
"""处理插件卸载。"""
|
|
||||||
|
|
||||||
# ===== 工具方法 =====
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _extract_emoji_base64(segments) -> list[str]:
|
|
||||||
"""从消息 segments 中提取 emoji/image 的 base64 数据。
|
|
||||||
|
|
||||||
segments 可以是 dict 列表或 Seg 对象列表(兼容两种格式)。
|
|
||||||
"""
|
|
||||||
results: list[str] = []
|
|
||||||
if not segments:
|
|
||||||
return results
|
|
||||||
|
|
||||||
if isinstance(segments, dict):
|
|
||||||
seg_type = segments.get("type", "")
|
|
||||||
if seg_type in ("emoji", "image"):
|
|
||||||
data = segments.get("data", "")
|
|
||||||
if data:
|
|
||||||
results.append(data)
|
|
||||||
elif seg_type == "seglist":
|
|
||||||
for child in segments.get("data", []):
|
|
||||||
results.extend(EmojiManagePlugin._extract_emoji_base64(child))
|
|
||||||
return results
|
|
||||||
|
|
||||||
# 如果有 .type 属性(Seg 对象)
|
|
||||||
if hasattr(segments, "type"):
|
|
||||||
seg_type = getattr(segments, "type", "")
|
|
||||||
if seg_type in ("emoji", "image"):
|
|
||||||
results.append(getattr(segments, "data", ""))
|
|
||||||
elif seg_type == "seglist":
|
|
||||||
for child in getattr(segments, "data", []):
|
|
||||||
results.extend(EmojiManagePlugin._extract_emoji_base64(child))
|
|
||||||
return results
|
|
||||||
|
|
||||||
# 列表
|
|
||||||
for seg in segments:
|
|
||||||
results.extend(EmojiManagePlugin._extract_emoji_base64(seg))
|
|
||||||
return results
|
|
||||||
|
|
||||||
# ===== Command 组件 =====
|
|
||||||
|
|
||||||
@Command("add_emoji", description="添加表情包", pattern=r".*/emoji add.*")
|
|
||||||
async def handle_add_emoji(self, stream_id: str = "", message_segments=None, **kwargs):
|
|
||||||
"""添加表情包"""
|
|
||||||
emoji_base64_list = self._extract_emoji_base64(message_segments)
|
|
||||||
if not emoji_base64_list:
|
|
||||||
await self.ctx.send.text("未在消息中找到表情包或图片", stream_id)
|
|
||||||
return False, "未在消息中找到表情包或图片", False
|
|
||||||
|
|
||||||
success_count = 0
|
|
||||||
fail_count = 0
|
|
||||||
results = []
|
|
||||||
|
|
||||||
for i, emoji_b64 in enumerate(emoji_base64_list):
|
|
||||||
result = await self.ctx.emoji.register_emoji(emoji_b64)
|
|
||||||
if isinstance(result, dict) and result.get("success"):
|
|
||||||
success_count += 1
|
|
||||||
desc = result.get("description", "未知描述")
|
|
||||||
emotions = result.get("emotions", [])
|
|
||||||
replaced = result.get("replaced", False)
|
|
||||||
msg = f"表情包 {i + 1} 注册成功{'(替换旧表情包)' if replaced else '(新增表情包)'}"
|
|
||||||
if desc:
|
|
||||||
msg += f"\n描述: {desc}"
|
|
||||||
if emotions:
|
|
||||||
msg += f"\n情感标签: {', '.join(emotions)}"
|
|
||||||
results.append(msg)
|
|
||||||
else:
|
|
||||||
fail_count += 1
|
|
||||||
err = result.get("message", "注册失败") if isinstance(result, dict) else "注册失败"
|
|
||||||
results.append(f"表情包 {i + 1} 注册失败: {err}")
|
|
||||||
|
|
||||||
total = success_count + fail_count
|
|
||||||
summary = f"表情包注册完成: 成功 {success_count} 个,失败 {fail_count} 个,共处理 {total} 个"
|
|
||||||
if results:
|
|
||||||
summary += "\n" + "\n".join(results)
|
|
||||||
|
|
||||||
await self.ctx.send.text(summary, stream_id)
|
|
||||||
return success_count > 0, summary, success_count > 0
|
|
||||||
|
|
||||||
@Command("emoji_list", description="列表表情包", pattern=r"^/emoji list(\s+\d+)?$")
|
|
||||||
async def handle_list_emoji(self, stream_id: str = "", raw_message: str = "", **kwargs):
|
|
||||||
"""列出表情包"""
|
|
||||||
max_count = 10
|
|
||||||
match = re.match(r"^/emoji list(?:\s+(\d+))?$", raw_message)
|
|
||||||
if match and match.group(1):
|
|
||||||
max_count = min(int(match.group(1)), 50)
|
|
||||||
|
|
||||||
now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
||||||
|
|
||||||
count_result = await self.ctx.emoji.get_count()
|
|
||||||
emoji_count = count_result if isinstance(count_result, int) else 0
|
|
||||||
|
|
||||||
info_result = await self.ctx.emoji.get_info()
|
|
||||||
max_emoji = info_result.get("max_count", 0) if isinstance(info_result, dict) else 0
|
|
||||||
available = info_result.get("available_emojis", 0) if isinstance(info_result, dict) else 0
|
|
||||||
|
|
||||||
lines = [
|
|
||||||
f"📊 表情包统计信息 ({now})",
|
|
||||||
f"• 总数: {emoji_count} / {max_emoji}",
|
|
||||||
f"• 可用: {available}",
|
|
||||||
]
|
|
||||||
|
|
||||||
if emoji_count == 0:
|
|
||||||
lines.append("\n❌ 暂无表情包")
|
|
||||||
await self.ctx.send.text("\n".join(lines), stream_id)
|
|
||||||
return True, "\n".join(lines), True
|
|
||||||
|
|
||||||
all_result = await self.ctx.emoji.get_all()
|
|
||||||
all_emojis = all_result if isinstance(all_result, list) else []
|
|
||||||
if not all_emojis:
|
|
||||||
lines.append("\n❌ 无法获取表情包列表")
|
|
||||||
await self.ctx.send.text("\n".join(lines), stream_id)
|
|
||||||
return False, "\n".join(lines), True
|
|
||||||
|
|
||||||
display = all_emojis[:max_count]
|
|
||||||
lines.append(f"\n📋 显示前 {len(display)} 个表情包:")
|
|
||||||
for i, emoji in enumerate(display, 1):
|
|
||||||
if isinstance(emoji, (list, tuple)) and len(emoji) >= 3:
|
|
||||||
_, desc, emotion = emoji[0], emoji[1], emoji[2]
|
|
||||||
elif isinstance(emoji, dict):
|
|
||||||
desc = emoji.get("description", "")
|
|
||||||
emotion = emoji.get("emotion", "")
|
|
||||||
else:
|
|
||||||
desc, emotion = str(emoji), ""
|
|
||||||
short_desc = desc[:50] + "..." if len(desc) > 50 else desc
|
|
||||||
lines.append(f"{i}. {short_desc} [{emotion}]")
|
|
||||||
|
|
||||||
if len(all_emojis) > max_count:
|
|
||||||
lines.append(f"\n💡 还有 {len(all_emojis) - max_count} 个表情包未显示")
|
|
||||||
|
|
||||||
final = "\n".join(lines)
|
|
||||||
await self.ctx.send.text(final, stream_id)
|
|
||||||
return True, final, True
|
|
||||||
|
|
||||||
@Command("delete_emoji", description="删除表情包", pattern=r".*/emoji delete.*")
|
|
||||||
async def handle_delete_emoji(self, stream_id: str = "", message_segments=None, **kwargs):
|
|
||||||
"""删除表情包"""
|
|
||||||
emoji_base64_list = self._extract_emoji_base64(message_segments)
|
|
||||||
if not emoji_base64_list:
|
|
||||||
await self.ctx.send.text("未在消息中找到表情包或图片", stream_id)
|
|
||||||
return False, "未找到表情包", False
|
|
||||||
|
|
||||||
success_count = 0
|
|
||||||
fail_count = 0
|
|
||||||
results = []
|
|
||||||
|
|
||||||
for i, emoji_b64 in enumerate(emoji_base64_list):
|
|
||||||
# 计算哈希
|
|
||||||
if isinstance(emoji_b64, str):
|
|
||||||
clean = emoji_b64.encode("ascii", errors="ignore").decode("ascii")
|
|
||||||
else:
|
|
||||||
clean = str(emoji_b64)
|
|
||||||
image_bytes = base64.b64decode(clean)
|
|
||||||
emoji_hash = hashlib.md5(image_bytes).hexdigest() # noqa: S324
|
|
||||||
|
|
||||||
result = await self.ctx.emoji.delete_emoji(emoji_hash)
|
|
||||||
if isinstance(result, dict) and result.get("success"):
|
|
||||||
success_count += 1
|
|
||||||
desc = result.get("description", "未知描述")
|
|
||||||
emotions = result.get("emotions", [])
|
|
||||||
before = result.get("count_before", 0)
|
|
||||||
after = result.get("count_after", 0)
|
|
||||||
msg = f"表情包 {i + 1} 删除成功"
|
|
||||||
if desc:
|
|
||||||
msg += f"\n描述: {desc}"
|
|
||||||
if emotions:
|
|
||||||
msg += f"\n情感标签: {', '.join(emotions)}"
|
|
||||||
msg += f"\n表情包数量: {before} → {after}"
|
|
||||||
results.append(msg)
|
|
||||||
else:
|
|
||||||
fail_count += 1
|
|
||||||
err = result.get("message", "删除失败") if isinstance(result, dict) else "删除失败"
|
|
||||||
results.append(f"表情包 {i + 1} 删除失败: {err}")
|
|
||||||
|
|
||||||
total = success_count + fail_count
|
|
||||||
summary = f"表情包删除完成: 成功 {success_count} 个,失败 {fail_count} 个,共处理 {total} 个"
|
|
||||||
if results:
|
|
||||||
summary += "\n" + "\n".join(results)
|
|
||||||
|
|
||||||
await self.ctx.send.text(summary, stream_id)
|
|
||||||
return success_count > 0, summary, success_count > 0
|
|
||||||
|
|
||||||
@Command("random_emojis", description="发送多张随机表情包", pattern=r"^/random_emojis$")
|
|
||||||
async def handle_random_emojis(self, stream_id: str = "", **kwargs):
|
|
||||||
"""发送多张随机表情包"""
|
|
||||||
emojis = await self.ctx.emoji.get_random(5)
|
|
||||||
if not emojis:
|
|
||||||
return False, "未找到表情包", False
|
|
||||||
messages = [
|
|
||||||
{"user_id": "0", "nickname": "神秘用户", "segments": [{"type": "image", "content": e.get("base64", "")}]}
|
|
||||||
for e in emojis
|
|
||||||
]
|
|
||||||
await self.ctx.send.forward(messages, stream_id)
|
|
||||||
return True, "已发送随机表情包", True
|
|
||||||
|
|
||||||
async def on_config_update(self, scope: str, config_data: dict[str, object], version: str) -> None:
|
|
||||||
"""处理配置热重载事件。
|
|
||||||
|
|
||||||
Args:
|
|
||||||
scope: 配置变更范围。
|
|
||||||
config_data: 最新配置数据。
|
|
||||||
version: 配置版本号。
|
|
||||||
"""
|
|
||||||
|
|
||||||
del scope
|
|
||||||
del config_data
|
|
||||||
del version
|
|
||||||
|
|
||||||
|
|
||||||
def create_plugin() -> EmojiManagePlugin:
|
|
||||||
"""创建表情包管理插件实例。
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
EmojiManagePlugin: 新的表情包管理插件实例。
|
|
||||||
"""
|
|
||||||
|
|
||||||
return EmojiManagePlugin()
|
|
||||||
@@ -31,6 +31,23 @@ async def test_calculate_hash_format_updates_runtime_path_metadata(tmp_path: Pat
|
|||||||
assert emoji.dir_path == tmp_path.resolve()
|
assert emoji.dir_path == tmp_path.resolve()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_calculate_hash_format_reuses_existing_target_file(tmp_path: Path) -> None:
|
||||||
|
image_bytes = _build_test_image_bytes("JPEG")
|
||||||
|
tmp_file_path = tmp_path / "emoji.tmp"
|
||||||
|
target_file_path = tmp_path / "emoji.jpeg"
|
||||||
|
tmp_file_path.write_bytes(image_bytes)
|
||||||
|
target_file_path.write_bytes(image_bytes)
|
||||||
|
|
||||||
|
emoji = MaiEmoji(full_path=tmp_file_path, image_bytes=image_bytes)
|
||||||
|
|
||||||
|
assert await emoji.calculate_hash_format() is True
|
||||||
|
assert emoji.full_path == target_file_path.resolve()
|
||||||
|
assert emoji.file_name == target_file_path.name
|
||||||
|
assert not tmp_file_path.exists()
|
||||||
|
assert target_file_path.exists()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
("model_cls", "extra_fields"),
|
("model_cls", "extra_fields"),
|
||||||
[
|
[
|
||||||
|
|||||||
@@ -24,125 +24,148 @@ logger = get_logger("emoji")
|
|||||||
class BaseImageDataModel(BaseDatabaseDataModel[Images]):
|
class BaseImageDataModel(BaseDatabaseDataModel[Images]):
|
||||||
def __init__(self, full_path: str | Path, image_bytes: Optional[bytes] = None):
|
def __init__(self, full_path: str | Path, image_bytes: Optional[bytes] = None):
|
||||||
if not full_path:
|
if not full_path:
|
||||||
# 创建时候即检测文件路径合法性
|
raise ValueError("图片路径不能为空")
|
||||||
raise ValueError("表情包路径不能为空")
|
|
||||||
if Path(full_path).is_dir() or not Path(full_path).exists():
|
if Path(full_path).is_dir() or not Path(full_path).exists():
|
||||||
raise FileNotFoundError(f"表情包路径无效: {full_path}")
|
raise FileNotFoundError(f"图片路径无效: {full_path}")
|
||||||
|
|
||||||
resolved_path = Path(full_path).absolute().resolve()
|
resolved_path = Path(full_path).absolute().resolve()
|
||||||
self.full_path: Path
|
self.full_path: Path
|
||||||
self.dir_path: Path
|
self.dir_path: Path
|
||||||
self.file_name: str
|
self.file_name: str
|
||||||
self._set_full_path(resolved_path)
|
self._set_full_path(resolved_path)
|
||||||
|
|
||||||
self.file_hash: str = None # type: ignore
|
self.file_hash: str = None # type: ignore
|
||||||
|
|
||||||
self.image_bytes: Optional[bytes] = image_bytes
|
self.image_bytes: Optional[bytes] = image_bytes
|
||||||
|
self.image_format: str = ""
|
||||||
self.image_format: str = "" # 图片格式
|
|
||||||
|
|
||||||
def _set_full_path(self, full_path: Path) -> None:
|
def _set_full_path(self, full_path: Path) -> None:
|
||||||
"""同步更新文件路径相关的运行时元数据。"""
|
"""同步刷新路径、目录和文件名等运行时元数据。"""
|
||||||
resolved_path = full_path.absolute().resolve()
|
resolved_path = full_path.absolute().resolve()
|
||||||
self.full_path = resolved_path
|
self.full_path = resolved_path
|
||||||
self.dir_path = resolved_path.parent.resolve()
|
self.dir_path = resolved_path.parent.resolve()
|
||||||
self.file_name = resolved_path.name
|
self.file_name = resolved_path.name
|
||||||
|
|
||||||
def _restore_image_format_from_path(self) -> None:
|
def _restore_image_format_from_path(self) -> None:
|
||||||
"""根据文件扩展名恢复基础图片格式信息。"""
|
"""根据文件扩展名恢复图片格式信息。"""
|
||||||
self.image_format = self.full_path.suffix.removeprefix(".").lower()
|
self.image_format = self.full_path.suffix.removeprefix(".").lower()
|
||||||
|
|
||||||
|
def _build_non_conflicting_path(self, target_path: Path) -> Path:
|
||||||
|
"""在目标路径被占用时,生成一个可用的新路径。"""
|
||||||
|
candidate_path = target_path
|
||||||
|
index = 1
|
||||||
|
while candidate_path.exists():
|
||||||
|
candidate_path = target_path.with_name(
|
||||||
|
f"{target_path.stem}_{self.file_hash[:8]}_{index}{target_path.suffix}"
|
||||||
|
)
|
||||||
|
index += 1
|
||||||
|
return candidate_path
|
||||||
|
|
||||||
|
def _rename_file_to_match_format(self) -> None:
|
||||||
|
"""修正文件扩展名,并处理目标文件已存在的冲突。"""
|
||||||
|
new_file_name = ".".join(self.file_name.split(".")[:-1] + [self.image_format])
|
||||||
|
new_full_path = self.dir_path / new_file_name
|
||||||
|
if new_full_path == self.full_path:
|
||||||
|
return
|
||||||
|
|
||||||
|
if new_full_path.exists():
|
||||||
|
existing_file_hash = hashlib.sha256(self.read_image_bytes(new_full_path)).hexdigest()
|
||||||
|
if existing_file_hash == self.file_hash:
|
||||||
|
logger.info(f"[初始化] {new_full_path.name} 已存在且内容一致,复用已有文件")
|
||||||
|
self.full_path.unlink()
|
||||||
|
self._set_full_path(new_full_path)
|
||||||
|
return
|
||||||
|
|
||||||
|
conflict_free_path = self._build_non_conflicting_path(new_full_path)
|
||||||
|
logger.warning(
|
||||||
|
f"[初始化] {new_full_path.name} 已存在且内容不同,改为保存到 {conflict_free_path.name}"
|
||||||
|
)
|
||||||
|
self.full_path.rename(conflict_free_path)
|
||||||
|
self._set_full_path(conflict_free_path)
|
||||||
|
return
|
||||||
|
|
||||||
|
self.full_path.rename(new_full_path)
|
||||||
|
self._set_full_path(new_full_path)
|
||||||
|
|
||||||
def read_image_bytes(self, path: Path) -> bytes:
|
def read_image_bytes(self, path: Path) -> bytes:
|
||||||
"""
|
"""
|
||||||
同步读取图片文件的字节内容
|
同步读取图片文件的字节内容。
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
path (Path): 图片文件的完整路径
|
path: 图片文件的完整路径。
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
return (bytes): 图片文件的字节内容
|
图片文件的字节内容。
|
||||||
Raises:
|
|
||||||
FileNotFoundError: 如果文件不存在则抛出该异常
|
|
||||||
Exception: 其他读取文件时发生的异常
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
with open(path, "rb") as f:
|
with open(path, "rb") as file:
|
||||||
return f.read()
|
return file.read()
|
||||||
except FileNotFoundError as e:
|
except FileNotFoundError as exc:
|
||||||
logger.error(f"[读取图片文件] 文件未找到: {path}")
|
logger.error(f"[读取图片文件] 文件未找到: {path}")
|
||||||
raise e
|
raise exc
|
||||||
except Exception as e:
|
except Exception as exc:
|
||||||
logger.error(f"[读取图片文件] 读取文件时发生错误: {e}")
|
logger.error(f"[读取图片文件] 读取文件时发生错误: {exc}")
|
||||||
raise e
|
raise exc
|
||||||
|
|
||||||
def get_image_format(self, image_bytes: bytes) -> str:
|
def get_image_format(self, image_bytes: bytes) -> str:
|
||||||
"""
|
"""
|
||||||
获取图片的格式
|
获取图片的实际格式。
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
image_bytes (bytes): 图片的字节内容
|
image_bytes: 图片的字节内容。
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
return (str): 图片的格式(小写)
|
小写格式名,例如 `png`、`jpeg`。
|
||||||
|
|
||||||
Raises:
|
|
||||||
ValueError: 如果无法识别图片格式
|
|
||||||
Exception: 其他读取图片格式时发生的异常
|
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
with PILImage.open(io.BytesIO(image_bytes)) as img:
|
with PILImage.open(io.BytesIO(image_bytes)) as img:
|
||||||
if not img.format:
|
if not img.format:
|
||||||
raise ValueError("无法识别图片格式")
|
raise ValueError("无法识别图片格式")
|
||||||
return img.format.lower()
|
return img.format.lower()
|
||||||
except Exception as e:
|
except Exception as exc:
|
||||||
logger.error(f"[获取图片格式] 读取图片格式时发生错误: {e}")
|
logger.error(f"[获取图片格式] 读取图片格式时发生错误: {exc}")
|
||||||
raise e
|
raise exc
|
||||||
|
|
||||||
async def calculate_hash_format(self) -> bool:
|
async def calculate_hash_format(self) -> bool:
|
||||||
"""
|
"""
|
||||||
异步计算表情包的哈希值和格式,初始化后应该执行此方法来确保对象的哈希值和格式正确
|
计算图片哈希和实际格式,并在需要时修正扩展名。
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
return (bool): 如果成功计算哈希值和格式则返回True,否则返回False
|
成功返回 `True`,失败返回 `False`。
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
# 计算哈希值
|
|
||||||
logger.debug(f"[初始化] 计算 {self.file_name} 的哈希值...")
|
logger.debug(f"[初始化] 计算 {self.file_name} 的哈希值...")
|
||||||
if not self.image_bytes:
|
if self.image_bytes is None:
|
||||||
logger.debug(f"[初始化] 正在读取文件: {self.full_path}")
|
logger.debug(f"[初始化] 正在读取文件: {self.full_path}")
|
||||||
image_bytes = await asyncio.to_thread(self.read_image_bytes, self.full_path)
|
image_bytes = await asyncio.to_thread(self.read_image_bytes, self.full_path)
|
||||||
else:
|
else:
|
||||||
image_bytes = self.image_bytes
|
image_bytes = self.image_bytes
|
||||||
|
|
||||||
self.image_bytes = image_bytes
|
self.image_bytes = image_bytes
|
||||||
self.file_hash = hashlib.sha256(image_bytes).hexdigest()
|
self.file_hash = hashlib.sha256(image_bytes).hexdigest()
|
||||||
logger.debug(f"[初始化] {self.file_name} 计算哈希值成功: {self.file_hash}")
|
logger.debug(f"[初始化] {self.file_name} 计算哈希值成功: {self.file_hash}")
|
||||||
|
|
||||||
# 用PIL读取图片格式
|
|
||||||
logger.debug(f"[初始化] 读取 {self.file_name} 的图片格式...")
|
logger.debug(f"[初始化] 读取 {self.file_name} 的图片格式...")
|
||||||
self.image_format = await asyncio.to_thread(self.get_image_format, image_bytes)
|
self.image_format = await asyncio.to_thread(self.get_image_format, image_bytes)
|
||||||
logger.debug(f"[初始化] {self.file_name} 读取图片格式成功: {self.image_format}")
|
logger.debug(f"[初始化] {self.file_name} 读取图片格式成功: {self.image_format}")
|
||||||
|
|
||||||
# 比对文件扩展名和实际格式
|
|
||||||
file_ext = self.file_name.split(".")[-1].lower()
|
file_ext = self.file_name.split(".")[-1].lower()
|
||||||
if file_ext != self.image_format:
|
if file_ext != self.image_format:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
f"[初始化] {self.file_name} 文件扩展名与实际格式不符: ext`{file_ext}`!=`{self.image_format}`"
|
f"[初始化] {self.file_name} 文件扩展名与实际格式不符: ext`{file_ext}`!=`{self.image_format}`"
|
||||||
)
|
)
|
||||||
# 重命名文件以匹配实际格式
|
self._rename_file_to_match_format()
|
||||||
new_file_name = ".".join(self.file_name.split(".")[:-1] + [self.image_format])
|
|
||||||
new_full_path = self.dir_path / new_file_name
|
|
||||||
self.full_path.rename(new_full_path)
|
|
||||||
self._set_full_path(new_full_path)
|
|
||||||
|
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as exc:
|
||||||
logger.error(f"[初始化] 初始化图片时发生错误: {e}")
|
logger.error(f"[初始化] 初始化图片时发生错误: {exc}")
|
||||||
logger.error(traceback.format_exc())
|
logger.error(traceback.format_exc())
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
class MaiEmoji(BaseImageDataModel):
|
class MaiEmoji(BaseImageDataModel):
|
||||||
"""麦麦的表情包对象,仅当**图片文件存在**时才应该创建此对象,数据库记录如果标记为文件不存在`(no_file_flag = True)`则不应该调用 `from_db_instance` 方法来创建此对象"""
|
"""表情包数据模型。"""
|
||||||
|
|
||||||
def __init__(self, full_path: str | Path, image_bytes: Optional[bytes] = None):
|
def __init__(self, full_path: str | Path, image_bytes: Optional[bytes] = None):
|
||||||
# self.embedding = []
|
|
||||||
self.description: str = ""
|
self.description: str = ""
|
||||||
self.emotion: List[str] = []
|
self.emotion: List[str] = []
|
||||||
self.query_count = 0
|
self.query_count = 0
|
||||||
@@ -152,33 +175,26 @@ class MaiEmoji(BaseImageDataModel):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_db_instance(cls, db_record: Images):
|
def from_db_instance(cls, db_record: Images):
|
||||||
"""从数据库记录创建 MaiEmoji 对象,如果记录标记为文件不存在则**抛出异常**
|
"""从数据库记录构建 `MaiEmoji` 对象。"""
|
||||||
|
|
||||||
调用者应该对数据库记录进行检查,如果 `no_file_flag` 为 True 则不应该调用此方法
|
|
||||||
|
|
||||||
Args:
|
|
||||||
db_record (Images): 数据库中的图片记录
|
|
||||||
Returns:
|
|
||||||
return (MaiEmoji): 包含图片信息的 MaiEmoji 对象
|
|
||||||
Raises:
|
|
||||||
ValueError: 如果数据库记录标记为文件不存在则抛出该异常
|
|
||||||
"""
|
|
||||||
if db_record.no_file_flag:
|
if db_record.no_file_flag:
|
||||||
raise ValueError(f"数据库记录 {db_record.image_hash} 标记为文件不存在,无法创建 MaiEmoji 对象")
|
raise ValueError(f"数据库记录 {db_record.image_hash} 标记为文件不存在,无法创建 MaiEmoji 对象")
|
||||||
|
|
||||||
obj = cls(db_record.full_path)
|
obj = cls(db_record.full_path)
|
||||||
obj.file_hash = db_record.image_hash
|
obj.file_hash = db_record.image_hash
|
||||||
obj._restore_image_format_from_path()
|
obj._restore_image_format_from_path()
|
||||||
|
|
||||||
description = db_record.description or ""
|
description = db_record.description or ""
|
||||||
obj.description = description
|
obj.description = description
|
||||||
normalized_tags = [
|
normalized_tags = [
|
||||||
str(item).strip()
|
str(item).strip()
|
||||||
for item in str(description).replace(",", ",").replace("、", ",").replace(";", ",").split(",")
|
for item in str(description).replace(",", ",").replace("。", ",").replace("、", ",").split(",")
|
||||||
if str(item).strip()
|
if str(item).strip()
|
||||||
]
|
]
|
||||||
deduped_tags: List[str] = []
|
deduped_tags: List[str] = []
|
||||||
for item in normalized_tags:
|
for item in normalized_tags:
|
||||||
if item not in deduped_tags:
|
if item not in deduped_tags:
|
||||||
deduped_tags.append(item)
|
deduped_tags.append(item)
|
||||||
|
|
||||||
obj.emotion = deduped_tags
|
obj.emotion = deduped_tags
|
||||||
obj.query_count = db_record.query_count
|
obj.query_count = db_record.query_count
|
||||||
obj.last_used_time = db_record.last_used_time
|
obj.last_used_time = db_record.last_used_time
|
||||||
@@ -198,7 +214,7 @@ class MaiEmoji(BaseImageDataModel):
|
|||||||
|
|
||||||
|
|
||||||
class MaiImage(BaseImageDataModel):
|
class MaiImage(BaseImageDataModel):
|
||||||
"""麦麦图片数据模型,仅当**图片文件存在**时才应该创建此对象,数据库记录如果标记为文件不存在`(no_file_flag = True)`则不应该调用 `from_db_instance` 方法来创建此对象"""
|
"""普通图片数据模型。"""
|
||||||
|
|
||||||
def __init__(self, full_path: str | Path, image_bytes: Optional[bytes] = None):
|
def __init__(self, full_path: str | Path, image_bytes: Optional[bytes] = None):
|
||||||
self.description: str = ""
|
self.description: str = ""
|
||||||
@@ -207,19 +223,10 @@ class MaiImage(BaseImageDataModel):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_db_instance(cls, db_record: Images):
|
def from_db_instance(cls, db_record: Images):
|
||||||
"""从数据库记录创建 MaiImage 对象,如果记录标记为文件不存在则**抛出异常**
|
"""从数据库记录构建 `MaiImage` 对象。"""
|
||||||
|
|
||||||
调用者应该对数据库记录进行检查,如果 `no_file_flag` 为 True 则不应该调用此方法
|
|
||||||
|
|
||||||
Args:
|
|
||||||
db_record (Images): 数据库中的图片记录
|
|
||||||
Returns:
|
|
||||||
return (MaiImage): 包含图片信息的 MaiImage 对象
|
|
||||||
Raises:
|
|
||||||
ValueError: 如果数据库记录标记为文件不存在则抛出该异常
|
|
||||||
"""
|
|
||||||
if db_record.no_file_flag:
|
if db_record.no_file_flag:
|
||||||
raise ValueError(f"数据库记录 {db_record.image_hash} 标记为文件不存在,无法创建 MaiImage 对象")
|
raise ValueError(f"数据库记录 {db_record.image_hash} 标记为文件不存在,无法创建 MaiImage 对象")
|
||||||
|
|
||||||
obj = cls(db_record.full_path)
|
obj = cls(db_record.full_path)
|
||||||
obj.file_hash = db_record.image_hash
|
obj.file_hash = db_record.image_hash
|
||||||
obj._set_full_path(Path(db_record.full_path))
|
obj._set_full_path(Path(db_record.full_path))
|
||||||
|
|||||||
Reference in New Issue
Block a user