轮询设计

最简单的方案

还记得我一开始是怎么实现订单超时取消的吗?

# 😱 第一次尝试:每分钟跑一次脚本
import time

def check_order_timeout():
    """检查订单超时"""
    timeout_threshold = datetime.now() - timedelta(minutes=30)
    
    # 查询超时订单
    pending_orders = Order.objects.filter(
        status='pending_payment',
        created_at__lt=timeout_threshold
    )
    
    for order in pending_orders:
        cancel_order(order)
        release_inventory(order)

然后用 Linux 的 cron 定时任务,每分钟执行一次:

# crontab -e
* * * * * python /app/scripts/check_order_timeout.py

这个方案简单粗暴,但真的能用吗?


数据库轮询的工作原理

核心思想

轮询方案的核心:

1. 定期(如每分钟)查询数据库
2. 找出应该执行的任务(执行时间 <= 当前时间)
3. 执行任务
4. 标记任务为已完成

基本实现

class DatabasePollingDelayQueue:
    """数据库轮询延时队列"""
    
    def __init__(self, poll_interval=60):
        """
        poll_interval: 轮询间隔(秒)
        """
        self.poll_interval = poll_interval
        self.db = MySQLDatabase()
    
    def add_task(self, task_id, task_type, task_data, delay_seconds):
        """添加任务"""
        execute_at = datetime.now() + timedelta(seconds=delay_seconds)
        
        self.db.execute("""
            INSERT INTO delay_tasks 
            (task_id, task_type, task_data, execute_at, status, created_at)
            VALUES (%s, %s, %s, %s, 'pending', NOW())
        """, (task_id, task_type, json.dumps(task_data), execute_at))
        
        logger.info(
            f"添加任务: {task_id}, "
            f"类型: {task_type}, "
            f"执行时间: {execute_at}"
        )
    
    def cancel_task(self, task_id):
        """取消任务"""
        self.db.execute("""
            UPDATE delay_tasks 
            SET status = 'cancelled', updated_at = NOW()
            WHERE task_id = %s AND status = 'pending'
        """, [task_id])
        
        logger.info(f"取消任务: {task_id}")
    
    def start_consumer(self, handler):
        """启动消费者"""
        logger.info(f"启动消费者,轮询间隔: {self.poll_interval}秒")
        
        while True:
            try:
                self._poll_and_execute(handler)
            except Exception as e:
                logger.error(f"消费失败: {e}")
            
            time.sleep(self.poll_interval)
    
    def _poll_and_execute(self, handler):
        """轮询并执行任务"""
        now = datetime.now()
        
        # ✅ 查询到期的任务
        tasks = self.db.query("""
            SELECT * FROM delay_tasks 
            WHERE execute_at <= %s 
              AND status = 'pending'
            ORDER BY execute_at ASC
            LIMIT 100
        """, [now])
        
        logger.info(f"查询到 {len(tasks)} 个到期任务")
        
        for task in tasks:
            try:
                # ✅ 标记为处理中
                self.db.execute("""
                    UPDATE delay_tasks 
                    SET status = 'processing', 
                        updated_at = NOW()
                    WHERE task_id = %s
                """, [task['task_id']])
                
                # ✅ 执行任务
                handler(task)
                
                # ✅ 标记为完成
                self.db.execute("""
                    UPDATE delay_tasks 
                    SET status = 'completed', 
                        completed_at = NOW(),
                        updated_at = NOW()
                    WHERE task_id = %s
                """, [task['task_id']])
                
                logger.info(f"任务完成: {task['task_id']}")
                
            except Exception as e:
                logger.error(f"任务执行失败: {task['task_id']}, {e}")
                
                # ✅ 标记为失败
                self.db.execute("""
                    UPDATE delay_tasks 
                    SET status = 'failed', 
                        error_message = %s,
                        updated_at = NOW()
                    WHERE task_id = %s
                """, [str(e), task['task_id']])

# 使用示例
def order_timeout_handler(task):
    """订单超时处理器"""
    order_id = task['task_data']['order_id']
    
    order = Order.get(order_id)
    if order.status == 'pending_payment':
        order.status = 'cancelled'
        order.cancel_reason = 'payment_timeout'
        order.save()
        
        # 释放库存
        Inventory.release(order.product_id, order.quantity)
        logger.info(f"取消订单: {order_id}")

# 启动队列
queue = DatabasePollingDelayQueue(poll_interval=60)

# 添加任务
queue.add_task(
    task_id='cancel_order_12345',
    task_type='order_timeout',
    task_data={'order_id': 12345},
    delay_seconds=30 * 60  # 30 分钟
)

