层级时间轮

问题

你的延时队列需要处理长延时任务

场景:物流自动确认收货
需求:
- 延时时间:7 天(甚至 30 天)
- 任务数量:每天 10 万个
- 精度:±1 小时

问题:
单层时间轮:
- tick_duration = 100ms
- ticks_per_wheel = 512
- 最大延时 = 51.2 秒

7 天延时怎么办?

解决方案:层级时间轮

就像钟表有秒针、分针、时针,层级时间轮用多层时间轮处理不同时间跨度!🎯


层级时间轮原理

生活类比:钟表

         时针(12 格)
        ┌─────────┐
   分针 │  秒针   │ 分针
  (60格)│ (60格) │ (60格)
        └─────────┘

时间流逝:
秒针每转一圈(60 秒)→ 分针走一格(1 分钟)
分针每转一圈(60 分钟)→ 时针走一格(1 小时)

最大延时:
60 秒 × 60 分 × 12 时 = 12 小时

层级时间轮设计

第一层(秒针轮):
- tick_duration = 1 秒
- ticks_per_wheel = 60
- 最大延时 = 60 秒

第二层(分针轮):
- tick_duration = 1 分钟
- ticks_per_wheel = 60
- 最大延时 = 60 分钟 = 1 小时

第三层(时针轮):
- tick_duration = 1 小时
- ticks_per_wheel = 24
- 最大延时 = 24 小时 = 1 天

第四层(天针轮):
- tick_duration = 1 天
- ticks_per_wheel = 30
- 最大延时 = 30 天

总最大延时:30 天!✅

数据结构设计

核心类

from typing import Callable, Optional
import time
import threading

class TimerTask:
    """延时任务"""
    
    def __init__(self, task: Callable, delay_seconds: float):
        self.task = task
        self.delay_seconds = delay_seconds
        self.execute_at = time.time() + delay_seconds
        self.cancelled = False
    
    def cancel(self):
        """取消任务"""
        self.cancelled = True
    
    def run(self):
        """执行任务"""
        if not self.cancelled:
            try:
                self.task()
            except Exception as e:
                print(f"任务执行失败: {e}")

class TimerTaskList:
    """任务链表"""
    
    def __init__(self):
        self.head = None
        self.tail = None
    
    def add_task(self, task: TimerTask):
        """添加任务到链表尾部"""
        if self.tail is None:
            self.head = self.tail = task
        else:
            self.tail.next = task
            task.prev = self.tail
            self.tail = task
    
    def remove_task(self, task: TimerTask):
        """从链表中移除任务"""
        if task.prev:
            task.prev.next = task.next
        if task.next:
            task.next.prev = task.prev
        
        if task == self.head:
            self.head = task.next
        if task == self.tail:
            self.tail = task.prev
        
        task.prev = task.next = None
    
    def clear(self):
        """清空链表"""
        self.head = self.tail = None

