导航菜单

用户认证

设计目标

我决定实现用户认证系统,让每个用户有独立的调用配额。

核心需求:

  1. 用户可以注册账号
  2. 系统分配唯一的 API Key
  3. 每次调用必须验证 API Key
  4. 限制每个用户的调用配额

这是一个完整的认证体系,我需要从零开始设计。

方案设计

API Key 的设计

API Key 是什么?

  • 一个唯一的字符串
  • 用来标识用户身份
  • 需要在每次调用时携带

API Key 的格式:

ak_live_xxxxxxxxxxxxxxx  (正式环境)
ak_test_xxxxxxxxxxxxxxx  (测试环境)

格式说明:

  • ak_:前缀,表示 API Key
  • live/test:环境标识
  • 后面:随机字符串

我特意加了环境标识,这样开发和生产环境可以分开管理。

数据模型设计

用户表 (users)

CREATE TABLE users (
  id INT PRIMARY KEY AUTO_INCREMENT,
  email VARCHAR(255) UNIQUE NOT NULL,
  password_hash VARCHAR(255) NOT NULL,
  api_key VARCHAR(64) UNIQUE NOT NULL,
  plan VARCHAR(50) DEFAULT 'free',  -- free, basic, pro
  daily_limit INT DEFAULT 1000,     -- 每日调用限额
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  INDEX idx_api_key (api_key)
);

调用记录表 (api_logs)

CREATE TABLE api_logs (
  id BIGINT PRIMARY KEY AUTO_INCREMENT,
  user_id INT NOT NULL,
  endpoint VARCHAR(255) NOT NULL,
  params TEXT,
  response_time INT,
  status VARCHAR(50),
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  INDEX idx_user_created (user_id, created_at),
  INDEX idx_created (created_at)
);

这两个表的设计,我考虑了很久。用户表要存储核心信息,日志表要记录每一次调用。

API Key 生成

生成安全的 API Key

import secrets
import string

def generate_api_key():
    """生成安全的 API Key"""
    # 生成 32 字节的随机数据
    random_bytes = secrets.token_bytes(32)

    # 转换为十六进制字符串
    api_key = 'ak_live_' + random_bytes.hex()

    return api_key

# 示例输出
# ak_live_a1b2c3d4e5f6... (64 字符)

为什么使用 secrets 模块?

  • 加密安全的随机数生成
  • 不可预测
  • 防止碰撞

我当时想,API Key 是用户的身份标识,必须在源头上保证安全。

用户注册流程

注册接口

@app.route('/api/auth/register', methods=['POST'])
def register():
    data = request.get_json()
    email = data.get('email')
    password = data.get('password')

    # 验证输入
    if not email or not password:
        return jsonify({'error': 'Email and password required'}), 400

    # 检查邮箱是否已注册
    if db.execute('SELECT id FROM users WHERE email = ?', (email,)):
        return jsonify({'error': 'Email already registered'}), 400

    # 生成 API Key
    api_key = generate_api_key()

    # 密码哈希
    password_hash = hashlib.sha256(password.encode()).hexdigest()

    # 保存到数据库
    user_id = db.execute(
        'INSERT INTO users (email, password_hash, api_key) VALUES (?, ?, ?)',
        (email, password_hash, api_key)
    )

    return jsonify({
        'message': 'Registration successful',
        'user_id': user_id,
        'api_key': api_key  ⚠️ 只在注册时返回一次
    }), 201

登录接口

@app.route('/api/auth/login', methods=['POST'])
def login():
    data = request.get_json()
    email = data.get('email')
    password = data.get('password')

    # 查询用户
    user = db.execute(
        'SELECT * FROM users WHERE email = ?',
        (email,)
    )

    if not user:
        return jsonify({'error': 'Invalid credentials'}), 401

    # 验证密码
    password_hash = hashlib.sha256(password.encode()).hexdigest()
    if user['password_hash'] != password_hash:
        return jsonify({'error': 'Invalid credentials'}), 401

    return jsonify({
        'message': 'Login successful',
        'api_key': user['api_key']
    })

我特意让 API Key 只在注册时返回一次,之后就再也看不到了。这样设计是为了安全,但也带来了一些麻烦——用户如果弄丢了 API Key,就只能重置。

API 调用认证

修改天气 API 接口

from functools import wraps

