限流设计
我决定不再裸奔
上次流量洪峰后,我躺在椅子上盯着天花板发呆。虽然最后扛过去了,但那种系统即将崩溃的无力感让我后怕。
更糟糕的是,第二天我又发现了新问题:
用户 A:用脚本每秒发 1000 次请求创建短链接
用户 B:疯狂刷某个短链接,每秒点击 500 次
用户 C:写了个爬虫,在 1 分钟内遍历了 10 万个短链接虽然总流量不算大,但这些人像洪水一样涌入,正常用户的请求反而被挤掉了。我的系统像个没有红绿灯的十字路口,谁抢得凶谁先过。
我意识到:限流是保护系统的第一道防线。
我需要一个”红绿灯”🚦——让请求有序通过,超出的部分礼貌地拒绝。
我研究了一周的限流算法
限流的核心很简单:限制单位时间内的请求数量。
正常流量: ──────○──────○──────○──────○────── ✅ 放行
↓ ↓ ↓ ↓ ↓
限流后: ──────○──────×──────○──────×────── ✅ 部分拒绝
↓ ↓ ↓
系统负载: ▓▓▓▓▓░░░░░▓▓▓▓▓░░░░░▓▓▓▓▓ 稳定可控但怎么实现这个”限制”,我花了一周时间研究了三种经典算法。
固定窗口计数器:简单但有坑 🪟
最简单的方式——把时间切成固定大小的窗口,每个窗口内统计请求数。
import time
class FixedWindowRateLimiter:
"""固定窗口计数器限流"""
def __init__(self, max_requests, window_seconds):
self.max_requests = max_requests # 窗口内最大请求数
self.window_seconds = window_seconds # 窗口大小(秒)
self.counter = 0 # 当前计数
self.window_start = time.time() # 窗口起始时间
def allow_request(self):
"""判断是否允许请求"""
now = time.time()
# 是否进入新窗口?
if now - self.window_start >= self.window_seconds:
self.window_start = now
self.counter = 0
# 计数 +1,判断是否超限
self.counter += 1
if self.counter <= self.max_requests:
return True
else:
return False
# 示例:每秒最多 100 个请求
limiter = FixedWindowRateLimiter(max_requests=100, window_seconds=1)
for i in range(105):
if limiter.allow_request():
print(f"请求 {i+1}: ✅ 放行")
else:
print(f"请求 {i+1}: ❌ 拒绝")我一开始觉得这个方案挺好的,直到我在测试时发现了一个致命问题:边界突刺。
时间轴: |← 窗口1 →|← 窗口2 →|
请求量: 100 100
↑ ↑
实际流量: ...99|100 1|99|100...
↑
这里瞬间通过了 200 个请求!
(窗口1末尾 100 + 窗口2开头 100)在窗口边界处,实际流量可能达到限流值的 2 倍!这个缺陷让我无法接受。
滑动窗口:精确但消耗内存 📊
为了解决固定窗口的边界问题,我研究出了滑动窗口——让窗口”滑动”起来。
import time
from collections import deque
class SlidingWindowRateLimiter:
"""滑动窗口限流"""
def __init__(self, max_requests, window_seconds):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests = deque() # 记录每个请求的时间戳
def allow_request(self):
"""判断是否允许请求"""
now = time.time()
# 移除窗口外的旧请求
while self.requests and self.requests[0] <= now - self.window_seconds:
self.requests.popleft()
# 检查窗口内的请求数
if len(self.requests) < self.max_requests:
self.requests.append(now)
return True
else:
return False
# 示例:每秒最多 100 个请求
limiter = SlidingWindowRateLimiter(max_requests=100, window_seconds=1)
for i in range(105):
if limiter.allow_request():
print(f"请求 {i+1}: ✅ 放行")
else:
print(f"请求 {i+1}: ❌ 被限流")滑动窗口解决了边界突刺问题,精确度很高:
固定窗口: |████████|████████|████████|
↑ 边界突刺
滑动窗口: ...████████▓
╱ 窗口随时间平滑滑动| 特性 | 固定窗口 | 滑动窗口 |
|---|---|---|
| 实现难度 | 简单 | 中等 |
| 内存消耗 | 低(1 个计数器) | 中(存时间戳) |
| 精确度 | 低(有边界突刺) | 高 |
| 适用场景 | 要求不高的场景 | 精确限流 |
但我发现一个问题:滑动窗口需要存储每个请求的时间戳,内存消耗会随着流量增长。对于高并发系统,这可能成为瓶颈。
令牌桶:我选择了它 🪣⭐
研究到最后,我发现了业界最常用的限流算法——令牌桶。
核心思想很简单:以固定速率往桶里放令牌,每个请求拿走一个令牌。
令牌以固定速率放入
↓ ↓ ↓
┌─────────────┐
│ 🪣 令牌桶 │ ← 桶满了就不再放
│ 🪙🪙🪙🪙🪙 │
│ 🪙🪙🪙🪙 │ ← 最多装 max_tokens 个
└──────┬──────┘
│
请求来了拿一个令牌
↓
有令牌?✅ 放行
没令牌?❌ 拒绝import time
class TokenBucketRateLimiter:
"""令牌桶限流"""
def __init__(self, rate, max_tokens):
self.rate = rate # 每秒放入令牌数
self.max_tokens = max_tokens # 桶容量
self.tokens = max_tokens # 当前令牌数(初始满桶)
self.last_refill = time.time()
def allow_request(self, tokens=1):
"""判断是否允许请求"""
now = time.time()
# 计算应该补充的令牌数
elapsed = now - self.last_refill
refill_tokens = elapsed * self.rate
# 补充令牌(不超过桶容量)
self.tokens = min(self.max_tokens, self.tokens + refill_tokens)
self.last_refill = now
# 尝试消费令牌
if self.tokens >= tokens:
self.tokens -= tokens
return True
else:
return False
# 示例:每秒放 100 个令牌,桶容量 200
limiter = TokenBucketRateLimiter(rate=100, max_tokens=200)
# 突发 150 个请求
print("=== 突发 150 个请求 ===")
for i in range(150):
allowed = limiter.allow_request()
status = "✅" if allowed else "❌"
if (i + 1) % 50 == 0:
print(f"请求 {i+1}: {status}")令牌桶最大的优点是允许突发流量 🔥:
正常情况:桶里有 200 个令牌
突发 180 个请求:全部通过 ✅(桶里还有 20 个)
随后恢复:以每秒 100 个的速率补充令牌这对我的短链接服务特别友好——平时积累令牌,突发时可以一次性消耗。
漏桶:强制匀速 🚰
我也研究了漏桶算法——请求像水一样倒入桶中,以固定速率从底部漏出。
import time
from collections import deque
class LeakyBucketRateLimiter:
"""漏桶限流"""
def __init__(self, rate, bucket_size):
self.rate = rate # 每秒处理的请求数
self.bucket_size = bucket_size # 桶容量
self.bucket = deque() # 请求队列
self.last_leak = time.time()
def allow_request(self):
"""判断是否允许请求"""
now = time.time()
# 漏出已处理的请求
elapsed = now - self.last_leak
leak_count = int(elapsed * self.rate)
for _ in range(min(leak_count, len(self.bucket))):
self.bucket.popleft()
self.last_leak = now
# 尝试加入桶中
if len(self.bucket) < self.bucket_size:
self.bucket.append(now)
return True
else:
return False
# 示例:每秒处理 50 个请求,桶容量 100
limiter = LeakyBucketRateLimiter(rate=50, bucket_size=100)但对于短链接跳转场景,漏桶的”强制匀速”特性太严格了。用户点击短链接时,我不希望他们感受到延迟。
四种算法的最终对比
| 算法 | 突发流量 | 实现难度 | 内存消耗 | 适用场景 |
|---|---|---|---|---|
| 固定窗口 | ❌ 不支持 | ⭐ 简单 | ⭐ 低 | 粗粒度限流 |
| 滑动窗口 | ❌ 不支持 | ⭐⭐ 中 | ⭐⭐ 中 | 精确限流 |
| 令牌桶 | ✅ 允许突发 | ⭐⭐ 中 | ⭐ 低 | API 限流(推荐) |
| 漏桶 | ❌ 匀速处理 | ⭐⭐ 中 | ⭐⭐ 中 | 流量整形 |
最终,我选择了令牌桶——既能限流,又能容忍合理的突发流量。
我实现了分布式限流
选定算法后,我开始实现。单机限流只能保护单个实例,在分布式系统中,我需要全局限流。
我用 Redis + Lua 脚本实现了一个高性能的令牌桶限流器:
import redis
import time
class RedisTokenBucket:
"""基于 Redis 的分布式令牌桶限流"""
# Lua 脚本保证原子性
LUA_SCRIPT = """
local key = KEYS[1]
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local requested = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1]) or capacity
local last_refill = tonumber(bucket[2]) or now
-- 计算补充的令牌数
local elapsed = now - last_refill
local refill_tokens = elapsed * rate
-- 补充令牌(不超过容量)
tokens = math.min(capacity, tokens + refill_tokens)
-- 判断是否允许
local allowed = 0
if tokens >= requested then
tokens = tokens - requested
allowed = 1
end
-- 更新桶状态
redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
redis.call('EXPIRE', key, 3600)
return {allowed, tokens}
"""
def __init__(self, redis_client, key, rate, capacity):
self.redis = redis_client
self.key = f"rate_limit:{key}"
self.rate = rate
self.capacity = capacity
self.script = self.redis.register_script(self.LUA_SCRIPT)
def allow_request(self, tokens=1):
"""判断是否允许请求"""
now = time.time()
result = self.script(
keys=[self.key],
args=[self.rate, self.capacity, tokens, now]
)
return bool(result[0])
# 使用示例
r = redis.Redis(host='localhost', port=6379)
limiter = RedisTokenBucket(r, "api:shorten", rate=100, capacity=200)
if limiter.allow_request():
print("✅ 请求通过")
else:
print("❌ 被限流")为什么用 Lua 脚本?因为在 Redis 中执行 Lua 脚本是原子性的,可以避免并发竞争问题。
我设计了分层限流策略
不同的场景需要不同的限流策略。我设计了四级限流:
┌──────────────────────────────────────────┐
│ 全局限流:整个系统 100,000 QPS │ ← 保护整体
├──────────────────────────────────────────┤
│ 用户限流:每用户 100 QPS │ ← 防止单用户滥用
├──────────────────────────────────────────┤
│ 接口限流:创建接口 50 QPS/用户 │ ← 保护写操作
├──────────────────────────────────────────┤
│ IP 限流:每 IP 200 QPS │ ← 防止爬虫
└──────────────────────────────────────────┘代码实现:
class MultiLevelRateLimiter:
"""多级限流"""
def __init__(self, redis_client):
self.redis = redis_client
self.limiters = {
'global': RedisTokenBucket(redis_client, "global", 100000, 150000),
'user': RedisTokenBucket(redis_client, "user", 100, 200),
'api': RedisTokenBucket(redis_client, "api:shorten", 50, 100),
'ip': RedisTokenBucket(redis_client, "ip", 200, 300),
}
def check(self, user_id, ip):
"""逐级检查限流"""
checks = [
('global', 'system'),
('ip', ip),
('user', user_id),
]
for level, key in checks:
limiter = self.limiters[level]
if not limiter.allow_request():
return False, level
return True, None
# 在 Flask 中使用
from flask import Flask, request, jsonify
app = Flask(__name__)
limiter = MultiLevelRateLimiter(redis_client)
@app.route('/shorten', methods=['POST'])
def create_short_link():
# 限流检查
allowed, level = limiter.check(
user_id=get_current_user_id(),
ip=request.remote_addr
)
if not allowed:
# 🔑 返回 429 状态码 + Retry-After 头
response = jsonify({
"error": "Too Many Requests",
"message": f"请求过于频繁,请稍后再试",
"retry_after": 60
})
response.status_code = 429
response.headers['Retry-After'] = '60'
return response
# 正常处理
return handle_shorten()HTTP 429 响应的最佳实践
当请求被限流后,我返回标准的 HTTP 429 响应:
HTTP/1.1 429 Too Many Requests
Content-Type: application/json
Retry-After: 60
X-RateLimit-Limit: 100
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1672531200
{
"error": "Too Many Requests",
"message": "API rate limit exceeded",
"retry_after": 60
}这样客户端就知道什么时候可以重试,而不是盲目地不断请求。
限流保住了系统,但用户体验不好
部署了限流后,系统终于不再被恶意用户打垮了。但我发现了一个新问题:
正常用户在高峰期也被限流了。
短链接服务的流量特点是:突发性强、地域集中。比如某个营销活动在上午 10 点推送,大量用户同时点击,限流虽然保护了系统,但用户体验很差——很多人看到”请求过于频繁,请稍后再试”。
我在想,能不能让用户离得更近一些?把压力分散到边缘节点?
答案可能是 CDN。
想一想
- 令牌桶和漏桶的核心区别是什么?为什么我选择了令牌桶?
- 如果 Redis 挂了,我的分布式限流会怎样?应该放行还是拒绝?
- 除了返回 429,还有没有更好的处理方式来提升用户体验?