技术挑战

那次惨痛的事故回顾

在开始深入技术方案之前,让我先回顾一下我们系统遇到的所有问题。这些问题后来都成了我们设计延时队列时的核心挑战。


挑战 1:任务丢失

问题的发现

2024 年 4 月的一天,客服收到了 200 多条投诉:

“我下单后支付了,为什么第二天订单被取消了?!”

我慌了,赶紧查日志。发现:

  • 用户 14:00 下单
  • 用户 14:05 支付成功
  • 服务器 14:30 重启
  • 重启后,Redis 中的延时任务数据丢失(我们没有开启 AOF)
  • 14:35,系统重新启动,数据库查询时没找到这个任务的记录
  • 任务永远丢失了

后果:用户支付成功,但订单还是被取消了,愤怒要求退款。

根本原因分析

任务丢失的原因

1. 内存存储,无持久化
   ├─ Redis RDB 默认策略:可能丢失最近 5-15 分钟的数据
   ├─ 服务器重启:内存数据全部清空
   └─ 恢复机制缺失:没有从数据库恢复任务的机制

2. 任务创建和执行分离
   ├─ 创建时:只写入内存(Redis)
   ├─ 支付时:取消任务(删除内存记录)
   ├─ 服务器重启:内存清空,任务记录消失
   └─ 执行时:无从知晓任务曾经存在

3. 缺少补偿机制
   ├─ 任务丢失后无法发现
   ├─ 没有定期校验任务状态
   └─ 没有死信队列记录失败任务

解决方案思路

class ReliableDelayQueue:
    """可靠的延时队列:持久化 + 补偿"""
    
    def __init__(self):
        self.redis = redis.Redis()  # 内存层:高性能
        self.db = get_database()    # 持久层:可靠
    
    def add_task(self, task_id, task_data, delay_seconds):
        """添加任务:双层写入"""
        execute_at = time.time() + delay_seconds
        
        # 1. 先写入数据库(保证持久化)
        self.db.execute("""
            INSERT INTO delay_tasks (task_id, task_data, execute_at, status)
            VALUES (%s, %s, %s, 'pending')
        """, (task_id, json.dumps(task_data), datetime.fromtimestamp(execute_at)))
        
        # 2. 再写入 Redis(保证高性能)
        self.redis.zadd(
            'delay_queue',
            {task_id: execute_at}
        )
    
    def pop_task(self):
        """弹出任务:先删后查"""
        # 1. 从 Redis 弹出
        task_id = self._atomic_pop_from_redis()
        if not task_id:
            return None
        
        # 2. 从数据库查询
        task = self.db.query("""
            SELECT * FROM delay_tasks WHERE task_id = %s AND status = 'pending'
        """, [task_id])
        
        if task:
            # 3. 标记为处理中
            self.db.execute("""
                UPDATE delay_tasks SET status = 'processing' WHERE task_id = %s
            """, [task_id])
            return task
        else:
            # 😱 Redis 有任务但数据库没有,说明数据不一致
            # 记录到死信队列,人工介入
            self.db.execute("""
                INSERT INTO dead_letter_queue (task_id, reason, created_at)
                VALUES (%s, 'redis_db_inconsistency', NOW())
            """, [task_id])
            return None
    
    def recover_tasks(self):
        """补偿机制:定期检查"""
        # 查询超过执行时间但状态还是 pending 的任务
        tasks = self.db.query("""
            SELECT * FROM delay_tasks 
            WHERE execute_at <= NOW() 
              AND status = 'pending'
              AND created_at > NOW() - INTERVAL '1 hour'
        """)
        
        for task in tasks:
            # 重新添加到 Redis
            self.redis.zadd(
                'delay_queue',
                {task['task_id']: task['execute_at'].timestamp()}
            )
    
    def start_recovery_worker(self):
        """启动补偿线程"""
        def recovery_loop():
            while True:
                self.recover_tasks()
                time.sleep(60)  # 每分钟检查一次
        
        threading.Thread(target=recovery_loop, daemon=True).start()

关键要点:

  1. 双层存储:Redis + 数据库,性能和可靠性兼顾
  2. 先持久化后内存:保证数据不丢
  3. 补偿机制:定期检查数据一致性
  4. 死信队列:记录异常任务,便于排查

