")
+ nested_diff = _find_first_structural_diff(previous_value[index], current_value[index], index_path)
+ if nested_diff is not None:
+ return nested_diff
+ return None
+
+ if previous_value == current_value:
+ return None
+
+ if isinstance(previous_value, str) and isinstance(current_value, str):
+ diff_index = _longest_common_prefix_length(previous_value, current_value)
+ return _DynamicDiff(
+ f"{path}@char{diff_index}",
+ _summarize_value(previous_value[diff_index:]),
+ _summarize_value(current_value[diff_index:]),
+ )
+
+ return _DynamicDiff(path, _summarize_value(previous_value), _summarize_value(current_value))
+
+
def _diagnose_dynamic_diff(previous_prompt_text: str | None, current_prompt_text: str | None) -> _DynamicDiff:
    """Report the first place where the current prompt departs from the previous one.

    The first field of the returned ``_DynamicDiff`` is a label:

    * ``prompt_text.unavailable`` - there is no current prompt text.
    * ``cache_pool.empty`` - there is no previous prompt text to compare with.
    * ``raw_prompt@char<N>`` - at least one text is not valid JSON, so the
      texts are compared character by character and ``N`` is the length of
      their common prefix.
    * ``identical`` - both texts parse as JSON and no structural difference
      was found.
    * otherwise, whatever ``_find_first_structural_diff`` reports.
    """
    if not current_prompt_text:
        return _DynamicDiff("prompt_text.unavailable", "", "")
    if not previous_prompt_text:
        return _DynamicDiff("cache_pool.empty", "", _summarize_value(current_prompt_text))

    try:
        parsed_previous = json.loads(previous_prompt_text)
        parsed_current = json.loads(current_prompt_text)
    except json.JSONDecodeError:
        # Not JSON on at least one side: fall back to a raw character comparison.
        split_at = _longest_common_prefix_length(previous_prompt_text, current_prompt_text)
        previous_tail = _summarize_value(previous_prompt_text[split_at:])
        current_tail = _summarize_value(current_prompt_text[split_at:])
        return _DynamicDiff(f"raw_prompt@char{split_at}", previous_tail, current_tail)

    structural_diff = _find_first_structural_diff(parsed_previous, parsed_current)
    if structural_diff is None:
        return _DynamicDiff("identical", "", "")
    return structural_diff
+
+
+def _load_prompt_payload(prompt_text: str | None) -> dict[str, Any] | None:
+ if not prompt_text:
+ return None
+ try:
+ payload = json.loads(prompt_text)
+ except json.JSONDecodeError:
+ return None
+ return payload if isinstance(payload, dict) else None
+
+
def _extract_prompt_messages(prompt_text: str | None) -> list[dict[str, Any]]:
    """Return the dict entries of the ``messages`` list inside a JSON prompt.

    An empty list is returned when the prompt cannot be loaded as a JSON
    object or when its ``messages`` value is not a list. Non-dict entries of
    the list are dropped.
    """
    payload = _load_prompt_payload(prompt_text)
    if payload is None:
        return []
    candidates = payload.get("messages")
    if not isinstance(candidates, list):
        return []
    return [entry for entry in candidates if isinstance(entry, dict)]
+
+
def _message_fingerprints(messages: list[dict[str, Any]]) -> list[str]:
    """Serialize each message to a canonical JSON string usable for equality checks.

    Keys are sorted so that key order does not affect the fingerprint; values
    that JSON cannot encode are converted with ``str``.
    """
    fingerprints: list[str] = []
    for message in messages:
        serialized = json.dumps(message, ensure_ascii=False, sort_keys=True, default=str)
        fingerprints.append(serialized)
    return fingerprints
+
+
+def _count_common_prefix_items(left_items: list[str], right_items: list[str]) -> int:
+ common_count = 0
+ for left_item, right_item in zip(left_items, right_items, strict=False):
+ if left_item != right_item:
+ break
+ common_count += 1
+ return common_count
+
+
def _count_common_suffix_items(left_items: list[str], right_items: list[str]) -> int:
    """Count how many trailing positions hold equal items in both lists."""
    matched = 0
    # Walk both lists from the end; zip stops at the shorter one.
    for left_item, right_item in zip(reversed(left_items), reversed(right_items)):
        if left_item != right_item:
            break
        matched += 1
    return matched
+
+
def _find_longest_message_alignment(previous_items: list[str], current_items: list[str]) -> tuple[int, int, int]:
    """Find the longest run of consecutive equal items shared by both lists.

    Returns ``(overlap, previous_start, current_start)`` where ``overlap`` is
    the run length and the two starts are the indexes at which the run begins
    in each list. When several runs share the maximum length, the one with the
    smallest ``previous_start`` wins, then the smallest ``current_start``.
    ``(0, 0, 0)`` is returned when the lists have no item in common.

    Implemented as a longest-common-substring dynamic programme: the run
    starting at ``(i, j)`` is one longer than the run starting at
    ``(i + 1, j + 1)`` when the items at ``i`` and ``j`` are equal. This costs
    O(len(previous) * len(current)) comparisons instead of re-walking every
    run from every start pair.
    """
    best_overlap = 0
    best_previous_start = 0
    best_current_start = 0
    current_count = len(current_items)

    # run_below[j] is the run length starting at (previous_start + 1, j); the
    # extra trailing slot stands for "past the end" and is always 0.
    run_below = [0] * (current_count + 1)
    for previous_start in range(len(previous_items) - 1, -1, -1):
        previous_item = previous_items[previous_start]
        run_here = [0] * (current_count + 1)
        for current_start in range(current_count - 1, -1, -1):
            if previous_item != current_items[current_start]:
                continue
            overlap = run_below[current_start + 1] + 1
            run_here[current_start] = overlap
            # Start pairs are visited in descending order, so ">=" leaves the
            # smallest (previous_start, current_start) among equal-length runs.
            if overlap >= best_overlap:
                best_overlap = overlap
                best_previous_start = previous_start
                best_current_start = current_start
        run_below = run_here
    return best_overlap, best_previous_start, best_current_start
+
+
def _get_message_role(messages: list[dict[str, Any]], index: int) -> str:
    """Return the ``role`` of the message at *index* as a string.

    An empty string is returned when the list is empty, when *index* is out of
    range, or when the message has no truthy ``role`` value.
    """
    if not messages:
        return ""
    try:
        message = messages[index]
    except IndexError:
        return ""
    role = message.get("role", "")
    return str(role) if role else ""
+
+
def _diagnose_prompt_cache_details(
    *,
    previous_prompt_text: str | None,
    current_prompt_text: str | None,
    common_prefix_chars: int,
) -> _PromptCacheDiagnostics:
    """Compare two prompts message by message and package the findings.

    Both prompts are reduced to per-message fingerprints. The function then
    measures the shared leading and trailing messages and the longest aligned
    run of equal messages. Context sliding is suspected when that run starts
    later in the previous prompt than in the current one and is longer than
    the shared leading run.

    Args:
        previous_prompt_text: Prompt the current one is compared against.
        current_prompt_text: Prompt being diagnosed.
        common_prefix_chars: Number of leading characters the two prompts
            share, supplied by the caller; reported as a percentage of the
            current prompt length.
    """
    previous_messages = _extract_prompt_messages(previous_prompt_text)
    current_messages = _extract_prompt_messages(current_prompt_text)
    previous_fingerprints = _message_fingerprints(previous_messages)
    current_fingerprints = _message_fingerprints(current_messages)

    previous_length = len(previous_prompt_text or "")
    current_length = len(current_prompt_text or "")
    if current_length > 0:
        prefix_rate = common_prefix_chars / current_length * 100
    else:
        prefix_rate = 0.0

    shared_head = _count_common_prefix_items(previous_fingerprints, current_fingerprints)
    shared_tail = _count_common_suffix_items(previous_fingerprints, current_fingerprints)
    overlap, previous_start, current_start = _find_longest_message_alignment(
        previous_fingerprints,
        current_fingerprints,
    )

    sliding = previous_start > current_start and overlap > shared_head
    if sliding:
        dropped_head = previous_start - current_start
        aligned = overlap
        new_tail = max(len(current_messages) - current_start - overlap, 0)
    else:
        # The sliding-specific figures are only reported when sliding is suspected.
        dropped_head = 0
        aligned = 0
        new_tail = 0

    return _PromptCacheDiagnostics(
        current_message_count=len(current_messages),
        best_match_message_count=len(previous_messages),
        common_prefix_messages=shared_head,
        common_suffix_messages=shared_tail,
        common_prefix_rate=prefix_rate,
        prompt_growth_chars=current_length - previous_length,
        longest_aligned_message_overlap=overlap,
        aligned_previous_start_index=previous_start,
        aligned_current_start_index=current_start,
        suspected_context_sliding=sliding,
        sliding_dropped_head_messages=dropped_head,
        sliding_aligned_messages=aligned,
        sliding_new_tail_messages=new_tail,
        current_first_message_role=_get_message_role(current_messages, 0),
        best_first_message_role=_get_message_role(previous_messages, 0),
        current_last_message_role=_get_message_role(current_messages, -1),
        best_last_message_role=_get_message_role(previous_messages, -1),
    )
+
+
def _get_usage_log_path(now: datetime) -> Path:
    """Return the usage log file for the calendar day of *now* (``usage_YYYYMMDD.jsonl``)."""
    day_stamp = now.strftime("%Y%m%d")
    return CACHE_STATS_DIR / f"usage_{day_stamp}.jsonl"
+
+
def _get_report_path() -> Path:
    """Return the location of the overview report inside the cache-stats directory."""
    report_path = CACHE_STATS_DIR / REPORT_FILE_NAME
    return report_path
+
+
def _get_session_report_path() -> Path:
    """Return the location of the per-session report inside the cache-stats directory."""
    session_report_path = CACHE_STATS_DIR / SESSION_REPORT_FILE_NAME
    return session_report_path
+
+
def _iter_usage_log_paths() -> list[Path]:
    """Return every ``usage_*.jsonl`` file in the cache-stats directory, sorted by path.

    An empty list is returned when the directory does not exist.
    """
    if CACHE_STATS_DIR.exists():
        return sorted(CACHE_STATS_DIR.glob("usage_*.jsonl"))
    return []
+
+
def _read_usage_events() -> list[dict[str, Any]]:
    """Load every usage event recorded in the usage log files.

    Reading is best effort: files that cannot be read or decoded as UTF-8 are
    skipped, as are blank lines, lines that are not valid JSON, and JSON
    values that are not objects.

    Returns:
        The decoded event dicts, in file order and then line order.
    """
    events: list[dict[str, Any]] = []
    for file_path in _iter_usage_log_paths():
        try:
            lines = file_path.read_text(encoding="utf-8").splitlines()
        except (OSError, UnicodeDecodeError):
            # UnicodeDecodeError is a ValueError, not an OSError; without it a
            # single corrupted log file would abort the whole report.
            continue
        for line in lines:
            if not line.strip():
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(event, dict):
                events.append(event)
    return events
