缓存击穿
那次惊魂时刻
周三上午 10 点,流量高峰期。
一切看似正常:
- API 响应时间:50ms ✅
- 缓存命中率:98% ✅
- 外部 API 调用成功率:99% ✅
突然,某个瞬间:
⚠️ 警告:API 响应时间飙升到 8 秒
⚠️ 警告:外部 API 调用失败率飙升
⚠️ 警告:请求超时率 60%系统差点崩溃。事后分析日志,我发现了一个致命问题。
什么是缓存击穿?
缓存击穿流程
🔥 热点 Key 形成
北京 50,000 次/天
上海 40,000 次/天
深圳 30,000 次/天
⏱️ 击穿发生过程
T0
⏰
热点 key 过期 weather:北京 缓存失效
T1
👥
100 个并发请求同时到达 全部访问同一个 key
T2
❌
缓存未命中 Redis 中无此数据
T3
🌐
100 个请求同时访问外部 API 触发 API 限流
100%
T4
💥
系统崩溃 API 超时,错误率飙升
⚖️ 正常情况 vs 击穿时刻
✅ 正常情况
👤 请求
→🗄️ 缓存命中
→✅ 返回 (50ms)
缓存命中率: 98%
⚠️ 击穿时刻
👥 100 请求
→🗄️ 缓存过期
→🌐 外部 API
API 限流状态: 已触发
缓存击穿是指:
- 某个热点 key(被频繁访问的数据)突然过期
- 大量并发请求同时访问这个 key
- 所有请求都穿透到外部 API
- 外部 API 瞬间压力激增,可能触发限流
关键特征:
- 只有一个 key 过期(不是大面积)
- 但这个 key 非常”热”(访问量巨大)
- 并发请求同时击穿缓存层
击穿 vs 穿透
这两个概念很容易混淆,我们来对比一下:
| 对比项 | 缓存穿透 | 缓存击穿 |
|---|---|---|
| 攻击目标 | 不存在的数据 | 存在的热点数据 |
| key 特征 | 大量不同的无效 key | 单个有效的热点 key |
| 触发原因 | 恶意攻击/非法请求 | 热点 key 自然过期 |
| 数据存在性 | 缓存和外部 API 都没有 | 外部 API 有,缓存刚好过期 |
| 请求特点 | 持续的恶意请求 | 并发量大的正常请求 |
简单记忆:
- 穿透 = 穿透空对象(数据不存在)
- 击穿 = 击穿热点 key(数据存在但缓存过期)
真实场景还原
热点 key 的形成
我的天气 API 中,某些城市特别热门:
热门城市统计(日调用量):
- 北京:50,000 次
- 上海:40,000 次
- 深圳:30,000 次
- 广州:25,000 次这些热点 city 的缓存 key 长期存在于 Redis 中:
# 北京的缓存 key
weather:北京 = {"temp": 25, "condition": "晴", ...}
# 过期时间:1 小时
# 每小时自动刷新击穿发生的过程
时间线(毫秒级):
T0: 北京的缓存 key 过期
T1: 100 个并发请求同时到达
T2: 100 个请求都发现缓存没有数据
T3: 100 个请求同时访问外部 API
T4: 外部 API 触发限流
T5: 其他请求开始超时
T6: 系统雪崩核心问题: 缓存过期 → 并发请求 → 外部 API 压力 → 系统崩溃
解决方案一:互斥锁
方案一:互斥锁
👤 请求 1
👤 请求 2
👤 请求 3
...
👤 请求 N
🔐 互斥锁竞争 (SETNX)
👤 请求 1
🔑 获得锁
🔍 查缓存 (无) → 🛢️ 查数据库 → 💾 写缓存
🔓 释放锁
👤 请求 2
⏳ 等待锁
sleep(100ms) → 重试→查缓存 (命中!)
👤 请求 3
⏳ 等待锁
sleep(100ms) → 重试→查缓存 (命中!)
✅
数据库受到保护 只有一个请求访问数据库
⚡
后续请求快速响应 直接从缓存读取数据
核心思路:
- 只让一个请求去调用外部 API
- 其他请求等待
- 等第一个请求把数据写入缓存后,大家都从缓存读
import redis
import time
import json
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_weather_with_mutex(city):
cache_key = f"weather:{city}"
lock_key = f"lock:{cache_key}"
# 1. 先查缓存
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# 2. 缓存未命中,尝试获取互斥锁
# SETNX = SET if Not eXists(不存在才设置)
lock_acquired = redis_client.set(lock_key, '1', nx=True, ex=10)
if lock_acquired:
# 3. 获取锁成功,去调用外部 API
try:
# 双重检查:查 API 前再看一眼缓存
# 防止在等待锁期间已经有其他请求写入了数据
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# 调用外部 API
data = external_api.get_weather(city)
# 写入缓存
redis_client.setex(cache_key, 3600, json.dumps(data))
return data
finally:
# 4. 释放锁(一定要释放!)
redis_client.delete(lock_key)
else:
# 5. 获取锁失败,说明有其他请求在调用外部 API
# 等待一会儿,然后从缓存读
time.sleep(0.1) # 等待 100ms
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# 缓存还是没有,重试(设置重试次数限制)
return get_weather_with_mutex(city)流程图解:
请求 1 请求 2 请求 3
↓ ↓ ↓
查缓存 (无) 查缓存 (无) 查缓存 (无)
↓ ↓ ↓
抢锁 (成功) 抢锁 (失败) 抢锁 (失败)
↓ ↓ ↓
调用外部 API 等待... 等待...
↓ ↓ ↓
写缓存 等待... 等待...
↓ ↓ ↓
释放锁 重试→查缓存 重试→查缓存
↓ ↓
命中! 命中!优点:
- 保证只有一个请求调用外部 API
- 完全解决击穿问题
- 实现相对简单
缺点:
- 需要处理锁的获取和释放
- 如果持锁请求失败,需要超时机制
- 性能略有损耗(等待时间)
解决方案二:逻辑过期(高级)
方案二:逻辑过期
💾
写入缓存 数据 + 逻辑过期时间
{ "data": {...}, "expire_at": "2025-03-15T15:00:00" }
👤
用户请求到达 查询缓存数据
🔍 检查逻辑过期时间
✅ 未过期
📦 直接返回数据
⚡ 响应快 (50ms)
⏰ 已过期
📤 立即返回旧数据
用户无感知🔄 后台异步更新
不阻塞用户请求🔄 后台异步更新流程
1 启动异步线程
→2 获取更新锁
→3 查数据库
→4 更新缓存
⚡
用户零等待 总是立即返回数据
⚠️
可能返回旧数据 短暂不一致窗口
核心思路:
- 数据永不过期(物理上)
- 在数据内部记录逻辑过期时间
- 发现过期后,异步更新缓存
import threading
import json
from datetime import datetime, timedelta
def get_weather_logical_expiry(city):
cache_key = f"weather:{city}"
# 1. 查缓存
cached = redis_client.get(cache_key)
if not cached:
# 缓存完全没有数据,直接调用外部 API 并设置
data = external_api.get_weather(city)
cache_data = {
'data': data,
'expire_at': (datetime.now() + timedelta(hours=1)).isoformat()
}
redis_client.set(cache_key, json.dumps(cache_data))
return data
cache_data = json.loads(cached)
# 2. 检查逻辑过期时间
expire_at = datetime.fromisoformat(cache_data['expire_at'])
is_expired = datetime.now() > expire_at
if is_expired:
# 3. 逻辑过期了,启动异步线程更新
# 当前请求先返回旧数据
threading.Thread(target=update_cache_async, args=(city, cache_key)).start()
# 返回旧数据(用户无感知)
return cache_data['data']
# 4. 没有过期,正常返回
return cache_data['data']
def update_cache_async(city, cache_key):
"""异步更新缓存的后台线程"""
try:
# 尝试获取更新锁(避免多个线程同时更新)
lock_key = f"lock:update:{cache_key}"
lock_acquired = redis_client.set(lock_key, '1', nx=True, ex=10)
if lock_acquired:
# 调用外部 API,更新缓存
data = external_api.get_weather(city)
cache_data = {
'data': data,
'expire_at': (datetime.now() + timedelta(hours=1)).isoformat()
}
redis_client.set(cache_key, json.dumps(cache_data))
finally:
pass # 锁会自动过期,无需手动释放时间线对比:
传统过期方式:
T0: 写入缓存(1 小时后过期)
T1: 30 分钟后,用户访问 → 命中缓存 ✅
T2: 61 分钟后,缓存过期 ❌
T3: 用户访问 → 缓存未命中 → 调用外部 API
逻辑过期方式:
T0: 写入缓存(逻辑过期:1 小时)
T1: 30 分钟后,用户访问 → 命中缓存 ✅
T2: 61 分钟后,用户访问 → 命中缓存(但发现过期)
T3: 启动异步更新 → 返回旧数据(用户无感知)
T4: 后台完成更新 → 新数据就绪优点:
- 用户永远能读到数据(不会等待)
- 完全避免外部 API 压力
- 适合对实时性要求不高的场景
缺点:
- 可能返回过期数据(一致性弱)
- 实现复杂(需要异步线程)
- 需要处理并发更新问题
方案对比
| 对比项 | 互斥锁 | 逻辑过期 |
|---|---|---|
| 实现难度 | ⭐⭐ 中等 | ⭐⭐⭐⭐ 复杂 |
| 数据一致性 | 强一致 | 弱一致(可能返回旧数据) |
| 响应时间 | 需要等待(~100ms) | 无需等待(立即返回) |
| 外部 API 压力 | 完全保护 | 完全保护 |
| 适用场景 | 大多数场景 | 高并发、可接受短暂不一致 |
实战:完整的热点 key 防护
import redis
import threading
import json
import time
from functools import wraps
from datetime import datetime, timedelta
redis_client = redis.Redis(host='localhost', port=6379, db=0)
NULL_MARKER = '__NULL__'
class CacheBreakdownProtection:
"""缓存击穿防护工具类"""
def __init__(self, default_ttl=3600, lock_timeout=10, use_logical_expiry=False):
self.default_ttl = default_ttl
self.lock_timeout = lock_timeout
self.use_logical_expiry = use_logical_expiry
def hot_key(self, key_prefix=''):
"""
热点 key 防护装饰器
key_prefix: 缓存 key 的前缀
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存 key
cache_key = f"{key_prefix}:{func.__name__}:{args}:{kwargs}"
if self.use_logical_expiry:
return self._logical_expiry_flow(func, cache_key, *args, **kwargs)
else:
return self._mutex_flow(func, cache_key, *args, **kwargs)
return wrapper
return decorator
def _mutex_flow(self, func, cache_key, *args, **kwargs):
"""互斥锁方案"""
# 查缓存
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# 抢锁
lock_key = f"lock:{cache_key}"
lock_acquired = redis_client.set(lock_key, '1', nx=True, ex=self.lock_timeout)
if lock_acquired:
try:
# 双重检查
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# 调用外部 API
result = func(*args, **kwargs)
# 写缓存
redis_client.setex(cache_key, self.default_ttl, json.dumps(result))
return result
finally:
redis_client.delete(lock_key)
else:
# 等待重试
time.sleep(0.05)
return self._mutex_flow(func, cache_key, *args, **kwargs)
def _logical_expiry_flow(self, func, cache_key, *args, **kwargs):
"""逻辑过期方案"""
cached = redis_client.get(cache_key)
if not cached:
# 缓存没有数据,直接写入
result = func(*args, **kwargs)
cache_data = {
'data': result,
'expire_at': (datetime.now() + timedelta(seconds=self.default_ttl)).isoformat()
}
redis_client.set(cache_key, json.dumps(cache_data))
return result
cache_data = json.loads(cached)
expire_at = datetime.fromisoformat(cache_data['expire_at'])
if datetime.now() > expire_at:
# 逻辑过期,异步更新
threading.Thread(
target=self._async_update,
args=(func, cache_key, *args)
).start()
return cache_data['data']
return cache_data['data']
def _async_update(self, func, cache_key, *args):
"""异步更新"""
lock_key = f"lock:update:{cache_key}"
lock_acquired = redis_client.set(lock_key, '1', nx=True, ex=30)
if lock_acquired:
try:
result = func(*args)
cache_data = {
'data': result,
'expire_at': (datetime.now() + timedelta(seconds=self.default_ttl)).isoformat()
}
redis_client.set(cache_key, json.dumps(cache_data))
finally:
pass
# 使用示例
cache_protection = CacheBreakdownProtection(default_ttl=3600)
@cache_protection.hot_key(key_prefix='weather')
def get_weather(city):
"""查询天气,自动获得热点 key 防护"""
return external_api.get_weather(city)效果验证
实施互斥锁方案后的系统指标:
防护效果对比
| 指标 | 无防护 | 互斥锁方案 | 逻辑过期方案 |
|---|---|---|---|
| 外部 API 峰值调用量 | 10000 次/秒 | 100 次/秒 | 50 次/秒 |
| 平均响应时间 | 2000ms | 150ms | 50ms |
| 最大响应时间 | 8000ms | 300ms | 100ms |
| 请求超时率 | 60% | 0.1% | 0% |
| 数据一致性 | 强一致 | 强一致 | 弱一致 |
结论:
- 互斥锁方案:适合大多数场景,一致性好
- 逻辑过期方案:适合超高并发、可接受短暂不一致的场景
深度思考
热点 key 的发现
如何提前发现哪些是热点 key?
# 方案 1:Redis 自带的热键分析
redis-cli --hotkeys
# 方案 2:应用层统计
from collections import Counter
request_counter = Counter()
def get_weather(city):
request_counter[city] += 1
# ...
# 定期统计,找出 TOP 10 热点
top_cities = request_counter.most_common(10)
# 方案 3:监控系统
# 通过 Prometheus + Grafana 监控缓存 key 的访问频率锁的超时时间设置
超时太短(如 1 秒):
❌ 外部 API 调用可能还没完成,锁就过期了
❌ 其他请求会重复调用外部 API
超时太长(如 60 秒):
❌ 如果持锁请求崩溃,其他请求要等很久
❌ 影响系统可用性
推荐设置(5-15 秒):
✅ 足够完成外部 API 调用
✅ 崩溃影响可控
✅ 配合看门狗机制更佳小结
缓存击穿的本质:热点 key 过期瞬间,大量并发请求涌向外部 API
防护核心思想:
- 互斥锁 —— 只让一个请求调用外部 API,其他等待
- 逻辑过期 —— 数据永不过期,后台异步更新
最佳实践:
- 优先使用互斥锁方案(简单可靠)
- 超高并发场景考虑逻辑过期
- 合理设置锁超时时间(5-15 秒)
- 配合监控系统,提前发现热点 key
当前技术架构
每个请求都直接调用外部 API,响应慢
客户端
用户请求 200 个开发者
应用服务
应用服务器 处理请求
缓存层 / 外部服务
外部天气 API 响应时间 2 秒
引入 Redis 缓存,大幅降低响应时间
客户端
用户请求 高并发访问
应用服务
应用服务器 处理请求
缓存层 / 外部服务
Redis 缓存 1 小时过期
外部天气 API 响应时间 2 秒
增加空值缓存,防止缓存穿透
客户端
用户请求 包含恶意请求
应用服务
应用服务器 参数校验
缓存层 / 外部服务
Redis 缓存 包含空值缓存
外部天气 API 有调用限制
使用布隆过滤器提前过滤无效请求
客户端
用户请求 包含恶意请求
应用服务
应用服务器 布隆过滤器校验
缓存层 / 外部服务
Redis 缓存 包含空值缓存
外部天气 API 有调用限制
使用互斥锁防止缓存击穿
客户端
用户请求 高并发访问
应用服务
应用服务器 互斥锁控制
缓存层 / 外部服务
Redis 缓存 热点 key 防护
外部天气 API 有调用限制
熔断器 + 降级策略应对缓存雪崩
客户端
用户请求 高并发访问
应用服务
应用服务器 熔断器 + 降级
缓存层 / 外部服务
Redis Sentinel 主从高可用
外部天气 API 有调用限制
多地域部署的高可用 API 平台
客户端
用户请求 10 万用户
应用服务
应用服务器集群 26 台,3 地域
缓存层 / 外部服务
Redis 集群 6 主 6 从
MySQL 主从 1 主 5 从
消息队列 RabbitMQ 3 台
课后练习
练习 1
缓存击穿和缓存穿透的区别是什么?
参考答案 (3 个标签)
缓存击穿 缓存穿透 概念辨析
答案:
| 对比项 | 缓存穿透 | 缓存击穿 |
|---|---|---|
| 攻击目标 | 不存在的数据 | 存在的热点数据 |
| key 数量 | 大量不同的 key | 单个热点 key |
| 数据存在性 | 缓存和外部 API 都没有 | 外部 API 有,缓存刚好过期 |
| 触发原因 | 恶意攻击/非法请求 | 热点 key 自然过期 |
| 并发特征 | 持续的恶意请求 | 高并发的正常请求 |
简单记忆:
- 穿透 = 数据不存在,请求穿透空对象
- 击穿 = 热点 key 过期,请求击穿缓存层
练习 2
互斥锁方案中,为什么获取锁后要”双重检查”缓存?
参考答案 (3 个标签)
缓存击穿 互斥锁 双重检查
答案:
双重检查的目的是避免重复调用外部 API。
场景说明:
时间线:
T1: 请求 A 获取锁成功
T2: 请求 B 获取锁失败,进入等待
T3: 请求 A 调用外部 API 并写入缓存,释放锁
T4: 请求 B 获得锁(因为 A 释放了)
T5: 请求 B 再次调用外部 API ← 这是浪费的!双重检查代码:
if lock_acquired:
try:
# 第一次检查:抢锁前
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# 调用外部 API
data = external_api.get_weather(...)
# 写缓存
redis_client.setex(cache_key, 3600, json.dumps(data))
finally:
redis_client.delete(lock_key)效果:
- 避免在等待锁期间,其他请求已经写入了数据
- 减少不必要的外部 API 调用
练习 3
请实现一个带有缓存击穿防护的商品查询函数,使用互斥锁方案。
参考答案 (3 个标签)
缓存击穿 互斥锁 实战编程
参考答案:
import redis
import json
import time
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def get_product_info(product_id):
"""
查询商品信息,带缓存击穿防护(互斥锁方案)
"""
cache_key = f"product:{product_id}"
lock_key = f"lock:{cache_key}"
# 1. 先查缓存
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# 2. 尝试获取互斥锁(10 秒超时)
lock_acquired = redis_client.set(lock_key, '1', nx=True, ex=10)
if lock_acquired:
try:
# 3. 双重检查缓存
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# 4. 调用外部 API
product = external_api.get_product(product_id)
if not product:
# 商品不存在,缓存空值(防穿透)
redis_client.setex(cache_key, 60, '__NULL__')
return None
# 5. 写入缓存(1 小时过期)
redis_client.setex(cache_key, 3600, json.dumps(product))
return product
finally:
# 6. 释放锁(一定要释放!)
redis_client.delete(lock_key)
else:
# 7. 获取锁失败,等待后重试
time.sleep(0.1) # 等待 100ms
return get_product_info(product_id) # 递归重试
# 使用示例
@app.route('/api/product/<int:product_id>')
def product_api(product_id):
# 参数校验
if product_id <= 0:
return {'error': '无效的商品 ID'}, 400
product = get_product_info(product_id)
if not product:
return {'error': '商品不存在'}, 404
return {'data': product}关键点:
- 互斥锁保护外部 API 调用
- 双重检查避免重复调用
- 空值缓存防止穿透
- 等待重试机制处理锁竞争
- finally 块确保锁释放
