故障转移
故障场景
常见故障
延时队列可能遇到的故障:
1. 消费者故障
├─ 进程崩溃
├─ 网络断开
└─ 内存溢出
2. 调度器故障
├─ 进程崩溃
├─ 领导者切换
└─ 数据不一致
3. 存储故障
├─ Redis 宕机
├─ MySQL 宕机
└─ Kafka 宕机
4. 网络分区
├─ 节点间通信失败
├─ 脑裂问题
└─ 数据不一致
故障检测
心跳机制
class HeartbeatMonitor:
"""心跳监控"""
def __init__(self):
self.redis = redis.Redis()
self.heartbeat_interval = 10
self.timeout_threshold = 30
def start_heartbeat(self, node_id):
"""启动心跳"""
def heartbeat_loop():
while True:
# ✅ 发送心跳
self.redis.setex(
f'node:heartbeat:{node_id}',
'alive',
self.timeout_threshold
)
time.sleep(self.heartbeat_interval)
threading.Thread(target=heartbeat_loop, daemon=True).start()
def check_dead_nodes(self):
"""检查死亡节点"""
keys = self.redis.keys('node:heartbeat:*')
dead_nodes = []
for key in keys:
if not self.redis.get(key):
node_id = key.split(':')[-1]
dead_nodes.append(node_id)
logger.warning(f"节点 {node_id} 可能已死亡")
return dead_nodes
故障恢复
任务迁移
class TaskMigrationService:
"""任务迁移服务"""
def __init__(self):
self.redis = redis.Redis()
self.db = MySQLDatabase()
def migrate_tasks(self, dead_node_id):
"""迁移死亡节点的任务"""
# ✅ 查找死亡节点正在处理的任务
tasks = self.db.query("""
SELECT * FROM delay_tasks
WHERE consumer_id = %s
AND status = 'processing'
""", [dead_node_id])
for task in tasks:
# ✅ 重置任务状态
self.db.execute("""
UPDATE delay_tasks
SET status = 'pending',
consumer_id = NULL,
retry_count = retry_count + 1,
updated_at = NOW()
WHERE id = %s
""", [task['id']])
logger.info(f"迁移任务: {task['id']}")
高可用架构
主备切换
class FailoverManager:
"""故障转移管理器"""
def __init__(self, node_id):
self.node_id = node_id
self.redis = redis.Redis()
self.is_leader = False
def elect_leader(self):
"""选举 leader"""
leader_key = 'scheduler:leader'
# ✅ 尝试成为 leader
acquired = self.redis.set(
leader_key,
self.node_id,
nx=True,
ex=30
)
if acquired:
self.is_leader = True
logger.info(f"成为 leader: {self.node_id}")
else:
self.is_leader = False
leader = self.redis.get(leader_key)
logger.info(f"是 follower,leader: {leader}")
def run(self):
"""运行"""
while True:
# ✅ 定期选举
self.elect_leader()
if self.is_leader:
# ✅ Leader 负责调度
self._leader_schedule()
else:
# ✅ Follower 等待
time.sleep(5)