完整架构
生产级架构
整体架构图
┌─────────────────────────────────────────────────────────────────┐
│ 延时队列系统架构 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 客户端层(Client) │ │
│ │ ├─ Web 应用 │ │
│ │ ├─ 移动应用 │ │
│ │ └─ 定时任务 │ │
│ └──────────────────┬──────────────────────────────────────┘ │
│ │ REST API / gRPC │
│ ┌──────────────────▼──────────────────────────────────────┐ │
│ │ 网关层(Gateway) │ │
│ │ ├─ 负载均衡 │ │
│ │ ├─ 限流熔断 │ │
│ │ └─ 认证授权 │ │
│ └──────────────────┬──────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────▼──────────────────────────────────────┐ │
│ │ 业务层(Business) │ │
│ │ ├─ 任务管理服务 │ │
│ │ ├─ 任务路由服务 │ │
│ │ ├─ 任务编排服务 │ │
│ │ └─ 任务监控服务 │ │
│ └──────────────────┬──────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────▼──────────────────────────────────────┐ │
│ │ 服务层(Service) │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ 调度器集群 │ │ 消费者集群 │ │ 监控集群 │ │ │
│ │ │ Scheduler │ │ Consumer │ │ Monitor │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ ├─ Scheduler│ │ ├─ Worker-1 │ │ ├─ Prometheus│ │ │
│ │ │ ├─ Scheduler│ │ ├─ Worker-2 │ │ ├─ Grafana │ │ │
│ │ │ └─ Scheduler│ │ ├─ Worker-3 │ │ └─ AlertMgr │ │ │
│ │ │ ... │ │ ... │ │ │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ └──────────────────┬──────────────────────────────────────┘ │
│ │ │
│ ┌──────────────────▼──────────────────────────────────────┐ │
│ │ 存储层(Storage) │ │
│ │ │ │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │
│ │ │ Redis 集群 │ │ MySQL 集群 │ │ Kafka 集群 │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ ├─ Master-1 │ │ ├─ Master-1 │ │ ├─ Broker-1 │ │ │
│ │ │ ├─ Master-2 │ │ ├─ Master-2 │ │ ├─ Broker-2 │ │ │
│ │ │ └─ Master-3 │ │ └─ Slave-1 │ │ └─ Broker-3 │ │ │
│ │ │ │ │ ... │ │ ... │ │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ 分层存储策略 │ │ │
│ │ │ │ │ │
│ │ │ 热数据层(Redis) │ │ │
│ │ │ ├─ 延时 < 30 分钟 │ │ │
│ │ │ ├─ 精度:秒级 │ │ │
│ │ │ └─ 内存:~50MB(峰值) │ │ │
│ │ │ │ │ │
│ │ │ 温数据层(MySQL) │ │ │
│ │ │ ├─ 延时 30 分钟 ~ 7 天 │ │ │
│ │ │ ├─ 精度:分钟级 │ │ │
│ │ │ └─ 存储:~1GB │ │ │
│ │ │ │ │ │
│ │ │ 冷数据层(MySQL 长表) │ │ │
│ │ │ ├─ 延时 > 7 天 │ │ │
│ │ │ ├─ 精度:小时级 │ │ │
│ │ │ └─ 存储:~500MB │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
核心组件
1. 任务管理服务
class TaskManagementService:
"""任务管理服务"""
def __init__(self):
self.redis = redis.Redis()
self.db = MySQLDatabase()
def create_task(self, task_request):
"""创建任务"""
# ✅ 参数校验
self._validate_request(task_request)
# ✅ 生成任务 ID
task_id = self._generate_task_id(task_request)
# ✅ 根据延时选择存储层
delay = task_request['delay_seconds']
tier = self._select_tier(delay)
# ✅ 添加到对应存储层
if tier['storage'] == 'redis':
self._add_to_redis(task_id, task_request, delay)
else:
self._add_to_database(task_id, task_request, delay)
# ✅ 记录审计日志
self._audit_log('create', task_id, task_request)
return {
'task_id': task_id,
'status': 'pending',
'execute_at': datetime.now() + timedelta(seconds=delay)
}
def cancel_task(self, task_id):
"""取消任务"""
# ✅ 检查任务是否存在
task = self._get_task(task_id)
if not task:
raise TaskNotFoundError(task_id)
# ✅ 检查任务状态
if task['status'] not in ['pending', 'retrying']:
raise TaskCannotCancelError(task_id, task['status'])
# ✅ 从存储层删除
if task['storage'] == 'redis':
self._remove_from_redis(task_id)
else:
self._remove_from_database(task_id)
# ✅ 记录审计日志
self._audit_log('cancel', task_id)
return {'task_id': task_id, 'status': 'cancelled'}
def get_task(self, task_id):
"""查询任务"""
task = self._get_task(task_id)
if not task:
raise TaskNotFoundError(task_id)
return task
2. 调度器集群
class SchedulerCluster:
"""调度器集群"""
def __init__(self, shard_count=10):
self.shard_count = shard_count
self.schedulers = []
# 初始化调度器
for shard_id in range(shard_count):
scheduler = Scheduler(shard_id)
self.schedulers.append(scheduler)
def start(self):
"""启动所有调度器"""
for scheduler in self.schedulers:
threading.Thread(
target=scheduler.run,
daemon=True
).start()
def stop(self):
"""停止所有调度器"""
for scheduler in self.schedulers:
scheduler.stop()
class Scheduler:
"""单个调度器(负责一个分片)"""
def __init__(self, shard_id):
self.shard_id = shard_id
self.redis = redis.Redis()
self.db = MySQLDatabase()
self.running = True
def run(self):
"""运行调度器"""
while self.running:
try:
# ✅ 从 Redis 获取到期任务
tasks = self._get_ready_tasks_from_redis()
# ✅ 从数据库获取到期任务
tasks.extend(self._get_ready_tasks_from_database())
# ✅ 分配任务给消费者
for task in tasks:
self._dispatch_task(task)
# ✅ 休眠
time.sleep(1)
except Exception as e:
logger.error(f"调度器异常: {e}")
def _get_ready_tasks_from_redis(self):
"""从 Redis 获取到期任务"""
# 只查询属于当前分片的任务
task_ids = self.redis.zrangebyscore(
f'delay_queue:short',
0,
int(time.time()),
start=0,
num=100
)
tasks = []
for task_id in task_ids:
if self._belongs_to_shard(task_id):
tasks.append(self._get_task_from_redis(task_id))
return tasks
def _belongs_to_shard(self, task_id):
"""判断任务是否属于当前分片"""
return hash(task_id) % self.schedulers_count == self.shard_id
def _dispatch_task(self, task):
"""分配任务给消费者"""
# 将任务发送到消息队列
self._send_to_kafka('delay_queue_tasks', task)
3. 消费者集群
class ConsumerCluster:
"""消费者集群"""
def __init__(self, consumer_count=20):
self.consumer_count = consumer_count
self.consumers = []
def start(self, handlers):
"""启动所有消费者"""
for i in range(self.consumer_count):
consumer = Consumer(handlers)
threading.Thread(
target=consumer.run,
daemon=True
).start()
self.consumers.append(consumer)
class Consumer:
"""单个消费者"""
def __init__(self, handlers):
self.handlers = handlers # 任务类型 -> 处理函数映射
self.kafka_consumer = self._create_consumer()
def run(self):
"""运行消费者"""
for message in self.kafka_consumer:
try:
task = json.loads(message.value)
handler = self.handlers.get(task['task_type'])
if handler:
handler(task)
self.kafka_consumer.commit()
else:
logger.error(f"未知的任务类型: {task['task_type']}")
except Exception as e:
logger.error(f"任务执行失败: {e}")
self._handle_failure(task, e)
高可用设计
主备切换
class HighAvailabilityScheduler:
"""高可用调度器"""
def __init__(self, node_id):
self.node_id = node_id
self.redis = redis.Redis()
self.is_leader = False
def elect_leader(self):
"""选举 leader"""
leader_key = 'delay_queue:leader'
# ✅ 尝试获取 leader 锁
acquired = self.redis.set(
leader_key,
self.node_id,
nx=True,
ex=30 # 30 秒超时
)
if acquired:
self.is_leader = True
logger.info(f"节点 {self.node_id} 成为 leader")
else:
self.is_leader = False
leader = self.redis.get(leader_key)
logger.info(f"节点 {self.node_id} 是 follower,leader: {leader}")
def run(self):
"""运行"""
while True:
# ✅ 定期选举 leader
self.elect_leader()
if self.is_leader:
# ✅ Leader 负责调度
self._schedule()
else:
# ✅ Follower 等待
time.sleep(5)
def _schedule(self):
"""调度任务"""
# Leader 负责的调度逻辑
pass
监控告警
监控指标
class DelayQueueMonitor:
"""延时队列监控"""
def __init__(self):
self.prometheus = PrometheusClient()
def collect_metrics(self):
"""收集监控指标"""
# ✅ 任务积压数
pending_count = self._get_pending_count()
self.prometheus.gauge('delay_queue_pending', pending_count)
# ✅ 任务处理速率
process_rate = self._get_process_rate()
self.prometheus.gauge('delay_queue_process_rate', process_rate)
# ✅ 任务失败率
failure_rate = self._get_failure_rate()
self.prometheus.gauge('delay_queue_failure_rate', failure_rate)
# ✅ 平均执行延迟
avg_delay = self._get_avg_delay()
self.prometheus.gauge('delay_queue_avg_delay', avg_delay)
# ✅ P99 执行延迟
p99_delay = self._get_p99_delay()
self.prometheus.gauge('delay_queue_p99_delay', p99_delay)