限流设计

我决定不再裸奔

上次流量洪峰后,我躺在椅子上盯着天花板发呆。虽然最后扛过去了,但那种系统即将崩溃的无力感让我后怕。

更糟糕的是,第二天我又发现了新问题:

用户 A:用脚本每秒发 1000 次请求创建短链接
用户 B:疯狂刷某个短链接,每秒点击 500 次
用户 C:写了个爬虫,在 1 分钟内遍历了 10 万个短链接

虽然总流量不算大,但这些人像洪水一样涌入,正常用户的请求反而被挤掉了。我的系统像个没有红绿灯的十字路口,谁抢得凶谁先过。

我意识到:限流是保护系统的第一道防线。

我需要一个”红绿灯”🚦——让请求有序通过,超出的部分礼貌地拒绝。

我研究了一周的限流算法

限流的核心很简单:限制单位时间内的请求数量

正常流量:  ──────○──────○──────○──────○──────     ✅ 放行
              ↓     ↓     ↓     ↓     ↓
限流后:    ──────○──────×──────○──────×──────     ✅ 部分拒绝
              ↓           ↓           ↓
系统负载:  ▓▓▓▓▓░░░░░▓▓▓▓▓░░░░░▓▓▓▓▓             稳定可控

但怎么实现这个”限制”,我花了一周时间研究了三种经典算法。

固定窗口计数器:简单但有坑 🪟

最简单的方式——把时间切成固定大小的窗口,每个窗口内统计请求数。

import time

class FixedWindowRateLimiter:
    """固定窗口计数器限流"""

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests     # 窗口内最大请求数
        self.window_seconds = window_seconds  # 窗口大小(秒)
        self.counter = 0                      # 当前计数
        self.window_start = time.time()       # 窗口起始时间

    def allow_request(self):
        """判断是否允许请求"""
        now = time.time()

        # 是否进入新窗口?
        if now - self.window_start >= self.window_seconds:
            self.window_start = now
            self.counter = 0

        # 计数 +1,判断是否超限
        self.counter += 1
        if self.counter <= self.max_requests:
            return True
        else:
            return False


# 示例:每秒最多 100 个请求
limiter = FixedWindowRateLimiter(max_requests=100, window_seconds=1)

for i in range(105):
    if limiter.allow_request():
        print(f"请求 {i+1}: ✅ 放行")
    else:
        print(f"请求 {i+1}: ❌ 拒绝")

我一开始觉得这个方案挺好的,直到我在测试时发现了一个致命问题:边界突刺

时间轴:  |← 窗口1 →|← 窗口2 →|
请求量:  100        100
          ↑          ↑
实际流量: ...99|100 1|99|100...

              这里瞬间通过了 200 个请求!
              (窗口1末尾 100 + 窗口2开头 100)

在窗口边界处,实际流量可能达到限流值的 2 倍!这个缺陷让我无法接受。

滑动窗口:精确但消耗内存 📊

为了解决固定窗口的边界问题,我研究出了滑动窗口——让窗口”滑动”起来。

import time
from collections import deque

class SlidingWindowRateLimiter:
    """滑动窗口限流"""

    def __init__(self, max_requests, window_seconds):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()  # 记录每个请求的时间戳

    def allow_request(self):
        """判断是否允许请求"""
        now = time.time()

        # 移除窗口外的旧请求
        while self.requests and self.requests[0] <= now - self.window_seconds:
            self.requests.popleft()

        # 检查窗口内的请求数
        if len(self.requests) < self.max_requests:
            self.requests.append(now)
            return True
        else:
            return False


# 示例:每秒最多 100 个请求
limiter = SlidingWindowRateLimiter(max_requests=100, window_seconds=1)

for i in range(105):
    if limiter.allow_request():
        print(f"请求 {i+1}: ✅ 放行")
    else:
        print(f"请求 {i+1}: ❌ 被限流")

滑动窗口解决了边界突刺问题,精确度很高:

固定窗口:  |████████|████████|████████|
                  ↑ 边界突刺

滑动窗口:  ...████████▓
            ╱ 窗口随时间平滑滑动
特性固定窗口滑动窗口
实现难度简单中等
内存消耗低(1 个计数器)中(存时间戳)
精确度低(有边界突刺)
适用场景要求不高的场景精确限流

但我发现一个问题:滑动窗口需要存储每个请求的时间戳,内存消耗会随着流量增长。对于高并发系统,这可能成为瓶颈。

令牌桶:我选择了它 🪣⭐