挑战 2:任务重复执行

问题的发现

某天运营发现:库存扣减有问题。

正常情况下:

  • 用户下单:库存 -1
  • 订单取消:库存 +1

但实际数据显示:

  • 某个商品的总订单量:1000
  • 实际库存扣减:1500
  • 多扣了 500 件!

我查日志,发现了问题:同一个”取消订单”任务被执行了多次。

根本原因分析

# 有问题的代码
def cancel_order_task(order_id):
    """取消订单任务"""
    # 😱 没有幂等性检查!
    order = Order.get(order_id)
    
    # 如果订单已经是 cancelled 状态,还会再执行一次
    order.status = 'cancelled'
    order.save()
    
    # 库存也会再释放一次
    Inventory.release(order.product_id, order.quantity)

任务重复执行的原因:

重复执行的场景

1. 消费者重启
   ├─ 任务刚从队列取出,还没执行完
   ├─ 消费者挂了,任务没有标记为完成
   └─ 消费者重启,又把任务取出来执行

2. 网络问题
   ├─ 任务执行成功,但确认 ACK 丢失
   ├─ 队列认为任务还没完成
   └─ 又把任务投递给其他消费者

3. 并发竞争
   ├─ 两个消费者同时弹出同一个任务
   ├─ 由于锁机制失效
   └─ 两个消费者同时执行

4. 重试机制
   ├─ 任务执行抛异常
   ├─ 进入重试队列
   └─ 但实际上任务已经执行了一部分

解决方案:幂等性设计

class IdempotentTaskExecutor:
    """幂等性任务执行器"""
    
    def __init__(self):
        self.redis = redis.Redis()
    
    def execute_with_idempotency(self, task_id, handler):
        """带幂等性的任务执行"""
        # 1. 检查任务是否已执行
        lock_key = f"executed:{task_id}"
        
        # SETNX:如果任务已执行,直接返回
        executed = self.redis.set(
            lock_key, '1',
            nx=True,  # 不存在时才设置
            ex=3600   # 1 小时后自动过期
        )
        
        if not executed:
            # 任务已执行,直接返回
            logger.info(f"任务 {task_id} 已执行过,跳过")
            return
        
        # 2. 执行任务
        try:
            handler()
            logger.info(f"任务 {task_id} 执行成功")
        except Exception as e:
            # 3. 执行失败,删除执行标记,允许重试
            self.redis.delete(lock_key)
            logger.error(f"任务 {task_id} 执行失败: {e}")
            raise

# 使用示例
def cancel_order_task(order_id):
    """取消订单任务(幂等版本)"""
    task_id = f"cancel_order_{order_id}"
    
    def handler():
        # 业务逻辑
        order = Order.get(order_id)
        
        # 二次检查:订单状态
        if order.status == 'cancelled':
            return  # 已取消,无需再处理
        
        order.status = 'cancelled'
        order.cancel_reason = 'payment_timeout'
        order.save()
        
        # 释放库存(也要幂等)
        Inventory.release(order.product_id, order.quantity)
    
    # 使用幂等性执行器
    executor = IdempotentTaskExecutor()
    executor.execute_with_idempotency(task_id, handler)

幂等性设计的三个层次:

# 层次 1:任务级别(最外层)
def execute_task(task):
    # 使用分布式锁确保任务只执行一次
    if not acquire_lock(task.id):
        return  # 被其他消费者执行了
    
    try:
        # 层次 2:状态检查
        if task.status == 'completed':
            return  # 任务已完成
        
        # 层次 3:业务状态检查
        order = Order.get(task.order_id)
        if order.status == 'cancelled':
            return  # 订单已取消,无需再处理
        
        # 执行业务逻辑
        cancel_order(order)
        
        task.status = 'completed'
    finally:
        release_lock(task.id)

挑战 3:执行精度

问题的发现

产品经理跑来找我:

“用户投诉了,他们下单后 29 分钟订单就被取消了,不是说 30 分钟吗?”

我查了一下日志:

  • 用户 14:00:30 下单
  • 定时任务 14:30:05 执行(应该 14:30:30 执行,但 cron 是每分钟的第 0 秒执行)
  • 订单超时判定:14:30:05 - 30 分钟 = 14:00:05
  • 因为 14:00:30 > 14:00:05,所以被判定为超时

