RabbitMQ

RabbitMQ 延时方案

RabbitMQ 本身不支持延时消息,但可以通过插件实现。


延时插件

安装插件

# 下载并安装延时插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez

# 复制到插件目录
cp rabbitmq_delayed_message_exchange-3.12.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.12.0/plugins/

# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

基本使用

import pika

class RabbitMQDelayQueue:
    """RabbitMQ 延时队列"""
    
    def __init__(self):
        connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = connection.channel()
        
        # ✅ 声明延时交换机
        self.channel.exchange_declare(
            exchange='delayed_exchange',
            exchange_type='x-delayed-message',
            arguments={'x-delayed-type': 'direct'}
        )
        
        # ✅ 声明队列
        self.channel.queue_declare(queue='delayed_queue')
        
        # ✅ 绑定队列
        self.channel.queue_bind(
            exchange='delayed_exchange',
            queue='delayed_queue',
            routing_key='delayed'
        )
    
    def add_task(self, task_data, delay_seconds):
        """添加延时任务"""
        # ✅ 设置延时时间(毫秒)
        headers = {'x-delay': delay_seconds * 1000}
        
        self.channel.basic_publish(
            exchange='delayed_exchange',
            routing_key='delayed',
            body=json.dumps(task_data),
            properties=pika.BasicProperties(
                headers=headers
            )
        )
        
        logger.info(f"添加任务,延时: {delay_seconds}s")
    
    def consume(self, handler):
        """消费任务"""
        def callback(ch, method, properties, body):
            try:
                task = json.loads(body)
                handler(task)
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                logger.error(f"任务失败: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        
        self.channel.basic_consume(
            queue='delayed_queue',
            on_message_callback=callback
        )
        
        self.channel.start_consuming()


# 使用示例
queue = RabbitMQDelayQueue()

# 添加任务
queue.add_task(
    {'order_id': 12345},
    delay_seconds=30 * 60
)

# 消费任务
def handler(task):
    print(f"处理任务: {task}")

queue.consume(handler)

优缺点

优点

  • ✅ 原生分布式支持
  • ✅ 高可靠性(持久化、ACK)
  • ✅ 易于集成

缺点

  • ❌ 需要安装额外插件
  • ❌ 延时时间有限(受限于插件实现)
  • ❌ 运维复杂度高