导航菜单

图片上传流程

第一版上传:直接存磁盘

“光影”上线第一天,上传接口只有十几行代码:

@app.route('/api/upload', methods=['POST'])
def upload_image():
    file = request.files['image']
    filename = f"{uuid.uuid4().hex}.{file.filename.split('.')[-1]}"
    filepath = os.path.join(UPLOAD_FOLDER, filename)
    file.save(filepath)
    return jsonify({'url': f'/photos/{filename}', 'status': 'ok'})

简单粗暴,图片直接存到 /var/www/photos/。上线第一天我上传了 3 张照片,一切正常。

然后小李来了。

他带着 5 张索尼 A7R5 拍的夜景,每张 8.7 MB。我的 Flask 服务器接收这 5 张图花了 40 秒——因为 Nginx 默认的 client_max_body_size 只有 1MB,我改成了 50MB,但单线程处理大文件依然很慢。

更要命的是,10 个用户同时上传时,服务器的带宽和内存开始告急:

# 问题的数学
avg_upload_size_mb = 8.7
concurrent_uploads = 10
memory_needed = concurrent_uploads * avg_upload_size_mb  # = 87 MB

# Flask 默认把整个文件读入内存
# 10 个并发 = 87 MB 内存被占用
# 加上图片处理(PIL 解码需要 3 倍原始大小)
processing_memory = 87 * 3  # = 261 MB

# 我的服务器总共才 2 GB 内存
# 261 MB / 2048 MB = 12.7% 内存被上传占了
# 再加上应用本身、数据库、Redis...

这还没考虑带宽。我的 5Mbps 服务器上行带宽,同时服务上传和下载,直接打满。

第二版上传:流式写入 + 分片

我先优化了上传本身,不把整个文件读入内存:

from werkzeug.formparser import parse_form_data
import shutil

@app.route('/api/upload', methods=['POST'])
def upload_image_v2():
    """流式上传:文件直接写入磁盘,不经过内存"""
    file = request.files['image']
    
    # 校验文件类型
    allowed_types = {'image/jpeg', 'image/png', 'image/webp', 'image/avif'}
    if file.content_type not in allowed_types:
        return jsonify({'error': '不支持的文件类型'}), 400
    
    # 校验文件大小(最大 50MB)
    file.seek(0, 2)  # 移到文件末尾
    file_size = file.tell()
    file.seek(0)     # 回到开头
    max_size = 50 * 1024 * 1024
    if file_size > max_size:
        return jsonify({'error': '文件大小不能超过 50MB'}), 400
    
    # 生成文件名
    filename = f"{uuid.uuid4().hex}.{file.filename.split('.')[-1]}"
    filepath = os.path.join(UPLOAD_FOLDER, filename)
    
    # 流式写入磁盘(不经过内存)
    with open(filepath, 'wb') as f:
        shutil.copyfileobj(file.stream, f, 1024 * 1024)  # 1MB 缓冲
    
    return jsonify({
        'url': f'/photos/{filename}',
        'size': file_size,
        'status': 'ok'
    })

内存问题缓解了,但带宽问题还在。于是我又加了分片上传支持:

// 前端:大文件分片上传
class ChunkedUploader {
  private chunkSize = 5 * 1024 * 1024; // 5MB 一片

  async upload(file: File): Promise<UploadResult> {
    // 第一步:获取上传凭证
    const { upload_id, object_key } = await fetch('/api/upload/init', {
      method: 'POST',
      body: JSON.stringify({
        filename: file.name,
        size: file.size,
        content_type: file.type,
      }),
    }).then(r => r.json());

    // 第二步:分片上传
    const chunks = Math.ceil(file.size / this.chunkSize);
    const results = [];

    for (let i = 0; i < chunks; i++) {
      const start = i * this.chunkSize;
      const end = Math.min(start + this.chunkSize, file.size);
      const chunk = file.slice(start, end);

      const result = await fetch(`/api/upload/chunk`, {
        method: 'POST',
        body: JSON.stringify({
          upload_id,
          part_number: i + 1,
          chunk: await chunk.arrayBuffer(),
        }),
      });
      results.push(await result.json());

      // 更新进度
      this.onProgress?.((i + 1) / chunks * 100);
    }

    // 第三步:合并分片
    return fetch('/api/upload/complete', {
      method: 'POST',
      body: JSON.stringify({ upload_id, parts: results }),
    }).then(r => r.json());
  }

