Score 设计

最简单的 Score 设计

最直观的方案是直接使用时间戳作为 score:

# 😱 简单但有问题
execute_at = int(time.time()) + delay_seconds
redis.zadd('delay_queue', {task_id: execute_at})

这个方案看起来没问题,但实际使用时会遇到一些坑。


Score 设计的挑战

挑战 1:时间精度

# 问题 1:时间精度不够
# 如果使用秒级时间戳:
execute_at = int(time.time()) + delay_seconds
# 1710505800

# 同一秒内的任务会冲突!
task1 = add_task('task_1', ..., 30)  # 14:00:30
task2 = add_task('task_2', ..., 30)  # 14:00:30(同一秒)
task3 = add_task('task_3', ..., 30)  # 14:00:30(同一秒)

# 所有任务的 score 都是 1710505800
# ZSet 会覆盖后面的任务!

挑战 2:任务去重

# 问题 2:相同 score 的任务会被覆盖
redis.zadd('delay_queue', {
    'task_1': 1710505800,
    'task_2': 1710505800  # 相同 score,可能覆盖 task_1
})

# 结果:只有 task_2 在队列中,task_1 丢失了!

挑战 3:并发竞争

# 问题 3:并发添加相同 score 的任务
# 线程 1:
redis.zadd('delay_queue', {'task_1': 1710505800})

# 线程 2(同时执行):
redis.zadd('delay_queue', {'task_2': 1710505800})

# 结果:不确定,可能是 task_1,也可能是 task_2

Score 设计方案

方案 1:毫秒级时间戳

class MillisecondScoreDelayQueue:
    """使用毫秒级时间戳"""
    
    def add_task(self, task_id, delay_seconds):
        """添加任务"""
        # ✅ 使用毫秒级时间戳
        execute_at = int(time.time() * 1000) + delay_seconds * 1000
        
        self.redis.zadd('delay_queue', {task_id: execute_at})
        
        return execute_at
    
    def get_ready_tasks(self, batch_size=100):
        """获取到期任务"""
        now = int(time.time() * 1000)
        
        task_ids = self.redis.zrangebyscore(
            'delay_queue',
            0,
            now,
            start=0,
            num=batch_size
        )
        
        return task_ids

优点:

  • ✅ 精度提高到毫秒级
  • ✅ 同一秒内的任务不会冲突

缺点:

  • ❌ 同一毫秒内的任务仍可能冲突
  • ❌ 如果并发很高,仍可能有问题

方案 2:时间戳 + 随机数

class TimestampWithRandomDelayQueue:
    """时间戳 + 随机数"""
    
    def __init__(self):
        self.redis = redis.Redis()
        self.random_bits = 6  # 随机数位数
    
    def add_task(self, task_id, delay_seconds):
        """添加任务"""
        # ✅ 时间戳部分(毫秒级)
        timestamp = int(time.time() * 1000) + delay_seconds * 1000
        
        # ✅ 随机数部分(最低 6 位)
        random_part = random.randint(0, 2**self.random_bits - 1)
        
        # ✅ 组合:时间戳 << 6 + 随机数
        score = (timestamp << self.random_bits) + random_part
        
        self.redis.zadd('delay_queue', {task_id: score})
        
        return score
    
    def get_ready_tasks(self, batch_size=100):
        """获取到期任务"""
        now = int(time.time() * 1000) << self.random_bits
        
        task_ids = self.redis.zrangebyscore(
            'delay_queue',
            0,
            now,
            start=0,
            num=batch_size
        )
        
        return task_ids

示例:

原始时间戳:1710505800000 (毫秒)

score 设计:
├─ 时间戳部分(高 46 位):1710505800000 << 6
└─ 随机数部分(低 6 位):0 ~ 63

示例 score:
1710505800000 << 6 + 0 = 109543571200000
1710505800000 << 6 + 1 = 109543571200001
...
1710505800000 << 6 + 63 = 109543571200063

优点:

  • ✅ 同一毫秒内的任务不会冲突(最多 64 个)
  • ✅ 保持时间顺序

