时间轮原理

什么是时间轮?

时间轮(Timing Wheel)是一种高效管理定时任务的数据结构,类似于生活中的时钟。


基本原理

单层时间轮

时间轮结构:

    ┌───┬───┬───┬───┬───┬───┐
    │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │  ← 槽位(slots)
    ├───┼───┼───┼───┼───┼───┤
    │   │   │A  │   │B  │   │  ← 任务(A 在槽位 2,B 在槽位 4)
    └───┴───┴───┴───┴───┴───┘

    当前指针(current_slot)

时间轮转动:
- 每秒转动一格
- 指针指向的槽位,其中的任务到期执行

代码实现

import time

class TimingWheel:
    """单层时间轮"""
    
    def __init__(self, tick_interval=1, wheel_size=60):
        """
        tick_interval: 每格的时间间隔(秒)
        wheel_size: 时间轮的槽数
        """
        self.tick_interval = tick_interval
        self.wheel_size = wheel_size
        self.wheel = [[] for _ in range(wheel_size)]  # 槽位数组
        self.current_slot = 0  # 当前槽位
        self.start_time = time.time()
    
    def add_task(self, task, delay_seconds):
        """添加任务"""
        # 计算任务应该放在哪个槽位
        ticks = delay_seconds // self.tick_interval
        slot = (self.current_slot + ticks) % self.wheel_size
        
        # 添加任务到对应槽位
        self.wheel[slot].append({
            'task': task,
            'expires_at': time.time() + delay_seconds
        })
        
        logger.info(f"添加任务到槽位 {slot}")
    
    def tick(self):
        """时间轮转动一格"""
        # 获取当前槽位的任务
        tasks = self.wheel[self.current_slot]
        
        # 清空当前槽位
        self.wheel[self.current_slot] = []
        
        # 移动到下一个槽位
        self.current_slot = (self.current_slot + 1) % self.wheel_size
        
        return tasks
    
    def run(self, handler):
        """运行时间轮"""
        while True:
            # 执行一次 tick
            tasks = self.tick()
            
            # 处理到期的任务
            for item in tasks:
                task = item['task']
                try:
                    handler(task)
                except Exception as e:
                    logger.error(f"任务失败: {e}")
            
            # 休眠一个 tick_interval
            time.sleep(self.tick_interval)


# 使用示例
wheel = TimingWheel(tick_interval=1, wheel_size=60)

# 添加任务
wheel.add_task({'id': 1}, 5)   # 5 秒后执行
wheel.add_task({'id': 2}, 10)  # 10 秒后执行
wheel.add_task({'id': 3}, 30)  # 30 秒后执行

# 运行时间轮
def handler(task):
    print(f"执行任务: {task}")

wheel.run(handler)

时间复杂度

操作时间复杂度说明
添加任务O(1)直接计算槽位
获取到期任务O(1) 平均只处理当前槽位
空间复杂度O(n)n 为槽数

优缺点

优点

  • ✅ 性能极高(O(1))
  • ✅ 精度可控
  • ✅ 内存占用固定

缺点

  • ❌ 只支持固定间隔的延时
  • ❌ 长延时任务需要大容量时间轮