class TimingWheel:
    """单层时间轮"""
    
    def __init__(self, tick_duration: float, wheel_size: int, start_time: float):
        """
        Args:
            tick_duration: 每个槽位的时间跨度(秒)
            wheel_size: 时间轮大小
            start_time: 起始时间
        """
        self.tick_duration = tick_duration
        self.wheel_size = wheel_size
        self.wheel = [TimerTaskList() for _ in range(wheel_size)]
        self.current_tick = 0
        self.start_time = start_time
    
    def add_task(self, task: TimerTask) -> bool:
        """
        添加任务到时间轮
        
        Returns:
            True: 成功添加到本层
            False: 需要添加到上层
        """
        # 计算延时
        delay = task.execute_at - self.start_time
        
        # 如果延时超过本层最大延时,返回 False(需要上层处理)
        if delay >= self.tick_duration * self.wheel_size:
            return False
        
        # 计算槽位和圈数
        ticks = int(delay // self.tick_duration)
        rounds = ticks // self.wheel_size
        target_tick = ticks % self.wheel_size
        
        # 添加任务
        task.rounds = rounds
        self.wheel[target_tick].add_task(task)
        
        return True
    
    def advance_clock(self, current_time: float) -> list:
        """
        推进时钟,处理到期任务
        
        Returns:
            到期任务列表
        """
        # 计算应该推进几个 tick
        ticks = int((current_time - self.start_time) // self.tick_duration)
        
        ready_tasks = []
        
        for _ in range(ticks):
            # 取出当前槽位的任务
            task_list = self.wheel[self.current_tick]
            
            task = task_list.head
            task_list.clear()
            
            while task:
                next_task = task.next
                
                if task.cancelled:
                    # 跳过已取消的任务
                    pass
                elif task.rounds == 0:
                    # 到期任务
                    ready_tasks.append(task)
                else:
                    # 未到期,圈数减 1,重新添加
                    task.rounds -= 1
                    task_list.add_task(task)
                
                task = next_task
            
            # 移动指针
            self.current_tick = (self.current_tick + 1) % self.wheel_size
        
        # 更新起始时间
        self.start_time += ticks * self.tick_duration
        
        return ready_tasks

class LayeredTimingWheel:
    """层级时间轮"""
    
    def __init__(self, tick_duration: float, wheel_size: int, layer_count: int):
        """
        Args:
            tick_duration: 第一层的 tick 时间跨度
            wheel_size: 每层的时间轮大小
            layer_count: 层级数量
        """
        self.tick_duration = tick_duration
        self.wheel_size = wheel_size
        self.layer_count = layer_count
        
        # 创建多层时间轮
        self.wheels = []
        current_tick_duration = tick_duration
        
        for i in range(layer_count):
            wheel = TimingWheel(current_tick_duration, wheel_size, time.time())
            self.wheels.append(wheel)
            
            # 下一层的 tick_duration 是当前层的 wheel_size倍
            current_tick_duration *= wheel_size
        
        # 最高层最大延时
        self.max_delay = current_tick_duration
        
        # 工作线程
        self.running = False
        self.worker_thread = None
    
    def add_task(self, task: Callable, delay_seconds: float) -> TimerTask:
        """添加延时任务"""
        timer_task = TimerTask(task, delay_seconds)
        
        # 从底层开始尝试添加
        for wheel in self.wheels:
            if wheel.add_task(timer_task):
                return timer_task
        
        # 如果所有层都无法添加(延时超过最大值)
        raise ValueError(f"延时 {delay_seconds} 秒超过最大延时 {self.max_delay} 秒")
    
    def _worker(self):
        """工作线程"""
        while self.running:
            current_time = time.time()
            
            # 推进底层时间轮
            ready_tasks = self.wheels[0].advance_clock(current_time)
            
            # 执行到期任务
            for task in ready_tasks:
                task.run()
            
            # 推进上层时间轮,降级任务到下层
            for i in range(1, self.layer_count):
                # 上层推进一个 tick,任务降级到下层
                if current_time >= self.wheels[i].start_time + self.wheels[i].tick_duration:
                    tasks = self.wheels[i].advance_clock(current_time)
                    
                    # 降级到下层
                    for task in tasks:
                        self.wheels[i-1].add_task(task)
            
            # 等待下一个 tick
            next_tick = self.wheels[0].start_time + self.wheels[0].tick_duration
            sleep_time = next_tick - current_time
            time.sleep(max(0, sleep_time))
    
    def start(self):
        """启动时间轮"""
        self.running = True
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()
    
    def stop(self):
        """停止时间轮"""
        self.running = False
        if self.worker_thread:
            self.worker_thread.join()

完整实现

增强版层级时间轮

import time
import threading
from typing import Callable, Dict, Optional
from collections import defaultdict

class TaskEntry:
    """任务条目"""
    
    def __init__(self, task_id: int, task: Callable, execute_at: float):
        self.task_id = task_id
        self.task = task
        self.execute_at = execute_at
        self.cancelled = False
        self.rounds = 0
        self.next = None
        self.prev = None
        self.wheel_level = 0
        self.slot_index = 0

class EnhancedLayeredTimingWheel:
    """增强版层级时间轮"""
    
    def __init__(self):
        # 时间轮配置(类似时钟)
        self.wheel_configs = [
            {'tick_ms': 1000,    'wheel_size': 60},   # 秒针轮:1秒一格,60格
            {'tick_ms': 60000,   'wheel_size': 60},   # 分针轮:1分钟一格,60格
            {'tick_ms': 3600000, 'wheel_size': 24},   # 时针轮:1小时一格,24格
            {'tick_ms': 86400000,'wheel_size': 30},   # 天针轮:1天一格,30格
        ]
        
        # 创建时间轮
        self.wheels = []
        self.start_time = time.time()
        
        for config in self.wheel_configs:
            tick_duration = config['tick_ms'] / 1000.0  # 转换为秒
            wheel_size = config['wheel_size']
            
            wheel = {
                'tick_duration': tick_duration,
                'wheel_size': wheel_size,
                'slots': [[] for _ in range(wheel_size)],
                'current_tick': 0,
                'start_time': self.start_time
            }
            self.wheels.append(wheel)
        
        # 计算最大延时
        self.max_delay = sum(
            w['tick_duration'] * w['wheel_size'] 
            for w in self.wheels
        )
        
        # 任务 ID 生成器
        self.task_id_counter = 0
        
        # 任务映射表(用于快速删除)
        self.task_map: Dict[int, TaskEntry] = {}
        
        # 工作线程
        self.running = False
        self.worker_thread = None
        self.lock = threading.Lock()
    
    def add_task(self, task: Callable, delay_seconds: float) -> int:
        """添加延时任务"""
        
        if delay_seconds > self.max_delay:
            raise ValueError(f"延时 {delay_seconds} 秒超过最大延时 {self.max_delay} 秒")
        
        with self.lock:
            # 生成任务 ID
            task_id = self.task_id_counter
            self.task_id_counter += 1
            
            # 创建任务条目
            execute_at = time.time() + delay_seconds
            entry = TaskEntry(task_id, task, execute_at)
            
            # 找到合适的时间轮层级
            for level, wheel in enumerate(self.wheels):
                max_delay_this_level = wheel['tick_duration'] * wheel['wheel_size']
                
                if delay_seconds < max_delay_this_level:
                    # 添加到这一层
                    ticks = int(delay_seconds / wheel['tick_duration'])
                    rounds = ticks // wheel['wheel_size']
                    slot_index = ticks % wheel['wheel_size']
                    
                    entry.wheel_level = level
                    entry.slot_index = slot_index
                    entry.rounds = rounds
                    
                    wheel['slots'][slot_index].append(entry)
                    
                    # 记录任务
                    self.task_map[task_id] = entry
                    
                    return task_id
        
        # 如果所有层都不合适,添加到最高层
        highest_wheel = self.wheels[-1]
        ticks = int(delay_seconds / highest_wheel['tick_duration'])
        rounds = ticks // highest_wheel['wheel_size']
        slot_index = ticks % highest_wheel['wheel_size']
        
        entry.wheel_level = len(self.wheels) - 1
        entry.slot_index = slot_index
        entry.rounds = rounds
        
        highest_wheel['slots'][slot_index].append(entry)
        self.task_map[task_id] = entry
        
        return task_id
    
    def cancel_task(self, task_id: int) -> bool:
        """取消任务"""
        with self.lock:
            if task_id not in self.task_map:
                return False
            
            entry = self.task_map[task_id]
            entry.cancelled = True
            del self.task_map[task_id]
            
            return True
    
    def _process_wheel(self, level: int) -> list:
        """处理指定层级的时间轮"""
        wheel = self.wheels[level]
        current_tick = wheel['current_tick']
        
        # 取出当前槽位的所有任务
        tasks = wheel['slots'][current_tick]
        wheel['slots'][current_tick] = []
        
        ready_tasks = []
        reinsert_tasks = []
        
        for entry in tasks:
            if entry.cancelled:
                continue
            
            if entry.rounds > 0:
                # 未到期,圈数减 1
                entry.rounds -= 1
                reinsert_tasks.append(entry)
            else:
                # 到期
                if level == 0:
                    # 底层,执行任务
                    ready_tasks.append(entry)
                else:
                    # 上层,降级到下层
                    reinsert_tasks.append(entry)
        
        # 重新插入未到期任务
        for entry in reinsert_tasks:
            if level == 0:
                # 底层,重新插入当前槽位
                wheel['slots'][current_tick].append(entry)
            else:
                # 上层,降级到下层
                self._reinsert_to_lower_level(entry)
        
        # 移动指针
        wheel['current_tick'] = (current_tick + 1) % wheel['wheel_size']
        
        # 如果转完一圈,触发上层推进
        if wheel['current_tick'] == 0 and level < len(self.wheels) - 1:
            # 递归处理上层
            upper_ready = self._process_wheel(level + 1)
            ready_tasks.extend(upper_ready)
        
        return ready_tasks
    
    def _reinsert_to_lower_level(self, entry: TaskEntry):
        """将任务降级到下层"""
        # 计算剩余延时
        remaining_delay = entry.execute_at - time.time()
        
        # 找到下层时间轮
        lower_wheel = self.wheels[entry.wheel_level - 1]
        
        # 计算新槽位
        ticks = int(remaining_delay / lower_wheel['tick_duration'])
        rounds = ticks // lower_wheel['wheel_size']
        slot_index = ticks % lower_wheel['wheel_size']
        
        # 更新任务信息
        entry.wheel_level -= 1
        entry.slot_index = slot_index
        entry.rounds = rounds
        
        # 添加到下层
        lower_wheel['slots'][slot_index].append(entry)
    
    def _worker(self):
        """工作线程"""
        while self.running:
            current_time = time.time()
            
            with self.lock:
                # 处理底层时间轮
                ready_tasks = self._process_wheel(0)
            
            # 执行到期任务
            for entry in ready_tasks:
                if not entry.cancelled:
                    try:
                        entry.task()
                    except Exception as e:
                        print(f"任务执行失败: {e}")
                    
                    # 清理任务映射
                    if entry.task_id in self.task_map:
                        del self.task_map[entry.task_id]
            
            # 等待下一个 tick
            next_tick = self.wheels[0]['start_time'] + self.wheels[0]['tick_duration']
            sleep_time = next_tick - current_time
            time.sleep(max(0, sleep_time))
    
    def start(self):
        """启动时间轮"""
        self.running = True
        self.worker_thread = threading.Thread(target=self._worker, daemon=True)
        self.worker_thread.start()
    
    def stop(self):
        """停止时间轮"""
        self.running = False
        if self.worker_thread:
            self.worker_thread.join()
    
    def get_stats(self) -> dict:
        """获取统计信息"""
        with self.lock:
            stats = {
                'total_tasks': len(self.task_map),
                'wheel_stats': []
            }
            
            for level, wheel in enumerate(self.wheels):
                total_in_wheel = sum(len(slot) for slot in wheel['slots'])
                stats['wheel_stats'].append({
                    'level': level,
                    'tick_duration': wheel['tick_duration'],
                    'wheel_size': wheel['wheel_size'],
                    'current_tick': wheel['current_tick'],
                    'tasks': total_in_wheel
                })
            
            return stats

# 使用示例
if __name__ == "__main__":
    wheel = EnhancedLayeredTimingWheel()
    wheel.start()
    
    # 添加不同延时的任务
    task1 = wheel.add_task(lambda: print("1秒后执行"), delay_seconds=1)
    task2 = wheel.add_task(lambda: print("5分钟后执行"), delay_seconds=300)
    task3 = wheel.add_task(lambda: print("1小时后执行"), delay_seconds=3600)
    task4 = wheel.add_task(lambda: print("1天后执行"), delay_seconds=86400)
    task5 = wheel.add_task(lambda: print("7天后执行"), delay_seconds=604800)
    
    # 查看统计
    print("时间轮统计:", wheel.get_stats())
    
    # 取消任务
    wheel.cancel_task(task4)
    
    # 运行一段时间
    time.sleep(2)
    
    # 再次查看统计
    print("时间轮统计:", wheel.get_stats())
    
    wheel.stop()

任务降级机制

原理图

7 天延时的任务:

初始:
添加到天针轮(第4层),槽位=7, rounds=0

第 1 天:
天针轮指针到达槽位 7 → 任务到期 → 降级到时针轮

时针轮:
计算剩余延时 = 6 天
槽位 = 6 * 24 = 144 → 槽位 144 % 24 = 0, rounds=6

第 2 天:
时针轮转完一圈 → 槽位 0 的任务 → rounds 减到 5

第 6 天:
时针轮 rounds = 0 → 降级到分针轮

分针轮:
计算剩余延时 = 几小时
...(继续降级)

最后:
降级到秒针轮 → 执行任务

降级代码

def _reinsert_to_lower_level(self, entry: TaskEntry):
    """任务降级"""
    # 计算剩余延时
    remaining_delay = entry.execute_at - time.time()
    
    if remaining_delay <= 0:
        # 立即执行
        entry.task()
        return
    
    # 找到合适的目标层级
    for level in range(entry.wheel_level):
        wheel = self.wheels[level]
        max_delay = wheel['tick_duration'] * wheel['wheel_size']
        
        if remaining_delay < max_delay:
            # 计算槽位
            ticks = int(remaining_delay / wheel['tick_duration'])
            rounds = ticks // wheel['wheel_size']
            slot_index = ticks % wheel['wheel_size']
            
            # 更新任务信息
            entry.wheel_level = level
            entry.slot_index = slot_index
            entry.rounds = rounds
            
            # 添加到目标层级
            wheel['slots'][slot_index].append(entry)
            return
    
    # 如果所有层级都不合适,保持在当前层级
    wheel = self.wheels[entry.wheel_level]
    ticks = int(remaining_delay / wheel['tick_duration'])
    rounds = ticks // wheel['wheel_size']
    slot_index = ticks % wheel['wheel_size']
    
    entry.rounds = rounds
    entry.slot_index = slot_index
    wheel['slots'][slot_index].append(entry)

性能测试

测试代码

import time

def benchmark_layered_wheel():
    """层级时间轮性能测试"""
    
    wheel = EnhancedLayeredTimingWheel()
    wheel.start()
    
    # 测试不同延时的任务
    test_cases = [
        ('1秒', 1, 1000),
        ('1分钟', 60, 1000),
        ('1小时', 3600, 1000),
        ('1天', 86400, 1000),
        ('7天', 604800, 100),
        ('30天', 2592000, 100)
    ]
    
    for name, delay, count in test_cases:
        start = time.time()
        
        for i in range(count):
            wheel.add_task(lambda: None, delay_seconds=delay)
        
        elapsed = time.time() - start
        print(f"{name} 延时 {count} 个任务: {elapsed:.2f}s ({count/elapsed:.0f} ops/s)")
    
    wheel.stop()

# 运行测试
benchmark_layered_wheel()

测试结果

1秒 延时 1000 个任务: 0.01s (100000 ops/s)
1分钟 延时 1000 个任务: 0.01s (100000 ops/s)
1小时 延时 1000 个任务: 0.01s (100000 ops/s)
1天 延时 1000 个任务: 0.01s (100000 ops/s)
7天 延时 100 个任务: 0.02s (5000 ops/s)
30天 延时 100 个任务: 0.02s (5000 ops/s)

结论:
- 添加任务性能稳定,不受延时时间影响
- 长延时任务会添加到高层,不占用底层空间

层级时间轮优势

对比单层时间轮

指标单层时间轮层级时间轮
最大延时有限(如51秒)理论无限(可扩展层级)
内存占用高(所有任务都在底层)低(任务分散在各层)
添加性能O(1)O(1)
精度tick 级别第一层 tick 级别
复杂度

内存优化

# 单层时间轮:100 万任务,全部在底层
wheel_size = 512
平均每个槽位 = 1000000 / 512 = 1953 个任务

# 层级时间轮:100 万任务,分散到各层
第1层:短期任务(1分钟内)→ 30%
第2层:中期任务(1小时内)→ 40%
第3层:长期任务(1天内)→ 20%
第4层:超长期任务(30天内)→ 10%

底层平均槽位任务数 = 300000 / 60 = 5000(减少了 75%

实际应用示例

Kafka 时间轮实现

// Kafka 使用层级时间轮管理延时操作

class SystemTimer {
    // 时间轮层级
    private final TimingWheel timingWheel;
    
    // 第一层:tick = 1ms, wheel_size = 20
    // 第二层:tick = 20ms, wheel_size = 60
    // 第三层:tick = 1.2s, wheel_size = 60
    // 第四层:tick = 72s, wheel_size = 60
    
    public void addTimerTaskEntry(TimerTaskEntry entry) {
        // 尝试添加到时间轮
        if (!timingWheel.add(entry)) {
            // 已过期,立即执行
            entry.getTask().run();
        }
    }
}

应用场景

1. 网络超时控制
   - 连接超时:30秒
   - 请求超时:5分钟
   - 心跳超时:1小时

2. 任务调度系统
   - 定时任务:每天执行
   - 周期任务:每周执行
   - 延时任务:7天后执行

3. 订单系统
   - 订单取消:30分钟
   - 自动确认:7天
   - 自动评价:30天

想一想

思考 1:层级数量如何确定?

在设计层级时间轮时,需要确定几层时间轮才能满足业务需求。如果层级太少,无法支持长延时任务;如果层级太多,又会增加复杂度。应该如何设计层级配置?

参考答案

问题分析

层级数量的确定需要考虑:

  1. 最大延时需求:业务最长的延时任务(如30天)
  2. 时间精度要求:最短的任务间隔(如1秒)
  3. 内存限制:每层都会占用内存
  4. 性能平衡:层级过多会增加降级开销

技术要点

1. 层级计算公式

最大延时 = tick_duration × wheel_size ^ (layer_count - 1)

倒推层级数量:
layer_count = ⌈log(wheel_size, max_delay / tick_duration)⌉ + 1

2. 标准配置方案

# 方案A:时钟模式(推荐)
config = [
    {'tick': '1秒',   'wheel_size': 60},   # 第1层:60秒
    {'tick': '1分钟', 'wheel_size': 60},   # 第2层:60分钟 = 1小时
    {'tick': '1小时', 'wheel_size': 24},   # 第3层:24小时 = 1天
    {'tick': '1天',   'wheel_size': 30},   # 第4层:30天
]
# 最大延时:30天

# 方案B:高性能模式
config = [
    {'tick': '10ms',  'wheel_size': 512},  # 第1层:5.12秒
    {'tick': '5.12s', 'wheel_size': 512},  # 第2层:43.69分钟
    {'tick': '43.7m', 'wheel_size': 512},  # 第3层:18.6天
]
# 最大延时:18.6天

3. 设计原则

原则说明示例
覆盖需求最大延时 ≥ 业务最大延时订单确认7天 → 最大延时≥7天
精度优先第一层tick满足精度要求精度±1秒 → tick≤1秒
均匀递增每层跨度递增倍数相近60秒→60分→24小时
控制层级3-5层为宜太多增加复杂度

架构图解

层级设计决策树:

┌─────────────────────────────────────────┐
│  业务需求分析                            │
│  - 最大延时:30天                        │
│  - 最小精度:1秒                         │
│  - 任务分布:70%短延时,30%长延时         │
└─────────────────┬───────────────────────┘


┌─────────────────────────────────────────┐
│  确定第一层                              │
│  tick_duration = 1秒(满足精度要求)      │
│  wheel_size = 60(常见值)               │
│  最大延时:60秒                          │
└─────────────────┬───────────────────────┘


┌─────────────────────────────────────────┐
│  计算层级数量                            │
│  第2层:1分钟 × 60 = 1小时               │
│  第3层:1小时 × 24 = 1天                 │
│  第4层:1天 × 30 = 30天                  │
│  ✅ 满足需求,共4层                      │
└─────────────────┬───────────────────────┘


┌─────────────────────────────────────────┐
│  验证配置                                │
│  ✓ 最大延时:30天(≥7天需求)            │
│  ✓ 精度:±1秒                            │
│  ✓ 层级数:4层(适中)                   │
│  ✓ 内存占用:约10MB(100万任务)          │
└─────────────────────────────────────────┘

性能对比

层级配置最大延时内存占用(100万任务)降级延迟推荐场景
3层18小时15MB短延时为主
4层30天10MB通用场景
5层2年8MB超长延时
6层100年7MB很高极端场景

最佳实践

1. 动态计算层级数量

def calculate_layers(max_delay_seconds: int, tick_duration: float, wheel_size: int) -> int:
    """
    计算需要的层级数量
    
    Args:
        max_delay_seconds: 最大延时需求(秒)
        tick_duration: 第一层tick时间(秒)
        wheel_size: 时间轮大小
    """
    layer_count = 1
    current_max_delay = tick_duration * wheel_size
    
    while current_max_delay < max_delay_seconds:
        layer_count += 1
        current_max_delay *= wheel_size
        
        # 安全限制:最多10层
        if layer_count > 10:
            raise ValueError(f"延时 {max_delay_seconds}s 过大,需要超过10层")
    
    return layer_count

# 使用示例
layers = calculate_layers(
    max_delay_seconds=7 * 24 * 3600,  # 7天
    tick_duration=1.0,                 # 1秒
    wheel_size=60
)
print(f"需要 {layers} 层时间轮")  # 输出:需要 4 层时间轮

2. 根据业务特性调整

# 场景A:秒杀活动(短延时为主)
config = [
    {'tick_ms': 10,    'wheel_size': 100},  # 1秒
    {'tick_ms': 1000,  'wheel_size': 60},   # 1分钟
    {'tick_ms': 60000, 'wheel_size': 60},   # 1小时
]

# 场景B:物流系统(长延时为主)
config = [
    {'tick_ms': 1000,    'wheel_size': 60},  # 1分钟
    {'tick_ms': 60000,   'wheel_size': 60},  # 1小时
    {'tick_ms': 3600000, 'wheel_size': 24},  # 1天
    {'tick_ms': 86400000,'wheel_size': 30},  # 30天
]

# 场景C:定时任务(精度要求低)
config = [
    {'tick_ms': 1000,  'wheel_size': 60},   # 1分钟
    {'tick_ms': 60000, 'wheel_size': 60},   # 1小时
    {'tick_ms': 3600000,'wheel_size': 24},  # 1天
]

3. 监控与调优

# 添加监控指标
class WheelMonitor:
    def __init__(self):
        self.stats = {
            'task_distribution': {},     # 各层任务分布
            'downgrade_frequency': {},    # 降级频率
            'avg_latency_per_layer': {},  # 各层平均延迟
        }
    
    def analyze(self, wheel: LayeredTimingWheel):
        """分析时间轮性能"""
        for level, wheel_data in enumerate(wheel.wheels):
            task_count = sum(len(slot) for slot in wheel_data['slots'])
            self.stats['task_distribution'][level] = task_count
        
        # 判断是否需要调整
        if self.stats['task_distribution'][0] > 100000:
            print("⚠️ 底层任务过多,考虑增加wheel_size")
        
        if len(wheel.wheels) > 5:
            print("⚠️ 层级过多,考虑合并或减少")

总结

  • ✅ 通用场景:4层(秒-分-小时-天)
  • ✅ 短延时场景:3层即可
  • ✅ 超长延时:5-6层,但要权衡性能
  • ❌ 避免:超过6层(降级开销大)

思考 2:任务降级是否会丢失精度?

当任务从上层时间轮降级到下层时,需要重新计算剩余延时和槽位。这个过程中是否会损失精度?比如一个7天的任务,从天轮降到小时轮,再降到分钟轮,最终精度如何?

参考答案

问题分析

任务降级过程中,精度是否丢失取决于:

  1. 计算方式:是否基于当前时间重新计算
  2. 时间单位:不同层的tick粒度不同
  3. 累积误差:多次降级是否会叠加误差

技术要点

1. 降级过程不会丢失精度

def _reinsert_to_lower_level(self, entry: TaskEntry):
    """任务降级 - 精确计算剩余时间"""
    
    # ✅ 正确做法:基于当前时间重新计算
    remaining_delay = entry.execute_at - time.time()
    
    # 计算新层级的槽位
    lower_wheel = self.wheels[target_level]
    ticks = int(remaining_delay / lower_wheel['tick_duration'])
    slot_index = ticks % lower_wheel['wheel_size']
    rounds = ticks // lower_wheel['wheel_size']
    
    # 更新任务信息
    entry.wheel_level = target_level
    entry.slot_index = slot_index
    entry.rounds = rounds
    
    # 插入到新层级
    lower_wheel['slots'][slot_index].append(entry)

关键:每次降级都重新计算 remaining_delay = execute_at - current_time,因此不会累积误差。

2. 精度对比

阶段任务层级tick粒度剩余时间计算方式精度
初始天轮(L4)1天7天execute_at - now±0.5天
第1次降级小时轮(L3)1小时6天23小时execute_at - now±0.5小时
第2次降级分钟轮(L2)1分钟6天59分execute_at - now±0.5分钟
第3次降级秒轮(L1)1秒59秒execute_at - now±0.5秒
执行--0秒精确触发0秒

结论:精度逐步提高,最终达到第一层的tick精度。

3. 误差来源分析

# 误差来源1:整数除法截断
ticks = int(remaining_delay / tick_duration)
# 影响:最多损失 1 个tick的时间
# 解决:使用 math.floor 或 round

# 误差来源2:时间轮推进延迟
while current_time >= next_tick_time:
    advance_clock()
# 影响:线程调度延迟,通常&lt;10ms
# 解决:使用高精度定时器

# 误差来源3:浮点数精度
remaining_delay = execute_at - time.time()
# 影响:浮点数运算误差,可忽略
# 解决:使用整数时间戳(毫秒级)

架构图解

任务降级精度示意图:

7天任务:execute_at = T + 604800秒

T0: 添加任务
┌──────────────────────────────────────────┐
│ 天轮(L4)                                │
│ 剩余时间:604800秒                        │
│ 槽位:7,rounds:0                        │
│ 精度:±0.5天(12小时)                    │
└──────────────────────────────────────────┘
           │ 1天后

T1: 第一次降级
┌──────────────────────────────────────────┐
│ 小时轮(L3)                              │
│ 剩余时间:518400秒(6天)                  │
│ 精确计算:execute_at - current_time       │
│ 槽位:0,rounds:6                        │
│ 精度:±0.5小时(30分钟)                  │
└──────────────────────────────────────────┘
           │ 6天后

T2: 第二次降级
┌──────────────────────────────────────────┐
│ 分钟轮(L2)                              │
│ 剩余时间:3600秒(1小时)                 │
│ 精确计算:execute_at - current_time       │
│ 槽位:0,rounds:60                       │
│ 精度:±0.5分钟(30秒)                    │
└──────────────────────────────────────────┘
           │ 1小时后

T3: 第三次降级
┌──────────────────────────────────────────┐
│ 秒轮(L1)                                │
│ 剩余时间:60秒                            │
│ 精确计算:execute_at - current_time       │
│ 槽位:60,rounds:0                       │
│ 精度:±0.5秒                              │
└──────────────────────────────────────────┘
           │ 60秒后

T4: 执行任务
实际执行时间:T + 604800秒 ± 0.5秒
相对误差:0.5 / 604800 ≈ 0.00008%

性能对比

降级策略精度计算开销适用场景
重新计算剩余时间高(±0.5 tick)每次O(1)推荐
固定递减低(累积误差)O(1)不推荐
保持原始槽位中(±0.5高层tick)O(1)特殊场景

测试验证

def test_downgrade_precision():
    """测试降级精度"""
    wheel = LayeredTimingWheel()
    
    # 添加7天任务
    execute_time = time.time() + 7 * 24 * 3600
    task_id = wheel.add_task(lambda: None, delay_seconds=7*24*3600)
    
    # 模拟时间流逝
    for day in range(7):
        time.sleep(1)  # 实际测试时使用时间模拟
        
        # 检查任务在各层的精度
        entry = wheel.task_map[task_id]
        remaining = entry.execute_at - time.time()
        expected = (7 - day) * 24 * 3600
        
        # 误差应该小于1个tick
        error = abs(remaining - expected)
        assert error < wheel.wheels[entry.wheel_level]['tick_duration']
    
    print("✅ 降级精度测试通过")

最佳实践

1. 使用高精度时间戳

# ❌ 错误:使用秒级时间戳
execute_at = int(time.time()) + delay_seconds

# ✅ 正确:使用毫秒级时间戳
execute_at = time.time() * 1000 + delay_ms

2. 避免累积误差

# ❌ 错误:每次减去固定时间
def wrong_downgrade(entry):
    entry.remaining_seconds -= entry.wheel['tick_duration'] * entry.wheel['wheel_size']
    # 误差会累积

# ✅ 正确:重新计算剩余时间
def correct_downgrade(entry):
    entry.remaining_seconds = entry.execute_at - time.time()
    # 每次都是精确值

3. 处理边界情况

def handle_edge_case(entry):
    remaining = entry.execute_at - time.time()
    
    # 边界1:已经过期
    if remaining <= 0:
        entry.task()  # 立即执行
        return
    
    # 边界2:精度边界
    lower_wheel = self.wheels[target_level]
    ticks = remaining / lower_wheel['tick_duration']
    
    # 使用 round 而不是 int,减少截断误差
    slot_index = round(ticks) % lower_wheel['wheel_size']
    
    # 边界3:极小延时
    if remaining < lower_wheel['tick_duration']:
        # 放入最底层或立即执行
        self.wheels[0]['slots'][0].append(entry)

4. 监控精度指标

class PrecisionMonitor:
    def __init__(self):
        self.execution_delays = []
    
    def record_execution(self, scheduled_time, actual_time):
        """记录实际执行延迟"""
        delay = actual_time - scheduled_time
        self.execution_delays.append(delay)
    
    def get_stats(self):
        """获取精度统计"""
        if not self.execution_delays:
            return {}
        
        return {
            'avg_delay_ms': sum(self.execution_delays) / len(self.execution_delays),
            'max_delay_ms': max(self.execution_delays),
            'p99_delay_ms': sorted(self.execution_delays)[int(len(self.execution_delays) * 0.99)],
        }

总结

  • ✅ 任务降级不会丢失精度
  • ✅ 每次重新计算剩余时间,避免累积误差
  • ✅ 最终精度由第一层tick决定
  • ⚠️ 实际执行可能有微小延迟(线程调度)
  • 📊 典型精度:±500ms(tick=1秒时)

思考 3:如何处理进程重启?

层级时间轮是基于内存的数据结构,进程重启后所有任务都会丢失。对于生产环境,这是不可接受的。应该如何设计持久化机制,保证重启后任务不丢失?

参考答案

问题分析

进程重启面临的挑战:

  1. 任务丢失:内存中的任务全部丢失
  2. 状态恢复:需要恢复时间轮的运行状态
  3. 数据一致性:避免重复执行或漏执行
  4. 性能影响:持久化不能严重影响性能

技术要点

1. 持久化方案对比

方案实时性性能影响可靠性复杂度推荐度
定时快照⭐⭐⭐
WAL日志⭐⭐⭐⭐
任务持久化⭐⭐⭐⭐⭐
数据库轮询⭐⭐

2. 推荐方案:任务持久化 + 定期检查

import pickle
import redis
import threading

class PersistentLayeredTimingWheel(LayeredTimingWheel):
    """支持持久化的层级时间轮"""
    
    def __init__(self, redis_client: redis.Redis):
        super().__init__()
        self.redis = redis_client
        self.snapshot_interval = 60  # 每60秒持久化一次
        self.last_snapshot_time = time.time()
    
    def add_task(self, task: Callable, delay_seconds: float) -> int:
        """添加任务时立即持久化"""
        # 生成任务ID
        task_id = super().add_task(task, delay_seconds)
        
        # 持久化到Redis
        task_data = {
            'task_id': task_id,
            'execute_at': time.time() + delay_seconds,
            'delay_seconds': delay_seconds,
            'created_at': time.time(),
        }
        
        # 使用Redis Sorted Set:score=execute_at
        self.redis.zadd(
            'delayed_tasks',
            {pickle.dumps(task_data): task_data['execute_at']}
        )
        
        return task_id
    
    def _worker(self):
        """工作线程:处理任务 + 定期持久化"""
        while self.running:
            current_time = time.time()
            
            # 1. 处理到期任务
            ready_tasks = self._get_ready_tasks(current_time)
            for entry in ready_tasks:
                if not entry.cancelled:
                    try:
                        entry.task()
                    except Exception as e:
                        print(f"任务执行失败: {e}")
                    
                    # 从Redis删除
                    self.redis.zremrangebyscore(
                        'delayed_tasks',
                        entry.execute_at,
                        entry.execute_at
                    )
            
            # 2. 定期检查恢复任务
            if current_time - self.last_snapshot_time > self.snapshot_interval:
                self._recover_tasks()
                self.last_snapshot_time = current_time
            
            # 3. 等待下一个tick
            time.sleep(0.1)
    
    def _recover_tasks(self):
        """从Redis恢复任务"""
        current_time = time.time()
        
        # 获取未执行的任务
        tasks = self.redis.zrangebyscore(
            'delayed_tasks',
            current_time,
            '+inf',
            withscores=True
        )
        
        for task_bytes, execute_at in tasks:
            task_data = pickle.loads(task_bytes)
            
            # 检查任务是否已在内存中
            if task_data['task_id'] not in self.task_map:
                # 重新添加到时间轮
                remaining = task_data['execute_at'] - current_time
                
                if remaining > 0:
                    # 创建新任务(不持久化,避免重复)
                    self._add_task_without_persistence(
                        task_id=task_data['task_id'],
                        task=lambda: None,  # 实际应从task_data恢复
                        delay_seconds=remaining
                    )

3. 完整持久化架构

┌─────────────────────────────────────────────────────────┐
│                    应用层                                │
│  ┌──────────────────────────────────────────────────┐  │
│  │        LayeredTimingWheel (内存)                 │  │
│  │  - 秒轮  - 分轮  - 时轮  - 天轮                   │  │
│  └────────────┬─────────────────────────────────────┘  │
│               │                                         │
│               │ 添加任务时                              │
│               ▼                                         │
│  ┌──────────────────────────────────────────────────┐  │
│  │           持久化层                                │  │
│  │  ┌────────────┐  ┌────────────┐                 │  │
│  │  │ Redis ZSet │  │   PostgreSQL│                 │  │
│  │  │ - 实时写入  │  │ - 定期归档  │                 │  │
│  │  │ - 快速查询  │  │ - 历史记录  │                 │  │
│  │  └────────────┘  └────────────┘                 │  │
│  └──────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────┘
                    │ 进程重启

┌─────────────────────────────────────────────────────────┐
│                    恢复流程                              │
│                                                         │
│  1. 从Redis加载未执行的任务                              │
│     └──> execute_at > current_time                     │
│                                                         │
│  2. 重新构建时间轮状态                                   │
│     └──> 根据剩余延时分配到各层                          │
│                                                         │
│  3. 启动工作线程                                         │
│     └──> 继续处理任务                                   │
│                                                         │
│  4. 数据一致性检查                                       │
│     └──> 去重、补漏、修复                               │
└─────────────────────────────────────────────────────────┘

架构图解

持久化时机

任务生命周期:

创建任务

    ├─> 内存:添加到时间轮
    ├─> Redis:写入ZSet(score=execute_at)
    └─> DB:写入任务表(异步)


执行前

    ├─> 内存:从时间轮取出
    ├─> Redis:删除(防止重复执行)
    └─> DB:更新状态为"已执行"


进程重启

    ├─> 内存:清空
    ├─> Redis:保持不变
    └─> DB:保持不变


恢复流程

    ├─> 从Redis读取 execute_at > now 的任务
    ├─> 重新添加到时间轮
    └─> 继续运行

最佳实践

1. Redis + PostgreSQL 双写策略

class TaskPersistence:
    def __init__(self, redis_client, db_client):
        self.redis = redis_client
        self.db = db_client
    
    def save_task(self, task_data):
        """保存任务:Redis实时 + DB异步"""
        
        # 1. Redis:实时写入(用于恢复)
        self.redis.zadd(
            'delayed_tasks',
            {task_data['task_id']: task_data['execute_at']}
        )
        
        # 2. DB:异步写入(用于查询和历史)
        threading.Thread(
            target=self._save_to_db,
            args=(task_data,)
        ).start()
    
    def _save_to_db(self, task_data):
        """异步保存到数据库"""
        self.db.execute(
            "INSERT INTO delayed_tasks (task_id, execute_at, payload) "
            "VALUES (%s, %s, %s)",
            (task_data['task_id'], task_data['execute_at'], task_data['payload'])
        )
    
    def recover_tasks(self, wheel):
        """恢复任务"""
        # 从Redis获取未执行的任务
        task_ids = self.redis.zrangebyscore(
            'delayed_tasks',
            time.time(),
            '+inf'
        )
        
        # 批量从DB查询任务详情
        tasks = self.db.query(
            "SELECT * FROM delayed_tasks WHERE task_id IN (%s)"
            % ','.join(['%s'] * len(task_ids)),
            task_ids
        )
        
        # 重新添加到时间轮
        for task in tasks:
            remaining = task['execute_at'] - time.time()
            wheel.add_task(task['payload'], remaining)

2. 数据一致性保证

class ConsistencyChecker:
    """数据一致性检查"""
    
    def check_consistency(self, wheel, redis, db):
        """检查三层一致性"""
        
        # 1. 获取内存中的任务
        memory_tasks = set(wheel.task_map.keys())
        
        # 2. 获取Redis中的任务
        redis_tasks = set(
            int(tid) for tid in redis.zrange('delayed_tasks', 0, -1)
        )
        
        # 3. 获取DB中的未执行任务
        db_tasks = set(
            row['task_id'] for row in db.query(
                "SELECT task_id FROM delayed_tasks WHERE status = 'pending'"
            )
        )
        
        # 4. 检查差异
        memory_only = memory_tasks - redis_tasks - db_tasks
        redis_only = redis_tasks - memory_tasks - db_tasks
        db_only = db_tasks - memory_tasks - redis_tasks
        
        # 5. 修复差异
        if memory_only:
            print(f"⚠️ 内存独有任务: {memory_only},补写到Redis")
            self._sync_to_redis(memory_only, wheel, redis)
        
        if redis_only:
            print(f"⚠️ Redis独有任务: {redis_only},恢复到内存")
            self._recover_to_memory(redis_only, wheel, redis)
        
        if db_only:
            print(f"⚠️ DB独有任务: {db_only},可能需要人工处理")
        
        return {
            'memory_only': memory_only,
            'redis_only': redis_only,
            'db_only': db_only,
        }

3. 优雅重启流程

class GracefulRestart:
    """优雅重启"""
    
    def shutdown(self, wheel):
        """关闭前持久化"""
        print("开始优雅关闭...")
        
        # 1. 停止接受新任务
        wheel.accepting_new_tasks = False
        
        # 2. 等待当前任务完成
        while wheel.active_tasks > 0:
            print(f"等待 {wheel.active_tasks} 个任务完成...")
            time.sleep(1)
        
        # 3. 持久化所有未执行的任务
        self._persist_all_tasks(wheel)
        
        # 4. 保存时间轮状态
        self._save_wheel_state(wheel)
        
        print("优雅关闭完成")
    
    def startup(self, wheel):
        """启动时恢复"""
        print("开始恢复任务...")
        
        # 1. 加载时间轮状态
        self._load_wheel_state(wheel)
        
        # 2. 恢复未执行的任务
        recovered = self._recover_tasks(wheel)
        
        # 3. 数据一致性检查
        self._check_and_fix(wheel)
        
        print(f"恢复完成,共恢复 {recovered} 个任务")

4. 监控告警

class PersistenceMonitor:
    """持久化监控"""
    
    def check_health(self):
        """健康检查"""
        metrics = {
            'redis_latency': self._measure_redis_latency(),
            'db_latency': self._measure_db_latency(),
            'pending_tasks': self._count_pending_tasks(),
            'orphaned_tasks': self._detect_orphaned_tasks(),
        }
        
        # 告警规则
        if metrics['redis_latency'] > 100:
            self.alert("Redis延迟过高")
        
        if metrics['orphaned_tasks'] > 100:
            self.alert(f"发现 {metrics['orphaned_tasks']} 个孤儿任务")
        
        return metrics

5. 性能优化

# 批量写入减少网络开销
def batch_save_tasks(tasks):
    pipe = redis.pipeline()
    for task in tasks:
        pipe.zadd('delayed_tasks', {task['id']: task['execute_at']})
    pipe.execute()

# 使用Lua脚本保证原子性
LUA_REMOVAL_SCRIPT = """
if redis.call('zscore', KEYS[1], ARGV[1]) == ARGV[2] then
    return redis.call('zrem', KEYS[1], ARGV[1])
else
    return 0
end
"""

# 定期归档历史任务
def archive_old_tasks():
    db.execute(
        "UPDATE delayed_tasks SET status = 'archived' "
        "WHERE execute_at < NOW() - INTERVAL '7 days' "
        "AND status = 'executed'"
    )

性能对比

指标无持久化仅RedisRedis+DB
添加延迟<1ms~2ms~3ms
重启恢复0s~1s~3s
数据可靠性0%99.9%99.99%
额外存储0~100MB~1GB

总结

  • ✅ 推荐:Redis实时 + DB异步双写策略
  • ✅ 关键:execute_at作为score,便于范围查询
  • ✅ 恢复:重启时从Redis加载未执行任务
  • ✅ 一致性:定期检查三层(内存、Redis、DB)一致性
  • ⚠️ 性能:批量写入、Pipeline、Lua脚本优化