研究到最后,我发现了业界最常用的限流算法——令牌桶。

核心思想很简单:以固定速率往桶里放令牌,每个请求拿走一个令牌。

        令牌以固定速率放入
              ↓ ↓ ↓
         ┌─────────────┐
         │  🪣 令牌桶    │ ← 桶满了就不再放
         │ 🪙🪙🪙🪙🪙   │
         │ 🪙🪙🪙🪙    │ ← 最多装 max_tokens 个
         └──────┬──────┘

        请求来了拿一个令牌

         有令牌?✅ 放行
         没令牌?❌ 拒绝
import time

class TokenBucketRateLimiter:
    """令牌桶限流"""

    def __init__(self, rate, max_tokens):
        self.rate = rate               # 每秒放入令牌数
        self.max_tokens = max_tokens   # 桶容量
        self.tokens = max_tokens       # 当前令牌数(初始满桶)
        self.last_refill = time.time()

    def allow_request(self, tokens=1):
        """判断是否允许请求"""
        now = time.time()

        # 计算应该补充的令牌数
        elapsed = now - self.last_refill
        refill_tokens = elapsed * self.rate

        # 补充令牌(不超过桶容量)
        self.tokens = min(self.max_tokens, self.tokens + refill_tokens)
        self.last_refill = now

        # 尝试消费令牌
        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        else:
            return False


# 示例:每秒放 100 个令牌,桶容量 200
limiter = TokenBucketRateLimiter(rate=100, max_tokens=200)

# 突发 150 个请求
print("=== 突发 150 个请求 ===")
for i in range(150):
    allowed = limiter.allow_request()
    status = "✅" if allowed else "❌"
    if (i + 1) % 50 == 0:
        print(f"请求 {i+1}: {status}")

令牌桶最大的优点是允许突发流量 🔥:

正常情况:桶里有 200 个令牌
突发 180 个请求:全部通过 ✅(桶里还有 20 个)
随后恢复:以每秒 100 个的速率补充令牌

这对我的短链接服务特别友好——平时积累令牌,突发时可以一次性消耗。

漏桶:强制匀速 🚰

我也研究了漏桶算法——请求像水一样倒入桶中,以固定速率从底部漏出。

import time
from collections import deque

class LeakyBucketRateLimiter:
    """漏桶限流"""

    def __init__(self, rate, bucket_size):
        self.rate = rate              # 每秒处理的请求数
        self.bucket_size = bucket_size  # 桶容量
        self.bucket = deque()         # 请求队列
        self.last_leak = time.time()

    def allow_request(self):
        """判断是否允许请求"""
        now = time.time()

        # 漏出已处理的请求
        elapsed = now - self.last_leak
        leak_count = int(elapsed * self.rate)
        for _ in range(min(leak_count, len(self.bucket))):
            self.bucket.popleft()
        self.last_leak = now

        # 尝试加入桶中
        if len(self.bucket) < self.bucket_size:
            self.bucket.append(now)
            return True
        else:
            return False


# 示例:每秒处理 50 个请求,桶容量 100
limiter = LeakyBucketRateLimiter(rate=50, bucket_size=100)

但对于短链接跳转场景,漏桶的”强制匀速”特性太严格了。用户点击短链接时,我不希望他们感受到延迟。

四种算法的最终对比

算法突发流量实现难度内存消耗适用场景
固定窗口❌ 不支持⭐ 简单⭐ 低粗粒度限流
滑动窗口❌ 不支持⭐⭐ 中⭐⭐ 中精确限流
令牌桶✅ 允许突发⭐⭐ 中⭐ 低API 限流(推荐)
漏桶❌ 匀速处理⭐⭐ 中⭐⭐ 中流量整形

最终,我选择了令牌桶——既能限流,又能容忍合理的突发流量。

我实现了分布式限流

选定算法后,我开始实现。单机限流只能保护单个实例,在分布式系统中,我需要全局限流。

我用 Redis + Lua 脚本实现了一个高性能的令牌桶限流器:

import redis
import time

