分布式限流
场景
负载均衡上线后,系统运行正常。
但过了一段时间,我发现了一个奇怪的问题:
某个免费用户的调用情况:
- 套餐限额:1 次/秒
- 实际调用:4 次/秒为什么限流没有生效?
问题分析
我检查了限流逻辑,发现:
当前限流实现
def is_rate_limited_token_bucket(user_id, rate, capacity):
"""令牌桶限流"""
cache_key = f'token_bucket:{user_id}'
# ... 省略令牌桶逻辑
# 使用 Redis 存储
redis_client.hset(cache_key, 'tokens', tokens)
redis_client.hset(cache_key, 'last_time', current_time)看起来没问题啊,Redis 是共享的。
深入调查
我查了一下日志,发现:
时间轴(User 123 的请求):
14:00:00.100 → Server 1: 允许(桶里有 10 个令牌)
14:00:00.200 → Server 2: 允许(桶里有 10 个令牌)
14:00:00.300 → Server 3: 允许(桶里有 10 个令牌)
14:00:00.400 → Server 4: 允许(桶里有 10 个令牌)问题找到了!
每台服务器都认为桶里有 10 个令牌,所以都放行了。
根本原因
问题:竞态条件
# Server 1 的执行流程
current_tokens = redis_client.hget(cache_key, 'tokens') # 读取:10
new_tokens = current_tokens - 1 # 计算:9
redis_client.hset(cache_key, 'tokens', new_tokens) # 写入:9
# Server 2 的执行流程(几乎同时)
current_tokens = redis_client.hget(cache_key, 'tokens') # 读取:10(因为 Server 1 还没写入)
new_tokens = current_tokens - 1 # 计算:9
redis_client.hset(cache_key, 'tokens', new_tokens) # 写入:9(覆盖了 Server 1 的写入)结果:本该消耗 4 个令牌,实际只消耗了 1 个!
为什么之前没问题?
单机时代:
- 所有请求都在同一台服务器
- Python 的 GIL 保证同一时间只有一个线程执行
- 不存在竞态条件
负载均衡后:
- 请求分配到不同的服务器
- 多台服务器同时读取和修改 Redis
- 存在竞态条件解决方案
需要使用Redis 原子操作。
方案 1:Redis 事务(WATCH + MULTI)
def is_rate_limited_with_transaction(user_id, rate, capacity):
"""使用 Redis 事务实现限流"""
current_time = time.time()
cache_key = f'token_bucket:{user_id}'
with redis_client.pipeline() as pipe:
while True:
try:
# 监视 key
pipe.watch(cache_key)
# 获取当前状态
current_data = pipe.hgetall(cache_key)
if not current_data:
# 首次使用
tokens = capacity
last_time = current_time
else:
tokens = float(current_data.get('tokens', capacity))
last_time = float(current_data.get('last_time', current_time))
# 计算新增令牌
time_passed = current_time - last_time
new_tokens = min(tokens + time_passed * rate, capacity)
# 检查是否有足够令牌
if new_tokens < 1:
pipe.unwatch()
return True # 被限流
# 开始事务
pipe.multi()
# 更新状态
pipe.hset(cache_key, 'tokens', new_tokens - 1)
pipe.hset(cache_key, 'last_time', current_time)
pipe.expire(cache_key, 3600)
# 执行事务
pipe.execute()
return False # 未被限流
except WatchError:
# key 被其他客户端修改,重试
continue问题:
- 性能较差(需要重试)
- 复杂度高
方案 2:Lua 脚本(推荐)
# Lua 脚本(原子执行)
rate_limit_lua = '''
local current_time = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local capacity = tonumber(ARGV[3])
local cache_key = KEYS[1]
-- 获取当前状态
local current_data = redis.call('HGETALL', cache_key)
if #current_data == 0 then
-- 首次使用
redis.call('HMSET', cache_key, 'tokens', capacity - 1, 'last_time', current_time)
redis.call('EXPIRE', cache_key, 3600)
return 0 -- 未被限流
end
-- 解析当前状态
local tokens = tonumber(current_data[2])
local last_time = tonumber(current_data[4])
-- 计算新增令牌
local time_passed = current_time - last_time
local new_tokens = math.min(tokens + time_passed * rate, capacity)
-- 检查是否有足够令牌
if new_tokens < 1 then
-- 更新状态(虽然被限流,但也要更新令牌数和时间)
redis.call('HMSET', cache_key, 'tokens', new_tokens, 'last_time', current_time)
redis.call('EXPIRE', cache_key, 3600)
return 1 -- 被限流
end
-- 消耗一个令牌
redis.call('HMSET', cache_key, 'tokens', new_tokens - 1, 'last_time', current_time)
redis.call('EXPIRE', cache_key, 3600)
return 0 -- 未被限流
'''
def is_rate_limited_with_lua(user_id, rate, capacity):
"""使用 Lua 脚本实现限流"""
current_time = time.time()
cache_key = f'token_bucket:{user_id}'
# 执行 Lua 脚本(原子操作)
result = redis_client.eval(
rate_limit_lua,
1, # key 数量
cache_key, # key
current_time, # ARGV[1]
rate, # ARGV[2]
capacity # ARGV[3]
)
return bool(result) # 1 表示被限流,0 表示未限流优势:
- 原子执行(Redis 保证)
- 性能好(不需要重试)
- 简单清晰
方案 3:Redis 模块(Redis-cell)
# 安装 redis-cell 模块
# https://github.com/brandur/redis-celldef is_rate_limited_with_cell(user_id, rate, capacity):
"""使用 redis-cell 模块实现限流"""
cache_key = f'token_bucket:{user_id}'
# CL.THROTTLE 命令
result = redis_client.execute_command(
'CL.THROTTLE',
cache_key, # key
capacity, # max burst
rate, # count per second
60, # period (seconds)
1 # quantity
)
# result[0] == 0 表示未被限流
return result[0] != 0优势:
- 专门为限流设计
- 功能强大
- 性能最好
劣势:
- 需要安装第三方模块
- 不是所有 Redis 环境都支持
最终方案
我选择了Lua 脚本方案:
# rate_limiter.py
import redis
import time
class RateLimiter:
def __init__(self, redis_client):
self.redis = redis_client
# 加载 Lua 脚本
self.lua_script = self.redis.register_script(self._get_lua_script())
def _get_lua_script(self):
return '''
local current_time = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local capacity = tonumber(ARGV[3])
local cache_key = KEYS[1]
local current_data = redis.call('HGETALL', cache_key)
if #current_data == 0 then
redis.call('HMSET', cache_key, 'tokens', capacity - 1, 'last_time', current_time)
redis.call('EXPIRE', cache_key, 3600)
return {0, capacity - 1, capacity, current_time}
end
local tokens = tonumber(current_data[2])
local last_time = tonumber(current_data[4])
local time_passed = current_time - last_time
local new_tokens = math.min(tokens + time_passed * rate, capacity)
if new_tokens < 1 then
redis.call('HMSET', cache_key, 'tokens', new_tokens, 'last_time', current_time)
redis.call('EXPIRE', cache_key, 3600)
return {1, new_tokens, capacity, current_time}
end
redis.call('HMSET', cache_key, 'tokens', new_tokens - 1, 'last_time', current_time)
redis.call('EXPIRE', cache_key, 3600)
return {0, new_tokens - 1, capacity, current_time}
'''
def is_allowed(self, user_id, rate, capacity):
"""检查是否允许请求"""
current_time = time.time()
cache_key = f'token_bucket:{user_id}'
result = self.lua_script(
keys=[cache_key],
args=[current_time, rate, capacity]
)
return {
'allowed': result[0] == 0,
'remaining_tokens': result[1],
'capacity': result[2],
'last_update': result[3]
}
# 使用示例
rate_limiter = RateLimiter(redis_client)
@app.route('/api/weather')
@require_api_key
def get_weather():
user = request.user
# 检查限流
if user['plan'] == 'free':
rate = 1
capacity = 10
elif user['plan'] == 'basic':
rate = 10
capacity = 100
else:
rate = 100
capacity = 1000
result = rate_limiter.is_allowed(user['id'], rate, capacity)
if not result['allowed']:
return jsonify({
'error': 'Rate limit exceeded',
'retry_after': 1
}), 429
# 设置响应头(显示剩余配额)
response = jsonify(weather_data)
response.headers['X-RateLimit-Remaining'] = int(result['remaining_tokens'])
response.headers['X-RateLimit-Limit'] = result['capacity']
return response效果验证
上线后,我再次测试:
测试场景
User 123(免费版,1 次/秒)
并发 4 个请求到不同服务器:
Server 1: 14:00:00.000
Server 2: 14:00:00.001
Server 3: 14:00:00.002
Server 4: 14:00:00.003结果
优化前(有竞态条件):
- Server 1: 允许
- Server 2: 允许
- Server 3: 允许
- Server 4: 允许
- 结果:4 个请求全部通过 ❌
优化后(Lua 脚本):
- Server 1: 允许(第 1 个)
- Server 2: 被限流
- Server 3: 被限流
- Server 4: 被限流
- 结果:只有 1 个请求通过 ✅监控和告警
def monitor_rate_limits():
"""监控限流情况"""
# 统计被限流最多的用户
blocked_users = redis_client.zrevrange(
'rate_limit_blocked',
0, 9,
withscores=True
)
print("\nTop 10 被限流用户:")
for user_id, score in blocked_users:
user = db.execute(
'SELECT email, plan FROM users WHERE id = ?',
(int(user_id),)
)
print(f"{user['email']} ({user['plan']}): {int(score)} 次")
# 检查限流异常
for user_id, score in blocked_users:
if score > 1000: # 1 小时内被限流超过 1000 次
send_alert(f"User {user_id} is being heavily rate limited: {int(score)} times")本节小结
✅ 完成的工作:
- 发现了分布式环境下的限流问题
- 分析了竞态条件的根本原因
- 使用 Lua 脚本实现原子限流
- 修复了分布式限流问题
✅ 效果:
- 限流在分布式环境下正常工作
- 防止了限流失效
- 保护了系统稳定性
⚠️ 新的思考:
- 系统越来越复杂了
- 单点故障风险增加
- 需要考虑容错和降级
🎯 下一步: 系统越来越复杂,我需要考虑商业化运营。