误差接近 1 分钟!

用户觉得被骗了——说好的 30 分钟,怎么 29 分钟就取消了?

精度问题分析

不同方案的精度对比:

方案精度误差来源适用场景
数据库轮询分钟级轮询间隔(如 1 分钟)低精度要求
Redis ZSet毫秒级消费者轮询间隔高精度要求
时间轮毫秒级时间轮粒度固定间隔延时
消息队列毫秒级网络延迟、消费延迟高精度要求

误差来源分解:

延时执行的实际延时 = 业务指定的延时 + 系统误差

系统误差 = Δ1 + Δ2 + Δ3 + Δ4

Δ1: 调度延迟
   └─ 调度器检查队列的间隔(如 1 秒)

Δ2: 消费延迟
   └─ 消费器处理任务的耗时

Δ3: 网络延迟
   └─ 任务从队列传输到消费者的时间

Δ4: 时钟偏差
   └─ 不同服务器的时钟不一致

提高精度的方法

方法 1:时间补偿

class CompensatedDelayQueue:
    """带时间补偿的延时队列"""
    
    def __init__(self):
        self.redis = redis.Redis()
        self.ntp_sync = NTPSync()  # NTP 时间同步
    
    def add_task(self, task_id, task_data, delay_seconds):
        """添加任务:计算补偿后的执行时间"""
        # 1. 获取系统时间偏差
        time_offset = self.ntp_sync.get_offset()
        
        # 2. 计算实际执行时间(补偿系统时间偏差)
        now = time.time()
        compensated_now = now + time_offset
        execute_at = compensated_now + delay_seconds
        
        # 3. 对于短延时任务,增加额外缓冲
        if delay_seconds < 3600:  # 1 小时内
            execute_at += 30  # 增加 30 秒缓冲
        
        # 4. 存储任务
        self.redis.zadd(
            'delay_queue',
            {task_id: execute_at}
        )
        
        return execute_at

方法 2:优先级队列

class PriorityDelayQueue:
    """优先级队列:紧急任务优先处理"""
    
    def __init__(self):
        self.redis = redis.Redis()
    
    def pop_ready_tasks(self, batch_size=100):
        """批量弹出到期任务,按紧急程度排序"""
        now = time.time()
        
        # 查询所有到期任务
        tasks = self.redis.zrangebyscore(
            'delay_queue',
            0,
            now,
            start=0,
            num=batch_size
        )
        
        # 按剩余时间排序(过期越久的越紧急)
        tasks_with_urgency = []
        for task in tasks:
            task_data = json.loads(task)
            execute_at = task_data['execute_at']
            overdue = now - execute_at  # 过期时间
            tasks_with_urgency.append((overdue, task))
        
        # 排序:过期时间长的优先
        tasks_with_urgency.sort(key=lambda x: x[0], reverse=True)
        
        # 返回按紧急程度排序的任务列表
        return [task for _, task in tasks_with_urgency]

方法 3:动态调整轮询间隔

class AdaptivePollingQueue:
    """自适应轮询队列:根据任务量动态调整"""
    
    def __init__(self):
        self.redis = redis.Redis()
        self.current_interval = 1.0  # 当前轮询间隔(秒)
        self.last_task_count = 0
    
    def consume(self, handler):
        """消费任务,动态调整轮询间隔"""
        while True:
            now = time.time()
            
            # 1. 查询到期任务数量
            ready_count = self.redis.zcount('delay_queue', 0, now)
            
            # 2. 根据任务量调整轮询间隔
            if ready_count > 100:
                # 任务多,加快轮询
                self.current_interval = max(0.1, self.current_interval / 2)
            elif ready_count < 10:
                # 任务少,降低轮询频率
                self.current_interval = min(5.0, self.current_interval * 1.5)
            
            # 3. 取出并执行任务
            tasks = self.pop_tasks(min(ready_count, 100))
            for task in tasks:
                handler(task)
            
            # 4. 休眠(动态间隔)
            time.sleep(self.current_interval)

精度优化效果:

优化方法精度提升适用场景成本
时间补偿±100ms → ±10ms时钟偏差大的环境需要时间同步服务
优先级队列平均延迟降低 30%高并发场景需要排序开销
动态轮询响应速度提高 50%波动明显的场景复杂度增加

