导航菜单

缓存击穿

那次惊魂时刻

周三上午 10 点,流量高峰期。

一切看似正常:

  • API 响应时间:50ms ✅
  • 缓存命中率:98% ✅
  • 外部 API 调用成功率:99% ✅

突然,某个瞬间:

⚠️ 警告:API 响应时间飙升到 8 秒
⚠️ 警告:外部 API 调用失败率飙升
⚠️ 警告:请求超时率 60%

系统差点崩溃。事后分析日志,我发现了一个致命问题。

什么是缓存击穿?

缓存击穿流程
🔥 热点 Key 形成
北京 50,000 次/天
上海 40,000 次/天
深圳 30,000 次/天
⏱️ 击穿发生过程
T0
热点 key 过期 weather:北京 缓存失效
T1
👥
100 个并发请求同时到达 全部访问同一个 key
T2
缓存未命中 Redis 中无此数据
T3
🌐
100 个请求同时访问外部 API 触发 API 限流
100%
T4
💥
系统崩溃 API 超时,错误率飙升
⚖️ 正常情况 vs 击穿时刻
正常情况
👤 请求
🗄️ 缓存命中
✅ 返回 (50ms)
缓存命中率: 98%
⚠️ 击穿时刻
👥 100 请求
🗄️ 缓存过期
🌐 外部 API
API 限流状态: 已触发

缓存击穿是指:

  • 某个热点 key(被频繁访问的数据)突然过期
  • 大量并发请求同时访问这个 key
  • 所有请求都穿透到外部 API
  • 外部 API 瞬间压力激增,可能触发限流

关键特征

  • 只有一个 key 过期(不是大面积)
  • 但这个 key 非常”热”(访问量巨大)
  • 并发请求同时击穿缓存层

击穿 vs 穿透

这两个概念很容易混淆,我们来对比一下:

对比项缓存穿透缓存击穿
攻击目标不存在的数据存在的热点数据
key 特征大量不同的无效 key单个有效的热点 key
触发原因恶意攻击/非法请求热点 key 自然过期
数据存在性缓存和外部 API 都没有外部 API 有,缓存刚好过期
请求特点持续的恶意请求并发量大的正常请求

简单记忆

  • 穿透 = 穿透空对象(数据不存在)
  • 击穿 = 击穿热点 key(数据存在但缓存过期)

真实场景还原

热点 key 的形成

我的天气 API 中,某些城市特别热门:

热门城市统计(日调用量):
- 北京:50,000 次
- 上海:40,000 次
- 深圳:30,000 次
- 广州:25,000 次

这些热点 city 的缓存 key 长期存在于 Redis 中:

# 北京的缓存 key
weather:北京 = {"temp": 25, "condition": "晴", ...}
# 过期时间:1 小时
# 每小时自动刷新

击穿发生的过程

时间线(毫秒级):

T0:    北京的缓存 key 过期
T1:    100 个并发请求同时到达
T2:    100 个请求都发现缓存没有数据
T3:    100 个请求同时访问外部 API
T4:    外部 API 触发限流
T5:    其他请求开始超时
T6:    系统雪崩

核心问题: 缓存过期 → 并发请求 → 外部 API 压力 → 系统崩溃

解决方案一:互斥锁

方案一:互斥锁
👤 请求 1
👤 请求 2
👤 请求 3
...
👤 请求 N
🔐 互斥锁竞争 (SETNX)
👤 请求 1
🔑 获得锁
🔍 查缓存 (无) 🛢️ 查数据库 💾 写缓存
🔓 释放锁
👤 请求 2
等待锁
sleep(100ms) 重试→查缓存 (命中!)
👤 请求 3
等待锁
sleep(100ms) 重试→查缓存 (命中!)
数据库受到保护 只有一个请求访问数据库
后续请求快速响应 直接从缓存读取数据

核心思路

  • 只让一个请求去调用外部 API
  • 其他请求等待
  • 等第一个请求把数据写入缓存后,大家都从缓存读
