API Usage Statistics
Scenario
After the authentication system went live, the business kept growing.
Three months later:
- Registered users: 1,000
- Daily active users: 300
- Daily API calls: 200,000
The New Requirement
The operations team came to me:
We need to know:
1. How many API calls does each user make?
2. Which hours of the day see the most traffic?
3. Which endpoints are the most popular?
4. Who are our heaviest users?
This data matters for the paid plans we want to launch later. I realized we needed a proper statistics system.
Current Problems
I took a look at the existing logging setup:
-- API call log table
CREATE TABLE api_logs (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id INT NOT NULL,
    endpoint VARCHAR(255) NOT NULL,
    params TEXT,
    response_time INT,
    status VARCHAR(50),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

Problem 1: Queries are too slow
# Count a user's API calls for the current month
def get_user_monthly_usage(user_id):
    with get_db_connection() as conn:
        cursor = conn.cursor()
        # %% escapes the literal % for the driver when params are passed
        cursor.execute(
            '''SELECT COUNT(*) AS cnt FROM api_logs
               WHERE user_id = %s
                 AND created_at >= DATE_FORMAT(NOW(), '%%Y-%%m-01')''',
            (user_id,)
        )
        result = cursor.fetchone()
        return result['cnt']

This query scans hundreds of thousands of rows and takes 2-5 seconds.
Problem 2: Not enough dimensions
The logs can currently only be aggregated per user, but operations also needs:
- By time (hourly, daily)
- By endpoint (which API is the most popular)
- By response time (performance analysis)
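For response-time analysis, percentiles matter more than the mean, and they appear as p95/p99 columns later on. As a standalone reference, a nearest-rank percentile can be computed like this (the helper name is my own, not from the production code):

```python
def percentile(values, pct):
    """Nearest-rank percentile: smallest value covering pct% of the sample."""
    if not values:
        raise ValueError("empty sample")
    ordered = sorted(values)
    # ceil(pct/100 * n) as a 1-based rank, clamped to at least 1
    rank = max(1, -(-pct * len(ordered) // 100))
    return ordered[int(rank) - 1]

# Response times in milliseconds; a couple of slow outliers dominate the tail
times = [12, 15, 11, 240, 13, 14, 16, 18, 900, 17]
print(percentile(times, 50))  # 15
print(percentile(times, 95))  # 900
```

Note how the median stays low while p95 exposes the outliers, which is exactly why the mean alone hides performance problems.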
Problem 3: The data volume is too large
The log table grows by roughly 6 million rows per month:
- 3 months: 18 million rows
- Queries keep getting slower
- Pressure on the database keeps growing
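The growth figures follow directly from the daily volume; a quick sanity check of the arithmetic:

```python
daily_calls = 200_000          # current daily call volume
per_month = daily_calls * 30   # rows added to api_logs each month
after_3_months = per_month * 3

print(f"{per_month:,} rows/month, {after_3_months:,} rows after 3 months")
# 6,000,000 rows/month, 18,000,000 rows after 3 months
```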
The Approach
I decided to use pre-aggregation:
Core idea
Instead of computing statistics from the raw logs at query time:
- Update statistics incrementally as calls happen
- Periodically roll up the raw logs
- Serve queries directly from the statistics tables
Advantages
- Queries are extremely fast (they read precomputed results)
- Raw logs can be purged on a schedule
- Multiple statistical dimensions are supported
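The idea in miniature: keep counters keyed by (user, time bucket) and bump them as events arrive, so a usage query becomes a single lookup instead of a log scan. A self-contained sketch (names are my own, not the production schema):

```python
from collections import defaultdict
from datetime import datetime

# (user_id, hour_bucket) -> call count, maintained as events arrive
hourly_counts = defaultdict(int)

def record_call(user_id, ts):
    bucket = ts.replace(minute=0, second=0, microsecond=0)
    hourly_counts[(user_id, bucket)] += 1

def usage_in_hour(user_id, ts):
    bucket = ts.replace(minute=0, second=0, microsecond=0)
    return hourly_counts[(user_id, bucket)]  # O(1), no scan of raw events

now = datetime(2024, 5, 1, 10, 30)
for _ in range(3):
    record_call(42, now)
print(usage_in_hour(42, now))  # 3
```

The tables below are the durable, multi-dimensional version of these counters.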
Designing the Statistics Tables
Hourly statistics table
CREATE TABLE api_stats_hourly (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id INT NOT NULL,
    endpoint VARCHAR(255) NOT NULL,
    hour TIMESTAMP NOT NULL,            -- bucket timestamp, truncated to the hour
    total_calls INT DEFAULT 0,          -- total number of calls
    success_calls INT DEFAULT 0,        -- successful calls
    error_calls INT DEFAULT 0,          -- failed calls
    avg_response_time FLOAT,            -- average response time (ms)
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY unique_stats (user_id, endpoint, hour),
    INDEX idx_user_hour (user_id, hour),
    INDEX idx_hour (hour)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Daily statistics table
CREATE TABLE api_stats_daily (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id INT NOT NULL,
    endpoint VARCHAR(255) NOT NULL,
    date DATE NOT NULL,                 -- statistics date
    total_calls INT DEFAULT 0,
    success_calls INT DEFAULT 0,
    error_calls INT DEFAULT 0,
    avg_response_time FLOAT,
    p95_response_time FLOAT,            -- 95th percentile response time
    p99_response_time FLOAT,            -- 99th percentile response time
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY unique_stats (user_id, endpoint, date),
    INDEX idx_user_date (user_id, date),
    INDEX idx_date (date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Monthly per-user summary table
CREATE TABLE user_stats_monthly (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id INT NOT NULL,
    month DATE NOT NULL,                -- statistics month (first day of the month)
    total_calls INT DEFAULT 0,
    total_success INT DEFAULT 0,
    total_error INT DEFAULT 0,
    avg_daily_calls FLOAT,
    peak_hour_calls INT,                -- calls in the busiest single hour
    most_used_endpoint VARCHAR(255),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY unique_stats (user_id, month),
    INDEX idx_user_month (user_id, month)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Real-Time Statistics Updates
Updating the API handler
@app.route('/api/weather')
@require_api_key
def get_weather():
    start_time = time.time()
    city = request.args.get('city', '')
    try:
        # ... business logic
        result = fetch_weather_data(city)
        status = 'success'
    except Exception as e:
        result = {'error': str(e)}
        status = 'error'
    response_time = int((time.time() - start_time) * 1000)
    # Write the raw log entry (kept for detailed analysis)
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            '''INSERT INTO api_logs
               (user_id, endpoint, params, response_time, status)
               VALUES (%s, %s, %s, %s, %s)''',
            (request.user['id'], '/api/weather',
             f'city={city}', response_time, status)
        )
        conn.commit()
    # Update the hourly statistics in the same request
    update_hourly_stats(
        request.user['id'],
        '/api/weather',
        response_time,
        status
    )
    return jsonify(result)

Updating the hourly statistics
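The helper below folds each call into its hourly row with a single upsert, including a running average. The same pattern can be tried standalone with SQLite's `ON CONFLICT` clause (an illustration only; the production statement uses MySQL's `ON DUPLICATE KEY UPDATE`):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE stats (
        user_id INTEGER,
        hour TEXT,
        total_calls INTEGER,
        avg_response_time REAL,
        PRIMARY KEY (user_id, hour)
    )
""")

def record(user_id, hour, response_time):
    # In the DO UPDATE branch, bare columns are the existing row's values
    # and excluded.* is the row that failed to insert.
    conn.execute("""
        INSERT INTO stats VALUES (?, ?, 1, ?)
        ON CONFLICT (user_id, hour) DO UPDATE SET
            total_calls = total_calls + 1,
            avg_response_time =
                (avg_response_time * total_calls + excluded.avg_response_time)
                / (total_calls + 1)
    """, (user_id, hour, response_time))

for rt in (100, 200, 300):
    record(1, "2024-05-01 10:00", rt)

row = conn.execute("SELECT total_calls, avg_response_time FROM stats").fetchone()
print(row)  # (3, 200.0)
```

The single statement means two concurrent requests cannot race between a read and a write of the same row.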
def update_hourly_stats(user_id, endpoint, response_time, status):
    """Incrementally update the hourly statistics row."""
    current_hour = datetime.now().replace(minute=0, second=0, microsecond=0)
    with get_db_connection() as conn:
        cursor = conn.cursor()
        # INSERT ... ON DUPLICATE KEY UPDATE: the unique key
        # (user_id, endpoint, hour) makes this an atomic upsert.
        # total_calls is incremented first, so the running-average
        # formula below sees the already-updated count.
        cursor.execute(
            '''INSERT INTO api_stats_hourly
               (user_id, endpoint, hour, total_calls, success_calls, error_calls, avg_response_time)
               VALUES (%s, %s, %s, 1, %s, %s, %s)
               ON DUPLICATE KEY UPDATE
                   total_calls = total_calls + 1,
                   success_calls = success_calls + %s,
                   error_calls = error_calls + %s,
                   avg_response_time = (avg_response_time * (total_calls - 1) + %s) / total_calls''',
            (
                user_id, endpoint, current_hour,
                1 if status == 'success' else 0,
                0 if status == 'success' else 1,
                response_time,
                1 if status == 'success' else 0,
                0 if status == 'success' else 1,
                response_time
            )
        )
        conn.commit()

Scheduled Aggregation Jobs
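One subtlety when rolling hourly rows up to a day: a plain average of the hourly averages and a call-weighted average diverge whenever traffic is uneven, and only the weighted one matches the true per-call average. A quick illustration:

```python
# (calls, avg_response_time_ms) per hour: one busy slow hour, one quiet fast hour
hours = [(1000, 300.0), (10, 50.0)]

# Average of averages: every hour counts equally, regardless of traffic
naive = sum(avg for _, avg in hours) / len(hours)

# Weighted by call count: equivalent to averaging over all individual calls
weighted = sum(calls * avg for calls, avg in hours) / sum(c for c, _ in hours)

print(round(naive, 1), round(weighted, 1))  # 175.0 297.5
```

The quiet hour drags the naive figure far below what callers actually experienced, so the daily roll-up below weights by `total_calls`.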
Daily aggregation
def aggregate_daily_stats():
    """Roll yesterday's hourly statistics up into the daily table."""
    yesterday = (datetime.now() - timedelta(days=1)).date()
    with get_db_connection() as conn:
        cursor = conn.cursor()
        # Sum yesterday's hourly rows per user and endpoint. The hour
        # filter is a range (not DATE(hour) = ...) so idx_hour can be
        # used, and the average is weighted by call count.
        cursor.execute(
            '''SELECT user_id, endpoint,
                      SUM(total_calls) AS total_calls,
                      SUM(success_calls) AS success_calls,
                      SUM(error_calls) AS error_calls,
                      SUM(avg_response_time * total_calls) / SUM(total_calls) AS avg_response_time
               FROM api_stats_hourly
               WHERE hour >= %s AND hour < %s
               GROUP BY user_id, endpoint''',
            (yesterday, yesterday + timedelta(days=1))
        )
        stats = cursor.fetchall()
        # Upsert into the daily table
        for stat in stats:
            cursor.execute(
                '''INSERT INTO api_stats_daily
                   (user_id, endpoint, date, total_calls, success_calls,
                    error_calls, avg_response_time)
                   VALUES (%s, %s, %s, %s, %s, %s, %s)
                   ON DUPLICATE KEY UPDATE
                       total_calls = VALUES(total_calls),
                       success_calls = VALUES(success_calls),
                       error_calls = VALUES(error_calls),
                       avg_response_time = VALUES(avg_response_time)''',
                (stat['user_id'], stat['endpoint'], yesterday,
                 stat['total_calls'], stat['success_calls'],
                 stat['error_calls'], stat['avg_response_time'])
            )
        conn.commit()
# Run via cron at 01:00 every day:
# 0 1 * * * /usr/bin/python3 /path/to/aggregate_daily.py

Monthly aggregation
def aggregate_monthly_stats():
    """Roll last month's daily statistics up into the per-user summary."""
    last_day = (datetime.now().replace(day=1) - timedelta(days=1)).date()
    month_start = last_day.replace(day=1)
    month_end = datetime.now().replace(day=1).date()
    with get_db_connection() as conn:
        cursor = conn.cursor()
        # One summary row per user for last month. peak_hour_calls comes
        # from the hourly table (the daily table has no per-hour detail),
        # and avg_daily_calls divides by the number of active days.
        cursor.execute(
            '''SELECT user_id,
                      SUM(total_calls) AS total_calls,
                      SUM(success_calls) AS success_calls,
                      SUM(error_calls) AS error_calls,
                      SUM(total_calls) / COUNT(DISTINCT date) AS avg_daily_calls,
                      (SELECT SUM(total_calls) FROM api_stats_hourly
                       WHERE user_id = t.user_id
                         AND hour >= %s AND hour < %s
                       GROUP BY hour
                       ORDER BY SUM(total_calls) DESC
                       LIMIT 1) AS peak_hour_calls,
                      (SELECT endpoint FROM api_stats_daily
                       WHERE user_id = t.user_id
                         AND date >= %s AND date < %s
                       GROUP BY endpoint
                       ORDER BY SUM(total_calls) DESC
                       LIMIT 1) AS most_used_endpoint
               FROM api_stats_daily t
               WHERE date >= %s AND date < %s
               GROUP BY user_id''',
            (month_start, month_end, month_start, month_end,
             month_start, month_end)
        )
        stats = cursor.fetchall()
        for stat in stats:
            cursor.execute(
                '''INSERT INTO user_stats_monthly
                   (user_id, month, total_calls, total_success, total_error,
                    avg_daily_calls, peak_hour_calls, most_used_endpoint)
                   VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                   ON DUPLICATE KEY UPDATE
                       total_calls = VALUES(total_calls),
                       total_success = VALUES(total_success),
                       total_error = VALUES(total_error),
                       avg_daily_calls = VALUES(avg_daily_calls),
                       peak_hour_calls = VALUES(peak_hour_calls),
                       most_used_endpoint = VALUES(most_used_endpoint)''',
                (stat['user_id'], month_start,
                 stat['total_calls'], stat['success_calls'],
                 stat['error_calls'], stat['avg_daily_calls'],
                 stat['peak_hour_calls'], stat['most_used_endpoint'])
            )
        conn.commit()
# Run via cron at 02:00 on the 1st of every month:
# 0 2 1 * * /usr/bin/python3 /path/to/aggregate_monthly.py

Query Endpoints
User's current-month usage
@app.route('/api/stats/my-usage')
@require_api_key
def get_my_usage():
    """Return the current user's usage for this month."""
    this_month = datetime.now().replace(day=1).date()
    with get_db_connection() as conn:
        cursor = conn.cursor()
        # Read straight from the monthly summary table (fast!)
        cursor.execute(
            'SELECT * FROM user_stats_monthly WHERE user_id = %s AND month = %s',
            (request.user['id'], this_month)
        )
        stats = cursor.fetchone()
    if not stats:
        # No statistics yet for this month
        return jsonify({
            'month': this_month.isoformat(),
            'total_calls': 0,
            'avg_daily_calls': 0
        })
    return jsonify({
        'month': stats['month'].isoformat(),
        'total_calls': stats['total_calls'],
        'total_success': stats['total_success'],
        'total_error': stats['total_error'],
        'avg_daily_calls': stats['avg_daily_calls'],
        'peak_hour_calls': stats['peak_hour_calls'],
        'most_used_endpoint': stats['most_used_endpoint']
    })

Daily call trend
@app.route('/api/stats/daily-trend')
@require_api_key
def get_daily_trend():
    """Return the user's daily call trend."""
    days = request.args.get('days', 30, type=int)
    start_date = (datetime.now() - timedelta(days=days)).date()
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            '''SELECT date, SUM(total_calls) AS total_calls
               FROM api_stats_daily
               WHERE user_id = %s AND date >= %s
               GROUP BY date
               ORDER BY date ASC''',
            (request.user['id'], start_date)
        )
        results = cursor.fetchall()
    return jsonify({
        'data': [
            {
                'date': r['date'].isoformat(),
                'calls': r['total_calls']
            }
            for r in results
        ]
    })

Performance Comparison
Before (querying the log table directly)
# Count the user's calls for this month
cursor.execute(
    '''SELECT COUNT(*) FROM api_logs
       WHERE user_id = %s
         AND created_at >= DATE_FORMAT(NOW(), '%%Y-%%m-01')''',
    (user_id,)
)
# Execution time: 2-5 seconds
# Rows scanned: hundreds of thousands

After (querying the summary table)
# Count the user's calls for this month
cursor.execute(
    'SELECT * FROM user_stats_monthly WHERE user_id = %s AND month = %s',
    (user_id, this_month)
)
# Execution time: 10-20 milliseconds
# Rows read: 1

Speedup: 100-500x!
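The shape of that speedup is easy to reproduce in memory: counting by scanning every event versus reading one precomputed number. A toy comparison (illustrative only, not a database benchmark):

```python
import time

# 500,000 raw "log" events spread across 50 users
events = [(uid % 50, "call") for uid in range(500_000)]

# Pre-aggregated counters, built once as events arrive
precomputed = {}
for uid, _ in events:
    precomputed[uid] = precomputed.get(uid, 0) + 1

t0 = time.perf_counter()
scan_count = sum(1 for uid, _ in events if uid == 7)  # scan every row
t1 = time.perf_counter()
fast_count = precomputed[7]                            # single lookup
t2 = time.perf_counter()

assert scan_count == fast_count
print(f"scan: {(t1 - t0) * 1000:.1f} ms, lookup: {(t2 - t1) * 1000:.4f} ms")
```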
Data Cleanup
With the statistics tables in place, the raw logs can be purged on a schedule:
def clean_old_logs():
    """Archive and delete raw logs older than 90 days."""
    cutoff_date = datetime.now() - timedelta(days=90)
    with get_db_connection() as conn:
        cursor = conn.cursor()
        # Archive first (optional). The OUTFILE path is built in Python
        # because INTO OUTFILE takes a literal, not a bound parameter;
        # it also requires the FILE privilege and writes on the DB host.
        archive_path = f"/archive/logs_{cutoff_date.strftime('%Y%m%d')}.csv"
        cursor.execute(
            f'''SELECT * FROM api_logs
                WHERE created_at < %s
                INTO OUTFILE '{archive_path}'
                FIELDS TERMINATED BY ','
                ENCLOSED BY '"'
                LINES TERMINATED BY '\\n' ''',
            (cutoff_date,)
        )
        # Delete the old rows
        cursor.execute(
            'DELETE FROM api_logs WHERE created_at < %s',
            (cutoff_date,)
        )
        conn.commit()
        print(f"Deleted {cursor.rowcount} old log entries")
# Run via cron at 03:00 every Sunday:
# 0 3 * * 0 /usr/bin/python3 /path/to/clean_logs.py

Wrap-Up
✅ Done:
- Designed multi-dimensional statistics tables (hourly, daily, monthly)
- Implemented real-time statistics updates
- Implemented scheduled aggregation jobs
- Improved query performance by 100-500x
✅ Results:
- Users can see their own usage in near real time
- Operations can analyze usage trends
- The data supports the upcoming paid plans
⚠️ A new problem:
- Some users call the API far too frequently
- We need to limit each user's request rate
🎯 Next step: some users hit the API hundreds of times per second; I need rate limiting.
(Next section: rate limiting)
