多队列
为什么需要多队列?
单一 ZSet 队列的问题:
问题:任务量过大
假设每天 1000 万任务:
- 30 分钟内:450 万
- 30 分钟~7 天:350 万
- 7 天以上:200 万
所有任务都在同一个 ZSet:
- 内存占用:1000 万 × 1KB = 10GB
- 查询效率:扫描整个 ZSet 找到期任务
- 清理困难:已完成的任务需要单独清理多队列架构
按延时范围分层
┌─────────────────────────────────────────────────────────┐
│ 多队列延时架构 │
├─────────────────────────────────────────────────────────┤
│ │
│ 热数据层(短延时队列) │
│ • 延时范围:< 30 分钟 │
│ • 精度:秒级 │
│ • 内存:少量(任务快速完成) │
│ • Redis: zset:short │
│ │
│ 温数据层(中延时队列) │
│ • 延时范围:30 分钟 ~ 7 天 │
│ • 精度:分钟级 │
│ • 内存:中等 │
│ • Redis: zset:medium │
│ │
│ 冷数据层(长延时队列) │
│ • 延时范围:> 7 天 │
│ • 精度:小时级 │
│ • 内存:少(使用数据库轮询) │
│ • Database: delay_tasks_long │
│ │
└─────────────────────────────────────────────────────────┘实现代码
class TieredDelayQueue:
"""多队列延时队列"""
def __init__(self):
self.redis = redis.Redis()
self.db = MySQLDatabase()
# 配置
self.tiers = [
{
'name': 'short',
'delay_range': (0, 30 * 60), # 0 ~ 30 分钟
'queue_key': 'delay_queue:short',
'data_key': 'delay_queue:short:data',
'precision': 1, # 秒级
'storage': 'redis'
},
{
'name': 'medium',
'delay_range': (30 * 60, 7 * 24 * 60 * 60), # 30 分钟 ~ 7 天
'queue_key': 'delay_queue:medium',
'data_key': 'delay_queue:medium:data',
'precision': 60, # 分钟级
'storage': 'redis'
},
{
'name': 'long',
'delay_range': (7 * 24 * 60 * 60, float('inf')), # > 7 天
'precision': 3600, # 小时级
'storage': 'database'
}
]
def _get_tier(self, delay_seconds):
"""根据延时时间获取对应的层级"""
for tier in self.tiers:
min_delay, max_delay = tier['delay_range']
if min_delay <= delay_seconds < max_delay:
return tier
return self.tiers[-1] # 默认最后一层
def add_task(self, task_id, task_type, task_data, delay_seconds):
"""添加任务:自动路由到对应层级"""
tier = self._get_tier(delay_seconds)
logger.info(
f"任务 {task_id} 路由到 {tier['name']} 层, "
f"延时: {delay_seconds}s"
)
if tier['storage'] == 'redis':
self._add_to_redis(
tier,
task_id,
task_type,
task_data,
delay_seconds
)
else:
self._add_to_database(
tier,
task_id,
task_type,
task_data,
delay_seconds
)
def _add_to_redis(self, tier, task_id, task_type, task_data, delay_seconds):
"""添加到 Redis 层"""
execute_at = int(time.time()) + delay_seconds
# 存储任务数据
task_info = {
'task_id': task_id,
'task_type': task_type,
'task_data': task_data,
'execute_at': execute_at,
'created_at': int(time.time())
}
self.redis.hset(
tier['data_key'],
task_id,
json.dumps(task_info)
)
# 添加到队列
self.redis.zadd(
tier['queue_key'],
{task_id: execute_at}
)
def _add_to_database(self, tier, task_id, task_type, task_data, delay_seconds):
"""添加到数据库层"""
execute_at = datetime.now() + timedelta(seconds=delay_seconds)
self.db.execute("""
INSERT INTO delay_tasks_long
(task_id, task_type, task_data, execute_at, status)
VALUES (%s, %s, %s, %s, 'pending')
""", (task_id, task_type, json.dumps(task_data), execute_at))
def pop_ready_task(self):
"""弹出到期任务(从所有层级)"""
# 优先从短延时层弹出
for tier in reversed(self.tiers):
if tier['storage'] == 'redis':
task = self._pop_from_redis(tier)
if task:
return task
else:
task = self._pop_from_database(tier)
if task:
return task
return None
def _pop_from_redis(self, tier):
"""从 Redis 弹出任务"""
now = int(time.time())
lua_script = """
local queue_key = KEYS[1]
local data_key = KEYS[2]
local now = tonumber(ARGV[1])
local task_ids = redis.call('ZRANGEBYSCORE', queue_key, 0, now, 'LIMIT', 0, 1)
if #task_ids == 0 then
return nil
end
local task_id = task_ids[1]
local task_data = redis.call('HGET', data_key, task_id)
redis.call('ZREM', queue_key, task_id)
redis.call('HDEL', data_key, task_id)
return task_data
"""
result = self.redis.eval(
lua_script,
2,
tier['queue_key'],
tier['data_key'],
now
)
if result:
return json.loads(result)
return None
def _pop_from_database(self, tier):
"""从数据库弹出任务"""
tasks = self.db.query("""
SELECT * FROM delay_tasks_long
WHERE execute_at <= NOW()
AND status = 'pending'
ORDER BY execute_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
""")
if tasks:
task = tasks[0]
self.db.execute("""
UPDATE delay_tasks_long
SET status = 'processing'
WHERE task_id = %s
""", [task['task_id']])
return task
return None
def upgrade_tasks(self):
"""任务升级:将快要到期的任务从低精度层移到高精度层"""
# 将温数据层剩余时间 < 1 小时的任务移到热数据层
self._upgrade_medium_to_short()
def _upgrade_medium_to_short(self):
"""从中延时层升级到短延时层"""
now = int(time.time())
upgrade_threshold = 3600 # 1 小时
# 查询需要升级的任务
task_ids = self.redis.zrangebyscore(
'delay_queue:medium',
0,
now + upgrade_threshold,
start=0,
num=100
)
for task_id in task_ids:
# 获取任务数据
task_data = self.redis.hget('delay_queue:medium:data', task_id)
if task_data:
task = json.loads(task_data)
# 删除从温数据层
self.redis.zrem('delay_queue:medium', task_id)
self.redis.hdel('delay_queue:medium:data', task_id)
# 添加到热数据层
remaining_delay = task['execute_at'] - now
self._add_to_redis(
self.tiers[0], # short tier
task_id,
task['task_type'],
task['task_data'],
remaining_delay
)
logger.info(f"任务升级: {task_id} (medium → short)")多队列的优势
| 优势 | 说明 |
|---|---|
| ✅ 内存优化 | 长延时任务不占用 Redis 内存 |
| ✅ 性能优化 | 短延时任务查询更快 |
| ✅ 精度优化 | 不同层级使用不同精度 |
| ✅ 成本优化 | 减少内存使用,降低成本 |
| ✅ 可扩展 | 可以独立扩展不同层级 |