import redis
import time
import json

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def get_weather_with_mutex(city):
    cache_key = f"weather:{city}"
    lock_key = f"lock:{cache_key}"

    # 1. 先查缓存
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # 2. 缓存未命中,尝试获取互斥锁
    # SETNX = SET if Not eXists(不存在才设置)
    lock_acquired = redis_client.set(lock_key, '1', nx=True, ex=10)

    if lock_acquired:
        # 3. 获取锁成功,去调用外部 API
        try:
            # 双重检查:查 API 前再看一眼缓存
            # 防止在等待锁期间已经有其他请求写入了数据
            cached = redis_client.get(cache_key)
            if cached:
                return json.loads(cached)

            # 调用外部 API
            data = external_api.get_weather(city)

            # 写入缓存
            redis_client.setex(cache_key, 3600, json.dumps(data))

            return data

        finally:
            # 4. 释放锁(一定要释放!)
            redis_client.delete(lock_key)

    else:
        # 5. 获取锁失败,说明有其他请求在调用外部 API
        # 等待一会儿,然后从缓存读
        time.sleep(0.1)  # 等待 100ms
        cached = redis_client.get(cache_key)
        if cached:
            return json.loads(cached)

        # 缓存还是没有,重试(设置重试次数限制)
        return get_weather_with_mutex(city)

流程图解

请求 1          请求 2          请求 3
  ↓              ↓              ↓
查缓存 (无)      查缓存 (无)      查缓存 (无)
  ↓              ↓              ↓
抢锁 (成功)      抢锁 (失败)      抢锁 (失败)
  ↓              ↓              ↓
调用外部 API     等待...         等待...
  ↓              ↓              ↓
写缓存          等待...         等待...
  ↓              ↓              ↓
释放锁          重试→查缓存     重试→查缓存
                ↓              ↓
              命中!           命中!

优点

  • 保证只有一个请求调用外部 API
  • 完全解决击穿问题
  • 实现相对简单

缺点

  • 需要处理锁的获取和释放
  • 如果持锁请求失败,需要超时机制
  • 性能略有损耗(等待时间)

解决方案二:逻辑过期(高级)

方案二:逻辑过期
💾
写入缓存 数据 + 逻辑过期时间
{ "data": {...}, "expire_at": "2025-03-15T15:00:00" }
👤
用户请求到达 查询缓存数据
🔍 检查逻辑过期时间
未过期
📦 直接返回数据
响应快 (50ms)
已过期
📤 立即返回旧数据
用户无感知
🔄 后台异步更新
不阻塞用户请求
🔄 后台异步更新流程
1 启动异步线程
2 获取更新锁
3 查数据库
4 更新缓存
用户零等待 总是立即返回数据
⚠️
可能返回旧数据 短暂不一致窗口

核心思路

  • 数据永不过期(物理上)
  • 在数据内部记录逻辑过期时间
  • 发现过期后,异步更新缓存
import threading
import json
from datetime import datetime, timedelta

def get_weather_logical_expiry(city):
    cache_key = f"weather:{city}"

    # 1. 查缓存
    cached = redis_client.get(cache_key)

    if not cached:
        # 缓存完全没有数据,直接调用外部 API 并设置
        data = external_api.get_weather(city)
        cache_data = {
            'data': data,
            'expire_at': (datetime.now() + timedelta(hours=1)).isoformat()
        }
        redis_client.set(cache_key, json.dumps(cache_data))
        return data

    cache_data = json.loads(cached)

    # 2. 检查逻辑过期时间
    expire_at = datetime.fromisoformat(cache_data['expire_at'])
    is_expired = datetime.now() > expire_at

    if is_expired:
        # 3. 逻辑过期了,启动异步线程更新
        # 当前请求先返回旧数据
        threading.Thread(target=update_cache_async, args=(city, cache_key)).start()

        # 返回旧数据(用户无感知)
        return cache_data['data']

    # 4. 没有过期,正常返回
    return cache_data['data']