缺点:

  • ❌ 如果并发 > 64,仍可能冲突
  • ❌ 需要合理的随机位数选择

方案 3:时间戳 + 序列号(推荐)

class TimestampWithSequenceDelayQueue:
    """时间戳 + 序列号"""
    
    def __init__(self):
        self.redis = redis.Redis()
        self.sequence_key = 'delay_queue:sequence'
        self.sequence_bits = 10  # 序列号位数
    
    def _next_sequence(self):
        """获取下一个序列号(原子操作)"""
        return self.redis.incr(self.sequence_key)
    
    def add_task(self, task_id, delay_seconds):
        """添加任务"""
        # ✅ 时间戳部分(毫秒级)
        timestamp = int(time.time() * 1000) + delay_seconds * 1000
        
        # ✅ 序列号部分(原子递增)
        sequence = self._next_sequence()
        
        # ✅ 组合:时间戳 << 10 + 序列号
        score = (timestamp << self.sequence_bits) + (sequence % (2**self.sequence_bits))
        
        self.redis.zadd('delay_queue', {task_id: score})
        
        return score
    
    def get_ready_tasks(self, batch_size=100):
        """获取到期任务"""
        now = int(time.time() * 1000) << self.sequence_bits
        
        task_ids = self.redis.zrangebyscore(
            'delay_queue',
            0,
            now,
            start=0,
            num=batch_size
        )
        
        return task_ids

示例:

score 设计:
├─ 时间戳部分(高 42 位)
└─ 序列号部分(低 10 位)

同一秒内的任务:
1710505800000 << 10 + 0 = 1752904332800000
1710505800000 << 10 + 1 = 1752904332800001
...
1710505800000 << 10 + 1023 = 1752904332801023

可以支持同一毫秒内 1024 个任务

优点:

  • ✅ 完全避免冲突(序列号唯一)
  • ✅ 保持时间顺序
  • ✅ 支持高并发

缺点:

  • ❌ 序列号会一直增长,需要定期重置

方案 4:时间戳 + UUID(适用于分布式)

class TimestampWithUUIDDelayQueue:
    """时间戳 + UUID(分布式)"""
    
    def add_task(self, task_id, delay_seconds):
        """添加任务"""
        # ✅ 时间戳部分(毫秒级)
        timestamp = int(time.time() * 1000) + delay_seconds * 1000
        
        # ✅ UUID 部分(取后 16 位)
        uuid_str = str(uuid.uuid4()).replace('-', '')
        uuid_part = int(uuid_str[-16:], 16)
        
        # ✅ 组合
        score = (timestamp << 16) + (uuid_part % (2**16))
        
        self.redis.zadd('delay_queue', {task_id: score})
        
        return score

优点:

  • ✅ 分布式环境下也能保证唯一性
  • ✅ 不需要序列号管理

缺点:

  • ❌ 随机性可能导致顺序不完全按时间

Score 设计对比

方案冲突概率顺序保证分布式友好复杂度
秒级时间戳🔴 高✅ 完全✅ 友好⭐ 简单
毫秒级时间戳🟡 中✅ 完全✅ 友好⭐ 简单
时间戳 + 随机数🟢 低✅ 完全✅ 友好⭐⭐ 中等
时间戳 + 序列号🟢 无✅ 完全⚠️ 需要协调⭐⭐⭐ 复杂
时间戳 + UUID🟢 无⚠️ 近似✅ 友好⭐⭐ 中等

最佳实践

推荐方案:时间戳 + 序列号

