导航菜单

冷热分离

账单上的一记闷棍

“光影”上线第 4 个月的一天早上,我收到一封阿里云的月度账单邮件。随手点开一看——存储费 380 元

380 块?我们才 8000 个用户、5 万张图片,怎么就 380 了?

我打开 OSS 控制台一看,存储量已经飙到 2.1 TB。原因很简单:用户每上传一张 5MB 的原图,我们会生成 5 张缩略图(大/中/小/WebP/AVIF),加上原图本身,一张图片就占了将近 8MB 的存储空间。

# 存储用量分析
def analyze_storage():
    """分析"光影"的存储构成"""
    return {
        'total_images': 52000,
        'avg_original_size_mb': 4.8,       # 原图平均 4.8MB
        'thumbnails_per_image': 5,         # 每张原图生成 5 个缩略图
        'avg_thumbnail_size_kb': 45,       # 缩略图平均 45KB

        'original_storage_tb': 52000 * 4.8 / 1024,     # ≈ 243.75 GB
        'thumbnail_storage_gb': 52000 * 5 * 45 / 1024,  # ≈ 11.43 GB
        'total_storage_tb': (243.75 + 11.43) / 1024,   # ≈ 0.25 TB(远小于 2.1TB)

        # 实际存储远大于理论值的原因:
        'reasons': [
            '历史版本的缩略图没有清理(多次调整尺寸规格)',
            '上传失败产生的临时文件堆积',
            'EXIF 提取后的中间文件未删除',
            '用户删除图片后,存储层的文件未同步清理',
        ],
    }

但更让我不安的不是这些——而是这些存储的利用率

帕累托法则现身

我写了一个脚本,分析过去 30 天所有图片的访问日志:

import pandas as pd
from collections import Counter
from datetime import datetime, timedelta

class ImageAccessAnalyzer:
    """图片访问频率分析器"""

    def __init__(self, db_connection):
        self.db = db_connection

    def analyze_access_distribution(self, days=30):
        """分析图片访问的分布情况"""
        cutoff = datetime.now() - timedelta(days=days)

        # 从访问日志中统计每张图片的访问次数
        query = """
            SELECT
                image_id,
                COUNT(*) as access_count,
                SUM(response_bytes) as total_bytes
            FROM access_logs
            WHERE timestamp >= %s
            GROUP BY image_id
            ORDER BY access_count DESC
        """
        rows = self.db.execute(query, (cutoff,))

        df = pd.DataFrame(rows, columns=['image_id', 'access_count', 'total_bytes'])
        total_images = len(df)
        total_access = df['access_count'].sum()

        # 按访问量降序排列,计算累积百分比
        df['access_pct'] = df['access_count'] / total_access * 100
        df['cumulative_pct'] = df['access_pct'].cumsum()
        df['image_pct'] = range(1, total_images + 1)
        df['image_pct'] = df['image_pct'] / total_images * 100

        return df

    def pareto_analysis(self, df):
        """帕累托分析:找到 20% 的图片贡献了多少流量"""
        total_images = len(df)
        top_20_pct_count = int(total_images * 0.2)

        top_20 = df.head(top_20_pct_count)
        top_20_traffic = top_20['access_count'].sum()
        total_traffic = df['access_count'].sum()

        return {
            'total_images': total_images,
            'top_20_pct_images': top_20_pct_count,
            'top_20_traffic_share': round(top_20_traffic / total_traffic * 100, 1),
            'bottom_80_pct_images': total_images - top_20_pct_count,
            'bottom_80_traffic_share': round(
                (total_traffic - top_20_traffic) / total_traffic * 100, 1
            ),
        }

# 运行分析
analyzer = ImageAccessAnalyzer(db)
df = analyzer.analyze_access_distribution(days=30)
result = analyzer.pareto_analysis(df)

# 结果让我大吃一惊:
# {
#     'total_images': 52000,
#     'top_20_pct_images': 10400,         # 20% 的图片 = 10400 张
#     'top_20_traffic_share': 84.3,       # 贡献了 84.3% 的流量!
#     'bottom_80_pct_images': 41600,      # 80% 的图片 = 41600 张
#     'bottom_80_traffic_share': 15.7,    # 只贡献了 15.7% 的流量
# }

