应用场景
2024年3月的一个深夜
凌晨2点,我盯着监控屏幕,心跳加速。
“订单编号 #882317 创建时间 23:30,已超时30分钟,为什么还没有取消?”
屏幕上的数据让我脊背发凉:我们的电商系统有 1,247 个订单应该被自动取消,但它们还挂着”待支付”状态。
这些订单占用的库存,正在被其他用户疯狂点击”已售罄”。
那一刻,我意识到:我们的系统中缺少一个关键组件——延时队列。
回到问题之前
事情要从三个月前说起。
2024年1月,我们的电商平台刚上线。那时候功能很简单:用户下单、支付、发货。第一天交易额800元,我们三个创始人激动地聚在出租屋吃了顿火锅。
到了3月,日订单量突破10万单。
新的业务需求接踵而至:
- “用户下单后30分钟未支付,自动取消订单,释放库存”
- “用户领取优惠券,24小时未使用则自动失效”
- “用户申请退款,商家7天未处理则自动通过”
产品经理把这些需求写在白板上,转头问我:“这些功能多久能上线?”
我当时心想:不就是定时任务吗?用 cron 每分钟跑一次脚本,查数据库,把超时的订单状态改成”已取消”。
我说:“一周吧。”
那是我对延时队列的第一次理解——太天真了。
第一次尝试:每分钟跑一次脚本
我写了一个 Python 脚本,用 Linux 的 cron 定时任务每分钟执行:
class OrderTimeoutChecker:
def check_pending_orders(self):
"""检查超时未支付的订单"""
timeout_threshold = datetime.now() - timedelta(minutes=30)
# 查询所有超时订单
pending_orders = Order.objects.filter(
status='pending_payment',
created_at__lt=timeout_threshold
)
for order in pending_orders:
try:
# 取消订单
order.status = 'cancelled'
order.cancel_reason = 'payment_timeout'
order.save()
# 释放库存
InventoryService.release(order.product_id, order.quantity)
# 释放优惠券
if order.coupon_id:
CouponService.restore(order.coupon_id, order.user_id)
except Exception as e:
logger.error(f"取消订单失败: {order.id}, 错误: {e}")在测试环境跑了一下,没问题。上线。
第一周,风平浪静。
第二周,问题开始浮现。
遇到的第一个坑:数据库压力
某天上午10点,运维群里突然炸锅:
“数据库CPU 100%!” “所有查询都卡住了!” “用户无法下单!”
我慌了,登录数据库一看,那个定时任务的查询正在扫描 80万条订单记录。
问题是:我写的SQL只查询了”待支付”状态,但没有给 created_at 字段加索引。
-- 这个查询在80万条记录里全表扫描
SELECT * FROM orders
WHERE status = 'pending_payment'
AND created_at < '2024-03-15 09:30:00';更糟糕的是,这个查询每分钟都在执行。
快速修复:给 (status, created_at) 加了联合索引。
数据库压力下来了,但我知道这只是治标不治本。随着订单量增长,单表数据迟早会破千万。
遇到的第二个坑:执行精度问题
产品经理跑来找我:“用户投诉了,他们说下单后29分钟订单就被取消了,不是说30分钟吗?”
我查了一下日志,发现:
- 用户 14:00:30 下单
- 定时任务 14:30:05 执行(应该14:30:30执行,但cron是每分钟的第0秒执行)
- 订单超时判定:14:30:05 - 30分钟 = 14:00:05
- 因为 14:00:30 > 14:00:05,所以被判定为超时
误差接近1分钟!
用户觉得被骗了——说好的30分钟,怎么29分钟就取消了?
我调整了超时时间到35分钟,产品经理不满意:“我们的竞品都是精确到分钟的。”
我想:要精确到分钟,需要把cron的执行频率改成每秒一次。
但想象一下:每秒扫描一次数据库?
这不行。
真正的危机:库存竞争
真正让我意识到问题严重性的是一次促销活动。
2024年3月15日,我们做了个”春日大促”活动,一款爆品限量1000件。
活动开始10分钟,1000件库存被抢光。
但问题来了:很多人下单后没支付,这些订单的库存被锁定30分钟。
30分钟内,又有2000个用户下单,但都显示”已售罄”。
30分钟后,第一批超时订单自动取消,释放出600件库存。
但此时,这600件库存的释放信息,没有及时通知到还在等待的用户。
用户眼睁睁看着库存从”已售罄”变成”有货”,但错过了抢购时机。
投诉邮件如雪片般飞来。
那一刻,我坐在显示器前,脑子里的齿轮开始转动:
我们需要的不是一个定时检查的脚本,而是一个真正能”在未来某个时刻精准执行任务”的队列系统。
我开始研究:什么是延时队列?
那天晚上,我失眠了。躺在床上翻来覆去,脑子里全是订单、库存、超时时间。
我爬起来,打开电脑,开始搜索。
我发现,这个需求在业界有个专门的名字:延时队列(Delayed Queue)。
简单来说:
- 现在:把一个任务放入队列,指定”30分钟后执行”
- 30分钟后:系统自动把任务取出来执行
这听起来不就是 setTimeout 吗?
但在分布式系统中,事情没那么简单。我们需要考虑:
- 任务持久化:服务器重启,任务不能丢
- 分布式执行:多台服务器都能处理任务
- 任务去重:同一个任务不能执行两次
- 任务取消:用户在29分钟时支付了,超时取消任务要能取消
- 高并发:一秒钟创建1万个延时任务
我开始研究业界方案。
发现的第一个真实场景:电商订单超时
我在 GitHub 上搜到了一个开源的延时队列项目,看它的 README,第一个例子就是:
订单超时自动取消
class OrderService:
def create_order(self, order_data):
"""创建订单"""
order = Order(
user_id=order_data['user_id'],
amount=order_data['amount'],
status='pending_payment'
)
order.save()
# 30分钟后检查订单状态,如果未支付则取消
delay_queue.add_task(
task_type='order_cancel',
task_data={'order_id': order.id},
delay_seconds=30 * 60 # 30分钟
)
return order
def cancel_order(self, order_id):
"""取消订单"""
order = Order.get(order_id)
if order.status == 'pending_payment':
order.status = 'cancelled'
order.save()
# 释放优惠券和库存
self._release_coupons(order)
self._release_inventory(order)
def on_payment_success(self, order_id):
"""支付成功回调"""
# 用户支付了,取消超时任务
delay_queue.cancel_task(
task_type='order_cancel',
task_data={'order_id': order_id}
)这段代码让我眼前一亮!
关键点:
- 创建订单时,同时创建一个”30分钟后执行的取消任务”
- 如果用户支付了,把取消任务删掉
- 如果30分钟没支付,任务自动执行,订单被取消
这不就是我们需要的吗?!
我激动地坐直了身体,继续往下看。
更多场景:原来延时队列无处不在
随着深入研究,我发现延时队列的应用场景远比我想象的多:
场景1:金融风控 - 延时验证
用户登录异常时,不要立即锁定账户,而是10分钟后再发送验证短信:
class RiskControlService:
def login_abnormal(self, user_id, risk_score):
"""用户登录异常"""
# 高风险用户5分钟后验证,低风险10分钟
delay = 5 * 60 if risk_score > 80 else 10 * 60
delay_queue.add_task(
task_type='send_security_sms',
task_data={'user_id': user_id, 'risk_score': risk_score},
delay_seconds=delay,
task_id=f'sms_{user_id}_{int(time.time())}'
)
def on_user_verified(self, user_id):
"""用户完成了验证"""
# 取消待发送的短信任务
delay_queue.cancel_task(
task_type='send_security_sms',
task_data={'user_id': user_id}
)场景2:社交媒体 - 定时发布
用户写好文章,选择”明天上午10点发布”:
class SocialMediaService:
def schedule_post(self, post_data, publish_at):
"""定时发布文章"""
delay_seconds = (publish_at - datetime.now()).total_seconds()
delay_queue.add_task(
task_type='publish_post',
task_data={
'post_id': post_data['id'],
'content': post_data['content']
},
delay_seconds=int(delay_seconds)
)场景3:物流 - 自动确认收货
用户签收后7天,如果没申请退款,自动确认收货并打款给商家:
class LogisticsService:
def delivery_signed(self, order_id, sign_time):
"""物流签收"""
# 7天后自动确认收货
delay_queue.add_task(
task_type='auto_confirm_receipt',
task_data={'order_id': order_id},
delay_seconds=7 * 24 * 60 * 60 # 7天
)
def on_return_requested(self, order_id):
"""用户申请退货"""
# 取消自动确认收货任务
delay_queue.cancel_task(
task_type='auto_confirm_receipt',
task_data={'order_id': order_id}
)我一边看,一边在心里默默记笔记。
原来我们遇到的不是特殊问题,而是通用问题。
开始做需求分析
那天晚上,我整理了一份详细的需求分析文档。
延时时间分布
我统计了我们系统的实际数据:
| 延时时间 | 应用场景 | 占比 | 每日任务量 |
|---|---|---|---|
| 5-30分钟 | 订单支付超时、风控检查 | 45% | 45万 |
| 1-24小时 | 优惠券过期、任务调度 | 35% | 35万 |
| 1-7天 | 自动确认收货、退款处理 | 15% | 15万 |
| >7天 | 定时提醒、周期性任务 | 5% | 5万 |
结论:45%的任务集中在30分钟内,这是优化的重点。
QPS 分析
我拉了一周的监控数据:
| 时间段 | 平均 QPS | 峰值 QPS | 特点 |
|---|---|---|---|
| 00:00-06:00 | 50 | 100 | 低谷期 |
| 06:00-09:00 | 200 | 500 | 早晨活跃 |
| 09:00-18:00 | 500 | 2000 | 工作高峰 |
| 18:00-22:00 | 800 | 3000 | 晚上高峰 |
| 22:00-24:00 | 300 | 800 | 逐步下降 |
结论:峰值3000 QPS,系统需要支持10倍峰值,即30000 QPS。
容量规划
基于以上分析,我整理了技术指标:
- 基础容量:支持 1000 QPS 的延时任务创建
- 峰值容量:支持 5000 QPS 的突发流量
- 存储容量:单天支持 1000 万任务存储
- 执行精度:延时误差 ≤ 1 分钟(不是5分钟!)
- 可靠性:任务丢失率 ≤ 0.001%
写下这些数字的时候,我的手在微微发抖。
这不再是简单的脚本了,而是一个完整的分布式系统。
技术挑战
我闭上眼睛,思考着可能遇到的挑战。
挑战1:高并发
促销活动期间,一秒钟创建10000个延时任务,怎么保证系统不崩?
我的想法是:用消息队列削峰。
# 使用消息队列削峰填谷
class DelayQueueProducer:
def __init__(self):
self.kafka_producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
batch_size=16384, # 16KB批量发送
linger_ms=10 # 等待10ms收集更多消息
)
def add_task(self, task_data, delay_seconds):
"""添加延时任务"""
# 使用 topic 分片分散压力
topic = 'delay_queue_topic'
partition = hash(task_data['user_id']) % 10
message = {
'task_data': task_data,
'execute_at': int(time.time() + delay_seconds),
'created_at': int(time.time())
}
self.kafka_producer.send(
topic,
key=str(task_data['user_id']).encode(),
value=message,
partition=partition
)挑战2:执行精度
30分钟的订单超时,实际执行可能是25分钟或35分钟,用户会投诉。
我的想法是:时间补偿 + 优先级队列。
class DelayCalculator:
@staticmethod
def calculate_delay(execute_at, current_time=None):
"""计算延时时间,考虑系统时间偏差"""
if current_time is None:
current_time = time.time()
# 获取系统时间偏差
time_offset = SystemTime.get_offset()
adjusted_current_time = current_time + time_offset
delay_seconds = max(0, execute_at - adjusted_current_time)
# 对于短延时任务,增加额外缓冲
if delay_seconds < 3600: # 1小时内
delay_seconds += 30 # 增加30秒缓冲
return int(delay_seconds)挑战3:可靠性
任务创建后,服务器崩溃怎么办?
我的想法是:持久化 + 事务。
class ReliableDelayQueue:
def add_task(self, task_data, delay_seconds, max_retries=3):
"""添加可靠的延时任务"""
task_id = self._generate_task_id(task_data)
# 事务性保存任务
with transaction.atomic():
# 1. 先保存任务到数据库
task = DelayTask.objects.create(
task_id=task_id,
task_data=task_data,
execute_at=datetime.now() + timedelta(seconds=delay_seconds),
status='pending',
retry_count=0,
max_retries=max_retries
)
# 2. 添加到延时队列
self._add_to_delay_queue(task)
return task_id
def execute_task(self, task_id):
"""执行任务"""
task = DelayTask.objects.get(task_id=task_id)
try:
# 执行业务逻辑
self._execute_business_logic(task.task_data)
# 标记完成
task.status = 'completed'
task.save()
except Exception as e:
# 处理失败
if task.retry_count < task.max_retries:
# 重试
task.retry_count += 1
task.status = 'retrying'
task.next_retry_at = datetime.now() + timedelta(minutes=5)
task.save()
# 重新加入队列
self._add_to_delay_queue(task)
else:
# 超过最大重试次数,标记失败
task.status = 'failed'
task.error_message = str(e)
task.save()凌晨3点的顿悟
写完这些代码,已经是凌晨3点。
我站起来,走到窗前。城市的灯火阑珊,偶尔有车驶过。
我忽然明白了一个道理:
所有的”定时任务”,本质上都是”延时执行的需求”。
- 订单30分钟后超时 → 延时30分钟执行取消
- 优惠券24小时后过期 → 延时24小时执行失效
- 生日当天发送祝福 → 延时N天执行发送
而实现”延时执行”的关键,在于:如何高效地在海量任务中,找到”时间到了”的那些任务?
这就像在数百万个闹钟中,找到每一分钟需要响铃的那几个。
回到座位上,我打开一个新的文档,标题写着:
“延时队列技术方案选型”
我知道,这又将是一个不眠之夜。
但这次,我有信心。
我的思考
思考 1
如果让你从零设计一个延时队列,你会选择什么作为底层数据结构?
问题分析
延时队列的核心需求可以抽象为三点:
- 存储能力:能够持久化大量任务(百万级甚至亿级)
- 查询效率:快速找到”时间到了”的任务(时间维度查询)
- 删除能力:支持取消任务(如用户支付后取消超时任务)
这本质上是在时间维度上做高效的检索和调度。
技术方案对比
| 方案 | 数据结构 | 时间复杂度 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|---|
| 数据库 + 轮询 | B+树索引 | O(log n) 查询 O(n) 全表扫描 | • 简单可靠 • 持久化保证 • 易于维护 | • 轮询空扫浪费资源 • 精度受限于轮询间隔 • 高并发下数据库压力大 | • 小规模系统 • 低并发场景 • MVP 验证阶段 |
| Redis ZSet | 跳表(Skip List) | O(log n) 插入 O(log n) 查询最小值 | • 高性能(内存操作) • ZRANGEBYSCORE 直接获取到期任务 • 支持精确删除 | • 内存成本高 • 需要考虑持久化(RDB/AOF) • 数据量受内存限制 | • 中等规模(百万级任务) • 对精度要求高 • 高并发写入 |
| 时间轮算法 | 环形数组 + 链表 | O(1) 插入 O(1) 获取到期任务 | • 插入和获取都是 O(1) • 精度可控(通过时间轮粒度) • 内存占用可预测 | • 实现复杂 • 不支持任意延时时间 • 需要配合持久化方案 | • 大规模、高并发 • 固定延时间隔 • 定时任务调度 |
| 消息队列(延时队列) | 分区队列 + 优先级 | O(1) 生产 O(log n) 消费 | • 天然分布式 • 高吞吐量 • 削峰填谷 | • 延时功能有限(如 RocketMQ 18天) • 运维复杂度高 • 成本较高 | • 大规模系统 • 需要分布式 • 已使用 MQ 的场景 |
推荐决策树
是否需要支持分布式?
├─ 否 → 单机方案
│ ├─ 任务量 < 10万,精度要求不高 → 数据库 + 轮询
│ └─ 任务量 > 10万,或要求秒级精度 → Redis ZSet
│
└─ 是 → 分布式方案
├─ 已在使用 RocketMQ/RabbitMQ → 使用内置延时队列
├─ 需要 > 18天延时 → 自研(时间轮 + 持久化)
└─ 高并发 + 精度要求 → 时间轮 + 消息队列最佳实践建议
- 小团队起步:先用数据库方案快速验证业务价值
- 中等规模:迁移到 Redis ZSet,性价比最高
- 大规模系统:考虑分层设计
- 1小时内短延时:Redis ZSet(高精度)
- 1小时以上长延时:数据库轮询(低成本)
代码示例 - Redis ZSet 方案
import redis
import time
import json
class RedisDelayQueue:
def __init__(self, queue_name, redis_host='localhost'):
self.redis = redis.Redis(host=redis_host, decode_responses=True)
self.queue_key = f"delay_queue:{queue_name}"
def add_task(self, task_id, task_data, delay_seconds):
"""添加延时任务"""
execute_at = int(time.time()) + delay_seconds
# score 为执行时间戳,member 为 task_id
self.redis.zadd(
self.queue_key,
{json.dumps({'task_id': task_id, 'data': task_data}): execute_at}
)
return task_id
def get_ready_tasks(self, batch_size=100):
"""获取到期的任务(批量)"""
now = int(time.time())
# 查询 score <= now 的任务
ready_tasks = self.redis.zrangebyscore(
self.queue_key,
0,
now,
start=0,
num=batch_size
)
return [json.loads(task) for task in ready_tasks]
def remove_task(self, task_id):
"""删除任务(如用户支付后取消超时任务)"""
# 需要遍历找到对应的 task(实际应用中可以建立反向索引)
tasks = self.redis.zrange(self.queue_key, 0, -1)
for task in tasks:
task_data = json.loads(task)
if task_data['task_id'] == task_id:
self.redis.zrem(self.queue_key, task)
return True
return False
def consume_tasks(self, handler, batch_size=100):
"""消费到期任务"""
while True:
ready_tasks = self.get_ready_tasks(batch_size)
if not ready_tasks:
time.sleep(1) # 没有到期任务,休眠1秒
continue
for task in ready_tasks:
try:
# 执行任务
handler(task['data'])
# 完成后删除
self.redis.zrem(self.queue_key, json.dumps(task))
except Exception as e:
# 异常处理:记录日志,根据业务决定是否重试
print(f"任务执行失败: {task['task_id']}, 错误: {e}")关键洞察
- 没有银弹:不同阶段适合不同方案
- Redis ZSet 是最通用的中间方案:覆盖 90% 的场景
- 分层设计是大规模系统的终极方案:短延时用内存,长延时用磁盘
思考 2
我们的系统中有45%的任务延时在30分钟以内。这个数据对技术选型有什么影响?
问题分析
“45% 的任务在 30 分钟以内”这个数据点透露了几个关键信息:
- 短延时任务占主导:近一半的任务对时间精度敏感
- 轮询方案的缺陷被放大:1 分钟轮询间隔对 30 分钟任务来说,误差达 3.3%
- 优化方向明确:优化短延时任务的收益最大
精度影响分析
假设采用”每分钟轮询数据库”的方案:
| 延时时长 | 平均等待时间 | 最大误差 | 误差占比 | 用户感知 |
|---|---|---|---|---|
| 5 分钟 | 30 秒 | 60 秒 | 20% | ⚠️ 明显 |
| 15 分钟 | 30 秒 | 60 秒 | 6.7% | ⚠️ 可感知 |
| 30 分钟 | 30 秒 | 60 秒 | 3.3% | ⚠️ 勉强可接受 |
| 1 小时 | 30 秒 | 60 秒 | 1.7% | ✅ 不明显 |
| 24 小时 | 30 秒 | 60 秒 | 0.07% | ✅ 完全可忽略 |
结论:对于 30 分钟以内的任务,1 分钟的轮询间隔带来的误差是不可接受的。
分层设计策略
基于延时分布,我们可以采用分层架构来平衡精度和成本:
┌─────────────────────────────────────────────────────────────┐
│ 延时任务分层架构 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────┐ │
│ │ 热数据层(内存) │ ← 30分钟内的任务(45%) │
│ │ • Redis ZSet │ • 高精度(秒级) │
│ │ • 时间轮 │ • 快速响应 │
│ │ • 精度:1秒 │ • 内存成本可控(短延时任务很快消费)│
│ └──────────────────┘ │
│ ↓ │
│ ┌──────────────────┐ │
│ │ 温数据层(数据库)│ ← 30分钟~7天的任务(35%) │
│ │ • MySQL索引 │ • 中等精度(分钟级) │
│ │ • 每分钟轮询 │ • 成本适中 │
│ │ • 精度:1分钟 │ │
│ └──────────────────┘ │
│ ↓ │
│ ┌──────────────────┐ │
│ │ 冷数据层(归档) │ ← >7天的任务(20%) │
│ │ • 定时归档 │ • 低精度(小时级) │
│ │ • 每小时轮询 │ • 最小成本 │
│ └──────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘成本优化
使用分层设计后的成本对比(假设每天 100 万任务):
| 方案 | 全量使用 Redis ZSet | 分层设计 | 节省 |
|---|---|---|---|
| 内存占用(峰值) | ~500 万任务 × 1KB = 5GB | ~50 万任务 × 1KB = 50MB | 99% |
| 数据库查询次数/天 | 0 | 1440 次轮询 | - |
| 精度 | 全部秒级 | 短延时秒级,长延时分钟级 | 分层保证 |
技术选型建议
基于”45% 任务在 30 分钟内”这个数据:
优先级:短延时任务的精度优化是 P0 优先级
技术栈选择:
- 不要选择纯轮询方案(精度不足)
- 不要全部用内存方案(成本太高)
- 选择分层方案(精度和成本的平衡)
实现路径:
- 第一阶段:全部用 Redis ZSet(快速上线)
- 第二阶段:引入分层设计(降低成本)
- 第三阶段:自动化任务迁移(热→温→冷)
代码示例 - 分层延时队列
class TieredDelayQueue:
def __init__(self):
# 热数据层:Redis ZSet(30分钟内)
self.hot_queue = RedisDelayQueue('hot_tasks')
# 温数据层:数据库轮询(30分钟~7天)
self.warm_queue = DatabaseDelayQueue('warm_tasks', poll_interval=60)
# 冷数据层:数据库低频轮询(>7天)
self.cold_queue = DatabaseDelayQueue('cold_tasks', poll_interval=3600)
SHORT_DELAY_THRESHOLD = 30 * 60 # 30分钟
MEDIUM_DELAY_THRESHOLD = 7 * 24 * 60 * 60 # 7天
def add_task(self, task_id, task_data, delay_seconds):
"""根据延时时间自动路由到对应的层级"""
if delay_seconds <= self.SHORT_DELAY_THRESHOLD:
# 热数据:Redis ZSet,高精度
return self.hot_queue.add_task(task_id, task_data, delay_seconds)
elif delay_seconds <= self.MEDIUM_DELAY_THRESHOLD:
# 温数据:数据库,1分钟精度
return self.warm_queue.add_task(task_id, task_data, delay_seconds)
else:
# 冷数据:数据库,1小时精度
return self.cold_queue.add_task(task_id, task_data, delay_seconds)
def upgrade_tasks(self):
"""将快要到期的温/冷任务升级到热层"""
# 例如:将剩余时间 < 1小时的任务从温层移到热层
# 这样可以保证所有任务在到期前都在高精度层
pass关键洞察
- 数据驱动设计:理解业务数据的分布特征(如延时分布)是技术选型的前提
- 分层是通用优化思路:不仅延时队列,缓存系统、存储系统都用分层来平衡成本和性能
- 精度不是越细越好:根据业务需求选择合适的精度,避免过度工程
思考 3
在电商场景中,如果用户在第29分钟支付了订单,需要立即取消”超时取消任务”。这个需求对延时队列设计有什么要求?
问题分析
订单支付场景是一个典型的”主动触发取消被动任务”的模式:
用户下单 → 创建"30分钟后取消订单"任务
↓
[29分钟后]
↓
用户支付 → 需要立即取消"取消订单"任务
↓
如果取消失败 → 1分钟后订单被错误取消 → 用户投诉!这揭示了延时队列设计的三个核心挑战:
- 任务可寻址性:如何快速找到要取消的任务?
- 删除原子性:如何确保任务不会被执行两次?
- 并发安全:支付回调执行的同时,任务刚好到期怎么办?
技术要求详解
要求 1:支持任务取消(API 设计)
延时队列必须提供主动删除接口,不能依赖”执行时检查状态”:
# ❌ 错误做法:依赖执行时检查
class BadDelayQueue:
def execute_ready_tasks(self):
tasks = self.get_ready_tasks()
for task in tasks:
order = Order.get(task['order_id'])
if order.status == 'paid': # 执行时才检查
continue # 跳过已支付订单
# 但是库存已经被锁定了29分59秒!
self.cancel_order(task['order_id'])
# ✅ 正确做法:支付时主动删除
class GoodDelayQueue:
def cancel_task(self, task_id):
"""主动删除任务"""
# 使用 task_id 快速定位并删除
return self.storage.delete(task_id)
# 支付回调中调用
def on_payment_success(order_id):
# 1. 更新订单状态
order = Order.get(order_id)
order.status = 'paid'
order.save()
# 2. 主动取消超时任务(关键!)
delay_queue.cancel_task(
task_id=f'order_timeout_{order_id}'
)要求 2:高效的删除操作
不同存储方案下的删除效率对比:
| 存储方案 | 删除方式 | 时间复杂度 | 适用场景 |
|---|---|---|---|
| 数据库(B+树) | DELETE WHERE task_id = ? | O(log n) | ✅ 通用方案 |
| Redis ZSet | ZREM key member | O(log n) | ✅ 高性能方案 |
| Redis List | LREM | O(n) | ❌ 不推荐 |
| 时间轮 | 数组索引 + 链表删除 | O(1) | ✅ 最优方案 |
最佳实践:为每个任务分配全局唯一的 task_id,建立 task_id → 任务 的索引。
import redis
import hashlib
class IndexedDelayQueue:
def __init__(self):
self.redis = redis.Redis(decode_responses=True)
# 任务存储:ZSet,score 为执行时间
self.queue_key = "delay_queue:tasks"
# 反向索引:Hash,task_id → 完整任务数据
self.index_key = "delay_queue:index"
def add_task(self, order_id, delay_seconds):
"""添加延时任务"""
task_id = f"order_timeout_{order_id}"
execute_at = int(time.time()) + delay_seconds
task_data = json.dumps({
'task_id': task_id,
'order_id': order_id,
'execute_at': execute_at
})
pipe = self.redis.pipeline()
# 1. 添加到队列
pipe.zadd(self.queue_key, {task_data: execute_at})
# 2. 建立反向索引(关键!)
pipe.hset(self.index_key, task_id, task_data)
pipe.execute()
return task_id
def cancel_task(self, task_id):
"""取消任务(O(1) + O(log n))"""
pipe = self.redis.pipeline()
# 1. 从索引中获取任务数据(O(1))
task_data = self.redis.hget(self.index_key, task_id)
if not task_data:
return False # 任务不存在或已被执行
# 2. 从队列中删除(O(log n))
pipe.zrem(self.queue_key, task_data)
# 3. 删除索引
pipe.hdel(self.index_key, task_id)
pipe.execute()
return True要求 3:并发安全处理
考虑竞态条件:支付回调执行的同时,任务刚好到期
# 并发场景下的处理
def on_payment_success(order_id):
"""支付成功回调"""
task_id = f"order_timeout_{order_id}"
# 使用 Redis 事务(或分布式锁)
with self.redis.pipeline() as pipe:
while True:
try:
# WATCH 监控订单状态
pipe.watch(f"order:{order_id}")
order = Order.get(order_id)
if order.status == 'paid':
pipe.unwatch()
return # 已处理,直接返回
# 开启事务
pipe.multi()
# 更新订单状态
order.update(status='paid')
# 取消超时任务
pipe.zrem(self.queue_key, task_id)
# 执行事务
pipe.execute()
break # 成功
except redis.WatchError:
# 订单被其他进程修改,重试
continue生产级最佳实践
幂等性设计
def on_payment_success(order_id): # 1. 先更新订单状态(带幂等检查) order = Order.get(order_id) if order.status == 'paid': return # 已处理,幂等返回 order.status = 'paid' order.save() # 2. 再取消任务(允许取消不存在的情况) delay_queue.cancel_task(f'order_timeout_{order_id}')任务设计模式
# 任务内部做二次检查(Defense in Depth) def execute_order_cancel_task(order_id): order = Order.get(order_id) # 任务执行时再次确认订单状态 if order.status == 'paid': return # 用户已支付,跳过取消 # 确认取消 order.status = 'cancelled' order.save()监控和告警
def cancel_task(task_id): result = delay_queue.cancel_task(task_id) if not result: # 任务可能已被执行,需要检查并补偿 alert_service.send( f"任务取消失败: {task_id},可能已被执行,请检查订单状态" ) return result
架构层面的建议
支付流程中的任务取消:
┌─────────────┐
│ 用户支付 │
└──────┬──────┘
│
▼
┌─────────────────────────────────┐
│ 1. 更新订单状态为"已支付" │ ← 主流程
│ (事务保护,幂等设计) │
└──────────┬──────────────────────┘
│
├──────────────────┐
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ 取消超时任务 │ │ 发送支付成功 │
│ (异步尝试) │ │ 通知 │
└─────────────┘ └─────────────┘
│
▼
┌──────────────────────┐
│ 失败?记录监控日志 │
│ + 订单状态兜底检查 │
└──────────────────────┘关键洞察
- 任务取消是延时队列的核心功能:不是可选项,而是必须项
- 删除效率很重要:O(n) 的删除在百万级任务下不可接受
- 并发安全需要端到端考虑:从 API 设计到任务执行,每一步都要考虑竞态条件
- Defense in Depth:多层级防护(幂等 + 事务 + 任务内检查)确保系统健壮性