+
+
def _write_json_line(file_path: Path, payload: Dict[str, int | str | float | bool]) -> None:
    """Append *payload* to *file_path* as a single JSON line.

    The cache-stats directory is created first if it does not exist yet.
    """
    CACHE_STATS_DIR.mkdir(parents=True, exist_ok=True)
    with file_path.open("a", encoding="utf-8") as handle:
        serialized = json.dumps(payload, ensure_ascii=False)
        handle.write(f"{serialized}\n")
+
+
+def _format_int(value: int | str | float) -> str:
+ return f"{int(value):,}"
+
+
+def _format_rate(value: int | str | float) -> str:
+ return f"{float(value):.2f}%"
+
+
def _calculate_rate(hit_tokens: int, miss_tokens: int) -> float:
    """Return hits as a percentage of hits plus misses, or ``0.0`` when that sum is not positive."""
    denominator = hit_tokens + miss_tokens
    if denominator > 0:
        return hit_tokens / denominator * 100
    return 0.0
+
+
def _normal_cdf(value: float) -> float:
    """Evaluate the standard normal cumulative distribution function at *value*."""
    scaled = value / sqrt(2.0)
    return 0.5 * (1.0 + erf(scaled))
+
+
def _confidence_from_z_score(z_score: float) -> float:
    """Turn a z-score into a two-sided confidence percentage clamped to ``[0, 100]``."""
    upper_tail = 1.0 - _normal_cdf(abs(z_score))
    two_sided_p_value = 2.0 * upper_tail
    confidence = (1.0 - two_sided_p_value) * 100.0
    capped = min(100.0, confidence)
    return max(0.0, capped)
+
+
def _format_significance_label(confidence: float, *, min_confidence: float = 95.0) -> str:
    """Return the "significant" label when *confidence* reaches *min_confidence*, else the "not significant" label."""
    if confidence >= min_confidence:
        return "显著"
    return "不显著"
+
+
def _calculate_two_proportion_confidence(
    *,
    current_hit: int,
    current_total: int,
    baseline_hit: int,
    baseline_total: int,
) -> float:
    """Confidence (in percent) that two hit proportions differ, via a pooled two-proportion z-test.

    Returns ``0.0`` when either total is not positive or when the pooled
    standard error is not positive (for example when both rates are 0% or
    both are 100%).
    """
    if current_total <= 0 or baseline_total <= 0:
        return 0.0

    pooled_rate = (current_hit + baseline_hit) / (current_total + baseline_total)
    inverse_sizes = 1.0 / current_total + 1.0 / baseline_total
    standard_error = sqrt(pooled_rate * (1.0 - pooled_rate) * inverse_sizes)
    if standard_error <= 0:
        return 0.0

    rate_gap = current_hit / current_total - baseline_hit / baseline_total
    return _confidence_from_z_score(rate_gap / standard_error)
+
+
def _calculate_sample_variance(*, value_total: float, square_total: float, count: int) -> float:
    """Compute the sample variance from a running sum, a running sum of squares and a count.

    Returns ``0.0`` for fewer than two samples; a negative result is clamped
    to ``0.0``.
    """
    if count <= 1:
        return 0.0
    centered_square_sum = square_total - (value_total * value_total / count)
    return max(centered_square_sum / (count - 1), 0.0)
+
+
def _calculate_mean_difference_confidence(
    *,
    current_mean: float,
    current_variance: float,
    current_count: int,
    baseline_mean: float,
    baseline_variance: float,
    baseline_count: int,
) -> float:
    """Confidence (in percent) that two sample means differ, via an unpooled z-test.

    Returns ``0.0`` when either sample has fewer than two observations or
    when the combined standard error is not positive.
    """
    if current_count <= 1 or baseline_count <= 1:
        return 0.0

    squared_error = current_variance / current_count + baseline_variance / baseline_count
    standard_error = sqrt(squared_error)
    if standard_error <= 0:
        return 0.0

    mean_gap = current_mean - baseline_mean
    return _confidence_from_z_score(mean_gap / standard_error)
+
+
def _normalize_event_run_id(event: dict[str, Any]) -> str:
    """Return the event's ``run_id`` as a stripped string, or ``"legacy"`` when it is missing or blank."""
    raw_run_id = event.get("run_id")
    cleaned = str(raw_run_id or "").strip()
    return cleaned if cleaned else "legacy"
+
+
def _aggregate_usage_events_by_run(events: list[dict[str, Any]]) -> list[dict[str, int | str | float]]:
    """Roll usage events up into one summary dict per run.

    Events are grouped by their normalized run id. Each summary carries the
    earliest and latest ``created_at`` seen, the call count, summed token
    counters, the running sum and sum of squares of
    ``theoretical_common_prefix_rate``, the number of calls flagged as
    suspected context sliding, and the derived hit rates and average prefix
    rate (rounded to two decimals).

    Returns:
        The summaries sorted by ``first_seen_at`` (string comparison).
    """
    token_fields = (
        "prompt_tokens",
        "prompt_cache_hit_tokens",
        "prompt_cache_miss_tokens",
        "theoretical_prompt_cache_hit_tokens",
        "theoretical_prompt_cache_miss_tokens",
    )
    grouped: dict[str, dict[str, int | str | float]] = {}
    for event in events:
        run_id = _normalize_event_run_id(event)
        created_at = str(event.get("created_at") or "")
        item = grouped.setdefault(
            run_id,
            {
                "run_id": run_id,
                "process_started_at": str(event.get("process_started_at") or ""),
                "first_seen_at": created_at,
                "last_seen_at": created_at,
                "calls": 0,
                "prompt_tokens": 0,
                "prompt_cache_hit_tokens": 0,
                "prompt_cache_miss_tokens": 0,
                "theoretical_prompt_cache_hit_tokens": 0,
                "theoretical_prompt_cache_miss_tokens": 0,
                "common_prefix_rate_total": 0.0,
                "common_prefix_rate_square_total": 0.0,
                "suspected_context_sliding_calls": 0,
            },
        )
        if created_at:
            # Timestamps are compared as strings, as stored in the event.
            if not item["first_seen_at"] or created_at < str(item["first_seen_at"]):
                item["first_seen_at"] = created_at
            if created_at > str(item["last_seen_at"]):
                item["last_seen_at"] = created_at

        item["calls"] = int(item["calls"]) + 1
        for field in token_fields:
            item[field] = int(item[field]) + int(event.get(field) or 0)

        prefix_rate = float(event.get("theoretical_common_prefix_rate") or 0.0)
        item["common_prefix_rate_total"] = float(item["common_prefix_rate_total"]) + prefix_rate
        # Keep the sum of squares in step with the plain sum, as the call-site
        # aggregation does; previously this field was initialised but never updated.
        item["common_prefix_rate_square_total"] = (
            float(item["common_prefix_rate_square_total"]) + prefix_rate * prefix_rate
        )
        if bool(event.get("suspected_context_sliding", False)):
            item["suspected_context_sliding_calls"] = int(item["suspected_context_sliding_calls"]) + 1

    result: list[dict[str, int | str | float]] = []
    for item in grouped.values():
        calls = int(item["calls"])
        item["prompt_cache_hit_rate"] = round(
            _calculate_rate(int(item["prompt_cache_hit_tokens"]), int(item["prompt_cache_miss_tokens"])),
            2,
        )
        item["theoretical_prompt_cache_hit_rate"] = round(
            _calculate_rate(
                int(item["theoretical_prompt_cache_hit_tokens"]),
                int(item["theoretical_prompt_cache_miss_tokens"]),
            ),
            2,
        )
        item["avg_common_prefix_rate"] = round(float(item["common_prefix_rate_total"]) / calls, 2) if calls else 0.0
        result.append(item)

    return sorted(result, key=lambda item: str(item["first_seen_at"]))
+
+
+def _get_previous_run_id(run_stats: list[dict[str, int | str | float]], current_run_id: str) -> str:
+ run_ids = [str(item["run_id"]) for item in run_stats]
+ if current_run_id not in run_ids:
+ return ""
+ current_index = run_ids.index(current_run_id)
+ if current_index <= 0:
+ return ""
+ return run_ids[current_index - 1]
+
+
def _aggregate_usage_events_by_call_site(
    events: list[dict[str, Any]],
    *,
    run_id: str,
    include_session: bool = True,
) -> dict[tuple[str, ...], dict[str, int | str | float]]:
    """Summarize the events of one run per call site.

    A call site is ``(task_name, request_type, model_name)``, extended with
    the normalized session id when *include_session* is true. Events whose
    normalized run id differs from *run_id* are ignored.

    Returns:
        A mapping from call-site key to a summary dict holding the call
        count, summed token counters, running prefix-rate sums, the number of
        suspected context-sliding calls, and the derived hit rates, average
        prefix rate and prefix-rate sample variance.
    """
    summed_fields = (
        "prompt_cache_hit_tokens",
        "prompt_cache_miss_tokens",
        "theoretical_prompt_cache_hit_tokens",
        "theoretical_prompt_cache_miss_tokens",
    )
    buckets: dict[tuple[str, ...], dict[str, int | str | float]] = {}

    for event in events:
        if _normalize_event_run_id(event) != run_id:
            continue

        key: tuple[str, ...] = (
            str(event.get("task_name") or ""),
            str(event.get("request_type") or ""),
            str(event.get("model_name") or ""),
        )
        if include_session:
            key = (*key, _normalize_session_id(str(event.get("session_id") or "")))

        bucket = buckets.get(key)
        if bucket is None:
            bucket = {
                "task_name": key[0],
                "request_type": key[1],
                "model_name": key[2],
                "session_id": key[3] if include_session else "",
                "calls": 0,
                "prompt_cache_hit_tokens": 0,
                "prompt_cache_miss_tokens": 0,
                "theoretical_prompt_cache_hit_tokens": 0,
                "theoretical_prompt_cache_miss_tokens": 0,
                "common_prefix_rate_total": 0.0,
                "common_prefix_rate_square_total": 0.0,
                "suspected_context_sliding_calls": 0,
            }
            buckets[key] = bucket

        bucket["calls"] = int(bucket["calls"]) + 1
        for field in summed_fields:
            bucket[field] = int(bucket[field]) + int(event.get(field) or 0)

        prefix_rate = float(event.get("theoretical_common_prefix_rate") or 0.0)
        bucket["common_prefix_rate_total"] = float(bucket["common_prefix_rate_total"]) + prefix_rate
        bucket["common_prefix_rate_square_total"] = (
            float(bucket["common_prefix_rate_square_total"]) + prefix_rate * prefix_rate
        )
        if bool(event.get("suspected_context_sliding", False)):
            bucket["suspected_context_sliding_calls"] = int(bucket["suspected_context_sliding_calls"]) + 1

    # Second pass: derive the rates and the variance from the accumulated totals.
    for bucket in buckets.values():
        call_count = int(bucket["calls"])
        rate_sum = float(bucket["common_prefix_rate_total"])
        rate_square_sum = float(bucket["common_prefix_rate_square_total"])

        api_rate = _calculate_rate(
            int(bucket["prompt_cache_hit_tokens"]),
            int(bucket["prompt_cache_miss_tokens"]),
        )
        bucket["prompt_cache_hit_rate"] = round(api_rate, 2)

        theoretical_rate = _calculate_rate(
            int(bucket["theoretical_prompt_cache_hit_tokens"]),
            int(bucket["theoretical_prompt_cache_miss_tokens"]),
        )
        bucket["theoretical_prompt_cache_hit_rate"] = round(theoretical_rate, 2)

        bucket["avg_common_prefix_rate"] = round(rate_sum / call_count, 2) if call_count else 0.0

        variance = _calculate_sample_variance(
            value_total=rate_sum,
            square_total=rate_square_sum,
            count=call_count,
        )
        bucket["common_prefix_rate_variance"] = round(variance, 4)

    return buckets