挑战 4:性能瓶颈

问题的发现

2024 年 6 月 18 日,我们做了一次”618 大促”活动。

活动开始前,我们做了充分的准备:

  • 数据库扩容到 32 核 128GB
  • Redis 集群扩容到 3 主 3 从
  • 消费者增加到 20 个节点

但活动开始 10 分钟后,系统还是挂了。

监控显示:

  • 延时任务创建 QPS:8000
  • 消费者处理能力:5000
  • 任务积压:每秒增加 3000
  • Redis 内存占用:90%
  • 消费者 CPU:100%

结论:生产速度 > 消费速度,系统雪崩。

性能瓶颈分析

性能瓶颈的三个层面

1. 存储层瓶颈
   ├─ 数据库:连接池耗尽
   ├─ Redis:带宽和内存不足
   └─ 消息队列:分区不足

2. 网络层瓶颈
   ├─ 消费者和存储之间的高频交互
   ├─ 序列化/反序列化开销
   └─ 网络带宽限制

3. 计算层瓶颈
   ├─ 单个消费者处理能力有限
   ├─ 锁竞争导致性能下降
   └─ GC 或内存问题

性能优化方案

优化 1:批量处理

class BatchDelayQueue:
    """批量处理:减少 I/O 次数"""
    
    def __init__(self):
        self.redis = redis.Redis()
    
    def add_tasks_batch(self, tasks):
        """批量添加任务"""
        pipe = self.redis.pipeline()
        for task in tasks:
            execute_at = time.time() + task['delay']
            pipe.zadd(
                'delay_queue',
                {task['id']: execute_at}
            )
        pipe.execute()
    
    def consume_batch(self, handler, batch_size=100):
        """批量消费任务"""
        while True:
            now = time.time()
            
            # 1. 批量查询到期任务
            tasks = self.redis.zrangebyscore(
                'delay_queue',
                0,
                now,
                start=0,
                num=batch_size
            )
            
            if not tasks:
                time.sleep(0.1)
                continue
            
            # 2. 批量删除
            pipe = self.redis.pipeline()
            for task in tasks:
                pipe.zrem('delay_queue', task)
            removed_tasks = pipe.execute()
            
            # 3. 批量处理
            for i, task in enumerate(tasks):
                if removed_tasks[i]:
                    try:
                        handler(json.loads(task))
                    except Exception as e:
                        # 失败的任务单独处理
                        self.handle_failed_task(task, e)

性能提升:

  • 查询次数:1 次 / 100 个任务(减少 100 倍)
  • 删除次数:1 次 / 100 个任务(减少 100 倍)
  • 网络往返:1 次批量 vs 100 次单次(减少 100 倍)

优化 2:多队列分片

class ShardedDelayQueue:
    """多队列分片:并行处理"""
    
    def __init__(self, shard_count):
        self.shard_count = shard_count
    
    def _get_shard(self, task_id):
        """计算任务所属的分片"""
        return int(task_id) % self.shard_count
    
    def add_task(self, task_id, task_data, delay_seconds):
        """添加任务到对应分片"""
        shard = self._get_shard(task_id)
        queue_name = f'delay_queue:shard:{shard}'
        
        execute_at = time.time() + delay_seconds
        self.redis.zadd(queue_name, {task_id: execute_at})
    
    def consume(self, shard_id, handler):
        """消费指定分片的任务"""
        queue_name = f'delay_queue:shard:{shard_id}'
        
        while True:
            now = time.time()
            tasks = self.redis.zrangebyscore(queue_name, 0, now)
            
            for task in tasks:
                # 尝试删除(原子操作)
                removed = self.redis.zrem(queue_name, task)
                if removed:
                    handler(json.loads(task))
            
            time.sleep(0.1)

# 启动多个消费者,每个消费者负责一个分片
shard_count = 10
for shard_id in range(shard_count):
    consumer = ShardedDelayQueue(shard_count)
    threading.Thread(
        target=consumer.consume,
        args=(shard_id, task_handler),
        daemon=True
    ).start()

性能提升:

  • 10 个分片,理论吞吐量提升 10 倍
  • 分片之间无竞争,充分利用多核 CPU
  • 每个分片独立扩容,灵活调整

优化 3:缓存热点数据

