大数据挑战

日志表爆炸的那天 💥

早上九点,我打开监控面板,看到红色的警告灯在疯狂闪烁。

click_logs 表突破 5000 万条记录了。磁盘占用飙到 40GB。我试着查询上周的数据统计,进度条慢悠悠地爬了 30 秒才返回结果。

我盯着屏幕,意识到一个残酷的事实:按照这个增长速度,三个月后数据库就要爆炸了

当前数据量:
- 每天点击: 100 万次
- 每条日志: 500 字节
- 每天新增: 500MB
- 一年累积: 180GB

存储增长曲线:
Month 1:  15GB
Month 3:  45GB  ← 查询开始变慢
Month 6:  90GB  ← 数据库扛不住了
Month 12: 180GB ← 💥

我翻着 AWS 账单,看着 RDS 的存储成本一路飙升。我知道,必须做数据分层了。

冷热数据分离 🌡️

那天晚上我画了一张图,把数据按照访问频率分成三类:

数据温度分布:

🔥 热数据(最近 7 天)
   占比: 2%    访问频率: 80%
   查询特点: 实时统计、活跃短链接监控
   存储: Redis + MySQL
   
   这些数据是用户最常看的——今天点击了多少、最近一周的趋势。
   必须毫秒级响应,放在 Redis 缓存里。

🌡️ 温数据(7 天 ~ 30 天)
   占比: 8%   访问频率: 15%
   查询特点: 周报、月报查看
   存储: MySQL
   
   用户偶尔要看上个月的统计数据,但不会频繁访问。
   放在 MySQL 里,用索引优化查询。

❄️ 冷数据(30 天以上)
   占比: 90%  访问频率: 5%
   查询特点: 历史数据分析、合规审计
   存储: 对象存储(S3)
   
   这些数据几乎没人看,但不能删——可能要做年度总结,
   或者有人问"去年 12 月的点击量是多少"。
   压缩后扔到 S3,成本只有 MySQL 的 1/10。

核心理念: 80% 的查询只看 2% 的数据。把冷数据搬走,热数据跑得飞快。

归档方案实现

我写了个定时任务,每天凌晨把冷数据搬到 S3:

from datetime import datetime, timedelta
import gzip
import json

class DataArchiver:
    """数据归档器——把冷数据搬到 S3"""
    
    def __init__(self, mysql_db, s3_client):
        self.mysql = mysql_db
        self.s3 = s3_client
    
    def archive_old_logs(self, days_to_keep=7):
        """归档超过 N 天的日志"""
        cutoff_date = datetime.now() - timedelta(days=days_to_keep)
        
        # 步骤 1: 找出需要归档的数据
        print(f"🔍 查找 {cutoff_date} 之前的数据...")
        count = self.mysql.query_one("""
            SELECT COUNT(*) as cnt
            FROM click_logs
            WHERE created_at < ?
        """, cutoff_date)['cnt']
        
        if count == 0:
            print("✅ 没有需要归档的数据")
            return
        
        print(f"📦 找到 {count:,} 条记录需要归档")
        
        # 步骤 2: 分批导出(避免内存爆炸)
        batch_size = 50000
        offset = 0
        total_archived = 0
        
        while True:
            rows = self.mysql.query_all("""
                SELECT * FROM click_logs
                WHERE created_at < ?
                ORDER BY created_at
                LIMIT ? OFFSET ?
            """, cutoff_date, batch_size, offset)
            
            if not rows:
                break
            
            # 步骤 3: 压缩并上传到 S3
            self._upload_batch_to_s3(rows, cutoff_date)
            total_archived += len(rows)
            offset += batch_size
            
            print(f"📤 已归档 {total_archived:,}/{count:,} 条记录")
        
        # 步骤 4: 删除已归档的数据
        print(f"🗑️ 删除 MySQL 中的旧数据...")
        deleted = self.mysql.execute("""
            DELETE FROM click_logs
            WHERE created_at < ?
        """, cutoff_date)
        
        print(f"✅ 归档完成! 删除了 {deleted:,} 条记录,释放了 {deleted * 500 / 1024 / 1024:.1f} MB 空间")
    
    def _upload_batch_to_s3(self, rows, cutoff_date):
        """压缩并上传一批数据到 S3"""
        # 转成 JSON
        json_data = json.dumps([dict(row) for row in rows])
        
        # Gzip 压缩(压缩率约 10:1)
        compressed = gzip.compress(json_data.encode('utf-8'))
        
        # 生成 S3 路径(按年月组织)
        year_month = cutoff_date.strftime("%Y/%m")
        filename = f"click_logs/{year_month}/{cutoff_date.strftime('%Y%m%d')}_{datetime.now().timestamp()}.json.gz"
        
        # 上传
        self.s3.put_object(
            bucket='analytics-archive',
            key=filename,
            body=compressed
        )


