队列模型
那年夏天,我差点被线程睡死了
这是 2018 年的夏天,我们的小创业公司刚上线外卖系统,日订单量终于突破 5000 单。团队只有 3 个后端,大家在出租屋里敲代码,氛围热烈而紧张。
那天晚上 11 点,运营跑过来找到我:
“老板,后台发现好多订单超时没付款,但 30 分钟后没自动取消。库存一直被占用,有用户在投诉!”
我一看代码,心凉了半截。原来是我一周前写的”订单超时取消”功能:
import threading
def create_order(order_id):
# 创建订单
save_order(order_id)
# 启动一个线程,30 分钟后取消
def cancel_after_timeout():
import time
time.sleep(30 * 60) # 等待 30 分钟
cancel_order(order_id)
thread = threading.Thread(target=cancel_after_timeout)
thread.start()那时候我天真地以为:一个订单一个线程,简单直接。
但现实狠狠地打了我一巴掌。运维小王连夜查了监控,脸色惨白:
“老板,你看这个图……当前活跃线程数 8 万多,服务器内存已经用掉 90% 了。再这么下去,今晚就要 OOM。”
我盯着那个监控图表,突然意识到问题的严重性:
- 😱 5000 日单 峰值时段同时在线的订单可能上万,那上万个线程?
- 😱 一台服务器重启,所有等待中的任务全丢了,订单永远不取消?
- 😱 内存不够用,晚上高峰期直接崩溃?
那一夜我没睡,盯着服务器日志发呆。凌晨 4 点,我突然醒悟:我需要一个更靠谱的模型。
回到基础:什么是队列?
那天凌晨,我重新翻开了大学时的数据结构课本。原来,问题的根源是我对”队列”的理解太浅了。
队列(Queue)是一个”先进先出”(FIFO)的数据结构,就像排队买奶茶:
生产者 队列 消费者
│ │ │
│ ── 放入消息 A ──► [A, B, C] ──► 取出消息 A ──► │
│ ── 放入消息 B ──► [B, C] ──► 取出消息 B ──► │
│ ── 放入消息 C ──► [C] ──► 取出消息 C ──► │from collections import deque
class SimpleQueue:
def __init__(self):
self._queue = deque()
def enqueue(self, message):
"""入队"""
self._queue.append(message)
def dequeue(self):
"""出队"""
if self._queue:
return self._queue.popleft()
return None普通队列的特点:放入就立即可取。
但我的需求不同——订单需要等 30 分钟才能取消。这就是延时队列的核心:放入后,需要等一段时间才能取。
延时队列模型:时间是我的朋友
我画了一整晚的图,终于想清楚了:延时队列 = 普通队列 + 时间维度。
普通队列: 放入 ──► 立即可取
延时队列: 放入 ──► 等待 N 秒 ──► 可取核心概念:两个房间
我把延时队列想象成有两个房间的餐厅:
┌──────────────────────────────────────────────┐
│ 延时队列模型 │
│ │
│ 生产者 │
│ │ │
│ │ 放入消息 + 延时时间 │
│ ▼ │
│ ┌──────────────────────────┐ │
│ │ 等待区(Wait Room) │ │
│ │ │ │
│ │ 消息A - 还剩 28 分钟 │ │
│ │ 消息B - 还剩 15 分钟 │ │
│ │ 消息C - 还剩 3 分钟 │ │
│ │ 消息D - 到期!→ 出列 │ │
│ └──────────┬───────────────┘ │
│ │ │
│ │ 到期消息 │
│ ▼ │
│ ┌──────────────────────────┐ │
│ │ 就绪区(Ready Zone) │ │
│ │ │ │
│ │ 消息D ✅ 可以消费了 │ │
│ └──────────┬───────────────┘ │
│ │ │
│ ▼ │
│ 消费者 │
│ (处理到期任务) │
└──────────────────────────────────────────────┘这样一想,思路清晰了!所有订单先进入”等待区”,时间到了才能去”就绪区”被处理。
延时队列的核心操作
import time
from heapq import heappush, heappop
class DelayQueue:
def __init__(self):
self._heap = [] # 最小堆,按到期时间排序
def enqueue(self, message, delay_seconds):
"""入队:放入消息,指定延时"""
execute_at = time.time() + delay_seconds
heappush(self._heap, (execute_at, message))
def dequeue(self):
"""出队:取出一个到期消息(如果有的话)"""
if not self._heap:
return None
execute_at, message = self._heap[0]
if time.time() >= execute_at:
# 到期了,可以取出
heappop(self._heap)
return message
# 还没到期,返回 None
return None
def peek_delay(self):
"""查看最近一个任务的剩余时间"""
if not self._heap:
return None
execute_at, _ = self._heap[0]
return max(0, execute_at - time.time())使用示例:
queue = DelayQueue()
# 下单后 30 分钟超时
queue.enqueue("order:1001", delay_seconds=30 * 60)
queue.enqueue("order:1002", delay_seconds=30 * 60)
# 5 秒后发提醒
queue.enqueue("notify:user:888", delay_seconds=5)
# 不断检查有没有到期任务
while True:
task = queue.dequeue()
if task:
print(f"处理任务: {task}")
else:
time.sleep(1) # 没有到期任务,等 1 秒再看两种消费模型:谁来做主?
第一版上线后,我又遇到了新问题。消费者应该主动来拉任务,还是队列主动推任务?
拉取模型(Pull):自己找活干
# 拉取模型
def consumer_pull(queue):
while True:
task = queue.dequeue()
if task:
process(task)
else:
time.sleep(0.1) # 没有任务,等一会儿我一开始选这个,因为简单:消费者自己控制节奏,忙的时候少拉点,闲的时候多拉点。
优点: 消费者自己控制节奏
缺点: 没任务时也在不断轮询,浪费资源
推送模型(Push):任务送上门
后来订单量涨到 10 万单/天,我发现轮询太慢了。改成推送模式后,实时性提升明显:
# 推送模型
class PushDelayQueue:
def __init__(self):
self._heap = []
self._consumers = []
def register_consumer(self, callback):
self._consumers.append(callback)
def _notify(self, message):
for consumer in self._consumers:
consumer(message)
def run(self):
"""后台线程,到期后自动推送"""
while True:
if self._heap:
execute_at, message = self._heap[0]
now = time.time()
if now >= execute_at:
heappop(self._heap)
self._notify(message) # 推送给消费者
else:
time.sleep(min(0.1, execute_at - now))
else:
time.sleep(1)优点: 到期即推,延迟更低
缺点: 消费者处理不过来时会积压
实战经验对比
| 维度 | 拉取模型 | 推送模型 |
|---|---|---|
| 实时性 | 取决于轮询间隔 | 到期即推 |
| 消费者压力 | 自己控制 | 可能被压垮 |
| 实现复杂度 | 简单 | 中等 |
| 适用场景 | 简单场景 | 高实时要求 |
| 典型实现 | Redis ZSet | RabbitMQ |
我的建议:刚开始用拉取,够用就行。等业务量上来,再考虑推送。别过度设计。
队列的生命周期:一个任务的旅程
在排查线上问题时,我发现如果不知道任务处于什么状态,根本没法定位问题。于是设计了状态机。
一个延时任务从创建到完成,会经历这些状态:
┌──────────┐ 到达执行时间 ┌──────────┐ 消费者取走 ┌──────────┐
│ 等待中 │ ──────────────► │ 就绪 │ ──────────────► │ 处理中 │
│ (Waiting) │ │ (Ready) │ │(Processing)│
└──────────┘ └──────────┘ └─────┬────┘
│
┌───────────┼───────────┐
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 完成 │ │ 失败 │ │ 重试 │
│(Completed)│ │ (Failed) │ │ (Retry) │
└──────────┘ └──────────┘ └─────┬────┘
│
▼
重新进入等待class TaskState:
WAITING = 'waiting' # 等待到期
READY = 'ready' # 已到期,等待消费
PROCESSING = 'processing' # 正在处理
COMPLETED = 'completed' # 处理完成
FAILED = 'failed' # 处理失败
RETRY = 'retry' # 等待重试
class DelayTask:
def __init__(self, task_id, payload, delay_seconds):
self.task_id = task_id
self.payload = payload
self.execute_at = time.time() + delay_seconds
self.state = TaskState.WAITING
self.retry_count = 0
self.max_retries = 3
self.created_at = time.time()
self.updated_at = time.time()从单机到分布式:成长的代价
有了状态机,系统稳定多了。但 3 个月后,日订单量突破 10 万单,新的问题来了。
单机延时队列的问题暴露无遗:
单机延时队列的问题:
1. 内存有限 ── 任务太多放不下
2. 单点故障 ── 机器挂了全丢了
3. 性能瓶颈 ── 单机处理能力有限
解决方案:
1. 存储 ── 把任务放到外部存储(数据库 / Redis / MQ)
2. 高可用 ── 多副本 + 持久化
3. 分布式 ── 多消费者并行处理那段时间,我和小王几乎天天通宵。我们决定把存储层抽离出来,改用 Redis 集群。
分布式延时队列模型
┌───────────┐ ┌───────────┐ ┌───────────┐
│ 生产者 A │ │ 生产者 B │ │ 生产者 C │
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘
│ │ │
└────────────┬────┴─────────────────┘
│
▼
┌─────────────────┐
│ 集中式存储层 │
│ (DB / Redis / │
│ MQ 集群) │
└────────┬────────┘
│
┌───────────┼───────────┐
│ │ │
▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ 调度器 A │ │ 调度器 B │ │ 调度器 C │ ← 互斥调度
└─────┬─────┘ └─────┬─────┘ └─────┬─────┘
│ │ │
▼ ▼ ▼
┌───────────┐ ┌───────────┐ ┌───────────┐
│ 消费者 A │ │ 消费者 B │ │ 消费者 C │
└───────────┘ └───────────┘ └───────────┘这样改造后,单机故障不再致命。即使一台服务器挂了,其他服务器继续工作,任务数据也都在 Redis 里持久化保存。
这一步的代价:架构复杂度大幅上升,分布式一致性、锁竞争、网络分区……问题接踵而至。但为了支撑业务增长,这些代价是值得的。
数据流示意:从下单到超时
为了帮新人理解整个流程,我画了一张完整的数据流图。以”订单超时取消”为例:
1. 用户下单
POST /api/order
│
▼
2. 创建订单记录(数据库)
order.status = "pending_payment"
│
▼
3. 投递延时任务
delay_queue.push({
task_id: "cancel_order_12345",
payload: { order_id: 12345 },
delay: 30 * 60 // 30 分钟
})
│
▼
4. 任务在等待区等待 30 分钟
... 时间流逝 ...
│
▼
5. 任务到期,进入就绪区
│
▼
6. 调度器分配给消费者
│
▼
7. 消费者执行取消逻辑
- 查询订单状态(还是 pending_payment 吗?)
- 如果是:取消订单 + 释放库存
- 如果不是(已付款):跳过
│
▼
8. 标记任务完成
task.status = "completed"这里有个踩坑点!
注意第 7 步的幂等检查!
我们早期就踩过这个坑。有一次,用户在第 29 分钟付款了,但延时任务到期后还是把订单取消了,用户气炸了。
教训:在 30 分钟内,用户可能已经付款了。延时任务到期后,必须先检查业务状态,再决定是否执行。这就是”幂等性”——多次执行结果是一样的。
我在代码里加了个显眼的注释:
# ⚠️ 重要:必须先检查订单状态!
# 用户可能已经在第 29 分钟付款了
if order.status == 'pending_payment':
cancel_order(order_id)
else:
logger.info(f"订单 {order_id} 已付款,无需取消")这个小小的检查,避免了无数的投诉。
想一想
为什么延时队列通常使用最小堆而不是数组来存储? 时间复杂度有什么区别?
如果有 1000 万个任务同时在等待区,堆的操作会不会变慢? 应该怎么优化?
拉取模型和推送模型,哪个更适合”订单超时取消”场景? 为什么?
分布式环境下,多个调度器如何保证一个任务只被调度一次? 你能想到几种方案?
参考答案
思考 1:为什么延时队列通常使用最小堆而不是数组来存储?
问题分析:
延时队列的核心需求是:快速找到最早到期的任务。我们需要比较不同数据结构在”插入”和”取出最小元素”这两个操作上的效率。
时间复杂度对比:
| 操作 | 数组 | 最小堆(优先队列) | 优化说明 |
|---|---|---|---|
| 插入任务 | O(1) | O(log n) | 数组直接追加,堆需要上浮调整 |
| 取出最早任务 | O(n) | O(log n) | 数组需要遍历全部,堆只需取出堆顶 |
| 查看最早任务 | O(n) | O(1) | 数组需要遍历,堆直接看堆顶 |
代码对比:
# 方案 A:使用数组(低效)
class ArrayDelayQueue:
def __init__(self):
self._tasks = [] # [(execute_at, message), ...]
def enqueue(self, message, delay_seconds):
"""O(1) - 直接追加"""
execute_at = time.time() + delay_seconds
self._tasks.append((execute_at, message))
def dequeue(self):
"""O(n) - 必须遍历全部找到最小值"""
if not self._tasks:
return None
# 😱 这里的循环是性能瓶颈
min_idx = 0
for i in range(1, len(self._tasks)):
if self._tasks[i][0] < self._tasks[min_idx][0]:
min_idx = i
execute_at, message = self._tasks[min_idx]
if time.time() >= execute_at:
self._tasks.pop(min_idx) # O(n) 删除操作
return message
return None
# 方案 B:使用最小堆(高效)
class HeapDelayQueue:
def __init__(self):
self._heap = [] # heapq 维护最小堆
def enqueue(self, message, delay_seconds):
"""O(log n) - 堆上浮"""
execute_at = time.time() + delay_seconds
heappush(self._heap, (execute_at, message))
def dequeue(self):
"""O(log n) - 堆下沉"""
if not self._heap:
return None
execute_at, message = self._heap[0] # O(1) 查看堆顶
if time.time() >= execute_at:
heappop(self._heap) # O(log n) 堆调整
return message
return None性能测试对比:
import time
def test_performance(queue_class, n=100000):
"""测试 10 万任务的性能"""
queue = queue_class()
# 插入 n 个任务
start = time.time()
for i in range(n):
queue.enqueue(f"task_{i}", delay_seconds=i)
insert_time = time.time() - start
# 全部取出
start = time.time()
count = 0
while queue.dequeue():
count += 1
dequeue_time = time.time() - start
return insert_time, dequeue_time
# 结果对比(n=100000)
# 数组版本:插入 0.02s,取出 28.5s 😱
# 堆版本: 插入 0.15s,取出 0.18s ✅为什么堆更适合?
- 业务特点:延时队列总是需要”最早到期的任务先处理”
- 堆的特性:堆顶永远是最小值,O(1) 时间就能获取
- 操作平衡:虽然插入稍慢(O(log n)),但取出快太多,整体性能优
最佳实践:
- Python 用
heapq模块(基于列表的最小堆实现) - Java 用
PriorityQueue或DelayQueue - Go 用
container/heap包 - C++ 用
std::priority_queue
思考 2:如果有 1000 万个任务同时在等待区,堆的操作会不会变慢?应该怎么优化?
问题分析:
堆的时间复杂度是 O(log n),当 n=1000 万时,log₂(10,000,000) ≈ 23。这意味着每次插入或删除需要约 23 次比较和交换操作。
理论计算:
# 1000 万任务的堆操作次数
import math
n = 10_000_000
heap_depth = math.log2(n) # 堆的高度
print(f"堆高度: {heap_depth:.1f}") # 约 23 层
print(f"插入操作比较次数: {heap_depth:.1f}")
print(f"删除操作比较次数: {2 * heap_depth:.1f}") # 删除需要替换 + 下沉
# 内存占用估算
task_size = 100 # 每个任务约 100 字节(时间戳 + ID + 数据)
total_memory = n * task_size
print(f"内存占用: {total_memory / 1024 / 1024:.1f} MB") # 约 953 MB性能瓶颈分析:
| 瓶颈类型 | 具体表现 | 影响程度 |
|---|---|---|
| CPU 计算 | O(log n) 的堆调整 | ⚠️ 中等(23 次操作可接受) |
| 内存占用 | 1000 万任务 ≈ 1GB | ⚠️ 中等(现代服务器内存足够) |
| 缓存未命中 | 堆的随机访问导致 CPU 缓存失效 | 🔴 严重(真正瓶颈) |
| 持久化成本 | 重启恢复需要加载 1GB 数据 | 🔴 严重(启动慢) |
优化方案:
方案 1:分层堆(Tiered Heap)
将任务按到期时间分成多个桶:
from collections import defaultdict
import time
class TieredDelayQueue:
"""分层延时队列:按时间分桶"""
def __init__(self, bucket_interval=60): # 每 60 秒一个桶
self.buckets = defaultdict(list) # {timestamp: [tasks]}
self.bucket_interval = bucket_interval
self.current_bucket = int(time.time() // bucket_interval)
def _get_bucket_key(self, execute_at):
"""计算任务应该进入哪个桶"""
return int(execute_at // self.bucket_interval)
def enqueue(self, message, delay_seconds):
"""O(1) - 直接放入对应桶"""
execute_at = time.time() + delay_seconds
bucket_key = self._get_bucket_key(execute_at)
self.buckets[bucket_key].append((execute_at, message))
def dequeue(self):
"""O(1) 平均 - 只处理当前桶"""
now = time.time()
current_bucket_key = int(now // self.bucket_interval)
# 清理过期的桶
while self.current_bucket < current_bucket_key:
del self.buckets[self.current_bucket]
self.current_bucket += 1
# 从当前桶取任务
tasks = self.buckets.get(current_bucket_key, [])
if not tasks:
return None
# 桶内任务较少,线性扫描即可
for i, (execute_at, message) in enumerate(tasks):
if now >= execute_at:
tasks.pop(i)
return message
return None优势:
- ✅ 插入 O(1)(直接追加到桶)
- ✅ 取出 O(1) 平均(只扫描当前桶)
- ✅ 内存友好(可以清理过期桶)
方案 2:时间轮(Timing Wheel)
适合大量短延时的场景:
class TimingWheel:
"""时间轮:适合秒级、分钟级的短延时"""
def __init__(self, bucket_size=60, wheel_size=60):
# bucket_size: 每个槽位的时间跨度(秒)
# wheel_size: 轮上的槽位数
self.bucket_size = bucket_size
self.wheel_size = wheel_size
self.wheel = [[] for _ in range(wheel_size)]
self.current_slot = 0
self.start_time = time.time()
def enqueue(self, message, delay_seconds):
"""O(1)"""
slot = (self.current_slot + delay_seconds // self.bucket_size) % self.wheel_size
self.wheel[slot].append(message)
def tick(self):
"""每秒调用一次,推进时间轮"""
tasks = self.wheel[self.current_slot]
self.wheel[self.current_slot] = [] # 清空槽位
self.current_slot = (self.current_slot + 1) % self.wheel_size
return tasks时间轮的复杂度:
- 插入:O(1)
- 取出:O(1) 平均(每次 tick 处理一个槽位)
- 空间:O(wheel_size),与任务数无关!
方案 3:外部存储 + 缓存
将 1000 万任务存储到 Redis,内存只保留热点任务:
import redis
class RedisBackedQueue:
"""Redis ZSet + 本地缓存"""
def __init__(self):
self.redis = redis.Redis()
self.local_cache = [] # 本地缓存最近 1000 个任务
self.cache_size = 1000
def enqueue(self, message, delay_seconds):
"""O(log n) - 但在 Redis 端"""
score = time.time() + delay_seconds
self.redis.zadd('delay_queue', {message: score})
def dequeue(self):
"""优先从本地缓存取"""
# 1. 先查本地缓存(O(1))
if self.local_cache:
return self.local_cache.pop(0)
# 2. 缓存空了,批量从 Redis 加载
now = time.time()
tasks = self.redis.zrangebyscore(
'delay_queue', 0, now, start=0, num=self.cache_size
)
if tasks:
# 删除已取出的任务
self.redis.zremrangebyscore('delay_queue', 0, now)
self.local_cache = tasks
return self.local_cache.pop(0)
return None方案选择建议:
| 场景 | 推荐方案 | 理由 |
|---|---|---|
| 任务数 < 100 万 | 单机最小堆 | 简单高效,无需优化 |
| 100 万 - 1000 万 | 分层堆 | 平衡性能和复杂度 |
| > 1000 万,短延时 | 时间轮 | O(1) 操作,内存可控 |
| > 1000 万,长延时 | Redis ZSet + 缓存 | 持久化 + 分布式支持 |
| 极致性能要求 | 时间轮 + 多级轮 | Kafka、Netty 都用这个 |
关键优化点总结:
- 减少堆操作次数:用分桶/时间轮降低 log n 的 n 值
- 利用 CPU 缓存:数组比链表缓存友好
- 批量处理:一次取出多个任务,分摊开销
- 异步持久化:写操作放后台,不阻塞主流程
- 懒加载:只在需要时才加载任务数据
思考 3:拉取模型和推送模型,哪个更适合”订单超时取消”场景?为什么?
业务需求分析:
先明确”订单超时取消”的核心特点:
| 特性 | 说明 | 业务影响 |
|---|---|---|
| 延时时间长 | 30 分钟 | 不需要毫秒级实时性 |
| 容忍延迟 | 晚几秒取消可接受 | 允许轮询间隔 |
| 突发性 | 下单高峰期任务激增 | 消费者不能被压垮 |
| 可靠性要求高 | 不能漏取消 | 需要重试机制 |
| 幂等性要求 | 已付款订单不能取消 | 消费者要做状态检查 |
拉取模型(Pull)适合的理由:
1. 消费者可以自我保护
class PullConsumer:
"""消费者可以根据自身能力控制节奏"""
def __init__(self, max_concurrent=100):
self.max_concurrent = max_concurrent
self.processing = 0
def consume(self, queue):
while True:
# ⚠️ 关键:自我限流
if self.processing >= self.max_concurrent:
print("达到并发上限,暂停拉取")
time.sleep(1)
continue
task = queue.dequeue()
if task:
self.processing += 1
self._process_with_callback(task)
else:
time.sleep(1) # 没任务就休息
def _process_with_callback(self, task):
def callback():
self.processing -= 1
threading.Thread(
target=self._cancel_order,
args=(task, callback)
).start()对比推送模型的问题:
# ❌ 推送模型:消费者可能被压垮
class PushConsumer:
def on_task(self, task):
# 如果取消订单操作慢(比如要查数据库、调库存接口)
# 推送速度 > 处理速度 → 消息积压 → 内存溢出
self._cancel_order(task) # 阻塞操作2. 容错性更好
# 拉取模型:消费者挂了,任务还在队列里
def consumer_pull_with_retry(queue):
while True:
try:
task = queue.dequeue()
if task:
process(task)
except Exception as e:
# 😱 消费者崩溃,但任务不会丢
logger.error(f"处理失败: {e}")
time.sleep(5) # 等待恢复后继续拉取推送模式下,如果推送时消费者挂了,需要复杂的重试逻辑。
3. 实现简单
# 拉取模型:只需一个循环
def consumer():
while True:
task = queue.dequeue()
if task:
process(task)
else:
time.sleep(1)
# 推送模型:需要管理消费者列表、处理推送失败...
class PushQueue:
def __init__(self):
self.consumers = []
def register(self, consumer):
self.consumers.append(consumer)
def _notify(self, task):
for consumer in self.consumers:
try:
consumer(task)
except:
# 😱 推送失败怎么办?重试?记录失败?
pass什么时候用推送模型?
推送模型适合这些场景:
| 场景 | 示例 | 理由 |
|---|---|---|
| 超低延迟要求 | 秒级杀倒计时 | 不能等轮询 |
| 任务轻量 | 发送通知 | 处理快,不会积压 |
| 实时系统 | 即时通讯 | 需要即时响应 |
最佳实践:混合模式
class HybridDelayQueue:
"""结合拉取和推送的优点"""
def __init__(self):
self._heap = []
self._consumers = []
self._pull_thread = None
def start(self):
"""启动后台线程,定期批量推送"""
def batch_pull_and_push():
while True:
# 每秒批量拉取
tasks = []
for _ in range(100): # 每次最多拉 100 个
task = self.dequeue()
if task:
tasks.append(task)
else:
break
# 推送给消费者(但消费者可以拒绝)
if tasks:
for consumer in self._consumers:
consumer.on_batch(tasks)
time.sleep(1)
self._pull_thread = threading.Thread(target=batch_pull_and_push)
self._pull_thread.start()最终建议:
对于”订单超时取消”场景,拉取模型是更好的选择:
- ✅ 30 分钟延时,不需要秒级实时性
- ✅ 消费者可以自我保护,避免被高峰压垮
- ✅ 实现简单,故障容错性好
- ✅ 可以用多消费者并行拉取,提高吞吐量
如果确实需要更低的延迟,可以用批量拉取 + 异步处理的方式优化。
思考 4:分布式环境下,多个调度器如何保证一个任务只被调度一次?你能想到几种方案?
问题本质:
这是经典的分布式互斥问题。多个调度器同时看到同一个到期任务,如何保证只有一个能拿到它?
方案 1:Redis 分布式锁
最直接的方案:
import redis
import uuid
class RedisLockScheduler:
"""基于 Redis 锁的调度器"""
def __init__(self):
self.redis = redis.Redis()
self.lock_timeout = 10 # 锁超时时间(秒)
def schedule(self):
while True:
# 1. 查询到期任务
now = time.time()
tasks = self.redis.zrangebyscore('delay_queue', 0, now, start=0, num=10)
for task in tasks:
# 2. 尝试获取锁
lock_key = f"lock:task:{task['id']}"
lock_value = str(uuid.uuid4()) # 唯一标识
# SETNX + 过期时间(原子操作)
acquired = self.redis.set(
lock_key, lock_value,
nx=True, ex=self.lock_timeout
)
if acquired:
try:
# 3. 拿到锁,处理任务
self._process_task(task)
# 4. 从队列删除
self.redis.zrem('delay_queue', task['id'])
finally:
# 5. 释放锁(检查是自己的锁)
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
self.redis.eval(lua_script, 1, lock_key, lock_value)
else:
# 锁被其他调度器持有,跳过
continue
time.sleep(0.1)
def _process_task(self, task):
print(f"处理任务: {task['id']}")
# 实际业务逻辑...优缺点:
- ✅ 实现简单,Redis 原生支持
- ⚠️ 每个任务都要加锁,性能开销大
- ⚠️ 锁超时设置需要权衡(太短可能导致任务重复执行)
方案 2:Redis ZSET + Lua 脚本原子操作
将”查询 + 删除”合并为原子操作:
class AtomicPopScheduler:
"""原子弹出任务的调度器"""
def __init__(self):
self.redis = redis.Redis()
def schedule(self):
while True:
# 用 Lua 脚本保证原子性
lua_script = """
-- 查询最早到期的任务
local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 1)
if #tasks > 0 then
-- 删除该任务(原子操作)
redis.call('ZREM', KEYS[1], tasks[1])
return tasks[1]
else
return nil
end
"""
now = time.time()
task = self.redis.eval(lua_script, 1, 'delay_queue', now)
if task:
self._process_task(task)
else:
time.sleep(0.1)核心思想:
ZRANGEBYSCORE+ZREM在 Lua 中执行,Redis 保证原子性- 多个调度器同时执行,只有一个能 ZREM 成功
- 无需显式加锁,性能更好
优缺点:
- ✅ 原子操作,无锁竞争
- ✅ 性能高,Redis 单线程处理
- ⚠️ 任务取出后必须处理成功(否则丢失)
- 💡 需要配合死信队列处理失败任务
方案 3:数据库悲观锁
如果任务存储在数据库:
-- 创建任务表
CREATE TABLE delay_tasks (
id BIGINT PRIMARY KEY,
payload TEXT,
execute_at TIMESTAMP,
status ENUM('waiting', 'processing', 'completed'),
scheduler_id VARCHAR(50), -- 标记哪个调度器在处理
updated_at TIMESTAMP,
INDEX idx_execute_at (execute_at, status)
);class DatabaseLockScheduler:
"""基于数据库行锁的调度器"""
def __init__(self, db, scheduler_id):
self.db = db
self.scheduler_id = scheduler_id
def schedule(self):
while True:
# 1. 查询并锁定(FOR UPDATE 跳过已锁定的行)
tasks = self.db.query("""
SELECT id, payload
FROM delay_tasks
WHERE execute_at <= NOW()
AND status = 'waiting'
ORDER BY execute_at ASC
LIMIT 10
FOR UPDATE SKIP LOCKED -- 关键:跳过被锁定的行
""")
for task in tasks:
# 2. 标记为处理中
self.db.execute("""
UPDATE delay_tasks
SET status = 'processing',
scheduler_id = %s,
updated_at = NOW()
WHERE id = %s
""", (self.scheduler_id, task['id']))
# 3. 处理任务
try:
self._process_task(task)
# 4. 标记完成
self.db.execute("""
UPDATE delay_tasks
SET status = 'completed',
updated_at = NOW()
WHERE id = %s
""", (task['id'],))
except Exception as e:
# 5. 失败重试
self.db.execute("""
UPDATE delay_tasks
SET status = 'waiting',
updated_at = NOW()
WHERE id = %s
""", (task['id'],))
time.sleep(1)SKIP LOCKED 的作用:
- 多个调度器并发查询
- 行已被锁定时,自动跳过
- 每个调度器拿到不同的任务
优缺点:
- ✅ 数据库原生支持,可靠
- ✅ 任务有持久化记录
- ⚠️ 性能受数据库限制
- ⚠️ 需要数据库支持
SKIP LOCKED(PostgreSQL、MySQL 8.0+)
方案 4:分片调度
将任务按 ID 分片,每个调度器只负责一部分:
class ShardedScheduler:
"""分片调度:避免竞争"""
def __init__(self, total_shards, shard_id):
self.total_shards = total_shards # 总分片数
self.shard_id = shard_id # 当前分片 ID
def _belongs_to_shard(self, task_id):
"""判断任务是否属于当前分片"""
return int(task_id) % self.total_shards == self.shard_id
def schedule(self):
while True:
# 只查询属于当前分片的任务
tasks = self.redis.zrangebyscore('delay_queue', 0, time.time())
for task in tasks:
if self._belongs_to_shard(task['id']):
self._process_task(task)
time.sleep(0.1)优势:
- ✅ 完全避免竞争,每个调度器独立工作
- ✅ 性能最优,无需锁
- ⚠️ 调度器挂掉,分片任务无人处理
- 💡 需要配合主备切换或分片迁移
方案 5:Lease 机制
借鉴 Kubernetes 的 Lease 机制:
class LeaseScheduler:
"""基于租约的调度器"""
def __init__(self, scheduler_id, lease_ttl=10):
self.scheduler_id = scheduler_id
self.lease_ttl = lease_ttl
self.redis = redis.Redis()
def acquire_lease(self):
"""续约"""
self.redis.setex(
f"lease:scheduler:{self.scheduler_id}",
self.lease_ttl,
time.time()
)
def get_active_schedulers(self):
"""获取活跃的调度器列表"""
keys = self.redis.keys("lease:scheduler:*")
return [key.split(":")[-1] for key in keys]
def schedule(self):
while True:
# 1. 续约
self.acquire_lease()
# 2. 获取所有活跃调度器
active = self.get_active_schedulers()
my_index = active.index(self.scheduler_id)
# 3. 只处理属于我的任务(按哈希分片)
tasks = self.redis.zrangebyscore('delay_queue', 0, time.time())
for task in tasks:
if hash(task['id']) % len(active) == my_index:
self._process_task(task)
time.sleep(1)特点:
- 调度器动态加入/退出
- 自动重新分片
- 无需人工配置
方案对比总结:
| 方案 | 复杂度 | 性能 | 可靠性 | 适用场景 |
|---|---|---|---|---|
| Redis 锁 | ⭐⭐ | ⭐⭐ | ⭐⭐⭐ | 小规模任务 |
| Lua 原子操作 | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ | 推荐:高性能场景 |
| 数据库锁 | ⭐⭐ | ⭐⭐ | ⭐⭐⭐⭐⭐ | 任务已存在数据库 |
| 分片调度 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | 大规模固定集群 |
| Lease 机制 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 动态扩缩容 |
最佳实践建议:
- 中小规模(< 10 万任务/天):Lua 原子操作 + 死信队列
- 大规模(> 10 万任务/天):分片调度 + 主备切换
- 已有数据库:直接用数据库悲观锁
- 云原生环境:考虑用 Kubernetes 的 CronJob 或 Celery + RabbitMQ
关键要点:
- ✅ 优先选择无锁方案(Lua 原子操作、分片)
- ✅ 失败任务必须有补偿机制(死信队列、重试)
- ✅ 监控每个调度器的负载,避免热点
- ✅ 记录任务处理日志,便于排查问题