class CachedDelayQueue:
    """缓存优化:减少热点任务的查询压力"""
    
    def __init__(self):
        self.redis = redis.Redis()
        self.local_cache = {}  # 本地缓存
        
    def get_ready_tasks(self):
        """获取到期任务(优先从缓存)"""
        now = time.time()
        
        # 1. 先查本地缓存
        if 'cached_tasks' in self.local_cache:
            cached_tasks = self.local_cache['cached_tasks']
            ready_tasks = [
                task for task in cached_tasks
                if task['execute_at'] <= now
            ]
            
            if ready_tasks:
                return ready_tasks
        
        # 2. 缓存为空,批量加载
        tasks = self.redis.zrangebyscore(
            'delay_queue',
            0,
            now + 300,  # 提前 5 分钟加载
            start=0,
            num=1000
        )
        
        self.local_cache['cached_tasks'] = [json.loads(t) for t in tasks]
        
        # 3. 返回当前到期的任务
        return self.local_cache['cached_tasks']

性能提升:

  • 热点任务查询减少 99% 的网络 I/O
  • 本地缓存命中时,延迟从毫秒级降到微秒级
  • 适用于延时时间集中的场景

挑战 5:监控与运维

问题的发现

2024 年 8 月,我们的系统突然出现大量订单超时未取消。

但监控大盘显示:

  • 延时队列服务状态:✅ 正常
  • 任务创建成功率:✅ 99.9%
  • 任务执行成功率:✅ 99.9%

一切都看起来正常!

但实际情况:

  • 消费者进程虽然活着,但死锁了
  • 任务虽然创建成功,但被错误地路由到错误的队列
  • 任务虽然执行成功,但数据库更新失败了

结论:监控指标太粗粒度,无法发现问题。

监控指标设计

我们需要更细粒度的监控:

class DelayQueueMonitor:
    """延时队列监控"""
    
    def __init__(self):
        self.metrics = PrometheusMetrics()
        self.redis = redis.Redis()
    
    def collect_metrics(self):
        """收集关键指标"""
        now = time.time()
        
        # 指标 1:任务积压数
        pending_count = self.redis.zcount('delay_queue', now, float('inf'))
        self.metrics.gauge('delay_queue_pending', pending_count)
        
        # 指标 2:到期任务数
        ready_count = self.redis.zcount('delay_queue', 0, now)
        self.metrics.gauge('delay_queue_ready', ready_count)
        
        # 指标 3:任务积压率
        backlog_ratio = ready_count / (pending_count + 1)
        self.metrics.gauge('delay_queue_backlog_ratio', backlog_ratio)
        
        # 指标 4:平均延时
        avg_delay = self._calculate_avg_delay()
        self.metrics.gauge('delay_queue_avg_delay', avg_delay)
        
        # 指标 5:P99 延时
        p99_delay = self._calculate_p99_delay()
        self.metrics.gauge('delay_queue_p99_delay', p99_delay)
        
        # 指标 6:任务处理速率
        process_rate = self._calculate_process_rate()
        self.metrics.gauge('delay_queue_process_rate', process_rate)
        
        # 指标 7:任务失败率
        failure_rate = self._calculate_failure_rate()
        self.metrics.gauge('delay_queue_failure_rate', failure_rate)
        
        # 指标 8:消费者健康度
        consumer_health = self._check_consumer_health()
        self.metrics.gauge('delay_queue_consumer_health', consumer_health)
    
    def _calculate_avg_delay(self):
        """计算平均任务延时"""
        now = time.time()
        tasks = self.redis.zrangebyscore('delay_queue', 0, now, withscores=True)
        
        if not tasks:
            return 0
        
        total_delay = sum(now - score for _, score in tasks)
        return total_delay / len(tasks)
    
    def _check_consumer_health(self):
        """检查消费者健康度"""
        # 检查消费者最近心跳
        last_heartbeat = self.redis.get('consumer:last_heartbeat')
        if last_heartbeat:
            last_heartbeat_time = float(last_heartbeat)
            if time.time() - last_heartbeat_time > 60:
                return 0  # 消费者不健康
        
        return 1  # 消费者健康

