任务调度

分布式调度挑战

在分布式环境下,多个调度器如何协调工作?

核心问题

分布式调度的核心问题:

1. 任务如何分配?
   ├─ 按任务 ID 分片
   ├─ 按队列分片
   └─ 按消费者分片

2. 如何避免重复执行?
   ├─ 分布式锁
   ├─ Lua 脚本原子操作
   └─ 数据库悲观锁

3. 如何处理故障?
   ├─ 心跳检测
   ├─ 主备切换
   └─ 任务迁移

调度策略

策略 1:分片调度

class ShardedScheduler:
    """分片调度器"""
    
    def __init__(self, shard_id, total_shards):
        self.shard_id = shard_id
        self.total_shards = total_shards
    
    def belongs_to_shard(self, task_id):
        """判断任务是否属于当前分片"""
        return hash(task_id) % self.total_shards == self.shard_id
    
    def schedule(self):
        """调度任务"""
        while True:
            # ✅ 只查询属于当前分片的任务
            tasks = self._get_shard_tasks()
            
            for task in tasks:
                self._dispatch_task(task)
            
            time.sleep(1)
    
    def _get_shard_tasks(self):
        """获取当前分片的任务"""
        all_tasks = self._get_all_tasks()
        return [
            task for task in all_tasks
            if self.belongs_to_shard(task['task_id'])
        ]

策略 2:动态分片

class DynamicShardedScheduler:
    """动态分片调度器"""
    
    def __init__(self, node_id):
        self.node_id = node_id
        self.redis = redis.Redis()
    
    def get_active_nodes(self):
        """获取活跃节点列表"""
        keys = self.redis.keys('scheduler:heartbeat:*')
        nodes = []
        for key in keys:
            node_id = key.split(':')[-1]
            if self.redis.get(key):
                nodes.append(node_id)
        return nodes
    
    def calculate_shard(self, task_id):
        """计算任务所属分片"""
        nodes = self.get_active_nodes()
        if not nodes:
            raise NoActiveNodeError()
        
        node_index = hash(task_id) % len(nodes)
        return nodes[node_index]
    
    def schedule(self):
        """调度任务"""
        while True:
            nodes = self.get_active_nodes()
            my_index = nodes.index(self.node_id) if self.node_id in nodes else -1
            
            if my_index == -1:
                # 当前节点不在列表中,等待
                time.sleep(5)
                continue
            
            # ✅ 只处理属于当前节点的任务
            tasks = self._get_tasks_for_node(my_index, len(nodes))
            
            for task in tasks:
                self._dispatch_task(task)
            
            time.sleep(1)