技术方案

凌晨 3 点的 GitHub 搜寻

那天晚上处理完订单积压问题,我睡不着。我打开 GitHub,搜索 “delayed queue”(延时队列)。

搜索结果让我眼花缭乱:

  • Python delay-queue - 基于 Redis 的实现,1.2k stars
  • Java DelayQueue - JDK 内置的延时队列
  • Redisson RDelayedQueue - Redisson 的延时队列实现
  • RabbitMQ delayed-message-plugin - RabbitMQ 的延时插件
  • Apache Kafka - 支持延时的消息队列
  • Kafka TimeWheel - Kafka 内部的时间轮实现

我意识到:这不是一个简单的问题,而是有成熟的工业级解决方案。


业界主流方案

经过一周的研究,我发现延时队列的实现方案可以分成几大类:

延时队列方案分类
├─ 1. 数据库轮询
│   ├─ 简单轮询
│   ├─ 索引优化
│   └─ 批量处理

├─ 2. 内存数据结构
│   ├─ 堆(优先队列)
│   ├─ 时间轮(单层、多层)
│   └─ 分桶(时间分片)

├─ 3. Redis
│   ├─ ZSet(有序集合)
│   ├─ Key 过期通知
│   └─ List + 定时扫描

├─ 4. 消息队列
│   ├─ RabbitMQ 延时插件
│   ├─ RocketMQ 延时消息
│   └─ Kafka 分区策略

└─ 5. 混合方案
    ├─ 分层存储
    ├─ 分级调度
    └─ 热冷数据分离

方案对比总览

让我用一个表格来对比这些方案:

方案核心思想时间复杂度精度适用规模优点缺点
数据库轮询定时扫描数据库O(n) 扫描
O(log n) 查询
分钟级小规模
< 10 万/天
简单可靠
持久化保证
精度差
数据库压力大
最小堆优先队列O(log n) 插入
O(log n) 取出
毫秒级单机
< 100 万
精度高
实现简单
内存有限
无持久化
时间轮时间分桶O(1) 插入
O(1) 取出
毫秒级单机
短延时
性能最优
精度高
实现复杂
长延时难支持
Redis ZSet有序集合O(log n) 插入
O(log n) 取出
毫秒级中规模
< 1000 万
高性能
易扩展
成本高
依赖 Redis
RabbitMQ延时插件O(1) 生产
O(1) 消费
毫秒级大规模
分布式
分布式
高可用
延时有限
依赖 MQ
RocketMQ内置延时O(1) 生产
O(1) 消费
毫秒级大规模
分布式
高吞吐
分布式
延时有限
复杂
混合方案分层存储O(log n) 热数据
O(n) 冷数据
分级超大规模
任意延时
灵活性高
成本低
架构复杂
运维难

核心技术点分析

1. 时间复杂度

为什么时间复杂度这么重要?

假设每天有 100 万任务,平均延时 1 小时:

操作100 万任务的耗时估算
数组查找最小值O(n) = 1,000,000 次比较
最小堆取最小值O(log n) ≈ 20 次比较
时间轮取到期任务O(1) = 直接取桶内任务

差距是 50,000 倍!

这就是为什么大规模系统必须使用高效的算法。

2. 精度要求

不同的业务场景对精度的要求不同:

精度要求分类
├─ 高精度(秒级以内)
│  ├─ 秒杀倒计时(差 1 秒就错过)
│  └─ 实时通知(差几秒用户体验差)

├─ 中精度(分钟级)
│  ├─ 订单超时取消(差 1 分钟可接受)
│  └─ 优惠券过期(差 5 分钟问题不大)

└─ 低精度(小时级)
   ├─ 定时报表(差 1 小时无所谓)
   └─ 数据归档(差 1 天也没问题)

选型原则:精度越高,成本越大。根据业务需求选择合适的精度。

3. 持久化

任务丢失是致命问题,需要考虑:

方案持久化方式可靠性恢复速度
数据库事务 + 日志⭐⭐⭐⭐⭐
Redis RDB定期快照⭐⭐⭐
Redis AOF追加日志⭐⭐⭐⭐
消息队列持久化队列⭐⭐⭐⭐
内存堆无法恢复

方案选型决策树

我整理了一个决策树,帮助你选择合适的方案:

是否需要分布式支持?

