diff --git a/src/chat/utils/statistic.py b/src/chat/utils/statistic.py index 2159019b..a6e4e996 100644 --- a/src/chat/utils/statistic.py +++ b/src/chat/utils/statistic.py @@ -8,7 +8,7 @@ from typing import cast from typing_extensions import TypedDict -from sqlmodel import col, select +from sqlmodel import col, func, select from src.common.logger import get_logger from src.common.database.database import get_db_session @@ -249,8 +249,10 @@ class StatisticOutputTask(AsyncTask): deploy_time = datetime(2000, 1, 1) local_storage["deploy_time"] = now.timestamp() + self.all_time_start_time = self._get_all_time_start_time(deploy_time) + self.stat_period: list[tuple[str, timedelta, str]] = [ - ("all_time", now - deploy_time, "自部署以来"), # 必须保留"all_time" + ("all_time", now - self.all_time_start_time, "自部署以来"), # 必须保留"all_time" ("last_30_days", timedelta(days=30), "近30天"), ("last_7_days", timedelta(days=7), "近7天"), ("last_3_days", timedelta(days=3), "近3天"), @@ -263,6 +265,24 @@ class StatisticOutputTask(AsyncTask): 统计时间段 [(统计名称, 统计时间段, 统计描述), ...] """ + @staticmethod + def _get_all_time_start_time(fallback_time: datetime) -> datetime: + """获取统计数据的最早时间,避免全量统计展示窗口漏掉历史数据。""" + try: + with get_db_session(auto_commit=False) as session: + start_times = [ + session.exec(select(func.min(ModelUsage.timestamp))).first(), + session.exec(select(func.min(Messages.timestamp))).first(), + session.exec(select(func.min(OnlineTime.start_timestamp))).first(), + session.exec(select(func.min(ToolRecord.timestamp))).first(), + ] + valid_start_times = [item for item in start_times if isinstance(item, datetime)] + if valid_start_times: + return min(valid_start_times) + except Exception as e: + logger.warning(f"获取全量统计起始时间失败,将使用部署时间:{e}") + return fallback_time + def _statistic_console_output(self, stats: StatPeriodMapping, now: datetime) -> None: """ 输出统计数据到控制台 @@ -816,9 +836,12 @@ class StatisticOutputTask(AsyncTask): # 直接合并 stat["all_time"][key] += val + self._refresh_all_time_duration_stats(stat["all_time"]) + # 更新上次完整统计数据的时间戳 # 将所有defaultdict转换为普通dict以避免类型冲突 clean_stat_data = self._convert_defaultdict_to_dict(stat["all_time"]) + self._drop_cached_time_cost_lists(clean_stat_data) # 将 name_mapping 中的元组转换为列表,因为JSON不支持元组 json_safe_name_mapping = {} @@ -833,6 +856,67 @@ class StatisticOutputTask(AsyncTask): return cast(StatPeriodMapping, stat) + def _refresh_all_time_duration_stats(self, stat_data: StatPeriodData) -> None: + """全量耗时均值/标准差从数据库现算,不依赖 local_store 中的原始耗时列表。""" + duration_stats: dict[str, defaultdict[str, dict[str, float]]] = { + "type": defaultdict(lambda: {"count": 0.0, "sum": 0.0, "sum_sq": 0.0}), + "user": defaultdict(lambda: {"count": 0.0, "sum": 0.0, "sum_sq": 0.0}), + "model": defaultdict(lambda: {"count": 0.0, "sum": 0.0, "sum_sq": 0.0}), + "module": defaultdict(lambda: {"count": 0.0, "sum": 0.0, "sum_sq": 0.0}), + } + + records = self._fetch_model_usage_since(self.all_time_start_time) + for record in records: + time_cost = cast(float | None, record["time_cost"]) or 0.0 + if time_cost <= 0: + continue + + request_type = cast(str | None, record["request_type"]) or "unknown" + user_id = cast(str | None, record["model_api_provider_name"]) or "unknown" + model_assign_name = cast(str | None, record["model_assign_name"]) + model_name = model_assign_name or cast(str | None, record["model_name"]) or "unknown" + module_name = request_type.split(".")[0] if "." in request_type else request_type + + for category, item_name in [ + ("type", request_type), + ("user", user_id), + ("model", model_name), + ("module", module_name), + ]: + item_stats = duration_stats[category][item_name] + item_stats["count"] += 1 + item_stats["sum"] += time_cost + item_stats["sum_sq"] += time_cost * time_cost + + for category, avg_key, std_key in [ + ("type", AVG_TIME_COST_BY_TYPE, STD_TIME_COST_BY_TYPE), + ("user", AVG_TIME_COST_BY_USER, STD_TIME_COST_BY_USER), + ("model", AVG_TIME_COST_BY_MODEL, STD_TIME_COST_BY_MODEL), + ("module", AVG_TIME_COST_BY_MODULE, STD_TIME_COST_BY_MODULE), + ]: + avg_data = cast(defaultdict[str, float], stat_data[avg_key]) + std_data = cast(defaultdict[str, float], stat_data[std_key]) + avg_data.clear() + std_data.clear() + + for item_name, item_stats in duration_stats[category].items(): + count = item_stats["count"] + if count <= 0: + continue + avg_time_cost = item_stats["sum"] / count + variance = max(item_stats["sum_sq"] / count - avg_time_cost * avg_time_cost, 0.0) + avg_data[item_name] = round(avg_time_cost, 3) + std_data[item_name] = round(variance**0.5, 3) + + @staticmethod + def _drop_cached_time_cost_lists(stat_data: object) -> None: + """不把原始耗时列表写入 local_store;需要时从数据库重新统计。""" + if not isinstance(stat_data, dict): + return + + for key in [TIME_COST_BY_TYPE, TIME_COST_BY_USER, TIME_COST_BY_MODEL, TIME_COST_BY_MODULE]: + stat_data.pop(key, None) + def _convert_defaultdict_to_dict(self, data: object) -> object: # sourcery skip: dict-comprehension, extract-duplicate-method, inline-immediately-returned-variable, merge-duplicate-blocks """递归转换defaultdict为普通dict""" @@ -1480,7 +1564,7 @@ class StatisticOutputTask(AsyncTask): _format_stat_data( stat["all_time"], "all_time", - datetime.fromtimestamp(self._to_float_timestamp(local_storage["deploy_time"])), + self.all_time_start_time, ) )