from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from typing import Any from urllib.parse import parse_qs, urlparse import argparse import json import mimetypes import time import webbrowser DEFAULT_LOG_DIR = Path("logs") / "maisaka_reply_effect" DEFAULT_MANUAL_DIR = Path("logs") / "maisaka_reply_effect_manual" DEFAULT_HOST = "127.0.0.1" DEFAULT_PORT = 8765 def normalize_name(value: str) -> str: normalized = "".join(char if char.isalnum() or char in "._-" else "_" for char in str(value or "").strip()) normalized = normalized.strip("._") return normalized or "unknown" def load_json_file(file_path: Path) -> dict[str, Any]: try: payload = json.loads(file_path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError): return {} return payload if isinstance(payload, dict) else {} def write_json_file(file_path: Path, payload: dict[str, Any]) -> None: file_path.parent.mkdir(parents=True, exist_ok=True) temp_path = file_path.with_name(f".{file_path.name}.tmp") temp_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2, default=str), encoding="utf-8") temp_path.replace(file_path) class ReplyEffectRepository: def __init__(self, log_dir: Path, manual_dir: Path) -> None: self.log_dir = log_dir self.manual_dir = manual_dir def list_chats(self) -> list[dict[str, Any]]: chats: list[dict[str, Any]] = [] if not self.log_dir.exists(): return chats for chat_dir in sorted(path for path in self.log_dir.iterdir() if path.is_dir()): records = list(chat_dir.glob("*.json")) annotated_count = sum(1 for record_file in records if self._annotation_path(chat_dir.name, record_file).exists()) finalized_count = 0 pending_count = 0 for record_file in records: payload = load_json_file(record_file) if payload.get("status") == "finalized": finalized_count += 1 else: pending_count += 1 chats.append( { "chat_id": chat_dir.name, "record_count": len(records), "finalized_count": finalized_count, "pending_count": pending_count, "annotated_count": annotated_count, } ) return chats def list_records( self, *, chat_id: str | None = None, status: str = "", annotated: str = "", ) -> list[dict[str, Any]]: records: list[dict[str, Any]] = [] for record_file in self._iter_record_files(chat_id): payload = load_json_file(record_file) if not payload: continue summary = self._build_record_summary(record_file, payload) if status and summary["status"] != status: continue if annotated == "yes" and summary["manual"] is None: continue if annotated == "no" and summary["manual"] is not None: continue records.append(summary) return sorted(records, key=lambda item: str(item.get("created_at") or ""), reverse=True) def get_record(self, chat_id: str, effect_id: str) -> dict[str, Any]: record_file = self._find_record_file(chat_id, effect_id) if record_file is None: return {} payload = load_json_file(record_file) if not payload: return {} payload["_manual"] = self.get_annotation(chat_id, effect_id) payload["_record_path"] = str(record_file) return payload def get_annotation(self, chat_id: str, effect_id: str) -> dict[str, Any] | None: annotation_path = self._annotation_path(chat_id, effect_id) if not annotation_path.exists(): return None payload = load_json_file(annotation_path) return payload or None def save_annotation(self, payload: dict[str, Any]) -> dict[str, Any]: chat_id = normalize_name(str(payload.get("chat_id") or "")) effect_id = normalize_name(str(payload.get("effect_id") or "")) if not chat_id or chat_id == "unknown" or not effect_id or effect_id == "unknown": raise ValueError("缺少 chat_id 或 effect_id") if self._find_record_file(chat_id, effect_id) is None: raise ValueError("找不到对应的回复效果记录") manual_score = payload.get("manual_score") manual_score_5 = payload.get("manual_score_5") normalized_score: float | None = None normalized_score_5: int | None = None if manual_score_5 not in {None, ""}: try: normalized_score_5 = int(manual_score_5) except (TypeError, ValueError): raise ValueError("manual_score_5 必须是 1-5 的整数") from None if normalized_score_5 < 1 or normalized_score_5 > 5: raise ValueError("manual_score_5 必须是 1-5 的整数") normalized_score = round((normalized_score_5 - 1) / 4 * 100, 2) elif manual_score not in {None, ""}: try: normalized_score = max(0.0, min(100.0, float(manual_score))) except (TypeError, ValueError): raise ValueError("manual_score 必须是 0-100 的数字") from None else: raise ValueError("缺少人工评分") annotation = { "schema_version": 1, "chat_id": chat_id, "effect_id": effect_id, "manual_score": round(normalized_score, 2), "manual_score_5": normalized_score_5, "manual_label": str(payload.get("manual_label") or "").strip(), "evaluator": str(payload.get("evaluator") or "manual").strip() or "manual", "notes": str(payload.get("notes") or "").strip(), "updated_at": time.strftime("%Y-%m-%dT%H:%M:%S%z"), } write_json_file(self._annotation_path(chat_id, effect_id), annotation) return annotation def _iter_record_files(self, chat_id: str | None = None) -> list[Path]: if not self.log_dir.exists(): return [] if chat_id: chat_dir = self.log_dir / normalize_name(chat_id) if not chat_dir.exists() or not chat_dir.is_dir(): return [] return sorted(chat_dir.glob("*.json")) record_files: list[Path] = [] for chat_dir in self.log_dir.iterdir(): if chat_dir.is_dir(): record_files.extend(chat_dir.glob("*.json")) return record_files def _find_record_file(self, chat_id: str, effect_id: str) -> Path | None: normalized_effect_id = normalize_name(effect_id) for record_file in self._iter_record_files(chat_id): payload = load_json_file(record_file) if normalize_name(str(payload.get("effect_id") or "")) == normalized_effect_id: return record_file return None def _annotation_path(self, chat_id: str, record_file_or_effect_id: Path | str) -> Path: if isinstance(record_file_or_effect_id, Path): payload = load_json_file(record_file_or_effect_id) effect_id = str(payload.get("effect_id") or record_file_or_effect_id.stem).strip() else: effect_id = str(record_file_or_effect_id or "").strip() return self.manual_dir / normalize_name(chat_id) / f"{normalize_name(effect_id)}.json" def _build_record_summary(self, record_file: Path, payload: dict[str, Any]) -> dict[str, Any]: chat_id = record_file.parent.name effect_id = str(payload.get("effect_id") or record_file.stem) scores = payload.get("scores") if isinstance(payload.get("scores"), dict) else {} reply = payload.get("reply") if isinstance(payload.get("reply"), dict) else {} target_user = payload.get("target_user") if isinstance(payload.get("target_user"), dict) else {} manual = self.get_annotation(chat_id, effect_id) return { "chat_id": chat_id, "effect_id": effect_id, "status": str(payload.get("status") or ""), "created_at": str(payload.get("created_at") or ""), "finalize_reason": str(payload.get("finalize_reason") or ""), "asi": scores.get("asi"), "behavior_score": scores.get("behavior_score"), "relational_score": scores.get("relational_score"), "friction_score": scores.get("friction_score"), "manual": manual, "reply_preview": self._truncate(str(reply.get("reply_text") or ""), 160), "target_message_id": str(reply.get("target_message_id") or ""), "target_user": target_user, "followup_count": len(payload.get("followup_messages") or []), "file_name": record_file.name, } @staticmethod def _truncate(text: str, limit: int) -> str: normalized_text = " ".join(str(text or "").split()) if len(normalized_text) <= limit: return normalized_text return f"{normalized_text[: limit - 1]}…" class ReplyEffectPreviewHandler(BaseHTTPRequestHandler): repository: ReplyEffectRepository def do_GET(self) -> None: parsed = urlparse(self.path) if parsed.path == "/": self._send_html(INDEX_HTML_V3) return if parsed.path == "/api/chats": self._send_json({"chats": self.repository.list_chats()}) return if parsed.path == "/api/records": query = parse_qs(parsed.query) records = self.repository.list_records( chat_id=self._first(query, "chat_id"), status=self._first(query, "status"), annotated=self._first(query, "annotated"), ) self._send_json({"records": records}) return if parsed.path == "/api/record": query = parse_qs(parsed.query) record = self.repository.get_record( normalize_name(self._first(query, "chat_id")), normalize_name(self._first(query, "effect_id")), ) if not record: self._send_json({"error": "record not found"}, status=404) return self._send_json({"record": record}) return if parsed.path == "/api/image": query = parse_qs(parsed.query) self._send_image(self._first(query, "path")) return if parsed.path == "/api/image_hash": query = parse_qs(parsed.query) self._send_image_by_hash(self._first(query, "hash"), self._first(query, "kind")) return self._send_json({"error": "not found"}, status=404) def do_POST(self) -> None: parsed = urlparse(self.path) if parsed.path != "/api/annotations": self._send_json({"error": "not found"}, status=404) return try: payload = self._read_json_body() annotation = self.repository.save_annotation(payload) except ValueError as exc: self._send_json({"error": str(exc)}, status=400) return self._send_json({"annotation": annotation}) def log_message(self, format: str, *args: Any) -> None: return def _send_html(self, content: str) -> None: body = content.encode("utf-8") self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def _send_image_by_hash(self, image_hash: str, kind: str = "") -> None: image_hash = str(image_hash or "").strip() if not image_hash: self._send_json({"error": "missing image hash"}, status=400) return image_path = self._resolve_image_path_by_hash(image_hash, kind) if image_path is None: self._send_json({"error": "image hash not found"}, status=404) return self._send_image(str(image_path)) @staticmethod def _resolve_image_path_by_hash(image_hash: str, kind: str = "") -> Path | None: try: from sqlmodel import select from src.common.database.database import get_db_session from src.common.database.database_model import Images, ImageType preferred_types = [] if kind == "emoji": preferred_types.append(ImageType.EMOJI) elif kind == "image": preferred_types.append(ImageType.IMAGE) preferred_types.extend(image_type for image_type in (ImageType.IMAGE, ImageType.EMOJI) if image_type not in preferred_types) with get_db_session() as db: for image_type in preferred_types: statement = select(Images).filter_by(image_hash=image_hash, image_type=image_type).limit(1) image_record = db.exec(statement).first() if image_record is None or image_record.no_file_flag: continue image_path = Path(str(image_record.full_path or "")).expanduser().resolve() if image_path.is_file(): return image_path except Exception: return None return None def _send_json(self, payload: dict[str, Any], status: int = 200) -> None: body = json.dumps(payload, ensure_ascii=False, default=str).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def _send_image(self, raw_path: str) -> None: try: image_path = Path(raw_path).expanduser().resolve() if not image_path.is_file(): raise FileNotFoundError(raw_path) mime_type = mimetypes.guess_type(str(image_path))[0] or "application/octet-stream" if not mime_type.startswith("image/"): self._send_json({"error": "not an image"}, status=400) return body = image_path.read_bytes() except OSError: self._send_json({"error": "image not found"}, status=404) return self.send_response(200) self.send_header("Content-Type", mime_type) self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def _read_json_body(self) -> dict[str, Any]: length = int(self.headers.get("Content-Length") or 0) raw_body = self.rfile.read(length).decode("utf-8") payload = json.loads(raw_body or "{}") if not isinstance(payload, dict): raise ValueError("请求体必须是 JSON 对象") return payload @staticmethod def _first(query: dict[str, list[str]], key: str) -> str: values = query.get(key) or [""] return values[0] INDEX_HTML = r""" Maisaka 回复效果评分预览

Maisaka 回复效果评分预览

选择一条记录查看详情
""" INDEX_HTML_V2 = r""" Maisaka 回复效果评分预览

Maisaka 回复效果评分预览

选择一条记录查看详情
""" INDEX_HTML_V3 = r""" Maisaka 回复效果评分预览

Maisaka 回复效果评分预览

选择一条记录查看详情
""" def build_handler(repository: ReplyEffectRepository) -> type[ReplyEffectPreviewHandler]: class ConfiguredHandler(ReplyEffectPreviewHandler): pass ConfiguredHandler.repository = repository return ConfiguredHandler def main() -> None: parser = argparse.ArgumentParser(description="预览 Maisaka 回复效果评分,并记录人工评分。") parser.add_argument("--host", default=DEFAULT_HOST, help=f"监听地址,默认 {DEFAULT_HOST}") parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"监听端口,默认 {DEFAULT_PORT}") parser.add_argument("--log-dir", type=Path, default=DEFAULT_LOG_DIR, help="回复效果 JSON 日志目录") parser.add_argument("--manual-dir", type=Path, default=DEFAULT_MANUAL_DIR, help="人工评分 JSON 保存目录") parser.add_argument("--no-browser", action="store_true", help="不自动打开浏览器") args = parser.parse_args() mimetypes.add_type("text/html", ".html") repository = ReplyEffectRepository(args.log_dir, args.manual_dir) server = ThreadingHTTPServer((args.host, args.port), build_handler(repository)) url = f"http://{args.host}:{args.port}/" print(f"Maisaka 回复效果评分预览已启动: {url}") print(f"自动评分目录: {args.log_dir}") print(f"人工评分目录: {args.manual_dir}") if not args.no_browser: webbrowser.open(url) try: server.serve_forever() except KeyboardInterrupt: print("\n正在关闭预览服务...") finally: server.server_close() if __name__ == "__main__": main()