批量处理
从逐条处理到批量处理
还记得我最初的实现吗?
# 😱 逐条处理
for order in pending_orders:
cancel_order(order)
release_inventory(order)
send_notification(order.user_id, 'order_cancelled')每次取消订单都要:
- 更新订单状态(数据库写入)
- 释放库存(数据库写入)
- 发送通知(HTTP 请求)
- 记录日志
如果有 1000 个订单要取消,那就是 3000 次数据库写入和 1000 次 HTTP 请求!
逐条处理的性能瓶颈:
数据库 I/O:3000 次
网络 I/O:1000 次
总耗时:~15 秒批量处理的优势
性能对比
| 维度 | 逐条处理 | 批量处理 | 提升 |
|---|---|---|---|
| 数据库 I/O | 3000 次 | 100 次 | 30x |
| 网络往返 | 1000 次 | 10 次 | 100x |
| 总耗时 | 15 秒 | 2 秒 | 7.5x |
| 连接占用 | 高 | 低 | - |
原理
逐条处理:
┌─────────┐ ┌─────────┐ ┌─────────┐
│ 任务 1 │───►│ 处理任务 │───►│ 写数据库 │
└─────────┘ └─────────┘ └─────────┘
┌─────────┐ ┌─────────┐ ┌─────────┐
│ 任务 2 │───►│ 处理任务 │───►│ 写数据库 │
└─────────┘ └─────────┘ └─────────┘
...
批量处理:
┌─────────┐ ┌─────────┐ ┌─────────┐
│ 任务 1 │────│ │ │ │
├─────────┤ │ 批量处理 │───►│ 批量写入 │
│ 任务 2 │────│ │ │ │
├─────────┤ └─────────┘ └─────────┘
│ 任务 3 │────│
└─────────┘ │
... │
│
▼
一次处理完成批量处理实现
数据库批量更新
class BatchDelayQueue:
"""批量处理延时队列"""
def __init__(self, batch_size=100):
self.db = MySQLDatabase()
self.batch_size = batch_size
def _batch_update_status(self, task_ids, new_status):
"""批量更新任务状态"""
# ❌ 逐条更新(慢)
# for task_id in task_ids:
# self.db.execute("""
# UPDATE delay_tasks
# SET status = %s, updated_at = NOW()
# WHERE task_id = %s
# """, [new_status, task_id])
# ✅ 批量更新(快)
placeholders = ','.join(['%s'] * len(task_ids))
self.db.execute(f"""
UPDATE delay_tasks
SET status = %s, updated_at = NOW()
WHERE task_id IN ({placeholders})
""", [new_status] + task_ids)
logger.info(f"批量更新 {len(task_ids)} 个任务状态为 {new_status}")
def _batch_process_tasks(self, tasks):
"""批量处理任务"""
# 收集需要处理的订单 ID
order_ids = [task['task_data']['order_id'] for task in tasks]
# ✅ 批量查询订单
orders = {}
for order in Order.objects.filter(id__in=order_ids):
orders[order.id] = order
# ✅ 批量取消订单
orders_to_cancel = [
order.id for order in orders.values()
if order.status == 'pending_payment'
]
if orders_to_cancel:
Order.objects.filter(id__in=orders_to_cancel).update(
status='cancelled',
cancel_reason='payment_timeout',
cancelled_at=timezone.now()
)
# ✅ 批量释放库存
product_quantities = defaultdict(int)
for order_id in orders_to_cancel:
order = orders[order_id]
product_quantities[order.product_id] += order.quantity
for product_id, quantity in product_quantities.items():
Inventory.objects.filter(product_id=product_id).update(
quantity=F('quantity') + quantity
)
# ✅ 批量发送通知(异步)
user_ids = [orders[oid].user_id for oid in orders_to_cancel]
NotificationService.batch_send(
user_ids,
'order_cancelled',
{'reason': 'payment_timeout'}
)批量查询
def batch_query_orders(order_ids):
"""批量查询订单"""
# ❌ 逐条查询(N+1 问题)
# orders = []
# for order_id in order_ids:
# order = Order.get(order_id)
# orders.append(order)
# ✅ 批量查询(一次查询)
orders = list(Order.objects.filter(id__in=order_ids))
# 构建 ID -> Order 映射
order_map = {order.id: order for order in orders}
return order_map
# 使用示例
order_ids = [task['task_data']['order_id'] for task in tasks]
orders = batch_query_orders(order_ids)批量插入
def batch_add_tasks(task_list):
"""批量添加任务"""
# ❌ 逐条插入
# for task in task_list:
# DelayTask.objects.create(**task)
# ✅ 批量插入
DelayTask.objects.bulk_create([
DelayTask(**task) for task in task_list
], batch_size=100)
# 或者使用原生 SQL
values = ','.join([
f"('{t['task_id']}', '{t['task_type']}', '{json.dumps(t['task_data'])}', '{t['execute_at']}')"
for t in task_list
])
db.execute(f"""
INSERT INTO delay_tasks (task_id, task_type, task_data, execute_at, status, created_at)
VALUES {values}
""")批量处理的注意事项
1. 批量大小的选择
class AdaptiveBatchProcessor:
"""自适应批量大小"""
def __init__(self):
self.db = MySQLDatabase()
self.batch_size = 100 # 初始批量大小
self.min_batch_size = 10
self.max_batch_size = 1000
def _adjust_batch_size(self, execution_time):
"""根据执行时间调整批量大小"""
if execution_time < 1.0: # 执行很快,可以增加批量
self.batch_size = min(
self.max_batch_size,
int(self.batch_size * 1.5)
)
elif execution_time > 5.0: # 执行较慢,减少批量
self.batch_size = max(
self.min_batch_size,
int(self.batch_size * 0.7)
)
logger.info(f"调整批量大小: {self.batch_size}")批量大小建议:
| 场景 | 批量大小 | 理由 |
|---|---|---|
| 简单数据库更新 | 100~500 | 数据库连接池限制 |
| 复杂业务逻辑 | 10~50 | 避免单个请求超时 |
| 外部 API 调用 | 5~20 | 避免超时和限流 |
| 大量数据导入 | 1000~5000 | 最大化吞吐量 |
2. 事务管理
def batch_process_with_transaction(tasks):
"""带事务的批量处理"""
try:
with transaction.atomic():
# ✅ 在一个事务中执行所有操作
# 1. 批量查询
# 2. 批量更新
# 3. 批量插入
pass
except Exception as e:
# ✅ 事务回滚,保证数据一致性
logger.error(f"批量处理失败: {e}")
raise3. 错误处理
def batch_process_with_partial_failure(tasks):
"""允许部分失败的批量处理"""
success_count = 0
failed_tasks = []
for task in tasks:
try:
process_task(task)
success_count += 1
except Exception as e:
logger.error(f"任务失败: {task['task_id']}, {e}")
failed_tasks.append(task)
# ✅ 处理成功的继续,失败的重试
logger.info(
f"批量处理完成: 成功 {success_count}, 失败 {len(failed_tasks)}"
)
if failed_tasks:
# 失败的任务放入重试队列
retry_tasks(failed_tasks)批量处理的完整示例
class BatchPollingDelayQueue:
"""批量轮询延时队列"""
def __init__(self, batch_size=100):
self.db = MySQLDatabase()
self.batch_size = batch_size
def start_consumer(self, handler):
"""启动消费者"""
while True:
# ✅ 第 1 步:批量查询到期任务
tasks = self._query_ready_tasks()
if not tasks:
time.sleep(1)
continue
# ✅ 第 2 步:批量标记为处理中
task_ids = [task['task_id'] for task in tasks]
self._batch_update_status(task_ids, 'processing')
try:
# ✅ 第 3 步:批量处理
self._batch_process(tasks, handler)
# ✅ 第 4 步:批量标记为完成
self._batch_update_status(task_ids, 'completed')
except Exception as e:
logger.error(f"批量处理失败: {e}")
# ✅ 失败的任务标记为重试
self._batch_update_status(task_ids, 'retrying')
def _query_ready_tasks(self):
"""批量查询到期任务"""
tasks = self.db.query("""
SELECT * FROM delay_tasks
WHERE execute_at <= NOW()
AND status = 'pending'
ORDER BY execute_at ASC
LIMIT %s
""", [self.batch_size])
return tasks
def _batch_update_status(self, task_ids, status):
"""批量更新状态"""
if not task_ids:
return
placeholders = ','.join(['%s'] * len(task_ids))
self.db.execute(f"""
UPDATE delay_tasks
SET status = %s, updated_at = NOW()
WHERE task_id IN ({placeholders})
""", [status] + task_ids)
def _batch_process(self, tasks, handler):
"""批量处理任务"""
# 根据任务类型分组
tasks_by_type = defaultdict(list)
for task in tasks:
tasks_by_type[task['task_type']].append(task)
# 按类型批量处理
for task_type, type_tasks in tasks_by_type.items():
if task_type == 'order_timeout':
self._batch_process_order_timeout(type_tasks)
elif task_type == 'coupon_expire':
self._batch_process_coupon_expire(type_tasks)
else:
# 其他类型逐个处理
for task in type_tasks:
handler(task)
def _batch_process_order_timeout(self, tasks):
"""批量处理订单超时"""
order_ids = [task['task_data']['order_id'] for task in tasks]
# 批量查询订单
orders = Order.objects.filter(id__in=order_ids)
# 批量取消
orders_to_cancel = [
order.id for order in orders
if order.status == 'pending_payment'
]
if orders_to_cancel:
Order.objects.filter(id__in=orders_to_cancel).update(
status='cancelled',
cancel_reason='payment_timeout',
cancelled_at=timezone.now()
)想一想
思考 1
批量处理时,如果部分任务失败,应该如何处理?
参考答案
问题分析:
批量处理中,如果 100 个任务中有 5 个失败,如何处理?
三种策略:
策略 1:全部失败回滚
def batch_process_all_or_nothing(tasks):
"""全部成功或全部失败"""
try:
with transaction.atomic():
for task in tasks:
process_task(task)
except Exception as e:
# 任何一个任务失败,全部回滚
logger.error(f"批量处理失败,全部回滚: {e}")
raise优点: 数据一致性好 缺点: 一个失败影响全部
策略 2:部分失败跳过
def batch_process_partial(tasks):
"""部分失败跳过"""
success_count = 0
failed_tasks = []
for task in tasks:
try:
process_task(task)
success_count += 1
except Exception as e:
logger.error(f"任务失败: {task['task_id']}, {e}")
failed_tasks.append(task)
logger.info(
f"批量处理: 成功 {success_count}, 失败 {len(failed_tasks)}"
)
# 失败的任务重试
if failed_tasks:
retry_tasks(failed_tasks)优点: 不会影响成功的任务 缺点: 需要额外处理失败任务
策略 3:分组处理
def batch_process_by_group(tasks, group_size=10):
"""分组处理,每组独立"""
for i in range(0, len(tasks), group_size):
group = tasks[i:i + group_size]
try:
with transaction.atomic():
for task in group:
process_task(task)
except Exception as e:
logger.error(f"组 {i//group_size} 失败: {e}")
# 该组失败,但不影响其他组优点: 平衡了并发控制和故障隔离 缺点: 实现较复杂
最佳实践建议:
- 允许部分失败:对于非关键任务
- 分组处理:对于大批量任务
- 全部回滚:对于强一致性要求