导航菜单

削峰填谷

场景

某个大型活动期间,流量暴增。

活动背景:
- 双 11 促销活动
- 大量电商客户使用 API
- 持续时间:24 小时

流量变化:
- 平时:10 万次/天
- 活动期间:100 万次/天
- 峰值:平时 QPS 的 20 倍

问题分析

流量曲线:

平时:
10 万次/天 ≈ 1.2 次/秒

活动期间:
100 万次/天 ≈ 11.5 次/秒
峰值时刻:200 次/秒(持续 10 分钟)

系统容量:
- 数据库:50 次/秒
- 应用服务器:100 次/秒

问题:
峰值流量超过系统容量

解决方案:消息队列

架构设计

同步处理(之前):
用户请求 → 应用服务器 → 数据库(阻塞)

异步处理(现在):
用户请求 → 应用服务器 → 消息队列 → 后端处理

     立即返回(接受)

实现

1. 消息队列选择

使用 RabbitMQ

为什么选择 RabbitMQ:
- 可靠性高(持久化、确认机制)
- 支持多种消息模式
- 社区活跃
- 有管理界面

2. 队列设计

import pika

class MessageQueue:
    """消息队列管理"""

    def __init__(self):
        # 连接 RabbitMQ
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()

        # 声明队列
        self.channel.queue_declare(
            queue='api_requests',
            durable=True  # 持久化
        )

    def publish(self, message):
        """发送消息到队列"""

        self.channel.basic_publish(
            exchange='',
            routing_key='api_requests',
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # 持久化消息
            )
        )

    def start_consumer(self, callback):
        """启动消费者"""

        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue='api_requests',
            on_message_callback=callback
        )

        self.channel.start_consuming()

mq = MessageQueue()

3. API 处理改为异步

@app.route('/api/news', methods=['POST'])
@require_api_key
def create_news_request():
    """创建新闻查询请求(异步)"""

    data = request.get_json()
    category = data.get('category')

    # 生成请求 ID
    request_id = generate_request_id()

    # 发送到消息队列(不阻塞)
    mq.publish({
        'request_id': request_id,
        'user_id': request.user['id'],
        'category': category,
        'timestamp': time.time()
    })

    # 立即返回
    return jsonify({
        'success': True,
        'request_id': request_id,
        'message': '请求已接受,请稍后查询结果',
        'status_url': f'/api/request_status/{request_id}'
    }), 202

# 后端消费者
def process_request(ch, method, properties, body):
    """处理请求(后台)"""

    try:
        data = json.loads(body)
        request_id = data['request_id']
        category = data['category']

        # 调用外部 API
        news_data = fetch_news_from_external(category)

        # 存储结果
        redis_client.setex(
            f'result:{request_id}',
            3600,
            json.dumps(news_data)
        )

        # 确认消息
        ch.basic_ack(delivery_tag=method.delivery_tag)

    except Exception as e:
        logging.error(f'Failed to process request: {e}')
        # 拒绝消息(重新入队)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

# 启动消费者(独立进程)
def start_consumer_worker():
    """启动消费者工作进程"""

    while True:
        try:
            mq.start_consumer(process_request)
        except Exception as e:
            logging.error(f'Consumer error: {e}')
            time.sleep(5)  # 重连

# 启动多个工作进程
for i in range(10):  # 10 个消费者
    Process(target=start_consumer_worker).start()

4. 查询结果

@app.route('/api/request_status/<request_id>')
@require_api_key
def get_request_status(request_id):
    """查询请求状态"""

    # 查询结果
    result = redis_client.get(f'result:{request_id}')

    if result:
        return jsonify({
            'status': 'completed',
            'data': json.loads(result)
        })

    return jsonify({
        'status': 'processing',
        'message': '请求正在处理中'
    }), 202

队列管理

监控队列长度

def check_queue_health():
    """检查队列健康状态"""

    # 获取队列信息
    method = mq.channel.queue_declare(
        queue='api_requests',
        passive=True
    )

    queue_length = method.method.message_count

    # 队列长度告警
    if queue_length > 10000:
        send_alert(f'Queue backlog: {queue_length} messages')

    return {
        'queue_length': queue_length,
        'status': 'healthy' if queue_length < 10000 else 'warning'
    }

# 定时检查
scheduler.add_job(
    check_queue_health,
    'interval',
    seconds=60,
    id='check_queue_health'
)

效果验证

优化前(同步处理)

峰值流量:200 请求/秒
系统容量:100 请求/秒
结果:
- 100 个请求成功
- 100 个请求失败/超时
- 用户体验差

优化后(异步处理)

峰值流量:200 请求/秒
系统容量:
- API 接收:200 请求/秒(快速返回)
- 后端处理:100 请求/秒(持续处理)

结果:
- 所有请求都接受
- 队列缓冲:5000 条消息
- 逐步处理完成
- 用户体验好

本节小结

✅ 完成的工作:

  • 引入了 RabbitMQ 消息队列
  • 实现了异步处理模式
  • 实现了队列监控

✅ 效果:

  • 削峰填谷,平滑流量
  • 峰值流量不影响系统
  • 用户体验提升

⚠️ 新问题: 发现有人恶意刷接口

🎯 下一步:遭遇 DDoS 攻击,需要多层防护

搜索