可靠性

那次惨痛的教训

2024 年 4 月 1 日(愚人节,但对我们来说不是玩笑),我们的延时队列服务发生了一起严重事故。

事故经过

时间线:

14:00 - 用户 A 下单,创建 30 分钟后取消订单的任务
14:05 - 用户 A 支付成功,取消超时任务
14:30 - 延时队列服务重启(部署新版本)
14:31 - 用户 A 收到订单被取消的通知 🤯

根因分析

经过排查,问题出在这里:

# 😱 有问题的代码
class UnreliableDelayQueue:
    def __init__(self):
        self.tasks = []  # 内存存储
    
    def add_task(self, task_id, task_data, delay_seconds):
        """添加任务到内存"""
        execute_at = time.time() + delay_seconds
        self.tasks.append({
            'id': task_id,
            'data': task_data,
            'execute_at': execute_at
        })
    
    def cancel_task(self, task_id):
        """取消任务"""
        # 😱 从内存删除
        self.tasks = [t for t in self.tasks if t['id'] != task_id]
    
    def service_restart(self):
        """服务重启"""
        # 😱 内存数据全部丢失!
        self.tasks = []

问题根源:

  1. ❌ 任务只存储在内存,无持久化
  2. ❌ 服务重启后,任务全部丢失
  3. ❌ 取消任务只是从内存删除,没有记录
  4. ❌ 重启后,任务不存在,但用户的期望还在

可靠性的三个维度

1. 任务不丢失(No Loss)

任务创建后,无论发生什么情况(服务器崩溃、网络故障、停电),都不能丢失。

2. 任务不重复(No Duplicate)

同一个任务不能被多次执行。如果执行失败需要重试,要有明确的控制。

3. 任务不遗漏(No Miss)

到期的任务必须被执行,不能因为任何原因被遗漏。

可靠性三角
    不丢失
      / \
     /   \
    /_____\
   不遗漏 不重复

任务不丢失的保障

方案 1:双层存储(数据库 + 内存)

class ReliableDelayQueue:
    """可靠的延时队列:双层存储"""
    
    def __init__(self):
        # L1: 数据库(持久化层)
        self.db = MySQLDatabase()
        
        # L2: Redis(性能层)
        self.redis = redis.Redis()
    
    def add_task(self, task_id, task_data, delay_seconds):
        """添加任务:先持久化,再内存"""
        execute_at = datetime.now() + timedelta(seconds=delay_seconds)
        
        # ✅ 第 1 步:写入数据库(强事务保证)
        try:
            with self.db.transaction():
                self.db.execute("""
                    INSERT INTO delay_tasks 
                    (task_id, task_data, execute_at, status, created_at)
                    VALUES (%s, %s, %s, 'pending', NOW())
                """, (task_id, json.dumps(task_data), execute_at))
        except Exception as e:
            # 数据库写入失败,任务创建失败
            logger.error(f"数据库写入失败: {task_id}, {e}")
            raise
        
        # ✅ 第 2 步:写入 Redis(异步,失败不影响主流程)
        try:
            self.redis.zadd(
                'delay_queue',
                {task_id: execute_at.timestamp()}
            )
        except Exception as e:
            # Redis 写入失败,记录日志但不阻断
            logger.warning(f"Redis 写入失败: {task_id}, {e}")
            
            # 依赖补偿机制定期将数据库任务同步到 Redis
            self._mark_for_sync(task_id)
    
    def pop_task(self):
        """弹出任务:先从 Redis,再到数据库"""
        # ✅ 第 1 步:从 Redis 弹出
        task_id = self._atomic_pop_from_redis()
        
        if task_id:
            # ✅ 第 2 步:从数据库查询确认
            task = self.db.query("""
                SELECT * FROM delay_tasks 
                WHERE task_id = %s AND status = 'pending'
            """, [task_id])
            
            if task:
                # ✅ 第 3 步:标记为处理中
                self.db.execute("""
                    UPDATE delay_tasks 
                    SET status = 'processing',
                        consumer_id = %s,
                        updated_at = NOW()
                    WHERE task_id = %s
                """, [get_consumer_id(), task_id])
                return task
            else:
                # 😱 数据不一致:Redis 有但数据库没有
                logger.warning(f"数据不一致: task_id={task_id}")
                self._record_dead_letter(task_id, "redis_db_mismatch")
        
        # ✅ 第 4 步:Redis 没有任务,从数据库查询(兜底)
        task = self.db.query("""
            SELECT * FROM delay_tasks 
            WHERE execute_at <= NOW() 
              AND status = 'pending'
            ORDER BY execute_at ASC
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        """)
        
        if task:
            self.db.execute("""
                UPDATE delay_tasks 
                SET status = 'processing',
                    consumer_id = %s,
                    updated_at = NOW()
                WHERE task_id = %s
            """, [get_consumer_id(), task['task_id']])
            return task
        
        return None
    
    def _atomic_pop_from_redis(self):
        """从 Redis 原子弹出任务"""
        lua_script = """
        local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 1)
        if #tasks > 0 then
            local removed = redis.call('ZREM', KEYS[1], tasks[1])
            if removed > 0 then
                return tasks[1]
            end
        end
        return nil
        """
        
        return self.redis.eval(
            lua_script,
            1,
            'delay_queue',
            time.time()
        )

