轮询设计
最简单的方案
还记得我一开始是怎么实现订单超时取消的吗?
# 😱 第一次尝试:每分钟跑一次脚本
import time
def check_order_timeout():
"""检查订单超时"""
timeout_threshold = datetime.now() - timedelta(minutes=30)
# 查询超时订单
pending_orders = Order.objects.filter(
status='pending_payment',
created_at__lt=timeout_threshold
)
for order in pending_orders:
cancel_order(order)
release_inventory(order)然后用 Linux 的 cron 定时任务,每分钟执行一次:
# crontab -e
* * * * * python /app/scripts/check_order_timeout.py这个方案简单粗暴,但真的能用吗?
数据库轮询的工作原理
核心思想
轮询方案的核心:
1. 定期(如每分钟)查询数据库
2. 找出应该执行的任务(执行时间 <= 当前时间)
3. 执行任务
4. 标记任务为已完成基本实现
class DatabasePollingDelayQueue:
"""数据库轮询延时队列"""
def __init__(self, poll_interval=60):
"""
poll_interval: 轮询间隔(秒)
"""
self.poll_interval = poll_interval
self.db = MySQLDatabase()
def add_task(self, task_id, task_type, task_data, delay_seconds):
"""添加任务"""
execute_at = datetime.now() + timedelta(seconds=delay_seconds)
self.db.execute("""
INSERT INTO delay_tasks
(task_id, task_type, task_data, execute_at, status, created_at)
VALUES (%s, %s, %s, %s, 'pending', NOW())
""", (task_id, task_type, json.dumps(task_data), execute_at))
logger.info(
f"添加任务: {task_id}, "
f"类型: {task_type}, "
f"执行时间: {execute_at}"
)
def cancel_task(self, task_id):
"""取消任务"""
self.db.execute("""
UPDATE delay_tasks
SET status = 'cancelled', updated_at = NOW()
WHERE task_id = %s AND status = 'pending'
""", [task_id])
logger.info(f"取消任务: {task_id}")
def start_consumer(self, handler):
"""启动消费者"""
logger.info(f"启动消费者,轮询间隔: {self.poll_interval}秒")
while True:
try:
self._poll_and_execute(handler)
except Exception as e:
logger.error(f"消费失败: {e}")
time.sleep(self.poll_interval)
def _poll_and_execute(self, handler):
"""轮询并执行任务"""
now = datetime.now()
# ✅ 查询到期的任务
tasks = self.db.query("""
SELECT * FROM delay_tasks
WHERE execute_at <= %s
AND status = 'pending'
ORDER BY execute_at ASC
LIMIT 100
""", [now])
logger.info(f"查询到 {len(tasks)} 个到期任务")
for task in tasks:
try:
# ✅ 标记为处理中
self.db.execute("""
UPDATE delay_tasks
SET status = 'processing',
updated_at = NOW()
WHERE task_id = %s
""", [task['task_id']])
# ✅ 执行任务
handler(task)
# ✅ 标记为完成
self.db.execute("""
UPDATE delay_tasks
SET status = 'completed',
completed_at = NOW(),
updated_at = NOW()
WHERE task_id = %s
""", [task['task_id']])
logger.info(f"任务完成: {task['task_id']}")
except Exception as e:
logger.error(f"任务执行失败: {task['task_id']}, {e}")
# ✅ 标记为失败
self.db.execute("""
UPDATE delay_tasks
SET status = 'failed',
error_message = %s,
updated_at = NOW()
WHERE task_id = %s
""", [str(e), task['task_id']])
# 使用示例
def order_timeout_handler(task):
"""订单超时处理器"""
order_id = task['task_data']['order_id']
order = Order.get(order_id)
if order.status == 'pending_payment':
order.status = 'cancelled'
order.cancel_reason = 'payment_timeout'
order.save()
# 释放库存
Inventory.release(order.product_id, order.quantity)
logger.info(f"取消订单: {order_id}")
# 启动队列
queue = DatabasePollingDelayQueue(poll_interval=60)
# 添加任务
queue.add_task(
task_id='cancel_order_12345',
task_type='order_timeout',
task_data={'order_id': 12345},
delay_seconds=30 * 60 # 30 分钟
)
# 启动消费者
queue.start_consumer(order_timeout_handler)数据库表设计
基本表结构
CREATE TABLE delay_tasks (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
task_id VARCHAR(100) UNIQUE NOT NULL,
task_type VARCHAR(50) NOT NULL,
task_data TEXT NOT NULL,
execute_at TIMESTAMP NOT NULL,
status ENUM('pending', 'processing', 'completed', 'failed', 'cancelled') DEFAULT 'pending',
retry_count INT DEFAULT 0,
max_retries INT DEFAULT 3,
error_message TEXT,
consumer_id VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
completed_at TIMESTAMP NULL,
-- 关键索引
INDEX idx_execute_status (execute_at, status),
INDEX idx_status (status),
INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;字段说明
| 字段 | 类型 | 说明 |
|---|---|---|
task_id | VARCHAR(100) | 任务唯一标识,用于取消、查询 |
task_type | VARCHAR(50) | 任务类型,用于路由到不同的处理器 |
task_data | TEXT | 任务数据(JSON 格式) |
execute_at | TIMESTAMP | 任务执行时间 |
status | ENUM | 任务状态 |
retry_count | INT | 重试次数 |
max_retries | INT | 最大重试次数 |
error_message | TEXT | 错误信息 |
consumer_id | VARCHAR(100) | 消费者 ID(用于故障恢复) |
关键索引
-- 最关键的索引:按执行时间和状态查询
CREATE INDEX idx_execute_status ON delay_tasks (execute_at, status);
-- 用于清理已完成任务
CREATE INDEX idx_status ON delay_tasks (status);
-- 用于监控任务积压
CREATE INDEX idx_created_at ON delay_tasks (created_at);为什么这个索引很重要?
-- ✅ 使用索引(快速)
SELECT * FROM delay_tasks
WHERE execute_at <= NOW()
AND status = 'pending'
ORDER BY execute_at ASC;
-- ✅ 执行计划:
-- Using index idx_execute_status
-- Rows examined: 100
-- ❌ 不使用索引(慢)
SELECT * FROM delay_tasks
WHERE task_type = 'order_timeout'
AND execute_at <= NOW();
-- ❌ 执行计划:
-- Using where; Using filesort
-- Rows examined: 1000000轮询间隔的选择
间隔对精度的影响
| 间隔 | 精度 | CPU 消耗 | 数据库压力 | 适用场景 |
|---|---|---|---|---|
| 1 秒 | ±0.5 秒 | 🔴 极高 | 🔴 极高 | 秒杀倒计时 |
| 10 秒 | ±5 秒 | 🟡 高 | 🟡 高 | 高精度场景 |
| 1 分钟 | ±30 秒 | 🟢 中 | 🟢 中 | 订单超时 |
| 5 分钟 | ±2.5 分钟 | 🟢 低 | 🟢 低 | 定时任务 |
| 1 小时 | ±30 分钟 | 🟢 极低 | 🟢 低 | 长周期任务 |
自适应轮询
根据任务量动态调整轮询间隔:
class AdaptivePollingDelayQueue:
"""自适应轮询延时队列"""
def __init__(self):
self.db = MySQLDatabase()
self.current_interval = 60 # 当前间隔(秒)
self.min_interval = 10 # 最小间隔
self.max_interval = 300 # 最大间隔
def start_consumer(self, handler):
"""启动消费者"""
while True:
# ✅ 第 1 步:轮询并执行
task_count = self._poll_and_execute(handler)
# ✅ 第 2 步:根据任务量调整间隔
self._adjust_interval(task_count)
# ✅ 第 3 步:休眠
time.sleep(self.current_interval)
def _adjust_interval(self, task_count):
"""根据任务量调整轮询间隔"""
if task_count > 100:
# 任务多,加快轮询
self.current_interval = max(
self.min_interval,
self.current_interval / 2
)
logger.info(f"任务较多,加快轮询: {self.current_interval}秒")
elif task_count < 10:
# 任务少,降低轮询频率
self.current_interval = min(
self.max_interval,
self.current_interval * 1.5
)
logger.info(f"任务较少,降低轮询: {self.current_interval}秒")优缺点分析
优点
| 优点 | 说明 |
|---|---|
| ✅ 简单可靠 | 实现简单,容易理解和维护 |
| ✅ 持久化保证 | 数据库天然持久化,任务不会丢 |
| ✅ 事务支持 | 可以利用数据库事务保证一致性 |
| ✅ 易扩展 | 可以增加索引、分区、分表 |
| ✅ 查询灵活 | 支持复杂查询和统计 |
缺点
| 缺点 | 说明 | 影响 |
|---|---|---|
| ❌ 精度有限 | 轮询间隔决定精度 | 不适合高精度场景 |
| ❌ 数据库压力 | 频繁查询数据库 | 高并发时压力大 |
| ❌ 空轮询 | 没有任务时也会查询 | 浪费资源 |
| ❌ 性能瓶颈 | 数据库 I/O 限制 | 无法支撑超高并发 |
想一想
思考 1
如何减少数据库轮询的空查询(没有任务时的查询)?
参考答案
问题分析:
空查询是指轮询时没有找到任何到期任务,但仍然执行了数据库查询。
空查询的代价:
- 数据库连接占用
- 网络往返延迟
- 数据库 CPU 和 I/O 开销
- 日志记录
假设每天 100 万任务,每分钟轮询一次:
- 总轮询次数:1440 次/天
- 如果大部分时间没有任务,90% 是空查询
- 浪费:1296 次无效查询解决方案:
方案 1:预估下次查询时间
class SmartPollingDelayQueue:
"""智能轮询:预估下次查询时间"""
def __init__(self):
self.db = MySQLDatabase()
self.next_poll_time = None # 下次需要查询的时间
def start_consumer(self, handler):
"""启动消费者"""
while True:
now = datetime.now()
# ✅ 如果设置了下次查询时间,且还没到,就跳过
if self.next_poll_time and now < self.next_poll_time:
wait_seconds = (self.next_poll_time - now).total_seconds()
logger.info(f"等待下次查询: {wait_seconds}秒")
time.sleep(wait_seconds)
continue
# ✅ 查询并执行任务
task_count = self._poll_and_execute(handler)
# ✅ 查询下一个待执行任务的执行时间
self._update_next_poll_time()
def _update_next_poll_time(self):
"""查询下一个待执行任务的执行时间"""
# 查询最早的 pending 任务
result = self.db.query("""
SELECT execute_at FROM delay_tasks
WHERE status = 'pending'
ORDER BY execute_at ASC
LIMIT 1
""")
if result:
next_execute_at = result[0]['execute_at']
self.next_poll_time = next_execute_at
logger.info(f"下次查询时间: {next_execute_at}")
else:
# 没有待执行任务,等待一段时间再查
self.next_poll_time = datetime.now() + timedelta(minutes=5)
logger.info("没有待执行任务,5 分钟后再查")方案 2:使用缓存
class CachedPollingDelayQueue:
"""带缓存的轮询"""
def __init__(self):
self.db = MySQLDatabase()
self.redis = redis.Redis()
self.cache_ttl = 60 # 缓存 60 秒
def _has_pending_tasks(self):
"""检查是否有待执行任务(使用缓存)"""
cache_key = 'has_pending_tasks'
# ✅ 先查缓存
cached = self.redis.get(cache_key)
if cached is not None:
return cached == '1'
# ✅ 缓存未命中,查询数据库
result = self.db.query("""
SELECT COUNT(*) as count FROM delay_tasks
WHERE status = 'pending'
AND execute_at <= NOW()
""")
has_pending = result[0]['count'] > 0
# ✅ 更新缓存
self.redis.setex(
cache_key,
'1' if has_pending else '0',
self.cache_ttl
)
return has_pending
def start_consumer(self, handler):
"""启动消费者"""
while True:
# ✅ 先检查缓存,避免空查询
if not self._has_pending_tasks():
logger.info("没有待执行任务,等待 10 秒")
time.sleep(10)
continue
# ✅ 有任务,执行查询
self._poll_and_execute(handler)
# ✅ 清除缓存,下次重新检查
self.redis.delete('has_pending_tasks')方案 3:触发通知
class NotificationPollingDelayQueue:
"""带通知的轮询"""
def __init__(self):
self.db = MySQLDatabase()
self.has_new_tasks = False # 通知标志
def add_task(self, task_id, task_type, task_data, delay_seconds):
"""添加任务:设置通知标志"""
execute_at = datetime.now() + timedelta(seconds=delay_seconds)
self.db.execute("""
INSERT INTO delay_tasks
(task_id, task_type, task_data, execute_at, status, created_at)
VALUES (%s, %s, %s, %s, 'pending', NOW())
""", (task_id, task_type, json.dumps(task_data), execute_at))
# ✅ 设置通知标志
self.has_new_tasks = True
logger.info(f"添加任务并设置通知: {task_id}")
def start_consumer(self, handler):
"""启动消费者"""
while True:
if self.has_new_tasks:
# ✅ 有新任务,查询并执行
self._poll_and_execute(handler)
self.has_new_tasks = False
else:
# ✅ 没有新任务,等待
time.sleep(1)方案对比:
| 方案 | 减少空查询 | 复杂度 | 适用场景 |
|---|---|---|---|
| 预估时间 | ⭐⭐⭐⭐⭐ | ⭐⭐ | 任务稀疏 |
| 使用缓存 | ⭐⭐⭐⭐ | ⭐ | 频繁查询 |
| 触发通知 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | 单消费者 |