# --- scripts/analyze_reply_effect_score_correlation.py (new file, index 00000000..3f3da3cb) ---
"""Correlate Maisaka automatic reply-effect scores with manual annotations.

Reads finalized auto-score records and matching manual-score JSON files, then
reports Pearson/Spearman/Kendall correlations per metric.
"""

import argparse
import csv
import json
import math
from pathlib import Path
from typing import Any

from scipy import stats

DEFAULT_LOG_DIR = Path("logs") / "maisaka_reply_effect"
DEFAULT_MANUAL_DIR = Path("logs") / "maisaka_reply_effect_manual"


# Each entry: (group label, dotted path into the record's "scores" dict, display label).
METRIC_SPECS = [
    ("总分", "asi", "ASI 自动总分"),
    ("大项", "behavior_score", "行为满意度 B"),
    ("大项", "relational_score", "感知质量 R"),
    ("大项", "friction_score", "摩擦风险 F"),
    ("大项", "friction_quality_score", "低摩擦质量分"),
    ("行为子项", "behavior_signals.continue_2turns", "继续两轮"),
    ("行为子项", "behavior_signals.next_user_sentiment", "后续情绪"),
    ("行为子项", "behavior_signals.user_expansion", "用户展开"),
    ("行为子项", "behavior_signals.no_correction", "没有纠正"),
    ("行为子项", "behavior_signals.no_abort", "没有放弃"),
    ("rubric 子项", "rubric_scores.social_presence.normalized_score", "社交临场感"),
    ("rubric 子项", "rubric_scores.warmth.normalized_score", "温暖感"),
    ("rubric 子项", "rubric_scores.competence.normalized_score", "能力/有用性"),
    ("rubric 子项", "rubric_scores.appropriateness.normalized_score", "合适程度"),
    ("rubric 子项", "rubric_scores.uncanny_risk.normalized_score", "违和风险 judge"),
    ("摩擦子项", "friction_signals.explicit_negative", "明确负反馈"),
    ("摩擦子项", "friction_signals.repair_loop", "修复循环"),
    ("摩擦子项", "friction_signals.uncanny_risk", "违和风险"),
]


def normalize_name(value: str) -> str:
    """Sanitize *value* into a filesystem-safe identifier.

    Non-alphanumeric characters (except ``.``, ``_``, ``-``) become ``_``;
    leading/trailing dots and underscores are stripped. Returns ``"unknown"``
    when nothing survives.
    """
    normalized = "".join(char if char.isalnum() or char in "._-" else "_" for char in str(value or "").strip())
    normalized = normalized.strip("._")
    return normalized or "unknown"


