削峰填谷
场景
某个大型活动期间,流量暴增。
活动背景:
- 双 11 促销活动
- 大量电商客户使用 API
- 持续时间:24 小时
流量变化:
- 平时:10 万次/天
- 活动期间:100 万次/天
- 峰值:平时 QPS 的 20 倍问题分析
流量曲线:
平时:
10 万次/天 ≈ 1.2 次/秒
活动期间:
100 万次/天 ≈ 11.5 次/秒
峰值时刻:200 次/秒(持续 10 分钟)
系统容量:
- 数据库:50 次/秒
- 应用服务器:100 次/秒
问题:
峰值流量超过系统容量解决方案:消息队列
架构设计
同步处理(之前):
用户请求 → 应用服务器 → 数据库(阻塞)
异步处理(现在):
用户请求 → 应用服务器 → 消息队列 → 后端处理
↓
立即返回(接受)实现
1. 消息队列选择
使用 RabbitMQ:
为什么选择 RabbitMQ:
- 可靠性高(持久化、确认机制)
- 支持多种消息模式
- 社区活跃
- 有管理界面2. 队列设计
import pika
class MessageQueue:
"""消息队列管理"""
def __init__(self):
# 连接 RabbitMQ
self.connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
self.channel = self.connection.channel()
# 声明队列
self.channel.queue_declare(
queue='api_requests',
durable=True # 持久化
)
def publish(self, message):
"""发送消息到队列"""
self.channel.basic_publish(
exchange='',
routing_key='api_requests',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # 持久化消息
)
)
def start_consumer(self, callback):
"""启动消费者"""
self.channel.basic_qos(prefetch_count=1)
self.channel.basic_consume(
queue='api_requests',
on_message_callback=callback
)
self.channel.start_consuming()
mq = MessageQueue()3. API 处理改为异步
@app.route('/api/news', methods=['POST'])
@require_api_key
def create_news_request():
"""创建新闻查询请求(异步)"""
data = request.get_json()
category = data.get('category')
# 生成请求 ID
request_id = generate_request_id()
# 发送到消息队列(不阻塞)
mq.publish({
'request_id': request_id,
'user_id': request.user['id'],
'category': category,
'timestamp': time.time()
})
# 立即返回
return jsonify({
'success': True,
'request_id': request_id,
'message': '请求已接受,请稍后查询结果',
'status_url': f'/api/request_status/{request_id}'
}), 202
# 后端消费者
def process_request(ch, method, properties, body):
"""处理请求(后台)"""
try:
data = json.loads(body)
request_id = data['request_id']
category = data['category']
# 调用外部 API
news_data = fetch_news_from_external(category)
# 存储结果
redis_client.setex(
f'result:{request_id}',
3600,
json.dumps(news_data)
)
# 确认消息
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
logging.error(f'Failed to process request: {e}')
# 拒绝消息(重新入队)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
# 启动消费者(独立进程)
def start_consumer_worker():
"""启动消费者工作进程"""
while True:
try:
mq.start_consumer(process_request)
except Exception as e:
logging.error(f'Consumer error: {e}')
time.sleep(5) # 重连
# 启动多个工作进程
for i in range(10): # 10 个消费者
Process(target=start_consumer_worker).start()4. 查询结果
@app.route('/api/request_status/<request_id>')
@require_api_key
def get_request_status(request_id):
"""查询请求状态"""
# 查询结果
result = redis_client.get(f'result:{request_id}')
if result:
return jsonify({
'status': 'completed',
'data': json.loads(result)
})
return jsonify({
'status': 'processing',
'message': '请求正在处理中'
}), 202队列管理
监控队列长度
def check_queue_health():
"""检查队列健康状态"""
# 获取队列信息
method = mq.channel.queue_declare(
queue='api_requests',
passive=True
)
queue_length = method.method.message_count
# 队列长度告警
if queue_length > 10000:
send_alert(f'Queue backlog: {queue_length} messages')
return {
'queue_length': queue_length,
'status': 'healthy' if queue_length < 10000 else 'warning'
}
# 定时检查
scheduler.add_job(
check_queue_health,
'interval',
seconds=60,
id='check_queue_health'
)效果验证
优化前(同步处理)
峰值流量:200 请求/秒
系统容量:100 请求/秒
结果:
- 100 个请求成功
- 100 个请求失败/超时
- 用户体验差优化后(异步处理)
峰值流量:200 请求/秒
系统容量:
- API 接收:200 请求/秒(快速返回)
- 后端处理:100 请求/秒(持续处理)
结果:
- 所有请求都接受
- 队列缓冲:5000 条消息
- 逐步处理完成
- 用户体验好本节小结
✅ 完成的工作:
- 引入了 RabbitMQ 消息队列
- 实现了异步处理模式
- 实现了队列监控
✅ 效果:
- 削峰填谷,平滑流量
- 峰值流量不影响系统
- 用户体验提升
⚠️ 新问题: 发现有人恶意刷接口
🎯 下一步:遭遇 DDoS 攻击,需要多层防护
