Redis 缓存优化
MySQL 迁移完成后,你的 API 平台性能得到了提升。但随着用户量增长,某些热点查询开始成为瓶颈。
为什么需要缓存?
无缓存的请求流程:
用户请求 → API → MySQL → 磁盘读取 → 返回结果
响应时间:~100ms
有缓存的请求流程:
用户请求 → API → Redis → 内存读取 → 返回结果
响应时间:~1ms性能对比:
| 存储类型 | 读取延迟 | 典型场景 |
|---|---|---|
| CPU 缓存 | 0.5 纳秒 | CPU 内部运算 |
| 内存 | 100 纳秒 | Redis |
| SSD | 150 微秒 | 数据库 |
| 机械硬盘 | 10 毫秒 | 冷数据存储 |
Redis 比 MySQL 快 100-1000 倍!
1. 安装 Redis
macOS 安装
brew install redis
brew services start redis
redis-cli ping # 输出:PONGDocker 安装
docker run --name redis-cache \
-p 6379:6379 \
-d redis:7 \
redis-server --appendonly yes2. 连接 Redis
import redis
redis_client = redis.Redis(
host='localhost',
port=6379,
password='your_password',
decode_responses=True
)
# 测试连接
redis_client.ping() # True3. 缓存热点查询
场景:获取用户今日 API 调用量
from datetime import date
def get_user_daily_usage(user_id: int) -> int:
"""获取用户今日调用量"""
cache_key = f'usage:{user_id}:{date.today().isoformat()}'
# 1. 先查 Redis 缓存
cached = redis_client.get(cache_key)
if cached is not None:
return int(cached)
# 2. 缓存未命中,查 MySQL
with get_db_cursor() as cursor:
cursor.execute(
'''SELECT COUNT(*) as count FROM api_logs
WHERE user_id = %s AND DATE(created_at) = CURDATE()''',
(user_id,)
)
result = cursor.fetchone()
count = result['count'] if result else 0
# 3. 存入 Redis,设置过期时间
redis_client.setex(cache_key, 3600, str(count)) # 1 小时过期
return count优化效果:
- 缓存命中:1ms
- 缓存未命中:100ms(后续命中变快)
4. 缓存写入策略
Cache-Aside(旁路缓存)- 最常用
def get_user_profile(user_id: int) -> dict:
cache_key = f'user:{user_id}:profile'
# 读:先查缓存
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
# 查 DB
user = user_repo.get_by_id(user_id)
if user:
redis_client.setex(cache_key, 1800, json.dumps(user))
return user
def update_user_profile(user_id: int, data: dict):
# 更新:先更新 DB,再删除缓存
user_repo.update(user_id, data)
redis_client.delete(f'user:{user_id}:profile')Write-Behind(回写)- 高并发计数器
def increment_api_usage(user_id: int):
"""只写缓存,异步写入 DB"""
cache_key = f'usage:{user_id}:{date.today().isoformat()}'
redis_client.incr(cache_key)
redis_client.expire(cache_key, 86400)
# DB 由后台任务定期更新5. 常用缓存模式
计数器
def get_api_count(user_id: int) -> int:
key = f'counter:{user_id}:{date.today().isoformat()}'
return int(redis_client.get(key) or 0)
def increment_counter(user_id: int):
key = f'counter:{user_id}:{date.today().isoformat()}'
redis_client.incr(key)分布式锁
def acquire_lock(lock_name: str, timeout: int = 10) -> bool:
lock_key = f'lock:{lock_name}'
return redis_client.set(lock_key, '1', nx=True, ex=timeout)
def release_lock(lock_name: str, lock_id: str):
lock_key = f'lock:{lock_name}'
if redis_client.get(lock_key) == lock_id:
redis_client.delete(lock_key)限流器
def is_rate_limited(user_id: int, max_requests: int = 1000) -> bool:
key = f'ratelimit:{user_id}:{int(time.time() // 60)}'
count = redis_client.incr(key)
redis_client.expire(key, 60)
return count > max_requests会话存储
def create_session(user_id: int) -> str:
session_id = str(uuid.uuid4())
redis_client.setex(
f'session:{session_id}',
604800, # 7 天
json.dumps({'user_id': user_id})
)
return session_id6. 缓存数据结构选择
| 场景 | 推荐数据结构 | 命令示例 |
|---|---|---|
| 简单缓存 | String | SET/GET |
| 对象存储 | Hash | HSET/HGET/HGETALL |
| 计数器 | String | INCR/DECR |
| 排行榜 | ZSet | ZADD/ZRANK/ZRANGE |
| 消息队列 | List | LPUSH/RPOP |
| 标签/集合 | Set | SADD/SMEMBERS |
7. 缓存问题处理
缓存穿透
问题:查询不存在的数据,缓存和 DB 都没有。
解决:缓存空值
def get_user(user_id: int) -> dict:
cache_key = f'user:{user_id}'
cached = redis_client.get(cache_key)
if cached == '__EMPTY__':
return None
if cached:
return json.loads(cached)
user = user_repo.get_by_id(user_id)
if user:
redis_client.setex(cache_key, 1800, json.dumps(user))
else:
redis_client.setex(cache_key, 300, '__EMPTY__') # 缓存空值 5 分钟
return user缓存击穿
问题:热点 Key 过期瞬间,大量请求打到 DB。
解决:互斥锁
def get_hot_data(key: str) -> dict:
cached = redis_client.get(key)
if cached:
return json.loads(cached)
lock_key = f'lock:{key}'
if redis_client.set(lock_key, '1', nx=True, ex=10):
try:
# 双重检查
cached = redis_client.get(key)
if cached:
return json.loads(cached)
data = db_query()
redis_client.setex(key, 3600, json.dumps(data))
return data
finally:
redis_client.delete(lock_key)
else:
time.sleep(0.1)
return get_hot_data(key)缓存雪崩
问题:大量 Key 同时过期。
解决:随机过期时间
def set_with_jitter(key: str, value: str, base_ttl: int):
import random
jitter = random.randint(-base_ttl // 5, base_ttl // 5)
ttl = base_ttl + jitter
redis_client.setex(key, ttl, value)本节小结
✅ Redis 缓存模式:
| 模式 | 适用场景 | 性能提升 |
|---|---|---|
| Cache-Aside | 读多写少 | 10-100 倍 |
| 计数器 | API 限流、统计 | 100-1000 倍 |
| 分布式锁 | 并发控制 | 保证一致性 |
| 会话存储 | 用户登录 | 无状态扩展 |
⚠️ 缓存注意事项:
- 缓存穿透:查询不存在的数据
- 缓存击穿:热点 Key 过期
- 缓存雪崩:大量 Key 同时过期
🎯 下一步:
引入缓存后,我们需要设计数据归档策略,控制数据库规模。
