导航菜单

分布式限流

场景

负载均衡上线后,系统运行正常。

但过了一段时间,我发现了一个奇怪的问题:

某个免费用户的调用情况:
- 套餐限额:1 次/秒
- 实际调用:4 次/秒

为什么限流没有生效?

问题分析

我检查了限流逻辑,发现:

当前限流实现

def is_rate_limited_token_bucket(user_id, rate, capacity):
    """令牌桶限流"""

    cache_key = f'token_bucket:{user_id}'

    # ... 省略令牌桶逻辑

    # 使用 Redis 存储
    redis_client.hset(cache_key, 'tokens', tokens)
    redis_client.hset(cache_key, 'last_time', current_time)

看起来没问题啊,Redis 是共享的。

深入调查

我查了一下日志,发现:

时间轴(User 123 的请求):

14:00:00.100 → Server 1: 允许(桶里有 10 个令牌)
14:00:00.200 → Server 2: 允许(桶里有 10 个令牌)
14:00:00.300 → Server 3: 允许(桶里有 10 个令牌)
14:00:00.400 → Server 4: 允许(桶里有 10 个令牌)

问题找到了!

每台服务器都认为桶里有 10 个令牌,所以都放行了。

根本原因

问题:竞态条件

# Server 1 的执行流程
current_tokens = redis_client.hget(cache_key, 'tokens')  # 读取:10
new_tokens = current_tokens - 1  # 计算:9
redis_client.hset(cache_key, 'tokens', new_tokens)  # 写入:9

# Server 2 的执行流程(几乎同时)
current_tokens = redis_client.hget(cache_key, 'tokens')  # 读取:10(因为 Server 1 还没写入)
new_tokens = current_tokens - 1  # 计算:9
redis_client.hset(cache_key, 'tokens', new_tokens)  # 写入:9(覆盖了 Server 1 的写入)

结果:本该消耗 4 个令牌,实际只消耗了 1 个!

为什么之前没问题?

单机时代:
- 所有请求都在同一台服务器
- Python 的 GIL 保证同一时间只有一个线程执行
- 不存在竞态条件

负载均衡后:
- 请求分配到不同的服务器
- 多台服务器同时读取和修改 Redis
- 存在竞态条件

解决方案

需要使用Redis 原子操作

方案 1:Redis 事务(WATCH + MULTI)

def is_rate_limited_with_transaction(user_id, rate, capacity):
    """使用 Redis 事务实现限流"""

    current_time = time.time()
    cache_key = f'token_bucket:{user_id}'

    with redis_client.pipeline() as pipe:
        while True:
            try:
                # 监视 key
                pipe.watch(cache_key)

                # 获取当前状态
                current_data = pipe.hgetall(cache_key)

                if not current_data:
                    # 首次使用
                    tokens = capacity
                    last_time = current_time
                else:
                    tokens = float(current_data.get('tokens', capacity))
                    last_time = float(current_data.get('last_time', current_time))

                # 计算新增令牌
                time_passed = current_time - last_time
                new_tokens = min(tokens + time_passed * rate, capacity)

                # 检查是否有足够令牌
                if new_tokens < 1:
                    pipe.unwatch()
                    return True  # 被限流

                # 开始事务
                pipe.multi()

                # 更新状态
                pipe.hset(cache_key, 'tokens', new_tokens - 1)
                pipe.hset(cache_key, 'last_time', current_time)
                pipe.expire(cache_key, 3600)

                # 执行事务
                pipe.execute()

                return False  # 未被限流

            except WatchError:
                # key 被其他客户端修改,重试
                continue

问题:

  • 性能较差(需要重试)
  • 复杂度高

方案 2:Lua 脚本(推荐)

# Lua 脚本(原子执行)
rate_limit_lua = '''
local current_time = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local capacity = tonumber(ARGV[3])
local cache_key = KEYS[1]

-- 获取当前状态
local current_data = redis.call('HGETALL', cache_key)

if #current_data == 0 then
    -- 首次使用
    redis.call('HMSET', cache_key, 'tokens', capacity - 1, 'last_time', current_time)
    redis.call('EXPIRE', cache_key, 3600)
    return 0  -- 未被限流
end

-- 解析当前状态
local tokens = tonumber(current_data[2])
local last_time = tonumber(current_data[4])

-- 计算新增令牌
local time_passed = current_time - last_time
local new_tokens = math.min(tokens + time_passed * rate, capacity)

-- 检查是否有足够令牌
if new_tokens < 1 then
    -- 更新状态(虽然被限流,但也要更新令牌数和时间)
    redis.call('HMSET', cache_key, 'tokens', new_tokens, 'last_time', current_time)
    redis.call('EXPIRE', cache_key, 3600)
    return 1  -- 被限流
end

-- 消耗一个令牌
redis.call('HMSET', cache_key, 'tokens', new_tokens - 1, 'last_time', current_time)
redis.call('EXPIRE', cache_key, 3600)
return 0  -- 未被限流
'''

def is_rate_limited_with_lua(user_id, rate, capacity):
    """使用 Lua 脚本实现限流"""

    current_time = time.time()
    cache_key = f'token_bucket:{user_id}'

    # 执行 Lua 脚本(原子操作)
    result = redis_client.eval(
        rate_limit_lua,
        1,  # key 数量
        cache_key,  # key
        current_time,  # ARGV[1]
        rate,  # ARGV[2]
        capacity  # ARGV[3]
    )

    return bool(result)  # 1 表示被限流,0 表示未限流

