批量操作
一个电商客户的请求
那天下午,我收到了一封加急邮件。
你好,我是某电商的技术负责人。我们马上要做双十一大促,有 5000 个产品链接需要缩短,用于短信营销。每条短信按字符计费,长链接太贵了。
能不能今天搞定?我们明天就要开始发短信了。
5000 个链接。我的 API 一次只能处理一个。
第一版:同步批量接口
最快的方案
客户催得紧,我决定先出一个最快的方案。
@app.route('/api/shorten/batch', methods=['POST'])
def batch_shorten():
"""批量创建短链接"""
data = request.get_json()
urls = data.get('urls', [])
# 安全限制:单次最多 1000 个
if len(urls) > 1000:
return {'error': '单次最多 1000 个 URL'}, 400
results = []
success_count = 0
error_count = 0
for url in urls:
try:
short_code = generate_short_code(url)
db.execute(
"INSERT INTO urls (short_code, long_url, user_id) VALUES (?, ?, ?)",
(short_code, url, request.user['id'])
)
results.append({
'original_url': url,
'short_url': f'https://short.url/{short_code}',
'status': 'success'
})
success_count += 1
except Exception as e:
results.append({
'original_url': url,
'status': 'error',
'error': str(e)
})
error_count += 1
return jsonify({
'total': len(urls),
'success': success_count,
'failed': error_count,
'results': results
})问题很快暴露
客户分 5 批提交了 5000 个链接。每批 1000 个,平均耗时 15 秒。
“15 秒?太慢了!我们点提交后页面一直在转圈,还以为挂了。”
我查了一下瓶颈:1000 次数据库 INSERT,每次约 15ms,总计 15 秒。
优化:事务批量插入
把 1000 次提交合并为 1 次
“原来每次 INSERT 都是一次磁盘写入。如果把 1000 条打包成一次事务呢?“
@app.route('/api/shorten/batch', methods=['POST'])
def batch_shorten_v2():
"""批量创建短链接(事务优化版)"""
data = request.get_json()
urls = data.get('urls', [])
if len(urls) > 1000:
return {'error': '单次最多 1000 个 URL'}, 400
results = []
# 使用事务:1000 条 INSERT 合并为 1 次提交
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("BEGIN TRANSACTION")
try:
for url in urls:
short_code = generate_short_code(url)
cursor.execute(
"INSERT INTO urls (short_code, long_url, user_id) VALUES (?, ?, ?)",
(short_code, url, request.user['id'])
)
results.append({
'original_url': url,
'short_url': f'https://short.url/{short_code}',
'status': 'success'
})
conn.commit()
except Exception as e:
conn.rollback()
return {'error': f'批量创建失败:{str(e)}'}, 500
return jsonify({
'total': len(urls),
'success': len(results),
'failed': 0,
'results': results
})效果
| 方案 | 1000 条耗时 | 原因 |
|---|---|---|
| 逐条 INSERT | 15 秒 | 1000 次磁盘写入 |
| 事务批量 | 1.5 秒 | 1 次磁盘写入 |
“10 倍提升,客户满意了。“
CSV 导入
客户的第二个需求
“能不能支持 CSV 文件导入?我们运营团队只会用 Excel。“
import csv
from werkzeug.utils import secure_filename
@app.route('/api/import/csv', methods=['POST'])
def import_csv():
"""从 CSV 文件导入 URL"""
# 检查文件
if 'file' not in request.files:
return {'error': '请上传 CSV 文件'}, 400
file = request.files['file']
if not file.filename.endswith('.csv'):
return {'error': '只支持 CSV 文件'}, 400
# 解析 CSV
urls = []
try:
stream = file.read().decode('utf-8').splitlines()
reader = csv.DictReader(stream)
for row in reader:
url = row.get('url') or row.get('URL') or row.get('链接')
if url and url.startswith('http'):
urls.append(url.strip())
except Exception as e:
return {'error': f'CSV 解析失败:{str(e)}'}, 400
if not urls:
return {'error': 'CSV 中没有找到有效的 URL'}, 400
if len(urls) > 5000:
return {'error': f'CSV 包含 {len(urls)} 条,超过单次上限 5000 条'}, 400
# 批量创建(复用事务接口的逻辑)
return _batch_create_urls(urls, request.user['id'])CSV 格式说明
我写了个简单的文档给客户:
# CSV 文件格式
## 必需列
- url(或 URL、链接)
## 可选列
- title(链接标题)
- tag(标签,用于分组管理)
## 示例
url,title,tag
https://example.com/product/1,iPhone 15,手机
https://example.com/product/2,MacBook Pro,电脑
https://example.com/product/3,AirPods,耳机更大的客户来了
新问题:10 万条链接
一个月后,一个更大的客户找上门。
“我们有 10 万条 链接需要缩短,用于全渠道营销推广。”
10 万条。即使事务优化后,也需要 150 秒。HTTP 请求早就超时了。
“我意识到,同步方案的天花板到了。“
异步处理:Celery 任务队列
架构改造
“我花了一个周末引入了 Celery 异步任务队列。“
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 客户端 │────▶│ API 服务 │────▶│ Redis │ ← 任务队列
└──────────┘ └──────────┘ └──────────┘
│
▼
┌──────────┐
│ Worker │ ← 后台处理
└──────────┘
│
▼
┌──────────┐
│ MySQL │
└──────────┘提交任务
from celery import Celery
celery_app = Celery('url_shortener', broker='redis://localhost:6379/0')
@app.route('/api/shorten/batch-async', methods=['POST'])
def batch_shorten_async():
"""异步批量创建短链接"""
data = request.get_json()
urls = data.get('urls', [])
if len(urls) > 100000:
return {'error': '单次最多 10 万条'}, 400
# 创建任务记录
task_id = generate_task_id()
db.execute(
"INSERT INTO batch_tasks (id, user_id, total_count, status) VALUES (?, ?, ?, ?)",
(task_id, request.user['id'], len(urls), 'pending')
)
# 提交异步任务
process_batch.delay(task_id, urls, request.user['id'])
return jsonify({
'task_id': task_id,
'status': 'processing',
'total': len(urls),
'message': f'已提交 {len(urls)} 条 URL,正在后台处理'
}), 202Worker 处理
@celery_app.task(bind=True)
def process_batch(self, task_id, urls, user_id):
"""后台批量处理"""
batch_size = 1000
total = len(urls)
processed = 0
success_count = 0
error_count = 0
# 分批处理,每批 1000 条
for i in range(0, total, batch_size):
batch = urls[i:i + batch_size]
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("BEGIN TRANSACTION")
for url in batch:
try:
short_code = generate_short_code(url)
cursor.execute(
"INSERT INTO urls (short_code, long_url, user_id) VALUES (?, ?, ?)",
(short_code, url, user_id)
)
success_count += 1
except Exception:
error_count += 1
conn.commit()
processed += len(batch)
# 更新进度
progress = int(processed / total * 100)
db.execute(
"UPDATE batch_tasks SET progress = ?, success_count = ?, error_count = ? WHERE id = ?",
(progress, success_count, error_count, task_id)
)
# 通知 Celery 进度
self.update_state(
state='PROGRESS',
meta={'progress': progress, 'processed': processed, 'total': total}
)
# 标记完成
db.execute(
"UPDATE batch_tasks SET status = 'completed', progress = 100, completed_at = ? WHERE id = ?",
(datetime.now(), task_id)
)
return {
'task_id': task_id,
'total': total,
'success': success_count,
'failed': error_count
}效果
| 方案 | 10 万条耗时 | 客户体验 |
|---|---|---|
| 同步接口 | 150 秒(超时) | ❌ 页面卡死 |
| 异步任务 | ~30 秒(后台) | ✅ 秒级响应,后台处理 |
进度追踪
查询接口
“客户提交后总想知道进度。我加了一个查询接口。“
@app.route('/api/batch/<task_id>/status')
def get_batch_status(task_id):
"""查询批量任务状态"""
task = db.query(
"SELECT * FROM batch_tasks WHERE id = ? AND user_id = ?",
(task_id, request.user['id'])
)
if not task:
return {'error': '任务不存在'}, 404
task = task[0]
response = {
'task_id': task['id'],
'status': task['status'], # pending / processing / completed / failed
'progress': task['progress'], # 0-100
'total': task['total_count'],
'success': task['success_count'],
'failed': task['error_count']
}
if task['status'] == 'completed':
response['completed_at'] = task['completed_at'].isoformat()
response['duration_seconds'] = (
task['completed_at'] - task['created_at']
).total_seconds()
return jsonify(response)客户端轮询
// 客户端示例:轮询任务进度
async function submitBatchAndTrack(urls) {
// 1. 提交任务
const submitResp = await fetch('/api/shorten/batch-async', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ urls })
});
const { task_id } = await submitResp.json();
// 2. 轮询进度
while (true) {
const statusResp = await fetch(`/api/batch/${task_id}/status`);
const status = await statusResp.json();
updateProgressBar(status.progress);
updateStatusText(`已处理 ${status.success} 条,失败 ${status.failed} 条`);
if (status.status === 'completed') {
showSuccessMessage(`全部完成!成功 ${status.success} 条`);
break;
}
if (status.status === 'failed') {
showErrorMessage('任务处理失败');
break;
}
// 每秒查询一次
await new Promise(r => setTimeout(r, 1000));
}
}批量删除和更新
批量删除
“后来客户又提出:大促结束了,那些活动链接能不能批量删除?“
@app.route('/api/links/batch-delete', methods=['POST'])
def batch_delete():
"""批量删除短链接"""
data = request.get_json()
short_codes = data.get('short_codes', [])
if len(short_codes) > 10000:
return {'error': '单次最多删除 1 万条'}, 400
# 批量删除(使用 IN 语句)
placeholders = ','.join(['?'] * len(short_codes))
cursor = db.execute(
f"DELETE FROM urls WHERE short_code IN ({placeholders}) AND user_id = ?",
(*short_codes, request.user['id'])
)
deleted_count = cursor.rowcount
# 清理缓存
for code in short_codes:
redis.delete(f"url:{code}")
return jsonify({
'requested': len(short_codes),
'deleted': deleted_count
})批量更新
@app.route('/api/links/batch-update', methods=['POST'])
def batch_update():
"""批量更新短链接"""
updates = request.get_json().get('updates', [])
if len(updates) > 5000:
return {'error': '单次最多更新 5000 条'}, 400
updated_count = 0
with get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("BEGIN TRANSACTION")
for item in updates:
result = cursor.execute(
"UPDATE urls SET long_url = ? WHERE short_code = ? AND user_id = ?",
(item['new_url'], item['short_code'], request.user['id'])
)
updated_count += result.rowcount
# 清除旧缓存
redis.delete(f"url:{item['short_code']}")
conn.commit()
return jsonify({
'requested': len(updates),
'updated': updated_count
})成本分析
“引入 Celery 后,我算了一笔账。“
| 组件 | 月成本 | 说明 |
|---|---|---|
| 额外 Redis 实例 | ¥40/月 | Celery broker |
| Worker 服务器 | ¥60/月 | 2 核 4G |
| 合计 | ¥100/月 | — |
“多花了 ¥100/月,但换来了企业客户的满意度。这个投入值不值?看收入就知道了——批量功能上线后,企业客户付费转化提升了 30%。”