def load_json_file(file_path: Path) -> dict[str, Any]:
    """Read a JSON object from *file_path*; return ``{}`` on any read/parse failure
    or when the top-level value is not a dict (best-effort loader by design)."""
    try:
        payload = json.loads(file_path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return {}
    return payload if isinstance(payload, dict) else {}
+def to_float(value: Any) -> float | None: + if value in {None, ""}: + return None + try: + number = float(value) + except (TypeError, ValueError): + return None + if math.isnan(number) or math.isinf(number): + return None + return number + + +def get_nested(payload: dict[str, Any], dotted_path: str) -> Any: + current: Any = payload + for key in dotted_path.split("."): + if not isinstance(current, dict): + return None + current = current.get(key) + return current + + +def annotation_path(manual_dir: Path, chat_id: str, effect_id: str) -> Path: + return manual_dir / normalize_name(chat_id) / f"{normalize_name(effect_id)}.json" + + +def iter_records( + log_dir: Path, + manual_dir: Path, + *, + chat_id: str, + include_pending: bool, +) -> list[dict[str, Any]]: + records: list[dict[str, Any]] = [] + if not log_dir.exists(): + return records + + chat_dirs = [log_dir / normalize_name(chat_id)] if chat_id else [path for path in log_dir.iterdir() if path.is_dir()] + for chat_dir in sorted(chat_dirs): + if not chat_dir.exists() or not chat_dir.is_dir(): + continue + for record_file in sorted(chat_dir.glob("*.json")): + effect_record = load_json_file(record_file) + if not effect_record: + continue + if not include_pending and effect_record.get("status") != "finalized": + continue + + effect_id = str(effect_record.get("effect_id") or record_file.stem) + manual_record = load_json_file(annotation_path(manual_dir, chat_dir.name, effect_id)) + manual_score = to_float(manual_record.get("manual_score")) + if manual_score is None: + manual_score_5 = to_float(manual_record.get("manual_score_5")) + if manual_score_5 is not None: + manual_score = (manual_score_5 - 1) / 4 * 100 + if manual_score is None: + continue + + raw_scores = effect_record.get("scores") if isinstance(effect_record.get("scores"), dict) else {} + scores = dict(raw_scores) + friction_score = to_float(scores.get("friction_score")) + if friction_score is not None: + scores["friction_quality_score"] = 1 - friction_score 
+ records.append( + { + "chat_id": chat_dir.name, + "effect_id": effect_id, + "manual_score": manual_score, + "manual_score_5": manual_record.get("manual_score_5"), + "scores": scores, + "status": effect_record.get("status"), + "created_at": effect_record.get("created_at"), + "record_file": str(record_file), + } + ) + return records + + +def calculate_metric_stats(records: list[dict[str, Any]], metric_path: str, min_n: int) -> dict[str, Any]: + pairs: list[tuple[float, float]] = [] + for record in records: + x_value = to_float(get_nested(record["scores"], metric_path)) + y_value = to_float(record["manual_score"]) + if x_value is None or y_value is None: + continue + pairs.append((x_value, y_value)) + + x_values = [pair[0] for pair in pairs] + y_values = [pair[1] for pair in pairs] + result: dict[str, Any] = { + "n": len(pairs), + "pearson_r": None, + "pearson_p": None, + "spearman_r": None, + "spearman_p": None, + "kendall_tau": None, + "kendall_p": None, + "note": "", + } + if len(pairs) < min_n: + result["note"] = f"样本数少于 {min_n}" + return result + if len(set(x_values)) < 2: + result["note"] = "自动评分没有变化,无法计算相关" + return result + if len(set(y_values)) < 2: + result["note"] = "人工评分没有变化,无法计算相关" + return result + + pearson = stats.pearsonr(x_values, y_values) + spearman = stats.spearmanr(x_values, y_values) + kendall = stats.kendalltau(x_values, y_values) + result.update( + { + "pearson_r": round_float(pearson.statistic), + "pearson_p": round_float(pearson.pvalue), + "spearman_r": round_float(spearman.statistic), + "spearman_p": round_float(spearman.pvalue), + "kendall_tau": round_float(kendall.statistic), + "kendall_p": round_float(kendall.pvalue), + } + ) + return result + + +def round_float(value: Any) -> float | None: + number = to_float(value) + if number is None: + return None + return round(number, 6) + + +def significance_label(p_value: float | None) -> str: + if p_value is None: + return "" + if p_value < 0.001: + return "***" + if p_value < 0.01: + return 
"**" + if p_value < 0.05: + return "*" + if p_value < 0.1: + return "." + return "ns" + + +def build_report(records: list[dict[str, Any]], min_n: int) -> list[dict[str, Any]]: + report: list[dict[str, Any]] = [] + for group, metric_path, label in METRIC_SPECS: + metric_stats = calculate_metric_stats(records, metric_path, min_n) + report.append( + { + "group": group, + "metric": metric_path, + "label": label, + **metric_stats, + "pearson_sig": significance_label(metric_stats["pearson_p"]), + "spearman_sig": significance_label(metric_stats["spearman_p"]), + "kendall_sig": significance_label(metric_stats["kendall_p"]), + } + ) + return report + + +def print_report(records: list[dict[str, Any]], report: list[dict[str, Any]]) -> None: + chats = sorted({record["chat_id"] for record in records}) + print("\nMaisaka 回复效果评分相关性分析") + print("=" * 96) + print(f"已匹配人工评分记录数: {len(records)}") + print(f"聊天流数量: {len(chats)}") + if chats: + print(f"聊天流: {', '.join(chats[:8])}{' ...' if len(chats) > 8 else ''}") + print("人工分使用 manual_score,若只有 manual_score_5,则换算到 0-100 后参与计算。") + print("显著性: *** p<0.001, ** p<0.01, * p<0.05, . 
p<0.1, ns 不显著") + print("-" * 96) + + header = ( + f"{'分组':<14} {'指标':<34} {'n':>4} " + f"{'Pearson r':>10} {'p':>10} {'sig':>4} " + f"{'Spearman r':>11} {'p':>10} {'sig':>4} " + f"{'Kendall':>9} {'p':>10} {'说明'}" + ) + print(header) + print("-" * 96) + for item in report: + print( + f"{item['group']:<14} " + f"{item['label']:<34} " + f"{item['n']:>4} " + f"{format_number(item['pearson_r']):>10} " + f"{format_number(item['pearson_p']):>10} " + f"{item['pearson_sig']:>4} " + f"{format_number(item['spearman_r']):>11} " + f"{format_number(item['spearman_p']):>10} " + f"{item['spearman_sig']:>4} " + f"{format_number(item['kendall_tau']):>9} " + f"{format_number(item['kendall_p']):>10} " + f"{item['note']}" + ) + + total = next((item for item in report if item["metric"] == "asi"), None) + if total: + print("-" * 96) + print( + "总分 ASI 与人工分的 Pearson 相关: " + f"r={format_number(total['pearson_r'])}, " + f"p={format_number(total['pearson_p'])}, " + f"显著性={total['pearson_sig'] or 'N/A'}" + ) + + +def format_number(value: Any) -> str: + if value is None: + return "N/A" + number = to_float(value) + if number is None: + return "N/A" + if abs(number) < 0.000001: + return "0" + return f"{number:.4g}" + + +def write_csv(file_path: Path, report: list[dict[str, Any]]) -> None: + file_path.parent.mkdir(parents=True, exist_ok=True) + fieldnames = [ + "group", + "metric", + "label", + "n", + "pearson_r", + "pearson_p", + "pearson_sig", + "spearman_r", + "spearman_p", + "spearman_sig", + "kendall_tau", + "kendall_p", + "kendall_sig", + "note", + ] + with file_path.open("w", encoding="utf-8-sig", newline="") as csv_file: + writer = csv.DictWriter(csv_file, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(report) + + +def write_json(file_path: Path, records: list[dict[str, Any]], report: list[dict[str, Any]]) -> None: + file_path.parent.mkdir(parents=True, exist_ok=True) + payload = { + "matched_record_count": len(records), + "chat_count": len({record["chat_id"] for record 
in records}), + "report": report, + } + file_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") + + +def main() -> None: + parser = argparse.ArgumentParser(description="分析 Maisaka 回复效果自动评分与人工评分的相关性和显著性。") + parser.add_argument("--log-dir", type=Path, default=DEFAULT_LOG_DIR, help="自动评分 JSON 目录") + parser.add_argument("--manual-dir", type=Path, default=DEFAULT_MANUAL_DIR, help="人工评分 JSON 目录") + parser.add_argument("--chat-id", default="", help="只分析某个 platform_type_id,例如 qq_group_1028699246") + parser.add_argument("--include-pending", action="store_true", help="包含尚未 finalized 的记录") + parser.add_argument("--min-n", type=int, default=3, help="计算相关性需要的最小样本数,默认 3") + parser.add_argument("--csv", type=Path, default=None, help="把统计结果另存为 CSV") + parser.add_argument("--json", type=Path, default=None, help="把统计结果另存为 JSON") + args = parser.parse_args() + + records = iter_records( + args.log_dir, + args.manual_dir, + chat_id=args.chat_id, + include_pending=args.include_pending, + ) + report = build_report(records, max(2, args.min_n)) + print_report(records, report) + + if args.csv: + write_csv(args.csv, report) + print(f"\nCSV 已保存: {args.csv}") + if args.json: + write_json(args.json, records, report) + print(f"JSON 已保存: {args.json}") + + +if __name__ == "__main__": + main() diff --git a/scripts/preview_reply_effect_scores.py b/scripts/preview_reply_effect_scores.py new file mode 100644 index 00000000..86648393 --- /dev/null +++ b/scripts/preview_reply_effect_scores.py @@ -0,0 +1,2219 @@ +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from pathlib import Path +from typing import Any +from urllib.parse import parse_qs, urlparse + +import argparse +import json +import mimetypes +import time +import webbrowser + + +DEFAULT_LOG_DIR = Path("logs") / "maisaka_reply_effect" +DEFAULT_MANUAL_DIR = Path("logs") / "maisaka_reply_effect_manual" +DEFAULT_HOST = "127.0.0.1" +DEFAULT_PORT = 8765 + + +def normalize_name(value: str) -> str: 
+ normalized = "".join(char if char.isalnum() or char in "._-" else "_" for char in str(value or "").strip()) + normalized = normalized.strip("._") + return normalized or "unknown" + + +def load_json_file(file_path: Path) -> dict[str, Any]: + try: + payload = json.loads(file_path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return {} + return payload if isinstance(payload, dict) else {} + + +def write_json_file(file_path: Path, payload: dict[str, Any]) -> None: + file_path.parent.mkdir(parents=True, exist_ok=True) + temp_path = file_path.with_name(f".{file_path.name}.tmp") + temp_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2, default=str), encoding="utf-8") + temp_path.replace(file_path) + + +class ReplyEffectRepository: + def __init__(self, log_dir: Path, manual_dir: Path) -> None: + self.log_dir = log_dir + self.manual_dir = manual_dir + + def list_chats(self) -> list[dict[str, Any]]: + chats: list[dict[str, Any]] = [] + if not self.log_dir.exists(): + return chats + + for chat_dir in sorted(path for path in self.log_dir.iterdir() if path.is_dir()): + records = list(chat_dir.glob("*.json")) + annotated_count = sum(1 for record_file in records if self._annotation_path(chat_dir.name, record_file).exists()) + finalized_count = 0 + pending_count = 0 + for record_file in records: + payload = load_json_file(record_file) + if payload.get("status") == "finalized": + finalized_count += 1 + else: + pending_count += 1 + chats.append( + { + "chat_id": chat_dir.name, + "record_count": len(records), + "finalized_count": finalized_count, + "pending_count": pending_count, + "annotated_count": annotated_count, + } + ) + return chats + + def list_records( + self, + *, + chat_id: str | None = None, + status: str = "", + annotated: str = "", + ) -> list[dict[str, Any]]: + records: list[dict[str, Any]] = [] + for record_file in self._iter_record_files(chat_id): + payload = load_json_file(record_file) + if not payload: + continue + 
summary = self._build_record_summary(record_file, payload) + if status and summary["status"] != status: + continue + if annotated == "yes" and summary["manual"] is None: + continue + if annotated == "no" and summary["manual"] is not None: + continue + records.append(summary) + return sorted(records, key=lambda item: str(item.get("created_at") or ""), reverse=True) + + def get_record(self, chat_id: str, effect_id: str) -> dict[str, Any]: + record_file = self._find_record_file(chat_id, effect_id) + if record_file is None: + return {} + payload = load_json_file(record_file) + if not payload: + return {} + payload["_manual"] = self.get_annotation(chat_id, effect_id) + payload["_record_path"] = str(record_file) + return payload + + def get_annotation(self, chat_id: str, effect_id: str) -> dict[str, Any] | None: + annotation_path = self._annotation_path(chat_id, effect_id) + if not annotation_path.exists(): + return None + payload = load_json_file(annotation_path) + return payload or None + + def save_annotation(self, payload: dict[str, Any]) -> dict[str, Any]: + chat_id = normalize_name(str(payload.get("chat_id") or "")) + effect_id = normalize_name(str(payload.get("effect_id") or "")) + if not chat_id or chat_id == "unknown" or not effect_id or effect_id == "unknown": + raise ValueError("缺少 chat_id 或 effect_id") + if self._find_record_file(chat_id, effect_id) is None: + raise ValueError("找不到对应的回复效果记录") + + manual_score = payload.get("manual_score") + manual_score_5 = payload.get("manual_score_5") + normalized_score: float | None = None + normalized_score_5: int | None = None + if manual_score_5 not in {None, ""}: + try: + normalized_score_5 = int(manual_score_5) + except (TypeError, ValueError): + raise ValueError("manual_score_5 必须是 1-5 的整数") from None + if normalized_score_5 < 1 or normalized_score_5 > 5: + raise ValueError("manual_score_5 必须是 1-5 的整数") + normalized_score = round((normalized_score_5 - 1) / 4 * 100, 2) + elif manual_score not in {None, ""}: + try: + 
normalized_score = max(0.0, min(100.0, float(manual_score))) + except (TypeError, ValueError): + raise ValueError("manual_score 必须是 0-100 的数字") from None + else: + raise ValueError("缺少人工评分") + + annotation = { + "schema_version": 1, + "chat_id": chat_id, + "effect_id": effect_id, + "manual_score": round(normalized_score, 2), + "manual_score_5": normalized_score_5, + "manual_label": str(payload.get("manual_label") or "").strip(), + "evaluator": str(payload.get("evaluator") or "manual").strip() or "manual", + "notes": str(payload.get("notes") or "").strip(), + "updated_at": time.strftime("%Y-%m-%dT%H:%M:%S%z"), + } + write_json_file(self._annotation_path(chat_id, effect_id), annotation) + return annotation + + def _iter_record_files(self, chat_id: str | None = None) -> list[Path]: + if not self.log_dir.exists(): + return [] + if chat_id: + chat_dir = self.log_dir / normalize_name(chat_id) + if not chat_dir.exists() or not chat_dir.is_dir(): + return [] + return sorted(chat_dir.glob("*.json")) + + record_files: list[Path] = [] + for chat_dir in self.log_dir.iterdir(): + if chat_dir.is_dir(): + record_files.extend(chat_dir.glob("*.json")) + return record_files + + def _find_record_file(self, chat_id: str, effect_id: str) -> Path | None: + normalized_effect_id = normalize_name(effect_id) + for record_file in self._iter_record_files(chat_id): + payload = load_json_file(record_file) + if normalize_name(str(payload.get("effect_id") or "")) == normalized_effect_id: + return record_file + return None + + def _annotation_path(self, chat_id: str, record_file_or_effect_id: Path | str) -> Path: + if isinstance(record_file_or_effect_id, Path): + payload = load_json_file(record_file_or_effect_id) + effect_id = str(payload.get("effect_id") or record_file_or_effect_id.stem).strip() + else: + effect_id = str(record_file_or_effect_id or "").strip() + return self.manual_dir / normalize_name(chat_id) / f"{normalize_name(effect_id)}.json" + + def _build_record_summary(self, record_file: 
Path, payload: dict[str, Any]) -> dict[str, Any]: + chat_id = record_file.parent.name + effect_id = str(payload.get("effect_id") or record_file.stem) + scores = payload.get("scores") if isinstance(payload.get("scores"), dict) else {} + reply = payload.get("reply") if isinstance(payload.get("reply"), dict) else {} + target_user = payload.get("target_user") if isinstance(payload.get("target_user"), dict) else {} + manual = self.get_annotation(chat_id, effect_id) + return { + "chat_id": chat_id, + "effect_id": effect_id, + "status": str(payload.get("status") or ""), + "created_at": str(payload.get("created_at") or ""), + "finalize_reason": str(payload.get("finalize_reason") or ""), + "asi": scores.get("asi"), + "behavior_score": scores.get("behavior_score"), + "relational_score": scores.get("relational_score"), + "friction_score": scores.get("friction_score"), + "manual": manual, + "reply_preview": self._truncate(str(reply.get("reply_text") or ""), 160), + "target_message_id": str(reply.get("target_message_id") or ""), + "target_user": target_user, + "followup_count": len(payload.get("followup_messages") or []), + "file_name": record_file.name, + } + + @staticmethod + def _truncate(text: str, limit: int) -> str: + normalized_text = " ".join(str(text or "").split()) + if len(normalized_text) <= limit: + return normalized_text + return f"{normalized_text[: limit - 1]}…" + + +class ReplyEffectPreviewHandler(BaseHTTPRequestHandler): + repository: ReplyEffectRepository + + def do_GET(self) -> None: + parsed = urlparse(self.path) + if parsed.path == "/": + self._send_html(INDEX_HTML_V3) + return + if parsed.path == "/api/chats": + self._send_json({"chats": self.repository.list_chats()}) + return + if parsed.path == "/api/records": + query = parse_qs(parsed.query) + records = self.repository.list_records( + chat_id=self._first(query, "chat_id"), + status=self._first(query, "status"), + annotated=self._first(query, "annotated"), + ) + self._send_json({"records": records}) + 
return + if parsed.path == "/api/record": + query = parse_qs(parsed.query) + record = self.repository.get_record( + normalize_name(self._first(query, "chat_id")), + normalize_name(self._first(query, "effect_id")), + ) + if not record: + self._send_json({"error": "record not found"}, status=404) + return + self._send_json({"record": record}) + return + if parsed.path == "/api/image": + query = parse_qs(parsed.query) + self._send_image(self._first(query, "path")) + return + if parsed.path == "/api/image_hash": + query = parse_qs(parsed.query) + self._send_image_by_hash(self._first(query, "hash"), self._first(query, "kind")) + return + self._send_json({"error": "not found"}, status=404) + + def do_POST(self) -> None: + parsed = urlparse(self.path) + if parsed.path != "/api/annotations": + self._send_json({"error": "not found"}, status=404) + return + try: + payload = self._read_json_body() + annotation = self.repository.save_annotation(payload) + except ValueError as exc: + self._send_json({"error": str(exc)}, status=400) + return + self._send_json({"annotation": annotation}) + + def log_message(self, format: str, *args: Any) -> None: + return + + def _send_html(self, content: str) -> None: + body = content.encode("utf-8") + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def _send_image_by_hash(self, image_hash: str, kind: str = "") -> None: + image_hash = str(image_hash or "").strip() + if not image_hash: + self._send_json({"error": "missing image hash"}, status=400) + return + image_path = self._resolve_image_path_by_hash(image_hash, kind) + if image_path is None: + self._send_json({"error": "image hash not found"}, status=404) + return + self._send_image(str(image_path)) + + @staticmethod + def _resolve_image_path_by_hash(image_hash: str, kind: str = "") -> Path | None: + try: + from sqlmodel import select + + from 
src.common.database.database import get_db_session + from src.common.database.database_model import Images, ImageType + + preferred_types = [] + if kind == "emoji": + preferred_types.append(ImageType.EMOJI) + elif kind == "image": + preferred_types.append(ImageType.IMAGE) + preferred_types.extend(image_type for image_type in (ImageType.IMAGE, ImageType.EMOJI) if image_type not in preferred_types) + + with get_db_session() as db: + for image_type in preferred_types: + statement = select(Images).filter_by(image_hash=image_hash, image_type=image_type).limit(1) + image_record = db.exec(statement).first() + if image_record is None or image_record.no_file_flag: + continue + image_path = Path(str(image_record.full_path or "")).expanduser().resolve() + if image_path.is_file(): + return image_path + except Exception: + return None + return None + + def _send_json(self, payload: dict[str, Any], status: int = 200) -> None: + body = json.dumps(payload, ensure_ascii=False, default=str).encode("utf-8") + self.send_response(status) + self.send_header("Content-Type", "application/json; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def _send_image(self, raw_path: str) -> None: + try: + image_path = Path(raw_path).expanduser().resolve() + if not image_path.is_file(): + raise FileNotFoundError(raw_path) + mime_type = mimetypes.guess_type(str(image_path))[0] or "application/octet-stream" + if not mime_type.startswith("image/"): + self._send_json({"error": "not an image"}, status=400) + return + body = image_path.read_bytes() + except OSError: + self._send_json({"error": "image not found"}, status=404) + return + + self.send_response(200) + self.send_header("Content-Type", mime_type) + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def _read_json_body(self) -> dict[str, Any]: + length = int(self.headers.get("Content-Length") or 0) + raw_body = 
self.rfile.read(length).decode("utf-8") + payload = json.loads(raw_body or "{}") + if not isinstance(payload, dict): + raise ValueError("请求体必须是 JSON 对象") + return payload + + @staticmethod + def _first(query: dict[str, list[str]], key: str) -> str: + values = query.get(key) or [""] + return values[0] + + +INDEX_HTML = r""" + + + + + Maisaka 回复效果评分预览 + + + +
+

Maisaka 回复效果评分预览

+ +
+
+ +
+
+ + +
+
+
+
+
选择一条记录查看详情
+
+
+ + + +""" + + +INDEX_HTML_V2 = r""" + + + + + Maisaka 回复效果评分预览 + + + +
+

Maisaka 回复效果评分预览

+
+ + + +
+
+
+ +
+
+
+ + +
+
+
+ +
+
+
选择一条记录查看详情
+
+
+ + + +""" + +INDEX_HTML_V3 = r""" + + + + + Maisaka 回复效果评分预览 + + + +
+

Maisaka 回复效果评分预览

+
+ + + +
+
+
+ +
+
+
+ + +
+
+
+ +
+
+
选择一条记录查看详情
+
+
+ + + +""" + + +def build_handler(repository: ReplyEffectRepository) -> type[ReplyEffectPreviewHandler]: + class ConfiguredHandler(ReplyEffectPreviewHandler): + pass + + ConfiguredHandler.repository = repository + return ConfiguredHandler + + +def main() -> None: + parser = argparse.ArgumentParser(description="预览 Maisaka 回复效果评分,并记录人工评分。") + parser.add_argument("--host", default=DEFAULT_HOST, help=f"监听地址,默认 {DEFAULT_HOST}") + parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"监听端口,默认 {DEFAULT_PORT}") + parser.add_argument("--log-dir", type=Path, default=DEFAULT_LOG_DIR, help="回复效果 JSON 日志目录") + parser.add_argument("--manual-dir", type=Path, default=DEFAULT_MANUAL_DIR, help="人工评分 JSON 保存目录") + parser.add_argument("--no-browser", action="store_true", help="不自动打开浏览器") + args = parser.parse_args() + + mimetypes.add_type("text/html", ".html") + repository = ReplyEffectRepository(args.log_dir, args.manual_dir) + server = ThreadingHTTPServer((args.host, args.port), build_handler(repository)) + url = f"http://{args.host}:{args.port}/" + print(f"Maisaka 回复效果评分预览已启动: {url}") + print(f"自动评分目录: {args.log_dir}") + print(f"人工评分目录: {args.manual_dir}") + if not args.no_browser: + webbrowser.open(url) + try: + server.serve_forever() + except KeyboardInterrupt: + print("\n正在关闭预览服务...") + finally: + server.server_close() + + +if __name__ == "__main__": + main() diff --git a/src/chat/replyer/maisaka_generator_base.py b/src/chat/replyer/maisaka_generator_base.py index 812b82d5..8fcf57ce 100644 --- a/src/chat/replyer/maisaka_generator_base.py +++ b/src/chat/replyer/maisaka_generator_base.py @@ -2,7 +2,6 @@ from dataclasses import dataclass, field from datetime import datetime from typing import Any, Awaitable, Callable, Dict, List, Literal, Optional, Tuple -import random import time from rich.console import Group, RenderableType @@ -83,14 +82,6 @@ class BaseMaisakaReplyGenerator: bot_aliases = f",也有人叫你{','.join(alias_names)}" if alias_names else "" prompt_personality = 
global_config.personality.personality - if ( - hasattr(global_config.personality, "states") - and global_config.personality.states - and hasattr(global_config.personality, "state_probability") - and global_config.personality.state_probability > 0 - and random.random() < global_config.personality.state_probability - ): - prompt_personality = random.choice(global_config.personality.states) return f"你的名字是{bot_name}{bot_aliases},你{prompt_personality};" except Exception as exc: diff --git a/src/config/config.py b/src/config/config.py index 8de2a29b..db7e9863 100644 --- a/src/config/config.py +++ b/src/config/config.py @@ -23,7 +23,6 @@ from .official_configs import ( EmojiConfig, ExpressionConfig, KeywordReactionConfig, - MaiSakaConfig, MaimMessageConfig, MCPConfig, MemoryConfig, @@ -55,7 +54,7 @@ BOT_CONFIG_PATH: Path = (CONFIG_DIR / "bot_config.toml").resolve().absolute() MODEL_CONFIG_PATH: Path = (CONFIG_DIR / "model_config.toml").resolve().absolute() LEGACY_ENV_PATH: Path = (PROJECT_ROOT / ".env").resolve().absolute() MMC_VERSION: str = "1.0.0" -CONFIG_VERSION: str = "8.8.0" +CONFIG_VERSION: str = "8.9.3" MODEL_CONFIG_VERSION: str = "1.14.0" logger = get_logger("config") @@ -121,9 +120,6 @@ class Config(ConfigBase): database: DatabaseConfig = Field(default_factory=DatabaseConfig) """数据库配置类""" - maisaka: MaiSakaConfig = Field(default_factory=MaiSakaConfig) - """MaiSaka对话系统配置类""" - mcp: MCPConfig = Field(default_factory=MCPConfig) """MCP 配置类""" diff --git a/src/config/official_configs.py b/src/config/official_configs.py index 8d3cff38..e8f364ae 100644 --- a/src/config/official_configs.py +++ b/src/config/official_configs.py @@ -113,32 +113,6 @@ class PersonalityConfig(ConfigBase): ) """每次构建回复时,从 multiple_reply_style 中随机替换 reply_style 的概率(0.0-1.0)""" - states: list[str] = Field( - default_factory=lambda: [ - "是一个女大学生,喜欢上网聊天,会刷小红书。", - "是一个大二心理学生,会刷贴吧和中国知网。", - "是一个赛博网友,最近很想吐槽人。", - ], - json_schema_extra={ - "x-widget": "custom", - "x-icon": "shuffle", - }, - ) - 
"""_wrap_状态列表,用于随机替换personality""" - - state_probability: float = Field( - default=0.3, - ge=0, - le=1, - json_schema_extra={ - "x-widget": "slider", - "x-icon": "percent", - "step": 0.1, - }, - ) - """状态概率,每次构建人格时替换personality的概率""" - - class VisualConfig(ConfigBase): """视觉配置类""" @@ -1161,6 +1135,16 @@ class DebugConfig(ConfigBase): ) """是否显示记忆检索相关prompt""" + enable_reply_effect_tracking: bool = Field( + default=False, + json_schema_extra={ + "x-widget": "switch", + "x-icon": "activity", + }, + ) + """是否开启回复效果评分追踪,默认关闭,需要手动打开""" + + class ExtraPromptItem(ConfigBase): platform: str = Field( default="", diff --git a/src/maisaka/builtin_tool/reply.py b/src/maisaka/builtin_tool/reply.py index fa182401..8f66a8ca 100644 --- a/src/maisaka/builtin_tool/reply.py +++ b/src/maisaka/builtin_tool/reply.py @@ -121,13 +121,15 @@ async def handle_tool( "Maisaka 回复生成器当前不可用。", ) + replyer_chat_history = list(tool_ctx.runtime._chat_history) + try: success, reply_result = await replyer.generate_reply_with_context( reply_reason=latest_thought, reference_info=reference_info, stream_id=tool_ctx.runtime.session_id, reply_message=target_message, - chat_history=tool_ctx.runtime._chat_history, + chat_history=replyer_chat_history, sub_agent_runner=lambda system_prompt: _run_expression_selector( tool_ctx, system_prompt, @@ -207,6 +209,17 @@ async def handle_tool( if tool_ctx.runtime.chat_stream.platform == CLI_PLATFORM_NAME: tool_ctx.append_guided_reply_to_chat_history(combined_reply_text) tool_ctx.runtime._record_reply_sent() + await tool_ctx.runtime.track_reply_effect( + tool_call_id=invocation.call_id, + target_message=target_message, + set_quote=set_quote, + reply_text=combined_reply_text, + reply_segments=reply_segments, + planner_reasoning=latest_thought, + reference_info=reference_info, + reply_metadata=reply_metadata, + replyer_context_messages=replyer_chat_history, + ) return tool_ctx.build_success_result( invocation.tool_name, "回复已生成并发送。", diff --git 
a/src/maisaka/chat_loop_service.py b/src/maisaka/chat_loop_service.py index f4a8a7db..377eee9c 100644 --- a/src/maisaka/chat_loop_service.py +++ b/src/maisaka/chat_loop_service.py @@ -5,7 +5,6 @@ from datetime import datetime from typing import Any, List, Optional, Sequence import asyncio -import random from rich.console import RenderableType from src.common.data_models.llm_service_data_models import LLMGenerationOptions @@ -263,14 +262,6 @@ class MaisakaChatLoopService: bot_nickname = "" prompt_personality = global_config.personality.personality - if ( - hasattr(global_config.personality, "states") - and global_config.personality.states - and hasattr(global_config.personality, "state_probability") - and global_config.personality.state_probability > 0 - and random.random() < global_config.personality.state_probability - ): - prompt_personality = random.choice(global_config.personality.states) return f"Your name is {bot_name}{bot_nickname}; persona: {prompt_personality};" except Exception: diff --git a/src/maisaka/reply_effect/__init__.py b/src/maisaka/reply_effect/__init__.py new file mode 100644 index 00000000..efccf32e --- /dev/null +++ b/src/maisaka/reply_effect/__init__.py @@ -0,0 +1,5 @@ +"""Maisaka 回复效果观察器。""" + +from .tracker import ReplyEffectTracker + +__all__ = ["ReplyEffectTracker"] diff --git a/src/maisaka/reply_effect/image_utils.py b/src/maisaka/reply_effect/image_utils.py new file mode 100644 index 00000000..98b3e46a --- /dev/null +++ b/src/maisaka/reply_effect/image_utils.py @@ -0,0 +1,100 @@ +"""回复效果记录中的图片/表情附件提取工具。""" + +from base64 import b64encode +from pathlib import Path +from typing import Any + +from src.common.data_models.message_component_data_model import EmojiComponent, ImageComponent, MessageSequence + + +_MAX_INLINE_IMAGE_BYTES = 2 * 1024 * 1024 + + +def extract_visual_attachments_from_sequence(message_sequence: MessageSequence | None) -> list[dict[str, Any]]: + """从消息片段中提取可供评分页面展示的图片/表情信息。""" + + if message_sequence is None: + return 
[] + + attachments: list[dict[str, Any]] = [] + for index, component in enumerate(message_sequence.components): + if isinstance(component, ImageComponent): + attachments.append(_build_visual_attachment(component, index=index, kind="image")) + elif isinstance(component, EmojiComponent): + attachments.append(_build_visual_attachment(component, index=index, kind="emoji")) + return attachments + + +def _build_visual_attachment(component: ImageComponent | EmojiComponent, *, index: int, kind: str) -> dict[str, Any]: + binary_hash = str(component.binary_hash or "").strip() + attachment: dict[str, Any] = { + "kind": kind, + "index": index, + "hash": binary_hash, + "content": str(component.content or "").strip(), + "path": "", + "data_url": "", + } + + file_path = _resolve_image_path(binary_hash, kind=kind) + if file_path: + attachment["path"] = str(file_path) + attachment["file_name"] = file_path.name + attachment["mime_type"] = _guess_mime_type(file_path.suffix) + return attachment + + binary_data = bytes(component.binary_data or b"") + if binary_data and len(binary_data) <= _MAX_INLINE_IMAGE_BYTES: + mime_type = _guess_mime_type_from_bytes(binary_data) + attachment["mime_type"] = mime_type + attachment["data_url"] = f"data:{mime_type};base64,{b64encode(binary_data).decode('ascii')}" + return attachment + + +def _resolve_image_path(binary_hash: str, *, kind: str) -> Path | None: + if not binary_hash: + return None + + try: + from sqlmodel import select + + from src.common.database.database import get_db_session + from src.common.database.database_model import Images, ImageType + + image_type = ImageType.EMOJI if kind == "emoji" else ImageType.IMAGE + with get_db_session() as db: + statement = select(Images).filter_by(image_hash=binary_hash, image_type=image_type).limit(1) + image_record = db.exec(statement).first() + if image_record is None or getattr(image_record, "no_file_flag", False): + return None + file_path = Path(str(image_record.full_path or 
"")).expanduser().resolve() + if file_path.is_file(): + return file_path + except Exception: + return None + return None + + +def _guess_mime_type(suffix: str) -> str: + normalized_suffix = suffix.lower().lstrip(".") + if normalized_suffix in {"jpg", "jpeg"}: + return "image/jpeg" + if normalized_suffix == "gif": + return "image/gif" + if normalized_suffix == "webp": + return "image/webp" + if normalized_suffix == "bmp": + return "image/bmp" + return "image/png" + + +def _guess_mime_type_from_bytes(binary_data: bytes) -> str: + if binary_data.startswith(b"\xff\xd8\xff"): + return "image/jpeg" + if binary_data.startswith(b"GIF8"): + return "image/gif" + if binary_data.startswith(b"RIFF") and b"WEBP" in binary_data[:16]: + return "image/webp" + if binary_data.startswith(b"BM"): + return "image/bmp" + return "image/png" diff --git a/src/maisaka/reply_effect/judge.py b/src/maisaka/reply_effect/judge.py new file mode 100644 index 00000000..58b20c74 --- /dev/null +++ b/src/maisaka/reply_effect/judge.py @@ -0,0 +1,116 @@ +"""回复效果 LLM 窄维度评审。""" + +from __future__ import annotations + +from collections.abc import Awaitable, Callable +from typing import Any, Dict, List, Tuple + +import json + +from .models import FollowupMessageSnapshot, ReplyEffectRecord, RubricScoreItem, RubricScores +from .scoring import normalize_text_for_prompt + +JudgeRunner = Callable[[str], Awaitable[str]] + + +async def judge_reply_effect(record: ReplyEffectRecord, judge_runner: JudgeRunner | None) -> Tuple[RubricScores, str]: + """执行 LLM rubric judge,失败时返回中性分。""" + + if judge_runner is None: + return RubricScores(), "未提供 LLM judge runner" + + prompt = build_judge_prompt(record) + try: + response_text = await judge_runner(prompt) + payload = _loads_json_object(response_text) + return parse_rubric_scores(payload), "" + except Exception as exc: + return RubricScores(), str(exc) + + +def build_judge_prompt(record: ReplyEffectRecord) -> str: + """构建窄维度评分 prompt。""" + + followup_text = 
_format_followups(record.followup_messages) + return ( + "你是 Maisaka 回复效果的窄维度评审器,只评估这一次 bot 回复的交互感知质量。\n" + "不要评价总体满意度,不要给建议,只输出 JSON。\n\n" + "评分范围:1 到 5,1=很差,3=中性,5=很好。\n" + "uncanny_risk 的 1=完全不怪,5=非常过度拟人/越界/油腻。\n\n" + f"bot 回复:\n{normalize_text_for_prompt(record.reply.reply_text, 1200)}\n\n" + f"后续用户消息:\n{followup_text or '(暂无后续用户消息)'}\n\n" + "请输出严格 JSON 对象,格式如下:\n" + "{\n" + ' "social_presence": {"score": 3, "reason": "...", "evidence_spans": ["..."], "confidence": 0.7},\n' + ' "warmth": {"score": 3, "reason": "...", "evidence_spans": ["..."], "confidence": 0.7},\n' + ' "competence": {"score": 3, "reason": "...", "evidence_spans": ["..."], "confidence": 0.7},\n' + ' "appropriateness": {"score": 3, "reason": "...", "evidence_spans": ["..."], "confidence": 0.7},\n' + ' "uncanny_risk": {"score": 3, "reason": "...", "evidence_spans": ["..."], "confidence": 0.7}\n' + "}" + ) + + +def parse_rubric_scores(payload: Dict[str, Any]) -> RubricScores: + """解析 LLM rubric JSON。""" + + return RubricScores( + social_presence=_parse_item(payload.get("social_presence")), + warmth=_parse_item(payload.get("warmth")), + competence=_parse_item(payload.get("competence")), + appropriateness=_parse_item(payload.get("appropriateness")), + uncanny_risk=_parse_item(payload.get("uncanny_risk")), + available=True, + ) + + +def _parse_item(raw_item: Any) -> RubricScoreItem: + if not isinstance(raw_item, dict): + raw_item = {} + score = _coerce_float(raw_item.get("score"), 3.0) + score = max(1.0, min(5.0, score)) + evidence_spans = raw_item.get("evidence_spans") + if not isinstance(evidence_spans, list): + evidence_spans = [] + return RubricScoreItem( + score=score, + normalized_score=round((score - 1.0) / 4.0, 4), + reason=str(raw_item.get("reason") or "").strip(), + evidence_spans=[str(item).strip() for item in evidence_spans if str(item).strip()], + confidence=max(0.0, min(1.0, _coerce_float(raw_item.get("confidence"), 0.0))), + ) + + +def _loads_json_object(response_text: str) -> 
Dict[str, Any]: + normalized_response = str(response_text or "").strip() + if normalized_response.startswith("```"): + normalized_response = normalized_response.strip("`") + if normalized_response.lower().startswith("json"): + normalized_response = normalized_response[4:].strip() + try: + parsed = json.loads(normalized_response) + except json.JSONDecodeError: + start = normalized_response.find("{") + end = normalized_response.rfind("}") + if start < 0 or end <= start: + raise + parsed = json.loads(normalized_response[start : end + 1]) + if not isinstance(parsed, dict): + raise ValueError("LLM judge 未返回 JSON 对象") + return parsed + + +def _format_followups(followups: List[FollowupMessageSnapshot]) -> str: + lines: List[str] = [] + for index, followup in enumerate(followups[:5], start=1): + marker = "目标用户" if followup.is_target_user else "其他用户" + lines.append( + f"{index}. [{marker}] {normalize_text_for_prompt(followup.visible_text or followup.plain_text, 500)}" + ) + return "\n".join(lines) + + +def _coerce_float(value: Any, default: float) -> float: + try: + return float(value) + except (TypeError, ValueError): + return default diff --git a/src/maisaka/reply_effect/models.py b/src/maisaka/reply_effect/models.py new file mode 100644 index 00000000..26d06fb0 --- /dev/null +++ b/src/maisaka/reply_effect/models.py @@ -0,0 +1,164 @@ +"""回复效果观察器的数据模型。""" + +from dataclasses import asdict, dataclass, field +from datetime import datetime +from enum import Enum +from pathlib import Path +from typing import Any, Dict, List, Optional + + +SCHEMA_VERSION = 1 + + +class ReplyEffectStatus(str, Enum): + """回复效果记录状态。""" + + PENDING = "pending" + FINALIZED = "finalized" + + +@dataclass(slots=True) +class SessionSnapshot: + """会话快照。""" + + session_id: str + platform_type_id: str + platform: str + chat_type: str + group_id: str + user_id: str + session_name: str + + +@dataclass(slots=True) +class UserSnapshot: + """用户快照。""" + + user_id: str + nickname: str + cardname: str + + 
+@dataclass(slots=True) +class ReplySnapshot: + """被观察的回复内容。""" + + tool_call_id: str + target_message_id: str + set_quote: bool + reply_text: str + reply_segments: List[str] + planner_reasoning: str + reference_info: str + reply_metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass(slots=True) +class FollowupMessageSnapshot: + """后续用户消息快照。""" + + message_id: str + timestamp: str + user_id: str + nickname: str + cardname: str + visible_text: str + plain_text: str + latency_seconds: float + is_target_user: bool + attachments: List[Dict[str, Any]] = field(default_factory=list) + + +@dataclass(slots=True) +class BehaviorSignals: + """行为满意度信号。""" + + continue_2turns: float = 0.0 + next_user_sentiment: float = 0.5 + user_expansion: float = 0.0 + no_correction: float = 1.0 + no_abort: float = 1.0 + evidence_source: str = "no_followup" + + +@dataclass(slots=True) +class RubricScoreItem: + """单个 LLM rubric 项。""" + + score: float = 3.0 + normalized_score: float = 0.5 + reason: str = "" + evidence_spans: List[str] = field(default_factory=list) + confidence: float = 0.0 + + +@dataclass(slots=True) +class RubricScores: + """LLM 感知质量评分。""" + + social_presence: RubricScoreItem = field(default_factory=RubricScoreItem) + warmth: RubricScoreItem = field(default_factory=RubricScoreItem) + competence: RubricScoreItem = field(default_factory=RubricScoreItem) + appropriateness: RubricScoreItem = field(default_factory=RubricScoreItem) + uncanny_risk: RubricScoreItem = field(default_factory=RubricScoreItem) + available: bool = False + + +@dataclass(slots=True) +class FrictionSignals: + """摩擦和反感信号。""" + + explicit_negative: float = 0.0 + repair_loop: float = 0.0 + uncanny_risk: float = 0.5 + evidence_messages: List[str] = field(default_factory=list) + + +@dataclass(slots=True) +class ReplyEffectScores: + """最终效果评分。""" + + asi: float + behavior_score: float + relational_score: float + friction_score: float + behavior_signals: BehaviorSignals + rubric_scores: RubricScores + 
friction_signals: FrictionSignals + judge_error: str = "" + + +@dataclass(slots=True) +class ReplyEffectRecord: + """一条回复效果观察记录。""" + + effect_id: str + status: ReplyEffectStatus + created_at: str + updated_at: str + session: SessionSnapshot + reply: ReplySnapshot + target_user: UserSnapshot + context_snapshot: List[Dict[str, Any]] = field(default_factory=list) + followup_messages: List[FollowupMessageSnapshot] = field(default_factory=list) + scores: Optional[ReplyEffectScores] = None + finalized_at: str = "" + finalize_reason: str = "" + confidence_note: str = "" + followup_summary: Dict[str, Any] = field(default_factory=dict) + file_path: Optional[Path] = field(default=None, repr=False) + + def to_json_dict(self) -> Dict[str, Any]: + """转换为可直接写入 JSON 的字典。""" + + payload = asdict(self) + payload["schema_version"] = SCHEMA_VERSION + payload["status"] = self.status.value + payload.pop("file_path", None) + return payload + + +def now_iso() -> str: + """返回本地时区 ISO 时间字符串。""" + + return datetime.now().astimezone().isoformat(timespec="seconds") diff --git a/src/maisaka/reply_effect/path_utils.py b/src/maisaka/reply_effect/path_utils.py new file mode 100644 index 00000000..4114bcbf --- /dev/null +++ b/src/maisaka/reply_effect/path_utils.py @@ -0,0 +1,24 @@ +"""回复效果日志路径工具。""" + +from pathlib import Path + +from src.maisaka.display.preview_path_utils import build_preview_chat_dir_name, normalize_preview_name + +BASE_DIR = Path("logs") / "maisaka_reply_effect" + + +def build_reply_effect_chat_dir_name(session_id: str) -> str: + """构建回复效果日志的会话目录名。""" + + chat_dir_name = build_preview_chat_dir_name(session_id) + normalized_chat_dir_name = normalize_preview_name(chat_dir_name) + if normalized_chat_dir_name != "unknown": + return normalized_chat_dir_name + return "unknown_chat" + + +def build_reply_effect_chat_dir(session_id: str, base_dir: Path | None = None) -> Path: + """返回某个会话对应的回复效果日志目录。""" + + root_dir = base_dir or BASE_DIR + return root_dir / 
build_reply_effect_chat_dir_name(session_id) diff --git a/src/maisaka/reply_effect/scoring.py b/src/maisaka/reply_effect/scoring.py new file mode 100644 index 00000000..41a38b5b --- /dev/null +++ b/src/maisaka/reply_effect/scoring.py @@ -0,0 +1,262 @@ +"""回复效果评分规则。""" + +from __future__ import annotations + +from typing import Iterable, List + +import re + +from .models import BehaviorSignals, FollowupMessageSnapshot, FrictionSignals, ReplyEffectScores, RubricScores + +NEGATIVE_PATTERNS = ( + "你没懂", + "没懂", + "不是这个意思", + "不是", + "别这样", + "好烦", + "烦死", + "算了", + "离谱", + "无语", + "你在说什么", + "听不懂", + "看不懂", + "错了", + "不对", +) +REPAIR_PATTERNS = ( + "我是说", + "我说的是", + "重新说", + "再说一遍", + "不是问", + "你理解错", + "你搞错", + "我问的是", + "纠正", +) +POSITIVE_PATTERNS = ( + "谢谢", + "感谢", + "懂了", + "明白了", + "可以", + "有用", + "不错", + "好耶", + "太好了", +) + + +def clamp(value: float, lower: float = 0.0, upper: float = 1.0) -> float: + """限制数值范围。""" + + return max(lower, min(upper, value)) + + +def score_reply_effect( + followups: List[FollowupMessageSnapshot], + rubric_scores: RubricScores, + *, + target_user_id: str = "", + judge_error: str = "", +) -> ReplyEffectScores: + """计算一条回复的 ASI 分数。""" + + behavior_signals = build_behavior_signals(followups, target_user_id=target_user_id) + friction_signals = build_friction_signals(followups, rubric_scores, target_user_id=target_user_id) + behavior_score = calculate_behavior_score(behavior_signals) + relational_score = calculate_relational_score(rubric_scores) + friction_score = calculate_friction_score(friction_signals) + asi = calculate_asi_score(behavior_score, relational_score, friction_score) + return ReplyEffectScores( + asi=asi, + behavior_score=round(behavior_score, 4), + relational_score=round(relational_score, 4), + friction_score=round(friction_score, 4), + behavior_signals=behavior_signals, + rubric_scores=rubric_scores, + friction_signals=friction_signals, + judge_error=judge_error, + ) + + +def build_behavior_signals( + followups: 
def build_behavior_signals(
    followups: "List[FollowupMessageSnapshot]",
    *,
    target_user_id: str = "",
) -> "BehaviorSignals":
    """Derive behavioral satisfaction signals from follow-up messages.

    Feedback from the replied-to user is preferred; other users' messages are
    only used when the target user stayed silent.
    """
    target_followups = [item for item in followups if target_user_id and item.user_id == target_user_id]
    evidence = target_followups or followups
    if target_followups:
        evidence_source = "target_user_feedback"
    elif followups:
        evidence_source = "indirect_session_feedback"
    else:
        evidence_source = "no_followup"

    if not evidence:
        # Conservative neutral defaults; no_abort below 1.0 reflects the
        # uncertainty of total silence (vs the dataclass default of 1.0).
        return BehaviorSignals(
            continue_2turns=0.0,
            next_user_sentiment=0.5,
            user_expansion=0.0,
            no_correction=1.0,
            no_abort=0.6,
            evidence_source=evidence_source,
        )

    merged_text = "\n".join(item.plain_text for item in evidence)
    negatives = count_matches(merged_text, NEGATIVE_PATTERNS)
    repairs = count_matches(merged_text, REPAIR_PATTERNS)
    positives = count_matches(merged_text, POSITIVE_PATTERNS)
    mean_length = sum(len(item.plain_text.strip()) for item in evidence) / len(evidence)

    return BehaviorSignals(
        continue_2turns=1.0 if len(evidence) >= 2 else 0.5,
        next_user_sentiment=estimate_sentiment(positives, negatives, repairs),
        # Messages longer than ~8 chars count as expansion, saturating near 50.
        user_expansion=clamp((mean_length - 8.0) / 42.0),
        no_correction=0.0 if repairs > 0 else 1.0,
        no_abort=0.0 if negatives >= 2 or "算了" in merged_text else 1.0,
        evidence_source=evidence_source,
    )


def build_friction_signals(
    followups: "List[FollowupMessageSnapshot]",
    rubric_scores: "RubricScores",
    *,
    target_user_id: str = "",
) -> "FrictionSignals":
    """Derive friction signals from follow-ups plus the judge's uncanny score."""
    evidence_ids = []
    explicit_negative = 0.0
    repair_loop = 0.0
    for item in followups:
        # Feedback from bystanders is discounted relative to the target user.
        weight = 1.0 if target_user_id and item.user_id == target_user_id else 0.65
        text = item.plain_text
        if any(pattern in text for pattern in NEGATIVE_PATTERNS):
            explicit_negative = max(explicit_negative, weight)
            evidence_ids.append(item.message_id)
        if any(pattern in text for pattern in REPAIR_PATTERNS):
            repair_loop = max(repair_loop, weight)
            evidence_ids.append(item.message_id)

    # Without a judge verdict the uncanny dimension stays neutral.
    uncanny = rubric_scores.uncanny_risk.normalized_score if rubric_scores.available else 0.5
    return FrictionSignals(
        explicit_negative=round(clamp(explicit_negative), 4),
        repair_loop=round(clamp(repair_loop), 4),
        uncanny_risk=round(clamp(uncanny), 4),
        evidence_messages=sorted(set(evidence_ids)),
    )


def calculate_behavior_score(signals: "BehaviorSignals") -> float:
    """Weighted behavior score in [0, 1]; weights sum to 1.0."""
    weighted = (
        0.30 * signals.continue_2turns
        + 0.25 * signals.next_user_sentiment
        + 0.20 * signals.user_expansion
        + 0.15 * signals.no_correction
        + 0.10 * signals.no_abort
    )
    return clamp(weighted)


def calculate_relational_score(rubric_scores: "RubricScores") -> float:
    """Weighted perceived-quality score; neutral 0.5 when the judge failed."""
    if not rubric_scores.available:
        return 0.5
    weighted = (
        0.35 * rubric_scores.social_presence.normalized_score
        + 0.25 * rubric_scores.warmth.normalized_score
        + 0.25 * rubric_scores.competence.normalized_score
        + 0.15 * rubric_scores.appropriateness.normalized_score
    )
    return clamp(weighted)


def calculate_friction_score(signals: "FrictionSignals") -> float:
    """Weighted friction penalty in [0, 1]; higher means more friction."""
    weighted = (
        0.40 * signals.explicit_negative
        + 0.30 * signals.repair_loop
        + 0.30 * signals.uncanny_risk
    )
    return clamp(weighted)


def calculate_asi_score(behavior_score: float, relational_score: float, friction_score: float) -> float:
    """Overall 0-100 ASI; friction is inverted so more friction lowers it."""
    blended = clamp(
        0.45 * behavior_score + 0.35 * relational_score + 0.20 * (1.0 - friction_score)
    )
    return round(blended * 100, 2)


def has_explicit_negative_feedback(
    followups: "Iterable[FollowupMessageSnapshot]",
    *,
    target_user_id: str = "",
    allow_indirect: bool = False,
) -> bool:
    """True when a follow-up contains an explicit negative pattern.

    When *target_user_id* is set, messages from other users only count if
    *allow_indirect* is enabled.
    """
    for item in followups:
        if target_user_id and item.user_id != target_user_id and not allow_indirect:
            continue
        if any(pattern in item.plain_text for pattern in NEGATIVE_PATTERNS):
            return True
    return False
any(pattern in followup.plain_text for pattern in NEGATIVE_PATTERNS): + return True + return False + + +def has_repair_loop( + followups: Iterable[FollowupMessageSnapshot], + *, + target_user_id: str = "", + allow_indirect: bool = False, +) -> bool: + """判断是否出现修复循环。""" + + repair_count = 0 + for followup in followups: + if target_user_id and followup.user_id != target_user_id and not allow_indirect: + continue + if any(pattern in followup.plain_text for pattern in REPAIR_PATTERNS): + repair_count += 1 + return repair_count >= 1 + + +def count_matches(text: str, patterns: Iterable[str]) -> int: + """统计模式命中次数。""" + + return sum(1 for pattern in patterns if pattern and pattern in text) + + +def estimate_sentiment(positive_count: int, negative_count: int, repair_count: int) -> float: + """用轻量规则估计后续消息情绪。""" + + raw_score = 0.5 + 0.2 * positive_count - 0.25 * negative_count - 0.15 * repair_count + return round(clamp(raw_score), 4) + + +def normalize_text_for_prompt(text: str, limit: int = 800) -> str: + """清理用于评分 prompt 的文本。""" + + normalized_text = re.sub(r"\s+", " ", str(text or "")).strip() + if len(normalized_text) <= limit: + return normalized_text + return normalized_text[: limit - 1] + "…" diff --git a/src/maisaka/reply_effect/storage.py b/src/maisaka/reply_effect/storage.py new file mode 100644 index 00000000..7c296add --- /dev/null +++ b/src/maisaka/reply_effect/storage.py @@ -0,0 +1,75 @@ +"""回复效果独立 JSON 存储。""" + +from pathlib import Path +from typing import Dict + +import json +import time + +from .models import ReplyEffectRecord +from .path_utils import BASE_DIR, build_reply_effect_chat_dir, normalize_preview_name + + +class ReplyEffectStorage: + """负责回复效果记录的独立 JSON 文件存储。""" + + _MAX_RECORDS_PER_CHAT = 1024 + _TRIM_COUNT = 100 + + def __init__(self, base_dir: Path | None = None) -> None: + self._base_dir = base_dir or BASE_DIR + + def create_record_file(self, record: ReplyEffectRecord) -> Path: + """为新记录创建文件路径并写入初始 JSON。""" + + chat_dir_name = 
normalize_preview_name(record.session.platform_type_id) + if chat_dir_name == "unknown": + chat_dir = build_reply_effect_chat_dir(record.session.session_id, self._base_dir).resolve() + else: + chat_dir = (self._base_dir / chat_dir_name).resolve() + chat_dir.mkdir(parents=True, exist_ok=True) + timestamp_ms = int(time.time() * 1000) + safe_effect_id = record.effect_id.replace("-", "") + file_path = chat_dir / f"{timestamp_ms}_{safe_effect_id}.json" + record.file_path = file_path + self.save_record(record) + self._trim_overflow(chat_dir) + return file_path + + def save_record(self, record: ReplyEffectRecord) -> None: + """原子写入记录 JSON。""" + + if record.file_path is None: + self.create_record_file(record) + return + + file_path = record.file_path + file_path.parent.mkdir(parents=True, exist_ok=True) + temp_path = file_path.with_name(f".{file_path.name}.tmp") + temp_path.write_text( + json.dumps(record.to_json_dict(), ensure_ascii=False, indent=2, default=str), + encoding="utf-8", + ) + temp_path.replace(file_path) + + @staticmethod + def read_json(file_path: Path) -> Dict[str, object]: + """读取已保存的 JSON 文件。""" + + return json.loads(file_path.read_text(encoding="utf-8")) + + def _trim_overflow(self, chat_dir: Path) -> None: + """超过容量时删除最旧的回复效果记录。""" + + files = [file_path for file_path in chat_dir.glob("*.json") if file_path.is_file()] + if len(files) <= self._MAX_RECORDS_PER_CHAT: + return + + sorted_files = sorted(files, key=lambda file_path: file_path.stat().st_mtime) + overflow_count = len(files) - self._MAX_RECORDS_PER_CHAT + trim_count = min(len(sorted_files), max(self._TRIM_COUNT, overflow_count)) + for old_file in sorted_files[:trim_count]: + try: + old_file.unlink() + except FileNotFoundError: + continue diff --git a/src/maisaka/reply_effect/tracker.py b/src/maisaka/reply_effect/tracker.py new file mode 100644 index 00000000..39895403 --- /dev/null +++ b/src/maisaka/reply_effect/tracker.py @@ -0,0 +1,267 @@ +"""会话级回复效果观察器。""" + +from __future__ import 
annotations + +from datetime import datetime +from typing import Any, Dict, List + +import asyncio +import time +import uuid + +from src.chat.message_receive.message import SessionMessage +from src.maisaka.history_utils import build_session_message_visible_text + +from .image_utils import extract_visual_attachments_from_sequence +from .judge import JudgeRunner, judge_reply_effect +from .models import ( + FollowupMessageSnapshot, + ReplyEffectRecord, + ReplyEffectStatus, + ReplySnapshot, + SessionSnapshot, + UserSnapshot, + now_iso, +) +from .path_utils import build_reply_effect_chat_dir_name +from .scoring import ( + has_explicit_negative_feedback, + has_repair_loop, + score_reply_effect, +) +from .storage import ReplyEffectStorage + +TARGET_USER_FOLLOWUP_LIMIT = 2 +SESSION_FOLLOWUP_LIMIT = 5 +OBSERVATION_WINDOW_SECONDS = 600.0 + + +class ReplyEffectTracker: + """追踪单个 Maisaka 会话内 reply 工具回复后的用户反馈。""" + + def __init__( + self, + *, + session_id: str, + session_name: str, + chat_stream: Any, + judge_runner: JudgeRunner | None = None, + storage: ReplyEffectStorage | None = None, + ) -> None: + self._session_id = session_id + self._session_name = session_name + self._chat_stream = chat_stream + self._judge_runner = judge_runner + self._storage = storage or ReplyEffectStorage() + self._pending_records: Dict[str, ReplyEffectRecord] = {} + self._timeout_tasks: Dict[str, asyncio.Task[None]] = {} + + async def record_reply( + self, + *, + tool_call_id: str, + target_message: SessionMessage, + set_quote: bool, + reply_text: str, + reply_segments: List[str], + planner_reasoning: str, + reference_info: str, + reply_metadata: Dict[str, Any] | None = None, + context_snapshot: List[Dict[str, Any]] | None = None, + ) -> ReplyEffectRecord: + """登记一条已经成功发出的 reply 回复。""" + + effect_id = str(uuid.uuid4()) + target_user_info = target_message.message_info.user_info + record = ReplyEffectRecord( + effect_id=effect_id, + status=ReplyEffectStatus.PENDING, + created_at=now_iso(), + 
updated_at=now_iso(), + session=self._build_session_snapshot(), + reply=ReplySnapshot( + tool_call_id=tool_call_id, + target_message_id=target_message.message_id, + set_quote=set_quote, + reply_text=reply_text, + reply_segments=list(reply_segments), + planner_reasoning=planner_reasoning, + reference_info=reference_info, + reply_metadata=dict(reply_metadata or {}), + ), + target_user=UserSnapshot( + user_id=str(target_user_info.user_id or "").strip(), + nickname=str(target_user_info.user_nickname or "").strip(), + cardname=str(target_user_info.user_cardname or "").strip(), + ), + context_snapshot=list(context_snapshot or []), + ) + self._storage.create_record_file(record) + self._pending_records[effect_id] = record + self._timeout_tasks[effect_id] = asyncio.create_task(self._finalize_after_timeout(effect_id)) + return record + + async def observe_user_message(self, message: SessionMessage) -> None: + """观察一条后续用户消息,并在满足规则时完成相关 pending 记录。""" + + if not self._pending_records or message.session_id != self._session_id: + return + + for effect_id, record in list(self._pending_records.items()): + if record.status != ReplyEffectStatus.PENDING: + continue + followup = self._build_followup_snapshot(message, record) + record.followup_messages.append(followup) + record.updated_at = now_iso() + self._storage.save_record(record) + + reason = self._resolve_finalize_reason(record) + if reason: + await self.finalize(effect_id, reason) + + async def finalize_all(self, reason: str = "runtime_stop") -> None: + """强制完成当前会话所有 pending 记录。""" + + for effect_id in list(self._pending_records.keys()): + await self.finalize(effect_id, reason) + + async def finalize(self, effect_id: str, reason: str) -> None: + """完成一条 pending 记录并写回 JSON。""" + + record = self._pending_records.pop(effect_id, None) + if record is None or record.status == ReplyEffectStatus.FINALIZED: + return + + timeout_task = self._timeout_tasks.pop(effect_id, None) + current_task = asyncio.current_task() + if timeout_task is 
not None and timeout_task is not current_task: + timeout_task.cancel() + + rubric_scores, judge_error = await judge_reply_effect(record, self._judge_runner) + record.scores = score_reply_effect( + record.followup_messages, + rubric_scores, + target_user_id=record.target_user.user_id, + judge_error=judge_error, + ) + record.status = ReplyEffectStatus.FINALIZED + record.finalized_at = now_iso() + record.updated_at = record.finalized_at + record.finalize_reason = reason + record.confidence_note = self._build_confidence_note(record) + record.followup_summary = self._build_followup_summary(record) + self._storage.save_record(record) + + def _build_session_snapshot(self) -> SessionSnapshot: + platform = str(getattr(self._chat_stream, "platform", "") or "").strip() + group_id = str(getattr(self._chat_stream, "group_id", "") or "").strip() + user_id = str(getattr(self._chat_stream, "user_id", "") or "").strip() + is_group_session = bool(getattr(self._chat_stream, "is_group_session", False)) + return SessionSnapshot( + session_id=self._session_id, + platform_type_id=build_reply_effect_chat_dir_name(self._session_id), + platform=platform, + chat_type="group" if is_group_session else "private", + group_id=group_id, + user_id=user_id, + session_name=self._session_name, + ) + + def _build_followup_snapshot( + self, + message: SessionMessage, + record: ReplyEffectRecord, + ) -> FollowupMessageSnapshot: + user_info = message.message_info.user_info + plain_text = str(message.processed_plain_text or "").strip() + try: + visible_text = build_session_message_visible_text(message) + except Exception: + visible_text = plain_text + latency_seconds = max(0.0, time.time() - _parse_iso_timestamp(record.created_at)) + user_id = str(user_info.user_id or "").strip() + return FollowupMessageSnapshot( + message_id=str(message.message_id or "").strip(), + timestamp=_message_timestamp_to_iso(message), + user_id=user_id, + nickname=str(user_info.user_nickname or "").strip(), + 
cardname=str(user_info.user_cardname or "").strip(), + visible_text=visible_text, + plain_text=plain_text, + latency_seconds=round(latency_seconds, 3), + is_target_user=bool(record.target_user.user_id and user_id == record.target_user.user_id), + attachments=extract_visual_attachments_from_sequence(message.raw_message), + ) + + def _resolve_finalize_reason(self, record: ReplyEffectRecord) -> str: + target_user_id = record.target_user.user_id + target_followups = [ + followup + for followup in record.followup_messages + if target_user_id and followup.user_id == target_user_id + ] + has_target_feedback = bool(target_followups) + if has_explicit_negative_feedback(target_followups, target_user_id=target_user_id, allow_indirect=False): + return "explicit_negative" + if has_repair_loop(target_followups, target_user_id=target_user_id, allow_indirect=False): + return "repair_loop" + if len(target_followups) >= TARGET_USER_FOLLOWUP_LIMIT: + return "target_user_followups" + + if not target_user_id or not has_target_feedback: + allow_indirect = not target_user_id + if has_explicit_negative_feedback( + record.followup_messages, + target_user_id=target_user_id, + allow_indirect=allow_indirect, + ): + return "explicit_negative" + if has_repair_loop( + record.followup_messages, + target_user_id=target_user_id, + allow_indirect=allow_indirect, + ): + return "repair_loop" + if len(record.followup_messages) >= SESSION_FOLLOWUP_LIMIT: + return "session_followups_limit" + + return "" + + async def _finalize_after_timeout(self, effect_id: str) -> None: + try: + await asyncio.sleep(OBSERVATION_WINDOW_SECONDS) + await self.finalize(effect_id, "window_timeout") + except asyncio.CancelledError: + return + + @staticmethod + def _build_confidence_note(record: ReplyEffectRecord) -> str: + if not record.followup_messages: + return "没有观察到后续用户消息,行为分使用保守中性信号。" + if any(followup.is_target_user for followup in record.followup_messages): + return "行为反馈包含回复对象本人的后续发言。" + return 
"行为反馈来自同会话其他用户,不是回复对象本人,置信度较低。" + + @staticmethod + def _build_followup_summary(record: ReplyEffectRecord) -> Dict[str, Any]: + target_count = sum(1 for followup in record.followup_messages if followup.is_target_user) + return { + "total_count": len(record.followup_messages), + "target_user_count": target_count, + "other_user_count": len(record.followup_messages) - target_count, + "target_user_id": record.target_user.user_id, + } + + +def _message_timestamp_to_iso(message: SessionMessage) -> str: + timestamp = getattr(message, "timestamp", None) + if isinstance(timestamp, datetime): + return timestamp.astimezone().isoformat(timespec="seconds") + return now_iso() + + +def _parse_iso_timestamp(value: str) -> float: + try: + return datetime.fromisoformat(value).timestamp() + except ValueError: + return time.time() diff --git a/src/maisaka/runtime.py b/src/maisaka/runtime.py index 37c79180..c91b972c 100644 --- a/src/maisaka/runtime.py +++ b/src/maisaka/runtime.py @@ -1,6 +1,7 @@ """Maisaka 非 CLI 运行时。""" from collections import deque +from datetime import datetime from math import ceil from typing import Any, Literal, Optional, Sequence @@ -33,11 +34,13 @@ from src.plugin_runtime.tool_provider import PluginToolProvider from src.plugin_runtime.hook_payloads import deserialize_prompt_messages from .chat_loop_service import ChatResponse, MaisakaChatLoopService -from .context_messages import LLMContextMessage +from .context_messages import LLMContextMessage, ReferenceMessage, ReferenceMessageType from .display.display_utils import build_tool_call_summary_lines, format_token_count from .display.prompt_cli_renderer import PromptCLIVisualizer from .display.stage_status_board import remove_stage_status, update_stage_status from .reasoning_engine import MaisakaReasoningEngine +from .reply_effect import ReplyEffectTracker +from .reply_effect.image_utils import extract_visual_attachments_from_sequence from .tool_provider import MaisakaBuiltinToolProvider logger = 
get_logger("maisaka_runtime") @@ -120,8 +123,20 @@ class MaisakaHeartFlowChatting: self._reasoning_engine = MaisakaReasoningEngine(self) self._tool_registry = ToolRegistry() + self._reply_effect_tracker = ReplyEffectTracker( + session_id=self.session_id, + session_name=self.session_name, + chat_stream=self.chat_stream, + judge_runner=self._run_reply_effect_judge, + ) self._register_tool_providers() + @staticmethod + def _is_reply_effect_tracking_enabled() -> bool: + """判断是否启用回复效果评分追踪。""" + + return bool(global_config.debug.enable_reply_effect_tracking) + def _update_stage_status(self, stage: str, detail: str = "", *, round_text: str = "") -> None: """更新当前会话的阶段状态。""" @@ -171,6 +186,8 @@ class MaisakaHeartFlowChatting: finally: self._internal_loop_task = None + if self._is_reply_effect_tracking_enabled(): + await self._reply_effect_tracker.finalize_all("runtime_stop") await self._tool_registry.close() self._mcp_manager = None self._mcp_host_bridge = None @@ -230,6 +247,8 @@ class MaisakaHeartFlowChatting: self.message_cache.append(message) self._message_received_at_by_id[message.message_id] = received_at self._source_messages_by_id[message.message_id] = message + if self._is_reply_effect_tracking_enabled(): + asyncio.create_task(self._reply_effect_tracker.observe_user_message(message)) if self._agent_state == self._STATE_RUNNING: self._message_debounce_required = True if self._agent_state == self._STATE_RUNNING and self._planner_interrupt_flag is not None: @@ -266,6 +285,79 @@ class MaisakaHeartFlowChatting: talk_value = max(0.01, float(ChatConfigUtils.get_talk_value(self.session_id))) return max(0.01, talk_value * self._talk_frequency_adjust) + async def track_reply_effect( + self, + *, + tool_call_id: str, + target_message: SessionMessage, + set_quote: bool, + reply_text: str, + reply_segments: list[str], + planner_reasoning: str, + reference_info: str, + reply_metadata: Optional[dict[str, Any]] = None, + replyer_context_messages: 
Optional[Sequence[LLMContextMessage]] = None, + ) -> None: + """登记一次已成功发送的 reply 工具回复,供后续用户反馈评分。""" + + if not self._is_reply_effect_tracking_enabled(): + return + + try: + context_snapshot = self._build_reply_effect_context_snapshot( + context_messages=replyer_context_messages, + exclude_reply_segments=reply_segments if replyer_context_messages is None else None, + ) + enriched_reply_metadata = dict(reply_metadata or {}) + enriched_reply_metadata["replyer_context_count"] = ( + len(replyer_context_messages) if replyer_context_messages is not None else len(self._chat_history) + ) + enriched_reply_metadata["recorded_context_count"] = len(context_snapshot) + await self._reply_effect_tracker.record_reply( + tool_call_id=tool_call_id, + target_message=target_message, + set_quote=set_quote, + reply_text=reply_text, + reply_segments=reply_segments, + planner_reasoning=planner_reasoning, + reference_info=reference_info, + reply_metadata=enriched_reply_metadata, + context_snapshot=context_snapshot, + ) + except Exception as exc: + logger.warning(f"{self.log_prefix} 创建回复效果观察记录失败: {exc}") + + def _build_reply_effect_context_snapshot( + self, + *, + context_messages: Optional[Sequence[LLMContextMessage]] = None, + exclude_reply_segments: Optional[Sequence[str]] = None, + ) -> list[dict[str, Any]]: + """构建回复效果观察使用的上下文快照。 + + 优先记录 replyer 当次生成时实际收到的完整上下文列表;只有旧调用未传入时才回退到当前运行时历史。 + """ + + source_messages = list(context_messages) if context_messages is not None else list(self._chat_history) + snapshot: list[dict[str, Any]] = [] + excluded_segments = [segment.strip() for segment in (exclude_reply_segments or []) if segment.strip()] + for message in source_messages: + text = str(message.processed_plain_text or "").strip() + if not text: + continue + if message.source == "guided_reply" and any(segment in text for segment in excluded_segments): + continue + snapshot.append( + { + "source": message.source, + "role": message.role, + "timestamp": 
message.timestamp.isoformat(timespec="seconds"), + "text": text, + "attachments": extract_visual_attachments_from_sequence(getattr(message, "raw_message", None)), + } + ) + return snapshot + def _get_message_trigger_threshold(self) -> int: """根据回复频率折算出触发一轮循环所需的消息数。""" effective_frequency = min(1.0, self._get_effective_reply_frequency()) @@ -496,6 +588,27 @@ class MaisakaHeartFlowChatting: tool_definitions=[] if tool_definitions is None else tool_definitions, ) + async def _run_reply_effect_judge(self, prompt: str) -> str: + """运行回复效果观察器使用的临时 LLM 评审。""" + + judge_message = ReferenceMessage( + content=prompt, + timestamp=datetime.now(), + reference_type=ReferenceMessageType.TOOL_HINT, + remaining_uses_value=1, + display_prefix="[回复效果评分任务]", + ) + response = await self.run_sub_agent( + context_message_limit=1, + system_prompt="你是回复效果评分器。请严格按用户给出的 JSON 格式输出,不要输出 JSON 之外的内容。", + request_kind="reply_effect_judge", + extra_messages=[judge_message], + max_tokens=900, + temperature=0.1, + tool_definitions=[], + ) + return (response.content or "").strip() + def set_current_action_tool_names(self, tool_names: Sequence[str]) -> None: """记录当前 Action Loop 已实际暴露给 planner 的工具名集合。"""