From abada5588477e657237d97605c49c46b4dfe515f Mon Sep 17 00:00:00 2001
From: SengokuCola <1026294844@qq.com>
Date: Fri, 17 Apr 2026 23:05:46 +0800
Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9A=E6=B7=BB=E5=8A=A0=E5=9B=9E?=
=?UTF-8?q?=E5=A4=8D=E5=90=8E=E6=89=93=E5=88=86=E8=BF=BD=E8=B8=AA=E5=99=A8?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../analyze_reply_effect_score_correlation.py | 336 +++
scripts/preview_reply_effect_scores.py | 2219 +++++++++++++++++
src/chat/replyer/maisaka_generator_base.py | 9 -
src/config/config.py | 6 +-
src/config/official_configs.py | 36 +-
src/maisaka/builtin_tool/reply.py | 15 +-
src/maisaka/chat_loop_service.py | 9 -
src/maisaka/reply_effect/__init__.py | 5 +
src/maisaka/reply_effect/image_utils.py | 100 +
src/maisaka/reply_effect/judge.py | 116 +
src/maisaka/reply_effect/models.py | 164 ++
src/maisaka/reply_effect/path_utils.py | 24 +
src/maisaka/reply_effect/scoring.py | 262 ++
src/maisaka/reply_effect/storage.py | 75 +
src/maisaka/reply_effect/tracker.py | 267 ++
src/maisaka/runtime.py | 115 +-
16 files changed, 3707 insertions(+), 51 deletions(-)
create mode 100644 scripts/analyze_reply_effect_score_correlation.py
create mode 100644 scripts/preview_reply_effect_scores.py
create mode 100644 src/maisaka/reply_effect/__init__.py
create mode 100644 src/maisaka/reply_effect/image_utils.py
create mode 100644 src/maisaka/reply_effect/judge.py
create mode 100644 src/maisaka/reply_effect/models.py
create mode 100644 src/maisaka/reply_effect/path_utils.py
create mode 100644 src/maisaka/reply_effect/scoring.py
create mode 100644 src/maisaka/reply_effect/storage.py
create mode 100644 src/maisaka/reply_effect/tracker.py
diff --git a/scripts/analyze_reply_effect_score_correlation.py b/scripts/analyze_reply_effect_score_correlation.py
new file mode 100644
index 00000000..3f3da3cb
--- /dev/null
+++ b/scripts/analyze_reply_effect_score_correlation.py
@@ -0,0 +1,336 @@
+from pathlib import Path
+from typing import Any
+
+from scipy import stats
+
+import argparse
+import csv
+import json
+import math
+
+
+DEFAULT_LOG_DIR = Path("logs") / "maisaka_reply_effect"
+DEFAULT_MANUAL_DIR = Path("logs") / "maisaka_reply_effect_manual"
+
+
+METRIC_SPECS = [
+ ("总分", "asi", "ASI 自动总分"),
+ ("大项", "behavior_score", "行为满意度 B"),
+ ("大项", "relational_score", "感知质量 R"),
+ ("大项", "friction_score", "摩擦风险 F"),
+ ("大项", "friction_quality_score", "低摩擦质量分"),
+ ("行为子项", "behavior_signals.continue_2turns", "继续两轮"),
+ ("行为子项", "behavior_signals.next_user_sentiment", "后续情绪"),
+ ("行为子项", "behavior_signals.user_expansion", "用户展开"),
+ ("行为子项", "behavior_signals.no_correction", "没有纠正"),
+ ("行为子项", "behavior_signals.no_abort", "没有放弃"),
+ ("rubric 子项", "rubric_scores.social_presence.normalized_score", "社交临场感"),
+ ("rubric 子项", "rubric_scores.warmth.normalized_score", "温暖感"),
+ ("rubric 子项", "rubric_scores.competence.normalized_score", "能力/有用性"),
+ ("rubric 子项", "rubric_scores.appropriateness.normalized_score", "合适程度"),
+ ("rubric 子项", "rubric_scores.uncanny_risk.normalized_score", "违和风险 judge"),
+ ("摩擦子项", "friction_signals.explicit_negative", "明确负反馈"),
+ ("摩擦子项", "friction_signals.repair_loop", "修复循环"),
+ ("摩擦子项", "friction_signals.uncanny_risk", "违和风险"),
+]
+
+
+def normalize_name(value: str) -> str:
+ normalized = "".join(char if char.isalnum() or char in "._-" else "_" for char in str(value or "").strip())
+ normalized = normalized.strip("._")
+ return normalized or "unknown"
+
+
+def load_json_file(file_path: Path) -> dict[str, Any]:
+ try:
+ payload = json.loads(file_path.read_text(encoding="utf-8"))
+ except (OSError, json.JSONDecodeError):
+ return {}
+ return payload if isinstance(payload, dict) else {}
+
+
+def to_float(value: Any) -> float | None:
+ if value in {None, ""}:
+ return None
+ try:
+ number = float(value)
+ except (TypeError, ValueError):
+ return None
+ if math.isnan(number) or math.isinf(number):
+ return None
+ return number
+
+
+def get_nested(payload: dict[str, Any], dotted_path: str) -> Any:
+ current: Any = payload
+ for key in dotted_path.split("."):
+ if not isinstance(current, dict):
+ return None
+ current = current.get(key)
+ return current
+
+
+def annotation_path(manual_dir: Path, chat_id: str, effect_id: str) -> Path:
+ return manual_dir / normalize_name(chat_id) / f"{normalize_name(effect_id)}.json"
+
+
+def iter_records(
+ log_dir: Path,
+ manual_dir: Path,
+ *,
+ chat_id: str,
+ include_pending: bool,
+) -> list[dict[str, Any]]:
+ records: list[dict[str, Any]] = []
+ if not log_dir.exists():
+ return records
+
+ chat_dirs = [log_dir / normalize_name(chat_id)] if chat_id else [path for path in log_dir.iterdir() if path.is_dir()]
+ for chat_dir in sorted(chat_dirs):
+ if not chat_dir.exists() or not chat_dir.is_dir():
+ continue
+ for record_file in sorted(chat_dir.glob("*.json")):
+ effect_record = load_json_file(record_file)
+ if not effect_record:
+ continue
+ if not include_pending and effect_record.get("status") != "finalized":
+ continue
+
+ effect_id = str(effect_record.get("effect_id") or record_file.stem)
+ manual_record = load_json_file(annotation_path(manual_dir, chat_dir.name, effect_id))
+ manual_score = to_float(manual_record.get("manual_score"))
+ if manual_score is None:
+ manual_score_5 = to_float(manual_record.get("manual_score_5"))
+ if manual_score_5 is not None:
+ manual_score = (manual_score_5 - 1) / 4 * 100
+ if manual_score is None:
+ continue
+
+ raw_scores = effect_record.get("scores") if isinstance(effect_record.get("scores"), dict) else {}
+ scores = dict(raw_scores)
+ friction_score = to_float(scores.get("friction_score"))
+ if friction_score is not None:
+ scores["friction_quality_score"] = 1 - friction_score
+ records.append(
+ {
+ "chat_id": chat_dir.name,
+ "effect_id": effect_id,
+ "manual_score": manual_score,
+ "manual_score_5": manual_record.get("manual_score_5"),
+ "scores": scores,
+ "status": effect_record.get("status"),
+ "created_at": effect_record.get("created_at"),
+ "record_file": str(record_file),
+ }
+ )
+ return records
+
+
+def calculate_metric_stats(records: list[dict[str, Any]], metric_path: str, min_n: int) -> dict[str, Any]:
+ pairs: list[tuple[float, float]] = []
+ for record in records:
+ x_value = to_float(get_nested(record["scores"], metric_path))
+ y_value = to_float(record["manual_score"])
+ if x_value is None or y_value is None:
+ continue
+ pairs.append((x_value, y_value))
+
+ x_values = [pair[0] for pair in pairs]
+ y_values = [pair[1] for pair in pairs]
+ result: dict[str, Any] = {
+ "n": len(pairs),
+ "pearson_r": None,
+ "pearson_p": None,
+ "spearman_r": None,
+ "spearman_p": None,
+ "kendall_tau": None,
+ "kendall_p": None,
+ "note": "",
+ }
+ if len(pairs) < min_n:
+ result["note"] = f"样本数少于 {min_n}"
+ return result
+ if len(set(x_values)) < 2:
+ result["note"] = "自动评分没有变化,无法计算相关"
+ return result
+ if len(set(y_values)) < 2:
+ result["note"] = "人工评分没有变化,无法计算相关"
+ return result
+
+ pearson = stats.pearsonr(x_values, y_values)
+ spearman = stats.spearmanr(x_values, y_values)
+ kendall = stats.kendalltau(x_values, y_values)
+ result.update(
+ {
+ "pearson_r": round_float(pearson.statistic),
+ "pearson_p": round_float(pearson.pvalue),
+ "spearman_r": round_float(spearman.statistic),
+ "spearman_p": round_float(spearman.pvalue),
+ "kendall_tau": round_float(kendall.statistic),
+ "kendall_p": round_float(kendall.pvalue),
+ }
+ )
+ return result
+
+
+def round_float(value: Any) -> float | None:
+ number = to_float(value)
+ if number is None:
+ return None
+ return round(number, 6)
+
+
+def significance_label(p_value: float | None) -> str:
+ if p_value is None:
+ return ""
+ if p_value < 0.001:
+ return "***"
+ if p_value < 0.01:
+ return "**"
+ if p_value < 0.05:
+ return "*"
+ if p_value < 0.1:
+ return "."
+ return "ns"
+
+
+def build_report(records: list[dict[str, Any]], min_n: int) -> list[dict[str, Any]]:
+ report: list[dict[str, Any]] = []
+ for group, metric_path, label in METRIC_SPECS:
+ metric_stats = calculate_metric_stats(records, metric_path, min_n)
+ report.append(
+ {
+ "group": group,
+ "metric": metric_path,
+ "label": label,
+ **metric_stats,
+ "pearson_sig": significance_label(metric_stats["pearson_p"]),
+ "spearman_sig": significance_label(metric_stats["spearman_p"]),
+ "kendall_sig": significance_label(metric_stats["kendall_p"]),
+ }
+ )
+ return report
+
+
+def print_report(records: list[dict[str, Any]], report: list[dict[str, Any]]) -> None:
+ chats = sorted({record["chat_id"] for record in records})
+ print("\nMaisaka 回复效果评分相关性分析")
+ print("=" * 96)
+ print(f"已匹配人工评分记录数: {len(records)}")
+ print(f"聊天流数量: {len(chats)}")
+ if chats:
+ print(f"聊天流: {', '.join(chats[:8])}{' ...' if len(chats) > 8 else ''}")
+ print("人工分使用 manual_score,若只有 manual_score_5,则换算到 0-100 后参与计算。")
+ print("显著性: *** p<0.001, ** p<0.01, * p<0.05, . p<0.1, ns 不显著")
+ print("-" * 96)
+
+ header = (
+ f"{'分组':<14} {'指标':<34} {'n':>4} "
+ f"{'Pearson r':>10} {'p':>10} {'sig':>4} "
+ f"{'Spearman r':>11} {'p':>10} {'sig':>4} "
+ f"{'Kendall':>9} {'p':>10} {'说明'}"
+ )
+ print(header)
+ print("-" * 96)
+ for item in report:
+ print(
+ f"{item['group']:<14} "
+ f"{item['label']:<34} "
+ f"{item['n']:>4} "
+ f"{format_number(item['pearson_r']):>10} "
+ f"{format_number(item['pearson_p']):>10} "
+ f"{item['pearson_sig']:>4} "
+ f"{format_number(item['spearman_r']):>11} "
+ f"{format_number(item['spearman_p']):>10} "
+ f"{item['spearman_sig']:>4} "
+ f"{format_number(item['kendall_tau']):>9} "
+ f"{format_number(item['kendall_p']):>10} "
+ f"{item['note']}"
+ )
+
+ total = next((item for item in report if item["metric"] == "asi"), None)
+ if total:
+ print("-" * 96)
+ print(
+ "总分 ASI 与人工分的 Pearson 相关: "
+ f"r={format_number(total['pearson_r'])}, "
+ f"p={format_number(total['pearson_p'])}, "
+ f"显著性={total['pearson_sig'] or 'N/A'}"
+ )
+
+
+def format_number(value: Any) -> str:
+ if value is None:
+ return "N/A"
+ number = to_float(value)
+ if number is None:
+ return "N/A"
+ if abs(number) < 0.000001:
+ return "0"
+ return f"{number:.4g}"
+
+
+def write_csv(file_path: Path, report: list[dict[str, Any]]) -> None:
+ file_path.parent.mkdir(parents=True, exist_ok=True)
+ fieldnames = [
+ "group",
+ "metric",
+ "label",
+ "n",
+ "pearson_r",
+ "pearson_p",
+ "pearson_sig",
+ "spearman_r",
+ "spearman_p",
+ "spearman_sig",
+ "kendall_tau",
+ "kendall_p",
+ "kendall_sig",
+ "note",
+ ]
+ with file_path.open("w", encoding="utf-8-sig", newline="") as csv_file:
+ writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
+ writer.writeheader()
+ writer.writerows(report)
+
+
+def write_json(file_path: Path, records: list[dict[str, Any]], report: list[dict[str, Any]]) -> None:
+ file_path.parent.mkdir(parents=True, exist_ok=True)
+ payload = {
+ "matched_record_count": len(records),
+ "chat_count": len({record["chat_id"] for record in records}),
+ "report": report,
+ }
+ file_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser(description="分析 Maisaka 回复效果自动评分与人工评分的相关性和显著性。")
+ parser.add_argument("--log-dir", type=Path, default=DEFAULT_LOG_DIR, help="自动评分 JSON 目录")
+ parser.add_argument("--manual-dir", type=Path, default=DEFAULT_MANUAL_DIR, help="人工评分 JSON 目录")
+ parser.add_argument("--chat-id", default="", help="只分析某个 platform_type_id,例如 qq_group_1028699246")
+ parser.add_argument("--include-pending", action="store_true", help="包含尚未 finalized 的记录")
+ parser.add_argument("--min-n", type=int, default=3, help="计算相关性需要的最小样本数,默认 3")
+ parser.add_argument("--csv", type=Path, default=None, help="把统计结果另存为 CSV")
+ parser.add_argument("--json", type=Path, default=None, help="把统计结果另存为 JSON")
+ args = parser.parse_args()
+
+ records = iter_records(
+ args.log_dir,
+ args.manual_dir,
+ chat_id=args.chat_id,
+ include_pending=args.include_pending,
+ )
+ report = build_report(records, max(2, args.min_n))
+ print_report(records, report)
+
+ if args.csv:
+ write_csv(args.csv, report)
+ print(f"\nCSV 已保存: {args.csv}")
+ if args.json:
+ write_json(args.json, records, report)
+ print(f"JSON 已保存: {args.json}")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/scripts/preview_reply_effect_scores.py b/scripts/preview_reply_effect_scores.py
new file mode 100644
index 00000000..86648393
--- /dev/null
+++ b/scripts/preview_reply_effect_scores.py
@@ -0,0 +1,2219 @@
+from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
+from pathlib import Path
+from typing import Any
+from urllib.parse import parse_qs, urlparse
+
+import argparse
+import json
+import mimetypes
+import time
+import webbrowser
+
+
+DEFAULT_LOG_DIR = Path("logs") / "maisaka_reply_effect"
+DEFAULT_MANUAL_DIR = Path("logs") / "maisaka_reply_effect_manual"
+DEFAULT_HOST = "127.0.0.1"
+DEFAULT_PORT = 8765
+
+
+def normalize_name(value: str) -> str:
+ normalized = "".join(char if char.isalnum() or char in "._-" else "_" for char in str(value or "").strip())
+ normalized = normalized.strip("._")
+ return normalized or "unknown"
+
+
+def load_json_file(file_path: Path) -> dict[str, Any]:
+ try:
+ payload = json.loads(file_path.read_text(encoding="utf-8"))
+ except (OSError, json.JSONDecodeError):
+ return {}
+ return payload if isinstance(payload, dict) else {}
+
+
+def write_json_file(file_path: Path, payload: dict[str, Any]) -> None:
+ file_path.parent.mkdir(parents=True, exist_ok=True)
+ temp_path = file_path.with_name(f".{file_path.name}.tmp")
+ temp_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2, default=str), encoding="utf-8")
+ temp_path.replace(file_path)
+
+
+class ReplyEffectRepository:
+ def __init__(self, log_dir: Path, manual_dir: Path) -> None:
+ self.log_dir = log_dir
+ self.manual_dir = manual_dir
+
+ def list_chats(self) -> list[dict[str, Any]]:
+ chats: list[dict[str, Any]] = []
+ if not self.log_dir.exists():
+ return chats
+
+ for chat_dir in sorted(path for path in self.log_dir.iterdir() if path.is_dir()):
+ records = list(chat_dir.glob("*.json"))
+ annotated_count = sum(1 for record_file in records if self._annotation_path(chat_dir.name, record_file).exists())
+ finalized_count = 0
+ pending_count = 0
+ for record_file in records:
+ payload = load_json_file(record_file)
+ if payload.get("status") == "finalized":
+ finalized_count += 1
+ else:
+ pending_count += 1
+ chats.append(
+ {
+ "chat_id": chat_dir.name,
+ "record_count": len(records),
+ "finalized_count": finalized_count,
+ "pending_count": pending_count,
+ "annotated_count": annotated_count,
+ }
+ )
+ return chats
+
+ def list_records(
+ self,
+ *,
+ chat_id: str | None = None,
+ status: str = "",
+ annotated: str = "",
+ ) -> list[dict[str, Any]]:
+ records: list[dict[str, Any]] = []
+ for record_file in self._iter_record_files(chat_id):
+ payload = load_json_file(record_file)
+ if not payload:
+ continue
+ summary = self._build_record_summary(record_file, payload)
+ if status and summary["status"] != status:
+ continue
+ if annotated == "yes" and summary["manual"] is None:
+ continue
+ if annotated == "no" and summary["manual"] is not None:
+ continue
+ records.append(summary)
+ return sorted(records, key=lambda item: str(item.get("created_at") or ""), reverse=True)
+
+ def get_record(self, chat_id: str, effect_id: str) -> dict[str, Any]:
+ record_file = self._find_record_file(chat_id, effect_id)
+ if record_file is None:
+ return {}
+ payload = load_json_file(record_file)
+ if not payload:
+ return {}
+ payload["_manual"] = self.get_annotation(chat_id, effect_id)
+ payload["_record_path"] = str(record_file)
+ return payload
+
+ def get_annotation(self, chat_id: str, effect_id: str) -> dict[str, Any] | None:
+ annotation_path = self._annotation_path(chat_id, effect_id)
+ if not annotation_path.exists():
+ return None
+ payload = load_json_file(annotation_path)
+ return payload or None
+
+ def save_annotation(self, payload: dict[str, Any]) -> dict[str, Any]:
+ chat_id = normalize_name(str(payload.get("chat_id") or ""))
+ effect_id = normalize_name(str(payload.get("effect_id") or ""))
+ if not chat_id or chat_id == "unknown" or not effect_id or effect_id == "unknown":
+ raise ValueError("缺少 chat_id 或 effect_id")
+ if self._find_record_file(chat_id, effect_id) is None:
+ raise ValueError("找不到对应的回复效果记录")
+
+ manual_score = payload.get("manual_score")
+ manual_score_5 = payload.get("manual_score_5")
+ normalized_score: float | None = None
+ normalized_score_5: int | None = None
+ if manual_score_5 not in {None, ""}:
+ try:
+ normalized_score_5 = int(manual_score_5)
+ except (TypeError, ValueError):
+ raise ValueError("manual_score_5 必须是 1-5 的整数") from None
+ if normalized_score_5 < 1 or normalized_score_5 > 5:
+ raise ValueError("manual_score_5 必须是 1-5 的整数")
+ normalized_score = round((normalized_score_5 - 1) / 4 * 100, 2)
+ elif manual_score not in {None, ""}:
+ try:
+ normalized_score = max(0.0, min(100.0, float(manual_score)))
+ except (TypeError, ValueError):
+ raise ValueError("manual_score 必须是 0-100 的数字") from None
+ else:
+ raise ValueError("缺少人工评分")
+
+ annotation = {
+ "schema_version": 1,
+ "chat_id": chat_id,
+ "effect_id": effect_id,
+ "manual_score": round(normalized_score, 2),
+ "manual_score_5": normalized_score_5,
+ "manual_label": str(payload.get("manual_label") or "").strip(),
+ "evaluator": str(payload.get("evaluator") or "manual").strip() or "manual",
+ "notes": str(payload.get("notes") or "").strip(),
+ "updated_at": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
+ }
+ write_json_file(self._annotation_path(chat_id, effect_id), annotation)
+ return annotation
+
+ def _iter_record_files(self, chat_id: str | None = None) -> list[Path]:
+ if not self.log_dir.exists():
+ return []
+ if chat_id:
+ chat_dir = self.log_dir / normalize_name(chat_id)
+ if not chat_dir.exists() or not chat_dir.is_dir():
+ return []
+ return sorted(chat_dir.glob("*.json"))
+
+ record_files: list[Path] = []
+ for chat_dir in self.log_dir.iterdir():
+ if chat_dir.is_dir():
+ record_files.extend(chat_dir.glob("*.json"))
+ return record_files
+
+ def _find_record_file(self, chat_id: str, effect_id: str) -> Path | None:
+ normalized_effect_id = normalize_name(effect_id)
+ for record_file in self._iter_record_files(chat_id):
+ payload = load_json_file(record_file)
+ if normalize_name(str(payload.get("effect_id") or "")) == normalized_effect_id:
+ return record_file
+ return None
+
+ def _annotation_path(self, chat_id: str, record_file_or_effect_id: Path | str) -> Path:
+ if isinstance(record_file_or_effect_id, Path):
+ payload = load_json_file(record_file_or_effect_id)
+ effect_id = str(payload.get("effect_id") or record_file_or_effect_id.stem).strip()
+ else:
+ effect_id = str(record_file_or_effect_id or "").strip()
+ return self.manual_dir / normalize_name(chat_id) / f"{normalize_name(effect_id)}.json"
+
+ def _build_record_summary(self, record_file: Path, payload: dict[str, Any]) -> dict[str, Any]:
+ chat_id = record_file.parent.name
+ effect_id = str(payload.get("effect_id") or record_file.stem)
+ scores = payload.get("scores") if isinstance(payload.get("scores"), dict) else {}
+ reply = payload.get("reply") if isinstance(payload.get("reply"), dict) else {}
+ target_user = payload.get("target_user") if isinstance(payload.get("target_user"), dict) else {}
+ manual = self.get_annotation(chat_id, effect_id)
+ return {
+ "chat_id": chat_id,
+ "effect_id": effect_id,
+ "status": str(payload.get("status") or ""),
+ "created_at": str(payload.get("created_at") or ""),
+ "finalize_reason": str(payload.get("finalize_reason") or ""),
+ "asi": scores.get("asi"),
+ "behavior_score": scores.get("behavior_score"),
+ "relational_score": scores.get("relational_score"),
+ "friction_score": scores.get("friction_score"),
+ "manual": manual,
+ "reply_preview": self._truncate(str(reply.get("reply_text") or ""), 160),
+ "target_message_id": str(reply.get("target_message_id") or ""),
+ "target_user": target_user,
+ "followup_count": len(payload.get("followup_messages") or []),
+ "file_name": record_file.name,
+ }
+
+ @staticmethod
+ def _truncate(text: str, limit: int) -> str:
+ normalized_text = " ".join(str(text or "").split())
+ if len(normalized_text) <= limit:
+ return normalized_text
+ return f"{normalized_text[: limit - 1]}…"
+
+
+class ReplyEffectPreviewHandler(BaseHTTPRequestHandler):
+ repository: ReplyEffectRepository
+
+ def do_GET(self) -> None:
+ parsed = urlparse(self.path)
+ if parsed.path == "/":
+ self._send_html(INDEX_HTML_V3)
+ return
+ if parsed.path == "/api/chats":
+ self._send_json({"chats": self.repository.list_chats()})
+ return
+ if parsed.path == "/api/records":
+ query = parse_qs(parsed.query)
+ records = self.repository.list_records(
+ chat_id=self._first(query, "chat_id"),
+ status=self._first(query, "status"),
+ annotated=self._first(query, "annotated"),
+ )
+ self._send_json({"records": records})
+ return
+ if parsed.path == "/api/record":
+ query = parse_qs(parsed.query)
+ record = self.repository.get_record(
+ normalize_name(self._first(query, "chat_id")),
+ normalize_name(self._first(query, "effect_id")),
+ )
+ if not record:
+ self._send_json({"error": "record not found"}, status=404)
+ return
+ self._send_json({"record": record})
+ return
+ if parsed.path == "/api/image":
+ query = parse_qs(parsed.query)
+ self._send_image(self._first(query, "path"))
+ return
+ if parsed.path == "/api/image_hash":
+ query = parse_qs(parsed.query)
+ self._send_image_by_hash(self._first(query, "hash"), self._first(query, "kind"))
+ return
+ self._send_json({"error": "not found"}, status=404)
+
+ def do_POST(self) -> None:
+ parsed = urlparse(self.path)
+ if parsed.path != "/api/annotations":
+ self._send_json({"error": "not found"}, status=404)
+ return
+ try:
+ payload = self._read_json_body()
+ annotation = self.repository.save_annotation(payload)
+ except ValueError as exc:
+ self._send_json({"error": str(exc)}, status=400)
+ return
+ self._send_json({"annotation": annotation})
+
+ def log_message(self, format: str, *args: Any) -> None:
+ return
+
+ def _send_html(self, content: str) -> None:
+ body = content.encode("utf-8")
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html; charset=utf-8")
+ self.send_header("Content-Length", str(len(body)))
+ self.end_headers()
+ self.wfile.write(body)
+
+ def _send_image_by_hash(self, image_hash: str, kind: str = "") -> None:
+ image_hash = str(image_hash or "").strip()
+ if not image_hash:
+ self._send_json({"error": "missing image hash"}, status=400)
+ return
+ image_path = self._resolve_image_path_by_hash(image_hash, kind)
+ if image_path is None:
+ self._send_json({"error": "image hash not found"}, status=404)
+ return
+ self._send_image(str(image_path))
+
+ @staticmethod
+ def _resolve_image_path_by_hash(image_hash: str, kind: str = "") -> Path | None:
+ try:
+ from sqlmodel import select
+
+ from src.common.database.database import get_db_session
+ from src.common.database.database_model import Images, ImageType
+
+ preferred_types = []
+ if kind == "emoji":
+ preferred_types.append(ImageType.EMOJI)
+ elif kind == "image":
+ preferred_types.append(ImageType.IMAGE)
+ preferred_types.extend(image_type for image_type in (ImageType.IMAGE, ImageType.EMOJI) if image_type not in preferred_types)
+
+ with get_db_session() as db:
+ for image_type in preferred_types:
+ statement = select(Images).filter_by(image_hash=image_hash, image_type=image_type).limit(1)
+ image_record = db.exec(statement).first()
+ if image_record is None or image_record.no_file_flag:
+ continue
+ image_path = Path(str(image_record.full_path or "")).expanduser().resolve()
+ if image_path.is_file():
+ return image_path
+ except Exception:
+ return None
+ return None
+
+ def _send_json(self, payload: dict[str, Any], status: int = 200) -> None:
+ body = json.dumps(payload, ensure_ascii=False, default=str).encode("utf-8")
+ self.send_response(status)
+ self.send_header("Content-Type", "application/json; charset=utf-8")
+ self.send_header("Content-Length", str(len(body)))
+ self.end_headers()
+ self.wfile.write(body)
+
+ def _send_image(self, raw_path: str) -> None:
+ try:
+ image_path = Path(raw_path).expanduser().resolve()
+ if not image_path.is_file():
+ raise FileNotFoundError(raw_path)
+ mime_type = mimetypes.guess_type(str(image_path))[0] or "application/octet-stream"
+ if not mime_type.startswith("image/"):
+ self._send_json({"error": "not an image"}, status=400)
+ return
+ body = image_path.read_bytes()
+ except OSError:
+ self._send_json({"error": "image not found"}, status=404)
+ return
+
+ self.send_response(200)
+ self.send_header("Content-Type", mime_type)
+ self.send_header("Content-Length", str(len(body)))
+ self.end_headers()
+ self.wfile.write(body)
+
+ def _read_json_body(self) -> dict[str, Any]:
+ length = int(self.headers.get("Content-Length") or 0)
+ raw_body = self.rfile.read(length).decode("utf-8")
+ payload = json.loads(raw_body or "{}")
+ if not isinstance(payload, dict):
+ raise ValueError("请求体必须是 JSON 对象")
+ return payload
+
+ @staticmethod
+ def _first(query: dict[str, list[str]], key: str) -> str:
+ values = query.get(key) or [""]
+ return values[0]
+
+
+INDEX_HTML = r"""
+
+
+
+
+ Maisaka 回复效果评分预览
+
+
+
+
+ Maisaka 回复效果评分预览
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+"""
+
+
+INDEX_HTML_V2 = r"""
+
+
+
+
+ Maisaka 回复效果评分预览
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+"""
+
+INDEX_HTML_V3 = r"""
+
+
+
+
+ Maisaka 回复效果评分预览
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+"""
+
+
+def build_handler(repository: ReplyEffectRepository) -> type[ReplyEffectPreviewHandler]:
+ class ConfiguredHandler(ReplyEffectPreviewHandler):
+ pass
+
+ ConfiguredHandler.repository = repository
+ return ConfiguredHandler
+
+
+def main() -> None:
+ parser = argparse.ArgumentParser(description="预览 Maisaka 回复效果评分,并记录人工评分。")
+ parser.add_argument("--host", default=DEFAULT_HOST, help=f"监听地址,默认 {DEFAULT_HOST}")
+ parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"监听端口,默认 {DEFAULT_PORT}")
+ parser.add_argument("--log-dir", type=Path, default=DEFAULT_LOG_DIR, help="回复效果 JSON 日志目录")
+ parser.add_argument("--manual-dir", type=Path, default=DEFAULT_MANUAL_DIR, help="人工评分 JSON 保存目录")
+ parser.add_argument("--no-browser", action="store_true", help="不自动打开浏览器")
+ args = parser.parse_args()
+
+ mimetypes.add_type("text/html", ".html")
+ repository = ReplyEffectRepository(args.log_dir, args.manual_dir)
+ server = ThreadingHTTPServer((args.host, args.port), build_handler(repository))
+ url = f"http://{args.host}:{args.port}/"
+ print(f"Maisaka 回复效果评分预览已启动: {url}")
+ print(f"自动评分目录: {args.log_dir}")
+ print(f"人工评分目录: {args.manual_dir}")
+ if not args.no_browser:
+ webbrowser.open(url)
+ try:
+ server.serve_forever()
+ except KeyboardInterrupt:
+ print("\n正在关闭预览服务...")
+ finally:
+ server.server_close()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/src/chat/replyer/maisaka_generator_base.py b/src/chat/replyer/maisaka_generator_base.py
index 812b82d5..8fcf57ce 100644
--- a/src/chat/replyer/maisaka_generator_base.py
+++ b/src/chat/replyer/maisaka_generator_base.py
@@ -2,7 +2,6 @@ from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Awaitable, Callable, Dict, List, Literal, Optional, Tuple
-import random
import time
from rich.console import Group, RenderableType
@@ -83,14 +82,6 @@ class BaseMaisakaReplyGenerator:
bot_aliases = f",也有人叫你{','.join(alias_names)}" if alias_names else ""
prompt_personality = global_config.personality.personality
- if (
- hasattr(global_config.personality, "states")
- and global_config.personality.states
- and hasattr(global_config.personality, "state_probability")
- and global_config.personality.state_probability > 0
- and random.random() < global_config.personality.state_probability
- ):
- prompt_personality = random.choice(global_config.personality.states)
return f"你的名字是{bot_name}{bot_aliases},你{prompt_personality};"
except Exception as exc:
diff --git a/src/config/config.py b/src/config/config.py
index 8de2a29b..db7e9863 100644
--- a/src/config/config.py
+++ b/src/config/config.py
@@ -23,7 +23,6 @@ from .official_configs import (
EmojiConfig,
ExpressionConfig,
KeywordReactionConfig,
- MaiSakaConfig,
MaimMessageConfig,
MCPConfig,
MemoryConfig,
@@ -55,7 +54,7 @@ BOT_CONFIG_PATH: Path = (CONFIG_DIR / "bot_config.toml").resolve().absolute()
MODEL_CONFIG_PATH: Path = (CONFIG_DIR / "model_config.toml").resolve().absolute()
LEGACY_ENV_PATH: Path = (PROJECT_ROOT / ".env").resolve().absolute()
MMC_VERSION: str = "1.0.0"
-CONFIG_VERSION: str = "8.8.0"
+CONFIG_VERSION: str = "8.9.3"
MODEL_CONFIG_VERSION: str = "1.14.0"
logger = get_logger("config")
@@ -121,9 +120,6 @@ class Config(ConfigBase):
database: DatabaseConfig = Field(default_factory=DatabaseConfig)
"""数据库配置类"""
- maisaka: MaiSakaConfig = Field(default_factory=MaiSakaConfig)
- """MaiSaka对话系统配置类"""
-
mcp: MCPConfig = Field(default_factory=MCPConfig)
"""MCP 配置类"""
diff --git a/src/config/official_configs.py b/src/config/official_configs.py
index 8d3cff38..e8f364ae 100644
--- a/src/config/official_configs.py
+++ b/src/config/official_configs.py
@@ -113,32 +113,6 @@ class PersonalityConfig(ConfigBase):
)
"""每次构建回复时,从 multiple_reply_style 中随机替换 reply_style 的概率(0.0-1.0)"""
- states: list[str] = Field(
- default_factory=lambda: [
- "是一个女大学生,喜欢上网聊天,会刷小红书。",
- "是一个大二心理学生,会刷贴吧和中国知网。",
- "是一个赛博网友,最近很想吐槽人。",
- ],
- json_schema_extra={
- "x-widget": "custom",
- "x-icon": "shuffle",
- },
- )
- """_wrap_状态列表,用于随机替换personality"""
-
- state_probability: float = Field(
- default=0.3,
- ge=0,
- le=1,
- json_schema_extra={
- "x-widget": "slider",
- "x-icon": "percent",
- "step": 0.1,
- },
- )
- """状态概率,每次构建人格时替换personality的概率"""
-
-
class VisualConfig(ConfigBase):
"""视觉配置类"""
@@ -1161,6 +1135,16 @@ class DebugConfig(ConfigBase):
)
"""是否显示记忆检索相关prompt"""
+ enable_reply_effect_tracking: bool = Field(
+ default=False,
+ json_schema_extra={
+ "x-widget": "switch",
+ "x-icon": "activity",
+ },
+ )
+ """是否开启回复效果评分追踪,默认关闭,需要手动打开"""
+
+
class ExtraPromptItem(ConfigBase):
platform: str = Field(
default="",
diff --git a/src/maisaka/builtin_tool/reply.py b/src/maisaka/builtin_tool/reply.py
index fa182401..8f66a8ca 100644
--- a/src/maisaka/builtin_tool/reply.py
+++ b/src/maisaka/builtin_tool/reply.py
@@ -121,13 +121,15 @@ async def handle_tool(
"Maisaka 回复生成器当前不可用。",
)
+ replyer_chat_history = list(tool_ctx.runtime._chat_history)
+
try:
success, reply_result = await replyer.generate_reply_with_context(
reply_reason=latest_thought,
reference_info=reference_info,
stream_id=tool_ctx.runtime.session_id,
reply_message=target_message,
- chat_history=tool_ctx.runtime._chat_history,
+ chat_history=replyer_chat_history,
sub_agent_runner=lambda system_prompt: _run_expression_selector(
tool_ctx,
system_prompt,
@@ -207,6 +209,17 @@ async def handle_tool(
if tool_ctx.runtime.chat_stream.platform == CLI_PLATFORM_NAME:
tool_ctx.append_guided_reply_to_chat_history(combined_reply_text)
tool_ctx.runtime._record_reply_sent()
+ await tool_ctx.runtime.track_reply_effect(
+ tool_call_id=invocation.call_id,
+ target_message=target_message,
+ set_quote=set_quote,
+ reply_text=combined_reply_text,
+ reply_segments=reply_segments,
+ planner_reasoning=latest_thought,
+ reference_info=reference_info,
+ reply_metadata=reply_metadata,
+ replyer_context_messages=replyer_chat_history,
+ )
return tool_ctx.build_success_result(
invocation.tool_name,
"回复已生成并发送。",
diff --git a/src/maisaka/chat_loop_service.py b/src/maisaka/chat_loop_service.py
index f4a8a7db..377eee9c 100644
--- a/src/maisaka/chat_loop_service.py
+++ b/src/maisaka/chat_loop_service.py
@@ -5,7 +5,6 @@ from datetime import datetime
from typing import Any, List, Optional, Sequence
import asyncio
-import random
from rich.console import RenderableType
from src.common.data_models.llm_service_data_models import LLMGenerationOptions
@@ -263,14 +262,6 @@ class MaisakaChatLoopService:
bot_nickname = ""
prompt_personality = global_config.personality.personality
- if (
- hasattr(global_config.personality, "states")
- and global_config.personality.states
- and hasattr(global_config.personality, "state_probability")
- and global_config.personality.state_probability > 0
- and random.random() < global_config.personality.state_probability
- ):
- prompt_personality = random.choice(global_config.personality.states)
return f"Your name is {bot_name}{bot_nickname}; persona: {prompt_personality};"
except Exception:
diff --git a/src/maisaka/reply_effect/__init__.py b/src/maisaka/reply_effect/__init__.py
new file mode 100644
index 00000000..efccf32e
--- /dev/null
+++ b/src/maisaka/reply_effect/__init__.py
@@ -0,0 +1,5 @@
+"""Maisaka 回复效果观察器。"""
+
+from .tracker import ReplyEffectTracker
+
+__all__ = ["ReplyEffectTracker"]
diff --git a/src/maisaka/reply_effect/image_utils.py b/src/maisaka/reply_effect/image_utils.py
new file mode 100644
index 00000000..98b3e46a
--- /dev/null
+++ b/src/maisaka/reply_effect/image_utils.py
@@ -0,0 +1,100 @@
+"""回复效果记录中的图片/表情附件提取工具。"""
+
+from base64 import b64encode
+from pathlib import Path
+from typing import Any
+
+from src.common.data_models.message_component_data_model import EmojiComponent, ImageComponent, MessageSequence
+
+
+_MAX_INLINE_IMAGE_BYTES = 2 * 1024 * 1024
+
+
+def extract_visual_attachments_from_sequence(message_sequence: MessageSequence | None) -> list[dict[str, Any]]:
+ """从消息片段中提取可供评分页面展示的图片/表情信息。"""
+
+ if message_sequence is None:
+ return []
+
+ attachments: list[dict[str, Any]] = []
+ for index, component in enumerate(message_sequence.components):
+ if isinstance(component, ImageComponent):
+ attachments.append(_build_visual_attachment(component, index=index, kind="image"))
+ elif isinstance(component, EmojiComponent):
+ attachments.append(_build_visual_attachment(component, index=index, kind="emoji"))
+ return attachments
+
+
+def _build_visual_attachment(component: ImageComponent | EmojiComponent, *, index: int, kind: str) -> dict[str, Any]:
+ binary_hash = str(component.binary_hash or "").strip()
+ attachment: dict[str, Any] = {
+ "kind": kind,
+ "index": index,
+ "hash": binary_hash,
+ "content": str(component.content or "").strip(),
+ "path": "",
+ "data_url": "",
+ }
+
+ file_path = _resolve_image_path(binary_hash, kind=kind)
+ if file_path:
+ attachment["path"] = str(file_path)
+ attachment["file_name"] = file_path.name
+ attachment["mime_type"] = _guess_mime_type(file_path.suffix)
+ return attachment
+
+    binary_data = bytes(component.binary_data or b"")
+    if binary_data and len(binary_data) <= _MAX_INLINE_IMAGE_BYTES:
+        mime_type = _guess_mime_type_from_bytes(binary_data)
+        attachment["mime_type"] = mime_type
+        attachment["data_url"] = f"data:{mime_type};base64,{b64encode(binary_data).decode('ascii')}"
+    return attachment
+
+
+def _resolve_image_path(binary_hash: str, *, kind: str) -> Path | None:
+ if not binary_hash:
+ return None
+
+ try:
+ from sqlmodel import select
+
+ from src.common.database.database import get_db_session
+ from src.common.database.database_model import Images, ImageType
+
+ image_type = ImageType.EMOJI if kind == "emoji" else ImageType.IMAGE
+ with get_db_session() as db:
+ statement = select(Images).filter_by(image_hash=binary_hash, image_type=image_type).limit(1)
+ image_record = db.exec(statement).first()
+ if image_record is None or getattr(image_record, "no_file_flag", False):
+ return None
+ file_path = Path(str(image_record.full_path or "")).expanduser().resolve()
+ if file_path.is_file():
+ return file_path
+ except Exception:
+ return None
+ return None
+
+
+def _guess_mime_type(suffix: str) -> str:
+ normalized_suffix = suffix.lower().lstrip(".")
+ if normalized_suffix in {"jpg", "jpeg"}:
+ return "image/jpeg"
+ if normalized_suffix == "gif":
+ return "image/gif"
+ if normalized_suffix == "webp":
+ return "image/webp"
+ if normalized_suffix == "bmp":
+ return "image/bmp"
+ return "image/png"
+
+
+def _guess_mime_type_from_bytes(binary_data: bytes) -> str:
+ if binary_data.startswith(b"\xff\xd8\xff"):
+ return "image/jpeg"
+ if binary_data.startswith(b"GIF8"):
+ return "image/gif"
+ if binary_data.startswith(b"RIFF") and b"WEBP" in binary_data[:16]:
+ return "image/webp"
+ if binary_data.startswith(b"BM"):
+ return "image/bmp"
+ return "image/png"
diff --git a/src/maisaka/reply_effect/judge.py b/src/maisaka/reply_effect/judge.py
new file mode 100644
index 00000000..58b20c74
--- /dev/null
+++ b/src/maisaka/reply_effect/judge.py
@@ -0,0 +1,116 @@
+"""回复效果 LLM 窄维度评审。"""
+
+from __future__ import annotations
+
+from collections.abc import Awaitable, Callable
+from typing import Any, Dict, List, Tuple
+
+import json
+
+from .models import FollowupMessageSnapshot, ReplyEffectRecord, RubricScoreItem, RubricScores
+from .scoring import normalize_text_for_prompt
+
+JudgeRunner = Callable[[str], Awaitable[str]]
+
+
+async def judge_reply_effect(record: ReplyEffectRecord, judge_runner: JudgeRunner | None) -> Tuple[RubricScores, str]:
+ """执行 LLM rubric judge,失败时返回中性分。"""
+
+ if judge_runner is None:
+ return RubricScores(), "未提供 LLM judge runner"
+
+ prompt = build_judge_prompt(record)
+ try:
+ response_text = await judge_runner(prompt)
+ payload = _loads_json_object(response_text)
+ return parse_rubric_scores(payload), ""
+ except Exception as exc:
+ return RubricScores(), str(exc)
+
+
+def build_judge_prompt(record: ReplyEffectRecord) -> str:
+ """构建窄维度评分 prompt。"""
+
+ followup_text = _format_followups(record.followup_messages)
+ return (
+ "你是 Maisaka 回复效果的窄维度评审器,只评估这一次 bot 回复的交互感知质量。\n"
+ "不要评价总体满意度,不要给建议,只输出 JSON。\n\n"
+ "评分范围:1 到 5,1=很差,3=中性,5=很好。\n"
+ "uncanny_risk 的 1=完全不怪,5=非常过度拟人/越界/油腻。\n\n"
+ f"bot 回复:\n{normalize_text_for_prompt(record.reply.reply_text, 1200)}\n\n"
+ f"后续用户消息:\n{followup_text or '(暂无后续用户消息)'}\n\n"
+ "请输出严格 JSON 对象,格式如下:\n"
+ "{\n"
+ ' "social_presence": {"score": 3, "reason": "...", "evidence_spans": ["..."], "confidence": 0.7},\n'
+ ' "warmth": {"score": 3, "reason": "...", "evidence_spans": ["..."], "confidence": 0.7},\n'
+ ' "competence": {"score": 3, "reason": "...", "evidence_spans": ["..."], "confidence": 0.7},\n'
+ ' "appropriateness": {"score": 3, "reason": "...", "evidence_spans": ["..."], "confidence": 0.7},\n'
+ ' "uncanny_risk": {"score": 3, "reason": "...", "evidence_spans": ["..."], "confidence": 0.7}\n'
+ "}"
+ )
+
+
+def parse_rubric_scores(payload: Dict[str, Any]) -> RubricScores:
+ """解析 LLM rubric JSON。"""
+
+ return RubricScores(
+ social_presence=_parse_item(payload.get("social_presence")),
+ warmth=_parse_item(payload.get("warmth")),
+ competence=_parse_item(payload.get("competence")),
+ appropriateness=_parse_item(payload.get("appropriateness")),
+ uncanny_risk=_parse_item(payload.get("uncanny_risk")),
+ available=True,
+ )
+
+
+def _parse_item(raw_item: Any) -> RubricScoreItem:
+ if not isinstance(raw_item, dict):
+ raw_item = {}
+ score = _coerce_float(raw_item.get("score"), 3.0)
+ score = max(1.0, min(5.0, score))
+ evidence_spans = raw_item.get("evidence_spans")
+ if not isinstance(evidence_spans, list):
+ evidence_spans = []
+ return RubricScoreItem(
+ score=score,
+ normalized_score=round((score - 1.0) / 4.0, 4),
+ reason=str(raw_item.get("reason") or "").strip(),
+ evidence_spans=[str(item).strip() for item in evidence_spans if str(item).strip()],
+ confidence=max(0.0, min(1.0, _coerce_float(raw_item.get("confidence"), 0.0))),
+ )
+
+
+def _loads_json_object(response_text: str) -> Dict[str, Any]:
+ normalized_response = str(response_text or "").strip()
+ if normalized_response.startswith("```"):
+ normalized_response = normalized_response.strip("`")
+ if normalized_response.lower().startswith("json"):
+ normalized_response = normalized_response[4:].strip()
+ try:
+ parsed = json.loads(normalized_response)
+ except json.JSONDecodeError:
+ start = normalized_response.find("{")
+ end = normalized_response.rfind("}")
+ if start < 0 or end <= start:
+ raise
+ parsed = json.loads(normalized_response[start : end + 1])
+ if not isinstance(parsed, dict):
+ raise ValueError("LLM judge 未返回 JSON 对象")
+ return parsed
+
+
+def _format_followups(followups: List[FollowupMessageSnapshot]) -> str:
+ lines: List[str] = []
+ for index, followup in enumerate(followups[:5], start=1):
+ marker = "目标用户" if followup.is_target_user else "其他用户"
+ lines.append(
+ f"{index}. [{marker}] {normalize_text_for_prompt(followup.visible_text or followup.plain_text, 500)}"
+ )
+ return "\n".join(lines)
+
+
+def _coerce_float(value: Any, default: float) -> float:
+ try:
+ return float(value)
+ except (TypeError, ValueError):
+ return default
diff --git a/src/maisaka/reply_effect/models.py b/src/maisaka/reply_effect/models.py
new file mode 100644
index 00000000..26d06fb0
--- /dev/null
+++ b/src/maisaka/reply_effect/models.py
@@ -0,0 +1,164 @@
+"""回复效果观察器的数据模型。"""
+
+from dataclasses import asdict, dataclass, field
+from datetime import datetime
+from enum import Enum
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+
+SCHEMA_VERSION = 1
+
+
+class ReplyEffectStatus(str, Enum):
+ """回复效果记录状态。"""
+
+ PENDING = "pending"
+ FINALIZED = "finalized"
+
+
+@dataclass(slots=True)
+class SessionSnapshot:
+ """会话快照。"""
+
+ session_id: str
+ platform_type_id: str
+ platform: str
+ chat_type: str
+ group_id: str
+ user_id: str
+ session_name: str
+
+
+@dataclass(slots=True)
+class UserSnapshot:
+ """用户快照。"""
+
+ user_id: str
+ nickname: str
+ cardname: str
+
+
+@dataclass(slots=True)
+class ReplySnapshot:
+ """被观察的回复内容。"""
+
+ tool_call_id: str
+ target_message_id: str
+ set_quote: bool
+ reply_text: str
+ reply_segments: List[str]
+ planner_reasoning: str
+ reference_info: str
+ reply_metadata: Dict[str, Any] = field(default_factory=dict)
+
+
+@dataclass(slots=True)
+class FollowupMessageSnapshot:
+ """后续用户消息快照。"""
+
+ message_id: str
+ timestamp: str
+ user_id: str
+ nickname: str
+ cardname: str
+ visible_text: str
+ plain_text: str
+ latency_seconds: float
+ is_target_user: bool
+ attachments: List[Dict[str, Any]] = field(default_factory=list)
+
+
+@dataclass(slots=True)
+class BehaviorSignals:
+ """行为满意度信号。"""
+
+ continue_2turns: float = 0.0
+ next_user_sentiment: float = 0.5
+ user_expansion: float = 0.0
+ no_correction: float = 1.0
+ no_abort: float = 1.0
+ evidence_source: str = "no_followup"
+
+
+@dataclass(slots=True)
+class RubricScoreItem:
+ """单个 LLM rubric 项。"""
+
+ score: float = 3.0
+ normalized_score: float = 0.5
+ reason: str = ""
+ evidence_spans: List[str] = field(default_factory=list)
+ confidence: float = 0.0
+
+
+@dataclass(slots=True)
+class RubricScores:
+ """LLM 感知质量评分。"""
+
+ social_presence: RubricScoreItem = field(default_factory=RubricScoreItem)
+ warmth: RubricScoreItem = field(default_factory=RubricScoreItem)
+ competence: RubricScoreItem = field(default_factory=RubricScoreItem)
+ appropriateness: RubricScoreItem = field(default_factory=RubricScoreItem)
+ uncanny_risk: RubricScoreItem = field(default_factory=RubricScoreItem)
+ available: bool = False
+
+
+@dataclass(slots=True)
+class FrictionSignals:
+ """摩擦和反感信号。"""
+
+ explicit_negative: float = 0.0
+ repair_loop: float = 0.0
+ uncanny_risk: float = 0.5
+ evidence_messages: List[str] = field(default_factory=list)
+
+
+@dataclass(slots=True)
+class ReplyEffectScores:
+ """最终效果评分。"""
+
+ asi: float
+ behavior_score: float
+ relational_score: float
+ friction_score: float
+ behavior_signals: BehaviorSignals
+ rubric_scores: RubricScores
+ friction_signals: FrictionSignals
+ judge_error: str = ""
+
+
+@dataclass(slots=True)
+class ReplyEffectRecord:
+ """一条回复效果观察记录。"""
+
+ effect_id: str
+ status: ReplyEffectStatus
+ created_at: str
+ updated_at: str
+ session: SessionSnapshot
+ reply: ReplySnapshot
+ target_user: UserSnapshot
+ context_snapshot: List[Dict[str, Any]] = field(default_factory=list)
+ followup_messages: List[FollowupMessageSnapshot] = field(default_factory=list)
+ scores: Optional[ReplyEffectScores] = None
+ finalized_at: str = ""
+ finalize_reason: str = ""
+ confidence_note: str = ""
+ followup_summary: Dict[str, Any] = field(default_factory=dict)
+ file_path: Optional[Path] = field(default=None, repr=False)
+
+ def to_json_dict(self) -> Dict[str, Any]:
+ """转换为可直接写入 JSON 的字典。"""
+
+ payload = asdict(self)
+ payload["schema_version"] = SCHEMA_VERSION
+ payload["status"] = self.status.value
+ payload.pop("file_path", None)
+ return payload
+
+
+def now_iso() -> str:
+ """返回本地时区 ISO 时间字符串。"""
+
+ return datetime.now().astimezone().isoformat(timespec="seconds")
diff --git a/src/maisaka/reply_effect/path_utils.py b/src/maisaka/reply_effect/path_utils.py
new file mode 100644
index 00000000..4114bcbf
--- /dev/null
+++ b/src/maisaka/reply_effect/path_utils.py
@@ -0,0 +1,24 @@
+"""回复效果日志路径工具。"""
+
+from pathlib import Path
+
+from src.maisaka.display.preview_path_utils import build_preview_chat_dir_name, normalize_preview_name
+
+BASE_DIR = Path("logs") / "maisaka_reply_effect"
+
+
+def build_reply_effect_chat_dir_name(session_id: str) -> str:
+ """构建回复效果日志的会话目录名。"""
+
+ chat_dir_name = build_preview_chat_dir_name(session_id)
+ normalized_chat_dir_name = normalize_preview_name(chat_dir_name)
+ if normalized_chat_dir_name != "unknown":
+ return normalized_chat_dir_name
+ return "unknown_chat"
+
+
+def build_reply_effect_chat_dir(session_id: str, base_dir: Path | None = None) -> Path:
+ """返回某个会话对应的回复效果日志目录。"""
+
+ root_dir = base_dir or BASE_DIR
+ return root_dir / build_reply_effect_chat_dir_name(session_id)
diff --git a/src/maisaka/reply_effect/scoring.py b/src/maisaka/reply_effect/scoring.py
new file mode 100644
index 00000000..41a38b5b
--- /dev/null
+++ b/src/maisaka/reply_effect/scoring.py
@@ -0,0 +1,262 @@
+"""回复效果评分规则。"""
+
+from __future__ import annotations
+
+from typing import Iterable, List
+
+import re
+
+from .models import BehaviorSignals, FollowupMessageSnapshot, FrictionSignals, ReplyEffectScores, RubricScores
+
+NEGATIVE_PATTERNS = (
+ "你没懂",
+ "没懂",
+ "不是这个意思",
+ "不是",
+ "别这样",
+ "好烦",
+ "烦死",
+ "算了",
+ "离谱",
+ "无语",
+ "你在说什么",
+ "听不懂",
+ "看不懂",
+ "错了",
+ "不对",
+)
+REPAIR_PATTERNS = (
+ "我是说",
+ "我说的是",
+ "重新说",
+ "再说一遍",
+ "不是问",
+ "你理解错",
+ "你搞错",
+ "我问的是",
+ "纠正",
+)
+POSITIVE_PATTERNS = (
+ "谢谢",
+ "感谢",
+ "懂了",
+ "明白了",
+ "可以",
+ "有用",
+ "不错",
+ "好耶",
+ "太好了",
+)
+
+
+def clamp(value: float, lower: float = 0.0, upper: float = 1.0) -> float:
+ """限制数值范围。"""
+
+ return max(lower, min(upper, value))
+
+
+def score_reply_effect(
+ followups: List[FollowupMessageSnapshot],
+ rubric_scores: RubricScores,
+ *,
+ target_user_id: str = "",
+ judge_error: str = "",
+) -> ReplyEffectScores:
+ """计算一条回复的 ASI 分数。"""
+
+ behavior_signals = build_behavior_signals(followups, target_user_id=target_user_id)
+ friction_signals = build_friction_signals(followups, rubric_scores, target_user_id=target_user_id)
+ behavior_score = calculate_behavior_score(behavior_signals)
+ relational_score = calculate_relational_score(rubric_scores)
+ friction_score = calculate_friction_score(friction_signals)
+ asi = calculate_asi_score(behavior_score, relational_score, friction_score)
+ return ReplyEffectScores(
+ asi=asi,
+ behavior_score=round(behavior_score, 4),
+ relational_score=round(relational_score, 4),
+ friction_score=round(friction_score, 4),
+ behavior_signals=behavior_signals,
+ rubric_scores=rubric_scores,
+ friction_signals=friction_signals,
+ judge_error=judge_error,
+ )
+
+
+def build_behavior_signals(
+ followups: List[FollowupMessageSnapshot],
+ *,
+ target_user_id: str = "",
+) -> BehaviorSignals:
+ """从后续消息构造行为满意度信号。"""
+
+ target_followups = [
+ followup
+ for followup in followups
+ if target_user_id and followup.user_id == target_user_id
+ ]
+ evidence_followups = target_followups or followups
+ evidence_source = (
+ "target_user_feedback"
+ if target_followups
+ else "indirect_session_feedback"
+ if followups
+ else "no_followup"
+ )
+ if not evidence_followups:
+ return BehaviorSignals(
+ continue_2turns=0.0,
+ next_user_sentiment=0.5,
+ user_expansion=0.0,
+ no_correction=1.0,
+ no_abort=0.6,
+ evidence_source=evidence_source,
+ )
+
+ combined_text = "\n".join(followup.plain_text for followup in evidence_followups)
+ negative_count = count_matches(combined_text, NEGATIVE_PATTERNS)
+ repair_count = count_matches(combined_text, REPAIR_PATTERNS)
+ positive_count = count_matches(combined_text, POSITIVE_PATTERNS)
+ average_length = sum(len(followup.plain_text.strip()) for followup in evidence_followups) / len(evidence_followups)
+
+ return BehaviorSignals(
+ continue_2turns=1.0 if len(evidence_followups) >= 2 else 0.5,
+ next_user_sentiment=estimate_sentiment(positive_count, negative_count, repair_count),
+ user_expansion=clamp((average_length - 8.0) / 42.0),
+ no_correction=0.0 if repair_count > 0 else 1.0,
+ no_abort=0.0 if negative_count >= 2 or "算了" in combined_text else 1.0,
+ evidence_source=evidence_source,
+ )
+
+
+def build_friction_signals(
+ followups: List[FollowupMessageSnapshot],
+ rubric_scores: RubricScores,
+ *,
+ target_user_id: str = "",
+) -> FrictionSignals:
+ """从后续消息和 LLM judge 结果构造摩擦信号。"""
+
+ evidence_messages: List[str] = []
+ explicit_negative = 0.0
+ repair_loop = 0.0
+ for followup in followups:
+ text = followup.plain_text
+ source_weight = 1.0 if target_user_id and followup.user_id == target_user_id else 0.65
+ if any(pattern in text for pattern in NEGATIVE_PATTERNS):
+ explicit_negative = max(explicit_negative, source_weight)
+ evidence_messages.append(followup.message_id)
+ if any(pattern in text for pattern in REPAIR_PATTERNS):
+ repair_loop = max(repair_loop, source_weight)
+ evidence_messages.append(followup.message_id)
+
+ uncanny_risk = rubric_scores.uncanny_risk.normalized_score if rubric_scores.available else 0.5
+ return FrictionSignals(
+ explicit_negative=round(clamp(explicit_negative), 4),
+ repair_loop=round(clamp(repair_loop), 4),
+ uncanny_risk=round(clamp(uncanny_risk), 4),
+ evidence_messages=sorted(set(evidence_messages)),
+ )
+
+
+def calculate_behavior_score(signals: BehaviorSignals) -> float:
+ """计算行为满意度分数。"""
+
+ return clamp(
+ 0.30 * signals.continue_2turns
+ + 0.25 * signals.next_user_sentiment
+ + 0.20 * signals.user_expansion
+ + 0.15 * signals.no_correction
+ + 0.10 * signals.no_abort
+ )
+
+
+def calculate_relational_score(rubric_scores: RubricScores) -> float:
+ """计算感知质量分数。"""
+
+ if not rubric_scores.available:
+ return 0.5
+ return clamp(
+ 0.35 * rubric_scores.social_presence.normalized_score
+ + 0.25 * rubric_scores.warmth.normalized_score
+ + 0.25 * rubric_scores.competence.normalized_score
+ + 0.15 * rubric_scores.appropriateness.normalized_score
+ )
+
+
+def calculate_friction_score(signals: FrictionSignals) -> float:
+ """计算摩擦惩罚分数。"""
+
+ return clamp(
+ 0.40 * signals.explicit_negative
+ + 0.30 * signals.repair_loop
+ + 0.30 * signals.uncanny_risk
+ )
+
+
+def calculate_asi_score(behavior_score: float, relational_score: float, friction_score: float) -> float:
+ """计算 0-100 的 ASI 总分,摩擦分越高扣分越多。"""
+
+ return round(
+ clamp(
+ 0.45 * behavior_score
+ + 0.35 * relational_score
+ + 0.20 * (1.0 - friction_score)
+ )
+ * 100,
+ 2,
+ )
+
+
+def has_explicit_negative_feedback(
+ followups: Iterable[FollowupMessageSnapshot],
+ *,
+ target_user_id: str = "",
+ allow_indirect: bool = False,
+) -> bool:
+ """判断是否出现可提前结算的明确负反馈。"""
+
+ for followup in followups:
+ if target_user_id and followup.user_id != target_user_id and not allow_indirect:
+ continue
+ if any(pattern in followup.plain_text for pattern in NEGATIVE_PATTERNS):
+ return True
+ return False
+
+
+def has_repair_loop(
+ followups: Iterable[FollowupMessageSnapshot],
+ *,
+ target_user_id: str = "",
+ allow_indirect: bool = False,
+) -> bool:
+ """判断是否出现修复循环。"""
+
+ repair_count = 0
+ for followup in followups:
+ if target_user_id and followup.user_id != target_user_id and not allow_indirect:
+ continue
+ if any(pattern in followup.plain_text for pattern in REPAIR_PATTERNS):
+ repair_count += 1
+ return repair_count >= 1
+
+
+def count_matches(text: str, patterns: Iterable[str]) -> int:
+ """统计模式命中次数。"""
+
+ return sum(1 for pattern in patterns if pattern and pattern in text)
+
+
+def estimate_sentiment(positive_count: int, negative_count: int, repair_count: int) -> float:
+ """用轻量规则估计后续消息情绪。"""
+
+ raw_score = 0.5 + 0.2 * positive_count - 0.25 * negative_count - 0.15 * repair_count
+ return round(clamp(raw_score), 4)
+
+
+def normalize_text_for_prompt(text: str, limit: int = 800) -> str:
+ """清理用于评分 prompt 的文本。"""
+
+ normalized_text = re.sub(r"\s+", " ", str(text or "")).strip()
+ if len(normalized_text) <= limit:
+ return normalized_text
+ return normalized_text[: limit - 1] + "…"
diff --git a/src/maisaka/reply_effect/storage.py b/src/maisaka/reply_effect/storage.py
new file mode 100644
index 00000000..7c296add
--- /dev/null
+++ b/src/maisaka/reply_effect/storage.py
@@ -0,0 +1,75 @@
+"""回复效果独立 JSON 存储。"""
+
+from pathlib import Path
+from typing import Dict
+
+import json
+import time
+
+from .models import ReplyEffectRecord
+from .path_utils import BASE_DIR, build_reply_effect_chat_dir, normalize_preview_name
+
+
+class ReplyEffectStorage:
+ """负责回复效果记录的独立 JSON 文件存储。"""
+
+ _MAX_RECORDS_PER_CHAT = 1024
+ _TRIM_COUNT = 100
+
+ def __init__(self, base_dir: Path | None = None) -> None:
+ self._base_dir = base_dir or BASE_DIR
+
+ def create_record_file(self, record: ReplyEffectRecord) -> Path:
+ """为新记录创建文件路径并写入初始 JSON。"""
+
+ chat_dir_name = normalize_preview_name(record.session.platform_type_id)
+ if chat_dir_name == "unknown":
+ chat_dir = build_reply_effect_chat_dir(record.session.session_id, self._base_dir).resolve()
+ else:
+ chat_dir = (self._base_dir / chat_dir_name).resolve()
+ chat_dir.mkdir(parents=True, exist_ok=True)
+ timestamp_ms = int(time.time() * 1000)
+ safe_effect_id = record.effect_id.replace("-", "")
+ file_path = chat_dir / f"{timestamp_ms}_{safe_effect_id}.json"
+ record.file_path = file_path
+ self.save_record(record)
+ self._trim_overflow(chat_dir)
+ return file_path
+
+ def save_record(self, record: ReplyEffectRecord) -> None:
+ """原子写入记录 JSON。"""
+
+ if record.file_path is None:
+ self.create_record_file(record)
+ return
+
+ file_path = record.file_path
+ file_path.parent.mkdir(parents=True, exist_ok=True)
+ temp_path = file_path.with_name(f".{file_path.name}.tmp")
+ temp_path.write_text(
+ json.dumps(record.to_json_dict(), ensure_ascii=False, indent=2, default=str),
+ encoding="utf-8",
+ )
+ temp_path.replace(file_path)
+
+ @staticmethod
+ def read_json(file_path: Path) -> Dict[str, object]:
+ """读取已保存的 JSON 文件。"""
+
+ return json.loads(file_path.read_text(encoding="utf-8"))
+
+ def _trim_overflow(self, chat_dir: Path) -> None:
+ """超过容量时删除最旧的回复效果记录。"""
+
+ files = [file_path for file_path in chat_dir.glob("*.json") if file_path.is_file()]
+ if len(files) <= self._MAX_RECORDS_PER_CHAT:
+ return
+
+ sorted_files = sorted(files, key=lambda file_path: file_path.stat().st_mtime)
+ overflow_count = len(files) - self._MAX_RECORDS_PER_CHAT
+ trim_count = min(len(sorted_files), max(self._TRIM_COUNT, overflow_count))
+ for old_file in sorted_files[:trim_count]:
+ try:
+ old_file.unlink()
+ except FileNotFoundError:
+ continue
diff --git a/src/maisaka/reply_effect/tracker.py b/src/maisaka/reply_effect/tracker.py
new file mode 100644
index 00000000..39895403
--- /dev/null
+++ b/src/maisaka/reply_effect/tracker.py
@@ -0,0 +1,267 @@
+"""会话级回复效果观察器。"""
+
+from __future__ import annotations
+
+from datetime import datetime
+from typing import Any, Dict, List
+
+import asyncio
+import time
+import uuid
+
+from src.chat.message_receive.message import SessionMessage
+from src.maisaka.history_utils import build_session_message_visible_text
+
+from .image_utils import extract_visual_attachments_from_sequence
+from .judge import JudgeRunner, judge_reply_effect
+from .models import (
+ FollowupMessageSnapshot,
+ ReplyEffectRecord,
+ ReplyEffectStatus,
+ ReplySnapshot,
+ SessionSnapshot,
+ UserSnapshot,
+ now_iso,
+)
+from .path_utils import build_reply_effect_chat_dir_name
+from .scoring import (
+ has_explicit_negative_feedback,
+ has_repair_loop,
+ score_reply_effect,
+)
+from .storage import ReplyEffectStorage
+
+TARGET_USER_FOLLOWUP_LIMIT = 2
+SESSION_FOLLOWUP_LIMIT = 5
+OBSERVATION_WINDOW_SECONDS = 600.0
+
+
+class ReplyEffectTracker:
+ """追踪单个 Maisaka 会话内 reply 工具回复后的用户反馈。"""
+
+ def __init__(
+ self,
+ *,
+ session_id: str,
+ session_name: str,
+ chat_stream: Any,
+ judge_runner: JudgeRunner | None = None,
+ storage: ReplyEffectStorage | None = None,
+ ) -> None:
+ self._session_id = session_id
+ self._session_name = session_name
+ self._chat_stream = chat_stream
+ self._judge_runner = judge_runner
+ self._storage = storage or ReplyEffectStorage()
+ self._pending_records: Dict[str, ReplyEffectRecord] = {}
+ self._timeout_tasks: Dict[str, asyncio.Task[None]] = {}
+
+ async def record_reply(
+ self,
+ *,
+ tool_call_id: str,
+ target_message: SessionMessage,
+ set_quote: bool,
+ reply_text: str,
+ reply_segments: List[str],
+ planner_reasoning: str,
+ reference_info: str,
+ reply_metadata: Dict[str, Any] | None = None,
+ context_snapshot: List[Dict[str, Any]] | None = None,
+ ) -> ReplyEffectRecord:
+ """登记一条已经成功发出的 reply 回复。"""
+
+ effect_id = str(uuid.uuid4())
+ target_user_info = target_message.message_info.user_info
+ record = ReplyEffectRecord(
+ effect_id=effect_id,
+ status=ReplyEffectStatus.PENDING,
+ created_at=now_iso(),
+ updated_at=now_iso(),
+ session=self._build_session_snapshot(),
+ reply=ReplySnapshot(
+ tool_call_id=tool_call_id,
+ target_message_id=target_message.message_id,
+ set_quote=set_quote,
+ reply_text=reply_text,
+ reply_segments=list(reply_segments),
+ planner_reasoning=planner_reasoning,
+ reference_info=reference_info,
+ reply_metadata=dict(reply_metadata or {}),
+ ),
+ target_user=UserSnapshot(
+ user_id=str(target_user_info.user_id or "").strip(),
+ nickname=str(target_user_info.user_nickname or "").strip(),
+ cardname=str(target_user_info.user_cardname or "").strip(),
+ ),
+ context_snapshot=list(context_snapshot or []),
+ )
+ self._storage.create_record_file(record)
+ self._pending_records[effect_id] = record
+ self._timeout_tasks[effect_id] = asyncio.create_task(self._finalize_after_timeout(effect_id))
+ return record
+
+ async def observe_user_message(self, message: SessionMessage) -> None:
+ """观察一条后续用户消息,并在满足规则时完成相关 pending 记录。"""
+
+ if not self._pending_records or message.session_id != self._session_id:
+ return
+
+ for effect_id, record in list(self._pending_records.items()):
+ if record.status != ReplyEffectStatus.PENDING:
+ continue
+ followup = self._build_followup_snapshot(message, record)
+ record.followup_messages.append(followup)
+ record.updated_at = now_iso()
+ self._storage.save_record(record)
+
+ reason = self._resolve_finalize_reason(record)
+ if reason:
+ await self.finalize(effect_id, reason)
+
+ async def finalize_all(self, reason: str = "runtime_stop") -> None:
+ """强制完成当前会话所有 pending 记录。"""
+
+ for effect_id in list(self._pending_records.keys()):
+ await self.finalize(effect_id, reason)
+
+ async def finalize(self, effect_id: str, reason: str) -> None:
+ """完成一条 pending 记录并写回 JSON。"""
+
+ record = self._pending_records.pop(effect_id, None)
+ if record is None or record.status == ReplyEffectStatus.FINALIZED:
+ return
+
+ timeout_task = self._timeout_tasks.pop(effect_id, None)
+ current_task = asyncio.current_task()
+ if timeout_task is not None and timeout_task is not current_task:
+ timeout_task.cancel()
+
+ rubric_scores, judge_error = await judge_reply_effect(record, self._judge_runner)
+ record.scores = score_reply_effect(
+ record.followup_messages,
+ rubric_scores,
+ target_user_id=record.target_user.user_id,
+ judge_error=judge_error,
+ )
+ record.status = ReplyEffectStatus.FINALIZED
+ record.finalized_at = now_iso()
+ record.updated_at = record.finalized_at
+ record.finalize_reason = reason
+ record.confidence_note = self._build_confidence_note(record)
+ record.followup_summary = self._build_followup_summary(record)
+ self._storage.save_record(record)
+
+ def _build_session_snapshot(self) -> SessionSnapshot:
+ platform = str(getattr(self._chat_stream, "platform", "") or "").strip()
+ group_id = str(getattr(self._chat_stream, "group_id", "") or "").strip()
+ user_id = str(getattr(self._chat_stream, "user_id", "") or "").strip()
+ is_group_session = bool(getattr(self._chat_stream, "is_group_session", False))
+ return SessionSnapshot(
+ session_id=self._session_id,
+ platform_type_id=build_reply_effect_chat_dir_name(self._session_id),
+ platform=platform,
+ chat_type="group" if is_group_session else "private",
+ group_id=group_id,
+ user_id=user_id,
+ session_name=self._session_name,
+ )
+
+ def _build_followup_snapshot(
+ self,
+ message: SessionMessage,
+ record: ReplyEffectRecord,
+ ) -> FollowupMessageSnapshot:
+ user_info = message.message_info.user_info
+ plain_text = str(message.processed_plain_text or "").strip()
+ try:
+ visible_text = build_session_message_visible_text(message)
+ except Exception:
+ visible_text = plain_text
+ latency_seconds = max(0.0, time.time() - _parse_iso_timestamp(record.created_at))
+ user_id = str(user_info.user_id or "").strip()
+ return FollowupMessageSnapshot(
+ message_id=str(message.message_id or "").strip(),
+ timestamp=_message_timestamp_to_iso(message),
+ user_id=user_id,
+ nickname=str(user_info.user_nickname or "").strip(),
+ cardname=str(user_info.user_cardname or "").strip(),
+ visible_text=visible_text,
+ plain_text=plain_text,
+ latency_seconds=round(latency_seconds, 3),
+ is_target_user=bool(record.target_user.user_id and user_id == record.target_user.user_id),
+ attachments=extract_visual_attachments_from_sequence(message.raw_message),
+ )
+
+ def _resolve_finalize_reason(self, record: ReplyEffectRecord) -> str:
+ target_user_id = record.target_user.user_id
+ target_followups = [
+ followup
+ for followup in record.followup_messages
+ if target_user_id and followup.user_id == target_user_id
+ ]
+ has_target_feedback = bool(target_followups)
+ if has_explicit_negative_feedback(target_followups, target_user_id=target_user_id, allow_indirect=False):
+ return "explicit_negative"
+ if has_repair_loop(target_followups, target_user_id=target_user_id, allow_indirect=False):
+ return "repair_loop"
+ if len(target_followups) >= TARGET_USER_FOLLOWUP_LIMIT:
+ return "target_user_followups"
+
+ if not target_user_id or not has_target_feedback:
+ allow_indirect = not target_user_id
+ if has_explicit_negative_feedback(
+ record.followup_messages,
+ target_user_id=target_user_id,
+ allow_indirect=allow_indirect,
+ ):
+ return "explicit_negative"
+ if has_repair_loop(
+ record.followup_messages,
+ target_user_id=target_user_id,
+ allow_indirect=allow_indirect,
+ ):
+ return "repair_loop"
+ if len(record.followup_messages) >= SESSION_FOLLOWUP_LIMIT:
+ return "session_followups_limit"
+
+ return ""
+
+ async def _finalize_after_timeout(self, effect_id: str) -> None:
+ try:
+ await asyncio.sleep(OBSERVATION_WINDOW_SECONDS)
+ await self.finalize(effect_id, "window_timeout")
+ except asyncio.CancelledError:
+ return
+
+ @staticmethod
+ def _build_confidence_note(record: ReplyEffectRecord) -> str:
+ if not record.followup_messages:
+ return "没有观察到后续用户消息,行为分使用保守中性信号。"
+ if any(followup.is_target_user for followup in record.followup_messages):
+ return "行为反馈包含回复对象本人的后续发言。"
+ return "行为反馈来自同会话其他用户,不是回复对象本人,置信度较低。"
+
+ @staticmethod
+ def _build_followup_summary(record: ReplyEffectRecord) -> Dict[str, Any]:
+ target_count = sum(1 for followup in record.followup_messages if followup.is_target_user)
+ return {
+ "total_count": len(record.followup_messages),
+ "target_user_count": target_count,
+ "other_user_count": len(record.followup_messages) - target_count,
+ "target_user_id": record.target_user.user_id,
+ }
+
+
+def _message_timestamp_to_iso(message: SessionMessage) -> str:
+ timestamp = getattr(message, "timestamp", None)
+ if isinstance(timestamp, datetime):
+ return timestamp.astimezone().isoformat(timespec="seconds")
+ return now_iso()
+
+
+def _parse_iso_timestamp(value: str) -> float:
+ try:
+ return datetime.fromisoformat(value).timestamp()
+ except ValueError:
+ return time.time()
diff --git a/src/maisaka/runtime.py b/src/maisaka/runtime.py
index 37c79180..c91b972c 100644
--- a/src/maisaka/runtime.py
+++ b/src/maisaka/runtime.py
@@ -1,6 +1,7 @@
"""Maisaka 非 CLI 运行时。"""
from collections import deque
+from datetime import datetime
from math import ceil
from typing import Any, Literal, Optional, Sequence
@@ -33,11 +34,13 @@ from src.plugin_runtime.tool_provider import PluginToolProvider
from src.plugin_runtime.hook_payloads import deserialize_prompt_messages
from .chat_loop_service import ChatResponse, MaisakaChatLoopService
-from .context_messages import LLMContextMessage
+from .context_messages import LLMContextMessage, ReferenceMessage, ReferenceMessageType
from .display.display_utils import build_tool_call_summary_lines, format_token_count
from .display.prompt_cli_renderer import PromptCLIVisualizer
from .display.stage_status_board import remove_stage_status, update_stage_status
from .reasoning_engine import MaisakaReasoningEngine
+from .reply_effect import ReplyEffectTracker
+from .reply_effect.image_utils import extract_visual_attachments_from_sequence
from .tool_provider import MaisakaBuiltinToolProvider
logger = get_logger("maisaka_runtime")
@@ -120,8 +123,20 @@ class MaisakaHeartFlowChatting:
self._reasoning_engine = MaisakaReasoningEngine(self)
self._tool_registry = ToolRegistry()
+ self._reply_effect_tracker = ReplyEffectTracker(
+ session_id=self.session_id,
+ session_name=self.session_name,
+ chat_stream=self.chat_stream,
+ judge_runner=self._run_reply_effect_judge,
+ )
self._register_tool_providers()
+ @staticmethod
+ def _is_reply_effect_tracking_enabled() -> bool:
+ """判断是否启用回复效果评分追踪。"""
+
+ return bool(global_config.debug.enable_reply_effect_tracking)
+
def _update_stage_status(self, stage: str, detail: str = "", *, round_text: str = "") -> None:
"""更新当前会话的阶段状态。"""
@@ -171,6 +186,8 @@ class MaisakaHeartFlowChatting:
finally:
self._internal_loop_task = None
+ if self._is_reply_effect_tracking_enabled():
+ await self._reply_effect_tracker.finalize_all("runtime_stop")
await self._tool_registry.close()
self._mcp_manager = None
self._mcp_host_bridge = None
@@ -230,6 +247,8 @@ class MaisakaHeartFlowChatting:
self.message_cache.append(message)
self._message_received_at_by_id[message.message_id] = received_at
self._source_messages_by_id[message.message_id] = message
+ if self._is_reply_effect_tracking_enabled():
+ asyncio.create_task(self._reply_effect_tracker.observe_user_message(message))
if self._agent_state == self._STATE_RUNNING:
self._message_debounce_required = True
if self._agent_state == self._STATE_RUNNING and self._planner_interrupt_flag is not None:
@@ -266,6 +285,79 @@ class MaisakaHeartFlowChatting:
talk_value = max(0.01, float(ChatConfigUtils.get_talk_value(self.session_id)))
return max(0.01, talk_value * self._talk_frequency_adjust)
+ async def track_reply_effect(
+ self,
+ *,
+ tool_call_id: str,
+ target_message: SessionMessage,
+ set_quote: bool,
+ reply_text: str,
+ reply_segments: list[str],
+ planner_reasoning: str,
+ reference_info: str,
+ reply_metadata: Optional[dict[str, Any]] = None,
+ replyer_context_messages: Optional[Sequence[LLMContextMessage]] = None,
+ ) -> None:
+ """登记一次已成功发送的 reply 工具回复,供后续用户反馈评分。"""
+
+ if not self._is_reply_effect_tracking_enabled():
+ return
+
+ try:
+ context_snapshot = self._build_reply_effect_context_snapshot(
+ context_messages=replyer_context_messages,
+ exclude_reply_segments=reply_segments if replyer_context_messages is None else None,
+ )
+ enriched_reply_metadata = dict(reply_metadata or {})
+ enriched_reply_metadata["replyer_context_count"] = (
+ len(replyer_context_messages) if replyer_context_messages is not None else len(self._chat_history)
+ )
+ enriched_reply_metadata["recorded_context_count"] = len(context_snapshot)
+ await self._reply_effect_tracker.record_reply(
+ tool_call_id=tool_call_id,
+ target_message=target_message,
+ set_quote=set_quote,
+ reply_text=reply_text,
+ reply_segments=reply_segments,
+ planner_reasoning=planner_reasoning,
+ reference_info=reference_info,
+ reply_metadata=enriched_reply_metadata,
+ context_snapshot=context_snapshot,
+ )
+ except Exception as exc:
+ logger.warning(f"{self.log_prefix} 创建回复效果观察记录失败: {exc}")
+
+ def _build_reply_effect_context_snapshot(
+ self,
+ *,
+ context_messages: Optional[Sequence[LLMContextMessage]] = None,
+ exclude_reply_segments: Optional[Sequence[str]] = None,
+ ) -> list[dict[str, Any]]:
+ """构建回复效果观察使用的上下文快照。
+
+ 优先记录 replyer 当次生成时实际收到的完整上下文列表;只有旧调用未传入时才回退到当前运行时历史。
+ """
+
+ source_messages = list(context_messages) if context_messages is not None else list(self._chat_history)
+ snapshot: list[dict[str, Any]] = []
+ excluded_segments = [segment.strip() for segment in (exclude_reply_segments or []) if segment.strip()]
+ for message in source_messages:
+ text = str(message.processed_plain_text or "").strip()
+ if not text:
+ continue
+ if message.source == "guided_reply" and any(segment in text for segment in excluded_segments):
+ continue
+ snapshot.append(
+ {
+ "source": message.source,
+ "role": message.role,
+ "timestamp": message.timestamp.isoformat(timespec="seconds"),
+ "text": text,
+ "attachments": extract_visual_attachments_from_sequence(getattr(message, "raw_message", None)),
+ }
+ )
+ return snapshot
+
def _get_message_trigger_threshold(self) -> int:
"""根据回复频率折算出触发一轮循环所需的消息数。"""
effective_frequency = min(1.0, self._get_effective_reply_frequency())
@@ -496,6 +588,27 @@ class MaisakaHeartFlowChatting:
tool_definitions=[] if tool_definitions is None else tool_definitions,
)
+ async def _run_reply_effect_judge(self, prompt: str) -> str:
+ """运行回复效果观察器使用的临时 LLM 评审。"""
+
+ judge_message = ReferenceMessage(
+ content=prompt,
+ timestamp=datetime.now(),
+ reference_type=ReferenceMessageType.TOOL_HINT,
+ remaining_uses_value=1,
+ display_prefix="[回复效果评分任务]",
+ )
+ response = await self.run_sub_agent(
+ context_message_limit=1,
+ system_prompt="你是回复效果评分器。请严格按用户给出的 JSON 格式输出,不要输出 JSON 之外的内容。",
+ request_kind="reply_effect_judge",
+ extra_messages=[judge_message],
+ max_tokens=900,
+ temperature=0.1,
+ tool_definitions=[],
+ )
+ return (response.content or "").strip()
+
def set_current_action_tool_names(self, tool_names: Sequence[str]) -> None:
"""记录当前 Action Loop 已实际暴露给 planner 的工具名集合。"""