# 告警规则
alert_rules = [
    # 规则 1:任务积压过多
    Alert(
        name='delay_queue_backlog',
        condition='delay_queue_backlog_ratio > 0.5',
        severity='warning',
        message='任务积压率超过 50%,请检查消费者'
    ),
    
    # 规则 2:消费者不健康
    Alert(
        name='delay_queue_consumer_down',
        condition='delay_queue_consumer_health == 0',
        severity='critical',
        message='消费者不健康或已停止,请立即检查'
    ),
    
    # 规则 3:任务失败率过高
    Alert(
        name='delay_queue_high_failure_rate',
        condition='delay_queue_failure_rate > 0.01',
        severity='warning',
        message='任务失败率超过 1%,请检查任务执行逻辑'
    ),
]

核心监控指标:

指标含义告警阈值严重程度
pending_count待处理任务数> 10000Warning
ready_count到期待执行任务数> 1000Critical
backlog_ratio积压率> 50%Warning
avg_delay平均执行延迟> 60sWarning
p99_delayP99 执行延迟> 300sCritical
process_rate任务处理速率< 创建速率的 80%Warning
failure_rate任务失败率> 1%Warning
consumer_health消费者健康度0(不健康)Critical

想一想

思考 1

如果你的系统要求任务绝对不能丢,你会采用什么方案?请从持久化、补偿、重试三个维度设计。

参考答案

需求分析:

“绝对不能丢”意味着:

  • 服务器重启时,任务不能丢
  • 数据库故障时,任务不能丢
  • 网络故障时,任务不能丢
  • 消费者崩溃时,任务不能丢

方案设计:三层可靠性保障

