Batch Processing

From One-at-a-Time to Batch Processing

Remember my original implementation?

# 😱 Processing orders one at a time
for order in pending_orders:
    cancel_order(order)
    release_inventory(order)
    send_notification(order.user_id, 'order_cancelled')

Every cancellation involves:

  1. Updating the order status (a database write)
  2. Releasing the inventory (a database write)
  3. Sending a notification (an HTTP request)
  4. Writing a log entry

With 1,000 orders to cancel, that adds up to 3,000 database writes and 1,000 HTTP requests!

The bottlenecks of one-at-a-time processing:

Database I/O: 3,000 operations
Network I/O: 1,000 requests
Total time: ~15 seconds
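
As a rough sanity check on that 15-second figure, here is a back-of-envelope cost model; the per-operation latencies are assumptions for illustration, not measurements:

db_writes = 3000
http_requests = 1000

db_write_latency = 0.004   # assumed ~4 ms per database round trip
http_latency = 0.003       # assumed ~3 ms per notification request

total_seconds = db_writes * db_write_latency + http_requests * http_latency
print(f"estimated total: {total_seconds:.1f} s")  # -> estimated total: 15.0 s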

Advantages of Batch Processing

Performance Comparison

Dimension             One-at-a-time   Batched    Improvement
Database I/O          3,000 writes    100        30x
Network round trips   1,000           10         100x
Total time            15 s            2 s        7.5x
Connection usage      high            low        -

How It Works

One-at-a-time:
┌─────────┐    ┌─────────┐    ┌─────────┐
│ Task 1  │───►│ Process │───►│ DB write│
└─────────┘    └─────────┘    └─────────┘
┌─────────┐    ┌─────────┐    ┌─────────┐
│ Task 2  │───►│ Process │───►│ DB write│
└─────────┘    └─────────┘    └─────────┘
...

Batched:
┌─────────┐
│ Task 1  │──┐
├─────────┤  │    ┌─────────┐    ┌──────────┐
│ Task 2  │──┼───►│ Process │───►│ Bulk     │
├─────────┤  │    │ batch   │    │ DB write │
│ Task 3  │──┘    └─────────┘    └──────────┘
└─────────┘
...
                  one pass handles the whole batch
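
The pattern generalizes: collect the work items first, then operate on fixed-size slices. A minimal, hypothetical chunked() helper is all the machinery this needs, and the later examples in this section can be read in terms of it:

def chunked(items, size):
    """Yield consecutive slices of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

# 250 items with size=100 -> slices of 100, 100, 50
for batch in chunked(list(range(250)), 100):
    print(len(batch))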

Implementing Batch Processing

Bulk Database Updates

import logging
from collections import defaultdict

from django.db.models import F
from django.utils import timezone

logger = logging.getLogger(__name__)


class BatchDelayQueue:
    """Delay queue with batch processing"""
    
    def __init__(self, batch_size=100):
        self.db = MySQLDatabase()
        self.batch_size = batch_size
    
    def _batch_update_status(self, task_ids, new_status):
        """Update task statuses in bulk"""
        # ❌ One UPDATE per task (slow)
        # for task_id in task_ids:
        #     self.db.execute("""
        #         UPDATE delay_tasks 
        #         SET status = %s, updated_at = NOW()
        #         WHERE task_id = %s
        #     """, [new_status, task_id])
        
        # ✅ One UPDATE for the whole batch (fast)
        placeholders = ','.join(['%s'] * len(task_ids))
        self.db.execute(f"""
            UPDATE delay_tasks 
            SET status = %s, updated_at = NOW()
            WHERE task_id IN ({placeholders})
        """, [new_status] + task_ids)
        
        logger.info(f"Bulk-updated {len(task_ids)} tasks to status {new_status}")
    
    def _batch_process_tasks(self, tasks):
        """Process a batch of tasks"""
        # Collect the order IDs we need to touch
        order_ids = [task['task_data']['order_id'] for task in tasks]
        
        # ✅ Fetch all orders in a single query
        orders = {}
        for order in Order.objects.filter(id__in=order_ids):
            orders[order.id] = order
        
        # ✅ Cancel orders in bulk
        orders_to_cancel = [
            order.id for order in orders.values()
            if order.status == 'pending_payment'
        ]
        
        if orders_to_cancel:
            Order.objects.filter(id__in=orders_to_cancel).update(
                status='cancelled',
                cancel_reason='payment_timeout',
                cancelled_at=timezone.now()
            )
            
            # ✅ Release inventory in bulk: aggregate quantities per product
            product_quantities = defaultdict(int)
            for order_id in orders_to_cancel:
                order = orders[order_id]
                product_quantities[order.product_id] += order.quantity
            
            for product_id, quantity in product_quantities.items():
                Inventory.objects.filter(product_id=product_id).update(
                    quantity=F('quantity') + quantity
                )
            
            # ✅ Send notifications in bulk (asynchronously)
            user_ids = [orders[oid].user_id for oid in orders_to_cancel]
            NotificationService.batch_send(
                user_ids,
                'order_cancelled',
                {'reason': 'payment_timeout'}
            )
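
NotificationService.batch_send is called above but its implementation isn't shown. Here is a minimal sketch of what it might look like, assuming notifications are delivered through a single HTTP endpoint that accepts a JSON array; the URL and payload shape are hypothetical, not a real API:

import requests

class NotificationService:
    # hypothetical internal endpoint that accepts a JSON array of notifications
    NOTIFY_URL = 'https://internal.example.com/notify/batch'

    @staticmethod
    def batch_send(user_ids, event, payload):
        # One HTTP request for the whole batch instead of one per user
        messages = [
            {'user_id': uid, 'event': event, 'payload': payload}
            for uid in user_ids
        ]
        requests.post(NotificationService.NOTIFY_URL, json=messages, timeout=5)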

Bulk Queries

def batch_query_orders(order_ids):
    """Fetch orders in bulk"""
    # ❌ One query per order (the N+1 problem)
    # orders = []
    # for order_id in order_ids:
    #     order = Order.get(order_id)
    #     orders.append(order)
    
    # ✅ A single query for the whole batch
    orders = list(Order.objects.filter(id__in=order_ids))
    
    # Build an ID -> Order map
    order_map = {order.id: order for order in orders}
    
    return order_map

# Usage
order_ids = [task['task_data']['order_id'] for task in tasks]
orders = batch_query_orders(order_ids)
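
One caveat: a single IN (...) clause containing tens of thousands of IDs can hit query-size limits or degrade the query plan. A sketch that caps the IN-list size, reusing the chunked() helper from earlier (the 1,000-ID cap is an assumption; tune it for your database):

def batch_query_orders_chunked(order_ids, chunk_size=1000):
    """Bulk-fetch orders while capping the size of each IN (...) clause."""
    order_map = {}
    for id_chunk in chunked(order_ids, chunk_size):
        for order in Order.objects.filter(id__in=id_chunk):
            order_map[order.id] = order
    return order_map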

Bulk Inserts

import json

def batch_add_tasks(task_list):
    """Add tasks in bulk"""
    # ❌ One INSERT per task
    # for task in task_list:
    #     DelayTask.objects.create(**task)
    
    # ✅ Bulk insert via the ORM
    DelayTask.objects.bulk_create([
        DelayTask(**task) for task in task_list
    ], batch_size=100)
    
    # Or with raw SQL: one multi-row INSERT, parameterized so values
    # are escaped properly instead of interpolated into the SQL string
    rows = [
        (t['task_id'], t['task_type'], json.dumps(t['task_data']), t['execute_at'])
        for t in task_list
    ]
    placeholders = ','.join(["(%s, %s, %s, %s, 'pending', NOW())"] * len(rows))
    params = [value for row in rows for value in row]
    db.execute(f"""
        INSERT INTO delay_tasks (task_id, task_type, task_data, execute_at, status, created_at)
        VALUES {placeholders}
    """, params)

Caveats for Batch Processing

1. Choosing the Batch Size

class AdaptiveBatchProcessor:
    """Adaptive batch sizing"""
    
    def __init__(self):
        self.db = MySQLDatabase()
        self.batch_size = 100  # initial batch size
        self.min_batch_size = 10
        self.max_batch_size = 1000
    
    def _adjust_batch_size(self, execution_time):
        """Adjust the batch size based on how long the last batch took"""
        if execution_time < 1.0:  # fast, so grow the batch
            self.batch_size = min(
                self.max_batch_size,
                int(self.batch_size * 1.5)
            )
        elif execution_time > 5.0:  # slow, so shrink the batch
            self.batch_size = max(
                self.min_batch_size,
                int(self.batch_size * 0.7)
            )
        
        logger.info(f"Adjusted batch size to {self.batch_size}")

Suggested batch sizes:

Scenario                   Batch size   Rationale
Simple database updates    100~500      bounded by the connection pool
Complex business logic     10~50        avoid per-request timeouts
External API calls         5~20         avoid timeouts and rate limiting
Bulk data imports          1000~5000    maximize throughput

2. Transaction Management

from django.db import transaction

def batch_process_with_transaction(tasks):
    """Batch processing wrapped in a transaction"""
    try:
        with transaction.atomic():
            # ✅ All operations run inside a single transaction:
            # 1. bulk query
            # 2. bulk update
            # 3. bulk insert
            pass
    except Exception as e:
        # ✅ The transaction rolls back, keeping the data consistent
        logger.error(f"Batch processing failed: {e}")
        raise

3. Error Handling

def batch_process_with_partial_failure(tasks):
    """Batch processing that tolerates partial failure"""
    success_count = 0
    failed_tasks = []
    
    for task in tasks:
        try:
            process_task(task)
            success_count += 1
        except Exception as e:
            logger.error(f"Task failed: {task['task_id']}, {e}")
            failed_tasks.append(task)
    
    # ✅ Successful tasks proceed; failed ones are retried
    logger.info(
        f"Batch finished: {success_count} succeeded, {len(failed_tasks)} failed"
    )
    
    if failed_tasks:
        # Push the failed tasks onto the retry queue
        retry_tasks(failed_tasks)
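
retry_tasks is referenced above but never defined. One plausible sketch, assuming the delay_tasks table from earlier plus a retry_count column; the column and the fixed 60-second backoff are assumptions:

def retry_tasks(failed_tasks, delay_seconds=60):
    """Re-enqueue failed tasks to run again after a short delay (sketch)."""
    task_ids = [t['task_id'] for t in failed_tasks]
    placeholders = ','.join(['%s'] * len(task_ids))
    db.execute(f"""
        UPDATE delay_tasks
        SET status = 'pending',
            retry_count = retry_count + 1,
            execute_at = NOW() + INTERVAL %s SECOND
        WHERE task_id IN ({placeholders})
    """, [delay_seconds] + task_ids)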

A Complete Batch-Processing Example

import time
from collections import defaultdict


class BatchPollingDelayQueue:
    """Polling delay queue with batch processing"""
    
    def __init__(self, batch_size=100):
        self.db = MySQLDatabase()
        self.batch_size = batch_size
    
    def start_consumer(self, handler):
        """Start the consumer loop"""
        while True:
            # ✅ Step 1: fetch due tasks in bulk
            tasks = self._query_ready_tasks()
            
            if not tasks:
                time.sleep(1)
                continue
            
            # ✅ Step 2: mark them all as processing
            task_ids = [task['task_id'] for task in tasks]
            self._batch_update_status(task_ids, 'processing')
            
            try:
                # ✅ Step 3: process the batch
                self._batch_process(tasks, handler)
                
                # ✅ Step 4: mark them all as completed
                self._batch_update_status(task_ids, 'completed')
                
            except Exception as e:
                logger.error(f"Batch processing failed: {e}")
                
                # ✅ Mark the failed tasks for retry
                self._batch_update_status(task_ids, 'retrying')
    
    def _query_ready_tasks(self):
        """Fetch due tasks in bulk"""
        tasks = self.db.query("""
            SELECT * FROM delay_tasks 
            WHERE execute_at <= NOW() 
              AND status = 'pending'
            ORDER BY execute_at ASC
            LIMIT %s
        """, [self.batch_size])
        
        return tasks
    
    def _batch_update_status(self, task_ids, status):
        """Update task statuses in bulk"""
        if not task_ids:
            return
        
        placeholders = ','.join(['%s'] * len(task_ids))
        self.db.execute(f"""
            UPDATE delay_tasks 
            SET status = %s, updated_at = NOW()
            WHERE task_id IN ({placeholders})
        """, [status] + task_ids)
    
    def _batch_process(self, tasks, handler):
        """Process a batch of tasks"""
        # Group the tasks by type
        tasks_by_type = defaultdict(list)
        for task in tasks:
            tasks_by_type[task['task_type']].append(task)
        
        # Process each type as its own batch
        for task_type, type_tasks in tasks_by_type.items():
            if task_type == 'order_timeout':
                self._batch_process_order_timeout(type_tasks)
            elif task_type == 'coupon_expire':
                self._batch_process_coupon_expire(type_tasks)
            else:
                # Other types are handled one by one
                for task in type_tasks:
                    handler(task)
    
    def _batch_process_order_timeout(self, tasks):
        """Handle order timeouts in bulk"""
        order_ids = [task['task_data']['order_id'] for task in tasks]
        
        # Fetch the orders in one query
        orders = Order.objects.filter(id__in=order_ids)
        
        # Cancel them in bulk
        orders_to_cancel = [
            order.id for order in orders
            if order.status == 'pending_payment'
        ]
        
        if orders_to_cancel:
            Order.objects.filter(id__in=orders_to_cancel).update(
                status='cancelled',
                cancel_reason='payment_timeout',
                cancelled_at=timezone.now()
            )
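
A hypothetical way to run it; the handler passed in only sees task types that have no dedicated batch path:

def fallback_handler(task):
    logger.warning(f"No batch handler for task type: {task['task_type']}")

queue = BatchPollingDelayQueue(batch_size=200)
queue.start_consumer(fallback_handler)  # blocks; run it in a dedicated worker process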

Food for Thought

Question 1

When some tasks in a batch fail, how should they be handled?

Reference Answer

The problem:

Out of a batch of 100 tasks, 5 fail. What do you do?

Three strategies:

Strategy 1: all-or-nothing rollback

def batch_process_all_or_nothing(tasks):
    """Either every task succeeds or none does"""
    try:
        with transaction.atomic():
            for task in tasks:
                process_task(task)
    except Exception as e:
        # If any single task fails, the whole batch rolls back
        logger.error(f"Batch failed, rolling everything back: {e}")
        raise

Pros: strong data consistency. Cons: a single failure takes down the whole batch.


Strategy 2: skip over failures

def batch_process_partial(tasks):
    """Skip over individual failures"""
    success_count = 0
    failed_tasks = []
    
    for task in tasks:
        try:
            process_task(task)
            success_count += 1
        except Exception as e:
            logger.error(f"Task failed: {task['task_id']}, {e}")
            failed_tasks.append(task)
    
    logger.info(
        f"Batch finished: {success_count} succeeded, {len(failed_tasks)} failed"
    )
    
    # Retry the failed tasks
    if failed_tasks:
        retry_tasks(failed_tasks)

Pros: failures don't affect the tasks that succeed. Cons: the failed tasks need extra handling.


Strategy 3: process in groups

def batch_process_by_group(tasks, group_size=10):
    """Process in groups; each group is an independent transaction"""
    for i in range(0, len(tasks), group_size):
        group = tasks[i:i + group_size]
        
        try:
            with transaction.atomic():
                for task in group:
                    process_task(task)
        except Exception as e:
            logger.error(f"Group {i // group_size} failed: {e}")
            # This group failed, but the other groups are unaffected

Pros: balances transactional consistency with fault isolation. Cons: more complex to implement.

Recommended practice:

  1. Allow partial failure: for non-critical tasks
  2. Process in groups: for large batches
  3. Roll back everything: when strong consistency is required
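
One hypothetical way to encode those recommendations as a single dispatch function (the critical flag and the size threshold are assumptions about your task model):

def process_batch(tasks, critical=False, group_size=10):
    """Pick a failure-handling strategy per the recommendations above (sketch)."""
    if critical:
        # strong consistency required: all or nothing
        batch_process_all_or_nothing(tasks)
    elif len(tasks) > group_size * 10:
        # large batch: isolate failures group by group
        batch_process_by_group(tasks, group_size)
    else:
        # non-critical work: tolerate partial failure
        batch_process_partial(tasks)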