应用场景

2024年3月的一个深夜

凌晨2点,我盯着监控屏幕,心跳加速。

“订单编号 #882317 创建时间 23:30,已超时30分钟,为什么还没有取消?”

屏幕上的数据让我脊背发凉:我们的电商系统有 1,247 个订单应该被自动取消,但它们还挂着”待支付”状态。

这些订单占用的库存,正在被其他用户疯狂点击”已售罄”。

那一刻,我意识到:我们的系统中缺少一个关键组件——延时队列


回到问题之前

事情要从三个月前说起。

2024年1月,我们的电商平台刚上线。那时候功能很简单:用户下单、支付、发货。第一天交易额800元,我们三个创始人激动地聚在出租屋吃了顿火锅。

到了3月,日订单量突破10万单。

新的业务需求接踵而至:

  • “用户下单后30分钟未支付,自动取消订单,释放库存”
  • “用户领取优惠券,24小时未使用则自动失效”
  • “用户申请退款,商家7天未处理则自动通过”

产品经理把这些需求写在白板上,转头问我:“这些功能多久能上线?”

我当时心想:不就是定时任务吗?用 cron 每分钟跑一次脚本,查数据库,把超时的订单状态改成”已取消”。

我说:“一周吧。”

那是我对延时队列的第一次理解——太天真了


第一次尝试:每分钟跑一次脚本

我写了一个 Python 脚本,用 Linux 的 cron 定时任务每分钟执行:

class OrderTimeoutChecker:
    def check_pending_orders(self):
        """检查超时未支付的订单"""
        timeout_threshold = datetime.now() - timedelta(minutes=30)
        
        # 查询所有超时订单
        pending_orders = Order.objects.filter(
            status='pending_payment',
            created_at__lt=timeout_threshold
        )
        
        for order in pending_orders:
            try:
                # 取消订单
                order.status = 'cancelled'
                order.cancel_reason = 'payment_timeout'
                order.save()
                
                # 释放库存
                InventoryService.release(order.product_id, order.quantity)
                
                # 释放优惠券
                if order.coupon_id:
                    CouponService.restore(order.coupon_id, order.user_id)
                    
            except Exception as e:
                logger.error(f"取消订单失败: {order.id}, 错误: {e}")

在测试环境跑了一下,没问题。上线。

第一周,风平浪静。

第二周,问题开始浮现。


遇到的第一个坑:数据库压力

某天上午10点,运维群里突然炸锅:

“数据库CPU 100%!” “所有查询都卡住了!” “用户无法下单!”

我慌了,登录数据库一看,那个定时任务的查询正在扫描 80万条订单记录

问题是:我写的SQL只查询了”待支付”状态,但没有给 created_at 字段加索引。

-- 这个查询在80万条记录里全表扫描
SELECT * FROM orders 
WHERE status = 'pending_payment' 
  AND created_at < '2024-03-15 09:30:00';

更糟糕的是,这个查询每分钟都在执行。

快速修复:给 (status, created_at) 加了联合索引。

数据库压力下来了,但我知道这只是治标不治本。随着订单量增长,单表数据迟早会破千万。


遇到的第二个坑:执行精度问题

产品经理跑来找我:“用户投诉了,他们说下单后29分钟订单就被取消了,不是说30分钟吗?”

我查了一下日志,发现:

  • 用户 14:00:30 下单
  • 定时任务 14:30:05 执行(应该14:30:30执行,但cron是每分钟的第0秒执行)
  • 订单超时判定:14:30:05 - 30分钟 = 14:00:05
  • 因为 14:00:30 > 14:00:05,所以被判定为超时

误差接近1分钟!

用户觉得被骗了——说好的30分钟,怎么29分钟就取消了?

我调整了超时时间到35分钟,产品经理不满意:“我们的竞品都是精确到分钟的。”

我想:要精确到分钟,需要把cron的执行频率改成每秒一次。

但想象一下:每秒扫描一次数据库?

这不行。


