图片上传流程
第一版上传:直接存磁盘
“光影”上线第一天,上传接口只有十几行代码:
@app.route('/api/upload', methods=['POST'])
def upload_image():
file = request.files['image']
filename = f"{uuid.uuid4().hex}.{file.filename.split('.')[-1]}"
filepath = os.path.join(UPLOAD_FOLDER, filename)
file.save(filepath)
return jsonify({'url': f'/photos/{filename}', 'status': 'ok'})简单粗暴,图片直接存到 /var/www/photos/。上线第一天我上传了 3 张照片,一切正常。
然后小李来了。
他带着 5 张索尼 A7R5 拍的夜景,每张 8.7 MB。我的 Flask 服务器接收这 5 张图花了 40 秒——因为 Nginx 默认的 client_max_body_size 只有 1MB,我改成了 50MB,但单线程处理大文件依然很慢。
更要命的是,10 个用户同时上传时,服务器的带宽和内存开始告急:
# 问题的数学
avg_upload_size_mb = 8.7
concurrent_uploads = 10
memory_needed = concurrent_uploads * avg_upload_size_mb # = 87 MB
# Flask 默认把整个文件读入内存
# 10 个并发 = 87 MB 内存被占用
# 加上图片处理(PIL 解码需要 3 倍原始大小)
processing_memory = 87 * 3 # = 261 MB
# 我的服务器总共才 2 GB 内存
# 261 MB / 2048 MB = 12.7% 内存被上传占了
# 再加上应用本身、数据库、Redis...这还没考虑带宽。我的 5Mbps 服务器上行带宽,同时服务上传和下载,直接打满。
第二版上传:流式写入 + 分片
我先优化了上传本身,不把整个文件读入内存:
from werkzeug.formparser import parse_form_data
import shutil
@app.route('/api/upload', methods=['POST'])
def upload_image_v2():
"""流式上传:文件直接写入磁盘,不经过内存"""
file = request.files['image']
# 校验文件类型
allowed_types = {'image/jpeg', 'image/png', 'image/webp', 'image/avif'}
if file.content_type not in allowed_types:
return jsonify({'error': '不支持的文件类型'}), 400
# 校验文件大小(最大 50MB)
file.seek(0, 2) # 移到文件末尾
file_size = file.tell()
file.seek(0) # 回到开头
max_size = 50 * 1024 * 1024
if file_size > max_size:
return jsonify({'error': '文件大小不能超过 50MB'}), 400
# 生成文件名
filename = f"{uuid.uuid4().hex}.{file.filename.split('.')[-1]}"
filepath = os.path.join(UPLOAD_FOLDER, filename)
# 流式写入磁盘(不经过内存)
with open(filepath, 'wb') as f:
shutil.copyfileobj(file.stream, f, 1024 * 1024) # 1MB 缓冲
return jsonify({
'url': f'/photos/{filename}',
'size': file_size,
'status': 'ok'
})内存问题缓解了,但带宽问题还在。于是我又加了分片上传支持:
// 前端:大文件分片上传
class ChunkedUploader {
private chunkSize = 5 * 1024 * 1024; // 5MB 一片
async upload(file: File): Promise<UploadResult> {
// 第一步:获取上传凭证
const { upload_id, object_key } = await fetch('/api/upload/init', {
method: 'POST',
body: JSON.stringify({
filename: file.name,
size: file.size,
content_type: file.type,
}),
}).then(r => r.json());
// 第二步:分片上传
const chunks = Math.ceil(file.size / this.chunkSize);
const results = [];
for (let i = 0; i < chunks; i++) {
const start = i * this.chunkSize;
const end = Math.min(start + this.chunkSize, file.size);
const chunk = file.slice(start, end);
const result = await fetch(`/api/upload/chunk`, {
method: 'POST',
body: JSON.stringify({
upload_id,
part_number: i + 1,
chunk: await chunk.arrayBuffer(),
}),
});
results.push(await result.json());
// 更新进度
this.onProgress?.((i + 1) / chunks * 100);
}
// 第三步:合并分片
return fetch('/api/upload/complete', {
method: 'POST',
body: JSON.stringify({ upload_id, parts: results }),
}).then(r => r.json());
}
onProgress?: (percent: number) => void;
}分片解决了大文件上传的问题,但还有一个根本性的问题——所有流量都经过我的服务器。无论怎么优化,上传带宽和存储都是单机的天花板。
第三版上传:客户端直传 OSS
我研究了一圈,发现业界几乎都在用客户端直传对象存储的方案。
核心思路:服务器不碰文件,只负责签发上传凭证。
第一版:客户端 → 我的服务器 → 磁盘
第二版:客户端 → 我的服务器(流式)→ 磁盘
第三版:客户端 → OSS(服务器只签凭证)import oss2
import time
import hashlib
# OSS 配置
OSS_ACCESS_KEY_ID = 'your_access_key'
OSS_ACCESS_KEY_SECRET = 'your_secret'
OSS_BUCKET_NAME = 'guangying-images'
OSS_ENDPOINT = 'oss-cn-beijing.aliyuncs.com'
@app.route('/api/upload/credential', methods=['POST'])
def get_upload_credential():
"""签发 OSS 上传凭证——服务器只做这一件事"""
data = request.json
# 校验用户权限
user_id = get_current_user_id()
if not user_id:
return jsonify({'error': '请先登录'}), 401
# 校验文件信息
filename = data.get('filename', '')
file_size = data.get('size', 0)
content_type = data.get('content_type', '')
allowed_types = {'image/jpeg', 'image/png', 'image/webp', 'image/avif', 'image/gif'}
if content_type not in allowed_types:
return jsonify({'error': '不支持的文件类型'}), 400
if file_size > 50 * 1024 * 1024:
return jsonify({'error': '文件不能超过 50MB'}), 400
# 生成存储路径(按日期 + hash 分片)
date_prefix = time.strftime('%Y/%m/%d')
file_hash = hashlib.md5(f"{user_id}_{time.time()}_{filename}".encode()).hexdigest()[:8]
ext = filename.rsplit('.', 1)[-1].lower()
object_key = f'originals/{date_prefix}/{file_hash}_{uuid.uuid4().hex[:12]}.{ext}'
# 生成带签名的上传 URL(STS 临时凭证)
auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
bucket = oss2.Bucket(auth, OSS_ENDPOINT, OSS_BUCKET_NAME)
# 生成 PUT 上传 URL,15 分钟有效
upload_url = bucket.sign_url(
'PUT',
object_key,
15 * 60, # 15 分钟过期
headers={'Content-Type': content_type}
)
return jsonify({
'upload_url': upload_url,
'object_key': object_key,
'expires_in': 900,
# 上传成功后的回调
'callback': {
'url': f'https://api.guangying.com/api/upload/callback',
'body': {
'object_key': object_key,
'user_id': user_id,
'size': file_size,
}
}
})
@app.route('/api/upload/callback', methods=['POST'])
def upload_callback():
"""OSS 上传成功后的回调——触发审核和处理流程"""
data = request.json
object_key = data['object_key']
user_id = data['user_id']
# 记录到数据库
photo_id = db.insert('photos', {
'user_id': user_id,
'object_key': object_key,
'status': 'uploaded',
'created_at': datetime.now(),
})
# 发送到审核队列(同步审核,异步处理)
message_queue.publish('audit', {
'photo_id': photo_id,
'object_key': object_key,
})
return jsonify({'photo_id': photo_id, 'status': 'uploaded'})前端直传 OSS:
// 前端:直传 OSS
class OSSDirectUploader {
async upload(file: File, onProgress?: (pct: number) => void): Promise<string> {
// 1. 从服务器获取上传凭证
const credential = await fetch('/api/upload/credential', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
filename: file.name,
size: file.size,
content_type: file.type,
}),
}).then(r => r.json());
if (credential.error) {
throw new Error(credential.error);
}
// 2. 直传 OSS
const xhr = new XMLHttpRequest();
await new Promise<void>((resolve, reject) => {
xhr.upload.onprogress = (e) => {
if (e.lengthComputable && onProgress) {
onProgress(Math.round((e.loaded / e.total) * 100));
}
};
xhr.onload = () => {
if (xhr.status === 200) resolve();
else reject(new Error(`上传失败: ${xhr.status}`));
};
xhr.onerror = () => reject(new Error('网络错误'));
xhr.open('PUT', credential.upload_url);
xhr.setRequestHeader('Content-Type', file.type);
xhr.send(file);
});
// 3. 通知服务器上传完成
const result = await fetch('/api/upload/callback', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
object_key: credential.object_key,
user_id: credential.callback.body.user_id,
size: file.size,
}),
}).then(r => r.json());
return result.photo_id;
}
}对比三版方案
| 指标 | 第一版(磁盘中转) | 第二版(流式分片) | 第三版(直传 OSS) |
|---|---|---|---|
| 服务器带宽消耗 | 上传 + 下载全占 | 上传 + 下载全占 | 几乎为零(只签凭证) |
| 内存占用 | 文件大小 × 并发数 | 极小 | 极小 |
| 支持最大文件 | 受内存限制 | 受磁盘限制 | 5TB(OSS 限制) |
| 断点续传 | ❌ | ✅ | ✅(OSS 分片) |
| 可扩展性 | 单机天花板 | 单机天花板 | 无限(OSS 弹性) |
| 上传速度 | 受服务器带宽限制 | 受服务器带宽限制 | OSS 多线 BGP,极速 |
| 运维复杂度 | 低 | 中 | 中 |
直传 OSS 最大的好处:上传流量完全不走我的服务器。OSS 的带宽是海量的,用户上传 100MB 的 RAW 文件也能在几秒内完成。
安全考量
客户端直传 OSS 后,我需要确保几个安全问题:
# 安全策略
UPLOAD_SECURITY = {
# 1. 上传凭证有时效(15 分钟)
'credential_ttl': 900,
# 2. 限制上传路径(用户不能覆盖其他文件)
'path_pattern': 'originals/{date}/{hash}_{random}.{ext}',
# 3. 限制文件大小(凭证生成时校验)
'max_file_size': 50 * 1024 * 1024,
# 4. 限制 Content-Type(凭证中指定)
'allowed_types': ['image/jpeg', 'image/png', 'image/webp', 'image/avif'],
# 5. 上传频率限制(每用户每分钟最多 10 次)
'rate_limit': '10/minute',
# 6. OSS Bucket Policy(只允许 PUT,不允许 LIST/DELETE)
'bucket_policy': {
'effect': 'Allow',
'action': ['oss:PutObject'],
'resource': ['acs:oss:*:*:guangying-images/originals/*'],
}
}
# 频率限制实现
from functools import wraps
import redis
redis_client = redis.Redis()
def rate_limit(limit='10/minute'):
"""上传频率限制装饰器"""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
user_id = get_current_user_id()
key = f'upload_limit:{user_id}'
current = redis_client.get(key)
if current and int(current) >= int(limit.split('/')[0]):
return jsonify({'error': '上传过于频繁,请稍后再试'}), 429
pipe = redis_client.pipeline()
pipe.incr(key)
pipe.expire(key, 60)
pipe.execute()
return f(*args, **kwargs)
return wrapper
return decorator还有一个关键问题:用户可能伪造上传回调。解决方案是在 OSS 侧配置回调签名验证:
# OSS 回调签名验证
import hmac
import base64
def verify_oss_callback(request):
"""验证 OSS 回调请求的签名,防止伪造"""
authorization = request.headers.get('Authorization')
pub_key_url = request.headers.get('x-oss-pub-key-url')
if not authorization or not pub_key_url:
return False
# 获取 OSS 公钥
pub_key = fetch_public_key(pub_key_url)
# 验证签名
message = request.url + '\n' + request.get_data(as_text=True)
try:
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
public_key = serialization.load_pem_public_key(pub_key.encode())
public_key.verify(
base64.b64decode(authorization),
message.encode(),
padding.PKCS1v15(),
)
return True
except Exception:
return False本节小结
✅ 我学到了什么:
- 图片上传不应经过应用服务器——客户端直传 OSS 是业界标准
- 服务器只负责签发临时凭证,不碰文件本身
- 上传安全需要从凭证时效、路径限制、频率控制、回调签名四个维度保障
⚠️ 踩过的坑:
- Nginx 默认
client_max_body_size只有 1MB - PIL 解码图片需要 3 倍于原始大小的内存
- 用户可能伪造上传回调,必须验证 OSS 签名
🎯 下一步:文件上传到 OSS 后,用什么目录结构组织?文件名怎么设计?
我的思考
思考 1
如果 OSS 的签名上传 URL 泄露了(比如被抓包),攻击者能在过期前随意上传文件吗?如何进一步降低这个风险?
签名 URL 泄露确实是个风险。虽然 URL 有 15 分钟有效期,但攻击者在这段时间内可以反复上传。
多层防护方案:
# 第一层:缩短有效期 + 绑定内容
def generate_strict_credential(user_id, file_info):
# 有效期缩短到 5 分钟
upload_url = bucket.sign_url('PUT', object_key, 5 * 60,
headers={
'Content-Type': file_info['content_type'],
'Content-Length': str(file_info['size']), # 固定大小
}
)
return upload_url
# 第二层:服务端记录凭证状态
def track_credential(user_id, object_key):
key = f'upload_cred:{user_id}:{object_key}'
redis_client.set(key, 'pending', ex=300) # 5 分钟
def verify_callback(object_key, user_id):
key = f'upload_cred:{user_id}:{object_key}'
if not redis_client.exists(key):
return False # 凭证不存在或已过期
redis_client.delete(key) # 一次性使用
return True
# 第三层:上传后验证文件内容
def validate_uploaded_file(object_key):
"""验证 OSS 上的文件确实是图片"""
headers = oss_client.head_object(object_key)
# 检查 Content-Type
if headers.headers.get('Content-Type', '') not in ALLOWED_TYPES:
oss_client.delete_object(object_key)
return False
# 检查文件头(Magic Bytes)
first_bytes = oss_client.get_object(object_key, byte_range='0-7')
if not is_valid_image_header(first_bytes):
oss_client.delete_object(object_key)
return False
return True核心原则:不要信任客户端传来的任何信息,一切都要在服务端验证。
思考 2
为什么不直接给客户端 OSS 的 AccessKey,而是用临时签名 URL?两者有什么区别?
直接给 AccessKey = 把仓库钥匙交给别人:
风险 1:客户端代码可以被反编译,AccessKey 会泄露
风险 2:拿到 AccessKey 后可以操作整个 Bucket(读取、删除、覆盖)
风险 3:无法限制操作的文件路径和大小
风险 4:无法设置操作时效签名 URL = 给一把只能开一个柜子的临时钥匙:
优势 1:URL 有时效(5~15 分钟),过期自动失效
优势 2:只能操作指定的 object_key(路径固定)
优势 3:可以限制 HTTP Method(只允许 PUT,不允许 GET/DELETE)
优势 4:AccessKey 不会暴露给客户端(签名在服务端生成)更安全的方式是使用 STS(Security Token Service):
# STS 临时凭证——比签名 URL 更灵活
from aliyunsdkcore.client import AcsClient
from aliyunsdksts.request.v20150401 import AssumeRoleRequest
def generate_sts_token(user_id):
"""生成 STS 临时凭证"""
# 定义策略:只允许上传到指定路径
policy = {
"Version": "1",
"Statement": [{
"Effect": "Allow",
"Action": ["oss:PutObject"],
"Resource": [
f"acs:oss:*:*:guangying-images/originals/{user_id}/*"
],
"Condition": {
"NumericLessThanEquals": {
"oss:ContentLength": 52428800 # 50MB
}
}
}]
}
# 生成临时凭证(15 分钟有效)
request = AssumeRoleRequest.AssumeRoleRequest()
request.set_RoleArn('acs:ram::xxx:role/oss-upload')
request.set_RoleSessionName(f'upload-{user_id}')
request.set_Policy(json.dumps(policy))
request.set_DurationSeconds(900)
response = client.do_action_with_exception(request)
return json.loads(response)STS 的优势是客户端拿到的凭证只能做允许的操作,连路径都被限定在 originals/{user_id}/ 下。