# 定时任务配置
# 每天凌晨 2 点执行
# cron: 0 2 * * *

第一次运行时,我看着日志输出,心跳加速:

🔍 查找 2024-01-15 之前的数据...
📦 找到 42,356,789 条记录需要归档
📤 已归档 50,000/42,356,789 条记录
📤 已归档 100,000/42,356,789 条记录
...
✅ 归档完成! 删除了 42,356,789 条记录,释放了 20.2 GB 空间

MySQL 表从 5000 万条记录降到了 700 万条,查询速度从 30 秒降到了 0.3 秒。

快了 100 倍

数据分区优化 📂

归档解决了存储问题,但 MySQL 表还有 700 万条记录。我学会了分区表——按日期分区,查询时只扫描相关分区。

-- 创建分区表
CREATE TABLE click_logs_partitioned (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    short_code VARCHAR(20) NOT NULL,
    ip VARCHAR(45),
    country VARCHAR(5),
    city VARCHAR(100),
    device VARCHAR(50),
    browser VARCHAR(50),
    os VARCHAR(50),
    referer VARCHAR(500),
    utm_source VARCHAR(100),
    created_at DATETIME NOT NULL,
    INDEX idx_short_code_created (short_code, created_at)
)
PARTITION BY RANGE (YEAR(created_at) * 100 + MONTH(created_at)) (
    PARTITION p202401 VALUES LESS THAN (202402),
    PARTITION p202402 VALUES LESS THAN (202403),
    PARTITION p202403 VALUES LESS THAN (202404),
    -- ... 每个月自动添加新分区
    PARTITION p_future VALUES LESS THAN MAXVALUE
);

-- 查询优化效果
-- ❌ 未分区: 扫描全表 700 万行
--    EXPLAIN SELECT * FROM click_logs WHERE created_at >= '2024-03-01';
--    rows: 7,000,000
    
-- ✅ 分区后: 只扫描 3 月分区 100 万行
--    EXPLAIN SELECT * FROM click_logs_partitioned WHERE created_at >= '2024-03-01';
--    rows: 1,000,000  (只扫描 p202403 分区)

分区表的好处:

  1. 查询快: 只扫描相关分区
  2. 删除快: 删整个分区比逐行删除快 100 倍
  3. 维护方便: 可以单独优化某个分区

自动分区管理

我写了个脚本,每月自动创建新分区:

