完整架构

生产级架构

整体架构图

┌─────────────────────────────────────────────────────────────────┐
│                        延时队列系统架构                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │                    客户端层(Client)                     │   │
│  │  ├─ Web 应用                                           │   │
│  │  ├─ 移动应用                                           │   │
│  │  └─ 定时任务                                           │   │
│  └──────────────────┬──────────────────────────────────────┘   │
│                     │ REST API / gRPC                         │
│  ┌──────────────────▼──────────────────────────────────────┐   │
│  │                   网关层(Gateway)                       │   │
│  │  ├─ 负载均衡                                           │   │
│  │  ├─ 限流熔断                                           │   │
│  │  └─ 认证授权                                           │   │
│  └──────────────────┬──────────────────────────────────────┘   │
│                     │                                          │
│  ┌──────────────────▼──────────────────────────────────────┐   │
│  │                   业务层(Business)                      │   │
│  │  ├─ 任务管理服务                                       │   │
│  │  ├─ 任务路由服务                                       │   │
│  │  ├─ 任务编排服务                                       │   │
│  │  └─ 任务监控服务                                       │   │
│  └──────────────────┬──────────────────────────────────────┘   │
│                     │                                          │
│  ┌──────────────────▼──────────────────────────────────────┐   │
│  │                   服务层(Service)                       │   │
│  │                                                           │   │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐       │   │
│  │  │  调度器集群  │  │  消费者集群  │  │  监控集群   │       │   │
│  │  │  Scheduler  │  │  Consumer   │  │  Monitor   │       │   │
│  │  │             │  │             │  │             │       │   │
│  │  │ ├─ Scheduler│  │ ├─ Worker-1 │  │ ├─ Prometheus│       │   │
│  │  │ ├─ Scheduler│  │ ├─ Worker-2 │  │ ├─ Grafana   │       │   │
│  │  │ └─ Scheduler│  │ ├─ Worker-3 │  │ └─ AlertMgr  │       │   │
│  │  │     ...     │  │     ...     │  │             │       │   │
│  │  └─────────────┘  └─────────────┘  └─────────────┘       │   │
│  └──────────────────┬──────────────────────────────────────┘   │
│                     │                                          │
│  ┌──────────────────▼──────────────────────────────────────┐   │
│  │                   存储层(Storage)                       │   │
│  │                                                           │   │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐       │   │
│  │  │ Redis 集群  │  │ MySQL 集群  │  │ Kafka 集群  │       │   │
│  │  │             │  │             │  │             │       │   │
│  │  │ ├─ Master-1 │  │ ├─ Master-1 │  │ ├─ Broker-1 │       │   │
│  │  │ ├─ Master-2 │  │ ├─ Master-2 │  │ ├─ Broker-2 │       │   │
│  │  │ └─ Master-3 │  │ └─ Slave-1  │  │ └─ Broker-3 │       │   │
│  │  │             │  │     ...     │  │     ...     │       │   │
│  │  └─────────────┘  └─────────────┘  └─────────────┘       │   │
│  │                                                           │   │
│  │  ┌─────────────────────────────────────────────────┐     │   │
│  │  │              分层存储策略                       │     │   │
│  │  │                                                 │     │   │
│  │  │  热数据层(Redis)                               │     │   │
│  │  │  ├─ 延时 < 30 分钟                             │     │   │
│  │  │  ├─ 精度:秒级                                  │     │   │
│  │  │  └─ 内存:~50MB(峰值)                          │     │   │
│  │  │                                                 │     │   │
│  │  │  温数据层(MySQL)                               │     │   │
│  │  │  ├─ 延时 30 分钟 ~ 7 天                         │     │   │
│  │  │  ├─ 精度:分钟级                                │     │   │
│  │  │  └─ 存储:~1GB                                  │     │   │
│  │  │                                                 │     │   │
│  │  │  冷数据层(MySQL 长表)                           │     │   │
│  │  │  ├─ 延时 > 7 天                                  │     │   │
│  │  │  ├─ 精度:小时级                                │     │   │
│  │  │  └─ 存储:~500MB                                │     │   │
│  │  └─────────────────────────────────────────────────┘     │   │
│  └───────────────────────────────────────────────────────────┘   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

核心组件

1. 任务管理服务

class TaskManagementService:
    """任务管理服务"""
    
    def __init__(self):
        self.redis = redis.Redis()
        self.db = MySQLDatabase()
    
    def create_task(self, task_request):
        """创建任务"""
        # ✅ 参数校验
        self._validate_request(task_request)
        
        # ✅ 生成任务 ID
        task_id = self._generate_task_id(task_request)
        
        # ✅ 根据延时选择存储层
        delay = task_request['delay_seconds']
        tier = self._select_tier(delay)
        
        # ✅ 添加到对应存储层
        if tier['storage'] == 'redis':
            self._add_to_redis(task_id, task_request, delay)
        else:
            self._add_to_database(task_id, task_request, delay)
        
        # ✅ 记录审计日志
        self._audit_log('create', task_id, task_request)
        
        return {
            'task_id': task_id,
            'status': 'pending',
            'execute_at': datetime.now() + timedelta(seconds=delay)
        }
    
    def cancel_task(self, task_id):
        """取消任务"""
        # ✅ 检查任务是否存在
        task = self._get_task(task_id)
        if not task:
            raise TaskNotFoundError(task_id)
        
        # ✅ 检查任务状态
        if task['status'] not in ['pending', 'retrying']:
            raise TaskCannotCancelError(task_id, task['status'])
        
        # ✅ 从存储层删除
        if task['storage'] == 'redis':
            self._remove_from_redis(task_id)
        else:
            self._remove_from_database(task_id)
        
        # ✅ 记录审计日志
        self._audit_log('cancel', task_id)
        
        return {'task_id': task_id, 'status': 'cancelled'}
    
    def get_task(self, task_id):
        """查询任务"""
        task = self._get_task(task_id)
        if not task:
            raise TaskNotFoundError(task_id)
        return task