# 启动消费者
queue.start_consumer(order_timeout_handler)

数据库表设计

基本表结构

CREATE TABLE delay_tasks (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    task_id VARCHAR(100) UNIQUE NOT NULL,
    task_type VARCHAR(50) NOT NULL,
    task_data TEXT NOT NULL,
    execute_at TIMESTAMP NOT NULL,
    status ENUM('pending', 'processing', 'completed', 'failed', 'cancelled') DEFAULT 'pending',
    retry_count INT DEFAULT 0,
    max_retries INT DEFAULT 3,
    error_message TEXT,
    consumer_id VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    completed_at TIMESTAMP NULL,
    
    -- 关键索引
    INDEX idx_execute_status (execute_at, status),
    INDEX idx_status (status),
    INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

字段说明

字段类型说明
task_idVARCHAR(100)任务唯一标识,用于取消、查询
task_typeVARCHAR(50)任务类型,用于路由到不同的处理器
task_dataTEXT任务数据(JSON 格式)
execute_atTIMESTAMP任务执行时间
statusENUM任务状态
retry_countINT重试次数
max_retriesINT最大重试次数
error_messageTEXT错误信息
consumer_idVARCHAR(100)消费者 ID(用于故障恢复)

关键索引

-- 最关键的索引:按执行时间和状态查询
CREATE INDEX idx_execute_status ON delay_tasks (execute_at, status);

-- 用于清理已完成任务
CREATE INDEX idx_status ON delay_tasks (status);

-- 用于监控任务积压
CREATE INDEX idx_created_at ON delay_tasks (created_at);

为什么这个索引很重要?

-- ✅ 使用索引(快速)
SELECT * FROM delay_tasks 
WHERE execute_at <= NOW() 
  AND status = 'pending'
ORDER BY execute_at ASC;

-- ✅ 执行计划:
-- Using index idx_execute_status
-- Rows examined: 100

-- ❌ 不使用索引(慢)
SELECT * FROM delay_tasks 
WHERE task_type = 'order_timeout'
  AND execute_at <= NOW();

-- ❌ 执行计划:
-- Using where; Using filesort
-- Rows examined: 1000000

轮询间隔的选择

间隔对精度的影响

间隔精度CPU 消耗数据库压力适用场景
1 秒±0.5 秒🔴 极高🔴 极高秒杀倒计时
10 秒±5 秒🟡 高🟡 高高精度场景
1 分钟±30 秒🟢 中🟢 中订单超时
5 分钟±2.5 分钟🟢 低🟢 低定时任务
1 小时±30 分钟🟢 极低🟢 低长周期任务

自适应轮询

根据任务量动态调整轮询间隔:

class AdaptivePollingDelayQueue:
    """自适应轮询延时队列"""
    
    def __init__(self):
        self.db = MySQLDatabase()
        self.current_interval = 60  # 当前间隔(秒)
        self.min_interval = 10      # 最小间隔
        self.max_interval = 300     # 最大间隔
    
    def start_consumer(self, handler):
        """启动消费者"""
        while True:
            # ✅ 第 1 步:轮询并执行
            task_count = self._poll_and_execute(handler)
            
            # ✅ 第 2 步:根据任务量调整间隔
            self._adjust_interval(task_count)
            
            # ✅ 第 3 步:休眠
            time.sleep(self.current_interval)
    
    def _adjust_interval(self, task_count):
        """根据任务量调整轮询间隔"""
        if task_count > 100:
            # 任务多,加快轮询
            self.current_interval = max(
                self.min_interval,
                self.current_interval / 2
            )
            logger.info(f"任务较多,加快轮询: {self.current_interval}秒")
            
        elif task_count < 10:
            # 任务少,降低轮询频率
            self.current_interval = min(
                self.max_interval,
                self.current_interval * 1.5
            )
            logger.info(f"任务较少,降低轮询: {self.current_interval}秒")

优缺点分析

优点

优点说明
简单可靠实现简单,容易理解和维护
持久化保证数据库天然持久化,任务不会丢
事务支持可以利用数据库事务保证一致性
易扩展可以增加索引、分区、分表
查询灵活支持复杂查询和统计

缺点

缺点说明影响
精度有限轮询间隔决定精度不适合高精度场景
数据库压力频繁查询数据库高并发时压力大
空轮询没有任务时也会查询浪费资源
性能瓶颈数据库 I/O 限制无法支撑超高并发

想一想

思考 1

如何减少数据库轮询的空查询(没有任务时的查询)?

参考答案

问题分析:

空查询是指轮询时没有找到任何到期任务,但仍然执行了数据库查询。

空查询的代价:

- 数据库连接占用
- 网络往返延迟
- 数据库 CPU 和 I/O 开销
- 日志记录

假设每天 100 万任务,每分钟轮询一次:
- 总轮询次数:1440 次/天
- 如果大部分时间没有任务,90% 是空查询
- 浪费:1296 次无效查询

解决方案:

方案 1:预估下次查询时间

class SmartPollingDelayQueue:
    """智能轮询:预估下次查询时间"""
    
    def __init__(self):
        self.db = MySQLDatabase()
        self.next_poll_time = None  # 下次需要查询的时间
    
    def start_consumer(self, handler):
        """启动消费者"""
        while True:
            now = datetime.now()
            
            # ✅ 如果设置了下次查询时间,且还没到,就跳过
            if self.next_poll_time and now < self.next_poll_time:
                wait_seconds = (self.next_poll_time - now).total_seconds()
                logger.info(f"等待下次查询: {wait_seconds}秒")
                time.sleep(wait_seconds)
                continue
            
            # ✅ 查询并执行任务
            task_count = self._poll_and_execute(handler)
            
            # ✅ 查询下一个待执行任务的执行时间
            self._update_next_poll_time()
    
    def _update_next_poll_time(self):
        """查询下一个待执行任务的执行时间"""
        # 查询最早的 pending 任务
        result = self.db.query("""
            SELECT execute_at FROM delay_tasks 
            WHERE status = 'pending'
            ORDER BY execute_at ASC
            LIMIT 1
        """)
        
        if result:
            next_execute_at = result[0]['execute_at']
            self.next_poll_time = next_execute_at
            logger.info(f"下次查询时间: {next_execute_at}")
        else:
            # 没有待执行任务,等待一段时间再查
            self.next_poll_time = datetime.now() + timedelta(minutes=5)
            logger.info("没有待执行任务,5 分钟后再查")

方案 2:使用缓存

class CachedPollingDelayQueue:
    """带缓存的轮询"""
    
    def __init__(self):
        self.db = MySQLDatabase()
        self.redis = redis.Redis()
        self.cache_ttl = 60  # 缓存 60 秒
    
    def _has_pending_tasks(self):
        """检查是否有待执行任务(使用缓存)"""
        cache_key = 'has_pending_tasks'
        
        # ✅ 先查缓存
        cached = self.redis.get(cache_key)
        if cached is not None:
            return cached == '1'
        
        # ✅ 缓存未命中,查询数据库
        result = self.db.query("""
            SELECT COUNT(*) as count FROM delay_tasks 
            WHERE status = 'pending' 
              AND execute_at <= NOW()
        """)
        
        has_pending = result[0]['count'] > 0
        
        # ✅ 更新缓存
        self.redis.setex(
            cache_key,
            '1' if has_pending else '0',
            self.cache_ttl
        )
        
        return has_pending
    
    def start_consumer(self, handler):
        """启动消费者"""
        while True:
            # ✅ 先检查缓存,避免空查询
            if not self._has_pending_tasks():
                logger.info("没有待执行任务,等待 10 秒")
                time.sleep(10)
                continue
            
            # ✅ 有任务,执行查询
            self._poll_and_execute(handler)
            
            # ✅ 清除缓存,下次重新检查
            self.redis.delete('has_pending_tasks')

方案 3:触发通知

class NotificationPollingDelayQueue:
    """带通知的轮询"""
    
    def __init__(self):
        self.db = MySQLDatabase()
        self.has_new_tasks = False  # 通知标志
    
    def add_task(self, task_id, task_type, task_data, delay_seconds):
        """添加任务:设置通知标志"""
        execute_at = datetime.now() + timedelta(seconds=delay_seconds)
        
        self.db.execute("""
            INSERT INTO delay_tasks 
            (task_id, task_type, task_data, execute_at, status, created_at)
            VALUES (%s, %s, %s, %s, 'pending', NOW())
        """, (task_id, task_type, json.dumps(task_data), execute_at))
        
        # ✅ 设置通知标志
        self.has_new_tasks = True
        
        logger.info(f"添加任务并设置通知: {task_id}")
    
    def start_consumer(self, handler):
        """启动消费者"""
        while True:
            if self.has_new_tasks:
                # ✅ 有新任务,查询并执行
                self._poll_and_execute(handler)
                self.has_new_tasks = False
            else:
                # ✅ 没有新任务,等待
                time.sleep(1)

方案对比:

方案减少空查询复杂度适用场景
预估时间⭐⭐⭐⭐⭐⭐⭐任务稀疏
使用缓存⭐⭐⭐⭐频繁查询
触发通知⭐⭐⭐⭐⭐⭐⭐⭐单消费者