class PartitionManager:
    """MySQL 分区管理器"""
    
    def __init__(self, mysql_db):
        self.mysql = mysql_db
    
    def add_next_month_partition(self):
        """为下个月创建分区"""
        next_month = datetime.now().replace(day=1) + timedelta(days=32)
        next_month = next_month.replace(day=1)
        
        partition_name = f"p{next_month.strftime('%Y%m')}"
        partition_value = next_month.year * 100 + next_month.month
        
        # 检查分区是否已存在
        existing = self.mysql.query_one("""
            SELECT COUNT(*) as cnt
            FROM information_schema.partitions
            WHERE table_name = 'click_logs_partitioned'
              AND partition_name = ?
        """, partition_name)
        
        if existing['cnt'] > 0:
            print(f"✅ 分区 {partition_name} 已存在")
            return
        
        # 添加新分区
        sql = f"""
            ALTER TABLE click_logs_partitioned
            ADD PARTITION (
                PARTITION {partition_name}
                VALUES LESS THAN ({partition_value + 1})
            )
        """
        self.mysql.execute(sql)
        print(f"✅ 创建分区 {partition_name}")
    
    def drop_old_partition(self, keep_months=3):
        """删除旧分区(释放空间)"""
        cutoff_date = datetime.now() - timedelta(days=keep_months * 30)
        cutoff_value = cutoff_date.year * 100 + cutoff_date.month
        
        partitions = self.mysql.query_all("""
            SELECT partition_name, partition_description
            FROM information_schema.partitions
            WHERE table_name = 'click_logs_partitioned'
              AND partition_name != 'p_future'
            ORDER BY partition_ordinal_position
        """)
        
        for p in partitions:
            partition_value = int(p['partition_description'])
            if partition_value < cutoff_value:
                self.mysql.execute(f"""
                    ALTER TABLE click_logs_partitioned
                    DROP PARTITION {p['partition_name']}
                """)
                print(f"🗑️ 删除分区 {p['partition_name']}")

数据压缩的艺术 🗜️

冷数据搬到 S3 后,我做了个实验——看看不同压缩方式的效果。

import pandas as pd
import gzip
import lzma
import zlib

class CompressionBenchmark:
    """压缩算法对比"""
    
    def __init__(self, sample_data):
        self.original = sample_data.encode('utf-8')
        self.original_size = len(self.original) / 1024  # KB
    
    def test_compression(self):
        methods = [
            ('Gzip', gzip.compress),
            ('LZMA', lzma.compress),
            ('Zlib', zlib.compress),
        ]
        
        results = []
        for name, compress_func in methods:
            compressed = compress_func(self.original)
            ratio = len(compressed) / len(self.original)
            results.append({
                'method': name,
                'original_size_kb': f"{self.original_size:.1f}",
                'compressed_size_kb': f"{len(compressed) / 1024:.1f}",
                'compression_ratio': f"{ratio:.2%}",
                'space_saved': f"{(1-ratio):.1%}"
            })
        
        return pd.DataFrame(results)


# 测试结果
# 假设样本是 1MB 的 JSON 日志数据
benchmark = CompressionBenchmark(sample_data)
results = benchmark.test_compression()

print(results)

测试结果让我惊喜:

  method  original_size_kb  compressed_size_kb  compression_ratio  space_saved
0  Gzip             1024.0               102.4             10.00%        90.0%
1   LZMA             1024.0                68.3              6.67%        93.3%
2  Zlib             1024.0               128.0             12.50%        87.5%

LZMA 压缩率最高,能节省 93% 的空间。但压缩速度慢,适合归档场景。Gzip 是折中方案,压缩率和速度都不错。

实际压缩效果

我统计了生产环境的压缩效果:

原始数据: 42,356,789 条 × 500 字节 = 20.2 GB

压缩后:
- Gzip: 2.0 GB  (压缩率 10:1)
- LZMA: 1.4 GB  (压缩率 14:1)

S3 存储成本:
- 标准: ¥0.18/GB/月
- 归档: ¥0.025/GB/月

月成本对比:
- MySQL 存储: ¥18/月 × 20GB = ¥360/月
- S3 归档(LZMA): ¥0.025 × 1.4GB = ¥0.035/月

节省了 99%! 💰

成本分析 💰

我把数据分层前后的成本做了个详细对比:

【方案 A: 全放 MySQL】
存储: 50GB × ¥1.5/GB/月 = ¥75/月
问题:
- 查询越来越慢
- 备份时间长
- 磁盘快满了
- 扩容成本高

月成本: ¥75 💸