class UltimateReliableDelayQueue:
    """终极可靠的延时队列"""
    
    def __init__(self):
        # L1: 主存储(数据库)
        self.primary_db = MySQLDatabase()
        
        # L2: 高性能存储(Redis)
        self.cache_redis = redis.Redis()
        
        # L3: 备份存储(消息队列)
        self.backup_mq = KafkaProducer()
        
        # 监控和告警
        self.monitor = DelayQueueMonitor()
    
    def add_task(self, task_id, task_data, delay_seconds):
        """添加任务:三层写入"""
        execute_at = datetime.now() + timedelta(seconds=delay_seconds)
        
        # L1: 写入数据库(强事务保证)
        try:
            with self.primary_db.transaction():
                self.primary_db.execute("""
                    INSERT INTO delay_tasks 
                    (task_id, task_data, execute_at, status, created_at)
                    VALUES (%s, %s, %s, 'pending', NOW())
                """, (task_id, json.dumps(task_data), execute_at))
        except Exception as e:
            # 数据库写入失败,立即告警
            self.monitor.alert_critical(f"数据库写入失败: {task_id}, {e}")
            raise
        
        # L2: 写入 Redis(异步,失败不影响主流程)
        try:
            self.cache_redis.zadd(
                'delay_queue',
                {task_id: execute_at.timestamp()}
            )
        except Exception as e:
            # Redis 失败,记录日志但不阻断
            logger.warning(f"Redis 写入失败: {task_id}, {e}")
        
        # L3: 发送到备份 MQ(异步,最终一致性)
        try:
            self.backup_mq.send(
                topic='delay_queue_backup',
                value={
                    'task_id': task_id,
                    'task_data': task_data,
                    'execute_at': execute_at.isoformat()
                }
            )
        except Exception as e:
            logger.warning(f"MQ 发送失败: {task_id}, {e}")
    
    def pop_task(self):
        """弹出任务:L1 + L2 协同"""
        # 1. 优先从 Redis 弹出(高性能)
        task_id = self._atomic_pop_from_redis()
        
        if task_id:
            # 2. 从数据库查询确认
            task = self.primary_db.query("""
                SELECT * FROM delay_tasks 
                WHERE task_id = %s AND status = 'pending'
            """, [task_id])
            
            if task:
                # 3. 标记为处理中
                self.primary_db.execute("""
                    UPDATE delay_tasks 
                    SET status = 'processing', 
                        consumer_id = %s,
                        updated_at = NOW()
                    WHERE task_id = %s
                """, [get_consumer_id(), task_id])
                return task
            else:
                # 😱 数据不一致:Redis 有但数据库没有
                self.monitor.alert_warning(
                    f"数据不一致: task_id={task_id}, Redis 有但数据库无"
                )
        
        # 4. Redis 没有任务,从数据库查询(兜底)
        task = self.primary_db.query("""
            SELECT * FROM delay_tasks 
            WHERE execute_at <= NOW() 
              AND status = 'pending'
            ORDER BY execute_at ASC
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        """)
        
        if task:
            # 标记为处理中
            self.primary_db.execute("""
                UPDATE delay_tasks 
                SET status = 'processing',
                    consumer_id = %s,
                    updated_at = NOW()
                WHERE task_id = %s
            """, [get_consumer_id(), task['task_id']])
            
            return task
        
        return None
    
    def complete_task(self, task_id):
        """完成任务"""
        self.primary_db.execute("""
            UPDATE delay_tasks 
            SET status = 'completed',
                completed_at = NOW()
            WHERE task_id = %s
        """, [task_id])
    
    def fail_task(self, task_id, error_message):
        """任务失败"""
        task = self.primary_db.query("""
            SELECT * FROM delay_tasks WHERE task_id = %s
        """, [task_id])
        
        if task['retry_count'] >= task['max_retries']:
            # 超过重试次数,标记为失败
            self.primary_db.execute("""
                UPDATE delay_tasks 
                SET status = 'failed',
                    error_message = %s,
                    updated_at = NOW()
                WHERE task_id = %s
            """, [error_message, task_id])
            
            # 发送到死信队列
            self._send_to_dead_letter_queue(task)
        else:
            # 增加重试次数,重新排队
            self.primary_db.execute("""
                UPDATE delay_tasks 
                SET retry_count = retry_count + 1,
                    status = 'pending',
                    execute_at = NOW() + INTERVAL '5' MINUTE,
                    updated_at = NOW()
                WHERE task_id = %s
            """, [task_id])
            
            # 重新添加到 Redis
            self.cache_redis.zadd(
                'delay_queue',
                {task_id: task['execute_at'].timestamp()}
            )
    
    def reconcile(self):
        """定期校验:发现和修复数据不一致"""
        # 1. 检查数据库中有但 Redis 中没有的任务
        orphaned_tasks = self.primary_db.query("""
            SELECT * FROM delay_tasks 
            WHERE status IN ('pending', 'processing')
              AND created_at > NOW() - INTERVAL '1 hour'
        """)
        
        for task in orphaned_tasks:
            # 检查 Redis 中是否存在
            if not self.cache_redis.zscore('delay_queue', task['task_id']):
                # Redis 中没有,说明丢失了,重新添加
                logger.warning(f"发现孤立任务: {task['task_id']}")
                self.cache_redis.zadd(
                    'delay_queue',
                    {task['task_id']: task['execute_at'].timestamp()}
                )
        
        # 2. 检查超时未处理的任务
        timeout_tasks = self.primary_db.query("""
            SELECT * FROM delay_tasks 
            WHERE status = 'processing'
              AND updated_at < NOW() - INTERVAL '10' MINUTE
        """)
        
        for task in timeout_tasks:
            # 处理超时,可能消费者挂了
            logger.error(f"处理超时: {task['task_id']}")
            self.fail_task(
                task['task_id'],
                f"处理超时: last_update={task['updated_at']}"
            )
    
    def start_reconciliation_worker(self):
        """启动校验线程"""
        def reconcile_loop():
            while True:
                try:
                    self.reconcile()
                except Exception as e:
                    logger.error(f"校验失败: {e}")
                time.sleep(60)  # 每分钟校验一次
        
        threading.Thread(target=reconcile_loop, daemon=True).start()

可靠性保障总结:

保障层次技术可靠性性能
L1: 数据库事务 + 日志⭐⭐⭐⭐⭐⭐⭐
L2: RedisZSet + 持久化⭐⭐⭐⭐⭐⭐⭐⭐⭐
L3: 备份 MQ持久化队列⭐⭐⭐⭐⭐⭐⭐⭐
补偿机制定期校验⭐⭐⭐⭐⭐⭐
重试机制指数退避⭐⭐⭐⭐⭐⭐⭐⭐

最佳实践:

  1. 数据永远先写数据库,再写缓存
  2. 所有操作都要有日志,便于故障排查
  3. 定期进行数据校验,发现不一致及时修复
  4. 设置合理的重试策略,避免无限重试
  5. 监控所有关键指标,及时发现问题