2. 调度器集群

class SchedulerCluster:
    """调度器集群"""
    
    def __init__(self, shard_count=10):
        self.shard_count = shard_count
        self.schedulers = []
        
        # 初始化调度器
        for shard_id in range(shard_count):
            scheduler = Scheduler(shard_id)
            self.schedulers.append(scheduler)
    
    def start(self):
        """启动所有调度器"""
        for scheduler in self.schedulers:
            threading.Thread(
                target=scheduler.run,
                daemon=True
            ).start()
    
    def stop(self):
        """停止所有调度器"""
        for scheduler in self.schedulers:
            scheduler.stop()


class Scheduler:
    """单个调度器(负责一个分片)"""
    
    def __init__(self, shard_id):
        self.shard_id = shard_id
        self.redis = redis.Redis()
        self.db = MySQLDatabase()
        self.running = True
    
    def run(self):
        """运行调度器"""
        while self.running:
            try:
                # ✅ 从 Redis 获取到期任务
                tasks = self._get_ready_tasks_from_redis()
                
                # ✅ 从数据库获取到期任务
                tasks.extend(self._get_ready_tasks_from_database())
                
                # ✅ 分配任务给消费者
                for task in tasks:
                    self._dispatch_task(task)
                
                # ✅ 休眠
                time.sleep(1)
                
            except Exception as e:
                logger.error(f"调度器异常: {e}")
    
    def _get_ready_tasks_from_redis(self):
        """从 Redis 获取到期任务"""
        # 只查询属于当前分片的任务
        task_ids = self.redis.zrangebyscore(
            f'delay_queue:short',
            0,
            int(time.time()),
            start=0,
            num=100
        )
        
        tasks = []
        for task_id in task_ids:
            if self._belongs_to_shard(task_id):
                tasks.append(self._get_task_from_redis(task_id))
        
        return tasks
    
    def _belongs_to_shard(self, task_id):
        """判断任务是否属于当前分片"""
        return hash(task_id) % self.schedulers_count == self.shard_id
    
    def _dispatch_task(self, task):
        """分配任务给消费者"""
        # 将任务发送到消息队列
        self._send_to_kafka('delay_queue_tasks', task)

3. 消费者集群

class ConsumerCluster:
    """消费者集群"""
    
    def __init__(self, consumer_count=20):
        self.consumer_count = consumer_count
        self.consumers = []
    
    def start(self, handlers):
        """启动所有消费者"""
        for i in range(self.consumer_count):
            consumer = Consumer(handlers)
            threading.Thread(
                target=consumer.run,
                daemon=True
            ).start()
            self.consumers.append(consumer)


class Consumer:
    """单个消费者"""
    
    def __init__(self, handlers):
        self.handlers = handlers  # 任务类型 -> 处理函数映射
        self.kafka_consumer = self._create_consumer()
    
    def run(self):
        """运行消费者"""
        for message in self.kafka_consumer:
            try:
                task = json.loads(message.value)
                handler = self.handlers.get(task['task_type'])
                
                if handler:
                    handler(task)
                    self.kafka_consumer.commit()
                else:
                    logger.error(f"未知的任务类型: {task['task_type']}")
                    
            except Exception as e:
                logger.error(f"任务执行失败: {e}")
                self._handle_failure(task, e)

高可用设计

主备切换

class HighAvailabilityScheduler:
    """高可用调度器"""
    
    def __init__(self, node_id):
        self.node_id = node_id
        self.redis = redis.Redis()
        self.is_leader = False
    
    def elect_leader(self):
        """选举 leader"""
        leader_key = 'delay_queue:leader'
        
        # ✅ 尝试获取 leader 锁
        acquired = self.redis.set(
            leader_key,
            self.node_id,
            nx=True,
            ex=30  # 30 秒超时
        )
        
        if acquired:
            self.is_leader = True
            logger.info(f"节点 {self.node_id} 成为 leader")
        else:
            self.is_leader = False
            leader = self.redis.get(leader_key)
            logger.info(f"节点 {self.node_id} 是 follower,leader: {leader}")
    
    def run(self):
        """运行"""
        while True:
            # ✅ 定期选举 leader
            self.elect_leader()
            
            if self.is_leader:
                # ✅ Leader 负责调度
                self._schedule()
            else:
                # ✅ Follower 等待
                time.sleep(5)
    
    def _schedule(self):
        """调度任务"""
        # Leader 负责的调度逻辑
        pass

监控告警

监控指标

class DelayQueueMonitor:
    """延时队列监控"""
    
    def __init__(self):
        self.prometheus = PrometheusClient()
    
    def collect_metrics(self):
        """收集监控指标"""
        # ✅ 任务积压数
        pending_count = self._get_pending_count()
        self.prometheus.gauge('delay_queue_pending', pending_count)
        
        # ✅ 任务处理速率
        process_rate = self._get_process_rate()
        self.prometheus.gauge('delay_queue_process_rate', process_rate)
        
        # ✅ 任务失败率
        failure_rate = self._get_failure_rate()
        self.prometheus.gauge('delay_queue_failure_rate', failure_rate)
        
        # ✅ 平均执行延迟
        avg_delay = self._get_avg_delay()
        self.prometheus.gauge('delay_queue_avg_delay', avg_delay)
        
        # ✅ P99 执行延迟
        p99_delay = self._get_p99_delay()
        self.prometheus.gauge('delay_queue_p99_delay', p99_delay)