真正的危机:库存竞争

真正让我意识到问题严重性的是一次促销活动。

2024年3月15日,我们做了个”春日大促”活动,一款爆品限量1000件。

活动开始10分钟,1000件库存被抢光。

但问题来了:很多人下单后没支付,这些订单的库存被锁定30分钟。

30分钟内,又有2000个用户下单,但都显示”已售罄”。

30分钟后,第一批超时订单自动取消,释放出600件库存。

但此时,这600件库存的释放信息,没有及时通知到还在等待的用户。

用户眼睁睁看着库存从”已售罄”变成”有货”,但错过了抢购时机。

投诉邮件如雪片般飞来。

那一刻,我坐在显示器前,脑子里的齿轮开始转动:

我们需要的不是一个定时检查的脚本,而是一个真正能”在未来某个时刻精准执行任务”的队列系统。


我开始研究:什么是延时队列?

那天晚上,我失眠了。躺在床上翻来覆去,脑子里全是订单、库存、超时时间。

我爬起来,打开电脑,开始搜索。

我发现,这个需求在业界有个专门的名字:延时队列(Delayed Queue)

简单来说:

  • 现在:把一个任务放入队列,指定”30分钟后执行”
  • 30分钟后:系统自动把任务取出来执行

这听起来不就是 setTimeout 吗?

但在分布式系统中,事情没那么简单。我们需要考虑:

  1. 任务持久化:服务器重启,任务不能丢
  2. 分布式执行:多台服务器都能处理任务
  3. 任务去重:同一个任务不能执行两次
  4. 任务取消:用户在29分钟时支付了,超时取消任务要能取消
  5. 高并发:一秒钟创建1万个延时任务

我开始研究业界方案。


发现的第一个真实场景:电商订单超时

我在 GitHub 上搜到了一个开源的延时队列项目,看它的 README,第一个例子就是:

订单超时自动取消

class OrderService:
    def create_order(self, order_data):
        """创建订单"""
        order = Order(
            user_id=order_data['user_id'],
            amount=order_data['amount'],
            status='pending_payment'
        )
        order.save()
        
        # 30分钟后检查订单状态,如果未支付则取消
        delay_queue.add_task(
            task_type='order_cancel',
            task_data={'order_id': order.id},
            delay_seconds=30 * 60  # 30分钟
        )
        
        return order

    def cancel_order(self, order_id):
        """取消订单"""
        order = Order.get(order_id)
        if order.status == 'pending_payment':
            order.status = 'cancelled'
            order.save()
            
            # 释放优惠券和库存
            self._release_coupons(order)
            self._release_inventory(order)

    def on_payment_success(self, order_id):
        """支付成功回调"""
        # 用户支付了,取消超时任务
        delay_queue.cancel_task(
            task_type='order_cancel',
            task_data={'order_id': order_id}
        )

这段代码让我眼前一亮!

关键点

  • 创建订单时,同时创建一个”30分钟后执行的取消任务”
  • 如果用户支付了,把取消任务删掉
  • 如果30分钟没支付,任务自动执行,订单被取消

这不就是我们需要的吗?!

我激动地坐直了身体,继续往下看。


更多场景:原来延时队列无处不在

随着深入研究,我发现延时队列的应用场景远比我想象的多:

场景1:金融风控 - 延时验证

用户登录异常时,不要立即锁定账户,而是10分钟后再发送验证短信:

class RiskControlService:
    def login_abnormal(self, user_id, risk_score):
        """用户登录异常"""
        # 高风险用户5分钟后验证,低风险10分钟
        delay = 5 * 60 if risk_score > 80 else 10 * 60
        
        delay_queue.add_task(
            task_type='send_security_sms',
            task_data={'user_id': user_id, 'risk_score': risk_score},
            delay_seconds=delay,
            task_id=f'sms_{user_id}_{int(time.time())}'
        )

    def on_user_verified(self, user_id):
        """用户完成了验证"""
        # 取消待发送的短信任务
        delay_queue.cancel_task(
            task_type='send_security_sms',
            task_data={'user_id': user_id}
        )