├─ 否 → 单机方案
│   │
│   ├─ 任务量 < 10 万?
│   │   └─ 是 → 数据库轮询(最简单)
│   │
│   └─ 任务量 ≥ 10 万?
│       │
│       ├─ 都是短延时(< 1 小时)?
│       │   └─ 是 → 时间轮(性能最优)
│       │
│       └─ 有长延时任务?
│           └─ 是 → 最小堆 + 持久化

└─ 是 → 分布式方案

    ├─ 已在使用消息队列?
    │   ├─ RabbitMQ → 使用延时插件
    │   ├─ RocketMQ → 使用内置延时
    │   └─ Kafka → 分区策略 + 延时队列

    └─ 没用 MQ 或 MQ 不支持延时?

        ├─ 任务量 < 1000 万/天?
        │   └─ 是 → Redis ZSet

        └─ 任务量 ≥ 1000 万/天?

            ├─ 延时时间多样化?
            │   └─ 是 → 分层方案(热温冷)

            └─ 延时时间集中?
                └─ 是 → 分布式时间轮

典型场景推荐

场景 1:初创公司电商系统

需求

  • 日订单量:10 万单
  • 主要场景:订单超时取消(30 分钟)
  • 团队规模:3-5 人

推荐方案Redis ZSet

# 简单的实现
import redis

delay_queue = redis.Redis()

# 添加任务
delay_queue.zadd(
    'delay_orders',
    {'order_12345': int(time.time()) + 30*60}
)

# 定时扫描
while True:
    now = int(time.time())
    tasks = delay_queue.zrangebyscore('delay_orders', 0, now, start=0, num=100)
    for task in tasks:
        cancel_order(task)
        delay_queue.zrem('delay_orders', task)
    time.sleep(1)

理由

  • ✅ 实现简单,30 行代码搞定
  • ✅ 性能足够,支持 10 万/天
  • ✅ 精度高,秒级
  • ✅ 易扩展,多消费者并行

场景 2:中型公司秒杀系统

需求

  • 秒杀高峰:1 万 QPS
  • 场景:倒计时(精确到秒)
  • 延时时间:固定 10 秒

推荐方案时间轮

from collections import defaultdict

class TimingWheel:
    def __init__(self, wheel_size=10):
        self.wheel = [[] for _ in range(wheel_size)]
        self.current_slot = 0
    
    def add(self, task, delay_seconds):
        slot = (self.current_slot + delay_seconds) % len(self.wheel)
        self.wheel[slot].append(task)
    
    def tick(self):
        tasks = self.wheel[self.current_slot]
        self.wheel[self.current_slot] = []
        self.current_slot = (self.current_slot + 1) % len(self.wheel)
        return tasks

理由

  • ✅ O(1) 插入和取出,性能最优
  • ✅ 精度可控,精确到秒
  • ✅ 内存占用固定,与任务数无关
  • ✅ 适合固定间隔的延时任务

场景 3:大型金融系统

需求

  • 日任务量:1000 万+
  • 场景:风控延时验证(5-30 分钟)
  • 要求:绝对可靠,任务不能丢

推荐方案分层架构

┌────────────────────────────────────┐
│       分层延时队列架构              │
├────────────────────────────────────┤
│                                    │
│  热数据层(Redis ZSet)             │
│  - 30 分钟内的高精度任务            │
│  - 精度:秒级                      │
│  - 持久化:AOF                     │
│                                    │
│  温数据层(数据库轮询)             │
│  - 30 分钟~7 天的中等延时任务       │
│  - 精度:分钟级                    │
│  - 持久化:事务 + 日志             │
│                                    │
│  冷数据层(定时调度)               │
│  - >7 天的长延时任务                │
│  - 精度:小时级                    │
│  - 持久化:数据库                  │
│                                    │
└────────────────────────────────────┘

理由

  • ✅ 灵活性高,支持任意延时
  • ✅ 成本可控,只有短延时用内存
  • ✅ 可靠性高,所有层都持久化
  • ✅ 可扩展,每层独立扩容

想一想

思考 1

如果你的系统只有 1 万个延时任务/天,精度要求不高(分钟级),你会选择什么方案?为什么?

参考答案

推荐方案:数据库轮询

