冷热分离
账单上的一记闷棍
“光影”上线第 4 个月的一天早上,我收到一封阿里云的月度账单邮件。随手点开一看——存储费 380 元。
380 块?我们才 8000 个用户、5 万张图片,怎么就 380 了?
我打开 OSS 控制台一看,存储量已经飙到 2.1 TB。原因很简单:用户每上传一张 5MB 的原图,我们会生成 5 张缩略图(大/中/小/WebP/AVIF),加上原图本身,一张图片就占了将近 8MB 的存储空间。
# 存储用量分析
def analyze_storage():
"""分析"光影"的存储构成"""
return {
'total_images': 52000,
'avg_original_size_mb': 4.8, # 原图平均 4.8MB
'thumbnails_per_image': 5, # 每张原图生成 5 个缩略图
'avg_thumbnail_size_kb': 45, # 缩略图平均 45KB
'original_storage_tb': 52000 * 4.8 / 1024, # ≈ 243.75 GB
'thumbnail_storage_gb': 52000 * 5 * 45 / 1024, # ≈ 11.43 GB
'total_storage_tb': (243.75 + 11.43) / 1024, # ≈ 0.25 TB(远小于 2.1TB)
# 实际存储远大于理论值的原因:
'reasons': [
'历史版本的缩略图没有清理(多次调整尺寸规格)',
'上传失败产生的临时文件堆积',
'EXIF 提取后的中间文件未删除',
'用户删除图片后,存储层的文件未同步清理',
],
}但更让我不安的不是这些——而是这些存储的利用率。
帕累托法则现身
我写了一个脚本,分析过去 30 天所有图片的访问日志:
import pandas as pd
from collections import Counter
from datetime import datetime, timedelta
class ImageAccessAnalyzer:
"""图片访问频率分析器"""
def __init__(self, db_connection):
self.db = db_connection
def analyze_access_distribution(self, days=30):
"""分析图片访问的分布情况"""
cutoff = datetime.now() - timedelta(days=days)
# 从访问日志中统计每张图片的访问次数
query = """
SELECT
image_id,
COUNT(*) as access_count,
SUM(response_bytes) as total_bytes
FROM access_logs
WHERE timestamp >= %s
GROUP BY image_id
ORDER BY access_count DESC
"""
rows = self.db.execute(query, (cutoff,))
df = pd.DataFrame(rows, columns=['image_id', 'access_count', 'total_bytes'])
total_images = len(df)
total_access = df['access_count'].sum()
# 按访问量降序排列,计算累积百分比
df['access_pct'] = df['access_count'] / total_access * 100
df['cumulative_pct'] = df['access_pct'].cumsum()
df['image_pct'] = range(1, total_images + 1)
df['image_pct'] = df['image_pct'] / total_images * 100
return df
def pareto_analysis(self, df):
"""帕累托分析:找到 20% 的图片贡献了多少流量"""
total_images = len(df)
top_20_pct_count = int(total_images * 0.2)
top_20 = df.head(top_20_pct_count)
top_20_traffic = top_20['access_count'].sum()
total_traffic = df['access_count'].sum()
return {
'total_images': total_images,
'top_20_pct_images': top_20_pct_count,
'top_20_traffic_share': round(top_20_traffic / total_traffic * 100, 1),
'bottom_80_pct_images': total_images - top_20_pct_count,
'bottom_80_traffic_share': round(
(total_traffic - top_20_traffic) / total_traffic * 100, 1
),
}
# 运行分析
analyzer = ImageAccessAnalyzer(db)
df = analyzer.analyze_access_distribution(days=30)
result = analyzer.pareto_analysis(df)
# 结果让我大吃一惊:
# {
# 'total_images': 52000,
# 'top_20_pct_images': 10400, # 20% 的图片 = 10400 张
# 'top_20_traffic_share': 84.3, # 贡献了 84.3% 的流量!
# 'bottom_80_pct_images': 41600, # 80% 的图片 = 41600 张
# 'bottom_80_traffic_share': 15.7, # 只贡献了 15.7% 的流量
# }20% 的图片贡献了 84.3% 的流量。
更极端的是,前 1% 的图片(520 张)贡献了 42% 的流量——这些都是热门摄影师的作品、首页推荐图和社区活动中的照片。
而底部的 60% 图片(31,200 张),在 30 天内一次都没被访问过。它们就静静地躺在 OSS 上,每个月产生标准存储的费用。
# 更细粒度的分层分析
def tier_analysis(df):
"""按访问频率分层"""
total = len(df)
total_access = df['access_count'].sum()
tiers = {
'🔥 火爆(日均 > 100 次)': df[df['access_count'] > 3000],
'🌟 热门(日均 10~100 次)': df[
(df['access_count'] >= 300) & (df['access_count'] <= 3000)
],
'😐 普通(日均 1~10 次)': df[
(df['access_count'] >= 30) & (df['access_count'] < 300)
],
'🧊 冷门(30 天内有访问)': df[
(df['access_count'] > 0) & (df['access_count'] < 30)
],
'💀 僵尸(30 天零访问)': df[df['access_count'] == 0],
}
report = {}
for name, tier_df in tiers.items():
count = len(tier_df)
access = tier_df['access_count'].sum()
report[name] = {
'images': f'{count} ({count / total * 100:.1f}%)',
'traffic_share': f'{access / total_access * 100:.1f}%',
}
return report
# 分析结果:
# {
# '🔥 火爆(日均 > 100 次)': {'images': '520 (1.0%)', 'traffic_share': '42.1%'},
# '🌟 热门(日均 10~100 次)': {'images': '2480 (4.8%)', 'traffic_share': '32.5%'},
# '😐 普通(日均 1~10 次)': {'images': '12320 (23.7%)', 'traffic_share': '20.2%'},
# '🧊 冷门(30 天内有访问)': {'images': '14480 (27.8%)', 'traffic_share': '5.2%'},
# '💀 僵尸(30 天零访问)': {'images': '22200 (42.7%)', 'traffic_share': '0.0%'},
# }42.7% 的图片是僵尸图片,30 天内无人问津,却在用最贵的标准存储。
这就是冷热分离的价值——把钱花在刀刃上。
设计分层策略
OSS 提供了三种存储类型,价格差距巨大:
存储类型 价格(元/GB/月) 适用场景 读取延迟
───────────────────────────────────────────────────────────────
标准存储 0.12 频繁访问(热数据) 毫秒级
低频存储 0.08 偶尔访问(温数据) 毫秒级
归档存储 0.03 极少访问(冷数据) 1~5 分钟(需解冻)
冷归档存储 0.01 合规归档 1~12 小时(需解冻)我的分层策略:
from dataclasses import dataclass
from enum import Enum
from datetime import datetime, timedelta
class StorageTier(Enum):
STANDARD = 'standard' # 标准存储 - 热数据
INFREQUENT = 'infrequent' # 低频存储 - 温数据
ARCHIVE = 'archive' # 归档存储 - 冷数据
@dataclass
class TierRule:
"""分层规则"""
tier: StorageTier
condition: str
min_days_since_upload: int
max_daily_access: float # 日均访问次数
description: str
# 基于访问数据的分层规则
TIER_RULES = [
TierRule(
tier=StorageTier.STANDARD,
condition='最近 30 天内有访问,且日均访问 > 1 次',
min_days_since_upload=0,
max_daily_access=float('inf'),
description='热数据:用户正在浏览、分享的图片',
),
TierRule(
tier=StorageTier.INFREQUENT,
condition='最近 30 天内有访问,但日均访问 < 1 次',
min_days_since_upload=30,
max_daily_access=1.0,
description='温数据:偶尔有人翻到的老照片',
),
TierRule(
tier=StorageTier.ARCHIVE,
condition='最近 90 天内零访问',
min_days_since_upload=90,
max_daily_access=0,
description='冷数据:几乎没人看的归档图片',
),
]自动分层引擎
接下来是核心——自动分析访问频率并迁移存储层级的定时任务。
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Tuple
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class MigrationTask:
"""迁移任务"""
image_id: str
object_key: str
current_tier: str
target_tier: str
reason: str
estimated_saving: float # 预计月度节省(元)
class StorageTierManager:
"""存储分层管理器"""
# 存储层级的优先级(数值越大越"冷")
TIER_PRIORITY = {
'standard': 0,
'infrequent': 1,
'archive': 2,
}
# 每层的价格(元/GB/月)
TIER_PRICE = {
'standard': 0.12,
'infrequent': 0.08,
'archive': 0.03,
}
def __init__(self, db, oss_client, config=None):
self.db = db
self.oss = oss_client
self.config = config or {
'hot_threshold': 30, # 30 天内有访问 = 热
'warm_max_access': 1.0, # 日均 < 1 次 = 温
'cold_threshold': 90, # 90 天零访问 = 冷
'batch_size': 500, # 每批迁移数量
'dry_run': False, # 是否只分析不执行
}
def analyze_tiers(self) -> List[MigrationTask]:
"""分析所有图片的当前层级,生成迁移任务"""
tasks = []
# 获取所有图片的元数据
query = """
SELECT
i.id as image_id,
i.object_key,
i.storage_tier as current_tier,
i.file_size_bytes,
COALESCE(stats.access_count_30d, 0) as access_count_30d,
COALESCE(stats.access_count_7d, 0) as access_count_7d,
COALESCE(stats.last_access, i.created_at) as last_access_time
FROM images i
LEFT JOIN (
SELECT
image_id,
COUNT(CASE WHEN timestamp >= NOW() - INTERVAL '30 days' THEN 1 END) as access_count_30d,
COUNT(CASE WHEN timestamp >= NOW() - INTERVAL '7 days' THEN 1 END) as access_count_7d,
MAX(timestamp) as last_access
FROM access_logs
GROUP BY image_id
) stats ON i.id = stats.image_id
WHERE i.deleted_at IS NULL
"""
rows = self.db.execute(query)
for row in rows:
image_id, object_key, current_tier, file_size, \
access_30d, access_7d, last_access = row
# 计算日均访问次数
daily_access = access_30d / 30.0 if access_30d > 0 else 0
# 计算距离上次访问的天数
days_since_access = (datetime.now() - last_access).days \
if last_access else 999
# 确定目标层级
target_tier = self._determine_tier(daily_access, days_since_access)
# 只有层级变化时才生成迁移任务
if current_tier != target_tier:
file_size_gb = file_size / 1024 / 1024 / 1024
saving = file_size_gb * (
self.TIER_PRICE.get(current_tier, 0.12)
- self.TIER_PRICE.get(target_tier, 0.12)
)
tasks.append(MigrationTask(
image_id=image_id,
object_key=object_key,
current_tier=current_tier,
target_tier=target_tier,
reason=self._build_reason(
daily_access, days_since_access, target_tier
),
estimated_saving=round(saving, 4),
))
# 按预计节省排序(优先迁移节省最多的)
tasks.sort(key=lambda t: t.estimated_saving, reverse=True)
return tasks
def _determine_tier(self, daily_access: float, days_since_access: int) -> str:
"""根据访问模式确定存储层级"""
if days_since_access > self.config['cold_threshold']:
return 'archive' # 冷
elif daily_access < self.config['warm_max_access']:
return 'infrequent' # 温
else:
return 'standard' # 热
def _build_reason(self, daily_access: float, days_since: int, tier: str) -> str:
"""构建迁移原因说明"""
reasons = {
'standard': f'日均访问 {daily_access:.1f} 次,升级到标准存储',
'infrequent': f'日均访问 {daily_access:.1f} 次,降级到低频存储',
'archive': f'已 {days_since} 天无访问,归档存储',
}
return reasons.get(tier, '')
def execute_migrations(self, tasks: List[MigrationTask]) -> Dict:
"""执行存储迁移"""
results = {
'total': len(tasks),
'success': 0,
'failed': 0,
'total_saving': 0.0,
'errors': [],
}
batch_size = self.config['batch_size']
for i in range(0, len(tasks), batch_size):
batch = tasks[i:i + batch_size]
for task in batch:
try:
if not self.config['dry_run']:
# 调用 OSS API 修改存储类型
self.oss.copy_object(
bucket='guangying-images',
source_key=task.object_key,
dest_key=task.object_key,
storage_class=self._oss_storage_class(task.target_tier),
)
# 更新数据库记录
self.db.execute(
"UPDATE images SET storage_tier = %s WHERE id = %s",
(task.target_tier, task.image_id),
)
results['success'] += 1
results['total_saving'] += task.estimated_saving
logger.info(
f"迁移 {task.object_key}: "
f"{task.current_tier} → {task.target_tier} "
f"({task.reason})"
)
except Exception as e:
results['failed'] += 1
results['errors'].append({
'image_id': task.image_id,
'error': str(e),
})
logger.error(f"迁移失败 {task.object_key}: {e}")
results['total_saving'] = round(results['total_saving'], 2)
return results
@staticmethod
def _oss_storage_class(tier: str) -> str:
"""映射到 OSS 存储类型常量"""
mapping = {
'standard': 'Standard',
'infrequent': 'IA',
'archive': 'Archive',
}
return mapping.get(tier, 'Standard')
# 使用示例
if __name__ == '__main__':
manager = StorageTierManager(db=db, oss_client=oss_client)
# 第一步:分析(dry run)
manager.config['dry_run'] = True
tasks = manager.analyze_tiers()
print(f"分析完成:{len(tasks)} 个图片需要迁移")
# 打印前 10 个迁移任务
for task in tasks[:10]:
print(
f" {task.object_key}: "
f"{task.current_tier} → {task.target_tier} "
f"(节省 {task.estimated_saving} 元/月)"
)
# 第二步:确认后执行
total_saving = sum(t.estimated_saving for t in tasks)
print(f"\n预计月度节省:{total_saving:.2f} 元")
confirm = input("确认执行迁移?(y/N): ")
if confirm.lower() == 'y':
manager.config['dry_run'] = False
result = manager.execute_migrations(tasks)
print(f"迁移完成:成功 {result['success']},失败 {result['failed']}")
print(f"月度节省:{result['total_saving']:.2f} 元")缩略图的特殊处理
这里有个细节值得注意:缩略图不应该跟原图走同一套分层规则。
# 缩略图的分层策略与原图不同
THUMBNAIL_STRATEGY = {
'all_in_standard': True, # 缩略图始终在标准存储
'reason': (
'缩略图体积小(平均 45KB),'
'即使全部存标准存储也很便宜(11GB × 0.12 = 1.32 元/月)。'
'但用户列表页每秒都在请求缩略图,'
'如果降级到归档存储,解冻延迟 1~5 分钟,用户体验灾难。'
),
}
# 只对原图做冷热分离
ORIGINAL_STRATEGY = {
'hot_threshold_days': 30, # 30 天内有访问
'warm_threshold_days': 90, # 30~90 天偶有访问
'cold_threshold_days': 90, # 90 天以上零访问
'size_breakdown': {
'original_storage_tb': 0.24, # 原图总量 243 GB
'hot_pct': 0.20, # 热数据占 20%
'warm_pct': 0.30, # 温数据占 30%
'cold_pct': 0.50, # 冷数据占 50%
},
}
# 优化前后的成本对比
def compare_storage_cost():
"""对比冷热分离前后的存储费用"""
original_storage_gb = 243 # 原图总量
thumbnail_storage_gb = 11 # 缩略图总量
# 优化前:全部标准存储
before = (original_storage_gb + thumbnail_storage_gb) * 0.12
# = 254 × 0.12 = 30.48 元/月
# 优化后:原图分层 + 缩略图标准存储
hot_gb = original_storage_gb * 0.20 # 48.6 GB
warm_gb = original_storage_gb * 0.30 # 72.9 GB
cold_gb = original_storage_gb * 0.50 # 121.5 GB
after = (
hot_gb * 0.12 # 标准:5.83 元
+ warm_gb * 0.08 # 低频:5.83 元
+ cold_gb * 0.03 # 归档:3.65 元
+ thumbnail_storage_gb * 0.12 # 缩略图全标准:1.32 元
)
# = 16.63 元/月
return {
'before': round(before, 2),
'after': round(after, 2),
'saving': round(before - after, 2),
'saving_pct': round((before - after) / before * 100, 1),
}
# {
# 'before': 30.48,
# 'after': 16.63,
# 'saving': 13.85,
# 'saving_pct': 45.4%
# }存储费直接砍掉 45%! 从 30.48 元降到 16.63 元。
归档图片的”复活”机制
冷数据归档后,如果用户突然想看一张老照片怎么办?
// 冷数据访问的"复活"机制
interface ArchiveRestoreRequest {
imageId: string;
objectKey: string;
requesterId: string;
priority: 'standard' | 'expedited'; // 标准解冻 / 加急解冻
}
class ArchiveRestoreService {
/**
* 处理归档图片的访问请求
* - 如果图片在标准/低频存储:直接返回
* - 如果图片在归档存储:触发解冻,返回占位图
*/
async handleImageAccess(
imageId: string,
userId: string
): Promise<{
url: string;
status: 'ready' | 'restoring';
estimatedWaitSeconds?: number;
}> {
const image = await this.db.query(
'SELECT * FROM images WHERE id = ?',
[imageId]
);
// 热数据 / 温数据:直接返回
if (image.storage_tier !== 'archive') {
return {
url: this.cdn.getSignedUrl(image.object_key),
status: 'ready',
};
}
// 冷数据:触发解冻
const isRestoring = await this.isBeingRestored(imageId);
if (!isRestoring) {
// 发起解冻请求
await this.oss.restoreObject({
bucket: 'guangying-images',
key: image.object_key,
restoreDays: 7, // 解冻后保持 7 天可读
tier: 'Expedited', // 加急模式,1~5 分钟
});
// 记录解冻状态
await this.db.execute(
`INSERT INTO restore_requests (image_id, requester_id, status, created_at)
VALUES (?, ?, 'restoring', NOW())`,
[imageId, userId]
);
}
// 返回占位图 + 预估等待时间
return {
url: '/images/archived-placeholder.svg', // 占位图
status: 'restoring',
estimatedWaitSeconds: 180, // 约 3 分钟
};
}
/**
* 定时检查解冻完成的图片
* 解冻完成后:
* 1. 通知请求者
* 2. 将图片临时升级为标准存储(7 天)
* 3. 7 天后如果无人访问,自动回归档
*/
async checkRestoreStatus(): Promise<void> {
const restoring = await this.db.query(
`SELECT * FROM restore_requests WHERE status = 'restoring'`
);
for (const req of restoring) {
const restored = await this.oss.isObjectRestored({
bucket: 'guangying-images',
key: req.object_key,
});
if (restored) {
// 更新状态
await this.db.execute(
`UPDATE restore_requests SET status = 'completed' WHERE id = ?`,
[req.id]
);
// 通知请求者
await this.notification.send(req.requester_id, {
type: 'image_restored',
imageId: req.image_id,
message: '您请求的图片已解冻完成,点击查看',
});
logger.info(`图片 ${req.object_key} 解冻完成,通知用户 ${req.requester_id}`);
}
}
}
}前端配合:解冻等待体验
// 前端:冷数据访问的渐进体验
function ArchivedImageLoader({ imageId }: { imageId: string }) {
const [status, setStatus] = useState<'loading' | 'restoring' | 'ready'>('loading');
const [imageUrl, setImageUrl] = useState('');
const [countdown, setCountdown] = useState(180);
useEffect(() => {
// 请求图片
api.getImage(imageId).then((res) => {
if (res.status === 'ready') {
setImageUrl(res.url);
setStatus('ready');
} else if (res.status === 'restoring') {
setStatus('restoring');
// 轮询检查是否解冻完成
const poll = setInterval(async () => {
const check = await api.getImage(imageId);
if (check.status === 'ready') {
setImageUrl(check.url);
setStatus('ready');
clearInterval(poll);
}
setCountdown((prev) => Math.max(0, prev - 5));
}, 5000); // 每 5 秒检查一次
return () => clearInterval(poll);
}
});
}, [imageId]);
if (status === 'restoring') {
return (
<div className="archived-image-placeholder">
<img src="/images/archived-placeholder.svg" alt="归档图片" />
<p>这张老照片正在从归档中解冻...</p>
<p className="text-sm text-gray-500">
预计 {Math.ceil(countdown / 60)} 分钟后可查看
</p>
<div className="progress-bar">
<div style={{ width: `${((180 - countdown) / 180) * 100}%` }} />
</div>
</div>
);
}
return status === 'ready'
? <img src={imageUrl} alt="" loading="lazy" />
: <div className="skeleton" />;
}定时任务配置
冷热分析是一个定时任务,每天凌晨运行:
# Celery 定时任务配置
from celery import Celery
from celery.schedules import crontab
app = Celery('guangying')
app.conf.beat_schedule = {
# 每天凌晨 3 点分析冷热数据
'analyze-storage-tiers': {
'task': 'tasks.analyze_storage_tiers',
'schedule': crontab(hour=3, minute=0),
},
# 每天凌晨 4 点执行迁移
'execute-tier-migrations': {
'task': 'tasks.execute_tier_migrations',
'schedule': crontab(hour=4, minute=0),
},
# 每 10 分钟检查解冻状态
'check-restore-status': {
'task': 'tasks.check_restore_status',
'schedule': crontab(minute='*/10'),
},
}
@app.task
def analyze_storage_tiers():
"""每天分析所有图片的访问频率,生成迁移计划"""
manager = StorageTierManager(db=get_db(), oss_client=get_oss())
tasks = manager.analyze_tiers()
# 保存迁移计划到数据库(不立即执行)
for task in tasks:
db.execute(
"""INSERT INTO migration_plans
(image_id, current_tier, target_tier, reason, saving, created_at)
VALUES (%s, %s, %s, %s, %s, NOW())""",
(task.image_id, task.current_tier, task.target_tier,
task.reason, task.estimated_saving),
)
total_saving = sum(t.estimated_saving for t in tasks)
logger.info(f"分析完成:{len(tasks)} 个迁移任务,预计节省 {total_saving:.2f} 元/月")
@app.task
def execute_tier_migrations():
"""每天执行已审批的迁移计划"""
# 获取今天待执行的迁移任务
tasks = db.query(
"SELECT * FROM migration_plans WHERE created_at = CURDATE() AND status = 'approved'"
)
manager = StorageTierManager(db=get_db(), oss_client=get_oss())
migration_tasks = [MigrationTask(**t) for t in tasks]
result = manager.execute_migrations(migration_tasks)
logger.info(
f"迁移完成:成功 {result['success']},"
f"失败 {result['failed']},"
f"月度节省 {result['total_saving']:.2f} 元"
)本节小结
✅ 我学到了什么:
- 图片访问严格遵循帕累托分布:20% 的图片贡献 84% 的流量
- 42.7% 的图片是僵尸数据,30 天内零访问,却在用最贵的标准存储
- 冷热分离的核心是按访问频率自动分层,把冷数据迁移到低成本的存储
- 缩略图不需要分层(体积小、访问频繁),只对原图做分层
⚠️ 踩过的坑:
- 一开始把缩略图也做了分层,结果列表页加载变慢(解冻延迟)
- 迁移任务不能一次跑太多,否则触发 OSS API 频率限制
- 归档图片的”复活”通知很重要,不然用户以为图片丢了
🎯 下一步:分层只是第一步,还需要一套完整的生命周期管理规则——什么时候降级、什么时候删除、临时文件怎么清理。
我的思考
思考 1
如果一张归档图片突然”爆火”(比如一个老摄影师的作品突然在社交媒体上被大量转发),如何自动识别并快速”升级”回热数据?
需要建立一个访问突增检测 + 自动升级机制:
class HotspotDetector:
"""突发流量检测器"""
def __init__(self):
self.window_size = 300 # 5 分钟窗口
self.hotspot_threshold = 10 # 5 分钟内 10 次访问即视为突增
async def on_access(self, image_id: str):
"""每次访问时检查"""
# 滑动窗口计数
key = f"access_burst:{image_id}"
count = await redis.incr(key)
if count == 1:
await redis.expire(key, self.window_size)
if count >= self.hotspot_threshold:
# 突发流量!立即升级
image = await db.get_image(image_id)
if image.storage_tier == 'archive':
await self.urgent_upgrade(image)
elif image.storage_tier == 'infrequent':
await self.upgrade_to_standard(image)
async def urgent_upgrade(self, image):
"""紧急升级:归档 → 标准"""
# 1. 触发加急解冻
await oss.restore_object(image.object_key, tier='Expedited')
# 2. 解冻完成后立即转为标准存储
# 3. 预热到 CDN
# 4. 通知前端刷新关键点:用 Redis 的滑动窗口做实时计数,超过阈值后立即触发升级。同时配合 CDN 预热,让后续用户直接从 CDN 获取。
思考 2
冷热分离在什么规模下才有价值?10 万张图片的个人博客需要做冷热分离吗?
规模决定收益,需要算账:
个人博客(10 万张图片):
- 原图总量:10 万 × 5MB = 500 GB
- 全部标准存储:500 × 0.12 = 60 元/月
- 冷热分离后:约 35 元/月
- 节省:25 元/月 = 300 元/年
分析:
- 节省 300 元/年,但开发和维护冷热分离系统需要至少 2~3 天
- 个人博客的图片访问模式简单,大多数图常年没人看
- 结论:不值得自建系统,直接用 OSS 自带的生命周期规则即可
何时值得自建:
- 图片量 > 100 万张
- 月存储费 > 500 元
- 有复杂的分层逻辑(不只是按时间,还要按访问频率)
- 需要精细控制迁移时机
简单方案(适合小规模):
直接用 OSS 生命周期规则,按时间自动降级:
30 天 → 低频,180 天 → 归档
无需写代码,在 OSS 控制台配置即可。