20% 的图片贡献了 84.3% 的流量。

更极端的是,前 1% 的图片(520 张)贡献了 42% 的流量——这些都是热门摄影师的作品、首页推荐图和社区活动中的照片。

而底部的 60% 图片(31,200 张),在 30 天内一次都没被访问过。它们就静静地躺在 OSS 上,每个月产生标准存储的费用。

# 更细粒度的分层分析
def tier_analysis(df):
    """按访问频率分层"""
    total = len(df)
    total_access = df['access_count'].sum()

    tiers = {
        '🔥 火爆(日均 > 100 次)': df[df['access_count'] > 3000],
        '🌟 热门(日均 10~100 次)': df[
            (df['access_count'] >= 300) & (df['access_count'] <= 3000)
        ],
        '😐 普通(日均 1~10 次)': df[
            (df['access_count'] >= 30) & (df['access_count'] < 300)
        ],
        '🧊 冷门(30 天内有访问)': df[
            (df['access_count'] > 0) & (df['access_count'] < 30)
        ],
        '💀 僵尸(30 天零访问)': df[df['access_count'] == 0],
    }

    report = {}
    for name, tier_df in tiers.items():
        count = len(tier_df)
        access = tier_df['access_count'].sum()
        report[name] = {
            'images': f'{count} ({count / total * 100:.1f}%)',
            'traffic_share': f'{access / total_access * 100:.1f}%',
        }

    return report

# 分析结果:
# {
#     '🔥 火爆(日均 > 100 次)':   {'images': '520  (1.0%)',  'traffic_share': '42.1%'},
#     '🌟 热门(日均 10~100 次)':  {'images': '2480 (4.8%)',  'traffic_share': '32.5%'},
#     '😐 普通(日均 1~10 次)':    {'images': '12320 (23.7%)', 'traffic_share': '20.2%'},
#     '🧊 冷门(30 天内有访问)':   {'images': '14480 (27.8%)', 'traffic_share': '5.2%'},
#     '💀 僵尸(30 天零访问)':     {'images': '22200 (42.7%)', 'traffic_share': '0.0%'},
# }

42.7% 的图片是僵尸图片,30 天内无人问津,却在用最贵的标准存储。

这就是冷热分离的价值——把钱花在刀刃上。

设计分层策略

OSS 提供了三种存储类型,价格差距巨大:

存储类型      价格(元/GB/月)    适用场景              读取延迟
───────────────────────────────────────────────────────────────
标准存储      0.12               频繁访问(热数据)     毫秒级
低频存储      0.08               偶尔访问(温数据)     毫秒级
归档存储      0.03               极少访问(冷数据)     1~5 分钟(需解冻)
冷归档存储    0.01               合规归档               1~12 小时(需解冻)

我的分层策略:

from dataclasses import dataclass
from enum import Enum
from datetime import datetime, timedelta

class StorageTier(Enum):
    STANDARD = 'standard'     # 标准存储 - 热数据
    INFREQUENT = 'infrequent' # 低频存储 - 温数据
    ARCHIVE = 'archive'       # 归档存储 - 冷数据

@dataclass
class TierRule:
    """分层规则"""
    tier: StorageTier
    condition: str
    min_days_since_upload: int
    max_daily_access: float       # 日均访问次数
    description: str

# 基于访问数据的分层规则
TIER_RULES = [
    TierRule(
        tier=StorageTier.STANDARD,
        condition='最近 30 天内有访问,且日均访问 > 1 次',
        min_days_since_upload=0,
        max_daily_access=float('inf'),
        description='热数据:用户正在浏览、分享的图片',
    ),
    TierRule(
        tier=StorageTier.INFREQUENT,
        condition='最近 30 天内有访问,但日均访问 < 1 次',
        min_days_since_upload=30,
        max_daily_access=1.0,
        description='温数据:偶尔有人翻到的老照片',
    ),
    TierRule(
        tier=StorageTier.ARCHIVE,
        condition='最近 90 天内零访问',
        min_days_since_upload=90,
        max_daily_access=0,
        description='冷数据:几乎没人看的归档图片',
    ),
]