理由:

  1. 简单可靠 - 不需要引入额外组件(Redis、MQ),减少运维复杂度
  2. 成本最低 - 数据库已经有了,不需要额外的内存服务器
  3. 精度足够 - 分钟级精度,每分钟轮询一次即可
  4. 数据一致性 - 任务和业务数据在同一数据库,事务保证一致性

实现示例:

# 每分钟执行一次的定时任务
def check_delayed_tasks():
    now = datetime.now()
    
    # 查询到期任务(使用索引)
    tasks = DelayTask.objects.filter(
        execute_at__lte=now,
        status='pending'
    )[:100]  # 一次处理 100 个
    
    for task in tasks:
        try:
            execute_task(task)
            task.status = 'completed'
            task.save()
        except Exception as e:
            task.retry_count += 1
            if task.retry_count >= 3:
                task.status = 'failed'
            task.save()

# 创建任务
def create_delayed_task(task_type, task_data, delay_minutes):
    DelayTask.objects.create(
        task_type=task_type,
        task_data=task_data,
        execute_at=datetime.now() + timedelta(minutes=delay_minutes),
        status='pending'
    )

# 定时调度
from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()
scheduler.add_job(check_delayed_tasks, 'interval', minutes=1)
scheduler.start()

关键优化点:

  1. 使用索引execute_atstatus 建立联合索引
  2. 批量处理:一次查询 100 个任务,减少数据库压力
  3. 分页处理:如果有更多任务,分批处理

何时需要升级:

  • 当任务量超过 10 万/天时,考虑迁移到 Redis ZSet
  • 当精度要求到秒级时,必须升级

思考 2

如果延时任务的时间分布是:80% 在 1 分钟以内,20% 在 1 天以上,你会如何设计?

参考答案

推荐方案:混合分层架构

问题分析:

这种极端的时间分布(短延时 vs 长延时)如果用单一方案,会有明显问题:

方案短延时(1 分钟)长延时(1 天)
全用数据库轮询❌ 精度太差(轮询间隔 1 分钟)✅ 精度足够
全用 Redis ZSet✅ 精度高❌ 内存浪费(1 天的任务占用内存)
全用时间轮✅ 性能最优❌ 无法支持(时间轮大小有限)

解决方案:双队列架构

class HybridDelayQueue:
    def __init__(self):
        # 短延时队列:Redis ZSet(1 小时内)
        self.short_queue = RedisDelayQueue('short_tasks', max_delay=3600)
        
        # 长延时队列:数据库轮询(1 小时以上)
        self.long_queue = DBDelayQueue('long_tasks', min_delay=3600)
        
        # 定时迁移:将即将到期的长延时任务迁移到短延时队列
        self.migration_interval = 60  # 每分钟迁移一次
    
    def add_task(self, task_id, task_data, delay_seconds):
        """根据延时时间自动路由"""
        if delay_seconds <= 3600:
            # 短延时:直接放入 Redis
            self.short_queue.add_task(task_id, task_data, delay_seconds)
        else:
            # 长延时:先放入数据库
            self.long_queue.add_task(task_id, task_data, delay_seconds)
    
    def migrate_tasks(self):
        """将快要到期的任务从数据库迁移到 Redis"""
        # 查询 1 小时内到期的任务
        threshold = datetime.now() + timedelta(hours=1)
        tasks = self.long_queue.get_ready_tasks(threshold)
        
        for task in tasks:
            # 迁移到 Redis
            remaining_seconds = (task.execute_at - datetime.now()).total_seconds()
            self.short_queue.add_task(task.id, task.data, remaining_seconds)
            
            # 从数据库删除
            self.long_queue.remove_task(task.id)
    
    def start(self):
        """启动消费和迁移"""
        # 消费短延时任务
        threading.Thread(target=self.short_queue.consume, daemon=True).start()
        
        # 消费长延时任务
        threading.Thread(target=self.long_queue.consume, daemon=True).start()
        
        # 定时迁移
        scheduler = BackgroundScheduler()
        scheduler.add_job(self.migrate_tasks, 'interval', minutes=1)
        scheduler.start()

优势:

  1. 性能最优 - 80% 的短延时任务享受 Redis 的高性能
  2. 成本可控 - 长延时任务不占用 Redis 内存
  3. 精度保证 - 所有任务在到期前都会迁移到高精度队列
  4. 灵活扩展 - 可以根据实际数据分布调整分界线

