海量日志
场景
两年半后,数据量激增。
数据规模:
- 日均调用量:200 万次
- 日志表记录:每天 200 万条
- 累计数据:18 个月 × 30 天 × 200 万 = 10 亿条
问题:
- 查询越来越慢
- 数据库存储告急
- 备份时间过长解决方案
1. 分库分表
-- 按时间分表
CREATE TABLE api_logs_2024_01 (
LIKE api_logs INCLUDING ALL
) PARTITION BY RANGE (created_at);
-- 自动创建分区
ALTER TABLE api_logs
PARTITION BY RANGE (TO_DAYS(created_at)) (
PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')),
PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')),
-- ...
);2. 冷热分离
def archive_old_logs():
"""归档旧日志到冷存储"""
# 6 个月前的数据
cutoff_date = datetime.now() - timedelta(days=180)
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(
'SELECT * FROM api_logs WHERE created_at < %s',
(cutoff_date,)
)
# 写入 S3(使用 Parquet 格式)
for batch in iter_batch(cursor, batch_size=10000):
s3_client.put_object(
Bucket='api-logs-archive',
Key=f'logs/{batch[0]["created_at"].date()}.parquet',
Body=to_parquet(batch)
)
# 删除已归档的数据
cursor.execute(
'DELETE FROM api_logs WHERE created_at < %s',
(cutoff_date,)
)3. 使用 Elasticsearch
from elasticsearch import Elasticsearch
es = Elasticsearch(['http://elasticsearch:9200'])
def index_log(log_entry):
"""索引日志到 Elasticsearch"""
es.index(
index='api-logs',
body={
'timestamp': log_entry['created_at'],
'user_id': log_entry['user_id'],
'endpoint': log_entry['endpoint'],
'response_time': log_entry['response_time'],
'status': log_entry['status']
}
)
def search_logs(user_id, start_date, end_date):
"""搜索日志"""
query = {
'query': {
'bool': {
'must': [
{'term': {'user_id': user_id}},
{'range': {'timestamp': {
'gte': start_date,
'lte': end_date
}}}
]
}
}
}
results = es.search(index='api-logs', body=query)
return results['hits']['hits']效果验证
优化前
日志查询(用户本月调用量):
- 扫描:1 亿条记录
- 时间:15 秒
- 影响:用户体验差优化后
Elasticsearch 查询:
- 扫描:仅相关记录
- 时间:0.5 秒
- 提升:30 倍本节小结
✅ 完成的工作:
- 实现了数据分区
- 实现了冷热分离
- 引入了 Elasticsearch
✅ 效果:
- 查询性能提升 30 倍
- 存储成本降低
🎯 下一步:产品经理要数据分析,如何实时统计?
