大数据挑战
日志表爆炸的那天 💥
早上九点,我打开监控面板,看到红色的警告灯在疯狂闪烁。
click_logs 表突破 5000 万条记录了。磁盘占用飙到 40GB。我试着查询上周的数据统计,进度条慢悠悠地爬了 30 秒才返回结果。
我盯着屏幕,意识到一个残酷的事实:按照这个增长速度,三个月后数据库就要爆炸了。
当前数据量:
- 每天点击: 100 万次
- 每条日志: 500 字节
- 每天新增: 500MB
- 一年累积: 180GB
存储增长曲线:
Month 1: 15GB
Month 3: 45GB ← 查询开始变慢
Month 6: 90GB ← 数据库扛不住了
Month 12: 180GB ← 💥我翻着 AWS 账单,看着 RDS 的存储成本一路飙升。我知道,必须做数据分层了。
冷热数据分离 🌡️
那天晚上我画了一张图,把数据按照访问频率分成三类:
数据温度分布:
🔥 热数据(最近 7 天)
占比: 2% 访问频率: 80%
查询特点: 实时统计、活跃短链接监控
存储: Redis + MySQL
这些数据是用户最常看的——今天点击了多少、最近一周的趋势。
必须毫秒级响应,放在 Redis 缓存里。
🌡️ 温数据(7 天 ~ 30 天)
占比: 8% 访问频率: 15%
查询特点: 周报、月报查看
存储: MySQL
用户偶尔要看上个月的统计数据,但不会频繁访问。
放在 MySQL 里,用索引优化查询。
❄️ 冷数据(30 天以上)
占比: 90% 访问频率: 5%
查询特点: 历史数据分析、合规审计
存储: 对象存储(S3)
这些数据几乎没人看,但不能删——可能要做年度总结,
或者有人问"去年 12 月的点击量是多少"。
压缩后扔到 S3,成本只有 MySQL 的 1/10。核心理念: 80% 的查询只看 2% 的数据。把冷数据搬走,热数据跑得飞快。
归档方案实现
我写了个定时任务,每天凌晨把冷数据搬到 S3:
from datetime import datetime, timedelta
import gzip
import json
class DataArchiver:
"""数据归档器——把冷数据搬到 S3"""
def __init__(self, mysql_db, s3_client):
self.mysql = mysql_db
self.s3 = s3_client
def archive_old_logs(self, days_to_keep=7):
"""归档超过 N 天的日志"""
cutoff_date = datetime.now() - timedelta(days=days_to_keep)
# 步骤 1: 找出需要归档的数据
print(f"🔍 查找 {cutoff_date} 之前的数据...")
count = self.mysql.query_one("""
SELECT COUNT(*) as cnt
FROM click_logs
WHERE created_at < ?
""", cutoff_date)['cnt']
if count == 0:
print("✅ 没有需要归档的数据")
return
print(f"📦 找到 {count:,} 条记录需要归档")
# 步骤 2: 分批导出(避免内存爆炸)
batch_size = 50000
offset = 0
total_archived = 0
while True:
rows = self.mysql.query_all("""
SELECT * FROM click_logs
WHERE created_at < ?
ORDER BY created_at
LIMIT ? OFFSET ?
""", cutoff_date, batch_size, offset)
if not rows:
break
# 步骤 3: 压缩并上传到 S3
self._upload_batch_to_s3(rows, cutoff_date)
total_archived += len(rows)
offset += batch_size
print(f"📤 已归档 {total_archived:,}/{count:,} 条记录")
# 步骤 4: 删除已归档的数据
print(f"🗑️ 删除 MySQL 中的旧数据...")
deleted = self.mysql.execute("""
DELETE FROM click_logs
WHERE created_at < ?
""", cutoff_date)
print(f"✅ 归档完成! 删除了 {deleted:,} 条记录,释放了 {deleted * 500 / 1024 / 1024:.1f} MB 空间")
def _upload_batch_to_s3(self, rows, cutoff_date):
"""压缩并上传一批数据到 S3"""
# 转成 JSON
json_data = json.dumps([dict(row) for row in rows])
# Gzip 压缩(压缩率约 10:1)
compressed = gzip.compress(json_data.encode('utf-8'))
# 生成 S3 路径(按年月组织)
year_month = cutoff_date.strftime("%Y/%m")
filename = f"click_logs/{year_month}/{cutoff_date.strftime('%Y%m%d')}_{datetime.now().timestamp()}.json.gz"
# 上传
self.s3.put_object(
bucket='analytics-archive',
key=filename,
body=compressed
)
# 定时任务配置
# 每天凌晨 2 点执行
# cron: 0 2 * * *第一次运行时,我看着日志输出,心跳加速:
🔍 查找 2024-01-15 之前的数据...
📦 找到 42,356,789 条记录需要归档
📤 已归档 50,000/42,356,789 条记录
📤 已归档 100,000/42,356,789 条记录
...
✅ 归档完成! 删除了 42,356,789 条记录,释放了 20.2 GB 空间MySQL 表从 5000 万条记录降到了 700 万条,查询速度从 30 秒降到了 0.3 秒。
快了 100 倍。
数据分区优化 📂
归档解决了存储问题,但 MySQL 表还有 700 万条记录。我学会了分区表——按日期分区,查询时只扫描相关分区。
-- 创建分区表
CREATE TABLE click_logs_partitioned (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
short_code VARCHAR(20) NOT NULL,
ip VARCHAR(45),
country VARCHAR(5),
city VARCHAR(100),
device VARCHAR(50),
browser VARCHAR(50),
os VARCHAR(50),
referer VARCHAR(500),
utm_source VARCHAR(100),
created_at DATETIME NOT NULL,
INDEX idx_short_code_created (short_code, created_at)
)
PARTITION BY RANGE (YEAR(created_at) * 100 + MONTH(created_at)) (
PARTITION p202401 VALUES LESS THAN (202402),
PARTITION p202402 VALUES LESS THAN (202403),
PARTITION p202403 VALUES LESS THAN (202404),
-- ... 每个月自动添加新分区
PARTITION p_future VALUES LESS THAN MAXVALUE
);
-- 查询优化效果
-- ❌ 未分区: 扫描全表 700 万行
-- EXPLAIN SELECT * FROM click_logs WHERE created_at >= '2024-03-01';
-- rows: 7,000,000
-- ✅ 分区后: 只扫描 3 月分区 100 万行
-- EXPLAIN SELECT * FROM click_logs_partitioned WHERE created_at >= '2024-03-01';
-- rows: 1,000,000 (只扫描 p202403 分区)分区表的好处:
- 查询快: 只扫描相关分区
- 删除快: 删整个分区比逐行删除快 100 倍
- 维护方便: 可以单独优化某个分区
自动分区管理
我写了个脚本,每月自动创建新分区:
class PartitionManager:
"""MySQL 分区管理器"""
def __init__(self, mysql_db):
self.mysql = mysql_db
def add_next_month_partition(self):
"""为下个月创建分区"""
next_month = datetime.now().replace(day=1) + timedelta(days=32)
next_month = next_month.replace(day=1)
partition_name = f"p{next_month.strftime('%Y%m')}"
partition_value = next_month.year * 100 + next_month.month
# 检查分区是否已存在
existing = self.mysql.query_one("""
SELECT COUNT(*) as cnt
FROM information_schema.partitions
WHERE table_name = 'click_logs_partitioned'
AND partition_name = ?
""", partition_name)
if existing['cnt'] > 0:
print(f"✅ 分区 {partition_name} 已存在")
return
# 添加新分区
sql = f"""
ALTER TABLE click_logs_partitioned
ADD PARTITION (
PARTITION {partition_name}
VALUES LESS THAN ({partition_value + 1})
)
"""
self.mysql.execute(sql)
print(f"✅ 创建分区 {partition_name}")
def drop_old_partition(self, keep_months=3):
"""删除旧分区(释放空间)"""
cutoff_date = datetime.now() - timedelta(days=keep_months * 30)
cutoff_value = cutoff_date.year * 100 + cutoff_date.month
partitions = self.mysql.query_all("""
SELECT partition_name, partition_description
FROM information_schema.partitions
WHERE table_name = 'click_logs_partitioned'
AND partition_name != 'p_future'
ORDER BY partition_ordinal_position
""")
for p in partitions:
partition_value = int(p['partition_description'])
if partition_value < cutoff_value:
self.mysql.execute(f"""
ALTER TABLE click_logs_partitioned
DROP PARTITION {p['partition_name']}
""")
print(f"🗑️ 删除分区 {p['partition_name']}")数据压缩的艺术 🗜️
冷数据搬到 S3 后,我做了个实验——看看不同压缩方式的效果。
import pandas as pd
import gzip
import lzma
import zlib
class CompressionBenchmark:
"""压缩算法对比"""
def __init__(self, sample_data):
self.original = sample_data.encode('utf-8')
self.original_size = len(self.original) / 1024 # KB
def test_compression(self):
methods = [
('Gzip', gzip.compress),
('LZMA', lzma.compress),
('Zlib', zlib.compress),
]
results = []
for name, compress_func in methods:
compressed = compress_func(self.original)
ratio = len(compressed) / len(self.original)
results.append({
'method': name,
'original_size_kb': f"{self.original_size:.1f}",
'compressed_size_kb': f"{len(compressed) / 1024:.1f}",
'compression_ratio': f"{ratio:.2%}",
'space_saved': f"{(1-ratio):.1%}"
})
return pd.DataFrame(results)
# 测试结果
# 假设样本是 1MB 的 JSON 日志数据
benchmark = CompressionBenchmark(sample_data)
results = benchmark.test_compression()
print(results)测试结果让我惊喜:
method original_size_kb compressed_size_kb compression_ratio space_saved
0 Gzip 1024.0 102.4 10.00% 90.0%
1 LZMA 1024.0 68.3 6.67% 93.3%
2 Zlib 1024.0 128.0 12.50% 87.5%LZMA 压缩率最高,能节省 93% 的空间。但压缩速度慢,适合归档场景。Gzip 是折中方案,压缩率和速度都不错。
实际压缩效果
我统计了生产环境的压缩效果:
原始数据: 42,356,789 条 × 500 字节 = 20.2 GB
压缩后:
- Gzip: 2.0 GB (压缩率 10:1)
- LZMA: 1.4 GB (压缩率 14:1)
S3 存储成本:
- 标准: ¥0.18/GB/月
- 归档: ¥0.025/GB/月
月成本对比:
- MySQL 存储: ¥18/月 × 20GB = ¥360/月
- S3 归档(LZMA): ¥0.025 × 1.4GB = ¥0.035/月
节省了 99%! 💰成本分析 💰
我把数据分层前后的成本做了个详细对比:
【方案 A: 全放 MySQL】
存储: 50GB × ¥1.5/GB/月 = ¥75/月
问题:
- 查询越来越慢
- 备份时间长
- 磁盘快满了
- 扩容成本高
月成本: ¥75 💸
【方案 B: 冷热分离】
热数据(MySQL): 7 天 × 500MB/天 = 3.5GB × ¥1.5 = ¥5.25/月
温数据(MySQL): 23 天 × 500MB/天 = 11.5GB × ¥1.5 = ¥17.25/月
冷数据(S3 归档): 335 天 × 500MB/天 × 10%压缩 = 16.75GB × ¥0.025 = ¥0.42/月
月成本: ¥22.92 💰
【节省】
¥75 - ¥22.92 = ¥52.08/月
一年节省: ¥625/年
而且查询速度快了 100 倍! ⚡但成本不是唯一的考虑因素。数据分层还带来了:
- 查询速度: 热数据查询从 30 秒降到 0.3 秒
- 系统稳定性: MySQL 不再因为磁盘满而宕机
- 扩展性: S3 理论上可以存无限数据
- 合规性: 冷数据归档可以永久保存,满足审计要求
从冷数据恢复 🔄
有一天,客户问我:“能查下去年 12 月的完整点击记录吗?”
我淡定地打开 S3 浏览器,找到对应的归档文件,下载、解压、导入临时表。
class DataRestorer:
"""从 S3 恢复冷数据"""
def __init__(self, s3_client, mysql_db):
self.s3 = s3_client
self.mysql = mysql_db
def restore_date_range(self, start_date, end_date):
"""恢复指定日期范围的数据到临时表"""
current = start_date
# 创建临时表
self.mysql.execute("""
CREATE TEMPORARY TABLE click_logs_restored
LIKE click_logs
""")
total_restored = 0
while current <= end_date:
year_month = current.strftime("%Y/%m")
date_str = current.strftime("%Y%m%d")
# 列出 S3 上的归档文件
files = self.s3.list_objects(
bucket='analytics-archive',
prefix=f'click_logs/{year_month}/{date_str}_'
)
for file in files:
# 下载并解压
compressed = self.s3.get_object('analytics-archive', file['key'])
json_data = gzip.decompress(compressed['Body'].read())
rows = json.loads(json_data)
# 插入临时表
for row in rows:
self.mysql.execute("""
INSERT INTO click_logs_restored VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", row['id'], row['short_code'], row['ip'],
row['country'], row['city'], row['device'],
row['browser'], row['os'], row['referer'],
row['utm_source'], row['created_at'])
total_restored += len(rows)
print(f"📥 恢复 {file['key']}: {len(rows)} 条记录")
current += timedelta(days=1)
print(f"✅ 恢复完成! 共 {total_restored:,} 条记录")
# 返回临时表名供查询
return "click_logs_restored"
# 使用示例
restorer = DataRestorer(s3_client, mysql_db)
temp_table = restorer.restore_date_range(
start_date=datetime(2023, 12, 1),
end_date=datetime(2023, 12, 31)
)
# 查询恢复的数据
result = mysql_db.query(f"""
SELECT
DATE(created_at) as date,
COUNT(*) as clicks
FROM {temp_table}
GROUP BY date
ORDER BY date
""")30 秒后,我给客户发了完整的 Excel 表格。客户很惊讶:“这么快!”
我笑了笑,心里想:数据分层不只是省钱,更是让系统可持续。
新的商机 💡
数据问题搞定了,但我发现一个有趣的现象:
有些用户愿意付费来获取更详细的统计数据。比如:
- 某电商客户想知道:“哪个国家的点击量最高?”
- 某自媒体博主问:“我的粉丝在哪些城市最活跃?”
- 某广告代理商需要:“每小时点击趋势图”
这些查询都需要访问冷数据,但用户愿意为此付费。
我看着账本上的数字,脑子里冒出一个想法:是不是可以推出付费高级分析功能?
👉 下一章:自定义短链接 —— 用户可以自定义短链接后缀,但这个功能带来了新的技术挑战…