数据对比:

假设每天 10 万任务:

维度全用 Redis ZSet混合架构节省
内存占用峰值~1000 万任务 × 1KB = 10GB~8 万任务 × 1KB = 80MB99%
查询性能全部 O(log n)短延时 O(log n)
长延时 O(n)
短延时快
持久化风险Redis 故障可能丢数据长延时任务在数据库,更安全更可靠

何时调整:

  • 如果短延时任务占比 > 90%,可以进一步降低分界线到 30 分钟
  • 如果短延时任务占比 < 50%,可以不使用混合架构,全用数据库轮询

关键洞察:

数据驱动设计是关键。理解业务数据的分布特征(如时间分布),才能设计出最优的架构。

思考 3

如果在分布式环境下,多个节点同时消费同一个延时队列,如何保证一个任务只被执行一次?

参考答案

核心问题:分布式互斥

这是经典的分布式协调问题,需要保证:

  1. 原子性:查询和删除是原子操作
  2. 互斥性:同一时刻只有一个消费者能拿到任务
  3. 容错性:消费者挂掉不影响其他消费者

方案对比:

方案 1:Redis 分布式锁

import redis
import uuid

class LockedDelayQueue:
    def __init__(self):
        self.redis = redis.Redis()
    
    def pop_task(self):
        """带锁的弹出任务"""
        while True:
            # 1. 查询到期任务
            now = time.time()
            tasks = self.redis.zrangebyscore('delay_queue', 0, now, start=0, num=1)
            
            if not tasks:
                return None
            
            task = tasks[0]
            task_id = self._extract_id(task)
            
            # 2. 尝试获取锁
            lock_key = f"lock:task:{task_id}"
            lock_value = str(uuid.uuid4())
            
            acquired = self.redis.set(
                lock_key, lock_value,
                nx=True,  # 不存在时才设置
                ex=10     # 10 秒后自动过期
            )
            
            if acquired:
                try:
                    # 3. 再次确认任务还在队列中(防止已过期)
                    if self.redis.zscore('delay_queue', task) is None:
                        continue  # 任务已被其他消费者取走
                    
                    # 4. 删除任务
                    self.redis.zrem('delay_queue', task)
                    
                    return task
                    
                finally:
                    # 5. 释放锁(确保是自己的锁)
                    lua_script = """
                    if redis.call("get", KEYS[1]) == ARGV[1] then
                        return redis.call("del", KEYS[1])
                    else
                        return 0
                    end
                    """
                    self.redis.eval(lua_script, 1, lock_key, lock_value)
            else:
                # 锁被占用,跳过
                time.sleep(0.01)

缺点:每次都要加锁,性能开销大


方案 2:Lua 脚本原子操作(推荐)

class AtomicDelayQueue:
    def __init__(self):
        self.redis = redis.Redis()
        
        # Lua 脚本:原子查询并删除
        self.pop_script = """
        -- 查询最早到期的任务
        local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 1)
        
        if #tasks > 0 then
            -- 删除该任务(原子操作)
            local removed = redis.call('ZREM', KEYS[1], tasks[1])
            
            if removed > 0 then
                -- 删除成功,返回任务
                return tasks[1]
            else
                -- 任务已被其他消费者删除
                return nil
            end
        else
            return nil
        end
        """
    
    def pop_task(self):
        """原子弹出任务"""
        now = time.time()
        task = self.redis.eval(
            self.pop_script,
            1,  # key 的数量
            'delay_queue',  # key
            now  # argv
        )
        
        return task

优势:

  • ✅ 无锁竞争,性能最优
  • ✅ Redis 单线程保证原子性
  • ✅ 代码简洁

方案 3:数据库悲观锁