def update_cache_async(city, cache_key):
    """异步更新缓存的后台线程"""
    try:
        # 尝试获取更新锁(避免多个线程同时更新)
        lock_key = f"lock:update:{cache_key}"
        lock_acquired = redis_client.set(lock_key, '1', nx=True, ex=10)

        if lock_acquired:
            # 调用外部 API,更新缓存
            data = external_api.get_weather(city)
            cache_data = {
                'data': data,
                'expire_at': (datetime.now() + timedelta(hours=1)).isoformat()
            }
            redis_client.set(cache_key, json.dumps(cache_data))
    finally:
        pass  # 锁会自动过期,无需手动释放

时间线对比

传统过期方式:
T0: 写入缓存(1 小时后过期)
T1: 30 分钟后,用户访问 → 命中缓存 ✅
T2: 61 分钟后,缓存过期 ❌
T3: 用户访问 → 缓存未命中 → 调用外部 API

逻辑过期方式:
T0: 写入缓存(逻辑过期:1 小时)
T1: 30 分钟后,用户访问 → 命中缓存 ✅
T2: 61 分钟后,用户访问 → 命中缓存(但发现过期)
T3: 启动异步更新 → 返回旧数据(用户无感知)
T4: 后台完成更新 → 新数据就绪

优点

  • 用户永远能读到数据(不会等待)
  • 完全避免外部 API 压力
  • 适合对实时性要求不高的场景

缺点

  • 可能返回过期数据(一致性弱)
  • 实现复杂(需要异步线程)
  • 需要处理并发更新问题

方案对比

对比项互斥锁逻辑过期
实现难度⭐⭐ 中等⭐⭐⭐⭐ 复杂
数据一致性强一致弱一致(可能返回旧数据)
响应时间需要等待(~100ms)无需等待(立即返回)
外部 API 压力完全保护完全保护
适用场景大多数场景高并发、可接受短暂不一致

实战:完整的热点 key 防护

import redis
import threading
import json
import time
from functools import wraps
from datetime import datetime, timedelta

redis_client = redis.Redis(host='localhost', port=6379, db=0)
NULL_MARKER = '__NULL__'

class CacheBreakdownProtection:
    """缓存击穿防护工具类"""

    def __init__(self, default_ttl=3600, lock_timeout=10, use_logical_expiry=False):
        self.default_ttl = default_ttl
        self.lock_timeout = lock_timeout
        self.use_logical_expiry = use_logical_expiry

    def hot_key(self, key_prefix=''):
        """
        热点 key 防护装饰器
        key_prefix: 缓存 key 的前缀
        """
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                # 生成缓存 key
                cache_key = f"{key_prefix}:{func.__name__}:{args}:{kwargs}"

                if self.use_logical_expiry:
                    return self._logical_expiry_flow(func, cache_key, *args, **kwargs)
                else:
                    return self._mutex_flow(func, cache_key, *args, **kwargs)

            return wrapper
        return decorator

    def _mutex_flow(self, func, cache_key, *args, **kwargs):
        """互斥锁方案"""
        # 查缓存
        cached = redis_client.get(cache_key)
        if cached:
            return json.loads(cached)

        # 抢锁
        lock_key = f"lock:{cache_key}"
        lock_acquired = redis_client.set(lock_key, '1', nx=True, ex=self.lock_timeout)

        if lock_acquired:
            try:
                # 双重检查
                cached = redis_client.get(cache_key)
                if cached:
                    return json.loads(cached)

                # 调用外部 API
                result = func(*args, **kwargs)

                # 写缓存
                redis_client.setex(cache_key, self.default_ttl, json.dumps(result))

                return result
            finally:
                redis_client.delete(lock_key)
        else:
            # 等待重试
            time.sleep(0.05)
            return self._mutex_flow(func, cache_key, *args, **kwargs)

    def _logical_expiry_flow(self, func, cache_key, *args, **kwargs):
        """逻辑过期方案"""
        cached = redis_client.get(cache_key)

        if not cached:
            # 缓存没有数据,直接写入
            result = func(*args, **kwargs)
            cache_data = {
                'data': result,
                'expire_at': (datetime.now() + timedelta(seconds=self.default_ttl)).isoformat()
            }
            redis_client.set(cache_key, json.dumps(cache_data))
            return result

        cache_data = json.loads(cached)
        expire_at = datetime.fromisoformat(cache_data['expire_at'])

        if datetime.now() > expire_at:
            # 逻辑过期,异步更新
            threading.Thread(
                target=self._async_update,
                args=(func, cache_key, *args)
            ).start()
            return cache_data['data']

        return cache_data['data']

    def _async_update(self, func, cache_key, *args):
        """异步更新"""
        lock_key = f"lock:update:{cache_key}"
        lock_acquired = redis_client.set(lock_key, '1', nx=True, ex=30)

        if lock_acquired:
            try:
                result = func(*args)
                cache_data = {
                    'data': result,
                    'expire_at': (datetime.now() + timedelta(seconds=self.default_ttl)).isoformat()
                }
                redis_client.set(cache_key, json.dumps(cache_data))
            finally:
                pass