自动分层引擎

接下来是核心——自动分析访问频率并迁移存储层级的定时任务。

import logging
from datetime import datetime, timedelta
from typing import List, Dict, Tuple
from dataclasses import dataclass

logger = logging.getLogger(__name__)

@dataclass
class MigrationTask:
    """迁移任务"""
    image_id: str
    object_key: str
    current_tier: str
    target_tier: str
    reason: str
    estimated_saving: float  # 预计月度节省(元)

class StorageTierManager:
    """存储分层管理器"""

    # 存储层级的优先级(数值越大越"冷")
    TIER_PRIORITY = {
        'standard': 0,
        'infrequent': 1,
        'archive': 2,
    }

    # 每层的价格(元/GB/月)
    TIER_PRICE = {
        'standard': 0.12,
        'infrequent': 0.08,
        'archive': 0.03,
    }

    def __init__(self, db, oss_client, config=None):
        self.db = db
        self.oss = oss_client
        self.config = config or {
            'hot_threshold': 30,       # 30 天内有访问 = 热
            'warm_max_access': 1.0,    # 日均 < 1 次 = 温
            'cold_threshold': 90,      # 90 天零访问 = 冷
            'batch_size': 500,         # 每批迁移数量
            'dry_run': False,          # 是否只分析不执行
        }

    def analyze_tiers(self) -> List[MigrationTask]:
        """分析所有图片的当前层级,生成迁移任务"""
        tasks = []

        # 获取所有图片的元数据
        query = """
            SELECT
                i.id as image_id,
                i.object_key,
                i.storage_tier as current_tier,
                i.file_size_bytes,
                COALESCE(stats.access_count_30d, 0) as access_count_30d,
                COALESCE(stats.access_count_7d, 0) as access_count_7d,
                COALESCE(stats.last_access, i.created_at) as last_access_time
            FROM images i
            LEFT JOIN (
                SELECT
                    image_id,
                    COUNT(CASE WHEN timestamp >= NOW() - INTERVAL '30 days' THEN 1 END) as access_count_30d,
                    COUNT(CASE WHEN timestamp >= NOW() - INTERVAL '7 days' THEN 1 END) as access_count_7d,
                    MAX(timestamp) as last_access
                FROM access_logs
                GROUP BY image_id
            ) stats ON i.id = stats.image_id
            WHERE i.deleted_at IS NULL
        """

        rows = self.db.execute(query)

        for row in rows:
            image_id, object_key, current_tier, file_size, \
                access_30d, access_7d, last_access = row

            # 计算日均访问次数
            daily_access = access_30d / 30.0 if access_30d > 0 else 0

            # 计算距离上次访问的天数
            days_since_access = (datetime.now() - last_access).days \
                if last_access else 999

            # 确定目标层级
            target_tier = self._determine_tier(daily_access, days_since_access)

            # 只有层级变化时才生成迁移任务
            if current_tier != target_tier:
                file_size_gb = file_size / 1024 / 1024 / 1024
                saving = file_size_gb * (
                    self.TIER_PRICE.get(current_tier, 0.12)
                    - self.TIER_PRICE.get(target_tier, 0.12)
                )

                tasks.append(MigrationTask(
                    image_id=image_id,
                    object_key=object_key,
                    current_tier=current_tier,
                    target_tier=target_tier,
                    reason=self._build_reason(
                        daily_access, days_since_access, target_tier
                    ),
                    estimated_saving=round(saving, 4),
                ))

        # 按预计节省排序(优先迁移节省最多的)
        tasks.sort(key=lambda t: t.estimated_saving, reverse=True)
        return tasks

    def _determine_tier(self, daily_access: float, days_since_access: int) -> str:
        """根据访问模式确定存储层级"""
        if days_since_access > self.config['cold_threshold']:
            return 'archive'        # 冷
        elif daily_access < self.config['warm_max_access']:
            return 'infrequent'     # 温
        else:
            return 'standard'       # 热

    def _build_reason(self, daily_access: float, days_since: int, tier: str) -> str:
        """构建迁移原因说明"""
        reasons = {
            'standard': f'日均访问 {daily_access:.1f} 次,升级到标准存储',
            'infrequent': f'日均访问 {daily_access:.1f} 次,降级到低频存储',
            'archive': f'已 {days_since} 天无访问,归档存储',
        }
        return reasons.get(tier, '')

    def execute_migrations(self, tasks: List[MigrationTask]) -> Dict:
        """执行存储迁移"""
        results = {
            'total': len(tasks),
            'success': 0,
            'failed': 0,
            'total_saving': 0.0,
            'errors': [],
        }

        batch_size = self.config['batch_size']
        for i in range(0, len(tasks), batch_size):
            batch = tasks[i:i + batch_size]

            for task in batch:
                try:
                    if not self.config['dry_run']:
                        # 调用 OSS API 修改存储类型
                        self.oss.copy_object(
                            bucket='guangying-images',
                            source_key=task.object_key,
                            dest_key=task.object_key,
                            storage_class=self._oss_storage_class(task.target_tier),
                        )

                        # 更新数据库记录
                        self.db.execute(
                            "UPDATE images SET storage_tier = %s WHERE id = %s",
                            (task.target_tier, task.image_id),
                        )

                    results['success'] += 1
                    results['total_saving'] += task.estimated_saving

                    logger.info(
                        f"迁移 {task.object_key}: "
                        f"{task.current_tier}{task.target_tier} "
                        f"({task.reason})"
                    )

                except Exception as e:
                    results['failed'] += 1
                    results['errors'].append({
                        'image_id': task.image_id,
                        'error': str(e),
                    })
                    logger.error(f"迁移失败 {task.object_key}: {e}")

        results['total_saving'] = round(results['total_saving'], 2)
        return results

    @staticmethod
    def _oss_storage_class(tier: str) -> str:
        """映射到 OSS 存储类型常量"""
        mapping = {
            'standard': 'Standard',
            'infrequent': 'IA',
            'archive': 'Archive',
        }
        return mapping.get(tier, 'Standard')