-- 创建任务表
CREATE TABLE delay_tasks (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    task_type VARCHAR(50),
    task_data TEXT,
    execute_at TIMESTAMP,
    status ENUM('pending', 'processing', 'completed', 'failed'),
    consumer_id VARCHAR(50),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    
    INDEX idx_execute_status (execute_at, status)
);
class DatabaseDelayQueue:
    def __init__(self, consumer_id):
        self.consumer_id = consumer_id
    
    def pop_tasks(self, limit=100):
        """批量弹出任务"""
        with transaction.atomic():
            # 查询并锁定(FOR UPDATE SKIP LOCKED)
            tasks = DelayTask.objects.raw("""
                SELECT id, task_type, task_data 
                FROM delay_tasks 
                WHERE execute_at <= NOW() 
                  AND status = 'pending'
                ORDER BY execute_at ASC
                LIMIT %s
                FOR UPDATE SKIP LOCKED
            """, [limit])
            
            # 标记为处理中
            task_ids = [t.id for t in tasks]
            if task_ids:
                DelayTask.objects.filter(id__in=task_ids).update(
                    status='processing',
                    consumer_id=self.consumer_id,
                    updated_at=timezone.now()
                )
            
            return tasks
    
    def complete_task(self, task_id):
        """标记任务完成"""
        DelayTask.objects.filter(id=task_id).update(
            status='completed',
            updated_at=timezone.now()
        )

SKIP LOCKED 的作用:

  • 多个消费者并发查询
  • 行已被锁定时,自动跳过
  • 每个消费者拿到不同的任务

适用场景:

  • 任务已经存在数据库
  • 不想引入额外组件
  • 需要强事务保证

方案 4:分片调度

class ShardedDelayQueue:
    """分片调度:每个消费者负责一部分任务"""
    
    def __init__(self, shard_count, shard_id):
        self.shard_count = shard_count  # 总分片数
        self.shard_id = shard_id        # 当前分片 ID (0, 1, 2, ...)
    
    def _belongs_to_shard(self, task_id):
        """判断任务是否属于当前分片"""
        return int(task_id) % self.shard_count == self.shard_id
    
    def pop_task(self):
        """只弹出属于当前分片的任务"""
        while True:
            now = time.time()
            tasks = self.redis.zrangebyscore('delay_queue', 0, now, start=0, num=100)
            
            for task in tasks:
                task_id = self._extract_id(task)
                
                if self._belongs_to_shard(task_id):
                    # 尝试删除(可能有竞争)
                    removed = self.redis.zrem('delay_queue', task)
                    if removed:
                        return task
            
            if not tasks:
                return None
            
            time.sleep(0.1)

优势:

  • ✅ 完全避免竞争,性能最优
  • ✅ 实现简单,无需锁

缺点:

  • ⚠️ 需要知道分片总数
  • ⚠️ 某个分片任务多时,负载不均

方案 5:消息队列分区

如果使用消息队列(如 Kafka、RabbitMQ),可以利用分区机制:

# Kafka 分区策略
class KafkaDelayQueue:
    def __init__(self, bootstrap_servers, topic, partitions):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode()
        )
        self.partitions = partitions
    
    def add_task(self, task_id, task_data, delay_seconds):
        """添加任务(根据 task_id 分区)"""
        partition = int(task_id) % self.partitions
        
        message = {
            'task_id': task_id,
            'task_data': task_data,
            'execute_at': time.time() + delay_seconds
        }
        
        self.producer.send(
            topic,
            value=message,
            partition=partition
        )
    
    def consume_tasks(self, partition, handler):
        """消费指定分区的任务"""
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers='localhost:9092',
            auto_offset_reset='latest',
            group_id='delay_queue_consumers'
        )
        
        # 只消费特定分区
        consumer.assign([TopicPartition(topic, partition)])
        
        for message in consumer:
            task = json.loads(message.value)
            now = time.time()
            
            if now >= task['execute_at']:
                # 到期了,处理任务
                handler(task)
            else:
                # 没到期,放回队列(延时重试)
                self.add_task(
                    task['task_id'],
                    task['task_data'],
                    task['execute_at'] - now
                )

优势:

  • ✅ Kafka 原生支持分区和消费者组
  • ✅ 高吞吐量
  • ✅ 持久化保证

方案选择建议:

场景推荐方案理由
Redis 存储Lua 原子操作性能最优,无需额外组件
数据库存储悲观锁(SKIP LOCKED)利用数据库特性,事务保证
消息队列分区策略MQ 原生支持,简化实现
大规模系统分片调度避免竞争,性能最优

最佳实践:

  1. 优先选择无锁方案:Lua 原子操作、分区调度
  2. 监控积压:记录各分片的任务积压情况
  3. 动态调整:根据负载动态调整分片数
  4. 失败重试:任务执行失败要有重试机制