故障转移

故障场景

常见故障

延时队列可能遇到的故障:

1. 消费者故障
   ├─ 进程崩溃
   ├─ 网络断开
   └─ 内存溢出

2. 调度器故障
   ├─ 进程崩溃
   ├─ 领导者切换
   └─ 数据不一致

3. 存储故障
   ├─ Redis 宕机
   ├─ MySQL 宕机
   └─ Kafka 宕机

4. 网络分区
   ├─ 节点间通信失败
   ├─ 脑裂问题
   └─ 数据不一致

故障检测

心跳机制

class HeartbeatMonitor:
    """心跳监控"""
    
    def __init__(self):
        self.redis = redis.Redis()
        self.heartbeat_interval = 10
        self.timeout_threshold = 30
    
    def start_heartbeat(self, node_id):
        """启动心跳"""
        def heartbeat_loop():
            while True:
                # ✅ 发送心跳
                self.redis.setex(
                    f'node:heartbeat:{node_id}',
                    'alive',
                    self.timeout_threshold
                )
                time.sleep(self.heartbeat_interval)
        
        threading.Thread(target=heartbeat_loop, daemon=True).start()
    
    def check_dead_nodes(self):
        """检查死亡节点"""
        keys = self.redis.keys('node:heartbeat:*')
        dead_nodes = []
        
        for key in keys:
            if not self.redis.get(key):
                node_id = key.split(':')[-1]
                dead_nodes.append(node_id)
                logger.warning(f"节点 {node_id} 可能已死亡")
        
        return dead_nodes

故障恢复

任务迁移

class TaskMigrationService:
    """任务迁移服务"""
    
    def __init__(self):
        self.redis = redis.Redis()
        self.db = MySQLDatabase()
    
    def migrate_tasks(self, dead_node_id):
        """迁移死亡节点的任务"""
        # ✅ 查找死亡节点正在处理的任务
        tasks = self.db.query("""
            SELECT * FROM delay_tasks
            WHERE consumer_id = %s
              AND status = 'processing'
        """, [dead_node_id])
        
        for task in tasks:
            # ✅ 重置任务状态
            self.db.execute("""
                UPDATE delay_tasks
                SET status = 'pending',
                    consumer_id = NULL,
                    retry_count = retry_count + 1,
                    updated_at = NOW()
                WHERE id = %s
            """, [task['id']])
            
            logger.info(f"迁移任务: {task['id']}")

高可用架构

主备切换

class FailoverManager:
    """故障转移管理器"""
    
    def __init__(self, node_id):
        self.node_id = node_id
        self.redis = redis.Redis()
        self.is_leader = False
    
    def elect_leader(self):
        """选举 leader"""
        leader_key = 'scheduler:leader'
        
        # ✅ 尝试成为 leader
        acquired = self.redis.set(
            leader_key,
            self.node_id,
            nx=True,
            ex=30
        )
        
        if acquired:
            self.is_leader = True
            logger.info(f"成为 leader: {self.node_id}")
        else:
            self.is_leader = False
            leader = self.redis.get(leader_key)
            logger.info(f"是 follower,leader: {leader}")
    
    def run(self):
        """运行"""
        while True:
            # ✅ 定期选举
            self.elect_leader()
            
            if self.is_leader:
                # ✅ Leader 负责调度
                self._leader_schedule()
            else:
                # ✅ Follower 等待
                time.sleep(5)