队列模型

那年夏天,我差点被线程睡死了

这是 2018 年的夏天,我们的小创业公司刚上线外卖系统,日订单量终于突破 5000 单。团队只有 3 个后端,大家在出租屋里敲代码,氛围热烈而紧张。

那天晚上 11 点,运营跑过来找到我:

“老板,后台发现好多订单超时没付款,但 30 分钟后没自动取消。库存一直被占用,有用户在投诉!”

我一看代码,心凉了半截。原来是我一周前写的”订单超时取消”功能:

import threading

def create_order(order_id):
    # 创建订单
    save_order(order_id)
    
    # 启动一个线程,30 分钟后取消
    def cancel_after_timeout():
        import time
        time.sleep(30 * 60)  # 等待 30 分钟
        cancel_order(order_id)
    
    thread = threading.Thread(target=cancel_after_timeout)
    thread.start()

那时候我天真地以为:一个订单一个线程,简单直接。

但现实狠狠地打了我一巴掌。运维小王连夜查了监控,脸色惨白:

“老板,你看这个图……当前活跃线程数 8 万多,服务器内存已经用掉 90% 了。再这么下去,今晚就要 OOM。”

我盯着那个监控图表,突然意识到问题的严重性:

  • 😱 5000 日单 峰值时段同时在线的订单可能上万,那上万个线程?
  • 😱 一台服务器重启,所有等待中的任务全丢了,订单永远不取消?
  • 😱 内存不够用,晚上高峰期直接崩溃?

那一夜我没睡,盯着服务器日志发呆。凌晨 4 点,我突然醒悟:我需要一个更靠谱的模型。


回到基础:什么是队列?

那天凌晨,我重新翻开了大学时的数据结构课本。原来,问题的根源是我对”队列”的理解太浅了。

队列(Queue)是一个”先进先出”(FIFO)的数据结构,就像排队买奶茶:

生产者                        队列                      消费者
  │                           │                          │
  │ ── 放入消息 A ──► [A, B, C] ──► 取出消息 A ──►       │
  │ ── 放入消息 B ──► [B, C]    ──► 取出消息 B ──►       │
  │ ── 放入消息 C ──► [C]       ──► 取出消息 C ──►       │
from collections import deque

class SimpleQueue:
    def __init__(self):
        self._queue = deque()
    
    def enqueue(self, message):
        """入队"""
        self._queue.append(message)
    
    def dequeue(self):
        """出队"""
        if self._queue:
            return self._queue.popleft()
        return None

普通队列的特点:放入就立即可取

但我的需求不同——订单需要等 30 分钟才能取消。这就是延时队列的核心:放入后,需要等一段时间才能取


延时队列模型:时间是我的朋友

我画了一整晚的图,终于想清楚了:延时队列 = 普通队列 + 时间维度

普通队列:  放入 ──► 立即可取
延时队列:  放入 ──► 等待 N 秒 ──► 可取

核心概念:两个房间

我把延时队列想象成有两个房间的餐厅:

┌──────────────────────────────────────────────┐
│                 延时队列模型                   │
│                                              │
│   生产者                                      │
│     │                                        │
│     │ 放入消息 + 延时时间                      │
│     ▼                                        │
│   ┌──────────────────────────┐               │
│   │      等待区(Wait Room)  │               │
│   │                          │               │
│   │  消息A - 还剩 28 分钟    │               │
│   │  消息B - 还剩 15 分钟    │               │
│   │  消息C - 还剩 3 分钟     │               │
│   │  消息D - 到期!→ 出列    │               │
│   └──────────┬───────────────┘               │
│              │                               │
│              │ 到期消息                       │
│              ▼                               │
│   ┌──────────────────────────┐               │
│   │      就绪区(Ready Zone) │               │
│   │                          │               │
│   │  消息D ✅ 可以消费了      │               │
│   └──────────┬───────────────┘               │
│              │                               │
│              ▼                               │
│           消费者                              │
│     (处理到期任务)                           │
└──────────────────────────────────────────────┘

这样一想,思路清晰了!所有订单先进入”等待区”,时间到了才能去”就绪区”被处理。

延时队列的核心操作

import time
from heapq import heappush, heappop