+
+
def _render_run_rows(run_stats: list[dict[str, int | str | float]], current_run_id: str) -> str:
    """Render the last twelve run summaries, newest first, one line per run.

    The run whose id equals *current_run_id* is flagged with a marker in the
    first cell. Text cells are HTML-escaped.
    """
    rendered: list[str] = []
    for run in run_stats[-12:][::-1]:
        marker = "当前" if str(run["run_id"]) == current_run_id else ""
        cells = [
            escape(marker),
            escape(str(run["run_id"])),
            escape(str(run["process_started_at"])),
            escape(str(run["first_seen_at"])),
            escape(str(run["last_seen_at"])),
            _format_int(run["calls"]),
            _format_int(run["prompt_tokens"]),
            _format_rate(run["prompt_cache_hit_rate"]),
            _format_rate(run["theoretical_prompt_cache_hit_rate"]),
            _format_rate(run["avg_common_prefix_rate"]),
            _format_int(run["suspected_context_sliding_calls"]),
        ]
        # Every cell is followed by one separator; the row ends with one more.
        rendered.append("".join(f"{cell} " for cell in cells) + " ")
    return "\n".join(rendered)
+
+
def _render_run_comparison_rows(
    *,
    current_by_call_site: dict[tuple[str, ...], dict[str, int | str | float]],
    previous_by_call_site: dict[tuple[str, ...], dict[str, int | str | float]],
    include_session: bool,
) -> str:
    """Render one comparison line per call site found in either run.

    Each line shows, for the current and the previous run, the call counts,
    the API hit rate, the theoretical hit rate and the average prefix rate
    (each with its delta), and the suspected context-sliding call counts. A
    call site missing from one run contributes zeros for that run.
    """

    def _rate(summary: dict[str, int | str | float], field: str) -> float:
        # Missing or falsy values count as 0.0.
        return float(summary.get(field) or 0.0)

    rendered: list[str] = []
    for key in sorted(set(current_by_call_site) | set(previous_by_call_site)):
        now = current_by_call_site.get(key, {})
        before = previous_by_call_site.get(key, {})
        api_now = _rate(now, "prompt_cache_hit_rate")
        api_before = _rate(before, "prompt_cache_hit_rate")
        theory_now = _rate(now, "theoretical_prompt_cache_hit_rate")
        theory_before = _rate(before, "theoretical_prompt_cache_hit_rate")
        prefix_now = _rate(now, "avg_common_prefix_rate")
        prefix_before = _rate(before, "avg_common_prefix_rate")

        cells = [escape(key[0]), escape(key[1]), escape(key[2])]
        if include_session and len(key) > 3:
            cells.append(escape(key[3]))
        cells.extend(
            [
                _format_int(now.get("calls", 0)),
                _format_int(before.get("calls", 0)),
                _format_rate(api_now),
                _format_rate(api_before),
                _format_rate(api_now - api_before),
                _format_rate(theory_now),
                _format_rate(theory_before),
                _format_rate(theory_now - theory_before),
                _format_rate(prefix_now),
                _format_rate(prefix_before),
                _format_rate(prefix_now - prefix_before),
                _format_int(now.get("suspected_context_sliding_calls", 0)),
                _format_int(before.get("suspected_context_sliding_calls", 0)),
            ]
        )
        # Every cell is followed by one separator; the row ends with one more.
        rendered.append("".join(f"{cell} " for cell in cells) + " ")
    return "\n".join(rendered)
+
+
+def _format_run_time_label(run_stat: dict[str, int | str | float] | None) -> str:
+ if not run_stat:
+ return ""
+ first_seen_at = str(run_stat.get("first_seen_at") or "").strip()
+ last_seen_at = str(run_stat.get("last_seen_at") or "").strip()
+ process_started_at = str(run_stat.get("process_started_at") or "").strip()
+ if first_seen_at and last_seen_at and first_seen_at != last_seen_at:
+ return f"{first_seen_at} -> {last_seen_at}"
+ if first_seen_at:
+ return first_seen_at
+ return process_started_at
+
+
+def _get_previous_run_stats(
+ run_stats: list[dict[str, int | str | float]],
+ current_run_id: str,
+) -> list[dict[str, int | str | float]]:
+ return [
+ item
+ for item in run_stats
+ if str(item["run_id"]) != current_run_id
+ ]
+
+
+def _render_run_significance_controls(
+ run_stats: list[dict[str, int | str | float]],
+ current_run_id: str,
+) -> str:
+ previous_run_stats = _get_previous_run_stats(run_stats, current_run_id)
+ if not previous_run_stats:
+ return (
+ ""
+ "No previous runs to compare. "
+ "
"
+ )
+
+ option_payload = [
+ {
+ "run_id": str(item["run_id"]),
+ "time_label": _format_run_time_label(item),
+ "calls": int(item.get("calls") or 0),
+ }
+ for item in previous_run_stats
+ ]
+ option_json = escape(json.dumps(option_payload, ensure_ascii=False), quote=True)
+ max_index = len(previous_run_stats) - 1
+ return (
+ ""
+ "
< "
+ "
"
+ "
> "
+ "
"
+ "
"
+ )
+
+
+def _render_run_significance_script() -> str:
+ return """
+
+"""
+
+
def _build_run_significance_rows(
    *,
    usage_events: list[dict[str, Any]],
    run_stats: list[dict[str, int | str | float]],
    current_run_id: str,
    include_session: bool,
) -> str:
    """Render significance lines comparing the current run with every other run.

    For each other run, and for each call site present in both that run and
    the current run, one line reports the call counts, the API hit-rate delta
    with its two-proportion confidence, the average prefix-rate delta with
    its mean-difference confidence, and the suspected context-sliding call
    counts. When no line can be produced, a fixed explanatory message is
    returned instead.
    """
    current_sites = _aggregate_usage_events_by_call_site(
        usage_events,
        run_id=current_run_id,
        include_session=include_session,
    )
    rendered: list[str] = []

    for baseline_stat in _get_previous_run_stats(run_stats, current_run_id):
        baseline_run_id = str(baseline_stat["run_id"])
        baseline_time = _format_run_time_label(baseline_stat)
        baseline_sites = _aggregate_usage_events_by_call_site(
            usage_events,
            run_id=baseline_run_id,
            include_session=include_session,
        )

        # Only call sites observed in both runs can be compared.
        for key in sorted(current_sites.keys() & baseline_sites.keys()):
            now = current_sites[key]
            base = baseline_sites[key]

            hit_now = int(now.get("prompt_cache_hit_tokens") or 0)
            miss_now = int(now.get("prompt_cache_miss_tokens") or 0)
            hit_base = int(base.get("prompt_cache_hit_tokens") or 0)
            miss_base = int(base.get("prompt_cache_miss_tokens") or 0)
            api_now = _calculate_rate(hit_now, miss_now)
            api_base = _calculate_rate(hit_base, miss_base)
            api_confidence = _calculate_two_proportion_confidence(
                current_hit=hit_now,
                current_total=hit_now + miss_now,
                baseline_hit=hit_base,
                baseline_total=hit_base + miss_base,
            )

            calls_now = int(now.get("calls") or 0)
            calls_base = int(base.get("calls") or 0)
            prefix_now = float(now.get("avg_common_prefix_rate") or 0.0)
            prefix_base = float(base.get("avg_common_prefix_rate") or 0.0)
            prefix_confidence = _calculate_mean_difference_confidence(
                current_mean=prefix_now,
                current_variance=float(now.get("common_prefix_rate_variance") or 0.0),
                current_count=calls_now,
                baseline_mean=prefix_base,
                baseline_variance=float(base.get("common_prefix_rate_variance") or 0.0),
                baseline_count=calls_base,
            )

            cells = [
                escape(baseline_run_id),
                escape(baseline_time),
                escape(key[0]),
                escape(key[1]),
                escape(key[2]),
            ]
            if include_session and len(key) > 3:
                cells.append(escape(key[3]))
            cells.extend(
                [
                    _format_int(calls_now),
                    _format_int(calls_base),
                    _format_rate(api_now - api_base),
                    _format_rate(api_confidence),
                    escape(_format_significance_label(api_confidence)),
                    _format_rate(prefix_now - prefix_base),
                    _format_rate(prefix_confidence),
                    escape(_format_significance_label(prefix_confidence)),
                    _format_int(now.get("suspected_context_sliding_calls", 0)),
                    _format_int(base.get("suspected_context_sliding_calls", 0)),
                ]
            )
            # Every cell is followed by one separator; the row ends with one more.
            rendered.append("".join(f"{cell} " for cell in cells) + " ")

    if rendered:
        return "\n".join(rendered)
    return (
        "当前 run 还没有可与历史 run 比较的同类调用点,"
        "或历史数据缺少 run_id。 "
    )
+
+
def _render_stat_rows(stats: List[Dict[str, int | str | float]], *, include_session: bool) -> str:
    """Render one line per call-site summary in *stats*.

    Text cells are HTML-escaped; the session cell is emitted only when
    *include_session* is true. The two sliding averages are rendered with
    their default string form.
    """
    rendered: list[str] = []
    for entry in stats:
        cells = [
            escape(str(entry["task_name"])),
            escape(str(entry["request_type"])),
            escape(str(entry["model_name"])),
        ]
        if include_session:
            cells.append(escape(str(entry.get("session_id", ""))))
        cells.extend(
            [
                _format_rate(entry["prompt_cache_hit_rate"]),
                _format_rate(entry["theoretical_prompt_cache_hit_rate"]),
                _format_rate(entry["prompt_cache_hit_rate_delta"]),
                _format_int(entry["prompt_cache_hit_tokens"]),
                _format_int(entry["prompt_cache_miss_tokens"]),
                _format_int(entry["theoretical_prompt_cache_hit_tokens"]),
                _format_int(entry["theoretical_prompt_cache_miss_tokens"]),
                _format_int(entry["prompt_tokens"]),
                _format_int(entry["calls"]),
                _format_int(entry["cache_reported_calls"]),
                _format_int(entry["theoretical_compared_calls"]),
                _format_int(entry["theoretical_cache_pool_hits"]),
                _format_rate(entry["avg_common_prefix_rate"]),
                _format_int(entry["suspected_context_sliding_calls"]),
                f"{entry['avg_sliding_dropped_messages']}",
                f"{entry['avg_sliding_aligned_messages']}",
                escape(str(entry.get("top_dynamic_diff_paths", ""))),
            ]
        )
        # Every cell is followed by one separator; the row ends with one more.
        rendered.append("".join(f"{cell} " for cell in cells) + " ")
    return "\n".join(rendered)
