导航菜单

Redis 缓存优化

MySQL 迁移完成后,你的 API 平台性能得到了提升。但随着用户量增长,某些热点查询开始成为瓶颈。

为什么需要缓存?

无缓存的请求流程:
用户请求 → API → MySQL → 磁盘读取 → 返回结果
         响应时间:~100ms

有缓存的请求流程:
用户请求 → API → Redis → 内存读取 → 返回结果
         响应时间:~1ms

性能对比:

存储类型读取延迟典型场景
CPU 缓存0.5 纳秒CPU 内部运算
内存100 纳秒Redis
SSD150 微秒数据库
机械硬盘10 毫秒冷数据存储

Redis 比 MySQL 快 100-1000 倍

1. 安装 Redis

macOS 安装

brew install redis
brew services start redis
redis-cli ping  # 输出:PONG

Docker 安装

docker run --name redis-cache \
  -p 6379:6379 \
  -d redis:7 \
  redis-server --appendonly yes

2. 连接 Redis

import redis

redis_client = redis.Redis(
    host='localhost',
    port=6379,
    password='your_password',
    decode_responses=True
)

# 测试连接
redis_client.ping()  # True

3. 缓存热点查询

场景:获取用户今日 API 调用量

from datetime import date

def get_user_daily_usage(user_id: int) -> int:
    """获取用户今日调用量"""
    cache_key = f'usage:{user_id}:{date.today().isoformat()}'

    # 1. 先查 Redis 缓存
    cached = redis_client.get(cache_key)
    if cached is not None:
        return int(cached)

    # 2. 缓存未命中,查 MySQL
    with get_db_cursor() as cursor:
        cursor.execute(
            '''SELECT COUNT(*) as count FROM api_logs
               WHERE user_id = %s AND DATE(created_at) = CURDATE()''',
            (user_id,)
        )
        result = cursor.fetchone()
        count = result['count'] if result else 0

    # 3. 存入 Redis,设置过期时间
    redis_client.setex(cache_key, 3600, str(count))  # 1 小时过期

    return count

优化效果:

  • 缓存命中:1ms
  • 缓存未命中:100ms(后续命中变快)

4. 缓存写入策略

Cache-Aside(旁路缓存)- 最常用

def get_user_profile(user_id: int) -> dict:
    cache_key = f'user:{user_id}:profile'

    # 读:先查缓存
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # 查 DB
    user = user_repo.get_by_id(user_id)
    if user:
        redis_client.setex(cache_key, 1800, json.dumps(user))
    return user


def update_user_profile(user_id: int, data: dict):
    # 更新:先更新 DB,再删除缓存
    user_repo.update(user_id, data)
    redis_client.delete(f'user:{user_id}:profile')

Write-Behind(回写)- 高并发计数器

def increment_api_usage(user_id: int):
    """只写缓存,异步写入 DB"""
    cache_key = f'usage:{user_id}:{date.today().isoformat()}'
    redis_client.incr(cache_key)
    redis_client.expire(cache_key, 86400)
    # DB 由后台任务定期更新

5. 常用缓存模式

计数器

def get_api_count(user_id: int) -> int:
    key = f'counter:{user_id}:{date.today().isoformat()}'
    return int(redis_client.get(key) or 0)

def increment_counter(user_id: int):
    key = f'counter:{user_id}:{date.today().isoformat()}'
    redis_client.incr(key)

分布式锁

def acquire_lock(lock_name: str, timeout: int = 10) -> bool:
    lock_key = f'lock:{lock_name}'
    return redis_client.set(lock_key, '1', nx=True, ex=timeout)

def release_lock(lock_name: str, lock_id: str):
    lock_key = f'lock:{lock_name}'
    if redis_client.get(lock_key) == lock_id:
        redis_client.delete(lock_key)

限流器

def is_rate_limited(user_id: int, max_requests: int = 1000) -> bool:
    key = f'ratelimit:{user_id}:{int(time.time() // 60)}'
    count = redis_client.incr(key)
    redis_client.expire(key, 60)
    return count > max_requests

会话存储

def create_session(user_id: int) -> str:
    session_id = str(uuid.uuid4())
    redis_client.setex(
        f'session:{session_id}',
        604800,  # 7 天
        json.dumps({'user_id': user_id})
    )
    return session_id

6. 缓存数据结构选择

场景推荐数据结构命令示例
简单缓存StringSET/GET
对象存储HashHSET/HGET/HGETALL
计数器StringINCR/DECR
排行榜ZSetZADD/ZRANK/ZRANGE
消息队列ListLPUSH/RPOP
标签/集合SetSADD/SMEMBERS

7. 缓存问题处理

缓存穿透

问题:查询不存在的数据,缓存和 DB 都没有。

解决:缓存空值

def get_user(user_id: int) -> dict:
    cache_key = f'user:{user_id}'
    cached = redis_client.get(cache_key)
    if cached == '__EMPTY__':
        return None
    if cached:
        return json.loads(cached)

    user = user_repo.get_by_id(user_id)
    if user:
        redis_client.setex(cache_key, 1800, json.dumps(user))
    else:
        redis_client.setex(cache_key, 300, '__EMPTY__')  # 缓存空值 5 分钟
    return user

缓存击穿

问题:热点 Key 过期瞬间,大量请求打到 DB。

解决:互斥锁

def get_hot_data(key: str) -> dict:
    cached = redis_client.get(key)
    if cached:
        return json.loads(cached)

    lock_key = f'lock:{key}'
    if redis_client.set(lock_key, '1', nx=True, ex=10):
        try:
            # 双重检查
            cached = redis_client.get(key)
            if cached:
                return json.loads(cached)
            data = db_query()
            redis_client.setex(key, 3600, json.dumps(data))
            return data
        finally:
            redis_client.delete(lock_key)
    else:
        time.sleep(0.1)
        return get_hot_data(key)

缓存雪崩

问题:大量 Key 同时过期。

解决:随机过期时间

def set_with_jitter(key: str, value: str, base_ttl: int):
    import random
    jitter = random.randint(-base_ttl // 5, base_ttl // 5)
    ttl = base_ttl + jitter
    redis_client.setex(key, ttl, value)

本节小结

✅ Redis 缓存模式:

模式适用场景性能提升
Cache-Aside读多写少10-100 倍
计数器API 限流、统计100-1000 倍
分布式锁并发控制保证一致性
会话存储用户登录无状态扩展

⚠️ 缓存注意事项:

  • 缓存穿透:查询不存在的数据
  • 缓存击穿:热点 Key 过期
  • 缓存雪崩:大量 Key 同时过期

🎯 下一步

引入缓存后,我们需要设计数据归档策略,控制数据库规模。

搜索