Batch Operations

A Request from an E-commerce Client

That afternoon, I received an urgent email.

"Hi, I'm the tech lead at an e-commerce company. We're about to run our Double Eleven sale, and we have 5,000 product links that need shortening for SMS marketing. SMS is billed per character, and long links are too expensive."

"Can you get this done today? We start sending texts tomorrow."

5,000 links. My API could only handle one at a time.


Version 1: A Synchronous Batch Endpoint

The Quickest Possible Solution

The client was in a hurry, so I shipped the fastest thing I could.

@app.route('/api/shorten/batch', methods=['POST'])
def batch_shorten():
    """Create short links in bulk."""
    data = request.get_json()
    urls = data.get('urls', [])

    # Safety limit: at most 1,000 per request
    if len(urls) > 1000:
        return {'error': 'At most 1,000 URLs per request'}, 400

    results = []
    success_count = 0
    error_count = 0

    for url in urls:
        try:
            short_code = generate_short_code(url)
            db.execute(
                "INSERT INTO urls (short_code, long_url, user_id) VALUES (?, ?, ?)",
                (short_code, url, request.user['id'])
            )
            results.append({
                'original_url': url,
                'short_url': f'https://short.url/{short_code}',
                'status': 'success'
            })
            success_count += 1
        except Exception as e:
            results.append({
                'original_url': url,
                'status': 'error',
                'error': str(e)
            })
            error_count += 1

    return jsonify({
        'total': len(urls),
        'success': success_count,
        'failed': error_count,
        'results': results
    })

The Problem Surfaced Quickly

The client submitted the 5,000 links in 5 batches of 1,000. Each batch took about 15 seconds.

"15 seconds? That's way too slow! After we clicked submit, the page just kept spinning; we thought it had crashed."

I tracked down the bottleneck: 1,000 database INSERTs at roughly 15 ms each adds up to 15 seconds.


Optimization: Batched Inserts in a Transaction

Merging 1,000 Commits into One

"So every INSERT was its own disk write. What if I packed all 1,000 rows into a single transaction?"

@app.route('/api/shorten/batch', methods=['POST'])
def batch_shorten_v2():
    """Create short links in bulk (transaction-optimized)."""
    data = request.get_json()
    urls = data.get('urls', [])

    if len(urls) > 1000:
        return {'error': 'At most 1,000 URLs per request'}, 400

    results = []

    # One transaction: 1,000 INSERTs, a single commit
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("BEGIN TRANSACTION")

        try:
            for url in urls:
                short_code = generate_short_code(url)
                cursor.execute(
                    "INSERT INTO urls (short_code, long_url, user_id) VALUES (?, ?, ?)",
                    (short_code, url, request.user['id'])
                )
                results.append({
                    'original_url': url,
                    'short_url': f'https://short.url/{short_code}',
                    'status': 'success'
                })

            conn.commit()
        except Exception as e:
            conn.rollback()
            return {'error': f'Batch creation failed: {str(e)}'}, 500

    return jsonify({
        'total': len(urls),
        'success': len(results),
        'failed': 0,
        'results': results
    })

Results

| Approach | Time for 1,000 rows | Why |
|---|---|---|
| Per-row INSERT | 15 s | 1,000 disk writes |
| Batched transaction | 1.5 s | 1 disk write |
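The effect is easy to reproduce in miniature. A sketch with an in-memory SQLite database (an illustration of the pattern, not the production setup): all rows go into one transaction, so there is a single commit instead of one per row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE urls (short_code TEXT, long_url TEXT)")

rows = [(f"code{i}", f"https://example.com/p/{i}") for i in range(1000)]

# The connection context manager opens a transaction and commits
# once on exit; 1,000 inserts become one durable write.
with conn:
    conn.executemany("INSERT INTO urls VALUES (?, ?)", rows)

count = conn.execute("SELECT COUNT(*) FROM urls").fetchone()[0]
print(count)  # 1000
```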

"A 10x speedup. The client was happy."


CSV Import

The Client's Second Request

"Could you support CSV file import? Our operations team only knows Excel."

import csv
from werkzeug.utils import secure_filename

@app.route('/api/import/csv', methods=['POST'])
def import_csv():
    """Import URLs from a CSV file."""
    # Validate the upload
    if 'file' not in request.files:
        return {'error': 'Please upload a CSV file'}, 400

    file = request.files['file']
    if not file.filename.lower().endswith('.csv'):
        return {'error': 'Only CSV files are supported'}, 400

    # Parse the CSV
    urls = []
    try:
        stream = file.read().decode('utf-8').splitlines()
        reader = csv.DictReader(stream)

        for row in reader:
            url = row.get('url') or row.get('URL') or row.get('链接')
            if url and url.startswith('http'):
                urls.append(url.strip())
    except Exception as e:
        return {'error': f'Failed to parse CSV: {str(e)}'}, 400

    if not urls:
        return {'error': 'No valid URLs found in the CSV'}, 400

    if len(urls) > 5000:
        return {'error': f'CSV contains {len(urls)} rows, above the per-request limit of 5,000'}, 400

    # Bulk-create (reusing the transactional endpoint's logic)
    return _batch_create_urls(urls, request.user['id'])
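`_batch_create_urls` isn't shown in the post. A minimal sketch of what the shared helper might look like, assuming an in-memory SQLite connection and a hash-prefix stub in place of the real `generate_short_code` (both are my assumptions, not the production code):

```python
import hashlib
import sqlite3

def _batch_create_urls(conn, urls, user_id):
    """Hypothetical shared helper: insert all URLs inside one
    transaction and return the same summary shape the batch
    endpoint responds with."""
    results = []
    with conn:  # single transaction, single commit
        for url in urls:
            # Stub: first 7 hex chars of an MD5 digest as the short code
            short_code = hashlib.md5(url.encode()).hexdigest()[:7]
            conn.execute(
                "INSERT INTO urls (short_code, long_url, user_id) VALUES (?, ?, ?)",
                (short_code, url, user_id),
            )
            results.append({
                'original_url': url,
                'short_url': f'https://short.url/{short_code}',
                'status': 'success',
            })
    return {'total': len(urls), 'success': len(results), 'failed': 0,
            'results': results}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE urls (short_code TEXT, long_url TEXT, user_id INTEGER)")
summary = _batch_create_urls(conn, ["https://example.com/a", "https://example.com/b"], 1)
print(summary['success'])  # 2
```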

CSV Format Notes

I wrote up a short doc for the client:

# CSV File Format

## Required column
- url (also accepted: URL, 链接)

## Optional columns
- title (link title)
- tag (for grouping and management)

## Example
url,title,tag
https://example.com/product/1,iPhone 15,Phones
https://example.com/product/2,MacBook Pro,Laptops
https://example.com/product/3,AirPods,Audio
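The sample file can be checked against the endpoint's column-fallback logic with a quick sketch (same `csv.DictReader` parsing, fed from a string instead of an upload):

```python
import csv
import io

sample = """url,title,tag
https://example.com/product/1,iPhone 15,Phones
https://example.com/product/2,MacBook Pro,Laptops
https://example.com/product/3,AirPods,Audio
"""

# Same fallback as the import endpoint: accept 'url', 'URL',
# or '链接' as the header for the link column.
urls = []
for row in csv.DictReader(io.StringIO(sample)):
    url = row.get('url') or row.get('URL') or row.get('链接')
    if url and url.startswith('http'):
        urls.append(url.strip())

print(len(urls))  # 3
```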

A Bigger Client Arrives

A New Problem: 100,000 Links

A month later, a much bigger client came knocking.

"We have 100,000 links to shorten for an omnichannel marketing campaign."

100,000 links. Even with the transaction optimization, that would take 150 seconds. The HTTP request would time out long before that.

"I realized the synchronous approach had hit its ceiling."


Going Asynchronous: a Celery Task Queue

Architecture Changes

"I spent a weekend wiring in Celery."

┌──────────┐     ┌──────────┐     ┌──────────┐
│  Client  │────▶│   API    │────▶│  Redis   │  ← task queue
└──────────┘     └──────────┘     └──────────┘


                                 ┌──────────┐
                                 │  Worker  │  ← background processing
                                 └──────────┘


                                 ┌──────────┐
                                 │  MySQL   │
                                 └──────────┘

Submitting a Task

from celery import Celery

celery_app = Celery('url_shortener', broker='redis://localhost:6379/0')

@app.route('/api/shorten/batch-async', methods=['POST'])
def batch_shorten_async():
    """Asynchronously create short links in bulk."""
    data = request.get_json()
    urls = data.get('urls', [])

    if len(urls) > 100000:
        return {'error': 'At most 100,000 URLs per request'}, 400

    # Create a task record
    task_id = generate_task_id()
    db.execute(
        "INSERT INTO batch_tasks (id, user_id, total_count, status) VALUES (?, ?, ?, ?)",
        (task_id, request.user['id'], len(urls), 'pending')
    )

    # Enqueue the background task
    process_batch.delay(task_id, urls, request.user['id'])

    return jsonify({
        'task_id': task_id,
        'status': 'processing',
        'total': len(urls),
        'message': f'{len(urls)} URLs accepted, processing in the background'
    }), 202
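`generate_task_id` isn't shown either. One plausible sketch (my assumption, not the post's implementation) is a random UUID, which is opaque and collision-safe:

```python
import uuid

def generate_task_id():
    """Hypothetical task-ID helper: 32 hex characters from a
    random UUID, safe to hand back to the client as-is."""
    return uuid.uuid4().hex

a, b = generate_task_id(), generate_task_id()
print(len(a), a != b)  # 32 True
```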

Worker Processing

from datetime import datetime

@celery_app.task(bind=True)
def process_batch(self, task_id, urls, user_id):
    """Process a batch in the background."""
    batch_size = 1000
    total = len(urls)
    processed = 0
    success_count = 0
    error_count = 0

    # Process in batches of 1,000
    for i in range(0, total, batch_size):
        batch = urls[i:i + batch_size]

        with get_db_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("BEGIN TRANSACTION")

            for url in batch:
                try:
                    short_code = generate_short_code(url)
                    cursor.execute(
                        "INSERT INTO urls (short_code, long_url, user_id) VALUES (?, ?, ?)",
                        (short_code, url, user_id)
                    )
                    success_count += 1
                except Exception:
                    error_count += 1

            conn.commit()

        processed += len(batch)

        # Persist progress
        progress = int(processed / total * 100)
        db.execute(
            "UPDATE batch_tasks SET progress = ?, success_count = ?, error_count = ? WHERE id = ?",
            (progress, success_count, error_count, task_id)
        )

        # Report progress to Celery
        self.update_state(
            state='PROGRESS',
            meta={'progress': progress, 'processed': processed, 'total': total}
        )

    # Mark the task as completed
    db.execute(
        "UPDATE batch_tasks SET status = 'completed', progress = 100, completed_at = ? WHERE id = ?",
        (datetime.now(), task_id)
    )

    return {
        'task_id': task_id,
        'total': total,
        'success': success_count,
        'failed': error_count
    }
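The worker's batching loop is worth isolating: slices of a fixed size, with the last batch possibly smaller. A self-contained sketch of just that logic:

```python
def chunked(items, batch_size):
    """Yield fixed-size slices of items; the final slice may be
    shorter when len(items) is not a multiple of batch_size."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

batches = list(chunked(list(range(2500)), 1000))
print([len(b) for b in batches])  # [1000, 1000, 500]
```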

Results

| Approach | Time for 100,000 rows | Client experience |
|---|---|---|
| Synchronous endpoint | 150 s (times out) | ❌ Page hangs |
| Async task | ~30 s (in background) | ✅ Instant response, processed in the background |

Progress Tracking

The Status Endpoint

"Clients always wanted to know how far along their job was, so I added a status endpoint."

@app.route('/api/batch/<task_id>/status')
def get_batch_status(task_id):
    """Query the status of a batch task."""
    task = db.query(
        "SELECT * FROM batch_tasks WHERE id = ? AND user_id = ?",
        (task_id, request.user['id'])
    )

    if not task:
        return {'error': 'Task not found'}, 404

    task = task[0]

    response = {
        'task_id': task['id'],
        'status': task['status'],        # pending / processing / completed / failed
        'progress': task['progress'],    # 0-100
        'total': task['total_count'],
        'success': task['success_count'],
        'failed': task['error_count']
    }

    if task['status'] == 'completed':
        response['completed_at'] = task['completed_at'].isoformat()
        response['duration_seconds'] = (
            task['completed_at'] - task['created_at']
        ).total_seconds()

    return jsonify(response)

Client-Side Polling

// Client example: poll for task progress
async function submitBatchAndTrack(urls) {
    // 1. Submit the task
    const submitResp = await fetch('/api/shorten/batch-async', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ urls })
    });

    const { task_id } = await submitResp.json();

    // 2. Poll for progress
    while (true) {
        const statusResp = await fetch(`/api/batch/${task_id}/status`);
        const status = await statusResp.json();

        updateProgressBar(status.progress);
        updateStatusText(`Processed ${status.success}, failed ${status.failed}`);

        if (status.status === 'completed') {
            showSuccessMessage(`All done! ${status.success} succeeded`);
            break;
        }

        if (status.status === 'failed') {
            showErrorMessage('Task failed');
            break;
        }

        // Poll once per second
        await new Promise(r => setTimeout(r, 1000));
    }
}

Batch Delete and Update

Batch Delete

"Later the client came back with another request: the sale is over, can we bulk-delete all those campaign links?"

@app.route('/api/links/batch-delete', methods=['POST'])
def batch_delete():
    """Delete short links in bulk."""
    data = request.get_json()
    short_codes = data.get('short_codes', [])

    if len(short_codes) > 10000:
        return {'error': 'At most 10,000 deletions per request'}, 400

    # Bulk delete with a parameterized IN clause
    placeholders = ','.join(['?'] * len(short_codes))
    cursor = db.execute(
        f"DELETE FROM urls WHERE short_code IN ({placeholders}) AND user_id = ?",
        (*short_codes, request.user['id'])
    )

    deleted_count = cursor.rowcount

    # Invalidate cache entries
    for code in short_codes:
        redis.delete(f"url:{code}")

    return jsonify({
        'requested': len(short_codes),
        'deleted': deleted_count
    })
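The dynamic IN clause deserves a standalone check: one `?` per code keeps the values parameterized, and the `user_id` filter means the deleted count can be lower than the requested count. A sketch with in-memory SQLite:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE urls (short_code TEXT, user_id INTEGER)")
conn.executemany("INSERT INTO urls VALUES (?, ?)",
                 [("a1", 1), ("b2", 1), ("c3", 2)])

codes = ["a1", "b2", "c3"]
# Build one placeholder per code; the values themselves never
# touch the SQL string, so there is no injection risk.
placeholders = ",".join(["?"] * len(codes))
cur = conn.execute(
    f"DELETE FROM urls WHERE short_code IN ({placeholders}) AND user_id = ?",
    (*codes, 1),
)
# "c3" belongs to user 2, so only 2 of the 3 requested rows go.
print(cur.rowcount)  # 2
```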

Batch Update

@app.route('/api/links/batch-update', methods=['POST'])
def batch_update():
    """Update short links in bulk."""
    updates = request.get_json().get('updates', [])

    if len(updates) > 5000:
        return {'error': 'At most 5,000 updates per request'}, 400

    updated_count = 0

    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("BEGIN TRANSACTION")

        for item in updates:
            result = cursor.execute(
                "UPDATE urls SET long_url = ? WHERE short_code = ? AND user_id = ?",
                (item['new_url'], item['short_code'], request.user['id'])
            )
            updated_count += result.rowcount

            # Invalidate the stale cache entry
            redis.delete(f"url:{item['short_code']}")

        conn.commit()

    return jsonify({
        'requested': len(updates),
        'updated': updated_count
    })

Cost Analysis

"After adopting Celery, I ran the numbers."

| Component | Monthly cost | Notes |
|---|---|---|
| Extra Redis instance | ¥40/month | Celery broker |
| Worker server | ¥60/month | 2 cores, 4 GB RAM |
| Total | ¥100/month | |

"An extra ¥100 a month bought the goodwill of enterprise customers. Was it worth it? The revenue tells the story: after the batch features launched, paid conversion among enterprise customers rose 30%."