class DelayQueue:
    def __init__(self):
        self._heap = []  # 最小堆,按到期时间排序
    
    def enqueue(self, message, delay_seconds):
        """入队:放入消息,指定延时"""
        execute_at = time.time() + delay_seconds
        heappush(self._heap, (execute_at, message))
    
    def dequeue(self):
        """出队:取出一个到期消息(如果有的话)"""
        if not self._heap:
            return None
        
        execute_at, message = self._heap[0]
        
        if time.time() >= execute_at:
            # 到期了,可以取出
            heappop(self._heap)
            return message
        
        # 还没到期,返回 None
        return None
    
    def peek_delay(self):
        """查看最近一个任务的剩余时间"""
        if not self._heap:
            return None
        execute_at, _ = self._heap[0]
        return max(0, execute_at - time.time())

使用示例:

queue = DelayQueue()

# 下单后 30 分钟超时
queue.enqueue("order:1001", delay_seconds=30 * 60)
queue.enqueue("order:1002", delay_seconds=30 * 60)

# 5 秒后发提醒
queue.enqueue("notify:user:888", delay_seconds=5)

# 不断检查有没有到期任务
while True:
    task = queue.dequeue()
    if task:
        print(f"处理任务: {task}")
    else:
        time.sleep(1)  # 没有到期任务,等 1 秒再看

两种消费模型:谁来做主?

第一版上线后,我又遇到了新问题。消费者应该主动来拉任务,还是队列主动推任务?

拉取模型(Pull):自己找活干

# 拉取模型
def consumer_pull(queue):
    while True:
        task = queue.dequeue()
        if task:
            process(task)
        else:
            time.sleep(0.1)  # 没有任务,等一会儿

我一开始选这个,因为简单:消费者自己控制节奏,忙的时候少拉点,闲的时候多拉点。

优点: 消费者自己控制节奏
缺点: 没任务时也在不断轮询,浪费资源

推送模型(Push):任务送上门

后来订单量涨到 10 万单/天,我发现轮询太慢了。改成推送模式后,实时性提升明显:

# 推送模型
class PushDelayQueue:
    def __init__(self):
        self._heap = []
        self._consumers = []
    
    def register_consumer(self, callback):
        self._consumers.append(callback)
    
    def _notify(self, message):
        for consumer in self._consumers:
            consumer(message)
    
    def run(self):
        """后台线程,到期后自动推送"""
        while True:
            if self._heap:
                execute_at, message = self._heap[0]
                now = time.time()
                if now >= execute_at:
                    heappop(self._heap)
                    self._notify(message)  # 推送给消费者
                else:
                    time.sleep(min(0.1, execute_at - now))
            else:
                time.sleep(1)

优点: 到期即推,延迟更低
缺点: 消费者处理不过来时会积压

实战经验对比

维度拉取模型推送模型
实时性取决于轮询间隔到期即推
消费者压力自己控制可能被压垮
实现复杂度简单中等
适用场景简单场景高实时要求
典型实现Redis ZSetRabbitMQ

我的建议:刚开始用拉取,够用就行。等业务量上来,再考虑推送。别过度设计。


队列的生命周期:一个任务的旅程

在排查线上问题时,我发现如果不知道任务处于什么状态,根本没法定位问题。于是设计了状态机。

一个延时任务从创建到完成,会经历这些状态:

┌──────────┐    到达执行时间    ┌──────────┐    消费者取走    ┌──────────┐
│  等待中   │ ──────────────► │  就绪     │ ──────────────► │  处理中   │
│ (Waiting) │                 │ (Ready)   │                 │(Processing)│
└──────────┘                  └──────────┘                  └─────┬────┘

                                                    ┌───────────┼───────────┐
                                                    │           │           │
                                                    ▼           ▼           ▼
                                              ┌──────────┐ ┌──────────┐ ┌──────────┐
                                              │  完成     │ │  失败     │ │  重试     │
                                              │(Completed)│ │ (Failed)  │ │ (Retry)  │
                                              └──────────┘ └──────────┘ └─────┬────┘


                                                                         重新进入等待
class TaskState:
    WAITING = 'waiting'        # 等待到期
    READY = 'ready'            # 已到期,等待消费
    PROCESSING = 'processing'  # 正在处理
    COMPLETED = 'completed'    # 处理完成
    FAILED = 'failed'          # 处理失败
    RETRY = 'retry'            # 等待重试

class DelayTask:
    def __init__(self, task_id, payload, delay_seconds):
        self.task_id = task_id
        self.payload = payload
        self.execute_at = time.time() + delay_seconds
        self.state = TaskState.WAITING
        self.retry_count = 0
        self.max_retries = 3
        self.created_at = time.time()
        self.updated_at = time.time()

