导航菜单

API 管理

场景

天气 API 运行稳定,但用户需求更多。

八个月后:

  • 注册用户:4000 个
  • 付费用户:200 个
  • 月收入:¥30000

但运营团队发现:

用户反馈:
- "你们只有天气 API 吗?"
- "需要新闻 API"
- "需要股票查询 API"
- "需要 IP 归属地查询"

市场分析:
- 提供多种 API 的竞争更少
- 用户更愿意为一站式服务付费
- 单个用户会使用多个 API

决策

我决定扩展 API 种类:

第一阶段新增 API:
1. 新闻 API - 获取热点新闻
2. 股票 API - 查询股票行情
3. IP 查询 API - 查询 IP 归属地
4. 邮箱验证 API - 验证邮箱有效性

未来计划:
- 货币汇率 API
- 二维码生成 API
- 短链接 API
- 图片压缩 API

API 管理系统设计

数据库设计

-- API 定义表
CREATE TABLE api_definitions (
  id INT PRIMARY KEY AUTO_INCREMENT,
  name VARCHAR(100) NOT NULL,
  display_name VARCHAR(100) NOT NULL,
  description TEXT,

  -- API 配置
  endpoint VARCHAR(255) NOT NULL,
  method VARCHAR(10) DEFAULT 'GET',
  category VARCHAR(50),  -- weather, news, finance, tools 等

  -- 外部 API 配置
  external_api_url VARCHAR(255),
  external_api_method VARCHAR(10) DEFAULT 'GET',
  requires_auth BOOLEAN DEFAULT TRUE,

  -- 缓存配置
  cache_ttl INT DEFAULT 3600,  -- 缓存时间(秒)

  -- 限流配置
  rate_limit_factor INT DEFAULT 1,  -- 限流倍数(某些 API 可能消耗更多配额)

  -- 状态
  status VARCHAR(20) DEFAULT 'active',  -- active, deprecated, maintenance
  is_public BOOLEAN DEFAULT TRUE,  -- 是否对所有用户开放

  -- 元数据
  documentation TEXT,  -- API 文档(Markdown 格式)
  examples JSON,  -- 示例数据

  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,

  UNIQUE KEY unique_name (name),
  INDEX idx_category (category),
  INDEX idx_status (status)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

-- 用户 API 权限表
CREATE TABLE user_api_permissions (
  id INT PRIMARY KEY AUTO_INCREMENT,
  user_id INT NOT NULL,
  api_id INT NOT NULL,
  granted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  expires_at TIMESTAMP NULL,

  UNIQUE KEY unique_user_api (user_id, api_id),
  FOREIGN KEY (user_id) REFERENCES users(id),
  FOREIGN KEY (api_id) REFERENCES api_definitions(id),
  INDEX idx_expires (expires_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

统一 API 框架

API 注册机制

class APIRegistry:
    """API 注册中心"""

    def __init__(self):
        self.apis = {}

    def register(self, name, config):
        """注册 API"""
        self.apis[name] = {
            'name': name,
            'display_name': config.get('display_name', name),
            'description': config.get('description', ''),
            'handler': config['handler'],
            'method': config.get('method', 'GET'),
            'cache_ttl': config.get('cache_ttl', 3600),
            'rate_limit_factor': config.get('rate_limit_factor', 1),
            'requires_auth': config.get('requires_auth', True),
            'category': config.get('category', 'general')
        }

    def get_api(self, name):
        """获取 API 配置"""
        return self.apis.get(name)

    def list_apis(self, category=None):
        """列出所有 API"""
        if category:
            return {k: v for k, v in self.apis.items() if v['category'] == category}
        return self.apis

# 创建全局注册器
api_registry = APIRegistry()

# 注册天气 API
api_registry.register('weather', {
    'display_name': '天气查询',
    'description': '查询指定城市的天气信息',
    'handler': handle_weather_api,
    'category': 'weather',
    'cache_ttl': 3600
})

统一处理函数

@app.route('/api/<api_name>', methods=['GET', 'POST'])
@require_api_key
def handle_api_request(api_name):
    """统一 API 处理入口"""

    # 获取 API 配置
    api_config = api_registry.get_api(api_name)

    if not api_config:
        return jsonify({'error': 'API not found'}), 404

    # 检查用户是否有权限
    if not check_user_api_permission(request.user['id'], api_name):
        return jsonify({'error': 'You do not have access to this API'}), 403

    # 检查限流(考虑 API 的限流倍数)
    limits = get_user_plan_limits(request.user['id'])
    result = rate_limiter.is_allowed(
        request.user['id'],
        limits['rate_limit'] / api_config['rate_limit_factor'],
        limits['burst_capacity']
    )

    if not result['allowed']:
        return jsonify({
            'error': 'Rate limit exceeded',
            'retry_after': api_config['rate_limit_factor']
        }), 429

    # 调用 API 处理函数
    try:
        response = api_config['handler'](request)
        return response
    except Exception as e:
        return jsonify({'error': str(e)}), 500

新增 API 实现

新闻 API

def handle_news_api(request):
    """处理新闻 API 请求"""

    category = request.args.get('category', 'general')
    page = request.args.get('page', 1, type=int)

    # 检查缓存
    cache_key = f'news:{category}:{page}'
    cached = redis_client.get(cache_key)

    if cached:
        return jsonify(json.loads(cached))

    # 调用外部 API
    response = requests.get(
        f'https://news-api.kuaiyizhi.cn/v1/news',
        params={
            'category': category,
            'page': page,
            'apikey': EXTERNAL_NEWS_API_KEY
        },
        timeout=5
    )

    data = response.json()

    # 格式化返回
    result = {
        'category': category,
        'page': page,
        'articles': [
            {
                'title': article['title'],
                'description': article['description'],
                'url': article['url'],
                'published_at': article['publishedAt'],
                'source': article['source']['name']
            }
            for article in data['articles']
        ],
        'total': data['totalResults']
    }

    # 缓存结果(10 分钟)
    redis_client.setex(cache_key, 600, json.dumps(result))

    return jsonify(result)

# 注册新闻 API
api_registry.register('news', {
    'display_name': '新闻查询',
    'description': '获取各类新闻资讯',
    'handler': handle_news_api,
    'category': 'news',
    'cache_ttl': 600
})

股票 API

def handle_stock_api(request):
    """处理股票 API 请求"""

    symbol = request.args.get('symbol')

    if not symbol:
        return jsonify({'error': 'Symbol is required'}), 400

    # 检查缓存
    cache_key = f'stock:{symbol}'
    cached = redis_client.get(cache_key)

    if cached:
        return jsonify(json.loads(cached))

    # 调用外部 API
    response = requests.get(
        f'https://stock-api.kuaiyizhi.cn/v1/quote',
        params={
            'symbol': symbol,
            'apikey': EXTERNAL_STOCK_API_KEY
        },
        timeout=5
    )

    data = response.json()

    # 格式化返回
    result = {
        'symbol': data['symbol'],
        'name': data['companyName'],
        'price': {
            'current': data['latestPrice'],
            'change': data['change'],
            'change_percent': data['changePercent']
        },
        'market': {
            'open': data['open'],
            'high': data['high'],
            'low': data['low'],
            'previous_close': data['previousClose']
        },
        'volume': data['volume'],
        'timestamp': data['latestUpdate']
    }

    # 缓存结果(5 分钟)
    redis_client.setex(cache_key, 300, json.dumps(result))

    return jsonify(result)

# 注册股票 API
api_registry.register('stock', {
    'display_name': '股票查询',
    'description': '查询股票实时行情',
    'handler': handle_stock_api,
    'category': 'finance',
    'cache_ttl': 300,
    'rate_limit_factor': 2  # 股票 API 消耗 2 倍配额
})

IP 查询 API

def handle_ip_query_api(request):
    """处理 IP 查询 API 请求"""

    ip = request.args.get('ip') or request.remote_addr

    # 检查缓存
    cache_key = f'ip_query:{ip}'
    cached = redis_client.get(cache_key)

    if cached:
        return jsonify(json.loads(cached))

    # 调用外部 API
    response = requests.get(
        f'https://ip-api.kuaiyizhi.cn/json/{ip}',
        timeout=3
    )

    data = response.json()

    # 格式化返回
    result = {
        'ip': data['query'],
        'country': data['country'],
        'region': data['regionName'],
        'city': data['city'],
        'isp': data['isp'],
        'location': {
            'lat': data['lat'],
            'lon': data['lon']
        },
        'timezone': data['timezone']
    }

    # 缓存结果(24 小时,IP 数据变化很少)
    redis_client.setex(cache_key, 86400, json.dumps(result))

    return jsonify(result)

# 注册 IP 查询 API
api_registry.register('ip_query', {
    'display_name': 'IP 查询',
    'description': '查询 IP 地址的归属地信息',
    'handler': handle_ip_query_api,
    'category': 'tools',
    'cache_ttl': 86400
})

API 文档

自动生成文档

@app.route('/api/docs')
def api_docs():
    """API 文档页面"""

    apis = api_registry.list_apis()

    # 按类别分组
    categories = {}
    for name, config in apis.items():
        category = config['category']
        if category not in categories:
            categories[category] = []

        categories[category].append({
            'name': name,
            'display_name': config['display_name'],
            'description': config['description'],
            'method': config['method'],
            'cache_ttl': config['cache_ttl']
        })

    return render_template('api_docs.html', categories=categories)

API 列表 API

@app.route('/api/apis')
@require_api_key
def list_apis():
    """获取用户可用的 API 列表"""

    # 获取用户的套餐
    user_plan = get_user_plan_limits(request.user['id'])

    # 获取所有 API
    all_apis = api_registry.list_apis()

    # 过滤用户有权限的 API
    available_apis = []
    for name, config in all_apis.items():
        if check_user_api_permission(request.user['id'], name):
            available_apis.append({
                'name': name,
                'display_name': config['display_name'],
                'description': config['description'],
                'category': config['category'],
                'endpoint': f'/api/{name}',
                'method': config['method'],
                'cache_ttl': config['cache_ttl']
            })

    return jsonify({
        'plan': user_plan['plan'],
        'apis': available_apis
    })

本节小结

✅ 完成的工作:

  • 设计了 API 管理系统
  • 实现了统一 API 注册机制
  • 新增了 3 个 API(新闻、股票、IP 查询)
  • 提供了 API 文档和列表

✅ 业务价值:

  • 丰富了 API 种类
  • 满足了用户多样化需求
  • 提升了平台吸引力

⚠️ 下一步: 我需要考虑某个数据源挂了的情况

🎯 下一步: 某个数据源故障,如何隔离?

搜索