  onProgress?: (percent: number) => void;
}

分片解决了大文件上传的问题,但还有一个根本性的问题——所有流量都经过我的服务器。无论怎么优化,上传带宽和存储都是单机的天花板。

第三版上传:客户端直传 OSS

我研究了一圈,发现业界几乎都在用客户端直传对象存储的方案。

核心思路:服务器不碰文件,只负责签发上传凭证。

第一版:客户端 → 我的服务器 → 磁盘
第二版:客户端 → 我的服务器(流式)→ 磁盘
第三版:客户端 → OSS(服务器只签凭证)
import oss2
import time
import hashlib

# OSS 配置
OSS_ACCESS_KEY_ID = 'your_access_key'
OSS_ACCESS_KEY_SECRET = 'your_secret'
OSS_BUCKET_NAME = 'guangying-images'
OSS_ENDPOINT = 'oss-cn-beijing.aliyuncs.com'

@app.route('/api/upload/credential', methods=['POST'])
def get_upload_credential():
    """签发 OSS 上传凭证——服务器只做这一件事"""
    data = request.json
    
    # 校验用户权限
    user_id = get_current_user_id()
    if not user_id:
        return jsonify({'error': '请先登录'}), 401
    
    # 校验文件信息
    filename = data.get('filename', '')
    file_size = data.get('size', 0)
    content_type = data.get('content_type', '')
    
    allowed_types = {'image/jpeg', 'image/png', 'image/webp', 'image/avif', 'image/gif'}
    if content_type not in allowed_types:
        return jsonify({'error': '不支持的文件类型'}), 400
    
    if file_size > 50 * 1024 * 1024:
        return jsonify({'error': '文件不能超过 50MB'}), 400
    
    # 生成存储路径(按日期 + hash 分片)
    date_prefix = time.strftime('%Y/%m/%d')
    file_hash = hashlib.md5(f"{user_id}_{time.time()}_{filename}".encode()).hexdigest()[:8]
    ext = filename.rsplit('.', 1)[-1].lower()
    object_key = f'originals/{date_prefix}/{file_hash}_{uuid.uuid4().hex[:12]}.{ext}'
    
    # 生成带签名的上传 URL(STS 临时凭证)
    auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
    bucket = oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET_NAME)
    
    # 生成 PUT 上传 URL,15 分钟有效
    upload_url = bucket.sign_url(
        'PUT', 
        object_key, 
        15 * 60,  # 15 分钟过期
        headers={'Content-Type': content_type}
    )
    
    return jsonify({
        'upload_url': upload_url,
        'object_key': object_key,
        'expires_in': 900,
        # 上传成功后的回调
        'callback': {
            'url': f'https://api.guangying.com/api/upload/callback',
            'body': {
                'object_key': object_key,
                'user_id': user_id,
                'size': file_size,
            }
        }
    })


@app.route('/api/upload/callback', methods=['POST'])
def upload_callback():
    """OSS 上传成功后的回调——触发审核和处理流程"""
    data = request.json
    object_key = data['object_key']
    user_id = data['user_id']
    
    # 记录到数据库
    photo_id = db.insert('photos', {
        'user_id': user_id,
        'object_key': object_key,
        'status': 'uploaded',
        'created_at': datetime.now(),
    })
    
    # 发送到审核队列(同步审核,异步处理)
    message_queue.publish('audit', {
        'photo_id': photo_id,
        'object_key': object_key,
    })
    
    return jsonify({'photo_id': photo_id, 'status': 'uploaded'})

前端直传 OSS:

// 前端:直传 OSS
class OSSDirectUploader {
  async upload(file: File, onProgress?: (pct: number) => void): Promise<string> {
    // 1. 从服务器获取上传凭证
    const credential = await fetch('/api/upload/credential', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        filename: file.name,
        size: file.size,
        content_type: file.type,
      }),
    }).then(r => r.json());

    if (credential.error) {
      throw new Error(credential.error);
    }

    // 2. 直传 OSS
    const xhr = new XMLHttpRequest();
    
    await new Promise<void>((resolve, reject) => {
      xhr.upload.onprogress = (e) => {
        if (e.lengthComputable && onProgress) {
          onProgress(Math.round((e.loaded / e.total) * 100));
        }
      };
      xhr.onload = () => {
        if (xhr.status === 200) resolve();
        else reject(new Error(`上传失败: ${xhr.status}`));
      };
      xhr.onerror = () => reject(new Error('网络错误'));
      
      xhr.open('PUT', credential.upload_url);
      xhr.setRequestHeader('Content-Type', file.type);
      xhr.send(file);
    });

    // 3. 通知服务器上传完成
    const result = await fetch('/api/upload/callback', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        object_key: credential.object_key,
        user_id: credential.callback.body.user_id,
        size: file.size,
      }),
    }).then(r => r.json());

    return result.photo_id;
  }
}

对比三版方案

指标第一版(磁盘中转)第二版(流式分片)第三版(直传 OSS)
服务器带宽消耗上传 + 下载全占上传 + 下载全占几乎为零(只签凭证)
内存占用文件大小 × 并发数极小极小
支持最大文件受内存限制受磁盘限制5TB(OSS 限制)
断点续传✅(OSS 分片)
可扩展性单机天花板单机天花板无限(OSS 弹性)
上传速度受服务器带宽限制受服务器带宽限制OSS 多线 BGP,极速
运维复杂度

直传 OSS 最大的好处:上传流量完全不走我的服务器。OSS 的带宽是海量的,用户上传 100MB 的 RAW 文件也能在几秒内完成。

安全考量

客户端直传 OSS 后,我需要确保几个安全问题:

# 安全策略
UPLOAD_SECURITY = {
    # 1. 上传凭证有时效(15 分钟)
    'credential_ttl': 900,
    
    # 2. 限制上传路径(用户不能覆盖其他文件)
    'path_pattern': 'originals/{date}/{hash}_{random}.{ext}',
    
    # 3. 限制文件大小(凭证生成时校验)
    'max_file_size': 50 * 1024 * 1024,
    
    # 4. 限制 Content-Type(凭证中指定)
    'allowed_types': ['image/jpeg', 'image/png', 'image/webp', 'image/avif'],
    
    # 5. 上传频率限制(每用户每分钟最多 10 次)
    'rate_limit': '10/minute',
    
    # 6. OSS Bucket Policy(只允许 PUT,不允许 LIST/DELETE)
    'bucket_policy': {
        'effect': 'Allow',
        'action': ['oss:PutObject'],
        'resource': ['acs:oss:*:*:guangying-images/originals/*'],
    }
}

# 频率限制实现
from functools import wraps
import redis

redis_client = redis.Redis()

def rate_limit(limit='10/minute'):
    """上传频率限制装饰器"""
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            user_id = get_current_user_id()
            key = f'upload_limit:{user_id}'
            
            current = redis_client.get(key)
            if current and int(current) >= int(limit.split('/')[0]):
                return jsonify({'error': '上传过于频繁,请稍后再试'}), 429
            
            pipe = redis_client.pipeline()
            pipe.incr(key)
            pipe.expire(key, 60)
            pipe.execute()
            
            return f(*args, **kwargs)
        return wrapper
    return decorator

还有一个关键问题:用户可能伪造上传回调。解决方案是在 OSS 侧配置回调签名验证:

# OSS 回调签名验证
import hmac
import base64

def verify_oss_callback(request):
    """验证 OSS 回调请求的签名,防止伪造"""
    authorization = request.headers.get('Authorization')
    pub_key_url = request.headers.get('x-oss-pub-key-url')
    
    if not authorization or not pub_key_url:
        return False
    
    # 获取 OSS 公钥
    pub_key = fetch_public_key(pub_key_url)
    
    # 验证签名
    message = request.url + '\n' + request.get_data(as_text=True)
    try:
        from cryptography.hazmat.primitives import serialization
        from cryptography.hazmat.primitives.asymmetric import padding
        
        public_key = serialization.load_pem_public_key(pub_key.encode())
        public_key.verify(
            base64.b64decode(authorization),
            message.encode(),
            padding.PKCS1v15(),
        )
        return True
    except Exception:
        return False

本节小结

我学到了什么

  • 图片上传不应经过应用服务器——客户端直传 OSS 是业界标准
  • 服务器只负责签发临时凭证,不碰文件本身
  • 上传安全需要从凭证时效、路径限制、频率控制、回调签名四个维度保障