双层存储的优势:

层级存储优点缺点
L1: 数据库磁盘✅ 强持久化
✅ 事务保证
✅ 可靠性高
⚠️ 性能较低
L2: Redis内存✅ 高性能
✅ 快速查询
⚠️ 需要持久化配置

方案 2:WAL(Write-Ahead Log)

使用写前日志确保数据不丢失:

class WALDelayQueue:
    """带 WAL 的延时队列"""
    
    def __init__(self):
        self.wal_file = open('/data/delay_queue.wal', 'a')
        self.redis = redis.Redis()
        
        # 启动时恢复 WAL
        self._recover_from_wal()
    
    def add_task(self, task_id, task_data, delay_seconds):
        """添加任务:先写 WAL"""
        execute_at = time.time() + delay_seconds
        
        # ✅ 第 1 步:写 WAL(顺序写,性能高)
        log_entry = {
            'timestamp': time.time(),
            'operation': 'ADD',
            'task_id': task_id,
            'task_data': task_data,
            'execute_at': execute_at
        }
        self.wal_file.write(json.dumps(log_entry) + '\n')
        self.wal_file.flush()  # 强制刷盘
        
        # ✅ 第 2 步:写入 Redis
        self.redis.zadd('delay_queue', {task_id: execute_at})
        
        # ✅ 第 3 步:WAL 标记为完成
        log_entry['status'] = 'COMMITTED'
        self.wal_file.write(json.dumps(log_entry) + '\n')
        self.wal_file.flush()
    
    def _recover_from_wal(self):
        """从 WAL 恢复数据"""
        with open('/data/delay_queue.wal', 'r') as f:
            for line in f:
                entry = json.loads(line)
                
                if entry['operation'] == 'ADD':
                    # 检查任务是否已经完成
                    if entry.get('status') != 'COMMITTED':
                        # 任务未完成,需要重新添加
                        self.redis.zadd(
                            'delay_queue',
                            {entry['task_id']: entry['execute_at']}
                        )
                        logger.info(f"恢复任务: {entry['task_id']}")

方案 3:消息队列持久化

使用 Kafka 等持久化消息队列:

class KafkaDelayQueue:
    """基于 Kafka 的延时队列"""
    
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            # ✅ 启用持久化
            acks='all',  # 等待所有副本确认
            retries=3,
            # ✅ 启用幂等性
            enable_idempotence=True
        )
        
        self.consumer = KafkaConsumer(
            'delay_queue',
            bootstrap_servers=['localhost:9092'],
            # ✅ 从最早的消息开始消费(容错)
            auto_offset_reset='earliest',
            # ✅ 手动提交 offset(可靠性)
            enable_auto_commit=False
        )
    
    def add_task(self, task_id, task_data, delay_seconds):
        """添加任务到 Kafka"""
        message = {
            'task_id': task_id,
            'task_data': task_data,
            'execute_at': time.time() + delay_seconds
        }
        
        # ✅ 同步发送,确认成功
        future = self.producer.send(
            'delay_queue',
            value=json.dumps(message).encode(),
            key=task_id.encode()
        )
        
        # 等待确认
        try:
            record_metadata = future.get(timeout=10)
            logger.info(
                f"任务 {task_id} 已发送: "
                f"partition={record_metadata.partition}, "
                f"offset={record_metadata.offset}"
            )
        except Exception as e:
            logger.error(f"发送失败: {task_id}, {e}")
            raise

任务不重复的保障

幂等性设计

确保同一个任务即使被多次调用,结果也是一样的。