+
+
def _aggregate_stats_snapshot(
    stats_snapshot: List[Dict[str, int | str | float]],
    *,
    include_session: bool,
) -> List[Dict[str, int | str | float]]:
    """Merge snapshot entries that belong to the same call site.

    Entries are grouped by ``(task_name, request_type, model_name)``, plus
    ``session_id`` when *include_session* is true. Counters are summed. The
    per-entry averages are re-weighted (prefix rate by calls, sliding
    averages by sliding calls) so the merged averages stay call-weighted.
    ``top_dynamic_diff_paths`` is carried over only in session mode, where the
    last entry of a group wins.

    Returns:
        One merged dict per group, in first-seen order, including the derived
        hit rates, their delta and the merged averages (rounded to two
        decimals).
    """
    counter_fields = (
        "calls",
        "cache_reported_calls",
        "prompt_tokens",
        "prompt_cache_hit_tokens",
        "prompt_cache_miss_tokens",
        "theoretical_prompt_cache_hit_tokens",
        "theoretical_prompt_cache_miss_tokens",
        "theoretical_compared_calls",
        "theoretical_cache_pool_hits",
        "suspected_context_sliding_calls",
    )
    merged: dict[tuple[str, ...], dict[str, int | str | float]] = {}

    for entry in stats_snapshot:
        task_name = str(entry.get("task_name") or "")
        request_type = str(entry.get("request_type") or "")
        model_name = str(entry.get("model_name") or "")
        session_id = str(entry.get("session_id") or "")
        if include_session:
            key: tuple[str, ...] = (task_name, request_type, model_name, session_id)
        else:
            key = (task_name, request_type, model_name)

        bucket = merged.get(key)
        if bucket is None:
            bucket = {
                "task_name": task_name,
                "request_type": request_type,
                "model_name": model_name,
                "session_id": session_id if include_session else "",
                "calls": 0,
                "cache_reported_calls": 0,
                "prompt_tokens": 0,
                "prompt_cache_hit_tokens": 0,
                "prompt_cache_miss_tokens": 0,
                "theoretical_prompt_cache_hit_tokens": 0,
                "theoretical_prompt_cache_miss_tokens": 0,
                "theoretical_compared_calls": 0,
                "theoretical_cache_pool_hits": 0,
                "common_prefix_rate_weighted_total": 0.0,
                "suspected_context_sliding_calls": 0,
                "sliding_dropped_weighted_total": 0.0,
                "sliding_aligned_weighted_total": 0.0,
                "top_dynamic_diff_paths": "",
            }
            merged[key] = bucket

        entry_calls = int(entry.get("calls") or 0)
        entry_sliding_calls = int(entry.get("suspected_context_sliding_calls") or 0)
        for field in counter_fields:
            bucket[field] = int(bucket[field]) + int(entry.get(field) or 0)

        # Turn each per-entry average back into a total so groups merge correctly.
        bucket["common_prefix_rate_weighted_total"] = float(bucket["common_prefix_rate_weighted_total"]) + (
            float(entry.get("avg_common_prefix_rate") or 0.0) * entry_calls
        )
        bucket["sliding_dropped_weighted_total"] = float(bucket["sliding_dropped_weighted_total"]) + (
            float(entry.get("avg_sliding_dropped_messages") or 0.0) * entry_sliding_calls
        )
        bucket["sliding_aligned_weighted_total"] = float(bucket["sliding_aligned_weighted_total"]) + (
            float(entry.get("avg_sliding_aligned_messages") or 0.0) * entry_sliding_calls
        )
        if include_session:
            bucket["top_dynamic_diff_paths"] = entry.get("top_dynamic_diff_paths", "")

    summaries: list[dict[str, int | str | float]] = []
    for bucket in merged.values():
        total_calls = int(bucket["calls"])
        total_sliding_calls = int(bucket["suspected_context_sliding_calls"])

        api_rate = _calculate_rate(
            int(bucket["prompt_cache_hit_tokens"]),
            int(bucket["prompt_cache_miss_tokens"]),
        )
        bucket["prompt_cache_hit_rate"] = round(api_rate, 2)

        theoretical_rate = _calculate_rate(
            int(bucket["theoretical_prompt_cache_hit_tokens"]),
            int(bucket["theoretical_prompt_cache_miss_tokens"]),
        )
        bucket["theoretical_prompt_cache_hit_rate"] = round(theoretical_rate, 2)

        # The delta is taken between the already-rounded rates.
        bucket["prompt_cache_hit_rate_delta"] = round(
            float(bucket["prompt_cache_hit_rate"]) - float(bucket["theoretical_prompt_cache_hit_rate"]),
            2,
        )

        if total_calls:
            bucket["avg_common_prefix_rate"] = round(
                float(bucket["common_prefix_rate_weighted_total"]) / total_calls, 2
            )
        else:
            bucket["avg_common_prefix_rate"] = 0.0

        if total_sliding_calls:
            bucket["avg_sliding_dropped_messages"] = round(
                float(bucket["sliding_dropped_weighted_total"]) / total_sliding_calls, 2
            )
            bucket["avg_sliding_aligned_messages"] = round(
                float(bucket["sliding_aligned_weighted_total"]) / total_sliding_calls, 2
            )
        else:
            bucket["avg_sliding_dropped_messages"] = 0.0
            bucket["avg_sliding_aligned_messages"] = 0.0

        summaries.append(bucket)
    return summaries