# 使用示例
if __name__ == '__main__':
    manager = StorageTierManager(db=db, oss_client=oss_client)

    # 第一步:分析(dry run)
    manager.config['dry_run'] = True
    tasks = manager.analyze_tiers()
    print(f"分析完成:{len(tasks)} 个图片需要迁移")

    # 打印前 10 个迁移任务
    for task in tasks[:10]:
        print(
            f"  {task.object_key}: "
            f"{task.current_tier}{task.target_tier} "
            f"(节省 {task.estimated_saving} 元/月)"
        )

    # 第二步:确认后执行
    total_saving = sum(t.estimated_saving for t in tasks)
    print(f"\n预计月度节省:{total_saving:.2f} 元")
    confirm = input("确认执行迁移?(y/N): ")

    if confirm.lower() == 'y':
        manager.config['dry_run'] = False
        result = manager.execute_migrations(tasks)
        print(f"迁移完成:成功 {result['success']},失败 {result['failed']}")
        print(f"月度节省:{result['total_saving']:.2f} 元")

缩略图的特殊处理

这里有个细节值得注意:缩略图不应该跟原图走同一套分层规则

# 缩略图的分层策略与原图不同
THUMBNAIL_STRATEGY = {
    'all_in_standard': True,       # 缩略图始终在标准存储
    'reason': (
        '缩略图体积小(平均 45KB),'
        '即使全部存标准存储也很便宜(11GB × 0.12 = 1.32 元/月)。'
        '但用户列表页每秒都在请求缩略图,'
        '如果降级到归档存储,解冻延迟 1~5 分钟,用户体验灾难。'
    ),
}