# 使用示例
cache_protection = CacheBreakdownProtection(default_ttl=3600)

@cache_protection.hot_key(key_prefix='weather')
def get_weather(city):
    """查询天气,自动获得热点 key 防护"""
    return external_api.get_weather(city)

效果验证

实施互斥锁方案后的系统指标:

防护效果对比

指标无防护互斥锁方案逻辑过期方案
外部 API 峰值调用量10000 次/秒100 次/秒50 次/秒
平均响应时间2000ms150ms50ms
最大响应时间8000ms300ms100ms
请求超时率60%0.1%0%
数据一致性强一致强一致弱一致

结论

  • 互斥锁方案:适合大多数场景,一致性好
  • 逻辑过期方案:适合超高并发、可接受短暂不一致的场景

深度思考

热点 key 的发现

如何提前发现哪些是热点 key?

# 方案 1:Redis 自带的热键分析
redis-cli --hotkeys

# 方案 2:应用层统计
from collections import Counter

request_counter = Counter()

def get_weather(city):
    request_counter[city] += 1
    # ...

# 定期统计,找出 TOP 10 热点
top_cities = request_counter.most_common(10)

# 方案 3:监控系统
# 通过 Prometheus + Grafana 监控缓存 key 的访问频率

锁的超时时间设置

超时太短(如 1 秒):
❌ 外部 API 调用可能还没完成,锁就过期了
❌ 其他请求会重复调用外部 API

超时太长(如 60 秒):
❌ 如果持锁请求崩溃,其他请求要等很久
❌ 影响系统可用性

推荐设置(5-15 秒):
✅ 足够完成外部 API 调用
✅ 崩溃影响可控
✅ 配合看门狗机制更佳

小结

缓存击穿的本质:热点 key 过期瞬间,大量并发请求涌向外部 API

防护核心思想

  1. 互斥锁 —— 只让一个请求调用外部 API,其他等待
  2. 逻辑过期 —— 数据永不过期,后台异步更新

最佳实践

  • 优先使用互斥锁方案(简单可靠)
  • 超高并发场景考虑逻辑过期
  • 合理设置锁超时时间(5-15 秒)
  • 配合监控系统,提前发现热点 key

当前技术架构

每个请求都直接调用外部 API,响应慢
客户端
用户请求 200 个开发者
应用服务
应用服务器 处理请求
缓存层 / 外部服务
外部天气 API 响应时间 2 秒
引入 Redis 缓存,大幅降低响应时间
客户端
用户请求 高并发访问
应用服务
应用服务器 处理请求
缓存层 / 外部服务
Redis 缓存 1 小时过期
外部天气 API 响应时间 2 秒
增加空值缓存,防止缓存穿透
客户端
用户请求 包含恶意请求
应用服务
应用服务器 参数校验
缓存层 / 外部服务
Redis 缓存 包含空值缓存
外部天气 API 有调用限制
使用布隆过滤器提前过滤无效请求
客户端
用户请求 包含恶意请求
应用服务
应用服务器 布隆过滤器校验
缓存层 / 外部服务
Redis 缓存 包含空值缓存
外部天气 API 有调用限制
使用互斥锁防止缓存击穿
客户端
用户请求 高并发访问
应用服务
应用服务器 互斥锁控制
缓存层 / 外部服务
Redis 缓存 热点 key 防护
外部天气 API 有调用限制
熔断器 + 降级策略应对缓存雪崩
客户端
用户请求 高并发访问
应用服务
应用服务器 熔断器 + 降级
缓存层 / 外部服务
Redis Sentinel 主从高可用
外部天气 API 有调用限制
多地域部署的高可用 API 平台
客户端
用户请求 10 万用户
应用服务
应用服务器集群 26 台,3 地域
缓存层 / 外部服务
Redis 集群 6 主 6 从
MySQL 主从 1 主 5 从
消息队列 RabbitMQ 3 台