⚠️ 踩过的坑

  • Nginx 默认 client_max_body_size 只有 1MB
  • PIL 解码图片需要 3 倍于原始大小的内存
  • 用户可能伪造上传回调,必须验证 OSS 签名

🎯 下一步:文件上传到 OSS 后,用什么目录结构组织?文件名怎么设计?

我的思考

思考 1

如果 OSS 的签名上传 URL 泄露了(比如被抓包),攻击者能在过期前随意上传文件吗?如何进一步降低这个风险?

参考答案

签名 URL 泄露确实是个风险。虽然 URL 有 15 分钟有效期,但攻击者在这段时间内可以反复上传。

多层防护方案

# 第一层:缩短有效期 + 绑定内容
def generate_strict_credential(user_id, file_info):
    # 有效期缩短到 5 分钟
    upload_url = bucket.sign_url('PUT', object_key, 5 * 60,
        headers={
            'Content-Type': file_info['content_type'],
            'Content-Length': str(file_info['size']),  # 固定大小
        }
    )
    return upload_url

# 第二层:服务端记录凭证状态
def track_credential(user_id, object_key):
    key = f'upload_cred:{user_id}:{object_key}'
    redis_client.set(key, 'pending', ex=300)  # 5 分钟

def verify_callback(object_key, user_id):
    key = f'upload_cred:{user_id}:{object_key}'
    if not redis_client.exists(key):
        return False  # 凭证不存在或已过期
    redis_client.delete(key)  # 一次性使用
    return True

# 第三层:上传后验证文件内容
def validate_uploaded_file(object_key):
    """验证 OSS 上的文件确实是图片"""
    headers = oss_client.head_object(object_key)
    
    # 检查 Content-Type
    if headers.headers.get('Content-Type', '') not in ALLOWED_TYPES:
        oss_client.delete_object(object_key)
        return False
    
    # 检查文件头(Magic Bytes)
    first_bytes = oss_client.get_object(object_key, byte_range='0-7')
    if not is_valid_image_header(first_bytes):
        oss_client.delete_object(object_key)
        return False
    
    return True

核心原则:不要信任客户端传来的任何信息,一切都要在服务端验证。

思考 2

为什么不直接给客户端 OSS 的 AccessKey,而是用临时签名 URL?两者有什么区别?

参考答案

直接给 AccessKey = 把仓库钥匙交给别人

风险 1:客户端代码可以被反编译,AccessKey 会泄露
风险 2:拿到 AccessKey 后可以操作整个 Bucket(读取、删除、覆盖)
风险 3:无法限制操作的文件路径和大小
风险 4:无法设置操作时效

签名 URL = 给一把只能开一个柜子的临时钥匙

优势 1:URL 有时效(5~15 分钟),过期自动失效
优势 2:只能操作指定的 object_key(路径固定)
优势 3:可以限制 HTTP Method(只允许 PUT,不允许 GET/DELETE)
优势 4:AccessKey 不会暴露给客户端(签名在服务端生成)

更安全的方式是使用 STS(Security Token Service)

# STS 临时凭证——比签名 URL 更灵活
from aliyunsdkcore.client import AcsClient
from aliyunsdksts.request.v20150401 import AssumeRoleRequest

def generate_sts_token(user_id):
    """生成 STS 临时凭证"""
    # 定义策略:只允许上传到指定路径
    policy = {
        "Version": "1",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["oss:PutObject"],
            "Resource": [
                f"acs:oss:*:*:guangying-images/originals/{user_id}/*"
            ],
            "Condition": {
                "NumericLessThanEquals": {
                    "oss:ContentLength": 52428800  # 50MB
                }
            }
        }]
    }
    
    # 生成临时凭证(15 分钟有效)
    request = AssumeRoleRequest.AssumeRoleRequest()
    request.set_RoleArn('acs:ram::xxx:role/oss-upload')
    request.set_RoleSessionName(f'upload-{user_id}')
    request.set_Policy(json.dumps(policy))
    request.set_DurationSeconds(900)
    
    response = client.do_action_with_exception(request)
    return json.loads(response)

STS 的优势是客户端拿到的凭证只能做允许的操作,连路径都被限定在 originals/{user_id}/ 下。

搜索