从单机到分布式:成长的代价

有了状态机,系统稳定多了。但 3 个月后,日订单量突破 10 万单,新的问题来了。

单机延时队列的问题暴露无遗:

单机延时队列的问题:
1. 内存有限 ── 任务太多放不下
2. 单点故障 ── 机器挂了全丢了
3. 性能瓶颈 ── 单机处理能力有限

解决方案:
1. 存储 ── 把任务放到外部存储(数据库 / Redis / MQ)
2. 高可用 ── 多副本 + 持久化
3. 分布式 ── 多消费者并行处理

那段时间,我和小王几乎天天通宵。我们决定把存储层抽离出来,改用 Redis 集群。

分布式延时队列模型

┌───────────┐     ┌───────────┐     ┌───────────┐
│ 生产者 A  │     │ 生产者 B  │     │ 生产者 C  │
└─────┬─────┘     └─────┬─────┘     └─────┬─────┘
      │                 │                 │
      └────────────┬────┴─────────────────┘


         ┌─────────────────┐
         │  集中式存储层    │
         │  (DB / Redis /  │
         │     MQ 集群)    │
         └────────┬────────┘

      ┌───────────┼───────────┐
      │           │           │
      ▼           ▼           ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ 调度器 A  │ │ 调度器 B  │ │ 调度器 C  │  ← 互斥调度
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘
      │             │             │
      ▼             ▼             ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ 消费者 A  │ │ 消费者 B  │ │ 消费者 C  │
└───────────┘ └───────────┘ └───────────┘

这样改造后,单机故障不再致命。即使一台服务器挂了,其他服务器继续工作,任务数据也都在 Redis 里持久化保存。

这一步的代价:架构复杂度大幅上升,分布式一致性、锁竞争、网络分区……问题接踵而至。但为了支撑业务增长,这些代价是值得的。


数据流示意:从下单到超时

为了帮新人理解整个流程,我画了一张完整的数据流图。以”订单超时取消”为例:

1. 用户下单
   POST /api/order


2. 创建订单记录(数据库)
   order.status = "pending_payment"


3. 投递延时任务
   delay_queue.push({
     task_id: "cancel_order_12345",
     payload: { order_id: 12345 },
     delay: 30 * 60  // 30 分钟
   })


4. 任务在等待区等待 30 分钟
   ... 时间流逝 ...


5. 任务到期,进入就绪区


6. 调度器分配给消费者


7. 消费者执行取消逻辑
   - 查询订单状态(还是 pending_payment 吗?)
   - 如果是:取消订单 + 释放库存
   - 如果不是(已付款):跳过


8. 标记任务完成
   task.status = "completed"

这里有个踩坑点!

注意第 7 步的幂等检查!

我们早期就踩过这个坑。有一次,用户在第 29 分钟付款了,但延时任务到期后还是把订单取消了,用户气炸了。

教训:在 30 分钟内,用户可能已经付款了。延时任务到期后,必须先检查业务状态,再决定是否执行。这就是”幂等性”——多次执行结果是一样的。

我在代码里加了个显眼的注释:

# ⚠️ 重要:必须先检查订单状态!
# 用户可能已经在第 29 分钟付款了
if order.status == 'pending_payment':
    cancel_order(order_id)
else:
    logger.info(f"订单 {order_id} 已付款,无需取消")

这个小小的检查,避免了无数的投诉。


想一想

  1. 为什么延时队列通常使用最小堆而不是数组来存储? 时间复杂度有什么区别?

  2. 如果有 1000 万个任务同时在等待区,堆的操作会不会变慢? 应该怎么优化?

  3. 拉取模型和推送模型,哪个更适合”订单超时取消”场景? 为什么?

  4. 分布式环境下,多个调度器如何保证一个任务只被调度一次? 你能想到几种方案?


参考答案

思考 1:为什么延时队列通常使用最小堆而不是数组来存储?

参考答案

问题分析:

延时队列的核心需求是:快速找到最早到期的任务。我们需要比较不同数据结构在”插入”和”取出最小元素”这两个操作上的效率。

时间复杂度对比:

操作数组最小堆(优先队列)优化说明
插入任务O(1)O(log n)数组直接追加,堆需要上浮调整
取出最早任务O(n)O(log n)数组需要遍历全部,堆只需取出堆顶
查看最早任务O(n)O(1)数组需要遍历,堆直接看堆顶