+
+
+def _render_html_report(stats_snapshot: List[Dict[str, int | str | float]], *, include_session: bool = False) -> str:
+ updated_at = datetime.now().isoformat(timespec="seconds")
+ visible_stats_snapshot = _aggregate_stats_snapshot(stats_snapshot, include_session=include_session)
+ usage_events = _read_usage_events()
+ run_stats = _aggregate_usage_events_by_run(usage_events)
+ current_run_id = _store.run_id
+ previous_run_id = _get_previous_run_id(run_stats, current_run_id)
+ current_by_call_site = _aggregate_usage_events_by_call_site(
+ usage_events,
+ run_id=current_run_id,
+ include_session=include_session,
+ )
+ previous_by_call_site = (
+ _aggregate_usage_events_by_call_site(
+ usage_events,
+ run_id=previous_run_id,
+ include_session=include_session,
+ ) if previous_run_id else {}
+ )
+ sorted_by_rate = sorted(
+ visible_stats_snapshot,
+ key=lambda item: (
+ float(item["prompt_cache_hit_rate"]),
+ -int(item["prompt_cache_miss_tokens"]),
+ ),
+ )
+ low_stats = sorted_by_rate[:SUMMARY_LIMIT]
+ high_stats = list(reversed(sorted_by_rate[-SUMMARY_LIMIT:]))
+ all_stats = sorted(
+ visible_stats_snapshot,
+ key=lambda item: (
+ str(item["task_name"]),
+ str(item["request_type"]),
+ str(item["model_name"]),
+ ),
+ )
+ total_calls = sum(int(item["calls"]) for item in visible_stats_snapshot)
+ total_prompt_tokens = sum(int(item["prompt_tokens"]) for item in visible_stats_snapshot)
+ total_hit_tokens = sum(int(item["prompt_cache_hit_tokens"]) for item in visible_stats_snapshot)
+ total_theoretical_hit_tokens = sum(int(item["theoretical_prompt_cache_hit_tokens"]) for item in visible_stats_snapshot)
+ total_miss_tokens = sum(int(item["prompt_cache_miss_tokens"]) for item in visible_stats_snapshot)
+ total_theoretical_miss_tokens = sum(int(item["theoretical_prompt_cache_miss_tokens"]) for item in visible_stats_snapshot)
+ total_cache_tokens = total_hit_tokens + total_miss_tokens
+ total_theoretical_cache_tokens = total_theoretical_hit_tokens + total_theoretical_miss_tokens
+ overall_hit_rate = total_hit_tokens / total_cache_tokens * 100 if total_cache_tokens > 0 else 0.0
+ overall_theoretical_hit_rate = (
+ total_theoretical_hit_tokens / total_theoretical_cache_tokens * 100
+ if total_theoretical_cache_tokens > 0
+ else 0.0
+ )
+ session_head = "Session " if include_session else ""
+ report_title = "LLM Prompt Cache Stats By Session" if include_session else "LLM Prompt Cache Stats"
+ peer_report_link = (
+ f"Overview report "
+ if include_session
+ else f"Session detail report "
+ )
+ table_head = (
+ f"Task Request Model {session_head}API hit Theory hit "
+ "Delta API hit tok API miss tok Theory hit tok Theory miss tok "
+ "Prompt tok Calls Reported Compared Pool hits "
+ "Avg prefix Sliding calls Avg dropped msg Avg aligned msg "
+ "Top dynamic diff paths "
+ )
+ run_table_head = (
+ "Run ID Process started First event Last event "
+ "Calls Prompt tok API hit Theory hit Avg prefix "
+ "Sliding calls "
+ )
+ run_compare_head = (
+ f"Task Request Model {session_head}Current calls Previous calls "
+ "Current API Previous API API delta "
+ "Current Theory Previous Theory Theory delta "
+ "Current Prefix Previous Prefix Prefix delta "
+ "Current Sliding Previous Sliding "
+ )
+ run_significance_head = (
+ f"Baseline run Baseline time Task Request Model {session_head}"
+ "Current calls Baseline calls "
+ "API delta API confidence API significant "
+ "Prefix delta Prefix confidence Prefix significant "
+ "Current sliding Baseline sliding "
+ )
+
+ return f"""
+
+
+
+ {escape(report_title)}
+
+
+
+ {escape(report_title)}
+ Updated at: {escape(updated_at)}. Current run: {escape(current_run_id)}. Process started at: {escape(_store.process_started_at)}. Grouped by task_name / request_type / model_name{escape(' / session_id' if include_session else '')}. Local prompt pool size: {PROMPT_CACHE_POOL_SIZE}. {peer_report_link}
+
+
Calls
{_format_int(total_calls)}
+
Prompt tokens
{_format_int(total_prompt_tokens)}
+
API hit tokens
{_format_int(total_hit_tokens)}
+
API hit rate
{_format_rate(overall_hit_rate)}
+
Theory hit tokens
{_format_int(total_theoretical_hit_tokens)}
+
Theory hit rate
{_format_rate(overall_theoretical_hit_rate)}
+
+ Run Comparison
+
+ {run_table_head}
+ {_render_run_rows(run_stats, current_run_id)}
+
+ Current vs Previous Run By Call Site
+
+ {run_compare_head}
+ {_render_run_comparison_rows(current_by_call_site=current_by_call_site, previous_by_call_site=previous_by_call_site, include_session=include_session)}
+
+ Current vs Every Previous Run Significance
+ {_render_run_significance_controls(run_stats, current_run_id)}
+
+ {run_significance_head}
+ {_build_run_significance_rows(usage_events=usage_events, run_stats=run_stats, current_run_id=current_run_id, include_session=include_session)}
+
+ Low API Hit Rate
+
+ {table_head}
+ {_render_stat_rows(low_stats, include_session=include_session)}
+
+ High API Hit Rate
+
+ {table_head}
+ {_render_stat_rows(high_stats, include_session=include_session)}
+
+ All Call Sites
+
+ {table_head}
+ {_render_stat_rows(all_stats, include_session=include_session)}
+
+ {_render_run_significance_script()}
+
+
+"""
+
+
+def _write_html_report(stats_snapshot: List[Dict[str, int | str | float]]) -> None:
+    """Write both HTML report variants rendered from one stats snapshot.
+
+    CACHE_STATS_DIR is created first if needed. The overview report
+    (rendered without the session column) is written before the per-session
+    report; both files are written as UTF-8.
+    """
+
+    CACHE_STATS_DIR.mkdir(parents=True, exist_ok=True)
+    report_targets = (
+        (_get_report_path, False),
+        (_get_session_report_path, True),
+    )
+    for resolve_path, with_session in report_targets:
+        resolve_path().write_text(
+            _render_html_report(stats_snapshot, include_session=with_session),
+            encoding="utf-8",
+        )
+
+
+def _write_usage_event(event: Dict[str, int | str | float | bool]) -> None:
+ """Write one usage event to the usage log, never raising.
+
+ The event is handed to `_write_json_line` together with the path that
+ `_get_usage_log_path` returns for the current local time. Any exception
+ is caught and reported as a warning so that stats logging cannot break
+ the calling LLM request.
+ """
+ try:
+ _write_json_line(_get_usage_log_path(datetime.now()), event)
+ except Exception as exc:
+ logger.warning(f"写入 LLM prompt cache 明细失败: {exc}")
+
+
+def _write_report(stats_snapshot: List[Dict[str, int | str | float]]) -> None:
+ """Write the HTML reports for a stats snapshot, never raising.
+
+ Delegates to `_write_html_report`; any exception is caught and logged as
+ a warning instead of being propagated to the caller.
+ """
+ try:
+ _write_html_report(stats_snapshot)
+ except Exception as exc:
+ logger.warning(f"写入 LLM prompt cache HTML 报告失败: {exc}")
+
+
+def record_llm_cache_usage(
+ *,
+ task_name: str,
+ request_type: str,
+ model_name: str,
+ session_id: str = "",
+ prompt_tokens: int,
+ prompt_cache_hit_tokens: int,
+ prompt_cache_miss_tokens: int,
+ prompt_text: str | None = None,
+) -> None:
+ """Record one LLM prompt cache usage event.
+
+ Calls whose stripped task name is not in FOCUSED_TASK_NAMES, or whose
+ normalized request type is in EXCLUDED_REQUEST_TYPES, are ignored.
+ Otherwise the per-call-site stats and the prompt pool held in `_store`
+ are updated, one usage event is written via `_write_usage_event`, and
+ the HTML report plus the log summary are refreshed when a report is due.
+
+ Args:
+ task_name: Task name of the call; stripped before use.
+ request_type: Request type; normalized with `_normalize_request_type`.
+ model_name: Model name; normalized with `_normalize_model_name`.
+ session_id: Session identifier; normalized with `_normalize_session_id`.
+ prompt_tokens: Prompt token count; clamped to be non-negative.
+ prompt_cache_hit_tokens: Cache-hit token count reported for the call,
+ passed through `_normalize_cache_tokens`.
+ prompt_cache_miss_tokens: Cache-miss token count reported for the call,
+ passed through `_normalize_cache_tokens`.
+ prompt_text: Serialized prompt compared against the local prompt pool.
+ When falsy, the prompt pool is left unchanged.
+ """
+
+ # Only task names listed in FOCUSED_TASK_NAMES are tracked.
+ normalized_task_name = str(task_name or "").strip()
+ if normalized_task_name not in FOCUSED_TASK_NAMES:
+ return
+
+ normalized_request_type = _normalize_request_type(request_type)
+ if normalized_request_type in EXCLUDED_REQUEST_TYPES:
+ return
+
+ normalized_model_name = _normalize_model_name(model_name)
+ normalized_session_id = _normalize_session_id(session_id)
+ normalized_prompt_tokens = max(int(prompt_tokens or 0), 0)
+ hit_tokens, miss_tokens, has_cache_report = _normalize_cache_tokens(
+ prompt_tokens=normalized_prompt_tokens,
+ prompt_cache_hit_tokens=prompt_cache_hit_tokens,
+ prompt_cache_miss_tokens=prompt_cache_miss_tokens,
+ )
+
+ # NOTE(review): the original indentation is not visible in this patch; the
+ # store updates and report bookkeeping below presumably run under this
+ # lock, with the file writes at the end outside it - confirm.
+ with _store.lock:
+ # Stats and prompt pools are keyed per (task, request type, model, session).
+ key = (normalized_task_name, normalized_request_type, normalized_model_name, normalized_session_id)
+ prompt_pool = _store.prompt_pools.get(key, [])
+ cache_match = _calculate_theoretical_cache_match(
+ prompt_tokens=normalized_prompt_tokens,
+ prompt_text=prompt_text,
+ prompt_pool=prompt_pool,
+ )
+ dynamic_diff = _diagnose_dynamic_diff(cache_match.best_prompt_text, prompt_text)
+ prompt_diagnostics = _diagnose_prompt_cache_details(
+ previous_prompt_text=cache_match.best_prompt_text,
+ current_prompt_text=prompt_text,
+ common_prefix_chars=cache_match.common_prefix_chars,
+ )
+ # Remember this prompt, keeping only the most recent PROMPT_CACHE_POOL_SIZE entries.
+ if prompt_text:
+ next_prompt_pool = [*prompt_pool, prompt_text]
+ if len(next_prompt_pool) > PROMPT_CACHE_POOL_SIZE:
+ next_prompt_pool = next_prompt_pool[-PROMPT_CACHE_POOL_SIZE:]
+ _store.prompt_pools[key] = next_prompt_pool
+
+ # Create the per-call-site accumulator on first use.
+ stat = _store.stats.get(key)
+ if stat is None:
+ stat = LLMCacheStat(
+ task_name=normalized_task_name,
+ request_type=normalized_request_type,
+ model_name=normalized_model_name,
+ session_id=normalized_session_id,
+ )
+ _store.stats[key] = stat
+
+ stat.calls += 1
+ stat.prompt_tokens += normalized_prompt_tokens
+ stat.prompt_cache_hit_tokens += hit_tokens
+ stat.prompt_cache_miss_tokens += miss_tokens
+ stat.theoretical_prompt_cache_hit_tokens += cache_match.hit_tokens
+ stat.theoretical_prompt_cache_miss_tokens += cache_match.miss_tokens
+ stat.common_prefix_rate_total += prompt_diagnostics.common_prefix_rate
+ if prompt_diagnostics.suspected_context_sliding:
+ stat.suspected_context_sliding_calls += 1
+ stat.sliding_dropped_messages_total += prompt_diagnostics.sliding_dropped_head_messages
+ stat.sliding_aligned_messages_total += prompt_diagnostics.sliding_aligned_messages
+ # Count how often each dynamic diff path was observed for this call site.
+ stat.dynamic_diff_counts[dynamic_diff.path] = stat.dynamic_diff_counts.get(dynamic_diff.path, 0) + 1
+ if has_cache_report:
+ stat.cache_reported_calls += 1
+ if cache_match.compared:
+ stat.theoretical_compared_calls += 1
+ if cache_match.hit_tokens > 0:
+ stat.theoretical_cache_pool_hits += 1
+ _store.total_calls += 1
+ _store.calls_since_report += 1
+ _store.calls_in_run += 1
+
+ # API-reported hit rate in percent; 0.0 when no cache tokens were counted.
+ api_hit_rate = hit_tokens / (hit_tokens + miss_tokens) * 100 if hit_tokens + miss_tokens > 0 else 0.0
+ event = {
+ "created_at": datetime.now().isoformat(timespec="seconds"),
+ "run_id": _store.run_id,
+ "process_started_at": _store.process_started_at,
+ "call_index_in_run": _store.calls_in_run,
+ "task_name": normalized_task_name,
+ "request_type": normalized_request_type,
+ "model_name": normalized_model_name,
+ "session_id": normalized_session_id,
+ "prompt_tokens": normalized_prompt_tokens,
+ "prompt_chars": len(prompt_text or ""),
+ "prompt_cache_hit_tokens": hit_tokens,
+ "prompt_cache_miss_tokens": miss_tokens,
+ "prompt_cache_hit_rate": round(api_hit_rate, 2),
+ "theoretical_prompt_cache_hit_tokens": cache_match.hit_tokens,
+ "theoretical_prompt_cache_miss_tokens": cache_match.miss_tokens,
+ "theoretical_prompt_cache_hit_rate": round(cache_match.hit_rate, 2),
+ "theoretical_cache_pool_size": cache_match.pool_size,
+ "theoretical_best_match_rank": cache_match.best_match_rank,
+ "theoretical_common_prefix_chars": cache_match.common_prefix_chars,
+ "theoretical_common_prefix_rate": round(prompt_diagnostics.common_prefix_rate, 2),
+ "current_message_count": prompt_diagnostics.current_message_count,
+ "best_match_message_count": prompt_diagnostics.best_match_message_count,
+ "common_prefix_messages": prompt_diagnostics.common_prefix_messages,
+ "common_suffix_messages": prompt_diagnostics.common_suffix_messages,
+ "prompt_growth_chars": prompt_diagnostics.prompt_growth_chars,
+ "longest_aligned_message_overlap": prompt_diagnostics.longest_aligned_message_overlap,
+ "aligned_previous_start_index": prompt_diagnostics.aligned_previous_start_index,
+ "aligned_current_start_index": prompt_diagnostics.aligned_current_start_index,
+ "suspected_context_sliding": prompt_diagnostics.suspected_context_sliding,
+ "sliding_dropped_head_messages": prompt_diagnostics.sliding_dropped_head_messages,
+ "sliding_aligned_messages": prompt_diagnostics.sliding_aligned_messages,
+ "sliding_new_tail_messages": prompt_diagnostics.sliding_new_tail_messages,
+ "current_first_message_role": prompt_diagnostics.current_first_message_role,
+ "best_first_message_role": prompt_diagnostics.best_first_message_role,
+ "current_last_message_role": prompt_diagnostics.current_last_message_role,
+ "best_last_message_role": prompt_diagnostics.best_last_message_role,
+ "prompt_cache_hit_rate_delta": round(api_hit_rate - cache_match.hit_rate, 2),
+ "dynamic_diff_path": dynamic_diff.path,
+ "dynamic_diff_previous": dynamic_diff.previous_value,
+ "dynamic_diff_current": dynamic_diff.current_value,
+ "cache_reported": has_cache_report,
+ "theoretical_compared": cache_match.compared,
+ }
+ stats_snapshot = [stat.to_dict() for stat in _store.stats.values()]
+
+ # A report is due when no report time is recorded yet, after
+ # REPORT_INTERVAL_CALLS calls, or after REPORT_INTERVAL_SECONDS seconds.
+ now = time.time()
+ should_update_report = (
+ _store.last_report_at <= 0
+ or _store.calls_since_report >= REPORT_INTERVAL_CALLS
+ or now - _store.last_report_at >= REPORT_INTERVAL_SECONDS
+ )
+ if should_update_report:
+ _store.last_report_at = now
+ _store.calls_since_report = 0
+ stats_snapshot_to_report = stats_snapshot
+ else:
+ # An empty list signals that no report should be written for this call.
+ stats_snapshot_to_report = []
+
+ # The usage event is written for every recorded call; the report only when due.
+ _write_usage_event(event)
+ if stats_snapshot_to_report:
+ _write_report(stats_snapshot_to_report)
+ log_llm_cache_stats_summary(stats_snapshot_to_report)
+
+
+def get_llm_cache_stats_snapshot() -> List[Dict[str, int | str | float]]:
+ """Return current in-process LLM prompt cache stats.
+
+ The result is a new list holding one `to_dict()` result per stat entry
+ in `_store.stats`, built while holding the store lock.
+ """
+
+ with _store.lock:
+ return [stat.to_dict() for stat in _store.stats.values()]
+
+
+def reset_llm_cache_stats() -> None:
+ """Reset in-process stats. Intended for tests and local debugging.
+
+ Clears the per-call-site stats and prompt pools and zeroes the call
+ counters and the last-report timestamp, so the next recorded call is
+ treated as having no previous report.
+ """
+
+ with _store.lock:
+ _store.stats.clear()
+ _store.prompt_pools.clear()
+ _store.total_calls = 0
+ _store.calls_in_run = 0
+ _store.last_report_at = 0
+ _store.calls_since_report = 0
+
+
+def log_llm_cache_stats_summary(stats_snapshot: List[Dict[str, int | str | float]] | None = None) -> None:
+    """Log the call sites with the lowest and highest API prompt cache hit rates.
+
+    Args:
+        stats_snapshot: Per-call-site stat dicts to summarize. When it is None
+            or empty, the current in-process snapshot is used instead.
+
+    Nothing is logged if there are no stats to summarize.
+    """
+
+    entries = stats_snapshot or get_llm_cache_stats_snapshot()
+    if not entries:
+        return
+
+    def _rank(entry: Dict[str, int | str | float]) -> tuple[float, int]:
+        # Ascending by API hit rate; ties put the larger miss-token count first.
+        return float(entry["prompt_cache_hit_rate"]), -int(entry["prompt_cache_miss_tokens"])
+
+    def _describe(entry: Dict[str, int | str | float]) -> str:
+        call_site = f"{entry['task_name']}/{entry['request_type']}/{entry['model_name']}: "
+        details = (
+            f"api_hit_rate={float(entry['prompt_cache_hit_rate']):.2f}%",
+            f"theory_hit_rate={float(entry['theoretical_prompt_cache_hit_rate']):.2f}%",
+            f"delta={float(entry['prompt_cache_hit_rate_delta']):.2f}%",
+            f"avg_prefix={float(entry['avg_common_prefix_rate']):.2f}%",
+            f"sliding_calls={entry['suspected_context_sliding_calls']}",
+            f"top_dynamic={entry.get('top_dynamic_diff_paths', '')}",
+            f"hit={entry['prompt_cache_hit_tokens']}",
+            f"miss={entry['prompt_cache_miss_tokens']}",
+            f"prompt={entry['prompt_tokens']}",
+            f"calls={entry['calls']}",
+            f"reported={entry['cache_reported_calls']}",
+        )
+        return call_site + ", ".join(details)
+
+    ranked = sorted(entries, key=_rank)
+    lowest = ranked[:SUMMARY_LIMIT]
+    # Highest hit rate first, so reverse the tail of the ascending ranking.
+    highest = ranked[-SUMMARY_LIMIT:][::-1]
+
+    low_text = "\n- ".join(map(_describe, lowest))
+    high_text = "\n- ".join(map(_describe, highest))
+    logger.info(
+        "".join(
+            (
+                "LLM prompt cache 统计摘要\n",
+                "低命中调用点:\n- ",
+                low_text,
+                "\n",
+                "高命中调用点:\n- ",
+                high_text,
+            )
+        )
+    )
diff --git a/src/services/llm_service.py b/src/services/llm_service.py
index 264d2dd2..92da545f 100644
--- a/src/services/llm_service.py
+++ b/src/services/llm_service.py
@@ -6,6 +6,8 @@
from typing import Any, Dict, List, Tuple
+import hashlib
+import inspect
import json
from src.common.data_models.embedding_service_data_models import EmbeddingResult
@@ -26,6 +28,7 @@ from src.llm_models.payload_content.message import Message, MessageBuilder, Role
from src.llm_models.payload_content.tool_option import ToolCall
from src.llm_models.utils_model import LLMOrchestrator
from src.services.embedding_service import EmbeddingServiceClient
+from src.services.llm_cache_stats import record_llm_cache_usage
from src.services.service_task_resolver import (
get_available_models as _get_available_models,
resolve_task_name as _resolve_task_name,
@@ -46,7 +49,7 @@ class LLMServiceClient:
- `embed_text`(兼容入口,推荐改用 `EmbeddingServiceClient`)
"""
- def __init__(self, task_name: str, request_type: str = "") -> None:
+ def __init__(self, task_name: str, request_type: str = "", session_id: str = "") -> None:
"""初始化 LLM 服务门面。
Args:
@@ -55,6 +58,7 @@ class LLMServiceClient:
"""
self.task_name = _resolve_task_name(task_name)
self.request_type = request_type
+ self.session_id = str(session_id or "").strip()
self._orchestrator = LLMOrchestrator(task_name=self.task_name, request_type=request_type)
@staticmethod
@@ -85,6 +89,70 @@ class LLMServiceClient:
return LLMImageOptions()
return options
+    @staticmethod
+    def _serialize_message_for_cache_stats(message: Message) -> Dict[str, Any]:
+        """Convert a message into a JSON-friendly dict for cache statistics.
+
+        Parts exposing a `text` attribute are kept as text. Every other part
+        is described as an image by its format, base64 length and SHA-256
+        digest, so the image payload itself is not embedded.
+        """
+
+        def _describe_part(part: Any) -> dict[str, Any]:
+            if hasattr(part, "text"):
+                return {"type": "text", "text": part.text}
+
+            encoded_image = getattr(part, "image_base64", "")
+            digest = ""
+            if encoded_image:
+                digest = hashlib.sha256(encoded_image.encode("utf-8")).hexdigest()
+            return {
+                "type": "image",
+                "format": getattr(part, "image_format", ""),
+                "size": len(encoded_image),
+                "sha256": digest,
+            }
+
+        serialized_parts = [_describe_part(part) for part in message.parts]
+
+        # Roles may be enum members (use `.value`) or plain values.
+        role = message.role
+        role_text = str(role.value if hasattr(role, "value") else role)
+
+        serialized: Dict[str, Any] = {
+            "role": role_text,
+            "parts": serialized_parts,
+            "tool_call_id": message.tool_call_id,
+            "tool_name": message.tool_name,
+        }
+        serialized["tool_calls"] = [
+            {
+                "id": call.call_id,
+                "name": call.func_name,
+                "arguments": call.args,
+                "extra_content": call.extra_content,
+            }
+            for call in (message.tool_calls or [])
+        ]
+        return serialized
+
+    @classmethod
+    def _build_cache_stats_prompt_text(
+        cls,
+        *,
+        messages: List[Message],
+        tool_options: Any,
+        response_format: Any,
+    ) -> str:
+        """Serialize a request into the JSON text used for cache statistics.
+
+        Keys are sorted so equal requests yield equal text; values that JSON
+        cannot encode natively are stringified via `default=str`.
+        """
+
+        serialized_messages = [cls._serialize_message_for_cache_stats(item) for item in messages]
+        request_payload = {
+            "messages": serialized_messages,
+            "tool_options": tool_options or [],
+            "response_format": response_format,
+        }
+        return json.dumps(
+            request_payload,
+            ensure_ascii=False,
+            sort_keys=True,
+            default=str,
+        )
+
+ def _record_cache_stats(self, result: LLMResponseResult, prompt_text: str | None = None) -> None:
+ """Record prompt cache statistics for the current call.
+
+ Forwards this client's task name, request type and session id, together
+ with the model name and token counts read from `result`, to
+ `record_llm_cache_usage`.
+
+ Args:
+ result: Response result of the finished LLM call.
+ prompt_text: Serialized prompt for the call, or None when unavailable.
+ """
+
+ record_llm_cache_usage(
+ task_name=self.task_name,
+ request_type=self.request_type,
+ model_name=result.model_name,
+ session_id=self.session_id,
+ prompt_tokens=result.prompt_tokens,
+ prompt_cache_hit_tokens=result.prompt_cache_hit_tokens,
+ prompt_cache_miss_tokens=result.prompt_cache_miss_tokens,
+ prompt_text=prompt_text,
+ )
+
async def generate_response(
self,
prompt: str,
@@ -100,7 +168,12 @@ class LLMServiceClient:
LLMResponseResult: 统一文本生成结果。
"""
active_options = self._normalize_generation_options(options)
- return await self._orchestrator.generate_response_async(
+ prompt_text = self._build_cache_stats_prompt_text(
+ messages=[MessageBuilder().add_text_content(prompt).build()],
+ tool_options=active_options.tool_options,
+ response_format=active_options.response_format,
+ )
+ result = await self._orchestrator.generate_response_async(
prompt=prompt,
temperature=active_options.temperature,
max_tokens=active_options.max_tokens,
@@ -109,6 +182,8 @@ class LLMServiceClient:
raise_when_empty=active_options.raise_when_empty,
interrupt_flag=active_options.interrupt_flag,
)
+ self._record_cache_stats(result, prompt_text=prompt_text)
+ return result
async def generate_response_with_messages(
self,
@@ -125,8 +200,22 @@ class LLMServiceClient:
LLMResponseResult: 统一文本生成结果。
"""
active_options = self._normalize_generation_options(options)
- return await self._orchestrator.generate_response_with_message_async(
- message_factory=message_factory,
+ prompt_text_holder: dict[str, str] = {}
+
+ def cache_stats_message_factory(client: BaseClient, model_info: Any = None) -> List[Message]:
+ if len(inspect.signature(message_factory).parameters) >= 2:
+ messages = message_factory(client, model_info)
+ else:
+ messages = message_factory(client)
+ prompt_text_holder["prompt_text"] = self._build_cache_stats_prompt_text(
+ messages=messages,
+ tool_options=active_options.tool_options,
+ response_format=active_options.response_format,
+ )
+ return messages
+
+ result = await self._orchestrator.generate_response_with_message_async(
+ message_factory=cache_stats_message_factory,
temperature=active_options.temperature,
max_tokens=active_options.max_tokens,
tools=active_options.tool_options,
@@ -134,6 +223,8 @@ class LLMServiceClient:
raise_when_empty=active_options.raise_when_empty,
interrupt_flag=active_options.interrupt_flag,
)
+ self._record_cache_stats(result, prompt_text=prompt_text_holder.get("prompt_text"))
+ return result
async def generate_response_for_image(
self,
@@ -154,7 +245,30 @@ class LLMServiceClient:
LLMResponseResult: 统一文本生成结果。
"""
active_options = self._normalize_image_options(options)
- return await self._orchestrator.generate_response_for_image(
+ image_digest = hashlib.sha256(image_base64.encode("utf-8")).hexdigest() if image_base64 else ""
+ prompt_text = json.dumps(
+ {
+ "messages": [
+ {
+ "role": "user",
+ "parts": [
+ {"type": "text", "text": prompt},
+ {
+ "type": "image",
+ "format": image_format,
+ "size": len(image_base64),
+ "sha256": image_digest,
+ },
+ ],
+ }
+ ],
+ "tool_options": [],
+ "response_format": None,
+ },
+ ensure_ascii=False,
+ sort_keys=True,
+ )
+ result = await self._orchestrator.generate_response_for_image(
prompt=prompt,
image_base64=image_base64,
image_format=image_format,
@@ -162,6 +276,8 @@ class LLMServiceClient:
max_tokens=active_options.max_tokens,
interrupt_flag=active_options.interrupt_flag,
)
+ self._record_cache_stats(result, prompt_text=prompt_text)
+ return result
async def transcribe_audio(self, voice_base64: str) -> LLMAudioTranscriptionResult:
"""执行音频转写请求。
From 88b895a925b2676e482d1aeb058b18cdcac0848c Mon Sep 17 00:00:00 2001
From: SengokuCola <1026294844@qq.com>
Date: Fri, 1 May 2026 13:00:54 +0800
Subject: [PATCH 11/12] perf: stabilize maisaka prompt cache
---
src/maisaka/chat_loop_service.py | 79 ++++++++++++++++++---------
src/maisaka/history_post_processor.py | 78 ++++++++++----------------
src/maisaka/reasoning_engine.py | 9 +--
src/maisaka/runtime.py | 32 ++++++++++-
4 files changed, 117 insertions(+), 81 deletions(-)
diff --git a/src/maisaka/chat_loop_service.py b/src/maisaka/chat_loop_service.py
index 81d66129..63ec38e8 100644
--- a/src/maisaka/chat_loop_service.py
+++ b/src/maisaka/chat_loop_service.py
@@ -41,6 +41,11 @@ from .display.prompt_cli_renderer import PromptCLIVisualizer
from .visual_mode_utils import resolve_enable_visual_planner
TIMING_GATE_TOOL_NAMES = {"continue", "no_reply", "wait"}
+REQUEST_TYPE_BY_REQUEST_KIND = {
+ "planner": "maisaka_planner",
+ "timing_gate": "maisaka_timing_gate",
+}
+CONTEXT_SELECTION_CACHE_STABILITY_RATIO = 2.0
@dataclass(slots=True)
@@ -212,7 +217,7 @@ class MaisakaChatLoopService:
self._chat_system_prompt = f"{self._personality_prompt}\n\nYou are a helpful AI assistant."
else:
self._chat_system_prompt = chat_system_prompt
- self._llm_chat = LLMServiceClient(task_name="planner", request_type="maisaka_planner")
+ self._llm_chat_clients: dict[str, LLMServiceClient] = {}
@property
def personality_prompt(self) -> str:
@@ -220,6 +225,30 @@ class MaisakaChatLoopService:
return self._personality_prompt
+    @staticmethod
+    def _resolve_llm_request_type(request_kind: str) -> str:
+        """Map a Maisaka request kind to the request type used for LLM stats.
+
+        Kinds listed in REQUEST_TYPE_BY_REQUEST_KIND use their mapped value,
+        other non-empty kinds become ``maisaka_<kind>``, and an empty kind
+        falls back to ``maisaka_planner``.
+        """
+
+        kind = str(request_kind or "").strip()
+        if kind in REQUEST_TYPE_BY_REQUEST_KIND:
+            return REQUEST_TYPE_BY_REQUEST_KIND[kind]
+        if kind:
+            return f"maisaka_{kind}"
+        return "maisaka_planner"
+
+    def _get_llm_chat_client(self, request_kind: str) -> LLMServiceClient:
+        """Return the planner LLM client for a request kind, creating it on first use.
+
+        Clients are cached per resolved request type, so each request type
+        gets its own client carrying this service's session id.
+        """
+
+        request_type = self._resolve_llm_request_type(request_kind)
+        cached_client = self._llm_chat_clients.get(request_type)
+        if cached_client is not None:
+            return cached_client
+
+        new_client = LLMServiceClient(
+            task_name="planner",
+            request_type=request_type,
+            session_id=self._session_id,
+        )
+        self._llm_chat_clients[request_type] = new_client
+        return new_client
+
@staticmethod
def _get_runtime_manager() -> Any:
"""获取插件运行时管理器。
@@ -321,7 +350,13 @@ class MaisakaChatLoopService:
@staticmethod
def _build_time_block() -> str:
- """构建当前时间提示块。"""
+ """构建静态时间提示块。"""
+
+ return "当前时间会在每次请求末尾以用户消息形式提供。"
+
+ @staticmethod
+ def _build_current_time_user_message() -> str:
+ """构建追加到请求末尾的当前时间消息。"""
return f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
@@ -446,7 +481,11 @@ class MaisakaChatLoopService:
messages.append(llm_message)
normalized_injected_messages: List[Message] = []
- for injected_message in injected_user_messages or []:
+ final_user_messages = [
+ *(injected_user_messages or []),
+ self._build_current_time_user_message(),
+ ]
+ for injected_message in final_user_messages:
normalized_message = str(injected_message or "").strip()
if not normalized_message:
continue
@@ -458,31 +497,10 @@ class MaisakaChatLoopService:
)
if normalized_injected_messages:
- insertion_index = self._resolve_injected_user_messages_insertion_index(messages)
- messages[insertion_index:insertion_index] = normalized_injected_messages
+ messages.extend(normalized_injected_messages)
return messages
- @staticmethod
- def _resolve_injected_user_messages_insertion_index(messages: Sequence[Message]) -> int:
- """计算 injected meta user messages 在请求中的插入位置。
-
- 规则与 deferred attachment 更接近:
- - 从尾部向前寻找最近的 stopping point;
- - stopping point 为 assistant 消息或 tool 结果消息;
- - 找到后插入到其后面;
- - 若不存在 stopping point,则退回到 system 消息之后。
- """
-
- for index in range(len(messages) - 1, -1, -1):
- message = messages[index]
- if message.role in {RoleType.Assistant, RoleType.Tool}:
- return index + 1
-
- if messages and messages[0].role == RoleType.System:
- return 1
- return 0
-
async def chat_loop_step(
self,
chat_history: List[LLMContextMessage],
@@ -575,7 +593,8 @@ class MaisakaChatLoopService:
tool_definitions=list(all_tools),
)
- generation_result = await self._llm_chat.generate_response_with_messages(
+ llm_chat = self._get_llm_chat_client(request_kind)
+ generation_result = await llm_chat.generate_response_with_messages(
message_factory=message_factory,
options=LLMGenerationOptions(
tool_options=all_tools if all_tools else None,
@@ -654,7 +673,11 @@ class MaisakaChatLoopService:
chat_history,
request_kind=request_kind,
)
- effective_context_size = max(1, int(max_context_size or global_config.chat.max_context_size))
+ base_context_size = max(1, int(max_context_size or global_config.chat.max_context_size))
+ effective_context_size = max(
+ base_context_size,
+ int(base_context_size * CONTEXT_SELECTION_CACHE_STABILITY_RATIO),
+ )
selected_indices: List[int] = []
counted_message_count = 0
@@ -690,9 +713,11 @@ class MaisakaChatLoopService:
selected_history, _ = normalize_tool_result_order(selected_history)
tool_message_count = sum(1 for message in selected_history if isinstance(message, ToolResultMessage))
normal_message_count = len(selected_history) - tool_message_count
+ stability_text = f"|cache_window {base_context_size}->{effective_context_size}"
selection_reason = (
f"实际发送 {len(selected_history)} 条消息"
f"|消息 {normal_message_count} 条|tool {tool_message_count} 条"
+ f"{stability_text}"
)
return (
selected_history,
diff --git a/src/maisaka/history_post_processor.py b/src/maisaka/history_post_processor.py
index 5b3a125d..aa038f08 100644
--- a/src/maisaka/history_post_processor.py
+++ b/src/maisaka/history_post_processor.py
@@ -3,11 +3,11 @@
from dataclasses import dataclass
from math import ceil
-from .context_messages import AssistantMessage, LLMContextMessage
+from .context_messages import LLMContextMessage
from .history_utils import drop_leading_orphan_tool_results, drop_orphan_tool_results, normalize_tool_result_order
-EARLY_TRIM_RATIO = 0.3
-TRIM_THRESHOLD_RATIO = 1.2
+TRIM_TARGET_RATIO = 1.0
+TRIM_THRESHOLD_RATIO = 2.0
@dataclass(slots=True)
@@ -36,21 +36,16 @@ def process_chat_history_after_cycle(
compact_removed_count = 0
trim_threshold = ceil(max_context_size * TRIM_THRESHOLD_RATIO)
if remaining_context_count > trim_threshold:
- removed_early_message_count = _remove_early_history_messages(processed_history)
- processed_history, removed_after_message_trim_count, moved_after_message_trim_count = (
- _normalize_history_structure(processed_history)
+ target_context_count = max(1, int(max_context_size * TRIM_TARGET_RATIO))
+ removed_early_message_count = _trim_history_to_context_target(
+ processed_history,
+ target_context_count=target_context_count,
)
- removed_assistant_thought_count = _remove_early_assistant_thoughts(processed_history)
- processed_history, removed_after_thought_trim_count, moved_after_thought_trim_count = (
- _normalize_history_structure(processed_history)
+ processed_history, removed_after_trim_count, moved_after_trim_count = _normalize_history_structure(
+ processed_history
)
- compact_removed_count = (
- removed_early_message_count
- + removed_after_message_trim_count
- + removed_assistant_thought_count
- + removed_after_thought_trim_count
- )
- moved_tool_result_count += moved_after_message_trim_count + moved_after_thought_trim_count
+ compact_removed_count = removed_early_message_count + removed_after_trim_count
+ moved_tool_result_count += moved_after_trim_count
remaining_context_count = sum(1 for message in processed_history if message.count_in_context)
removed_count = normalized_removed_count + compact_removed_count
@@ -78,42 +73,27 @@ def _normalize_history_structure(
)
-def _remove_early_history_messages(chat_history: list[LLMContextMessage]) -> int:
- """移除最早 30% 的全部历史消息。"""
+def _trim_history_to_context_target(
+ chat_history: list[LLMContextMessage],
+ *,
+ target_context_count: int,
+) -> int:
+ """移除最早的一段历史,直到普通上下文消息数量降到目标值以内。"""
+
+ remaining_context_count = sum(1 for message in chat_history if message.count_in_context)
+ if remaining_context_count <= target_context_count:
+ return 0
+
+ remove_count = 0
+ for message in chat_history:
+ remove_count += 1
+ if message.count_in_context:
+ remaining_context_count -= 1
+ if remaining_context_count <= target_context_count:
+ break
- remove_count = int(len(chat_history) * EARLY_TRIM_RATIO)
if remove_count <= 0:
return 0
del chat_history[:remove_count]
return remove_count
-
-
-def _remove_early_assistant_thoughts(chat_history: list[LLMContextMessage]) -> int:
- """移除最早 30% 的非工具 assistant 思考内容。"""
-
- candidate_indexes = [
- index
- for index, message in enumerate(chat_history)
- if isinstance(message, AssistantMessage)
- and not message.tool_calls
- and message.source_kind != "perception"
- and bool(message.content.strip())
- ]
- remove_count = int(len(candidate_indexes) * EARLY_TRIM_RATIO)
- if remove_count <= 0:
- return 0
-
- removed_indexes = set(candidate_indexes[:remove_count])
- filtered_history: list[LLMContextMessage] = []
- removed_total = 0
- for index, message in enumerate(chat_history):
- if index in removed_indexes:
- removed_total += 1
- continue
- filtered_history.append(message)
-
- chat_history[:] = filtered_history
- return removed_total
-
-
diff --git a/src/maisaka/reasoning_engine.py b/src/maisaka/reasoning_engine.py
index ab7484a2..037f6618 100644
--- a/src/maisaka/reasoning_engine.py
+++ b/src/maisaka/reasoning_engine.py
@@ -52,7 +52,7 @@ if TYPE_CHECKING:
logger = get_logger("maisaka_reasoning_engine")
-TIMING_GATE_CONTEXT_LIMIT = 24
+TIMING_GATE_CONTEXT_DROP_HEAD_RATIO = 0.7
TIMING_GATE_MAX_TOKENS = 384
TIMING_GATE_MAX_ATTEMPTS = 3
TIMING_GATE_TOOL_NAMES = {"continue", "no_reply", "wait"}
@@ -124,7 +124,6 @@ class MaisakaReasoningEngine:
async def _run_timing_gate_sub_agent(
self,
*,
- context_message_limit: int,
system_prompt: str,
tool_definitions: list[dict[str, Any]],
) -> Any:
@@ -134,7 +133,10 @@ class MaisakaReasoningEngine:
"""
return await self._runtime.run_sub_agent(
- context_message_limit=context_message_limit,
+ context_message_limit=self._runtime._max_context_size,
+ drop_head_context_count=int(
+ self._runtime._max_context_size * TIMING_GATE_CONTEXT_DROP_HEAD_RATIO,
+ ),
system_prompt=system_prompt,
request_kind="timing_gate",
interrupt_flag=None,
@@ -255,7 +257,6 @@ class MaisakaReasoningEngine:
invalid_tool_text = ""
for attempt_index in range(TIMING_GATE_MAX_ATTEMPTS):
response = await self._run_timing_gate_sub_agent(
- context_message_limit=TIMING_GATE_CONTEXT_LIMIT,
system_prompt=self._build_timing_gate_system_prompt(),
tool_definitions=get_timing_tools(),
)
diff --git a/src/maisaka/runtime.py b/src/maisaka/runtime.py
index a3db92bf..a96eaf11 100644
--- a/src/maisaka/runtime.py
+++ b/src/maisaka/runtime.py
@@ -45,6 +45,7 @@ from .context_messages import (
from .display.display_utils import build_tool_call_summary_lines, format_token_count
from .display.prompt_cli_renderer import PromptCLIVisualizer
from .display.stage_status_board import remove_stage_status, update_stage_status
+from .history_utils import drop_leading_orphan_tool_results
from .reasoning_engine import MaisakaReasoningEngine
from .reply_effect import ReplyEffectTracker
from .reply_effect.image_utils import extract_visual_attachments_from_sequence
@@ -583,6 +584,7 @@ class MaisakaHeartFlowChatting:
self,
*,
context_message_limit: int,
+ drop_head_context_count: int = 0,
system_prompt: str,
request_kind: str = "sub_agent",
extra_messages: Optional[Sequence[LLMContextMessage]] = None,
@@ -598,7 +600,10 @@ class MaisakaHeartFlowChatting:
request_kind=request_kind,
max_context_size=context_message_limit,
)
- sub_agent_history = list(selected_history)
+ sub_agent_history = self._drop_head_context_messages(
+ selected_history,
+ drop_head_context_count,
+ )
if extra_messages:
sub_agent_history.extend(list(extra_messages))
@@ -616,6 +621,31 @@ class MaisakaHeartFlowChatting:
tool_definitions=[] if tool_definitions is None else tool_definitions,
)
+    @staticmethod
+    def _drop_head_context_messages(
+        chat_history: Sequence[LLMContextMessage],
+        drop_context_count: int,
+    ) -> list[LLMContextMessage]:
+        """Drop a number of context-counted messages from the head of the history.
+
+        Messages are removed from the front until `drop_context_count` messages
+        with `count_in_context` set have been dropped; messages that do not
+        count are removed along the way. Leading orphan tool results left
+        at the new head are then removed as well. A non-positive count
+        returns a plain copy of the history.
+        """
+
+        if drop_context_count <= 0:
+            return list(chat_history)
+
+        # Default to dropping everything; shortened once the quota is met.
+        cut_index = len(chat_history)
+        still_to_drop = drop_context_count
+        for position, message in enumerate(chat_history):
+            if still_to_drop <= 0:
+                cut_index = position
+                break
+            if message.count_in_context:
+                still_to_drop -= 1
+
+        kept_history = list(chat_history[cut_index:])
+        kept_history, _ = drop_leading_orphan_tool_results(kept_history)
+        return kept_history
+
async def _run_reply_effect_judge(self, prompt: str) -> str:
"""运行回复效果观察器使用的临时 LLM 评审。"""
From 2238c34eca92da61d59a8496852334e8def1b2c6 Mon Sep 17 00:00:00 2001
From: SengokuCola <1026294844@qq.com>
Date: Fri, 1 May 2026 13:19:07 +0800
Subject: [PATCH 12/12] =?UTF-8?q?feat=EF=BC=9A=E7=BC=93=E5=AD=98=E8=B0=83?=
=?UTF-8?q?=E8=AF=95=E4=BF=A1=E6=81=AF=E5=BC=80=E5=85=B3?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
src/config/config.py | 2 +-
src/config/official_configs.py | 9 +++++++++
src/services/llm_cache_stats.py | 13 +++++++++++++
3 files changed, 23 insertions(+), 1 deletion(-)
diff --git a/src/config/config.py b/src/config/config.py
index 252b4a05..dcd5e7eb 100644
--- a/src/config/config.py
+++ b/src/config/config.py
@@ -56,7 +56,7 @@ BOT_CONFIG_PATH: Path = (CONFIG_DIR / "bot_config.toml").resolve().absolute()
MODEL_CONFIG_PATH: Path = (CONFIG_DIR / "model_config.toml").resolve().absolute()
LEGACY_ENV_PATH: Path = (PROJECT_ROOT / ".env").resolve().absolute()
MMC_VERSION: str = "1.0.0"
-CONFIG_VERSION: str = "8.9.19"
+CONFIG_VERSION: str = "8.9.20"
MODEL_CONFIG_VERSION: str = "1.14.3"
logger = get_logger("config")
diff --git a/src/config/official_configs.py b/src/config/official_configs.py
index 41f469af..ba11426a 100644
--- a/src/config/official_configs.py
+++ b/src/config/official_configs.py
@@ -1324,6 +1324,15 @@ class DebugConfig(ConfigBase):
)
"""是否记录 Replyer 请求体,默认关闭"""
+ enable_llm_cache_stats: bool = Field(
+ default=False,
+ json_schema_extra={
+ "x-widget": "switch",
+ "x-icon": "chart-no-axes-column",
+ },
+ )
+ """是否记录 LLM prompt cache 调试统计,默认关闭"""
+
class ExtraPromptItem(ConfigBase):
platform: str = Field(
diff --git a/src/services/llm_cache_stats.py b/src/services/llm_cache_stats.py
index e6b1c268..1d322ba4 100644
--- a/src/services/llm_cache_stats.py
+++ b/src/services/llm_cache_stats.py
@@ -182,6 +182,16 @@ class _LLMCacheStatsStore:
_store = _LLMCacheStatsStore()
+def _is_llm_cache_stats_enabled() -> bool:
+ """Return whether LLM prompt cache stats are enabled in the debug config.
+
+ Reads `global_config.debug.enable_llm_cache_stats`. If the config cannot
+ be imported or read (any exception), stats are treated as disabled.
+ """
+
+ try:
+ # NOTE(review): imported inside the function - presumably to avoid an
+ # import cycle with the config module; confirm.
+ from src.config.config import global_config
+ return bool(global_config.debug.enable_llm_cache_stats)
+ except Exception:
+ return False
+
+
def _normalize_request_type(request_type: str) -> str:
normalized = str(request_type or "").strip()
return normalized or "unknown"
@@ -1313,6 +1323,9 @@ def record_llm_cache_usage(
) -> None:
"""Record one LLM prompt cache usage event."""
+ if not _is_llm_cache_stats_enabled():
+ return
+
normalized_task_name = str(task_name or "").strip()
if normalized_task_name not in FOCUSED_TASK_NAMES:
return