class BestPracticeDelayQueue:
    """最佳实践:时间戳 + 序列号"""
    
    def __init__(self, queue_name='delay_queue'):
        self.redis = redis.Redis()
        self.queue_key = f"delay_queue:{queue_name}"
        self.sequence_key = f"delay_queue:sequence:{queue_name}"
        
        # 配置
        self.timestamp_bits = 42  # 时间戳位数(约 140 年)
        self.sequence_bits = 22   # 序列号位数(支持 4M 并发)
    
    def _next_sequence(self):
        """获取下一个序列号(带周期重置)"""
        sequence = self.redis.incr(self.sequence_key)
        
        # ✅ 周期性重置序列号(防止溢出)
        max_sequence = 2**self.sequence_bits - 1
        if sequence >= max_sequence:
            self.redis.set(self.sequence_key, 0)
            sequence = self.redis.incr(self.sequence_key)
        
        return sequence
    
    def add_task(self, task_id, delay_seconds):
        """添加任务"""
        # ✅ 计算执行时间(毫秒)
        execute_at_ms = int(time.time() * 1000) + delay_seconds * 1000
        
        # ✅ 获取序列号
        sequence = self._next_sequence()
        
        # ✅ 组合 score
        score = (execute_at_ms << self.sequence_bits) + sequence
        
        # ✅ 添加到队列
        self.redis.zadd(self.queue_key, {task_id: score})
        
        # ✅ 存储元数据
        metadata = {
            'execute_at': execute_at_ms,
            'sequence': sequence,
            'created_at': int(time.time() * 1000)
        }
        self.redis.hset(
            f"{self.queue_key}:metadata",
            task_id,
            json.dumps(metadata)
        )
        
        logger.info(
            f"添加任务: {task_id}, "
            f"score={score}, "
            f"execute_at={execute_at_ms}, "
            f"sequence={sequence}"
        )
        
        return task_id
    
    def pop_ready_task(self):
        """弹出到期任务(原子操作)"""
        now_ms = int(time.time() * 1000)
        now_score = now_ms << self.sequence_bits
        
        # ✅ Lua 脚本:查询并删除
        lua_script = """
        local queue_key = KEYS[1]
        local metadata_key = KEYS[2]
        local now_score = tonumber(ARGV[1])
        
        -- 查询最早到期的任务
        local task_ids = redis.call('ZRANGEBYSCORE', queue_key, 0, now_score, 'LIMIT', 0, 1)
        
        if #task_ids == 0 then
            return nil
        end
        
        local task_id = task_ids[1]
        
        -- 获取元数据
        local metadata = redis.call('HGET', metadata_key, task_id)
        
        -- 删除任务
        redis.call('ZREM', queue_key, task_id)
        redis.call('HDEL', metadata_key, task_id)
        
        return metadata
        """
        
        result = self.redis.eval(
            lua_script,
            2,
            self.queue_key,
            f"{self.queue_key}:metadata",
            now_score
        )
        
        if result:
            metadata = json.loads(result)
            return {
                'task_id': task_id,
                'execute_at': metadata['execute_at'],
                'sequence': metadata['sequence']
            }
        
        return None
    
    def get_queue_info(self):
        """获取队列信息"""
        now_ms = int(time.time() * 1000)
        
        # 总任务数
        total_count = self.redis.zcard(self.queue_key)
        
        # 到期任务数
        now_score = now_ms << self.sequence_bits
        ready_count = self.redis.zcount(self.queue_key, 0, now_score)
        
        # 最早任务时间
        earliest = self.redis.zrange(self.queue_key, 0, 0, withscores=True)
        earliest_time = earliest[0][1] >> self.sequence_bits if earliest else None
        
        return {
            'total_count': total_count,
            'ready_count': ready_count,
            'earliest_time': earliest_time,
            'earliest_seconds_away': (earliest_time - now_ms) / 1000 if earliest_time else None
        }

# 使用示例
queue = BestPracticeDelayQueue('orders')

# 添加任务
queue.add_task('task_1', 'order_timeout', {'order_id': 12345}, 30 * 60)
queue.add_task('task_2', 'order_timeout', {'order_id': 12346}, 30 * 60)
queue.add_task('task_3', 'order_timeout', {'order_id': 12347}, 30 * 60)

# 查看队列信息
info = queue.get_queue_info()
print(f"总任务数: {info['total_count']}")
print(f"到期任务数: {info['ready_count']}")
print(f"最早任务时间: {datetime.fromtimestamp(info['earliest_time']/1000)}")

想一想

思考 1

如何处理 score 溢出的问题?