def require_api_key(f):
    """API Key 验证装饰器"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # 获取 API Key
        api_key = request.headers.get('X-API-Key')

        if not api_key:
            return jsonify({'error': 'API Key required'}), 401

        # 验证 API Key
        user = db.execute(
            'SELECT * FROM users WHERE api_key = ?',
            (api_key,)
        )

        if not user:
            return jsonify({'error': 'Invalid API Key'}), 401

        # 将用户信息添加到 request
        request.user = user

        return f(*args, **kwargs)
    return decorated_function

@app.route('/api/weather')
@require_api_key
def get_weather():
    city = request.args.get('city')

    # 检查今日配额
    today = datetime.now().date()
    usage = db.execute(
        '''SELECT COUNT(*) as count FROM api_logs
           WHERE user_id = ? AND DATE(created_at) = ?''',
        (request.user['id'], today)
    )['count']

    if usage >= request.user['daily_limit']:
        return jsonify({'error': 'Daily limit exceeded'}), 429

    # ... 正常的业务逻辑

    # 记录调用日志
    db.execute(
        '''INSERT INTO api_logs (user_id, endpoint, params)
           VALUES (?, ?, ?)''',
        (request.user['id'], '/api/weather', f'city={city}')
    )

    return jsonify(result)

用装饰器的方式,我可以把认证逻辑从业务代码中分离出来。这是我从之前写代码的经验中学到的——关注点分离。

客户端使用方式

正确的调用方式

import requests

# 设置 API Key
api_key = 'ak_live_a1b2c3d4e5f6...'

# 调用 API
response = requests.get(
    'https://kuaiyizhi.cn/api/weather?city=北京',
    headers={'X-API-Key': api_key}
)

print(response.json())

错误的调用方式

# ❌ 缺少 API Key
response = requests.get('https://kuaiyizhi.cn/api/weather?city=北京')
# 返回:{"error": "API Key required"}

# ❌ 无效的 API Key
response = requests.get(
    'https://kuaiyizhi.cn/api/weather?city=北京',
    headers={'X-API-Key': 'invalid_key'}
)
# 返回:{"error": "Invalid API Key"}

# ❌ 超出配额
# 返回:{"error": "Daily limit exceeded"}

安全考虑

API Key 的安全问题

问题 1:API Key 泄露

  • 如果 API Key 泄露,他人可以冒用
  • 配额被消耗
  • 产生费用

解决方案:

  1. 提醒用户妥善保管 API Key
  2. 提供重置 API Key 的功能
  3. 设置 IP 白名单(可选)
  4. 监控异常调用

密码存储

问题 2:密码不能明文存储

# ❌ 错误:明文存储
password_hash = password

# ✅ 正确:哈希存储
password_hash = hashlib.sha256(password.encode()).hexdigest()

更好的方案是使用 bcrypt

import bcrypt

# 哈希密码
password_hash = bcrypt.hashpw(password.encode(), bcrypt.gensalt())

# 验证密码
if bcrypt.checkpw(password.encode(), stored_hash):
    # 密码正确

我当时想,安全这件事,再怎么重视也不为过。

效果验证

上线后,我观察了一周的数据:

滥用情况

指标之前现在
异常 IP 调用每天 10+ 个0 个 ✅
外部 API 滥用频繁0 次 ✅
正常用户体验优秀 ✅

用户增长

指标数值
注册用户200 个
日活用户80 个
平均每用户调用100 次/天

看到这些数据,我知道这条路走对了。

新的问题

虽然解决了滥用问题,但带来了一些新挑战:

问题 1:数据存储

用户数据和调用日志越来越多:

  • 用户表:200 条
  • 日志表:每天 5 万条
  • 增长速度:每月 150 万条

应该用什么数据库?

问题 2:查询性能

日志表查询越来越慢:

# 统计用户的今日用量
db.execute(
    '''SELECT COUNT(*) FROM api_logs
       WHERE user_id = ? AND DATE(created_at) = ?'''
)

随着数据量增长,这个查询会越来越慢。

如何优化?

问题 3:数据备份

如果数据库挂了,所有用户数据都会丢失。

如何备份数据?

一个问题解决了,新的问题又来了。这就是做系统的常态吧。

本节小结

✅ 完成的工作:

  • 实现了用户注册和登录
  • 设计了 API Key 机制
  • 添加了调用配额限制
  • 解决了 API 滥用问题

✅ 效果:

  • 完全阻止了滥用行为
  • 提升了正常用户体验
  • 可以追踪每个用户的调用情况

⚠️ 新的问题:

  • 数据存储方案需要优化
  • 查询性能需要提升
  • 数据安全需要保障

下一节,我要解决数据库的问题。

搜索