场景2:社交媒体 - 定时发布

用户写好文章,选择”明天上午10点发布”:

class SocialMediaService:
    def schedule_post(self, post_data, publish_at):
        """定时发布文章"""
        delay_seconds = (publish_at - datetime.now()).total_seconds()
        
        delay_queue.add_task(
            task_type='publish_post',
            task_data={
                'post_id': post_data['id'],
                'content': post_data['content']
            },
            delay_seconds=int(delay_seconds)
        )

场景3:物流 - 自动确认收货

用户签收后7天,如果没申请退款,自动确认收货并打款给商家:

class LogisticsService:
    def delivery_signed(self, order_id, sign_time):
        """物流签收"""
        # 7天后自动确认收货
        delay_queue.add_task(
            task_type='auto_confirm_receipt',
            task_data={'order_id': order_id},
            delay_seconds=7 * 24 * 60 * 60  # 7天
        )

    def on_return_requested(self, order_id):
        """用户申请退货"""
        # 取消自动确认收货任务
        delay_queue.cancel_task(
            task_type='auto_confirm_receipt',
            task_data={'order_id': order_id}
        )

我一边看,一边在心里默默记笔记。

原来我们遇到的不是特殊问题,而是通用问题


开始做需求分析

那天晚上,我整理了一份详细的需求分析文档。

延时时间分布

我统计了我们系统的实际数据:

延时时间应用场景占比每日任务量
5-30分钟订单支付超时、风控检查45%45万
1-24小时优惠券过期、任务调度35%35万
1-7天自动确认收货、退款处理15%15万
>7天定时提醒、周期性任务5%5万

结论:45%的任务集中在30分钟内,这是优化的重点。

QPS 分析

我拉了一周的监控数据:

时间段平均 QPS峰值 QPS特点
00:00-06:0050100低谷期
06:00-09:00200500早晨活跃
09:00-18:005002000工作高峰
18:00-22:008003000晚上高峰
22:00-24:00300800逐步下降

结论:峰值3000 QPS,系统需要支持10倍峰值,即30000 QPS。

容量规划

基于以上分析,我整理了技术指标:

  • 基础容量:支持 1000 QPS 的延时任务创建
  • 峰值容量:支持 5000 QPS 的突发流量
  • 存储容量:单天支持 1000 万任务存储
  • 执行精度:延时误差 ≤ 1 分钟(不是5分钟!)
  • 可靠性:任务丢失率 ≤ 0.001%

写下这些数字的时候,我的手在微微发抖。

这不再是简单的脚本了,而是一个完整的分布式系统。


技术挑战

我闭上眼睛,思考着可能遇到的挑战。

挑战1:高并发

促销活动期间,一秒钟创建10000个延时任务,怎么保证系统不崩?

我的想法是:用消息队列削峰

# 使用消息队列削峰填谷
class DelayQueueProducer:
    def __init__(self):
        self.kafka_producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            batch_size=16384,  # 16KB批量发送
            linger_ms=10  # 等待10ms收集更多消息
        )
    
    def add_task(self, task_data, delay_seconds):
        """添加延时任务"""
        # 使用 topic 分片分散压力
        topic = 'delay_queue_topic'
        partition = hash(task_data['user_id']) % 10
        
        message = {
            'task_data': task_data,
            'execute_at': int(time.time() + delay_seconds),
            'created_at': int(time.time())
        }
        
        self.kafka_producer.send(
            topic,
            key=str(task_data['user_id']).encode(),
            value=message,
            partition=partition
        )

挑战2:执行精度

30分钟的订单超时,实际执行可能是25分钟或35分钟,用户会投诉。

我的想法是:时间补偿 + 优先级队列

