时间轮原理
什么是时间轮?
时间轮(Timing Wheel)是一种高效管理定时任务的数据结构,类似于生活中的时钟。
基本原理
单层时间轮
时间轮结构:
┌───┬───┬───┬───┬───┬───┐
│ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ ← 槽位(slots)
├───┼───┼───┼───┼───┼───┤
│ │ │A │ │B │ │ ← 任务(A 在槽位 2,B 在槽位 4)
└───┴───┴───┴───┴───┴───┘
↑
当前指针(current_slot)
时间轮转动:
- 每秒转动一格
- 指针指向的槽位,其中的任务到期执行代码实现
import time
class TimingWheel:
"""单层时间轮"""
def __init__(self, tick_interval=1, wheel_size=60):
"""
tick_interval: 每格的时间间隔(秒)
wheel_size: 时间轮的槽数
"""
self.tick_interval = tick_interval
self.wheel_size = wheel_size
self.wheel = [[] for _ in range(wheel_size)] # 槽位数组
self.current_slot = 0 # 当前槽位
self.start_time = time.time()
def add_task(self, task, delay_seconds):
"""添加任务"""
# 计算任务应该放在哪个槽位
ticks = delay_seconds // self.tick_interval
slot = (self.current_slot + ticks) % self.wheel_size
# 添加任务到对应槽位
self.wheel[slot].append({
'task': task,
'expires_at': time.time() + delay_seconds
})
logger.info(f"添加任务到槽位 {slot}")
def tick(self):
"""时间轮转动一格"""
# 获取当前槽位的任务
tasks = self.wheel[self.current_slot]
# 清空当前槽位
self.wheel[self.current_slot] = []
# 移动到下一个槽位
self.current_slot = (self.current_slot + 1) % self.wheel_size
return tasks
def run(self, handler):
"""运行时间轮"""
while True:
# 执行一次 tick
tasks = self.tick()
# 处理到期的任务
for item in tasks:
task = item['task']
try:
handler(task)
except Exception as e:
logger.error(f"任务失败: {e}")
# 休眠一个 tick_interval
time.sleep(self.tick_interval)
# 使用示例
wheel = TimingWheel(tick_interval=1, wheel_size=60)
# 添加任务
wheel.add_task({'id': 1}, 5) # 5 秒后执行
wheel.add_task({'id': 2}, 10) # 10 秒后执行
wheel.add_task({'id': 3}, 30) # 30 秒后执行
# 运行时间轮
def handler(task):
print(f"执行任务: {task}")
wheel.run(handler)时间复杂度
| 操作 | 时间复杂度 | 说明 |
|---|---|---|
| 添加任务 | O(1) | 直接计算槽位 |
| 获取到期任务 | O(1) 平均 | 只处理当前槽位 |
| 空间复杂度 | O(n) | n 为槽数 |
优缺点
优点
- ✅ 性能极高(O(1))
- ✅ 精度可控
- ✅ 内存占用固定
缺点
- ❌ 只支持固定间隔的延时
- ❌ 长延时任务需要大容量时间轮