Hot-Cold Data Separation

Scenario

After two years in production, the data volume has grown enormous.

Data scale:
- Total log records: 1 billion
- New records per day: 2 million
- Database size: 500 GB

Problems:
- Queries keep getting slower
- Storage costs are high
- Backups take a long time

Solution
1. Hot-Cold Data Separation Strategy

Data classification:

Hot data (recent):
- Logs from the last 30 days
- Queried frequently
- Stored on the MySQL primary
- Requires fast access

Warm data (mid-term):
- Logs 30-180 days old
- Queried occasionally
- Stored on MySQL replicas
- Moderate access-speed requirements
Cold data (long-term):
- Logs older than 180 days
- Rarely queried
- Archived to object storage
- Cost-optimized
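
To make the tiering concrete, here is a minimal sketch of how a query router could map a log's age to a storage tier. The thresholds mirror the strategy above; Tier and pick_tier are illustrative names, not part of the platform's existing code.

from datetime import datetime, timedelta
from enum import Enum

class Tier(Enum):
    HOT = 'mysql_primary'    # last 30 days, fast access
    WARM = 'mysql_replica'   # 30-180 days, occasional queries
    COLD = 's3_archive'      # older than 180 days, archived

def pick_tier(created_at, now=None):
    """Map a log timestamp to the storage tier that holds it."""
    now = now or datetime.now()
    age = now - created_at
    if age <= timedelta(days=30):
        return Tier.HOT
    if age <= timedelta(days=180):
        return Tier.WARM
    return Tier.COLD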

2. Partitioned Table Design

-- Partition by time
CREATE TABLE api_logs (
    id BIGINT NOT NULL AUTO_INCREMENT,
    user_id INT NOT NULL,
    endpoint VARCHAR(255) NOT NULL,
    params TEXT,
    response_time INT,
    status VARCHAR(50),
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (id, created_at),
    INDEX idx_user_created (user_id, created_at),
    INDEX idx_created (created_at)
) PARTITION BY RANGE (TO_DAYS(created_at)) (
    PARTITION p202401 VALUES LESS THAN (TO_DAYS('2024-02-01')),
    PARTITION p202402 VALUES LESS THAN (TO_DAYS('2024-03-01')),
    PARTITION p202403 VALUES LESS THAN (TO_DAYS('2024-04-01')),
    -- ...
    PARTITION pmax VALUES LESS THAN MAXVALUE
);
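
With RANGE partitioning on TO_DAYS(created_at), any query that bounds created_at lets MySQL prune to just the matching partitions. A minimal sketch of a hot-tier query, assuming the get_db_connection helper used in the archival code below and a driver with %s-style placeholders (e.g. PyMySQL):

from datetime import datetime, timedelta

def fetch_recent_logs(user_id, days=30):
    """Fetch hot-tier logs; the created_at bound enables partition pruning."""
    since = datetime.now() - timedelta(days=days)
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            '''SELECT endpoint, response_time, status, created_at
               FROM api_logs
               WHERE user_id = %s AND created_at >= %s
               ORDER BY created_at DESC''',
            (user_id, since)
        )
        return cursor.fetchall()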
-- Automatically create new partitions (stored procedure)
DELIMITER //
CREATE PROCEDURE create_monthly_partition()
BEGIN
    DECLARE next_month DATE;
    DECLARE partition_name VARCHAR(20);
    DECLARE partition_value VARCHAR(50);

    SET next_month = DATE_ADD(CURDATE(), INTERVAL 2 MONTH);
    SET next_month = DATE_FORMAT(next_month, '%Y-%m-01');
    SET partition_name = CONCAT('p', DATE_FORMAT(next_month, '%Y%m'));
    SET partition_value = CONCAT('TO_DAYS(\'', next_month, '\')');

    SET @sql = CONCAT(
        'ALTER TABLE api_logs ',
        'REORGANIZE PARTITION pmax INTO (',
        'PARTITION ', partition_name, ' VALUES LESS THAN (', partition_value, '),',
        'PARTITION pmax VALUES LESS THAN MAXVALUE)'
    );

    PREPARE stmt FROM @sql;
    EXECUTE stmt;
    DEALLOCATE PREPARE stmt;
END //
DELIMITER ;
-- Scheduled execution: the 1st of every month
-- (the MySQL event scheduler must be enabled: SET GLOBAL event_scheduler = ON)
CREATE EVENT auto_create_partition
ON SCHEDULE EVERY 1 MONTH
STARTS '2024-01-01 00:00:00'
DO CALL create_monthly_partition();
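
Since the procedure reorganizes pmax two months ahead of time, it helps to verify that the expected partitions actually exist. A small helper sketch, querying the same INFORMATION_SCHEMA.PARTITIONS view the archival job below relies on (get_db_connection and the api_platform schema are assumed from the surrounding code):

def list_partitions():
    """Return (PARTITION_NAME, TABLE_ROWS) pairs for api_logs."""
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            '''SELECT PARTITION_NAME, TABLE_ROWS
               FROM INFORMATION_SCHEMA.PARTITIONS
               WHERE TABLE_SCHEMA = 'api_platform'
                 AND TABLE_NAME = 'api_logs'
               ORDER BY PARTITION_ORDINAL_POSITION'''
        )
        return cursor.fetchall()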

3. Data Archival

from datetime import datetime, timedelta
import gzip
import logging
import os
import shutil

def archive_old_logs():
    """Archive old logs to object storage."""
    # Archive data older than 6 months
    cutoff_date = datetime.now() - timedelta(days=180)
    logging.info(f'Starting archival for data before {cutoff_date}')
    with get_db_connection() as conn:
        cursor = conn.cursor()  # assumes a dict-style cursor (e.g. pymysql.cursors.DictCursor)
        # Find candidate partitions to archive
        cursor.execute(
            '''SELECT PARTITION_NAME
               FROM INFORMATION_SCHEMA.PARTITIONS
               WHERE TABLE_SCHEMA = 'api_platform'
                 AND TABLE_NAME = 'api_logs'
                 AND PARTITION_NAME != 'pmax'
               ORDER BY PARTITION_NAME'''
        )
        partitions = cursor.fetchall()
        for partition in partitions:
            partition_name = partition['PARTITION_NAME']
            # Parse the month from the partition name (pYYYYMM)
            partition_start = datetime.strptime(partition_name[1:], '%Y%m')
            # A partition's upper bound is the first day of the next month;
            # only archive partitions whose entire month lies before the cutoff
            if partition_start.month == 12:
                partition_end = partition_start.replace(year=partition_start.year + 1, month=1)
            else:
                partition_end = partition_start.replace(month=partition_start.month + 1)
            if partition_end <= cutoff_date:
                logging.info(f'Archiving partition {partition_name}')
                # Export the data to S3
                archive_partition_to_s3(partition_name)
                # Drop the partition (DDL statements commit implicitly)
                cursor.execute(
                    f'ALTER TABLE api_logs DROP PARTITION {partition_name}'
                )
                conn.commit()
                logging.info(f'Archived and dropped partition {partition_name}')

def archive_partition_to_s3(partition_name):
    """Export one partition's rows to S3."""
    # Export with SELECT ... INTO OUTFILE. Note: the file is written on the
    # MySQL server host, so this assumes the job runs on that host and that
    # secure_file_priv allows writing under /tmp.
    export_file = f'/tmp/{partition_name}.csv'
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(
            f'''SELECT * FROM api_logs PARTITION ({partition_name})
                INTO OUTFILE '{export_file}'
                FIELDS TERMINATED BY ','
                ENCLOSED BY '"'
                LINES TERMINATED BY '\\n'
            '''
        )
    # Compress before uploading -- the archive key ends in .csv.gz,
    # and the cold-data query below expects gzip
    gz_file = f'{export_file}.gz'
    with open(export_file, 'rb') as src, gzip.open(gz_file, 'wb') as dst:
        shutil.copyfileobj(src, dst)
    # Upload to S3
    s3_client.upload_file(
        gz_file,
        'api-logs-archive',
        f'logs/{partition_name}.csv.gz'
    )
    # Remove the temporary files
    os.remove(export_file)
    os.remove(gz_file)
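
In production the archival pass needs to run on a schedule. One option (an assumption, not the platform's documented setup) is a nightly cron entry on the host that runs the job, for example "0 3 * * * python3 /opt/api_platform/archive_jobs.py", where the hypothetical archive_jobs.py simply calls archive_old_logs(); any scheduler able to invoke that function works equally well.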

4. Querying Cold Data

import csv
import gzip
from datetime import datetime

@app.route('/api/admin/logs/archive/<date>')
@require_admin
def get_archived_logs(date):
    """Query archived logs from S3."""
    try:
        # Parse the month and derive the partition name
        archive_date = datetime.strptime(date, '%Y-%m')
        partition_name = f'p{archive_date.strftime("%Y%m")}'
        # Download from S3
        response = s3_client.get_object(
            Bucket='api-logs-archive',
            Key=f'logs/{partition_name}.csv.gz'
        )
        # Decompress and parse, dropping the trailing empty line
        content = gzip.decompress(response['Body'].read())
        lines = [line for line in content.decode('utf-8').split('\n') if line]
        # The export has no header row, so supply the column names explicitly
        fieldnames = ['id', 'user_id', 'endpoint', 'params',
                      'response_time', 'status', 'created_at']
        reader = csv.DictReader(lines, fieldnames=fieldnames)
        # Return the first 100 rows as a sample
        results = []
        for i, row in enumerate(reader):
            if i >= 100:
                break
            results.append(row)
        return jsonify({
            'archive_date': date,
            'total_count': len(lines),
            'sample': results
        })
    except Exception as e:
        return jsonify({
            'error': f'Failed to retrieve archived logs: {str(e)}'
        }), 404
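
A quick usage sketch: GET /api/admin/logs/archive/2024-01 fetches logs/p202401.csv.gz from the api-logs-archive bucket and returns that month's row count plus a 100-row sample. Because the handler holds the whole decompressed file in memory, it suits spot checks; for heavier analysis, download the archive and process it offline.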

Verifying the Results

Before Optimization

Single-table storage:
- Total records: 1 billion
- Query time: 10-30 seconds
- Storage cost: high
- Backup time: several hours

After Optimization (Hot-Cold Separation)

Partitioning + archival:
- Hot data: 60 million rows (30 days at 2 million per day)
- Query time: <1 second
- Storage cost: reduced by 70%
- Backup time: 30 minutes

Section Summary

✅ Completed work:
- Implemented a partitioned table
- Implemented data archival
- Implemented cold-data queries

✅ Results:
- Query performance improved
- Storage cost reduced by 70%
- Maintenance made more efficient

🎯 Done! I have learned how to handle big-data challenges.