代码对比:

# 方案 A:使用数组(低效)
class ArrayDelayQueue:
    def __init__(self):
        self._tasks = []  # [(execute_at, message), ...]
    
    def enqueue(self, message, delay_seconds):
        """O(1) - 直接追加"""
        execute_at = time.time() + delay_seconds
        self._tasks.append((execute_at, message))
    
    def dequeue(self):
        """O(n) - 必须遍历全部找到最小值"""
        if not self._tasks:
            return None
        
        # 😱 这里的循环是性能瓶颈
        min_idx = 0
        for i in range(1, len(self._tasks)):
            if self._tasks[i][0] < self._tasks[min_idx][0]:
                min_idx = i
        
        execute_at, message = self._tasks[min_idx]
        
        if time.time() >= execute_at:
            self._tasks.pop(min_idx)  # O(n) 删除操作
            return message
        return None

# 方案 B:使用最小堆(高效)
class HeapDelayQueue:
    def __init__(self):
        self._heap = []  # heapq 维护最小堆
    
    def enqueue(self, message, delay_seconds):
        """O(log n) - 堆上浮"""
        execute_at = time.time() + delay_seconds
        heappush(self._heap, (execute_at, message))
    
    def dequeue(self):
        """O(log n) - 堆下沉"""
        if not self._heap:
            return None
        
        execute_at, message = self._heap[0]  # O(1) 查看堆顶
        
        if time.time() >= execute_at:
            heappop(self._heap)  # O(log n) 堆调整
            return message
        return None

性能测试对比:

import time

def test_performance(queue_class, n=100000):
    """测试 10 万任务的性能"""
    queue = queue_class()
    
    # 插入 n 个任务
    start = time.time()
    for i in range(n):
        queue.enqueue(f"task_{i}", delay_seconds=i)
    insert_time = time.time() - start
    
    # 全部取出
    start = time.time()
    count = 0
    while queue.dequeue():
        count += 1
    dequeue_time = time.time() - start
    
    return insert_time, dequeue_time

# 结果对比(n=100000)
# 数组版本:插入 0.02s,取出 28.5s  😱
# 堆版本:  插入 0.15s,取出 0.18s  ✅

为什么堆更适合?

  1. 业务特点:延时队列总是需要”最早到期的任务先处理”
  2. 堆的特性:堆顶永远是最小值,O(1) 时间就能获取
  3. 操作平衡:虽然插入稍慢(O(log n)),但取出快太多,整体性能优

最佳实践:

  • Python 用 heapq 模块(基于列表的最小堆实现)
  • Java 用 PriorityQueueDelayQueue
  • Go 用 container/heap
  • C++ 用 std::priority_queue

思考 2:如果有 1000 万个任务同时在等待区,堆的操作会不会变慢?应该怎么优化?

参考答案

问题分析:

堆的时间复杂度是 O(log n),当 n=1000 万时,log₂(10,000,000) ≈ 23。这意味着每次插入或删除需要约 23 次比较和交换操作。

理论计算:

# 1000 万任务的堆操作次数
import math

n = 10_000_000
heap_depth = math.log2(n)  # 堆的高度

print(f"堆高度: {heap_depth:.1f}")  # 约 23 层
print(f"插入操作比较次数: {heap_depth:.1f}")
print(f"删除操作比较次数: {2 * heap_depth:.1f}")  # 删除需要替换 + 下沉

# 内存占用估算
task_size = 100  # 每个任务约 100 字节(时间戳 + ID + 数据)
total_memory = n * task_size
print(f"内存占用: {total_memory / 1024 / 1024:.1f} MB")  # 约 953 MB

性能瓶颈分析:

瓶颈类型具体表现影响程度
CPU 计算O(log n) 的堆调整⚠️ 中等(23 次操作可接受)
内存占用1000 万任务 ≈ 1GB⚠️ 中等(现代服务器内存足够)
缓存未命中堆的随机访问导致 CPU 缓存失效🔴 严重(真正瓶颈)
持久化成本重启恢复需要加载 1GB 数据🔴 严重(启动慢)

优化方案:

方案 1:分层堆(Tiered Heap)

将任务按到期时间分成多个桶:

from collections import defaultdict
import time

