import asyncio
import concurrent.futures
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Any, Dict, Tuple, List
from src.common.logger import get_logger
from src.common.database.database import db
from src.common.database.database_model import OnlineTime, LLMUsage, Messages
from src.manager.async_task_manager import AsyncTask
from src.manager.local_store_manager import local_storage
logger = get_logger("maibot_statistic")
# Dictionary keys used in the per-period statistics dictionaries built below.
# Overall LLM request totals.
TOTAL_REQ_CNT = "total_requests"
TOTAL_COST = "total_cost"
# Request counts, broken down by request type / user / model / module.
REQ_CNT_BY_TYPE = "requests_by_type"
REQ_CNT_BY_USER = "requests_by_user"
REQ_CNT_BY_MODEL = "requests_by_model"
REQ_CNT_BY_MODULE = "requests_by_module"
# Input (prompt) token totals.
IN_TOK_BY_TYPE = "in_tokens_by_type"
IN_TOK_BY_USER = "in_tokens_by_user"
IN_TOK_BY_MODEL = "in_tokens_by_model"
IN_TOK_BY_MODULE = "in_tokens_by_module"
# Output (completion) token totals.
OUT_TOK_BY_TYPE = "out_tokens_by_type"
OUT_TOK_BY_USER = "out_tokens_by_user"
OUT_TOK_BY_MODEL = "out_tokens_by_model"
OUT_TOK_BY_MODULE = "out_tokens_by_module"
# Input + output token totals.
TOTAL_TOK_BY_TYPE = "tokens_by_type"
TOTAL_TOK_BY_USER = "tokens_by_user"
TOTAL_TOK_BY_MODEL = "tokens_by_model"
TOTAL_TOK_BY_MODULE = "tokens_by_module"
# Accumulated cost.
COST_BY_TYPE = "costs_by_type"
COST_BY_USER = "costs_by_user"
COST_BY_MODEL = "costs_by_model"
COST_BY_MODULE = "costs_by_module"
# Raw per-request time-cost samples (lists).
TIME_COST_BY_TYPE = "time_costs_by_type"
TIME_COST_BY_USER = "time_costs_by_user"
TIME_COST_BY_MODEL = "time_costs_by_model"
TIME_COST_BY_MODULE = "time_costs_by_module"
# Mean of the time-cost samples.
AVG_TIME_COST_BY_TYPE = "avg_time_costs_by_type"
AVG_TIME_COST_BY_USER = "avg_time_costs_by_user"
AVG_TIME_COST_BY_MODEL = "avg_time_costs_by_model"
AVG_TIME_COST_BY_MODULE = "avg_time_costs_by_module"
# Standard deviation of the time-cost samples.
STD_TIME_COST_BY_TYPE = "std_time_costs_by_type"
STD_TIME_COST_BY_USER = "std_time_costs_by_user"
STD_TIME_COST_BY_MODEL = "std_time_costs_by_model"
STD_TIME_COST_BY_MODULE = "std_time_costs_by_module"
# Online time and message counters.
ONLINE_TIME = "online_time"
TOTAL_MSG_CNT = "total_messages"
MSG_CNT_BY_CHAT = "messages_by_chat"
class OnlineTimeRecordTask(AsyncTask):
    """Periodic task that keeps an ``OnlineTime`` row extended while the bot is running."""

    def __init__(self):
        super().__init__(task_name="Online Time Record Task", run_interval=60)
        # Primary key of the OnlineTime row currently being extended; None until one is known.
        self.record_id: int | None = None
        self._init_database()

    @staticmethod
    def _init_database():
        """Create the ``OnlineTime`` table if it does not exist yet."""
        with db.atomic():
            OnlineTime.create_table(safe=True)

    async def run(self):  # sourcery skip: use-named-expression
        """Push the end of the current online record one minute past now.

        If no record is being tracked (or the tracked row no longer exists), the
        most recent row that ended within the last minute is adopted; otherwise
        a new row is inserted. Any exception is logged and swallowed.
        """
        try:
            tick = datetime.now()
            new_end = tick + timedelta(minutes=1)

            if self.record_id:
                touched = (
                    OnlineTime.update(end_timestamp=new_end)
                    .where(OnlineTime.id == self.record_id)  # type: ignore
                    .execute()
                )
                if touched == 0:
                    # The tracked row is gone; fall through to the find-or-create path.
                    self.record_id = None

            if self.record_id:
                return

            # Adopt a row that ended no more than a minute ago, if there is one.
            cutoff = tick - timedelta(minutes=1)
            latest = (
                OnlineTime.select()
                .where(OnlineTime.end_timestamp >= cutoff)  # type: ignore
                .order_by(OnlineTime.end_timestamp.desc())
                .first()
            )
            if not latest:
                created = OnlineTime.create(
                    timestamp=tick.timestamp(),
                    start_timestamp=tick,
                    end_timestamp=new_end,
                    duration=5,
                )
                self.record_id = created.id
                return

            self.record_id = latest.id
            latest.end_timestamp = new_end
            latest.save()
        except Exception as e:
            logger.error(f"在线时间记录失败,错误信息:{e}")