class DelayCalculator:
    @staticmethod
    def calculate_delay(execute_at, current_time=None):
        """计算延时时间,考虑系统时间偏差"""
        if current_time is None:
            current_time = time.time()
        
        # 获取系统时间偏差
        time_offset = SystemTime.get_offset()
        adjusted_current_time = current_time + time_offset
        
        delay_seconds = max(0, execute_at - adjusted_current_time)
        
        # 对于短延时任务,增加额外缓冲
        if delay_seconds < 3600:  # 1小时内
            delay_seconds += 30  # 增加30秒缓冲
            
        return int(delay_seconds)

挑战3:可靠性

任务创建后,服务器崩溃怎么办?

我的想法是:持久化 + 事务

class ReliableDelayQueue:
    def add_task(self, task_data, delay_seconds, max_retries=3):
        """添加可靠的延时任务"""
        task_id = self._generate_task_id(task_data)
        
        # 事务性保存任务
        with transaction.atomic():
            # 1. 先保存任务到数据库
            task = DelayTask.objects.create(
                task_id=task_id,
                task_data=task_data,
                execute_at=datetime.now() + timedelta(seconds=delay_seconds),
                status='pending',
                retry_count=0,
                max_retries=max_retries
            )
            
            # 2. 添加到延时队列
            self._add_to_delay_queue(task)
            
            return task_id
    
    def execute_task(self, task_id):
        """执行任务"""
        task = DelayTask.objects.get(task_id=task_id)
        
        try:
            # 执行业务逻辑
            self._execute_business_logic(task.task_data)
            
            # 标记完成
            task.status = 'completed'
            task.save()
            
        except Exception as e:
            # 处理失败
            if task.retry_count < task.max_retries:
                # 重试
                task.retry_count += 1
                task.status = 'retrying'
                task.next_retry_at = datetime.now() + timedelta(minutes=5)
                task.save()
                
                # 重新加入队列
                self._add_to_delay_queue(task)
            else:
                # 超过最大重试次数,标记失败
                task.status = 'failed'
                task.error_message = str(e)
                task.save()

凌晨3点的顿悟

写完这些代码,已经是凌晨3点。

我站起来,走到窗前。城市的灯火阑珊,偶尔有车驶过。

我忽然明白了一个道理:

所有的”定时任务”,本质上都是”延时执行的需求”。

  • 订单30分钟后超时 → 延时30分钟执行取消
  • 优惠券24小时后过期 → 延时24小时执行失效
  • 生日当天发送祝福 → 延时N天执行发送

而实现”延时执行”的关键,在于:如何高效地在海量任务中,找到”时间到了”的那些任务?

这就像在数百万个闹钟中,找到每一分钟需要响铃的那几个。

回到座位上,我打开一个新的文档,标题写着:

“延时队列技术方案选型”

我知道,这又将是一个不眠之夜。

但这次,我有信心。


我的思考

思考 1

如果让你从零设计一个延时队列,你会选择什么作为底层数据结构?

参考答案

问题分析

延时队列的核心需求可以抽象为三点:

  1. 存储能力:能够持久化大量任务(百万级甚至亿级)
  2. 查询效率:快速找到”时间到了”的任务(时间维度查询)
  3. 删除能力:支持取消任务(如用户支付后取消超时任务)

这本质上是在时间维度上做高效的检索和调度。

技术方案对比

方案数据结构时间复杂度优点缺点适用场景
数据库 + 轮询B+树索引O(log n) 查询
O(n) 全表扫描
• 简单可靠
• 持久化保证
• 易于维护
• 轮询空扫浪费资源
• 精度受限于轮询间隔
• 高并发下数据库压力大
• 小规模系统
• 低并发场景
• MVP 验证阶段
Redis ZSet跳表(Skip List)O(log n) 插入
O(log n) 查询最小值
• 高性能(内存操作)
• ZRANGEBYSCORE 直接获取到期任务
• 支持精确删除
• 内存成本高
• 需要考虑持久化(RDB/AOF)
• 数据量受内存限制
• 中等规模(百万级任务)
• 对精度要求高
• 高并发写入
时间轮算法环形数组 + 链表O(1) 插入
O(1) 获取到期任务
• 插入和获取都是 O(1)
• 精度可控(通过时间轮粒度)
• 内存占用可预测
• 实现复杂
• 不支持任意延时时间
• 需要配合持久化方案
• 大规模、高并发
• 固定延时间隔
• 定时任务调度
消息队列(延时队列)分区队列 + 优先级O(1) 生产
O(log n) 消费
• 天然分布式
• 高吞吐量
• 削峰填谷
• 延时功能有限(如 RocketMQ 18天)
• 运维复杂度高
• 成本较高
• 大规模系统
• 需要分布式
• 已使用 MQ 的场景