class TieredDelayQueue:
    """分层延时队列:按时间分桶"""
    
    def __init__(self, bucket_interval=60):  # 每 60 秒一个桶
        self.buckets = defaultdict(list)  # {timestamp: [tasks]}
        self.bucket_interval = bucket_interval
        self.current_bucket = int(time.time() // bucket_interval)
    
    def _get_bucket_key(self, execute_at):
        """计算任务应该进入哪个桶"""
        return int(execute_at // self.bucket_interval)
    
    def enqueue(self, message, delay_seconds):
        """O(1) - 直接放入对应桶"""
        execute_at = time.time() + delay_seconds
        bucket_key = self._get_bucket_key(execute_at)
        self.buckets[bucket_key].append((execute_at, message))
    
    def dequeue(self):
        """O(1) 平均 - 只处理当前桶"""
        now = time.time()
        current_bucket_key = int(now // self.bucket_interval)
        
        # 清理过期的桶
        while self.current_bucket < current_bucket_key:
            del self.buckets[self.current_bucket]
            self.current_bucket += 1
        
        # 从当前桶取任务
        tasks = self.buckets.get(current_bucket_key, [])
        if not tasks:
            return None
        
        # 桶内任务较少,线性扫描即可
        for i, (execute_at, message) in enumerate(tasks):
            if now >= execute_at:
                tasks.pop(i)
                return message
        
        return None

优势:

  • ✅ 插入 O(1)(直接追加到桶)
  • ✅ 取出 O(1) 平均(只扫描当前桶)
  • ✅ 内存友好(可以清理过期桶)

方案 2:时间轮(Timing Wheel)

适合大量短延时的场景:

class TimingWheel:
    """时间轮:适合秒级、分钟级的短延时"""
    
    def __init__(self, bucket_size=60, wheel_size=60):
        # bucket_size: 每个槽位的时间跨度(秒)
        # wheel_size: 轮上的槽位数
        self.bucket_size = bucket_size
        self.wheel_size = wheel_size
        self.wheel = [[] for _ in range(wheel_size)]
        self.current_slot = 0
        self.start_time = time.time()
    
    def enqueue(self, message, delay_seconds):
        """O(1)"""
        slot = (self.current_slot + delay_seconds // self.bucket_size) % self.wheel_size
        self.wheel[slot].append(message)
    
    def tick(self):
        """每秒调用一次,推进时间轮"""
        tasks = self.wheel[self.current_slot]
        self.wheel[self.current_slot] = []  # 清空槽位
        self.current_slot = (self.current_slot + 1) % self.wheel_size
        return tasks

时间轮的复杂度:

  • 插入:O(1)
  • 取出:O(1) 平均(每次 tick 处理一个槽位)
  • 空间:O(wheel_size),与任务数无关!

方案 3:外部存储 + 缓存

将 1000 万任务存储到 Redis,内存只保留热点任务:

import redis

class RedisBackedQueue:
    """Redis ZSet + 本地缓存"""
    
    def __init__(self):
        self.redis = redis.Redis()
        self.local_cache = []  # 本地缓存最近 1000 个任务
        self.cache_size = 1000
    
    def enqueue(self, message, delay_seconds):
        """O(log n) - 但在 Redis 端"""
        score = time.time() + delay_seconds
        self.redis.zadd('delay_queue', {message: score})
    
    def dequeue(self):
        """优先从本地缓存取"""
        # 1. 先查本地缓存(O(1))
        if self.local_cache:
            return self.local_cache.pop(0)
        
        # 2. 缓存空了,批量从 Redis 加载
        now = time.time()
        tasks = self.redis.zrangebyscore(
            'delay_queue', 0, now, start=0, num=self.cache_size
        )
        
        if tasks:
            # 删除已取出的任务
            self.redis.zremrangebyscore('delay_queue', 0, now)
            self.local_cache = tasks
            return self.local_cache.pop(0)
        
        return None

方案选择建议:

场景推荐方案理由
任务数 < 100 万单机最小堆简单高效,无需优化
100 万 - 1000 万分层堆平衡性能和复杂度
> 1000 万,短延时时间轮O(1) 操作,内存可控
> 1000 万,长延时Redis ZSet + 缓存持久化 + 分布式支持
极致性能要求时间轮 + 多级轮Kafka、Netty 都用这个

关键优化点总结:

  1. 减少堆操作次数:用分桶/时间轮降低 log n 的 n 值
  2. 利用 CPU 缓存:数组比链表缓存友好
  3. 批量处理:一次取出多个任务,分摊开销
  4. 异步持久化:写操作放后台,不阻塞主流程
  5. 懒加载:只在需要时才加载任务数据

思考 3:拉取模型和推送模型,哪个更适合”订单超时取消”场景?为什么?

参考答案

业务需求分析:

先明确”订单超时取消”的核心特点:

特性说明业务影响
延时时间长30 分钟不需要毫秒级实时性
容忍延迟晚几秒取消可接受允许轮询间隔
突发性下单高峰期任务激增消费者不能被压垮
可靠性要求高不能漏取消需要重试机制
幂等性要求已付款订单不能取消消费者要做状态检查

拉取模型(Pull)适合的理由:

1. 消费者可以自我保护

class PullConsumer:
    """消费者可以根据自身能力控制节奏"""
    
    def __init__(self, max_concurrent=100):
        self.max_concurrent = max_concurrent
        self.processing = 0
    
    def consume(self, queue):
        while True:
            # ⚠️ 关键:自我限流
            if self.processing >= self.max_concurrent:
                print("达到并发上限,暂停拉取")
                time.sleep(1)
                continue
            
            task = queue.dequeue()
            if task:
                self.processing += 1
                self._process_with_callback(task)
            else:
                time.sleep(1)  # 没任务就休息
    
    def _process_with_callback(self, task):
        def callback():
            self.processing -= 1
        threading.Thread(
            target=self._cancel_order,
            args=(task, callback)
        ).start()

对比推送模型的问题:

# ❌ 推送模型:消费者可能被压垮
class PushConsumer:
    def on_task(self, task):
        # 如果取消订单操作慢(比如要查数据库、调库存接口)
        # 推送速度 > 处理速度 → 消息积压 → 内存溢出
        self._cancel_order(task)  # 阻塞操作

2. 容错性更好

# 拉取模型:消费者挂了,任务还在队列里
def consumer_pull_with_retry(queue):
    while True:
        try:
            task = queue.dequeue()
            if task:
                process(task)
        except Exception as e:
            # 😱 消费者崩溃,但任务不会丢
            logger.error(f"处理失败: {e}")
            time.sleep(5)  # 等待恢复后继续拉取

推送模式下,如果推送时消费者挂了,需要复杂的重试逻辑。

3. 实现简单

# 拉取模型:只需一个循环
def consumer():
    while True:
        task = queue.dequeue()
        if task:
            process(task)
        else:
            time.sleep(1)

# 推送模型:需要管理消费者列表、处理推送失败...
class PushQueue:
    def __init__(self):
        self.consumers = []
    
    def register(self, consumer):
        self.consumers.append(consumer)
    
    def _notify(self, task):
        for consumer in self.consumers:
            try:
                consumer(task)
            except:
                # 😱 推送失败怎么办?重试?记录失败?
                pass

什么时候用推送模型?

推送模型适合这些场景:

场景示例理由
超低延迟要求秒级杀倒计时不能等轮询
任务轻量发送通知处理快,不会积压
实时系统即时通讯需要即时响应

最佳实践:混合模式

class HybridDelayQueue:
    """结合拉取和推送的优点"""
    
    def __init__(self):
        self._heap = []
        self._consumers = []
        self._pull_thread = None
    
    def start(self):
        """启动后台线程,定期批量推送"""
        def batch_pull_and_push():
            while True:
                # 每秒批量拉取
                tasks = []
                for _ in range(100):  # 每次最多拉 100 个
                    task = self.dequeue()
                    if task:
                        tasks.append(task)
                    else:
                        break
                
                # 推送给消费者(但消费者可以拒绝)
                if tasks:
                    for consumer in self._consumers:
                        consumer.on_batch(tasks)
                
                time.sleep(1)
        
        self._pull_thread = threading.Thread(target=batch_pull_and_push)
        self._pull_thread.start()

最终建议:

对于”订单超时取消”场景,拉取模型是更好的选择

  1. ✅ 30 分钟延时,不需要秒级实时性
  2. ✅ 消费者可以自我保护,避免被高峰压垮
  3. ✅ 实现简单,故障容错性好
  4. ✅ 可以用多消费者并行拉取,提高吞吐量

如果确实需要更低的延迟,可以用批量拉取 + 异步处理的方式优化。

思考 4:分布式环境下,多个调度器如何保证一个任务只被调度一次?你能想到几种方案?

参考答案

问题本质:

这是经典的分布式互斥问题。多个调度器同时看到同一个到期任务,如何保证只有一个能拿到它?

方案 1:Redis 分布式锁

最直接的方案:

import redis
import uuid

class RedisLockScheduler:
    """基于 Redis 锁的调度器"""
    
    def __init__(self):
        self.redis = redis.Redis()
        self.lock_timeout = 10  # 锁超时时间(秒)
    
    def schedule(self):
        while True:
            # 1. 查询到期任务
            now = time.time()
            tasks = self.redis.zrangebyscore('delay_queue', 0, now, start=0, num=10)
            
            for task in tasks:
                # 2. 尝试获取锁
                lock_key = f"lock:task:{task['id']}"
                lock_value = str(uuid.uuid4())  # 唯一标识
                
                # SETNX + 过期时间(原子操作)
                acquired = self.redis.set(
                    lock_key, lock_value, 
                    nx=True, ex=self.lock_timeout
                )
                
                if acquired:
                    try:
                        # 3. 拿到锁,处理任务
                        self._process_task(task)
                        
                        # 4. 从队列删除
                        self.redis.zrem('delay_queue', task['id'])
                    finally:
                        # 5. 释放锁(检查是自己的锁)
                        lua_script = """
                        if redis.call("get", KEYS[1]) == ARGV[1] then
                            return redis.call("del", KEYS[1])
                        else
                            return 0
                        end
                        """
                        self.redis.eval(lua_script, 1, lock_key, lock_value)
                else:
                    # 锁被其他调度器持有,跳过
                    continue
            
            time.sleep(0.1)
    
    def _process_task(self, task):
        print(f"处理任务: {task['id']}")
        # 实际业务逻辑...

优缺点:

  • ✅ 实现简单,Redis 原生支持
  • ⚠️ 每个任务都要加锁,性能开销大
  • ⚠️ 锁超时设置需要权衡(太短可能导致任务重复执行)

方案 2:Redis ZSET + Lua 脚本原子操作

将”查询 + 删除”合并为原子操作:

class AtomicPopScheduler:
    """原子弹出任务的调度器"""
    
    def __init__(self):
        self.redis = redis.Redis()
    
    def schedule(self):
        while True:
            # 用 Lua 脚本保证原子性
            lua_script = """
            -- 查询最早到期的任务
            local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 1)
            
            if #tasks > 0 then
                -- 删除该任务(原子操作)
                redis.call('ZREM', KEYS[1], tasks[1])
                return tasks[1]
            else
                return nil
            end
            """
            
            now = time.time()
            task = self.redis.eval(lua_script, 1, 'delay_queue', now)
            
            if task:
                self._process_task(task)
            else:
                time.sleep(0.1)

核心思想:

  • ZRANGEBYSCORE + ZREM 在 Lua 中执行,Redis 保证原子性
  • 多个调度器同时执行,只有一个能 ZREM 成功
  • 无需显式加锁,性能更好

优缺点:

  • ✅ 原子操作,无锁竞争
  • ✅ 性能高,Redis 单线程处理
  • ⚠️ 任务取出后必须处理成功(否则丢失)
  • 💡 需要配合死信队列处理失败任务

方案 3:数据库悲观锁

如果任务存储在数据库:

-- 创建任务表
CREATE TABLE delay_tasks (
    id BIGINT PRIMARY KEY,
    payload TEXT,
    execute_at TIMESTAMP,
    status ENUM('waiting', 'processing', 'completed'),
    scheduler_id VARCHAR(50),  -- 标记哪个调度器在处理
    updated_at TIMESTAMP,
    INDEX idx_execute_at (execute_at, status)
);
class DatabaseLockScheduler:
    """基于数据库行锁的调度器"""
    
    def __init__(self, db, scheduler_id):
        self.db = db
        self.scheduler_id = scheduler_id
    
    def schedule(self):
        while True:
            # 1. 查询并锁定(FOR UPDATE 跳过已锁定的行)
            tasks = self.db.query("""
                SELECT id, payload 
                FROM delay_tasks 
                WHERE execute_at <= NOW() 
                  AND status = 'waiting'
                ORDER BY execute_at ASC
                LIMIT 10
                FOR UPDATE SKIP LOCKED  -- 关键:跳过被锁定的行
            """)
            
            for task in tasks:
                # 2. 标记为处理中
                self.db.execute("""
                    UPDATE delay_tasks 
                    SET status = 'processing',
                        scheduler_id = %s,
                        updated_at = NOW()
                    WHERE id = %s
                """, (self.scheduler_id, task['id']))
                
                # 3. 处理任务
                try:
                    self._process_task(task)
                    
                    # 4. 标记完成
                    self.db.execute("""
                        UPDATE delay_tasks 
                        SET status = 'completed',
                            updated_at = NOW()
                        WHERE id = %s
                    """, (task['id'],))
                except Exception as e:
                    # 5. 失败重试
                    self.db.execute("""
                        UPDATE delay_tasks 
                        SET status = 'waiting',
                            updated_at = NOW()
                        WHERE id = %s
                    """, (task['id'],))
            
            time.sleep(1)

SKIP LOCKED 的作用:

  • 多个调度器并发查询
  • 行已被锁定时,自动跳过
  • 每个调度器拿到不同的任务

优缺点:

  • ✅ 数据库原生支持,可靠
  • ✅ 任务有持久化记录
  • ⚠️ 性能受数据库限制
  • ⚠️ 需要数据库支持 SKIP LOCKED(PostgreSQL、MySQL 8.0+)

方案 4:分片调度

将任务按 ID 分片,每个调度器只负责一部分:

class ShardedScheduler:
    """分片调度:避免竞争"""
    
    def __init__(self, total_shards, shard_id):
        self.total_shards = total_shards  # 总分片数
        self.shard_id = shard_id            # 当前分片 ID
    
    def _belongs_to_shard(self, task_id):
        """判断任务是否属于当前分片"""
        return int(task_id) % self.total_shards == self.shard_id
    
    def schedule(self):
        while True:
            # 只查询属于当前分片的任务
            tasks = self.redis.zrangebyscore('delay_queue', 0, time.time())
            
            for task in tasks:
                if self._belongs_to_shard(task['id']):
                    self._process_task(task)
            
            time.sleep(0.1)

优势:

  • ✅ 完全避免竞争,每个调度器独立工作
  • ✅ 性能最优,无需锁
  • ⚠️ 调度器挂掉,分片任务无人处理
  • 💡 需要配合主备切换或分片迁移

方案 5:Lease 机制

借鉴 Kubernetes 的 Lease 机制:

class LeaseScheduler:
    """基于租约的调度器"""
    
    def __init__(self, scheduler_id, lease_ttl=10):
        self.scheduler_id = scheduler_id
        self.lease_ttl = lease_ttl
        self.redis = redis.Redis()
    
    def acquire_lease(self):
        """续约"""
        self.redis.setex(
            f"lease:scheduler:{self.scheduler_id}",
            self.lease_ttl,
            time.time()
        )
    
    def get_active_schedulers(self):
        """获取活跃的调度器列表"""
        keys = self.redis.keys("lease:scheduler:*")
        return [key.split(":")[-1] for key in keys]
    
    def schedule(self):
        while True:
            # 1. 续约
            self.acquire_lease()
            
            # 2. 获取所有活跃调度器
            active = self.get_active_schedulers()
            my_index = active.index(self.scheduler_id)
            
            # 3. 只处理属于我的任务(按哈希分片)
            tasks = self.redis.zrangebyscore('delay_queue', 0, time.time())
            for task in tasks:
                if hash(task['id']) % len(active) == my_index:
                    self._process_task(task)
            
            time.sleep(1)

特点:

  • 调度器动态加入/退出
  • 自动重新分片
  • 无需人工配置

方案对比总结:

方案复杂度性能可靠性适用场景
Redis 锁⭐⭐⭐⭐⭐⭐⭐小规模任务
Lua 原子操作⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐推荐:高性能场景
数据库锁⭐⭐⭐⭐⭐⭐⭐⭐⭐任务已存在数据库
分片调度⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐大规模固定集群
Lease 机制⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐动态扩缩容

最佳实践建议:

  1. 中小规模(< 10 万任务/天):Lua 原子操作 + 死信队列
  2. 大规模(> 10 万任务/天):分片调度 + 主备切换
  3. 已有数据库:直接用数据库悲观锁
  4. 云原生环境:考虑用 Kubernetes 的 CronJob 或 Celery + RabbitMQ

关键要点:

  • ✅ 优先选择无锁方案(Lua 原子操作、分片)
  • ✅ 失败任务必须有补偿机制(死信队列、重试)
  • ✅ 监控每个调度器的负载,避免热点
  • ✅ 记录任务处理日志,便于排查问题