# 只对原图做冷热分离
ORIGINAL_STRATEGY = {
    'hot_threshold_days': 30,      # 30 天内有访问
    'warm_threshold_days': 90,     # 30~90 天偶有访问
    'cold_threshold_days': 90,     # 90 天以上零访问
    'size_breakdown': {
        'original_storage_tb': 0.24,   # 原图总量 243 GB
        'hot_pct': 0.20,               # 热数据占 20%
        'warm_pct': 0.30,              # 温数据占 30%
        'cold_pct': 0.50,              # 冷数据占 50%
    },
}

# 优化前后的成本对比
def compare_storage_cost():
    """对比冷热分离前后的存储费用"""
    original_storage_gb = 243  # 原图总量
    thumbnail_storage_gb = 11  # 缩略图总量

    # 优化前:全部标准存储
    before = (original_storage_gb + thumbnail_storage_gb) * 0.12
    # = 254 × 0.12 = 30.48 元/月

    # 优化后:原图分层 + 缩略图标准存储
    hot_gb = original_storage_gb * 0.20     # 48.6 GB
    warm_gb = original_storage_gb * 0.30    # 72.9 GB
    cold_gb = original_storage_gb * 0.50    # 121.5 GB

    after = (
        hot_gb * 0.12                       # 标准:5.83 元
        + warm_gb * 0.08                    # 低频:5.83 元
        + cold_gb * 0.03                    # 归档:3.65 元
        + thumbnail_storage_gb * 0.12       # 缩略图全标准:1.32 元
    )
    # = 16.63 元/月

    return {
        'before': round(before, 2),
        'after': round(after, 2),
        'saving': round(before - after, 2),
        'saving_pct': round((before - after) / before * 100, 1),
    }

# {
#     'before': 30.48,
#     'after': 16.63,
#     'saving': 13.85,
#     'saving_pct': 45.4%
# }

存储费直接砍掉 45%! 从 30.48 元降到 16.63 元。

归档图片的”复活”机制

冷数据归档后,如果用户突然想看一张老照片怎么办?

// 冷数据访问的"复活"机制
interface ArchiveRestoreRequest {
  imageId: string;
  objectKey: string;
  requesterId: string;
  priority: 'standard' | 'expedited';  // 标准解冻 / 加急解冻
}

class ArchiveRestoreService {
  /**
   * 处理归档图片的访问请求
   * - 如果图片在标准/低频存储:直接返回
   * - 如果图片在归档存储:触发解冻,返回占位图
   */
  async handleImageAccess(
    imageId: string,
    userId: string
  ): Promise<{
    url: string;
    status: 'ready' | 'restoring';
    estimatedWaitSeconds?: number;
  }> {
    const image = await this.db.query(
      'SELECT * FROM images WHERE id = ?',
      [imageId]
    );

    // 热数据 / 温数据:直接返回
    if (image.storage_tier !== 'archive') {
      return {
        url: this.cdn.getSignedUrl(image.object_key),
        status: 'ready',
      };
    }

    // 冷数据:触发解冻
    const isRestoring = await this.isBeingRestored(imageId);

    if (!isRestoring) {
      // 发起解冻请求
      await this.oss.restoreObject({
        bucket: 'guangying-images',
        key: image.object_key,
        restoreDays: 7,        // 解冻后保持 7 天可读
        tier: 'Expedited',     // 加急模式,1~5 分钟
      });

      // 记录解冻状态
      await this.db.execute(
        `INSERT INTO restore_requests (image_id, requester_id, status, created_at)
         VALUES (?, ?, 'restoring', NOW())`,
        [imageId, userId]
      );
    }

    // 返回占位图 + 预估等待时间
    return {
      url: '/images/archived-placeholder.svg',  // 占位图
      status: 'restoring',
      estimatedWaitSeconds: 180,                // 约 3 分钟
    };
  }

