diff --git a/AGENTS.md b/AGENTS.md index fc486081..fead0c13 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -42,4 +42,7 @@ 如果你需要改动配置文件,不需要修改实际的bot_config.toml或者model_config.toml,只需要修改配置文件模版,并新增一个版本号即可,也不必要为配置改动创建测试文件。 # 关于webui修改 -不要修改dashboard下的内容,因为这部分内容由另一个仓库build \ No newline at end of file +不要修改dashboard下的内容,因为这部分内容由另一个仓库build + +# maibot插件开发文档 +https://github.com/Mai-with-u/maibot-plugin-sdk/blob/main/docs/guide.md \ No newline at end of file diff --git a/plugins/emoji_manage_plugin/_manifest.json b/plugins/emoji_manage_plugin/_manifest.json deleted file mode 100644 index 998cb7da..00000000 --- a/plugins/emoji_manage_plugin/_manifest.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "manifest_version": 2, - "version": "2.0.0", - "name": "BetterEmoji", - "description": "更好的表情包管理插件", - "author": { - "name": "SengokuCola", - "url": "https://github.com/SengokuCola" - }, - "license": "GPL-v3.0-or-later", - "urls": { - "repository": "https://github.com/SengokuCola/BetterEmoji", - "homepage": "https://github.com/SengokuCola/BetterEmoji", - "documentation": "https://github.com/SengokuCola/BetterEmoji", - "issues": "https://github.com/SengokuCola/BetterEmoji/issues" - }, - "host_application": { - "min_version": "1.0.0", - "max_version": "1.0.0" - }, - "sdk": { - "min_version": "2.0.0", - "max_version": "2.99.99" - }, - "dependencies": [], - "capabilities": [ - "emoji.get_random", - "emoji.get_count", - "emoji.get_info", - "emoji.get_all", - "emoji.register_emoji", - "emoji.delete_emoji", - "send.text", - "send.forward" - ], - "i18n": { - "default_locale": "zh-CN", - "locales_path": "_locales", - "supported_locales": [ - "zh-CN" - ] - }, - "id": "sengokucola.betteremoji" -} diff --git a/plugins/emoji_manage_plugin/plugin.py b/plugins/emoji_manage_plugin/plugin.py deleted file mode 100644 index 9362c828..00000000 --- a/plugins/emoji_manage_plugin/plugin.py +++ /dev/null @@ -1,238 +0,0 @@ -"""表情包管理插件 — 新 SDK 版本 - -通过 /emoji 命令管理表情包的添加、列表和删除。 -""" - -from maibot_sdk import Command, MaiBotPlugin - -import base64 -import datetime -import hashlib -import re - - -class EmojiManagePlugin(MaiBotPlugin): - """表情包管理插件""" - - async def on_load(self) -> None: - """处理插件加载。""" - - async def on_unload(self) -> None: - """处理插件卸载。""" - - # ===== 工具方法 ===== - - @staticmethod - def _extract_emoji_base64(segments) -> list[str]: - """从消息 segments 中提取 emoji/image 的 base64 数据。 - - segments 可以是 dict 列表或 Seg 对象列表(兼容两种格式)。 - """ - results: list[str] = [] - if not segments: - return results - - if isinstance(segments, dict): - seg_type = segments.get("type", "") - if seg_type in ("emoji", "image"): - data = segments.get("data", "") - if data: - results.append(data) - elif seg_type == "seglist": - for child in segments.get("data", []): - results.extend(EmojiManagePlugin._extract_emoji_base64(child)) - return results - - # 如果有 .type 属性(Seg 对象) - if hasattr(segments, "type"): - seg_type = getattr(segments, "type", "") - if seg_type in ("emoji", "image"): - results.append(getattr(segments, "data", "")) - elif seg_type == "seglist": - for child in getattr(segments, "data", []): - results.extend(EmojiManagePlugin._extract_emoji_base64(child)) - return results - - # 列表 - for seg in segments: - results.extend(EmojiManagePlugin._extract_emoji_base64(seg)) - return results - - # ===== Command 组件 ===== - - @Command("add_emoji", description="添加表情包", pattern=r".*/emoji add.*") - async def handle_add_emoji(self, stream_id: str = "", message_segments=None, **kwargs): - """添加表情包""" - emoji_base64_list = self._extract_emoji_base64(message_segments) - if not emoji_base64_list: - await self.ctx.send.text("未在消息中找到表情包或图片", stream_id) - return False, "未在消息中找到表情包或图片", False - - success_count = 0 - fail_count = 0 - results = [] - - for i, emoji_b64 in enumerate(emoji_base64_list): - result = await self.ctx.emoji.register_emoji(emoji_b64) - if isinstance(result, dict) and result.get("success"): - success_count += 1 - desc = result.get("description", "未知描述") - emotions = result.get("emotions", []) - replaced = result.get("replaced", False) - msg = f"表情包 {i + 1} 注册成功{'(替换旧表情包)' if replaced else '(新增表情包)'}" - if desc: - msg += f"\n描述: {desc}" - if emotions: - msg += f"\n情感标签: {', '.join(emotions)}" - results.append(msg) - else: - fail_count += 1 - err = result.get("message", "注册失败") if isinstance(result, dict) else "注册失败" - results.append(f"表情包 {i + 1} 注册失败: {err}") - - total = success_count + fail_count - summary = f"表情包注册完成: 成功 {success_count} 个,失败 {fail_count} 个,共处理 {total} 个" - if results: - summary += "\n" + "\n".join(results) - - await self.ctx.send.text(summary, stream_id) - return success_count > 0, summary, success_count > 0 - - @Command("emoji_list", description="列表表情包", pattern=r"^/emoji list(\s+\d+)?$") - async def handle_list_emoji(self, stream_id: str = "", raw_message: str = "", **kwargs): - """列出表情包""" - max_count = 10 - match = re.match(r"^/emoji list(?:\s+(\d+))?$", raw_message) - if match and match.group(1): - max_count = min(int(match.group(1)), 50) - - now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - count_result = await self.ctx.emoji.get_count() - emoji_count = count_result if isinstance(count_result, int) else 0 - - info_result = await self.ctx.emoji.get_info() - max_emoji = info_result.get("max_count", 0) if isinstance(info_result, dict) else 0 - available = info_result.get("available_emojis", 0) if isinstance(info_result, dict) else 0 - - lines = [ - f"📊 表情包统计信息 ({now})", - f"• 总数: {emoji_count} / {max_emoji}", - f"• 可用: {available}", - ] - - if emoji_count == 0: - lines.append("\n❌ 暂无表情包") - await self.ctx.send.text("\n".join(lines), stream_id) - return True, "\n".join(lines), True - - all_result = await self.ctx.emoji.get_all() - all_emojis = all_result if isinstance(all_result, list) else [] - if not all_emojis: - lines.append("\n❌ 无法获取表情包列表") - await self.ctx.send.text("\n".join(lines), stream_id) - return False, "\n".join(lines), True - - display = all_emojis[:max_count] - lines.append(f"\n📋 显示前 {len(display)} 个表情包:") - for i, emoji in enumerate(display, 1): - if isinstance(emoji, (list, tuple)) and len(emoji) >= 3: - _, desc, emotion = emoji[0], emoji[1], emoji[2] - elif isinstance(emoji, dict): - desc = emoji.get("description", "") - emotion = emoji.get("emotion", "") - else: - desc, emotion = str(emoji), "" - short_desc = desc[:50] + "..." if len(desc) > 50 else desc - lines.append(f"{i}. {short_desc} [{emotion}]") - - if len(all_emojis) > max_count: - lines.append(f"\n💡 还有 {len(all_emojis) - max_count} 个表情包未显示") - - final = "\n".join(lines) - await self.ctx.send.text(final, stream_id) - return True, final, True - - @Command("delete_emoji", description="删除表情包", pattern=r".*/emoji delete.*") - async def handle_delete_emoji(self, stream_id: str = "", message_segments=None, **kwargs): - """删除表情包""" - emoji_base64_list = self._extract_emoji_base64(message_segments) - if not emoji_base64_list: - await self.ctx.send.text("未在消息中找到表情包或图片", stream_id) - return False, "未找到表情包", False - - success_count = 0 - fail_count = 0 - results = [] - - for i, emoji_b64 in enumerate(emoji_base64_list): - # 计算哈希 - if isinstance(emoji_b64, str): - clean = emoji_b64.encode("ascii", errors="ignore").decode("ascii") - else: - clean = str(emoji_b64) - image_bytes = base64.b64decode(clean) - emoji_hash = hashlib.md5(image_bytes).hexdigest() # noqa: S324 - - result = await self.ctx.emoji.delete_emoji(emoji_hash) - if isinstance(result, dict) and result.get("success"): - success_count += 1 - desc = result.get("description", "未知描述") - emotions = result.get("emotions", []) - before = result.get("count_before", 0) - after = result.get("count_after", 0) - msg = f"表情包 {i + 1} 删除成功" - if desc: - msg += f"\n描述: {desc}" - if emotions: - msg += f"\n情感标签: {', '.join(emotions)}" - msg += f"\n表情包数量: {before} → {after}" - results.append(msg) - else: - fail_count += 1 - err = result.get("message", "删除失败") if isinstance(result, dict) else "删除失败" - results.append(f"表情包 {i + 1} 删除失败: {err}") - - total = success_count + fail_count - summary = f"表情包删除完成: 成功 {success_count} 个,失败 {fail_count} 个,共处理 {total} 个" - if results: - summary += "\n" + "\n".join(results) - - await self.ctx.send.text(summary, stream_id) - return success_count > 0, summary, success_count > 0 - - @Command("random_emojis", description="发送多张随机表情包", pattern=r"^/random_emojis$") - async def handle_random_emojis(self, stream_id: str = "", **kwargs): - """发送多张随机表情包""" - emojis = await self.ctx.emoji.get_random(5) - if not emojis: - return False, "未找到表情包", False - messages = [ - {"user_id": "0", "nickname": "神秘用户", "segments": [{"type": "image", "content": e.get("base64", "")}]} - for e in emojis - ] - await self.ctx.send.forward(messages, stream_id) - return True, "已发送随机表情包", True - - async def on_config_update(self, scope: str, config_data: dict[str, object], version: str) -> None: - """处理配置热重载事件。 - - Args: - scope: 配置变更范围。 - config_data: 最新配置数据。 - version: 配置版本号。 - """ - - del scope - del config_data - del version - - -def create_plugin() -> EmojiManagePlugin: - """创建表情包管理插件实例。 - - Returns: - EmojiManagePlugin: 新的表情包管理插件实例。 - """ - - return EmojiManagePlugin() diff --git a/pytests/image_sys_test/test_image_data_model.py b/pytests/image_sys_test/test_image_data_model.py index 6bd98f58..964e209c 100644 --- a/pytests/image_sys_test/test_image_data_model.py +++ b/pytests/image_sys_test/test_image_data_model.py @@ -31,6 +31,23 @@ async def test_calculate_hash_format_updates_runtime_path_metadata(tmp_path: Pat assert emoji.dir_path == tmp_path.resolve() +@pytest.mark.asyncio +async def test_calculate_hash_format_reuses_existing_target_file(tmp_path: Path) -> None: + image_bytes = _build_test_image_bytes("JPEG") + tmp_file_path = tmp_path / "emoji.tmp" + target_file_path = tmp_path / "emoji.jpeg" + tmp_file_path.write_bytes(image_bytes) + target_file_path.write_bytes(image_bytes) + + emoji = MaiEmoji(full_path=tmp_file_path, image_bytes=image_bytes) + + assert await emoji.calculate_hash_format() is True + assert emoji.full_path == target_file_path.resolve() + assert emoji.file_name == target_file_path.name + assert not tmp_file_path.exists() + assert target_file_path.exists() + + @pytest.mark.parametrize( ("model_cls", "extra_fields"), [ diff --git a/src/common/data_models/image_data_model.py b/src/common/data_models/image_data_model.py index 9916444f..19cbc1b2 100644 --- a/src/common/data_models/image_data_model.py +++ b/src/common/data_models/image_data_model.py @@ -24,125 +24,148 @@ logger = get_logger("emoji") class BaseImageDataModel(BaseDatabaseDataModel[Images]): def __init__(self, full_path: str | Path, image_bytes: Optional[bytes] = None): if not full_path: - # 创建时候即检测文件路径合法性 - raise ValueError("表情包路径不能为空") + raise ValueError("图片路径不能为空") if Path(full_path).is_dir() or not Path(full_path).exists(): - raise FileNotFoundError(f"表情包路径无效: {full_path}") + raise FileNotFoundError(f"图片路径无效: {full_path}") + resolved_path = Path(full_path).absolute().resolve() self.full_path: Path self.dir_path: Path self.file_name: str self._set_full_path(resolved_path) + self.file_hash: str = None # type: ignore - self.image_bytes: Optional[bytes] = image_bytes - - self.image_format: str = "" # 图片格式 + self.image_format: str = "" def _set_full_path(self, full_path: Path) -> None: - """同步更新文件路径相关的运行时元数据。""" + """同步刷新路径、目录和文件名等运行时元数据。""" resolved_path = full_path.absolute().resolve() self.full_path = resolved_path self.dir_path = resolved_path.parent.resolve() self.file_name = resolved_path.name def _restore_image_format_from_path(self) -> None: - """根据文件扩展名恢复基础图片格式信息。""" + """根据文件扩展名恢复图片格式信息。""" self.image_format = self.full_path.suffix.removeprefix(".").lower() + def _build_non_conflicting_path(self, target_path: Path) -> Path: + """在目标路径被占用时,生成一个可用的新路径。""" + candidate_path = target_path + index = 1 + while candidate_path.exists(): + candidate_path = target_path.with_name( + f"{target_path.stem}_{self.file_hash[:8]}_{index}{target_path.suffix}" + ) + index += 1 + return candidate_path + + def _rename_file_to_match_format(self) -> None: + """修正文件扩展名,并处理目标文件已存在的冲突。""" + new_file_name = ".".join(self.file_name.split(".")[:-1] + [self.image_format]) + new_full_path = self.dir_path / new_file_name + if new_full_path == self.full_path: + return + + if new_full_path.exists(): + existing_file_hash = hashlib.sha256(self.read_image_bytes(new_full_path)).hexdigest() + if existing_file_hash == self.file_hash: + logger.info(f"[初始化] {new_full_path.name} 已存在且内容一致,复用已有文件") + self.full_path.unlink() + self._set_full_path(new_full_path) + return + + conflict_free_path = self._build_non_conflicting_path(new_full_path) + logger.warning( + f"[初始化] {new_full_path.name} 已存在且内容不同,改为保存到 {conflict_free_path.name}" + ) + self.full_path.rename(conflict_free_path) + self._set_full_path(conflict_free_path) + return + + self.full_path.rename(new_full_path) + self._set_full_path(new_full_path) + def read_image_bytes(self, path: Path) -> bytes: """ - 同步读取图片文件的字节内容 + 同步读取图片文件的字节内容。 Args: - path (Path): 图片文件的完整路径 + path: 图片文件的完整路径。 + Returns: - return (bytes): 图片文件的字节内容 - Raises: - FileNotFoundError: 如果文件不存在则抛出该异常 - Exception: 其他读取文件时发生的异常 + 图片文件的字节内容。 """ try: - with open(path, "rb") as f: - return f.read() - except FileNotFoundError as e: + with open(path, "rb") as file: + return file.read() + except FileNotFoundError as exc: logger.error(f"[读取图片文件] 文件未找到: {path}") - raise e - except Exception as e: - logger.error(f"[读取图片文件] 读取文件时发生错误: {e}") - raise e + raise exc + except Exception as exc: + logger.error(f"[读取图片文件] 读取文件时发生错误: {exc}") + raise exc def get_image_format(self, image_bytes: bytes) -> str: """ - 获取图片的格式 + 获取图片的实际格式。 Args: - image_bytes (bytes): 图片的字节内容 + image_bytes: 图片的字节内容。 Returns: - return (str): 图片的格式(小写) - - Raises: - ValueError: 如果无法识别图片格式 - Exception: 其他读取图片格式时发生的异常 + 小写格式名,例如 `png`、`jpeg`。 """ try: with PILImage.open(io.BytesIO(image_bytes)) as img: if not img.format: raise ValueError("无法识别图片格式") return img.format.lower() - except Exception as e: - logger.error(f"[获取图片格式] 读取图片格式时发生错误: {e}") - raise e + except Exception as exc: + logger.error(f"[获取图片格式] 读取图片格式时发生错误: {exc}") + raise exc async def calculate_hash_format(self) -> bool: """ - 异步计算表情包的哈希值和格式,初始化后应该执行此方法来确保对象的哈希值和格式正确 + 计算图片哈希和实际格式,并在需要时修正扩展名。 Returns: - return (bool): 如果成功计算哈希值和格式则返回True,否则返回False + 成功返回 `True`,失败返回 `False`。 """ try: - # 计算哈希值 logger.debug(f"[初始化] 计算 {self.file_name} 的哈希值...") - if not self.image_bytes: + if self.image_bytes is None: logger.debug(f"[初始化] 正在读取文件: {self.full_path}") image_bytes = await asyncio.to_thread(self.read_image_bytes, self.full_path) else: image_bytes = self.image_bytes + self.image_bytes = image_bytes self.file_hash = hashlib.sha256(image_bytes).hexdigest() logger.debug(f"[初始化] {self.file_name} 计算哈希值成功: {self.file_hash}") - # 用PIL读取图片格式 logger.debug(f"[初始化] 读取 {self.file_name} 的图片格式...") self.image_format = await asyncio.to_thread(self.get_image_format, image_bytes) logger.debug(f"[初始化] {self.file_name} 读取图片格式成功: {self.image_format}") - # 比对文件扩展名和实际格式 file_ext = self.file_name.split(".")[-1].lower() if file_ext != self.image_format: logger.warning( f"[初始化] {self.file_name} 文件扩展名与实际格式不符: ext`{file_ext}`!=`{self.image_format}`" ) - # 重命名文件以匹配实际格式 - new_file_name = ".".join(self.file_name.split(".")[:-1] + [self.image_format]) - new_full_path = self.dir_path / new_file_name - self.full_path.rename(new_full_path) - self._set_full_path(new_full_path) + self._rename_file_to_match_format() return True - except Exception as e: - logger.error(f"[初始化] 初始化图片时发生错误: {e}") + except Exception as exc: + logger.error(f"[初始化] 初始化图片时发生错误: {exc}") logger.error(traceback.format_exc()) return False class MaiEmoji(BaseImageDataModel): - """麦麦的表情包对象,仅当**图片文件存在**时才应该创建此对象,数据库记录如果标记为文件不存在`(no_file_flag = True)`则不应该调用 `from_db_instance` 方法来创建此对象""" + """表情包数据模型。""" def __init__(self, full_path: str | Path, image_bytes: Optional[bytes] = None): - # self.embedding = [] self.description: str = "" self.emotion: List[str] = [] self.query_count = 0 @@ -152,33 +175,26 @@ class MaiEmoji(BaseImageDataModel): @classmethod def from_db_instance(cls, db_record: Images): - """从数据库记录创建 MaiEmoji 对象,如果记录标记为文件不存在则**抛出异常** - - 调用者应该对数据库记录进行检查,如果 `no_file_flag` 为 True 则不应该调用此方法 - - Args: - db_record (Images): 数据库中的图片记录 - Returns: - return (MaiEmoji): 包含图片信息的 MaiEmoji 对象 - Raises: - ValueError: 如果数据库记录标记为文件不存在则抛出该异常 - """ + """从数据库记录构建 `MaiEmoji` 对象。""" if db_record.no_file_flag: raise ValueError(f"数据库记录 {db_record.image_hash} 标记为文件不存在,无法创建 MaiEmoji 对象") + obj = cls(db_record.full_path) obj.file_hash = db_record.image_hash obj._restore_image_format_from_path() + description = db_record.description or "" obj.description = description normalized_tags = [ str(item).strip() - for item in str(description).replace(",", ",").replace("、", ",").replace(";", ",").split(",") + for item in str(description).replace(",", ",").replace("。", ",").replace("、", ",").split(",") if str(item).strip() ] deduped_tags: List[str] = [] for item in normalized_tags: if item not in deduped_tags: deduped_tags.append(item) + obj.emotion = deduped_tags obj.query_count = db_record.query_count obj.last_used_time = db_record.last_used_time @@ -198,7 +214,7 @@ class MaiEmoji(BaseImageDataModel): class MaiImage(BaseImageDataModel): - """麦麦图片数据模型,仅当**图片文件存在**时才应该创建此对象,数据库记录如果标记为文件不存在`(no_file_flag = True)`则不应该调用 `from_db_instance` 方法来创建此对象""" + """普通图片数据模型。""" def __init__(self, full_path: str | Path, image_bytes: Optional[bytes] = None): self.description: str = "" @@ -207,19 +223,10 @@ class MaiImage(BaseImageDataModel): @classmethod def from_db_instance(cls, db_record: Images): - """从数据库记录创建 MaiImage 对象,如果记录标记为文件不存在则**抛出异常** - - 调用者应该对数据库记录进行检查,如果 `no_file_flag` 为 True 则不应该调用此方法 - - Args: - db_record (Images): 数据库中的图片记录 - Returns: - return (MaiImage): 包含图片信息的 MaiImage 对象 - Raises: - ValueError: 如果数据库记录标记为文件不存在则抛出该异常 - """ + """从数据库记录构建 `MaiImage` 对象。""" if db_record.no_file_flag: raise ValueError(f"数据库记录 {db_record.image_hash} 标记为文件不存在,无法创建 MaiImage 对象") + obj = cls(db_record.full_path) obj.file_hash = db_record.image_hash obj._set_full_path(Path(db_record.full_path))