推荐决策树

是否需要支持分布式?
├─ 否 → 单机方案
│   ├─ 任务量 < 10万,精度要求不高 → 数据库 + 轮询
│   └─ 任务量 > 10万,或要求秒级精度 → Redis ZSet

└─ 是 → 分布式方案
    ├─ 已在使用 RocketMQ/RabbitMQ → 使用内置延时队列
    ├─ 需要 > 18天延时 → 自研(时间轮 + 持久化)
    └─ 高并发 + 精度要求 → 时间轮 + 消息队列

最佳实践建议

  1. 小团队起步:先用数据库方案快速验证业务价值
  2. 中等规模:迁移到 Redis ZSet,性价比最高
  3. 大规模系统:考虑分层设计
    • 1小时内短延时:Redis ZSet(高精度)
    • 1小时以上长延时:数据库轮询(低成本)

代码示例 - Redis ZSet 方案

import redis
import time
import json

class RedisDelayQueue:
    def __init__(self, queue_name, redis_host='localhost'):
        self.redis = redis.Redis(host=redis_host, decode_responses=True)
        self.queue_key = f"delay_queue:{queue_name}"
    
    def add_task(self, task_id, task_data, delay_seconds):
        """添加延时任务"""
        execute_at = int(time.time()) + delay_seconds
        # score 为执行时间戳,member 为 task_id
        self.redis.zadd(
            self.queue_key,
            {json.dumps({'task_id': task_id, 'data': task_data}): execute_at}
        )
        return task_id
    
    def get_ready_tasks(self, batch_size=100):
        """获取到期的任务(批量)"""
        now = int(time.time())
        # 查询 score <= now 的任务
        ready_tasks = self.redis.zrangebyscore(
            self.queue_key,
            0,
            now,
            start=0,
            num=batch_size
        )
        return [json.loads(task) for task in ready_tasks]
    
    def remove_task(self, task_id):
        """删除任务(如用户支付后取消超时任务)"""
        # 需要遍历找到对应的 task(实际应用中可以建立反向索引)
        tasks = self.redis.zrange(self.queue_key, 0, -1)
        for task in tasks:
            task_data = json.loads(task)
            if task_data['task_id'] == task_id:
                self.redis.zrem(self.queue_key, task)
                return True
        return False
    
    def consume_tasks(self, handler, batch_size=100):
        """消费到期任务"""
        while True:
            ready_tasks = self.get_ready_tasks(batch_size)
            if not ready_tasks:
                time.sleep(1)  # 没有到期任务,休眠1秒
                continue
            
            for task in ready_tasks:
                try:
                    # 执行任务
                    handler(task['data'])
                    # 完成后删除
                    self.redis.zrem(self.queue_key, json.dumps(task))
                except Exception as e:
                    # 异常处理:记录日志,根据业务决定是否重试
                    print(f"任务执行失败: {task['task_id']}, 错误: {e}")

关键洞察

  • 没有银弹:不同阶段适合不同方案
  • Redis ZSet 是最通用的中间方案:覆盖 90% 的场景
  • 分层设计是大规模系统的终极方案:短延时用内存,长延时用磁盘

思考 2

我们的系统中有45%的任务延时在30分钟以内。这个数据对技术选型有什么影响?

参考答案

问题分析

