任务调度
分布式调度挑战
在分布式环境下,多个调度器如何协调工作?
核心问题
分布式调度的核心问题:
1. 任务如何分配?
├─ 按任务 ID 分片
├─ 按队列分片
└─ 按消费者分片
2. 如何避免重复执行?
├─ 分布式锁
├─ Lua 脚本原子操作
└─ 数据库悲观锁
3. 如何处理故障?
├─ 心跳检测
├─ 主备切换
└─ 任务迁移调度策略
策略 1:分片调度
class ShardedScheduler:
"""分片调度器"""
def __init__(self, shard_id, total_shards):
self.shard_id = shard_id
self.total_shards = total_shards
def belongs_to_shard(self, task_id):
"""判断任务是否属于当前分片"""
return hash(task_id) % self.total_shards == self.shard_id
def schedule(self):
"""调度任务"""
while True:
# ✅ 只查询属于当前分片的任务
tasks = self._get_shard_tasks()
for task in tasks:
self._dispatch_task(task)
time.sleep(1)
def _get_shard_tasks(self):
"""获取当前分片的任务"""
all_tasks = self._get_all_tasks()
return [
task for task in all_tasks
if self.belongs_to_shard(task['task_id'])
]策略 2:动态分片
class DynamicShardedScheduler:
"""动态分片调度器"""
def __init__(self, node_id):
self.node_id = node_id
self.redis = redis.Redis()
def get_active_nodes(self):
"""获取活跃节点列表"""
keys = self.redis.keys('scheduler:heartbeat:*')
nodes = []
for key in keys:
node_id = key.split(':')[-1]
if self.redis.get(key):
nodes.append(node_id)
return nodes
def calculate_shard(self, task_id):
"""计算任务所属分片"""
nodes = self.get_active_nodes()
if not nodes:
raise NoActiveNodeError()
node_index = hash(task_id) % len(nodes)
return nodes[node_index]
def schedule(self):
"""调度任务"""
while True:
nodes = self.get_active_nodes()
my_index = nodes.index(self.node_id) if self.node_id in nodes else -1
if my_index == -1:
# 当前节点不在列表中,等待
time.sleep(5)
continue
# ✅ 只处理属于当前节点的任务
tasks = self._get_tasks_for_node(my_index, len(nodes))
for task in tasks:
self._dispatch_task(task)
time.sleep(1)