参考答案

问题分析:

# 如果使用 64 位整数:
# 时间戳 42 位 + 序列号 22 位

# 时间戳部分的最大值(毫秒)
max_timestamp = 2**42 - 1  # 4398046511103 毫秒
max_timestamp_years = max_timestamp / (1000 * 60 * 60 * 24 * 365.25)
# 约 139 年

# 如果从 1970 年开始:
# 1970 + 139 = 2109 年
# 在 2109 年会溢出!

解决方案:

方案 1:使用相对时间

class RelativeScoreDelayQueue:
    """使用相对时间"""
    
    def __init__(self):
        self.redis = redis.Redis()
        self.base_time = int(time.time() * 1000)  # 系统启动时间
    
    def add_task(self, task_id, delay_seconds):
        """添加任务:使用相对时间"""
        # ✅ 计算相对于基准时间的偏移
        delay_ms = delay_seconds * 1000
        
        # ✅ score = 偏移时间(而不是绝对时间)
        score = delay_ms
        
        self.redis.zadd('delay_queue', {task_id: score})
    
    def get_ready_tasks(self, batch_size=100):
        """获取到期任务"""
        # ✅ 计算当前的相对时间
        now_ms = int(time.time() * 1000)
        relative_now = now_ms - self.base_time
        
        task_ids = self.redis.zrangebyscore(
            'delay_queue',
            0,
            relative_now,
            start=0,
            num=batch_size
        )
        
        return task_ids

优点:

  • ✅ 避免绝对时间戳溢出
  • ✅ 可以支持任意长的延时

缺点:

  • ❌ 系统重启后需要重新计算 base_time
  • ❌ 需要持久化 base_time

方案 2:周期性重置序列号

class CyclicSequenceDelayQueue:
    """周期性重置序列号"""
    
    def __init__(self):
        self.redis = redis.Redis()
        self.sequence_key = 'delay_queue:sequence'
        self.cycle_key = 'delay_queue:cycle'
    
    def _next_sequence(self):
        """获取下一个序列号(周期性重置)"""
        sequence = self.redis.incr(self.sequence_key)
        
        # ✅ 每 2^16 次重置一次序列号
        if sequence % 2**16 == 0:
            self.redis.set(self.sequence_key, 0)
            self.redis.incr(self.cycle_key)  # 增加周期号
        
        return sequence
    
    def add_task(self, task_id, delay_seconds):
        """添加任务"""
        execute_at_ms = int(time.time() * 1000) + delay_seconds * 1000
        
        # ✅ 包含周期号
        cycle = int(self.redis.get(self.cycle_key) or 0)
        sequence = self._next_sequence()
        
        # ✅ score = 时间戳(48位) + 周期号(8位) + 序列号(8位)
        score = (execute_at_ms << 16) + (cycle << 8) + sequence
        
        self.redis.zadd('delay_queue', {task_id: score})

优点:

  • ✅ 序列号不会无限增长
  • ✅ 延长了可使用时间

缺点:

  • ⚠️ 需要管理周期号
  • ⚠️ 实现复杂度增加

方案 3:使用浮点数

class FloatScoreDelayQueue:
    """使用浮点数 score"""
    
    def add_task(self, task_id, delay_seconds):
        """添加任务"""
        # ✅ 使用浮点数:时间戳 + 微秒偏移
        timestamp = int(time.time())
        microseconds = int(time.time() * 1000000) % 1000000
        
        # ✅ score = timestamp + microseconds / 1000000
        score = timestamp + microseconds / 1000000
        
        self.redis.zadd('delay_queue', {task_id: score})

优点:

  • ✅ 简单直接
  • ✅ 不需要管理序列号

缺点:

  • ❌ Redis 使用 double 存储,精度有限
  • ❌ 可能有浮点数精度问题

最佳实践:

  1. 短期使用:直接使用毫秒级时间戳
  2. 长期使用:使用相对时间或周期性重置
  3. 避免溢出:定期检查 score 是否接近上限
  4. 监控告警:score 接近上限时发出告警