优势:

  • 原子执行(Redis 保证)
  • 性能好(不需要重试)
  • 简单清晰

方案 3:Redis 模块(Redis-cell)

# 安装 redis-cell 模块
# https://github.com/brandur/redis-cell
def is_rate_limited_with_cell(user_id, rate, capacity):
    """使用 redis-cell 模块实现限流"""

    cache_key = f'token_bucket:{user_id}'

    # CL.THROTTLE 命令
    result = redis_client.execute_command(
        'CL.THROTTLE',
        cache_key,      # key
        capacity,       # max burst
        rate,           # count per second
        60,             # period (seconds)
        1               # quantity
    )

    # result[0] == 0 表示未被限流
    return result[0] != 0

优势:

  • 专门为限流设计
  • 功能强大
  • 性能最好

劣势:

  • 需要安装第三方模块
  • 不是所有 Redis 环境都支持

最终方案

我选择了Lua 脚本方案:

# rate_limiter.py
import redis
import time

class RateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client

        # 加载 Lua 脚本
        self.lua_script = self.redis.register_script(self._get_lua_script())

    def _get_lua_script(self):
        return '''
local current_time = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local capacity = tonumber(ARGV[3])
local cache_key = KEYS[1]

local current_data = redis.call('HGETALL', cache_key)

if #current_data == 0 then
    redis.call('HMSET', cache_key, 'tokens', capacity - 1, 'last_time', current_time)
    redis.call('EXPIRE', cache_key, 3600)
    return {0, capacity - 1, capacity, current_time}
end

local tokens = tonumber(current_data[2])
local last_time = tonumber(current_data[4])

local time_passed = current_time - last_time
local new_tokens = math.min(tokens + time_passed * rate, capacity)

if new_tokens < 1 then
    redis.call('HMSET', cache_key, 'tokens', new_tokens, 'last_time', current_time)
    redis.call('EXPIRE', cache_key, 3600)
    return {1, new_tokens, capacity, current_time}
end

redis.call('HMSET', cache_key, 'tokens', new_tokens - 1, 'last_time', current_time)
redis.call('EXPIRE', cache_key, 3600)
return {0, new_tokens - 1, capacity, current_time}
'''

    def is_allowed(self, user_id, rate, capacity):
        """检查是否允许请求"""

        current_time = time.time()
        cache_key = f'token_bucket:{user_id}'

        result = self.lua_script(
            keys=[cache_key],
            args=[current_time, rate, capacity]
        )

        return {
            'allowed': result[0] == 0,
            'remaining_tokens': result[1],
            'capacity': result[2],
            'last_update': result[3]
        }

# 使用示例
rate_limiter = RateLimiter(redis_client)

@app.route('/api/weather')
@require_api_key
def get_weather():
    user = request.user

    # 检查限流
    if user['plan'] == 'free':
        rate = 1
        capacity = 10
    elif user['plan'] == 'basic':
        rate = 10
        capacity = 100
    else:
        rate = 100
        capacity = 1000

    result = rate_limiter.is_allowed(user['id'], rate, capacity)

    if not result['allowed']:
        return jsonify({
            'error': 'Rate limit exceeded',
            'retry_after': 1
        }), 429

    # 设置响应头(显示剩余配额)
    response = jsonify(weather_data)
    response.headers['X-RateLimit-Remaining'] = int(result['remaining_tokens'])
    response.headers['X-RateLimit-Limit'] = result['capacity']

    return response

效果验证

上线后,我再次测试:

测试场景

User 123(免费版,1 次/秒)

并发 4 个请求到不同服务器:
Server 1: 14:00:00.000
Server 2: 14:00:00.001
Server 3: 14:00:00.002
Server 4: 14:00:00.003

结果

优化前(有竞态条件):
- Server 1: 允许
- Server 2: 允许
- Server 3: 允许
- Server 4: 允许
- 结果:4 个请求全部通过 ❌

优化后(Lua 脚本):
- Server 1: 允许(第 1 个)
- Server 2: 被限流
- Server 3: 被限流
- Server 4: 被限流
- 结果:只有 1 个请求通过 ✅

监控和告警

def monitor_rate_limits():
    """监控限流情况"""

    # 统计被限流最多的用户
    blocked_users = redis_client.zrevrange(
        'rate_limit_blocked',
        0, 9,
        withscores=True
    )

    print("\nTop 10 被限流用户:")
    for user_id, score in blocked_users:
        user = db.execute(
            'SELECT email, plan FROM users WHERE id = ?',
            (int(user_id),)
        )

        print(f"{user['email']} ({user['plan']}): {int(score)} 次")

    # 检查限流异常
    for user_id, score in blocked_users:
        if score > 1000:  # 1 小时内被限流超过 1000 次
            send_alert(f"User {user_id} is being heavily rate limited: {int(score)} times")

本节小结

✅ 完成的工作:

  • 发现了分布式环境下的限流问题
  • 分析了竞态条件的根本原因
  • 使用 Lua 脚本实现原子限流
  • 修复了分布式限流问题

✅ 效果:

  • 限流在分布式环境下正常工作
  • 防止了限流失效
  • 保护了系统稳定性

⚠️ 新的思考:

  • 系统越来越复杂了
  • 单点故障风险增加
  • 需要考虑容错和降级

🎯 下一步: 系统越来越复杂,我需要考虑商业化运营。

搜索