def _format_online_time(online_seconds: int) -> str:
"""
格式化在线时间
:param online_seconds: 在线时间(秒)
:return: 格式化后的在线时间字符串
"""
total_online_time = timedelta(seconds=online_seconds)
days = total_online_time.days
hours = total_online_time.seconds // 3600
minutes = (total_online_time.seconds // 60) % 60
seconds = total_online_time.seconds % 60
if days > 0:
# 如果在线时间超过1天,则格式化为"X天X小时X分钟"
return f"{total_online_time.days}天{hours}小时{minutes}分钟{seconds}秒"
elif hours > 0:
# 如果在线时间超过1小时,则格式化为"X小时X分钟X秒"
return f"{hours}小时{minutes}分钟{seconds}秒"
else:
# 其他情况格式化为"X分钟X秒"
return f"{minutes}分钟{seconds}秒"
class StatisticOutputTask(AsyncTask):
"""统计输出任务"""
SEP_LINE = "-" * 84
def __init__(self, record_file_path: str = "maibot_statistics.html"):
    """
    Set up the statistics output task.

    :param record_file_path: path of the HTML report file written on each run
    """
    # Starts without delay (wait_before_start=0) and repeats every 300 seconds.
    super().__init__(task_name="Statistics Data Output Task", wait_before_start=0, run_interval=300)

    # Contact/group name mapping: {chat ID: (display name, timestamp of the record)}.
    # The timestamp is kept so that a newer message can refresh the stored name.
    self.name_mapping: Dict[str, Tuple[str, float]] = {}

    # Path of the report file.
    self.record_file_path: str = record_file_path

    now = datetime.now()
    if "deploy_time" not in local_storage:
        # No deploy time recorded yet: use the widest range for this run and
        # store the current time as the deploy time.
        deploy_time = datetime(2000, 1, 1)
        local_storage["deploy_time"] = now.timestamp()
    else:
        # A deploy time exists: it is the start of the all-time statistics.
        deploy_time = datetime.fromtimestamp(local_storage["deploy_time"])  # type: ignore

    # Statistic periods: [(period name, period length, description), ...].
    # The "all_time" entry must always be present.
    self.stat_period: List[Tuple[str, timedelta, str]] = [
        ("all_time", now - deploy_time, "自部署以来"),
        ("last_30_days", timedelta(days=30), "近30天"),
        ("last_7_days", timedelta(days=7), "近7天"),
        ("last_3_days", timedelta(days=3), "近3天"),
        ("last_24_hours", timedelta(days=1), "近1天"),
        ("last_3_hours", timedelta(hours=3), "近3小时"),
        ("last_hour", timedelta(hours=1), "近1小时"),
        ("last_15_minutes", timedelta(minutes=15), "近15分钟"),
    ]
def _statistic_console_output(self, stats: Dict[str, Any], now: datetime):
    """
    Log the last-hour statistics to the console.

    :param stats: statistics keyed by period name (only "last_hour" is used)
    :param now: reference "current" time shown in the header
    """
    last_hour = stats["last_hour"]
    header = f" 最近1小时的统计数据 (自{now.strftime('%Y-%m-%d %H:%M:%S')}开始,详细信息见文件:{self.record_file_path})"
    lines = [
        self.SEP_LINE,
        header,
        self.SEP_LINE,
        self._format_total_stat(last_hour),
        "",
        self._format_model_classified_stat(last_hour),
        "",
        self._format_chat_stat(last_hour),
        self.SEP_LINE,
        "",
    ]
    logger.info("\n" + "\n".join(lines))
async def run(self):
    """
    Collect the statistics and emit them to the console and the HTML report.

    The blocking collection/formatting work runs in a thread pool so the event
    loop stays responsive. Any exception is logged and swallowed.
    """
    try:
        now = datetime.now()
        # We are inside a coroutine, so the running loop is the one to use.
        loop = asyncio.get_running_loop()

        with concurrent.futures.ThreadPoolExecutor() as executor:
            logger.info("正在收集统计数据...")
            stats = await loop.run_in_executor(executor, self._collect_all_statistics, now)
            logger.info("统计数据收集完成")

            # Console output and HTML generation only read `stats`, so they can run side by side.
            await asyncio.gather(
                loop.run_in_executor(executor, self._statistic_console_output, stats, now),
                loop.run_in_executor(executor, self._generate_html_report, stats, now),
            )

        logger.info("统计数据输出完成")
    except Exception as e:
        logger.exception(f"输出统计数据过程中发生异常,错误信息:{e}")
async def run_async_background(self):
    """
    Alternative entry point: run the statistics output as a background task.

    Schedules the collection/output work on the event loop and returns
    immediately; the work itself runs in a thread pool. Exceptions raised by
    the background work are logged, not propagated.
    """

    async def _async_collect_and_output():
        try:
            now = datetime.now()
            loop = asyncio.get_running_loop()

            with concurrent.futures.ThreadPoolExecutor() as executor:
                logger.info("正在后台收集统计数据...")
                # run_in_executor returns a Future, which is awaited directly.
                # Wrapping it in asyncio.create_task() raises TypeError because
                # create_task only accepts coroutines.
                stats = await loop.run_in_executor(executor, self._collect_all_statistics, now)
                logger.info("统计数据收集完成")

                await asyncio.gather(
                    loop.run_in_executor(executor, self._statistic_console_output, stats, now),
                    loop.run_in_executor(executor, self._generate_html_report, stats, now),
                )

            logger.info("统计数据后台输出完成")
        except Exception as e:
            logger.exception(f"后台统计数据输出过程中发生异常:{e}")

    # The event loop only keeps weak references to tasks, so hold a strong one
    # until the task finishes; otherwise it may be garbage-collected mid-run.
    pending = getattr(self, "_background_tasks", None)
    if pending is None:
        pending = self._background_tasks = set()
    task = asyncio.create_task(_async_collect_and_output())
    pending.add(task)
    task.add_done_callback(pending.discard)
# -- Statistics collection methods --
@staticmethod
def _collect_model_request_for_period(collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]:
    """
    Collect LLM request statistics for each of the given periods.

    :param collect_period: list of (period key, period start time); it is
        sorted in place, latest start first
    :return: {period key: statistics dict}; an empty dict when no period is given
    """
    if not collect_period:
        return {}

    # Latest start first, so the periods are ordered from shortest to longest window.
    collect_period.sort(key=lambda x: x[1], reverse=True)

    def _empty_bucket() -> Dict[str, Any]:
        return {
            TOTAL_REQ_CNT: 0,
            REQ_CNT_BY_TYPE: defaultdict(int),
            REQ_CNT_BY_USER: defaultdict(int),
            REQ_CNT_BY_MODEL: defaultdict(int),
            REQ_CNT_BY_MODULE: defaultdict(int),
            IN_TOK_BY_TYPE: defaultdict(int),
            IN_TOK_BY_USER: defaultdict(int),
            IN_TOK_BY_MODEL: defaultdict(int),
            IN_TOK_BY_MODULE: defaultdict(int),
            OUT_TOK_BY_TYPE: defaultdict(int),
            OUT_TOK_BY_USER: defaultdict(int),
            OUT_TOK_BY_MODEL: defaultdict(int),
            OUT_TOK_BY_MODULE: defaultdict(int),
            TOTAL_TOK_BY_TYPE: defaultdict(int),
            TOTAL_TOK_BY_USER: defaultdict(int),
            TOTAL_TOK_BY_MODEL: defaultdict(int),
            TOTAL_TOK_BY_MODULE: defaultdict(int),
            TOTAL_COST: 0.0,
            COST_BY_TYPE: defaultdict(float),
            COST_BY_USER: defaultdict(float),
            COST_BY_MODEL: defaultdict(float),
            COST_BY_MODULE: defaultdict(float),
            TIME_COST_BY_TYPE: defaultdict(list),
            TIME_COST_BY_USER: defaultdict(list),
            TIME_COST_BY_MODEL: defaultdict(list),
            TIME_COST_BY_MODULE: defaultdict(list),
            AVG_TIME_COST_BY_TYPE: defaultdict(float),
            AVG_TIME_COST_BY_USER: defaultdict(float),
            AVG_TIME_COST_BY_MODEL: defaultdict(float),
            AVG_TIME_COST_BY_MODULE: defaultdict(float),
            STD_TIME_COST_BY_TYPE: defaultdict(float),
            STD_TIME_COST_BY_USER: defaultdict(float),
            STD_TIME_COST_BY_MODEL: defaultdict(float),
            STD_TIME_COST_BY_MODULE: defaultdict(float),
        }

    stats = {period_key: _empty_bucket() for period_key, _ in collect_period}

    # One row per breakdown dimension, in the order (type, user, model, module):
    # (requests, input tokens, output tokens, total tokens, cost, time samples, avg, std)
    dimensions = (
        (REQ_CNT_BY_TYPE, IN_TOK_BY_TYPE, OUT_TOK_BY_TYPE, TOTAL_TOK_BY_TYPE,
         COST_BY_TYPE, TIME_COST_BY_TYPE, AVG_TIME_COST_BY_TYPE, STD_TIME_COST_BY_TYPE),
        (REQ_CNT_BY_USER, IN_TOK_BY_USER, OUT_TOK_BY_USER, TOTAL_TOK_BY_USER,
         COST_BY_USER, TIME_COST_BY_USER, AVG_TIME_COST_BY_USER, STD_TIME_COST_BY_USER),
        (REQ_CNT_BY_MODEL, IN_TOK_BY_MODEL, OUT_TOK_BY_MODEL, TOTAL_TOK_BY_MODEL,
         COST_BY_MODEL, TIME_COST_BY_MODEL, AVG_TIME_COST_BY_MODEL, STD_TIME_COST_BY_MODEL),
        (REQ_CNT_BY_MODULE, IN_TOK_BY_MODULE, OUT_TOK_BY_MODULE, TOTAL_TOK_BY_MODULE,
         COST_BY_MODULE, TIME_COST_BY_MODULE, AVG_TIME_COST_BY_MODULE, STD_TIME_COST_BY_MODULE),
    )

    # Fetch everything from the earliest period start onwards.
    query_start_time = collect_period[-1][1]
    for record in LLMUsage.select().where(LLMUsage.timestamp >= query_start_time):  # type: ignore
        record_timestamp = record.timestamp

        # Index of the shortest period that contains the record; every later
        # (longer) period contains it as well.
        first_idx = next(
            (idx for idx, (_, period_start) in enumerate(collect_period) if record_timestamp >= period_start),
            None,
        )
        if first_idx is None:
            continue

        # Per-record values do not depend on the period, so compute them once.
        request_type = record.request_type or "unknown"
        user_id = record.user_id or "unknown"
        model_name = record.model_assign_name or record.model_name or "unknown"
        # Module name is the part of the request type before the first ".".
        module_name = request_type.split(".")[0]
        labels = (request_type, user_id, model_name, module_name)

        prompt_tokens = record.prompt_tokens or 0
        completion_tokens = record.completion_tokens or 0
        total_tokens = prompt_tokens + completion_tokens
        cost = record.cost or 0.0
        time_cost = record.time_cost or 0.0

        for period_key, _ in collect_period[first_idx:]:
            bucket = stats[period_key]
            bucket[TOTAL_REQ_CNT] += 1
            bucket[TOTAL_COST] += cost
            for (req_k, in_k, out_k, tok_k, cost_k, time_k, _avg_k, _std_k), label in zip(dimensions, labels):
                bucket[req_k][label] += 1
                bucket[in_k][label] += prompt_tokens
                bucket[out_k][label] += completion_tokens
                bucket[tok_k][label] += total_tokens
                bucket[cost_k][label] += cost
                if time_cost > 0:  # only positive time costs count as samples
                    bucket[time_k][label].append(time_cost)

    # Derive mean and (population) standard deviation of the time-cost samples.
    for bucket in stats.values():
        for req_k, _in_k, _out_k, _tok_k, _cost_k, time_k, avg_k, std_k in dimensions:
            for label in bucket[req_k]:
                samples = bucket[time_k].get(label, [])
                if not samples:
                    bucket[avg_k][label] = 0.0
                    bucket[std_k][label] = 0.0
                    continue
                mean = sum(samples) / len(samples)
                bucket[avg_k][label] = round(mean, 3)
                if len(samples) > 1:
                    variance = sum((x - mean) ** 2 for x in samples) / len(samples)
                    bucket[std_k][label] = round(variance**0.5, 3)
                else:
                    bucket[std_k][label] = 0.0
    return stats
@staticmethod
def _collect_online_time_for_period(collect_period: List[Tuple[str, datetime]], now: datetime) -> Dict[str, Any]:
    """
    Collect the online time (in seconds) for each of the given periods.

    :param collect_period: list of (period key, period start time); it is
        sorted in place, latest start first
    :param now: reference "current" time; record end times are capped at it
    :return: {period key: {ONLINE_TIME: seconds}}; empty dict when no period is given
    """
    if not collect_period:
        return {}

    collect_period.sort(key=lambda x: x[1], reverse=True)
    stats = {period_key: {ONLINE_TIME: 0.0} for period_key, _ in collect_period}

    earliest_start = collect_period[-1][1]
    for record in OnlineTime.select().where(OnlineTime.end_timestamp >= earliest_start):  # type: ignore
        session_start = record.start_timestamp
        session_end = record.end_timestamp

        # First (shortest) period the record reaches into; all later ones include it too.
        first_idx = next(
            (idx for idx, (_, boundary) in enumerate(collect_period) if session_end >= boundary),
            None,
        )
        if first_idx is None:
            continue

        capped_end = min(session_end, now)
        for period_key, period_start in collect_period[first_idx:]:
            # Only the part of the session inside this period counts.
            clipped_start = max(session_start, period_start)
            if capped_end > clipped_start:
                stats[period_key][ONLINE_TIME] += (capped_end - clipped_start).total_seconds()
    return stats
def _collect_message_count_for_period(self, collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]:
    """
    Collect message counts for each of the given periods.

    Also refreshes ``self.name_mapping`` with the chat names seen in the messages.

    :param collect_period: list of (period key, period start time); it is
        sorted in place, latest start first
    :return: {period key: {TOTAL_MSG_CNT: int, MSG_CNT_BY_CHAT: {chat id: int}}};
        empty dict when no period is given
    """
    if not collect_period:
        return {}

    collect_period.sort(key=lambda x: x[1], reverse=True)
    stats = {
        period_key: {
            TOTAL_MSG_CNT: 0,
            MSG_CNT_BY_CHAT: defaultdict(int),
        }
        for period_key, _ in collect_period
    }

    # Messages.time is compared as a float timestamp, so convert the period
    # starts once instead of once per message.
    period_starts = [(period_key, start.timestamp()) for period_key, start in collect_period]
    query_start_timestamp = period_starts[-1][1]

    for message in Messages.select().where(Messages.time >= query_start_timestamp):  # type: ignore
        message_time_ts = message.time

        if message.chat_info_group_id:
            chat_id = f"g{message.chat_info_group_id}"
            chat_name = message.chat_info_group_name or f"群{message.chat_info_group_id}"
        elif message.user_id:
            # Not a group message: the sender identifies the chat.
            chat_id = f"u{message.user_id}"
            # Fall back to a generated name when the nickname is empty, as
            # _collect_interval_data does; otherwise None would be stored as the name.
            chat_name = message.user_nickname or f"用户{message.user_id}"
        else:
            logger.warning(
                f"Message (PK: {message.id if hasattr(message, 'id') else 'N/A'}) lacks group_id and user_id for chat stats."
            )
            continue

        # Keep the newest known name for the chat.
        try:
            if chat_id in self.name_mapping:
                known_name, known_ts = self.name_mapping[chat_id][0], self.name_mapping[chat_id][1]
                if chat_name != known_name and message_time_ts > known_ts:
                    self.name_mapping[chat_id] = (chat_name, message_time_ts)
            else:
                self.name_mapping[chat_id] = (chat_name, message_time_ts)
        except (IndexError, TypeError) as e:
            logger.warning(f"更新 name_mapping 时发生错误,chat_id: {chat_id}, 错误: {e}")
            # Overwrite the malformed entry with a well-formed one.
            self.name_mapping[chat_id] = (chat_name, message_time_ts)

        for idx, (_, start_ts) in enumerate(period_starts):
            if message_time_ts >= start_ts:
                # The message belongs to this period and to every longer one.
                for period_key, _ in period_starts[idx:]:
                    stats[period_key][TOTAL_MSG_CNT] += 1
                    stats[period_key][MSG_CNT_BY_CHAT][chat_id] += 1
                break
    return stats
def _collect_all_statistics(self, now: datetime) -> Dict[str, Dict[str, Any]]:
    """
    Collect the statistics for every configured period.

    When a previous full snapshot exists in ``local_storage``, only the time
    since that snapshot is queried for "all_time" and the result is merged
    with the stored totals. The merged "all_time" data is stored back as the
    new snapshot.

    :param now: reference "current" time
    :return: {period key: statistics dict}
    """
    # (request-count key, time-sample key, average key, std-dev key) per dimension.
    summary_keys = (
        (REQ_CNT_BY_TYPE, TIME_COST_BY_TYPE, AVG_TIME_COST_BY_TYPE, STD_TIME_COST_BY_TYPE),
        (REQ_CNT_BY_USER, TIME_COST_BY_USER, AVG_TIME_COST_BY_USER, STD_TIME_COST_BY_USER),
        (REQ_CNT_BY_MODEL, TIME_COST_BY_MODEL, AVG_TIME_COST_BY_MODEL, STD_TIME_COST_BY_MODEL),
        (REQ_CNT_BY_MODULE, TIME_COST_BY_MODULE, AVG_TIME_COST_BY_MODULE, STD_TIME_COST_BY_MODULE),
    )
    derived_keys = {key for row in summary_keys for key in row[2:]}

    def _refresh_time_cost_summary(bucket: Dict[str, Any]) -> None:
        """Recompute average / standard deviation from the raw time-cost samples."""
        for req_k, time_k, avg_k, std_k in summary_keys:
            for label in bucket[req_k]:
                samples = bucket[time_k].get(label, [])
                if not samples:
                    bucket[avg_k][label] = 0.0
                    bucket[std_k][label] = 0.0
                    continue
                mean = sum(samples) / len(samples)
                bucket[avg_k][label] = round(mean, 3)
                if len(samples) > 1:
                    variance = sum((x - mean) ** 2 for x in samples) / len(samples)
                    bucket[std_k][label] = round(variance**0.5, 3)
                else:
                    bucket[std_k][label] = 0.0

    last_all_time_stat = None
    try:
        if "last_full_statistics" in local_storage:
            # A previous full snapshot exists: count incrementally from it.
            last_stat: Dict[str, Any] = local_storage["last_full_statistics"]  # type: ignore

            # JSON stores the (name, timestamp) pairs as lists; the code expects tuples.
            restored_mapping: Dict[str, Tuple[str, float]] = {}
            for chat_id, value in last_stat["name_mapping"].items():
                if isinstance(value, (list, tuple)) and len(value) == 2:
                    restored_mapping[chat_id] = (value[0], value[1])
                else:
                    logger.warning(f"name_mapping 中 chat_id {chat_id} 的数据格式不正确: {value}")

            stored_stat_data = last_stat["stat_data"]
            last_stat_timestamp = datetime.fromtimestamp(last_stat["timestamp"])

            # Commit only after the whole snapshot was read successfully, so a
            # half-loaded snapshot can never be merged on top of a window that
            # was not narrowed to "since the snapshot" (double counting).
            self.name_mapping = restored_mapping
            self.stat_period = [item for item in self.stat_period if item[0] != "all_time"]
            self.stat_period.append(("all_time", now - last_stat_timestamp, "自部署以来的"))
            last_all_time_stat = stored_stat_data
    except Exception as e:
        logger.warning(f"加载上次完整统计数据失败,进行全量统计,错误信息:{e}")

    stat_start_timestamp = [(period[0], now - period[1]) for period in self.stat_period]
    stat = {item[0]: {} for item in self.stat_period}

    model_req_stat = self._collect_model_request_for_period(stat_start_timestamp)
    online_time_stat = self._collect_online_time_for_period(stat_start_timestamp, now)
    message_count_stat = self._collect_message_count_for_period(stat_start_timestamp)

    # Merge the three kinds of statistics per period.
    for period_key, _ in stat_start_timestamp:
        stat[period_key].update(model_req_stat[period_key])
        stat[period_key].update(online_time_stat[period_key])
        stat[period_key].update(message_count_stat[period_key])

    if last_all_time_stat:
        # Add the stored totals to the incremental "all_time" numbers.
        all_time = stat["all_time"]
        for key, val in last_all_time_stat.items():
            if key not in all_time:
                continue
            if key in derived_keys:
                # Averages and standard deviations cannot be added together;
                # they are recomputed from the merged samples below.
                continue
            if not isinstance(val, dict):
                all_time[key] += val
                continue

            current = all_time[key]
            for sub_key, sub_val in val.items():
                if sub_key not in current:
                    current[sub_key] = sub_val
                elif isinstance(sub_val, dict) and isinstance(current[sub_key], dict):
                    # Nested dictionaries are merged entry by entry.
                    for nested_key, nested_val in sub_val.items():
                        if nested_key in current[sub_key]:
                            current[sub_key][nested_key] += nested_val
                        else:
                            current[sub_key][nested_key] = nested_val
                else:
                    # Numbers are summed; time-cost sample lists are concatenated.
                    current[sub_key] += sub_val

        _refresh_time_cost_summary(all_time)

    # Store the merged "all_time" data as the new snapshot.
    # NOTE(review): the raw time-cost sample lists are part of the snapshot and
    # therefore keep growing with every request — confirm this is acceptable.
    clean_stat_data = self._convert_defaultdict_to_dict(stat["all_time"])
    # JSON has no tuple type, so store the name mapping values as lists.
    json_safe_name_mapping = {
        chat_id: [chat_name, timestamp] for chat_id, (chat_name, timestamp) in self.name_mapping.items()
    }
    local_storage["last_full_statistics"] = {
        "name_mapping": json_safe_name_mapping,
        "stat_data": clean_stat_data,
        "timestamp": now.timestamp(),
    }
    return stat
def _convert_defaultdict_to_dict(self, data):
# sourcery skip: dict-comprehension, extract-duplicate-method, inline-immediately-returned-variable, merge-duplicate-blocks
"""递归转换defaultdict为普通dict"""
if isinstance(data, defaultdict):
# 转换defaultdict为普通dict
result = {}
for key, value in data.items():
result[key] = self._convert_defaultdict_to_dict(value)
return result
elif isinstance(data, dict):
# 递归处理普通dict
result = {}
for key, value in data.items():
result[key] = self._convert_defaultdict_to_dict(value)
return result
else:
# 其他类型直接返回
return data
# -- Statistics formatting methods --
@staticmethod
def _format_total_stat(stats: Dict[str, Any]) -> str:
    """
    Format the overall totals (online time, messages, requests, cost) of one period.

    :param stats: statistics dict of a single period
    :return: newline-joined text block ending with an empty line
    """
    online = _format_online_time(stats[ONLINE_TIME])
    return "\n".join(
        (
            f"总在线时间: {online}",
            f"总消息数: {stats[TOTAL_MSG_CNT]}",
            f"总请求数: {stats[TOTAL_REQ_CNT]}",
            f"总花费: {stats[TOTAL_COST]:.2f}¥",
            "",
        )
    )
@staticmethod
def _format_model_classified_stat(stats: Dict[str, Any]) -> str:
    """
    Format the per-model statistics of one period as a text table.

    :param stats: statistics dict of a single period
    :return: the table text, or an empty string when there were no requests
    """
    if stats[TOTAL_REQ_CNT] <= 0:
        return ""

    row_template = "{:<32} {:>10} {:>12} {:>12} {:>12} {:>9.2f}¥ {:>10.1f} {:>10.1f}"
    lines = [
        "按模型分类统计:",
        " 模型名称 调用次数 输入Token 输出Token Token总量 累计花费 平均耗时(秒) 标准差(秒)",
    ]
    for model_name, count in sorted(stats[REQ_CNT_BY_MODEL].items()):
        # Names longer than the 32-character column are cut to 29 characters plus "...".
        shown = model_name if len(model_name) <= 32 else f"{model_name[:29]}..."
        lines.append(
            row_template.format(
                shown,
                count,
                stats[IN_TOK_BY_MODEL][model_name],
                stats[OUT_TOK_BY_MODEL][model_name],
                stats[TOTAL_TOK_BY_MODEL][model_name],
                stats[COST_BY_MODEL][model_name],
                stats[AVG_TIME_COST_BY_MODEL][model_name],
                stats[STD_TIME_COST_BY_MODEL][model_name],
            )
        )
    lines.append("")
    return "\n".join(lines)
def _format_chat_stat(self, stats: Dict[str, Any]) -> str:
    """
    Format the per-chat message counts of one period as a text table.

    :param stats: statistics dict of a single period
    :return: the table text, or an empty string when there were no messages
    """
    if stats[TOTAL_MSG_CNT] <= 0:
        return ""

    unknown_entry = ("未知聊天", 0)
    lines = ["聊天消息统计:", " 联系人/群组名称 消息数量"]
    for chat_id, count in sorted(stats[MSG_CNT_BY_CHAT].items()):
        try:
            chat_name = self.name_mapping.get(chat_id, unknown_entry)[0]
            lines.append(f"{chat_name[:32]:<32} {count:>10}")
        except (IndexError, TypeError) as e:
            # A malformed mapping entry must not break the whole report.
            logger.warning(f"格式化聊天统计时发生错误,chat_id: {chat_id}, 错误: {e}")
            lines.append(f"{'未知聊天':<32} {count:>10}")
    lines.append("")
    return "\n".join(lines)
def _get_chat_display_name_from_id(self, chat_id: str) -> str:
    """
    Resolve a display name for a chat id.

    Looks the id up in the chat manager's streams first; if that yields no
    non-blank name, derives a name from the "g"/"u" prefix of the id. On any
    error the id itself is returned.
    """
    try:
        from src.chat.message_receive.chat_stream import get_chat_manager

        chat_manager = get_chat_manager()
        if chat_id in chat_manager.streams:
            stream = chat_manager.streams[chat_id]
            candidate = None
            if stream.group_info and hasattr(stream.group_info, "group_name"):
                candidate = stream.group_info.group_name
            elif stream.user_info and hasattr(stream.user_info, "user_nickname"):
                candidate = stream.user_info.user_nickname
            if candidate and candidate.strip():
                return candidate.strip()

        # No usable stream name: fall back to the id's prefix convention.
        for prefix, label in (("g", "群聊"), ("u", "用户")):
            if chat_id.startswith(prefix):
                return f"{label}{chat_id[1:]}"
        return chat_id
    except Exception as e:
        logger.warning(f"获取聊天显示名称失败: {e}")
        return chat_id
# 移除_generate_versions_tab方法
def _generate_html_report(self, stat: dict[str, Any], now: datetime):
"""
Build the statistics report and write it to ``self.record_file_path``.
:param stat: statistics data keyed by period name
:param now: reference "current" time
"""
# NOTE(review): several string literals in this method look as if their HTML
# markup was lost (e.g. f-strings that open on one line and close on the next);
# confirm against the version-controlled file before editing the code.
# The version-comparison tab and its content have been removed.
tab_list = [
f''
for period in self.stat_period
]
tab_list.append('')
def _format_stat_data(stat_data: dict[str, Any], div_id: str, start_time: datetime) -> str:
"""
Format the statistics of one period into an HTML div block.
:param stat_data: statistics data of the period
:param div_id: ID of the div
:param start_time: start time of the period
"""
# Rows of the per-model table
model_rows = "\n".join(
[
f"
"
f"| {model_name} | "
f"{count} | "
f"{stat_data[IN_TOK_BY_MODEL][model_name]} | "
f"{stat_data[OUT_TOK_BY_MODEL][model_name]} | "
f"{stat_data[TOTAL_TOK_BY_MODEL][model_name]} | "
f"{stat_data[COST_BY_MODEL][model_name]:.2f} ¥ | "
f"{stat_data[AVG_TIME_COST_BY_MODEL][model_name]:.1f} 秒 | "
f"{stat_data[STD_TIME_COST_BY_MODEL][model_name]:.1f} 秒 | "
f"
"
for model_name, count in sorted(stat_data[REQ_CNT_BY_MODEL].items())
]
if stat_data[REQ_CNT_BY_MODEL]
else ["| 暂无数据 |
"]
)
# Rows of the per-request-type table
type_rows = "\n".join(
[
f""
f"| {req_type} | "
f"{count} | "
f"{stat_data[IN_TOK_BY_TYPE][req_type]} | "
f"{stat_data[OUT_TOK_BY_TYPE][req_type]} | "
f"{stat_data[TOTAL_TOK_BY_TYPE][req_type]} | "
f"{stat_data[COST_BY_TYPE][req_type]:.2f} ¥ | "
f"{stat_data[AVG_TIME_COST_BY_TYPE][req_type]:.1f} 秒 | "
f"{stat_data[STD_TIME_COST_BY_TYPE][req_type]:.1f} 秒 | "
f"
"
for req_type, count in sorted(stat_data[REQ_CNT_BY_TYPE].items())
]
if stat_data[REQ_CNT_BY_TYPE]
else ["| 暂无数据 |
"]
)
# Rows of the per-module table
module_rows = "\n".join(
[
f""
f"| {module_name} | "
f"{count} | "
f"{stat_data[IN_TOK_BY_MODULE][module_name]} | "
f"{stat_data[OUT_TOK_BY_MODULE][module_name]} | "
f"{stat_data[TOTAL_TOK_BY_MODULE][module_name]} | "
f"{stat_data[COST_BY_MODULE][module_name]:.2f} ¥ | "
f"{stat_data[AVG_TIME_COST_BY_MODULE][module_name]:.1f} 秒 | "
f"{stat_data[STD_TIME_COST_BY_MODULE][module_name]:.1f} 秒 | "
f"
"
for module_name, count in sorted(stat_data[REQ_CNT_BY_MODULE].items())
]
if stat_data[REQ_CNT_BY_MODULE]
else ["| 暂无数据 |
"]
)
# Rows of the chat message table; names come from self.name_mapping
chat_rows = []
for chat_id, count in sorted(stat_data[MSG_CNT_BY_CHAT].items()):
try:
chat_name = self.name_mapping.get(chat_id, ("未知聊天", 0))[0]
chat_rows.append(f"| {chat_name} | {count} |
")
except (IndexError, TypeError) as e:
logger.warning(f"生成HTML聊天统计时发生错误,chat_id: {chat_id}, 错误: {e}")
chat_rows.append(f"| 未知聊天 | {count} |
")
chat_rows_html = "\n".join(chat_rows) if chat_rows else "| 暂无数据 |
"
# Assemble the block for this period
return f"""
统计时段:
{start_time.strftime("%Y-%m-%d %H:%M:%S")} ~ {now.strftime("%Y-%m-%d %H:%M:%S")}
总在线时间
{_format_online_time(stat_data[ONLINE_TIME])}
总消息数
{stat_data[TOTAL_MSG_CNT]}
总请求数
{stat_data[TOTAL_REQ_CNT]}
总花费
{stat_data[TOTAL_COST]:.2f} ¥
按模型分类统计
| 模型名称 | 调用次数 | 输入Token | 输出Token | Token总量 | 累计花费 | 平均耗时(秒) | 标准差(秒) |
{model_rows}
按模块分类统计
| 模块名称 | 调用次数 | 输入Token | 输出Token | Token总量 | 累计花费 | 平均耗时(秒) | 标准差(秒) |
{module_rows}
按请求类型分类统计
| 请求类型 | 调用次数 | 输入Token | 输出Token | Token总量 | 累计花费 | 平均耗时(秒) | 标准差(秒) |
{type_rows}
聊天消息统计
| 联系人/群组名称 | 消息数量 |
{chat_rows_html}
数据分布图表
模型花费分布
模块花费分布
请求类型花费分布
聊天消息分布
"""
# One content block per period; "all_time" is handled separately below
tab_content_list = [
_format_stat_data(stat[period[0]], period[0], now - period[1])
for period in self.stat_period
if period[0] != "all_time"
]
# The "all_time" block is labelled as starting at the stored deploy time
tab_content_list.append(
_format_stat_data(stat["all_time"], "all_time", datetime.fromtimestamp(local_storage["deploy_time"])) # type: ignore
)
# Version-comparison content is no longer added.
# Append the chart tab content
chart_data = self._generate_chart_data(stat)
tab_content_list.append(self._generate_chart_tab(chart_data))
joined_tab_list = "\n".join(tab_list)
joined_tab_content = "\n".join(tab_content_list)
html_template = (
"""
MaiBot运行统计报告
"""
+ f"""
{joined_tab_list}
{joined_tab_content}
"""
+ """
"""
)
# Overwrite the report file with the freshly built document
with open(self.record_file_path, "w", encoding="utf-8") as f:
f.write(html_template)
def _generate_chart_data(self, stat: dict[str, Any]) -> dict:
    """
    Build the chart data for every supported time range.

    :param stat: statistics data (not used by this method)
    :return: {range key: interval data as returned by ``_collect_interval_data``}
    """
    now = datetime.now()
    # (range key, range length in hours, interval length in minutes)
    time_ranges = (
        ("6h", 6, 10),
        ("12h", 12, 15),
        ("24h", 24, 15),
        ("48h", 48, 30),
    )
    return {
        range_key: self._collect_interval_data(now, hours, interval_minutes)
        for range_key, hours, interval_minutes in time_ranges
    }
def _collect_interval_data(self, now: datetime, hours: int, interval_minutes: int) -> dict:
    """
    Collect cost and message counts per time interval for the chart view.

    :param now: end of the time range
    :param hours: length of the time range in hours
    :param interval_minutes: length of one interval in minutes
    :return: dict with "time_labels", "total_cost_data", "cost_by_model",
        "cost_by_module" and "message_by_chat"; every series has one entry per interval
    """
    start_time = now - timedelta(hours=hours)
    step = timedelta(minutes=interval_minutes)

    # Interval start points from start_time up to and including now.
    time_points = []
    cursor = start_time
    while cursor <= now:
        time_points.append(cursor)
        cursor += step

    slot_count = len(time_points)
    interval_seconds = interval_minutes * 60
    time_labels = [point.strftime("%H:%M") for point in time_points]

    total_cost_data = [0] * slot_count
    cost_by_model = {}
    cost_by_module = {}
    message_by_chat = {}

    # LLM usage records
    for record in LLMUsage.select().where(LLMUsage.timestamp >= start_time):  # type: ignore
        elapsed = (record.timestamp - start_time).total_seconds()
        slot = int(elapsed // interval_seconds)
        if not 0 <= slot < slot_count:
            continue

        cost = record.cost or 0.0
        total_cost_data[slot] += cost  # type: ignore

        model_name = record.model_assign_name or record.model_name or "unknown"
        cost_by_model.setdefault(model_name, [0] * slot_count)[slot] += cost

        # Module name is the part of the request type before the first ".".
        request_type = record.request_type or "unknown"
        module_name = request_type.split(".")[0] if "." in request_type else request_type
        cost_by_module.setdefault(module_name, [0] * slot_count)[slot] += cost

    # Message records (Messages.time is compared as a float timestamp)
    query_start_timestamp = start_time.timestamp()
    for message in Messages.select().where(Messages.time >= query_start_timestamp):  # type: ignore
        slot = int((message.time - query_start_timestamp) // interval_seconds)
        if not 0 <= slot < slot_count:
            continue

        # Chat name: group name for group messages, sender nickname otherwise.
        if message.chat_info_group_id:
            chat_name = message.chat_info_group_name or f"群{message.chat_info_group_id}"
        elif message.user_id:
            chat_name = message.user_nickname or f"用户{message.user_id}"
        else:
            continue
        if not chat_name:
            continue

        message_by_chat.setdefault(chat_name, [0] * slot_count)[slot] += 1

    return {
        "time_labels": time_labels,
        "total_cost_data": total_cost_data,
        "cost_by_model": cost_by_model,
        "cost_by_module": cost_by_module,
        "message_by_chat": message_by_chat,
    }
def _generate_chart_tab(self, chart_data: dict) -> str:
# sourcery skip: extract-duplicate-method, move-assign-in-block
"""Generate the HTML content of the chart tab from the "24h" entry of ``chart_data``."""
# NOTE(review): the template returned at the end of this method, as visible
# here, contains only a title line and does not reference the dataset lists
# built below — confirm against the version-controlled file.
# Colour palette; datasets cycle through it by index
colors = [
"#8b5cf6",
"#9f8efb",
"#b5a6ff",
"#c7bbff",
"#d9ceff",
"#a78bfa",
"#9073d8",
"#bfaefc",
"#cabdfd",
"#e6e0ff",
]
# The datasets are built from the 24-hour data by default
default_data = chart_data["24h"]
# One dataset literal per model
model_datasets = []
for i, (model_name, cost_data) in enumerate(default_data["cost_by_model"].items()):
color = colors[i % len(colors)]
model_datasets.append(f"""{{
label: '{model_name}',
data: {cost_data},
borderColor: '{color}',
backgroundColor: '{color}20',
tension: 0.4,
fill: false
}}""")
# The joined string is not assigned to anything; its value is discarded
",\n ".join(model_datasets)
# One dataset literal per module
module_datasets = []
for i, (module_name, cost_data) in enumerate(default_data["cost_by_module"].items()):
color = colors[i % len(colors)]
module_datasets.append(f"""{{
label: '{module_name}',
data: {cost_data},
borderColor: '{color}',
backgroundColor: '{color}20',
tension: 0.4,
fill: false
}}""")
# The joined string is not assigned to anything; its value is discarded
",\n ".join(module_datasets)
# One message-count dataset literal per chat
message_datasets = []
for i, (chat_name, message_data) in enumerate(default_data["message_by_chat"].items()):
color = colors[i % len(colors)]
message_datasets.append(f"""{{
label: '{chat_name}',
data: {message_data},
borderColor: '{color}',
backgroundColor: '{color}20',
tension: 0.4,
fill: false
}}""")
# The joined string is not assigned to anything; its value is discarded
",\n ".join(message_datasets)
return f"""
数据图表
"""
class AsyncStatisticOutputTask(AsyncTask):
    """Statistics output task whose ``run`` only schedules the work and returns immediately.

    All collection and formatting logic is delegated to ``StatisticOutputTask``.
    """

    def __init__(self, record_file_path: str = "maibot_statistics.html"):
        """
        :param record_file_path: path of the HTML report file written on each run
        """
        # Starts without delay and repeats every 300 seconds.
        super().__init__(task_name="Async Statistics Data Output Task", wait_before_start=0, run_interval=300)

        # Reuse the initialisation logic of StatisticOutputTask.
        temp_stat_task = StatisticOutputTask(record_file_path)
        self.name_mapping = temp_stat_task.name_mapping
        self.record_file_path = temp_stat_task.record_file_path
        self.stat_period = temp_stat_task.stat_period

        # Strong references to in-flight background tasks; the event loop only
        # keeps weak ones, so an unreferenced task may be garbage-collected mid-run.
        self._background_tasks: set = set()

    async def run(self):
        """Schedule one statistics collection/output pass in the background and return."""

        async def _async_collect_and_output():
            try:
                now = datetime.now()
                loop = asyncio.get_running_loop()

                with concurrent.futures.ThreadPoolExecutor() as executor:
                    logger.info("正在后台收集统计数据...")
                    # run_in_executor returns a Future, which is awaited directly.
                    # Wrapping it in asyncio.create_task() raises TypeError because
                    # create_task only accepts coroutines.
                    stats = await loop.run_in_executor(executor, self._collect_all_statistics, now)
                    logger.info("统计数据收集完成")

                    # Console output and HTML generation run side by side.
                    await asyncio.gather(
                        loop.run_in_executor(executor, self._statistic_console_output, stats, now),
                        loop.run_in_executor(executor, self._generate_html_report, stats, now),
                    )

                logger.info("统计数据后台输出完成")
            except Exception as e:
                logger.exception(f"后台统计数据输出过程中发生异常:{e}")

        task = asyncio.create_task(_async_collect_and_output())
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    # -- Everything below delegates to the StatisticOutputTask implementation --

    def _collect_all_statistics(self, now: datetime):
        return StatisticOutputTask._collect_all_statistics(self, now)  # type: ignore

    def _statistic_console_output(self, stats: Dict[str, Any], now: datetime):
        return StatisticOutputTask._statistic_console_output(self, stats, now)  # type: ignore

    def _generate_html_report(self, stats: dict[str, Any], now: datetime):
        return StatisticOutputTask._generate_html_report(self, stats, now)  # type: ignore

    @staticmethod
    def _collect_model_request_for_period(collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]:
        return StatisticOutputTask._collect_model_request_for_period(collect_period)

    @staticmethod
    def _collect_online_time_for_period(collect_period: List[Tuple[str, datetime]], now: datetime) -> Dict[str, Any]:
        return StatisticOutputTask._collect_online_time_for_period(collect_period, now)

    def _collect_message_count_for_period(self, collect_period: List[Tuple[str, datetime]]) -> Dict[str, Any]:
        return StatisticOutputTask._collect_message_count_for_period(self, collect_period)  # type: ignore

    @staticmethod
    def _format_total_stat(stats: Dict[str, Any]) -> str:
        return StatisticOutputTask._format_total_stat(stats)

    @staticmethod
    def _format_model_classified_stat(stats: Dict[str, Any]) -> str:
        return StatisticOutputTask._format_model_classified_stat(stats)

    def _format_chat_stat(self, stats: Dict[str, Any]) -> str:
        return StatisticOutputTask._format_chat_stat(self, stats)  # type: ignore

    def _generate_chart_data(self, stat: dict[str, Any]) -> dict:
        return StatisticOutputTask._generate_chart_data(self, stat)  # type: ignore

    def _collect_interval_data(self, now: datetime, hours: int, interval_minutes: int) -> dict:
        return StatisticOutputTask._collect_interval_data(self, now, hours, interval_minutes)  # type: ignore

    def _generate_chart_tab(self, chart_data: dict) -> str:
        return StatisticOutputTask._generate_chart_tab(self, chart_data)  # type: ignore

    def _get_chat_display_name_from_id(self, chat_id: str) -> str:
        return StatisticOutputTask._get_chat_display_name_from_id(self, chat_id)  # type: ignore

    def _convert_defaultdict_to_dict(self, data):
        return StatisticOutputTask._convert_defaultdict_to_dict(self, data)  # type: ignore