Score 设计
最简单的 Score 设计
最直观的方案是直接使用时间戳作为 score:
# 😱 简单但有问题
execute_at = int(time.time()) + delay_seconds
redis.zadd('delay_queue', {task_id: execute_at})这个方案看起来没问题,但实际使用时会遇到一些坑。
Score 设计的挑战
挑战 1:时间精度
# 问题 1:时间精度不够
# 如果使用秒级时间戳:
execute_at = int(time.time()) + delay_seconds
# 1710505800
# 同一秒内的任务会冲突!
task1 = add_task('task_1', ..., 30) # 14:00:30
task2 = add_task('task_2', ..., 30) # 14:00:30(同一秒)
task3 = add_task('task_3', ..., 30) # 14:00:30(同一秒)
# 所有任务的 score 都是 1710505800
# ZSet 会覆盖后面的任务!挑战 2:任务去重
# 问题 2:相同 score 的任务会被覆盖
redis.zadd('delay_queue', {
'task_1': 1710505800,
'task_2': 1710505800 # 相同 score,可能覆盖 task_1
})
# 结果:只有 task_2 在队列中,task_1 丢失了!挑战 3:并发竞争
# 问题 3:并发添加相同 score 的任务
# 线程 1:
redis.zadd('delay_queue', {'task_1': 1710505800})
# 线程 2(同时执行):
redis.zadd('delay_queue', {'task_2': 1710505800})
# 结果:不确定,可能是 task_1,也可能是 task_2Score 设计方案
方案 1:毫秒级时间戳
class MillisecondScoreDelayQueue:
"""使用毫秒级时间戳"""
def add_task(self, task_id, delay_seconds):
"""添加任务"""
# ✅ 使用毫秒级时间戳
execute_at = int(time.time() * 1000) + delay_seconds * 1000
self.redis.zadd('delay_queue', {task_id: execute_at})
return execute_at
def get_ready_tasks(self, batch_size=100):
"""获取到期任务"""
now = int(time.time() * 1000)
task_ids = self.redis.zrangebyscore(
'delay_queue',
0,
now,
start=0,
num=batch_size
)
return task_ids优点:
- ✅ 精度提高到毫秒级
- ✅ 同一秒内的任务不会冲突
缺点:
- ❌ 同一毫秒内的任务仍可能冲突
- ❌ 如果并发很高,仍可能有问题
方案 2:时间戳 + 随机数
class TimestampWithRandomDelayQueue:
"""时间戳 + 随机数"""
def __init__(self):
self.redis = redis.Redis()
self.random_bits = 6 # 随机数位数
def add_task(self, task_id, delay_seconds):
"""添加任务"""
# ✅ 时间戳部分(毫秒级)
timestamp = int(time.time() * 1000) + delay_seconds * 1000
# ✅ 随机数部分(最低 6 位)
random_part = random.randint(0, 2**self.random_bits - 1)
# ✅ 组合:时间戳 << 6 + 随机数
score = (timestamp << self.random_bits) + random_part
self.redis.zadd('delay_queue', {task_id: score})
return score
def get_ready_tasks(self, batch_size=100):
"""获取到期任务"""
now = int(time.time() * 1000) << self.random_bits
task_ids = self.redis.zrangebyscore(
'delay_queue',
0,
now,
start=0,
num=batch_size
)
return task_ids示例:
原始时间戳:1710505800000 (毫秒)
score 设计:
├─ 时间戳部分(高 46 位):1710505800000 << 6
└─ 随机数部分(低 6 位):0 ~ 63
示例 score:
1710505800000 << 6 + 0 = 109543571200000
1710505800000 << 6 + 1 = 109543571200001
...
1710505800000 << 6 + 63 = 109543571200063优点:
- ✅ 同一毫秒内的任务不会冲突(最多 64 个)
- ✅ 保持时间顺序
缺点:
- ❌ 如果并发 > 64,仍可能冲突
- ❌ 需要合理的随机位数选择
方案 3:时间戳 + 序列号(推荐)
class TimestampWithSequenceDelayQueue:
"""时间戳 + 序列号"""
def __init__(self):
self.redis = redis.Redis()
self.sequence_key = 'delay_queue:sequence'
self.sequence_bits = 10 # 序列号位数
def _next_sequence(self):
"""获取下一个序列号(原子操作)"""
return self.redis.incr(self.sequence_key)
def add_task(self, task_id, delay_seconds):
"""添加任务"""
# ✅ 时间戳部分(毫秒级)
timestamp = int(time.time() * 1000) + delay_seconds * 1000
# ✅ 序列号部分(原子递增)
sequence = self._next_sequence()
# ✅ 组合:时间戳 << 10 + 序列号
score = (timestamp << self.sequence_bits) + (sequence % (2**self.sequence_bits))
self.redis.zadd('delay_queue', {task_id: score})
return score
def get_ready_tasks(self, batch_size=100):
"""获取到期任务"""
now = int(time.time() * 1000) << self.sequence_bits
task_ids = self.redis.zrangebyscore(
'delay_queue',
0,
now,
start=0,
num=batch_size
)
return task_ids示例:
score 设计:
├─ 时间戳部分(高 42 位)
└─ 序列号部分(低 10 位)
同一秒内的任务:
1710505800000 << 10 + 0 = 1752904332800000
1710505800000 << 10 + 1 = 1752904332800001
...
1710505800000 << 10 + 1023 = 1752904332801023
可以支持同一毫秒内 1024 个任务优点:
- ✅ 完全避免冲突(序列号唯一)
- ✅ 保持时间顺序
- ✅ 支持高并发
缺点:
- ❌ 序列号会一直增长,需要定期重置
方案 4:时间戳 + UUID(适用于分布式)
class TimestampWithUUIDDelayQueue:
"""时间戳 + UUID(分布式)"""
def add_task(self, task_id, delay_seconds):
"""添加任务"""
# ✅ 时间戳部分(毫秒级)
timestamp = int(time.time() * 1000) + delay_seconds * 1000
# ✅ UUID 部分(取后 16 位)
uuid_str = str(uuid.uuid4()).replace('-', '')
uuid_part = int(uuid_str[-16:], 16)
# ✅ 组合
score = (timestamp << 16) + (uuid_part % (2**16))
self.redis.zadd('delay_queue', {task_id: score})
return score优点:
- ✅ 分布式环境下也能保证唯一性
- ✅ 不需要序列号管理
缺点:
- ❌ 随机性可能导致顺序不完全按时间
Score 设计对比
| 方案 | 冲突概率 | 顺序保证 | 分布式友好 | 复杂度 |
|---|---|---|---|---|
| 秒级时间戳 | 🔴 高 | ✅ 完全 | ✅ 友好 | ⭐ 简单 |
| 毫秒级时间戳 | 🟡 中 | ✅ 完全 | ✅ 友好 | ⭐ 简单 |
| 时间戳 + 随机数 | 🟢 低 | ✅ 完全 | ✅ 友好 | ⭐⭐ 中等 |
| 时间戳 + 序列号 | 🟢 无 | ✅ 完全 | ⚠️ 需要协调 | ⭐⭐⭐ 复杂 |
| 时间戳 + UUID | 🟢 无 | ⚠️ 近似 | ✅ 友好 | ⭐⭐ 中等 |
最佳实践
推荐方案:时间戳 + 序列号
class BestPracticeDelayQueue:
"""最佳实践:时间戳 + 序列号"""
def __init__(self, queue_name='delay_queue'):
self.redis = redis.Redis()
self.queue_key = f"delay_queue:{queue_name}"
self.sequence_key = f"delay_queue:sequence:{queue_name}"
# 配置
self.timestamp_bits = 42 # 时间戳位数(约 140 年)
self.sequence_bits = 22 # 序列号位数(支持 4M 并发)
def _next_sequence(self):
"""获取下一个序列号(带周期重置)"""
sequence = self.redis.incr(self.sequence_key)
# ✅ 周期性重置序列号(防止溢出)
max_sequence = 2**self.sequence_bits - 1
if sequence >= max_sequence:
self.redis.set(self.sequence_key, 0)
sequence = self.redis.incr(self.sequence_key)
return sequence
def add_task(self, task_id, delay_seconds):
"""添加任务"""
# ✅ 计算执行时间(毫秒)
execute_at_ms = int(time.time() * 1000) + delay_seconds * 1000
# ✅ 获取序列号
sequence = self._next_sequence()
# ✅ 组合 score
score = (execute_at_ms << self.sequence_bits) + sequence
# ✅ 添加到队列
self.redis.zadd(self.queue_key, {task_id: score})
# ✅ 存储元数据
metadata = {
'execute_at': execute_at_ms,
'sequence': sequence,
'created_at': int(time.time() * 1000)
}
self.redis.hset(
f"{self.queue_key}:metadata",
task_id,
json.dumps(metadata)
)
logger.info(
f"添加任务: {task_id}, "
f"score={score}, "
f"execute_at={execute_at_ms}, "
f"sequence={sequence}"
)
return task_id
def pop_ready_task(self):
"""弹出到期任务(原子操作)"""
now_ms = int(time.time() * 1000)
now_score = now_ms << self.sequence_bits
# ✅ Lua 脚本:查询并删除
lua_script = """
local queue_key = KEYS[1]
local metadata_key = KEYS[2]
local now_score = tonumber(ARGV[1])
-- 查询最早到期的任务
local task_ids = redis.call('ZRANGEBYSCORE', queue_key, 0, now_score, 'LIMIT', 0, 1)
if #task_ids == 0 then
return nil
end
local task_id = task_ids[1]
-- 获取元数据
local metadata = redis.call('HGET', metadata_key, task_id)
-- 删除任务
redis.call('ZREM', queue_key, task_id)
redis.call('HDEL', metadata_key, task_id)
return metadata
"""
result = self.redis.eval(
lua_script,
2,
self.queue_key,
f"{self.queue_key}:metadata",
now_score
)
if result:
metadata = json.loads(result)
return {
'task_id': task_id,
'execute_at': metadata['execute_at'],
'sequence': metadata['sequence']
}
return None
def get_queue_info(self):
"""获取队列信息"""
now_ms = int(time.time() * 1000)
# 总任务数
total_count = self.redis.zcard(self.queue_key)
# 到期任务数
now_score = now_ms << self.sequence_bits
ready_count = self.redis.zcount(self.queue_key, 0, now_score)
# 最早任务时间
earliest = self.redis.zrange(self.queue_key, 0, 0, withscores=True)
earliest_time = earliest[0][1] >> self.sequence_bits if earliest else None
return {
'total_count': total_count,
'ready_count': ready_count,
'earliest_time': earliest_time,
'earliest_seconds_away': (earliest_time - now_ms) / 1000 if earliest_time else None
}
# 使用示例
queue = BestPracticeDelayQueue('orders')
# 添加任务
queue.add_task('task_1', 'order_timeout', {'order_id': 12345}, 30 * 60)
queue.add_task('task_2', 'order_timeout', {'order_id': 12346}, 30 * 60)
queue.add_task('task_3', 'order_timeout', {'order_id': 12347}, 30 * 60)
# 查看队列信息
info = queue.get_queue_info()
print(f"总任务数: {info['total_count']}")
print(f"到期任务数: {info['ready_count']}")
print(f"最早任务时间: {datetime.fromtimestamp(info['earliest_time']/1000)}")想一想
思考 1
如何处理 score 溢出的问题?
参考答案
问题分析:
# 如果使用 64 位整数:
# 时间戳 42 位 + 序列号 22 位
# 时间戳部分的最大值(毫秒)
max_timestamp = 2**42 - 1 # 4398046511103 毫秒
max_timestamp_years = max_timestamp / (1000 * 60 * 60 * 24 * 365.25)
# 约 139 年
# 如果从 1970 年开始:
# 1970 + 139 = 2109 年
# 在 2109 年会溢出!解决方案:
方案 1:使用相对时间
class RelativeScoreDelayQueue:
"""使用相对时间"""
def __init__(self):
self.redis = redis.Redis()
self.base_time = int(time.time() * 1000) # 系统启动时间
def add_task(self, task_id, delay_seconds):
"""添加任务:使用相对时间"""
# ✅ 计算相对于基准时间的偏移
delay_ms = delay_seconds * 1000
# ✅ score = 偏移时间(而不是绝对时间)
score = delay_ms
self.redis.zadd('delay_queue', {task_id: score})
def get_ready_tasks(self, batch_size=100):
"""获取到期任务"""
# ✅ 计算当前的相对时间
now_ms = int(time.time() * 1000)
relative_now = now_ms - self.base_time
task_ids = self.redis.zrangebyscore(
'delay_queue',
0,
relative_now,
start=0,
num=batch_size
)
return task_ids优点:
- ✅ 避免绝对时间戳溢出
- ✅ 可以支持任意长的延时
缺点:
- ❌ 系统重启后需要重新计算 base_time
- ❌ 需要持久化 base_time
方案 2:周期性重置序列号
class CyclicSequenceDelayQueue:
"""周期性重置序列号"""
def __init__(self):
self.redis = redis.Redis()
self.sequence_key = 'delay_queue:sequence'
self.cycle_key = 'delay_queue:cycle'
def _next_sequence(self):
"""获取下一个序列号(周期性重置)"""
sequence = self.redis.incr(self.sequence_key)
# ✅ 每 2^16 次重置一次序列号
if sequence % 2**16 == 0:
self.redis.set(self.sequence_key, 0)
self.redis.incr(self.cycle_key) # 增加周期号
return sequence
def add_task(self, task_id, delay_seconds):
"""添加任务"""
execute_at_ms = int(time.time() * 1000) + delay_seconds * 1000
# ✅ 包含周期号
cycle = int(self.redis.get(self.cycle_key) or 0)
sequence = self._next_sequence()
# ✅ score = 时间戳(48位) + 周期号(8位) + 序列号(8位)
score = (execute_at_ms << 16) + (cycle << 8) + sequence
self.redis.zadd('delay_queue', {task_id: score})优点:
- ✅ 序列号不会无限增长
- ✅ 延长了可使用时间
缺点:
- ⚠️ 需要管理周期号
- ⚠️ 实现复杂度增加
方案 3:使用浮点数
class FloatScoreDelayQueue:
"""使用浮点数 score"""
def add_task(self, task_id, delay_seconds):
"""添加任务"""
# ✅ 使用浮点数:时间戳 + 微秒偏移
timestamp = int(time.time())
microseconds = int(time.time() * 1000000) % 1000000
# ✅ score = timestamp + microseconds / 1000000
score = timestamp + microseconds / 1000000
self.redis.zadd('delay_queue', {task_id: score})优点:
- ✅ 简单直接
- ✅ 不需要管理序列号
缺点:
- ❌ Redis 使用 double 存储,精度有限
- ❌ 可能有浮点数精度问题
最佳实践:
- ✅ 短期使用:直接使用毫秒级时间戳
- ✅ 长期使用:使用相对时间或周期性重置
- ✅ 避免溢出:定期检查 score 是否接近上限
- ✅ 监控告警:score 接近上限时发出告警