“45% 的任务在 30 分钟以内”这个数据点透露了几个关键信息:

  1. 短延时任务占主导:近一半的任务对时间精度敏感
  2. 轮询方案的缺陷被放大:1 分钟轮询间隔对 30 分钟任务来说,误差达 3.3%
  3. 优化方向明确:优化短延时任务的收益最大

精度影响分析

假设采用”每分钟轮询数据库”的方案:

延时时长平均等待时间最大误差误差占比用户感知
5 分钟30 秒60 秒20%⚠️ 明显
15 分钟30 秒60 秒6.7%⚠️ 可感知
30 分钟30 秒60 秒3.3%⚠️ 勉强可接受
1 小时30 秒60 秒1.7%✅ 不明显
24 小时30 秒60 秒0.07%✅ 完全可忽略

结论:对于 30 分钟以内的任务,1 分钟的轮询间隔带来的误差是不可接受的。

分层设计策略

基于延时分布,我们可以采用分层架构来平衡精度和成本:

┌─────────────────────────────────────────────────────────────┐
│                     延时任务分层架构                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌──────────────────┐                                      │
│  │  热数据层(内存)  │  ← 30分钟内的任务(45%)            │
│  │  • Redis ZSet    │     • 高精度(秒级)                  │
│  │  • 时间轮        │     • 快速响应                        │
│  │  • 精度:1秒     │     • 内存成本可控(短延时任务很快消费)│
│  └──────────────────┘                                      │
│           ↓                                                │
│  ┌──────────────────┐                                      │
│  │  温数据层(数据库)│  ← 30分钟~7天的任务(35%)          │
│  │  • MySQL索引     │     • 中等精度(分钟级)              │
│  │  • 每分钟轮询    │     • 成本适中                        │
│  │  • 精度:1分钟   │                                      │
│  └──────────────────┘                                      │
│           ↓                                                │
│  ┌──────────────────┐                                      │
│  │  冷数据层(归档)  │  ← >7天的任务(20%)                 │
│  │  • 定时归档      │     • 低精度(小时级)                │
│  │  • 每小时轮询    │     • 最小成本                        │
│  └──────────────────┘                                      │
│                                                             │
└─────────────────────────────────────────────────────────────┘

成本优化

使用分层设计后的成本对比(假设每天 100 万任务):

方案全量使用 Redis ZSet分层设计节省
内存占用(峰值)~500 万任务 × 1KB = 5GB~50 万任务 × 1KB = 50MB99%
数据库查询次数/天01440 次轮询-
精度全部秒级短延时秒级,长延时分钟级分层保证

技术选型建议

基于”45% 任务在 30 分钟内”这个数据:

  1. 优先级:短延时任务的精度优化是 P0 优先级

  2. 技术栈选择

    • 不要选择纯轮询方案(精度不足)
    • 不要全部用内存方案(成本太高)
    • 选择分层方案(精度和成本的平衡)
  3. 实现路径

    • 第一阶段:全部用 Redis ZSet(快速上线)
    • 第二阶段:引入分层设计(降低成本)
    • 第三阶段:自动化任务迁移(热→温→冷)

代码示例 - 分层延时队列

class TieredDelayQueue:
    def __init__(self):
        # 热数据层:Redis ZSet(30分钟内)
        self.hot_queue = RedisDelayQueue('hot_tasks')
        # 温数据层:数据库轮询(30分钟~7天)
        self.warm_queue = DatabaseDelayQueue('warm_tasks', poll_interval=60)
        # 冷数据层:数据库低频轮询(>7天)
        self.cold_queue = DatabaseDelayQueue('cold_tasks', poll_interval=3600)
    
    SHORT_DELAY_THRESHOLD = 30 * 60  # 30分钟
    MEDIUM_DELAY_THRESHOLD = 7 * 24 * 60 * 60  # 7天
    
    def add_task(self, task_id, task_data, delay_seconds):
        """根据延时时间自动路由到对应的层级"""
        if delay_seconds <= self.SHORT_DELAY_THRESHOLD:
            # 热数据:Redis ZSet,高精度
            return self.hot_queue.add_task(task_id, task_data, delay_seconds)
        elif delay_seconds <= self.MEDIUM_DELAY_THRESHOLD:
            # 温数据:数据库,1分钟精度
            return self.warm_queue.add_task(task_id, task_data, delay_seconds)
        else:
            # 冷数据:数据库,1小时精度
            return self.cold_queue.add_task(task_id, task_data, delay_seconds)
    
    def upgrade_tasks(self):
        """将快要到期的温/冷任务升级到热层"""
        # 例如:将剩余时间 < 1小时的任务从温层移到热层
        # 这样可以保证所有任务在到期前都在高精度层
        pass

