多队列

为什么需要多队列?

单一 ZSet 队列的问题:

问题:任务量过大

假设每天 1000 万任务:
- 30 分钟内:450 万
- 30 分钟~7 天:350 万
- 7 天以上:200 万

所有任务都在同一个 ZSet:
- 内存占用:1000 万 × 1KB = 10GB
- 查询效率:扫描整个 ZSet 找到期任务
- 清理困难:已完成的任务需要单独清理

多队列架构

按延时范围分层

┌─────────────────────────────────────────────────────────┐
│                    多队列延时架构                        │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  热数据层(短延时队列)                                   │
│  • 延时范围:< 30 分钟                                   │
│  • 精度:秒级                                           │
│  • 内存:少量(任务快速完成)                              │
│  • Redis: zset:short                                   │
│                                                         │
│  温数据层(中延时队列)                                   │
│  • 延时范围:30 分钟 ~ 7 天                              │
│  • 精度:分钟级                                         │
│  • 内存:中等                                           │
│  • Redis: zset:medium                                  │
│                                                         │
│  冷数据层(长延时队列)                                   │
│  • 延时范围:> 7 天                                     │
│  • 精度:小时级                                         │
│  • 内存:少(使用数据库轮询)                              │
│  • Database: delay_tasks_long                           │
│                                                         │
└─────────────────────────────────────────────────────────┘

实现代码

class TieredDelayQueue:
    """多队列延时队列"""
    
    def __init__(self):
        self.redis = redis.Redis()
        self.db = MySQLDatabase()
        
        # 配置
        self.tiers = [
            {
                'name': 'short',
                'delay_range': (0, 30 * 60),  # 0 ~ 30 分钟
                'queue_key': 'delay_queue:short',
                'data_key': 'delay_queue:short:data',
                'precision': 1,  # 秒级
                'storage': 'redis'
            },
            {
                'name': 'medium',
                'delay_range': (30 * 60, 7 * 24 * 60 * 60),  # 30 分钟 ~ 7 天
                'queue_key': 'delay_queue:medium',
                'data_key': 'delay_queue:medium:data',
                'precision': 60,  # 分钟级
                'storage': 'redis'
            },
            {
                'name': 'long',
                'delay_range': (7 * 24 * 60 * 60, float('inf')),  # > 7 天
                'precision': 3600,  # 小时级
                'storage': 'database'
            }
        ]
    
    def _get_tier(self, delay_seconds):
        """根据延时时间获取对应的层级"""
        for tier in self.tiers:
            min_delay, max_delay = tier['delay_range']
            if min_delay <= delay_seconds < max_delay:
                return tier
        return self.tiers[-1]  # 默认最后一层
    
    def add_task(self, task_id, task_type, task_data, delay_seconds):
        """添加任务:自动路由到对应层级"""
        tier = self._get_tier(delay_seconds)
        
        logger.info(
            f"任务 {task_id} 路由到 {tier['name']} 层, "
            f"延时: {delay_seconds}s"
        )
        
        if tier['storage'] == 'redis':
            self._add_to_redis(
                tier,
                task_id,
                task_type,
                task_data,
                delay_seconds
            )
        else:
            self._add_to_database(
                tier,
                task_id,
                task_type,
                task_data,
                delay_seconds
            )
    
    def _add_to_redis(self, tier, task_id, task_type, task_data, delay_seconds):
        """添加到 Redis 层"""
        execute_at = int(time.time()) + delay_seconds
        
        # 存储任务数据
        task_info = {
            'task_id': task_id,
            'task_type': task_type,
            'task_data': task_data,
            'execute_at': execute_at,
            'created_at': int(time.time())
        }
        
        self.redis.hset(
            tier['data_key'],
            task_id,
            json.dumps(task_info)
        )
        
        # 添加到队列
        self.redis.zadd(
            tier['queue_key'],
            {task_id: execute_at}
        )
    
    def _add_to_database(self, tier, task_id, task_type, task_data, delay_seconds):
        """添加到数据库层"""
        execute_at = datetime.now() + timedelta(seconds=delay_seconds)
        
        self.db.execute("""
            INSERT INTO delay_tasks_long 
            (task_id, task_type, task_data, execute_at, status)
            VALUES (%s, %s, %s, %s, 'pending')
        """, (task_id, task_type, json.dumps(task_data), execute_at))
    
    def pop_ready_task(self):
        """弹出到期任务(从所有层级)"""
        # 优先从短延时层弹出
        for tier in reversed(self.tiers):
            if tier['storage'] == 'redis':
                task = self._pop_from_redis(tier)
                if task:
                    return task
            else:
                task = self._pop_from_database(tier)
                if task:
                    return task
        
        return None
    
    def _pop_from_redis(self, tier):
        """从 Redis 弹出任务"""
        now = int(time.time())
        
        lua_script = """
        local queue_key = KEYS[1]
        local data_key = KEYS[2]
        local now = tonumber(ARGV[1])
        
        local task_ids = redis.call('ZRANGEBYSCORE', queue_key, 0, now, 'LIMIT', 0, 1)
        if #task_ids == 0 then
            return nil
        end
        
        local task_id = task_ids[1]
        local task_data = redis.call('HGET', data_key, task_id)
        
        redis.call('ZREM', queue_key, task_id)
        redis.call('HDEL', data_key, task_id)
        
        return task_data
        """
        
        result = self.redis.eval(
            lua_script,
            2,
            tier['queue_key'],
            tier['data_key'],
            now
        )
        
        if result:
            return json.loads(result)
        return None
    
    def _pop_from_database(self, tier):
        """从数据库弹出任务"""
        tasks = self.db.query("""
            SELECT * FROM delay_tasks_long 
            WHERE execute_at <= NOW() 
              AND status = 'pending'
            ORDER BY execute_at ASC
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        """)
        
        if tasks:
            task = tasks[0]
            self.db.execute("""
                UPDATE delay_tasks_long 
                SET status = 'processing'
                WHERE task_id = %s
            """, [task['task_id']])
            return task
        
        return None
    
    def upgrade_tasks(self):
        """任务升级:将快要到期的任务从低精度层移到高精度层"""
        # 将温数据层剩余时间 < 1 小时的任务移到热数据层
        self._upgrade_medium_to_short()
    
    def _upgrade_medium_to_short(self):
        """从中延时层升级到短延时层"""
        now = int(time.time())
        upgrade_threshold = 3600  # 1 小时
        
        # 查询需要升级的任务
        task_ids = self.redis.zrangebyscore(
            'delay_queue:medium',
            0,
            now + upgrade_threshold,
            start=0,
            num=100
        )
        
        for task_id in task_ids:
            # 获取任务数据
            task_data = self.redis.hget('delay_queue:medium:data', task_id)
            if task_data:
                task = json.loads(task_data)
                
                # 删除从温数据层
                self.redis.zrem('delay_queue:medium', task_id)
                self.redis.hdel('delay_queue:medium:data', task_id)
                
                # 添加到热数据层
                remaining_delay = task['execute_at'] - now
                self._add_to_redis(
                    self.tiers[0],  # short tier
                    task_id,
                    task['task_type'],
                    task['task_data'],
                    remaining_delay
                )
                
                logger.info(f"任务升级: {task_id} (medium → short)")

多队列的优势

优势说明
内存优化长延时任务不占用 Redis 内存
性能优化短延时任务查询更快
精度优化不同层级使用不同精度
成本优化减少内存使用,降低成本
可扩展可以独立扩展不同层级