class RedisTokenBucket:
    """基于 Redis 的分布式令牌桶限流"""

    # Lua 脚本保证原子性
    LUA_SCRIPT = """
    local key = KEYS[1]
    local rate = tonumber(ARGV[1])
    local capacity = tonumber(ARGV[2])
    local requested = tonumber(ARGV[3])
    local now = tonumber(ARGV[4])

    local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
    local tokens = tonumber(bucket[1]) or capacity
    local last_refill = tonumber(bucket[2]) or now

    -- 计算补充的令牌数
    local elapsed = now - last_refill
    local refill_tokens = elapsed * rate

    -- 补充令牌(不超过容量)
    tokens = math.min(capacity, tokens + refill_tokens)

    -- 判断是否允许
    local allowed = 0
    if tokens >= requested then
        tokens = tokens - requested
        allowed = 1
    end

    -- 更新桶状态
    redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
    redis.call('EXPIRE', key, 3600)

    return {allowed, tokens}
    """

    def __init__(self, redis_client, key, rate, capacity):
        self.redis = redis_client
        self.key = f"rate_limit:{key}"
        self.rate = rate
        self.capacity = capacity
        self.script = self.redis.register_script(self.LUA_SCRIPT)

    def allow_request(self, tokens=1):
        """判断是否允许请求"""
        now = time.time()
        result = self.script(
            keys=[self.key],
            args=[self.rate, self.capacity, tokens, now]
        )
        return bool(result[0])


# 使用示例
r = redis.Redis(host='localhost', port=6379)
limiter = RedisTokenBucket(r, "api:shorten", rate=100, capacity=200)

if limiter.allow_request():
    print("✅ 请求通过")
else:
    print("❌ 被限流")

为什么用 Lua 脚本?因为在 Redis 中执行 Lua 脚本是原子性的,可以避免并发竞争问题。

我设计了分层限流策略

不同的场景需要不同的限流策略。我设计了四级限流:

┌──────────────────────────────────────────┐
│  全局限流:整个系统 100,000 QPS           │  ← 保护整体
├──────────────────────────────────────────┤
│  用户限流:每用户 100 QPS                │  ← 防止单用户滥用
├──────────────────────────────────────────┤
│  接口限流:创建接口 50 QPS/用户           │  ← 保护写操作
├──────────────────────────────────────────┤
│  IP 限流:每 IP 200 QPS                 │  ← 防止爬虫
└──────────────────────────────────────────┘

代码实现:

class MultiLevelRateLimiter:
    """多级限流"""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.limiters = {
            'global': RedisTokenBucket(redis_client, "global", 100000, 150000),
            'user':   RedisTokenBucket(redis_client, "user", 100, 200),
            'api':    RedisTokenBucket(redis_client, "api:shorten", 50, 100),
            'ip':     RedisTokenBucket(redis_client, "ip", 200, 300),
        }

    def check(self, user_id, ip):
        """逐级检查限流"""
        checks = [
            ('global', 'system'),
            ('ip', ip),
            ('user', user_id),
        ]

        for level, key in checks:
            limiter = self.limiters[level]
            if not limiter.allow_request():
                return False, level

        return True, None


# 在 Flask 中使用
from flask import Flask, request, jsonify

app = Flask(__name__)
limiter = MultiLevelRateLimiter(redis_client)

@app.route('/shorten', methods=['POST'])
def create_short_link():
    # 限流检查
    allowed, level = limiter.check(
        user_id=get_current_user_id(),
        ip=request.remote_addr
    )

    if not allowed:
        # 🔑 返回 429 状态码 + Retry-After 头
        response = jsonify({
            "error": "Too Many Requests",
            "message": f"请求过于频繁,请稍后再试",
            "retry_after": 60
        })
        response.status_code = 429
        response.headers['Retry-After'] = '60'
        return response

    # 正常处理
    return handle_shorten()

HTTP 429 响应的最佳实践

当请求被限流后,我返回标准的 HTTP 429 响应:

HTTP/1.1 429 Too Many Requests
Content-Type: application/json
Retry-After: 60
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1672531200

{
  "error": "Too Many Requests",
  "message": "API rate limit exceeded",
  "retry_after": 60
}

这样客户端就知道什么时候可以重试,而不是盲目地不断请求。

限流保住了系统,但用户体验不好

部署了限流后,系统终于不再被恶意用户打垮了。但我发现了一个新问题:

正常用户在高峰期也被限流了。

短链接服务的流量特点是:突发性强、地域集中。比如某个营销活动在上午 10 点推送,大量用户同时点击,限流虽然保护了系统,但用户体验很差——很多人看到”请求过于频繁,请稍后再试”。

我在想,能不能让用户离得更近一些?把压力分散到边缘节点?

答案可能是 CDN

想一想

  1. 令牌桶和漏桶的核心区别是什么?为什么我选择了令牌桶?
  2. 如果 Redis 挂了,我的分布式限流会怎样?应该放行还是拒绝?
  3. 除了返回 429,还有没有更好的处理方式来提升用户体验?