课后练习

练习 1

缓存击穿和缓存穿透的区别是什么?

参考答案 (3 个标签)
缓存击穿 缓存穿透 概念辨析

答案

对比项缓存穿透缓存击穿
攻击目标不存在的数据存在的热点数据
key 数量大量不同的 key单个热点 key
数据存在性缓存和外部 API 都没有外部 API 有,缓存刚好过期
触发原因恶意攻击/非法请求热点 key 自然过期
并发特征持续的恶意请求高并发的正常请求

简单记忆

  • 穿透 = 数据不存在,请求穿透空对象
  • 击穿 = 热点 key 过期,请求击穿缓存层

练习 2

互斥锁方案中,为什么获取锁后要”双重检查”缓存?

参考答案 (3 个标签)
缓存击穿 互斥锁 双重检查

答案

双重检查的目的是避免重复调用外部 API

场景说明

时间线:
T1: 请求 A 获取锁成功
T2: 请求 B 获取锁失败,进入等待
T3: 请求 A 调用外部 API 并写入缓存,释放锁
T4: 请求 B 获得锁(因为 A 释放了)
T5: 请求 B 再次调用外部 API ← 这是浪费的!

双重检查代码

if lock_acquired:
    try:
        # 第一次检查:抢锁前
        cached = redis_client.get(cache_key)
        if cached:
            return json.loads(cached)

        # 调用外部 API
        data = external_api.get_weather(...)

        # 写缓存
        redis_client.setex(cache_key, 3600, json.dumps(data))

    finally:
        redis_client.delete(lock_key)

效果

  • 避免在等待锁期间,其他请求已经写入了数据
  • 减少不必要的外部 API 调用

练习 3

请实现一个带有缓存击穿防护的商品查询函数,使用互斥锁方案。

参考答案 (3 个标签)
缓存击穿 互斥锁 实战编程

参考答案

import redis
import json
import time

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def get_product_info(product_id):
    """
    查询商品信息,带缓存击穿防护(互斥锁方案)
    """
    cache_key = f"product:{product_id}"
    lock_key = f"lock:{cache_key}"

    # 1. 先查缓存
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # 2. 尝试获取互斥锁(10 秒超时)
    lock_acquired = redis_client.set(lock_key, '1', nx=True, ex=10)

    if lock_acquired:
        try:
            # 3. 双重检查缓存
            cached = redis_client.get(cache_key)
            if cached:
                return json.loads(cached)

            # 4. 调用外部 API
            product = external_api.get_product(product_id)

            if not product:
                # 商品不存在,缓存空值(防穿透)
                redis_client.setex(cache_key, 60, '__NULL__')
                return None

            # 5. 写入缓存(1 小时过期)
            redis_client.setex(cache_key, 3600, json.dumps(product))

            return product

        finally:
            # 6. 释放锁(一定要释放!)
            redis_client.delete(lock_key)

    else:
        # 7. 获取锁失败,等待后重试
        time.sleep(0.1)  # 等待 100ms
        return get_product_info(product_id)  # 递归重试

# 使用示例
@app.route('/api/product/<int:product_id>')
def product_api(product_id):
    # 参数校验
    if product_id <= 0:
        return {'error': '无效的商品 ID'}, 400

    product = get_product_info(product_id)

    if not product:
        return {'error': '商品不存在'}, 404

    return {'data': product}

关键点

  1. 互斥锁保护外部 API 调用
  2. 双重检查避免重复调用
  3. 空值缓存防止穿透
  4. 等待重试机制处理锁竞争
  5. finally 块确保锁释放

搜索