【方案 B: 冷热分离】
热数据(MySQL): 7 天 × 500MB/天 = 3.5GB × ¥1.5 = ¥5.25/月
温数据(MySQL): 23 天 × 500MB/天 = 11.5GB × ¥1.5 = ¥17.25/月
冷数据(S3 归档): 335 天 × 500MB/天 × 10%压缩 = 16.75GB × ¥0.025 = ¥0.42/月

月成本: ¥22.92 💰


【节省】
¥75 - ¥22.92 = ¥52.08/月
一年节省: ¥625/年

而且查询速度快了 100 倍! ⚡

但成本不是唯一的考虑因素。数据分层还带来了:

  1. 查询速度: 热数据查询从 30 秒降到 0.3 秒
  2. 系统稳定性: MySQL 不再因为磁盘满而宕机
  3. 扩展性: S3 理论上可以存无限数据
  4. 合规性: 冷数据归档可以永久保存,满足审计要求

从冷数据恢复 🔄

有一天,客户问我:“能查下去年 12 月的完整点击记录吗?”

我淡定地打开 S3 浏览器,找到对应的归档文件,下载、解压、导入临时表。

class DataRestorer:
    """从 S3 恢复冷数据"""
    
    def __init__(self, s3_client, mysql_db):
        self.s3 = s3_client
        self.mysql = mysql_db
    
    def restore_date_range(self, start_date, end_date):
        """恢复指定日期范围的数据到临时表"""
        current = start_date
        
        # 创建临时表
        self.mysql.execute("""
            CREATE TEMPORARY TABLE click_logs_restored
            LIKE click_logs
        """)
        
        total_restored = 0
        
        while current <= end_date:
            year_month = current.strftime("%Y/%m")
            date_str = current.strftime("%Y%m%d")
            
            # 列出 S3 上的归档文件
            files = self.s3.list_objects(
                bucket='analytics-archive',
                prefix=f'click_logs/{year_month}/{date_str}_'
            )
            
            for file in files:
                # 下载并解压
                compressed = self.s3.get_object('analytics-archive', file['key'])
                json_data = gzip.decompress(compressed['Body'].read())
                rows = json.loads(json_data)
                
                # 插入临时表
                for row in rows:
                    self.mysql.execute("""
                        INSERT INTO click_logs_restored VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """, row['id'], row['short_code'], row['ip'], 
                        row['country'], row['city'], row['device'],
                        row['browser'], row['os'], row['referer'],
                        row['utm_source'], row['created_at'])
                
                total_restored += len(rows)
                print(f"📥 恢复 {file['key']}: {len(rows)} 条记录")
            
            current += timedelta(days=1)
        
        print(f"✅ 恢复完成! 共 {total_restored:,} 条记录")
        
        # 返回临时表名供查询
        return "click_logs_restored"


# 使用示例
restorer = DataRestorer(s3_client, mysql_db)
temp_table = restorer.restore_date_range(
    start_date=datetime(2023, 12, 1),
    end_date=datetime(2023, 12, 31)
)

# 查询恢复的数据
result = mysql_db.query(f"""
    SELECT 
        DATE(created_at) as date,
        COUNT(*) as clicks
    FROM {temp_table}
    GROUP BY date
    ORDER BY date
""")

30 秒后,我给客户发了完整的 Excel 表格。客户很惊讶:“这么快!”

我笑了笑,心里想:数据分层不只是省钱,更是让系统可持续

新的商机 💡

数据问题搞定了,但我发现一个有趣的现象:

有些用户愿意付费来获取更详细的统计数据。比如:

  • 某电商客户想知道:“哪个国家的点击量最高?”
  • 某自媒体博主问:“我的粉丝在哪些城市最活跃?”
  • 某广告代理商需要:“每小时点击趋势图”

这些查询都需要访问冷数据,但用户愿意为此付费。

我看着账本上的数字,脑子里冒出一个想法:是不是可以推出付费高级分析功能?


👉 下一章:自定义短链接 —— 用户可以自定义短链接后缀,但这个功能带来了新的技术挑战…