  /**
   * 定时检查解冻完成的图片
   * 解冻完成后:
   * 1. 通知请求者
   * 2. 将图片临时升级为标准存储(7 天)
   * 3. 7 天后如果无人访问,自动回归档
   */
  async checkRestoreStatus(): Promise<void> {
    const restoring = await this.db.query(
      `SELECT * FROM restore_requests WHERE status = 'restoring'`
    );

    for (const req of restoring) {
      const restored = await this.oss.isObjectRestored({
        bucket: 'guangying-images',
        key: req.object_key,
      });

      if (restored) {
        // 更新状态
        await this.db.execute(
          `UPDATE restore_requests SET status = 'completed' WHERE id = ?`,
          [req.id]
        );

        // 通知请求者
        await this.notification.send(req.requester_id, {
          type: 'image_restored',
          imageId: req.image_id,
          message: '您请求的图片已解冻完成,点击查看',
        });

        logger.info(`图片 ${req.object_key} 解冻完成,通知用户 ${req.requester_id}`);
      }
    }
  }
}

前端配合:解冻等待体验

// 前端:冷数据访问的渐进体验
function ArchivedImageLoader({ imageId }: { imageId: string }) {
  const [status, setStatus] = useState<'loading' | 'restoring' | 'ready'>('loading');
  const [imageUrl, setImageUrl] = useState('');
  const [countdown, setCountdown] = useState(180);

  useEffect(() => {
    // 请求图片
    api.getImage(imageId).then((res) => {
      if (res.status === 'ready') {
        setImageUrl(res.url);
        setStatus('ready');
      } else if (res.status === 'restoring') {
        setStatus('restoring');
        // 轮询检查是否解冻完成
        const poll = setInterval(async () => {
          const check = await api.getImage(imageId);
          if (check.status === 'ready') {
            setImageUrl(check.url);
            setStatus('ready');
            clearInterval(poll);
          }
          setCountdown((prev) => Math.max(0, prev - 5));
        }, 5000);  // 每 5 秒检查一次

        return () => clearInterval(poll);
      }
    });
  }, [imageId]);

  if (status === 'restoring') {
    return (
      <div className="archived-image-placeholder">
        <img src="/images/archived-placeholder.svg" alt="归档图片" />
        <p>这张老照片正在从归档中解冻...</p>
        <p className="text-sm text-gray-500">
          预计 {Math.ceil(countdown / 60)} 分钟后可查看
        </p>
        <div className="progress-bar">
          <div style={{ width: `${((180 - countdown) / 180) * 100}%` }} />
        </div>
      </div>
    );
  }

  return status === 'ready'
    ? <img src={imageUrl} alt="" loading="lazy" />
    : <div className="skeleton" />;
}

定时任务配置

冷热分析是一个定时任务,每天凌晨运行:

# Celery 定时任务配置
from celery import Celery
from celery.schedules import crontab

app = Celery('guangying')

app.conf.beat_schedule = {
    # 每天凌晨 3 点分析冷热数据
    'analyze-storage-tiers': {
        'task': 'tasks.analyze_storage_tiers',
        'schedule': crontab(hour=3, minute=0),
    },
    # 每天凌晨 4 点执行迁移
    'execute-tier-migrations': {
        'task': 'tasks.execute_tier_migrations',
        'schedule': crontab(hour=4, minute=0),
    },
    # 每 10 分钟检查解冻状态
    'check-restore-status': {
        'task': 'tasks.check_restore_status',
        'schedule': crontab(minute='*/10'),
    },
}

@app.task
def analyze_storage_tiers():
    """每天分析所有图片的访问频率,生成迁移计划"""
    manager = StorageTierManager(db=get_db(), oss_client=get_oss())
    tasks = manager.analyze_tiers()

    # 保存迁移计划到数据库(不立即执行)
    for task in tasks:
        db.execute(
            """INSERT INTO migration_plans
               (image_id, current_tier, target_tier, reason, saving, created_at)
               VALUES (%s, %s, %s, %s, %s, NOW())""",
            (task.image_id, task.current_tier, task.target_tier,
             task.reason, task.estimated_saving),
        )

    total_saving = sum(t.estimated_saving for t in tasks)
    logger.info(f"分析完成:{len(tasks)} 个迁移任务,预计节省 {total_saving:.2f} 元/月")


