Kafka

Kafka 延时方案

Kafka 本身不支持延时消息,但可以通过分区策略和消费者逻辑实现。


实现方案

分区策略

from kafka import KafkaProducer, KafkaConsumer
import json

class KafkaDelayQueue:
    """Kafka 延时队列"""
    
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda v: json.dumps(v).encode()
        )
    
    def add_task(self, task_id, task_data, delay_seconds):
        """添加任务"""
        execute_at = int(time.time()) + delay_seconds
        
        # ✅ 根据执行时间分区
        # 每 10 秒一个分区
        partition = (execute_at // 10) % 100
        
        message = {
            'task_id': task_id,
            'task_data': task_data,
            'execute_at': execute_at
        }
        
        self.producer.send(
            'delay_queue',
            value=message,
            partition=partition
        )
        
        self.producer.flush()
        
        logger.info(f"添加任务: {task_id}, 分区: {partition}")
    
    def consume(self, handler):
        """消费任务"""
        consumer = KafkaConsumer(
            'delay_queue',
            bootstrap_servers=['localhost:9092'],
            auto_offset_reset='earliest',
            group_id='delay_queue_consumers'
        )
        
        for message in consumer:
            task = json.loads(message.value)
            now = time.time()
            
            if now >= task['execute_at']:
                # ✅ 到期了,执行任务
                handler(task)
            else:
                # ✅ 没到期,放回队列
                remaining = task['execute_at'] - now
                self.add_task(task['task_id'], task['task_data'], remaining)


# 使用示例
queue = KafkaDelayQueue()

queue.add_task('task_1', {'order_id': 12345}, 30 * 60)

def handler(task):
    print(f"处理任务: {task}")

queue.consume(handler)

优缺点

优点

  • ✅ 高吞吐量
  • ✅ 分布式
  • ✅ 持久化保证

缺点

  • ❌ 实现复杂
  • ❌ 需要消费者逻辑处理延时
  • ❌ 精度受限于轮询间隔