Kafka
Kafka 延时方案
Kafka 本身不支持延时消息,但可以通过分区策略和消费者逻辑实现。
实现方案
分区策略
from kafka import KafkaProducer, KafkaConsumer
import json
class KafkaDelayQueue:
"""Kafka 延时队列"""
def __init__(self):
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode()
)
def add_task(self, task_id, task_data, delay_seconds):
"""添加任务"""
execute_at = int(time.time()) + delay_seconds
# ✅ 根据执行时间分区
# 每 10 秒一个分区
partition = (execute_at // 10) % 100
message = {
'task_id': task_id,
'task_data': task_data,
'execute_at': execute_at
}
self.producer.send(
'delay_queue',
value=message,
partition=partition
)
self.producer.flush()
logger.info(f"添加任务: {task_id}, 分区: {partition}")
def consume(self, handler):
"""消费任务"""
consumer = KafkaConsumer(
'delay_queue',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
group_id='delay_queue_consumers'
)
for message in consumer:
task = json.loads(message.value)
now = time.time()
if now >= task['execute_at']:
# ✅ 到期了,执行任务
handler(task)
else:
# ✅ 没到期,放回队列
remaining = task['execute_at'] - now
self.add_task(task['task_id'], task['task_data'], remaining)
# 使用示例
queue = KafkaDelayQueue()
queue.add_task('task_1', {'order_id': 12345}, 30 * 60)
def handler(task):
print(f"处理任务: {task}")
queue.consume(handler)优缺点
优点
- ✅ 高吞吐量
- ✅ 分布式
- ✅ 持久化保证
缺点
- ❌ 实现复杂
- ❌ 需要消费者逻辑处理延时
- ❌ 精度受限于轮询间隔