RabbitMQ
RabbitMQ 延时方案
RabbitMQ 本身不支持延时消息,但可以通过插件实现。
延时插件
安装插件
# 下载并安装延时插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez
# 复制到插件目录
cp rabbitmq_delayed_message_exchange-3.12.0.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.12.0/plugins/
# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange基本使用
import pika
class RabbitMQDelayQueue:
"""RabbitMQ 延时队列"""
def __init__(self):
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
self.channel = connection.channel()
# ✅ 声明延时交换机
self.channel.exchange_declare(
exchange='delayed_exchange',
exchange_type='x-delayed-message',
arguments={'x-delayed-type': 'direct'}
)
# ✅ 声明队列
self.channel.queue_declare(queue='delayed_queue')
# ✅ 绑定队列
self.channel.queue_bind(
exchange='delayed_exchange',
queue='delayed_queue',
routing_key='delayed'
)
def add_task(self, task_data, delay_seconds):
"""添加延时任务"""
# ✅ 设置延时时间(毫秒)
headers = {'x-delay': delay_seconds * 1000}
self.channel.basic_publish(
exchange='delayed_exchange',
routing_key='delayed',
body=json.dumps(task_data),
properties=pika.BasicProperties(
headers=headers
)
)
logger.info(f"添加任务,延时: {delay_seconds}s")
def consume(self, handler):
"""消费任务"""
def callback(ch, method, properties, body):
try:
task = json.loads(body)
handler(task)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
logger.error(f"任务失败: {e}")
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
self.channel.basic_consume(
queue='delayed_queue',
on_message_callback=callback
)
self.channel.start_consuming()
# 使用示例
queue = RabbitMQDelayQueue()
# 添加任务
queue.add_task(
{'order_id': 12345},
delay_seconds=30 * 60
)
# 消费任务
def handler(task):
print(f"处理任务: {task}")
queue.consume(handler)优缺点
优点
- ✅ 原生分布式支持
- ✅ 高可靠性(持久化、ACK)
- ✅ 易于集成
缺点
- ❌ 需要安装额外插件
- ❌ 延时时间有限(受限于插件实现)
- ❌ 运维复杂度高