关键洞察

  1. 数据驱动设计:理解业务数据的分布特征(如延时分布)是技术选型的前提
  2. 分层是通用优化思路:不仅延时队列,缓存系统、存储系统都用分层来平衡成本和性能
  3. 精度不是越细越好:根据业务需求选择合适的精度,避免过度工程

思考 3

在电商场景中,如果用户在第29分钟支付了订单,需要立即取消”超时取消任务”。这个需求对延时队列设计有什么要求?

参考答案

问题分析

订单支付场景是一个典型的”主动触发取消被动任务”的模式:

用户下单 → 创建"30分钟后取消订单"任务

    [29分钟后]

    用户支付 → 需要立即取消"取消订单"任务

    如果取消失败 → 1分钟后订单被错误取消 → 用户投诉!

这揭示了延时队列设计的三个核心挑战:

  1. 任务可寻址性:如何快速找到要取消的任务?
  2. 删除原子性:如何确保任务不会被执行两次?
  3. 并发安全:支付回调执行的同时,任务刚好到期怎么办?

技术要求详解

要求 1:支持任务取消(API 设计)

延时队列必须提供主动删除接口,不能依赖”执行时检查状态”:

# ❌ 错误做法:依赖执行时检查
class BadDelayQueue:
    def execute_ready_tasks(self):
        tasks = self.get_ready_tasks()
        for task in tasks:
            order = Order.get(task['order_id'])
            if order.status == 'paid':  # 执行时才检查
                continue  # 跳过已支付订单
            # 但是库存已经被锁定了29分59秒!
            self.cancel_order(task['order_id'])

# ✅ 正确做法:支付时主动删除
class GoodDelayQueue:
    def cancel_task(self, task_id):
        """主动删除任务"""
        # 使用 task_id 快速定位并删除
        return self.storage.delete(task_id)

# 支付回调中调用
def on_payment_success(order_id):
    # 1. 更新订单状态
    order = Order.get(order_id)
    order.status = 'paid'
    order.save()
    
    # 2. 主动取消超时任务(关键!)
    delay_queue.cancel_task(
        task_id=f'order_timeout_{order_id}'
    )

要求 2:高效的删除操作

不同存储方案下的删除效率对比:

存储方案删除方式时间复杂度适用场景
数据库(B+树)DELETE WHERE task_id = ?O(log n)✅ 通用方案
Redis ZSetZREM key memberO(log n)✅ 高性能方案
Redis ListLREMO(n)❌ 不推荐
时间轮数组索引 + 链表删除O(1)✅ 最优方案

最佳实践:为每个任务分配全局唯一的 task_id,建立 task_id → 任务 的索引。

import redis
import hashlib