@app.task
def execute_tier_migrations():
    """每天执行已审批的迁移计划"""
    # 获取今天待执行的迁移任务
    tasks = db.query(
        "SELECT * FROM migration_plans WHERE created_at = CURDATE() AND status = 'approved'"
    )

    manager = StorageTierManager(db=get_db(), oss_client=get_oss())
    migration_tasks = [MigrationTask(**t) for t in tasks]
    result = manager.execute_migrations(migration_tasks)

    logger.info(
        f"迁移完成:成功 {result['success']},"
        f"失败 {result['failed']},"
        f"月度节省 {result['total_saving']:.2f} 元"
    )

本节小结

我学到了什么

  • 图片访问严格遵循帕累托分布:20% 的图片贡献 84% 的流量
  • 42.7% 的图片是僵尸数据,30 天内零访问,却在用最贵的标准存储
  • 冷热分离的核心是按访问频率自动分层,把冷数据迁移到低成本的存储
  • 缩略图不需要分层(体积小、访问频繁),只对原图做分层

⚠️ 踩过的坑

  • 一开始把缩略图也做了分层,结果列表页加载变慢(解冻延迟)
  • 迁移任务不能一次跑太多,否则触发 OSS API 频率限制
  • 归档图片的”复活”通知很重要,不然用户以为图片丢了

🎯 下一步:分层只是第一步,还需要一套完整的生命周期管理规则——什么时候降级、什么时候删除、临时文件怎么清理。

我的思考

思考 1

如果一张归档图片突然”爆火”(比如一个老摄影师的作品突然在社交媒体上被大量转发),如何自动识别并快速”升级”回热数据?

参考答案

需要建立一个访问突增检测 + 自动升级机制:

class HotspotDetector:
    """突发流量检测器"""

    def __init__(self):
        self.window_size = 300  # 5 分钟窗口
        self.hotspot_threshold = 10  # 5 分钟内 10 次访问即视为突增

    async def on_access(self, image_id: str):
        """每次访问时检查"""
        # 滑动窗口计数
        key = f"access_burst:{image_id}"
        count = await redis.incr(key)
        if count == 1:
            await redis.expire(key, self.window_size)

        if count >= self.hotspot_threshold:
            # 突发流量!立即升级
            image = await db.get_image(image_id)
            if image.storage_tier == 'archive':
                await self.urgent_upgrade(image)
            elif image.storage_tier == 'infrequent':
                await self.upgrade_to_standard(image)

    async def urgent_upgrade(self, image):
        """紧急升级:归档 → 标准"""
        # 1. 触发加急解冻
        await oss.restore_object(image.object_key, tier='Expedited')
        # 2. 解冻完成后立即转为标准存储
        # 3. 预热到 CDN
        # 4. 通知前端刷新

关键点:用 Redis 的滑动窗口做实时计数,超过阈值后立即触发升级。同时配合 CDN 预热,让后续用户直接从 CDN 获取。

思考 2

冷热分离在什么规模下才有价值?10 万张图片的个人博客需要做冷热分离吗?

参考答案

规模决定收益,需要算账

个人博客(10 万张图片):
- 原图总量:10 万 × 5MB = 500 GB
- 全部标准存储:500 × 0.12 = 60 元/月
- 冷热分离后:约 35 元/月
- 节省:25 元/月 = 300 元/年

分析:
- 节省 300 元/年,但开发和维护冷热分离系统需要至少 2~3 天
- 个人博客的图片访问模式简单,大多数图常年没人看
- 结论:不值得自建系统,直接用 OSS 自带的生命周期规则即可

何时值得自建:
- 图片量 > 100 万张
- 月存储费 > 500 元
- 有复杂的分层逻辑(不只是按时间,还要按访问频率)
- 需要精细控制迁移时机

简单方案(适合小规模):
直接用 OSS 生命周期规则,按时间自动降级:
30 天 → 低频,180 天 → 归档
无需写代码,在 OSS 控制台配置即可。

搜索