class IdempotentExecutor:
    """幂等性执行器"""
    
    def __init__(self):
        self.redis = redis.Redis()
    
    def execute(self, task_id, handler):
        """执行任务(带幂等性)"""
        # ✅ 第 1 步:检查是否已执行
        executed_key = f"executed:{task_id}"
        if self.redis.exists(executed_key):
            logger.info(f"任务 {task_id} 已执行,跳过")
            return
        
        # ✅ 第 2 步:获取分布式锁
        lock_key = f"lock:task:{task_id}"
        lock_acquired = self.redis.set(
            lock_key, '1',
            nx=True,
            ex=30  # 30 秒超时
        )
        
        if not lock_acquired:
            logger.info(f"任务 {task_id} 正在被其他消费者处理")
            return
        
        try:
            # ✅ 第 3 步:再次检查(双重检查)
            if self.redis.exists(executed_key):
                return
            
            # ✅ 第 4 步:执行业务逻辑
            handler()
            
            # ✅ 第 5 步:标记为已执行
            self.redis.setex(executed_key, '1', 86400)  # 1 天
            logger.info(f"任务 {task_id} 执行成功")
            
        finally:
            # ✅ 第 6 步:释放锁
            self.redis.delete(lock_key)

# 使用示例
def cancel_order_task(order_id):
    """取消订单任务(幂等版本)"""
    task_id = f"cancel_order_{order_id}"
    
    def handler():
        # 业务逻辑
        order = Order.get(order_id)
        
        # ✅ 业务层幂等检查
        if order.status == 'cancelled':
            return
        
        order.status = 'cancelled'
        order.save()
        
        # 释放库存(也要幂等)
        Inventory.release(order.product_id, order.quantity)
    
    executor = IdempotentExecutor()
    executor.execute(task_id, handler)

三层幂等检查

def execute_with_three_layer_idempotency(task_id, handler):
    """三层幂等检查"""
    
    # ✅ 第 1 层:分布式锁(防止并发执行)
    lock_key = f"lock:task:{task_id}"
    if not acquire_distributed_lock(lock_key):
        return
    
    try:
        # ✅ 第 2 层:执行标记(防止重复执行)
        executed_key = f"executed:{task_id}"
        if redis.exists(executed_key):
            return
        
        # ✅ 第 3 层:业务状态检查(防止状态不一致)
        if not check_business_state(task_id):
            return
        
        # 执行业务逻辑
        handler()
        
        # 标记为已执行
        redis.setex(executed_key, '1', 86400)
        
    finally:
        release_distributed_lock(lock_key)

任务不遗漏的保障

补偿机制

定期检查是否有遗漏的任务:

class CompensationMechanism:
    """补偿机制"""
    
    def __init__(self):
        self.db = MySQLDatabase()
        self.redis = redis.Redis()
    
    def compensate(self):
        """补偿:发现并修复遗漏的任务"""
        # ✅ 第 1 步:查询数据库中到期但未执行的任务
        orphaned_tasks = self.db.query("""
            SELECT * FROM delay_tasks 
            WHERE execute_at <= NOW() 
              AND status = 'pending'
              AND created_at > NOW() - INTERVAL '1 hour'
        """)
        
        for task in orphaned_tasks:
            logger.warning(f"发现遗漏任务: {task['task_id']}")
            
            # ✅ 第 2 步:重新添加到 Redis
            self.redis.zadd(
                'delay_queue',
                {task['task_id']: task['execute_at'].timestamp()}
            )
            
            # ✅ 第 3 步:记录到监控
            self.monitor.alert_warning(
                f"任务遗漏: task_id={task['task_id']}, "
                f"execute_at={task['execute_at']}"
            )
    
    def check_stuck_tasks(self):
        """检查卡住的任务"""
        # 查询长时间处于 processing 状态的任务
        stuck_tasks = self.db.query("""
            SELECT * FROM delay_tasks 
            WHERE status = 'processing'
              AND updated_at < NOW() - INTERVAL '10' MINUTE
        """)
        
        for task in stuck_tasks:
            logger.error(f"任务卡住: {task['task_id']}")
            
            # 重置为 pending,重新执行
            self.db.execute("""
                UPDATE delay_tasks 
                SET status = 'pending',
                    consumer_id = NULL,
                    retry_count = retry_count + 1,
                    updated_at = NOW()
                WHERE task_id = %s
            """, [task['task_id']])

心跳机制

消费者定期发送心跳,证明自己还活着:

class ConsumerWithHeartbeat:
    """带心跳的消费者"""
    
    def __init__(self, consumer_id):
        self.consumer_id = consumer_id
        self.redis = redis.Redis()
        self.heartbeat_interval = 10  # 10 秒
    
    def send_heartbeat(self):
        """发送心跳"""
        heartbeat_key = f"consumer:heartbeat:{self.consumer_id}"
        self.redis.setex(
            heartbeat_key,
            '1',
            self.heartbeat_interval * 3  # 3 倍间隔超时
        )
    
    def consume(self, queue, handler):
        """消费任务"""
        def heartbeat_loop():
            while True:
                self.send_heartbeat()
                time.sleep(self.heartbeat_interval)
        
        # 启动心跳线程
        threading.Thread(target=heartbeat_loop, daemon=True).start()
        
        # 消费任务
        while True:
            task = queue.pop_task()
            if task:
                # 检查消费者是否还活着
                if not self._is_alive():
                    logger.error("消费者已超时,停止消费")
                    break
                
                try:
                    handler(task)
                except Exception as e:
                    logger.error(f"任务执行失败: {e}")
            else:
                time.sleep(0.1)
    
    def _is_alive(self):
        """检查消费者是否还活着"""
        heartbeat_key = f"consumer:heartbeat:{self.consumer_id}"
        return self.redis.exists(heartbeat_key)

想一想

思考 1

如果消费者在执行任务时挂了,如何保证任务能被重新执行?

参考答案

问题分析:

消费者在执行任务时挂了,会导致:

  1. 任务状态停留在 processing
  2. 任务没有完成
  3. 其他消费者不知道该任务需要重试

解决方案:心跳 + 超时检测

class FailoverAwareDelayQueue:
    """支持故障转移的延时队列"""
    
    def __init__(self):
        self.db = MySQLDatabase()
        self.redis = redis.Redis()
    
    def pop_task(self):
        """弹出任务"""
        task = self.db.query("""
            SELECT * FROM delay_tasks 
            WHERE execute_at <= NOW() 
              AND status = 'pending'
            ORDER BY execute_at ASC
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        """)
        
        if task:
            # ✅ 标记为处理中,记录消费者 ID 和心跳时间
            self.db.execute("""
                UPDATE delay_tasks 
                SET status = 'processing',
                    consumer_id = %s,
                    heartbeat_at = NOW(),
                    updated_at = NOW()
                WHERE task_id = %s
            """, [get_consumer_id(), task['task_id']])
            
            return task
        return None
    
    def complete_task(self, task_id):
        """完成任务"""
        self.db.execute("""
            UPDATE delay_tasks 
            SET status = 'completed',
                completed_at = NOW()
            WHERE task_id = %s
        """, [task_id])
    
    def send_heartbeat(self, task_id):
        """发送心跳"""
        self.db.execute("""
            UPDATE delay_tasks 
            SET heartbeat_at = NOW()
            WHERE task_id = %s AND status = 'processing'
        """, [task_id])
    
    def detect_dead_tasks(self):
        """检测死掉的任务"""
        # 查询心跳超时的任务
        dead_tasks = self.db.query("""
            SELECT * FROM delay_tasks 
            WHERE status = 'processing'
              AND heartbeat_at < NOW() - INTERVAL '30' SECOND
        """)
        
        for task in dead_tasks:
            logger.warning(f"检测到死任务: {task['task_id']}")
            
            # 重置为 pending,允许重新执行
            self.db.execute("""
                UPDATE delay_tasks 
                SET status = 'pending',
                    consumer_id = NULL,
                    heartbeat_at = NULL,
                    retry_count = retry_count + 1,
                    updated_at = NOW()
                WHERE task_id = %s
            """, [task['task_id']])

# 消费者使用
class RobustConsumer:
    def __init__(self):
        self.queue = FailoverAwareDelayQueue()
    
    def consume(self):
        while True:
            task = self.queue.pop_task()
            if not task:
                time.sleep(0.1)
                continue
            
            # ✅ 启动心跳线程
            stop_event = threading.Event()
            
            def heartbeat_loop():
                while not stop_event.is_set():
                    self.queue.send_heartbeat['task_id']
                    time.sleep(10)
            
            heartbeat_thread = threading.Thread(
                target=heartbeat_loop,
                daemon=True
            )
            heartbeat_thread.start()
            
            try:
                # 执行任务
                handler(task)
                self.queue.complete_task(task['task_id'])
            except Exception as e:
                logger.error(f"任务执行失败: {e}")
                # 不手动重置,依赖心跳超时检测
            finally:
                stop_event.set()

关键要点:

  1. ✅ 记录心跳时间:heartbeat_at 字段
  2. ✅ 定期检测死任务:心跳超时 > 30 秒
  3. ✅ 重置任务状态:从 processing 改回 pending
  4. ✅ 增加重试次数:记录失败次数