class IndexedDelayQueue:
    def __init__(self):
        self.redis = redis.Redis(decode_responses=True)
        # 任务存储:ZSet,score 为执行时间
        self.queue_key = "delay_queue:tasks"
        # 反向索引:Hash,task_id → 完整任务数据
        self.index_key = "delay_queue:index"
    
    def add_task(self, order_id, delay_seconds):
        """添加延时任务"""
        task_id = f"order_timeout_{order_id}"
        execute_at = int(time.time()) + delay_seconds
        
        task_data = json.dumps({
            'task_id': task_id,
            'order_id': order_id,
            'execute_at': execute_at
        })
        
        pipe = self.redis.pipeline()
        # 1. 添加到队列
        pipe.zadd(self.queue_key, {task_data: execute_at})
        # 2. 建立反向索引(关键!)
        pipe.hset(self.index_key, task_id, task_data)
        pipe.execute()
        
        return task_id
    
    def cancel_task(self, task_id):
        """取消任务(O(1) + O(log n))"""
        pipe = self.redis.pipeline()
        
        # 1. 从索引中获取任务数据(O(1))
        task_data = self.redis.hget(self.index_key, task_id)
        if not task_data:
            return False  # 任务不存在或已被执行
        
        # 2. 从队列中删除(O(log n))
        pipe.zrem(self.queue_key, task_data)
        # 3. 删除索引
        pipe.hdel(self.index_key, task_id)
        pipe.execute()
        
        return True

要求 3:并发安全处理

考虑竞态条件:支付回调执行的同时,任务刚好到期

# 并发场景下的处理
def on_payment_success(order_id):
    """支付成功回调"""
    task_id = f"order_timeout_{order_id}"
    
    # 使用 Redis 事务(或分布式锁)
    with self.redis.pipeline() as pipe:
        while True:
            try:
                # WATCH 监控订单状态
                pipe.watch(f"order:{order_id}")
                order = Order.get(order_id)
                
                if order.status == 'paid':
                    pipe.unwatch()
                    return  # 已处理,直接返回
                
                # 开启事务
                pipe.multi()
                # 更新订单状态
                order.update(status='paid')
                # 取消超时任务
                pipe.zrem(self.queue_key, task_id)
                # 执行事务
                pipe.execute()
                break  # 成功
            except redis.WatchError:
                # 订单被其他进程修改,重试
                continue

生产级最佳实践

  1. 幂等性设计

    def on_payment_success(order_id):
        # 1. 先更新订单状态(带幂等检查)
        order = Order.get(order_id)
        if order.status == 'paid':
            return  # 已处理,幂等返回
        
        order.status = 'paid'
        order.save()
        
        # 2. 再取消任务(允许取消不存在的情况)
        delay_queue.cancel_task(f'order_timeout_{order_id}')
  2. 任务设计模式

    # 任务内部做二次检查(Defense in Depth)
    def execute_order_cancel_task(order_id):
        order = Order.get(order_id)
        # 任务执行时再次确认订单状态
        if order.status == 'paid':
            return  # 用户已支付,跳过取消
        
        # 确认取消
        order.status = 'cancelled'
        order.save()
  3. 监控和告警

    def cancel_task(task_id):
        result = delay_queue.cancel_task(task_id)
        if not result:
            # 任务可能已被执行,需要检查并补偿
            alert_service.send(
                f"任务取消失败: {task_id},可能已被执行,请检查订单状态"
            )
        return result

架构层面的建议

支付流程中的任务取消:

┌─────────────┐
│  用户支付    │
└──────┬──────┘


┌─────────────────────────────────┐
│  1. 更新订单状态为"已支付"       │ ← 主流程
│     (事务保护,幂等设计)          │
└──────────┬──────────────────────┘

           ├──────────────────┐
           │                  │
           ▼                  ▼
    ┌─────────────┐    ┌─────────────┐
    │  取消超时任务 │    │  发送支付成功 │
    │  (异步尝试)   │    │  通知        │
    └─────────────┘    └─────────────┘


    ┌──────────────────────┐
    │  失败?记录监控日志   │
    │  + 订单状态兜底检查   │
    └──────────────────────┘

关键洞察

  1. 任务取消是延时队列的核心功能:不是可选项,而是必须项
  2. 删除效率很重要:O(n) 的删除在百万级任务下不可接受
  3. 并发安全需要端到端考虑:从 API 设计到任务执行,每一步都要考虑竞态条件
  4. Defense in Depth:多层级防护(幂等 + 事务 + 任务内检查)确保系统健壮性