导航菜单

故障隔离

场景

新增 API 上线后,用户量增长。

九个月后:

  • API 数量:4 个
  • 日调用量:80 万次
  • 其中新闻 API 占:60%

问题发生

某天下午,我收到了告警:

【告警】新闻 API 响应异常

- 错误率:95%
- 响应时间:超时
- 时间:14:30-15:00

我赶紧检查,发现:

# 检查外部 API 状态
response = requests.get('https://news-api.kuaiyizhi.cn/v1/health')
# 连接超时

新闻 API 的外部数据源挂了!

问题影响

我查了一下影响范围:

影响分析:
- 新闻 API 调用失败:95% 的错误率
- 其他 API(天气、股票、IP):正常
- 用户体验:部分功能不可用
- 用户投诉:增加了 20%

但更严重的是:

连锁反应:
1. 新闻 API 失败
2. 用户重试新闻 API
3. 服务器资源被新闻 API 占用
4. 其他 API 也变慢了
5. 整体用户体验下降

根本原因

我查了一下代码:

def handle_news_api(request):
    # 调用外部 API(没有超时设置)
    response = requests.get(
        'https://news-api.kuaiyizhi.cn/v1/news'
    )
    # 如果外部 API 挂了,这个请求会一直等待
    # 直到超时(默认很长)
    return jsonify(response.json())

问题:

  1. 没有设置超时时间
  2. 没有错误处理
  3. 没有降级方案
  4. 一个 API 失败影响了其他 API

解决方案

1. 超时控制

def call_external_api(url, params=None, timeout=3):
    """调用外部 API(带超时控制)"""

    try:
        response = requests.get(
            url,
            params=params,
            timeout=timeout  # 设置超时
        )
        response.raise_for_status()
        return response.json()
    except requests.Timeout:
        raise Exception('External API timeout')
    except requests.RequestException as e:
        raise Exception(f'External API error: {e}')

2. 错误处理

def handle_news_api(request):
    """处理新闻 API 请求(带错误处理)"""

    category = request.args.get('category', 'general')

    try:
        # 调用外部 API
        data = call_external_api(
            'https://news-api.kuaiyizhi.cn/v1/news',
            params={'category': category},
            timeout=3
        )

        return jsonify({
            'success': True,
            'data': data
        })

    except Exception as e:
        # 记录错误
        logging.error(f'News API error: {e}')

        # 返回友好的错误信息
        return jsonify({
            'success': False,
            'error': 'News service temporarily unavailable',
            'message': 'Please try again later'
        }), 503  # Service Unavailable

3. 熔断机制

class CircuitBreaker:
    """熔断器"""

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = {}
        self.last_failure_time = {}
        self.state = {}  # closed, open, half-open

    def is_open(self, service_name):
        """检查熔断器是否打开"""

        current_time = time.time()

        # 检查状态
        state = self.state.get(service_name, 'closed')

        if state == 'open':
            # 检查是否可以尝试恢复
            if current_time - self.last_failure_time[service_name] > self.timeout:
                self.state[service_name] = 'half-open'
                return False
            return True

        return False

    def record_success(self, service_name):
        """记录成功"""
        self.failures[service_name] = 0
        self.state[service_name] = 'closed'

    def record_failure(self, service_name):
        """记录失败"""

        self.failures[service_name] = self.failures.get(service_name, 0) + 1
        self.last_failure_time[service_name] = time.time()

        # 检查是否需要打开熔断器
        if self.failures[service_name] >= self.failure_threshold:
            self.state[service_name] = 'open'
            logging.warning(f'Circuit breaker opened for {service_name}')

# 创建全局熔断器
circuit_breaker = CircuitBreaker()

def handle_news_api(request):
    """处理新闻 API 请求(带熔断)"""

    service_name = 'external_news_api'

    # 检查熔断器
    if circuit_breaker.is_open(service_name):
        return jsonify({
            'success': False,
            'error': 'News service is temporarily unavailable',
            'message': 'Service is down, please try again later'
        }), 503

    try:
        # 调用外部 API
        data = call_external_api(
            'https://news-api.kuaiyizhi.cn/v1/news',
            timeout=3
        )

        # 记录成功
        circuit_breaker.record_success(service_name)

        return jsonify({
            'success': True,
            'data': data
        })

    except Exception as e:
        # 记录失败
        circuit_breaker.record_failure(service_name)

        return jsonify({
            'success': False,
            'error': 'News service temporarily unavailable'
        }), 503

4. 降级方案

def handle_news_api(request):
    """处理新闻 API 请求(带降级)"""

    service_name = 'external_news_api'

    # 检查熔断器
    if circuit_breaker.is_open(service_name):
        # 降级:返回缓存的热点新闻
        cached_news = redis_client.get('news:hot:fallback')

        if cached_news:
            return jsonify({
                'success': True,
                'data': json.loads(cached_news),
                'source': 'cache',
                'message': 'Showing cached news due to service issues'
            })

        return jsonify({
            'success': False,
            'error': 'News service temporarily unavailable'
        }), 503

    try:
        # 正常调用
        data = call_external_api(
            'https://news-api.kuaiyizhi.cn/v1/news',
            timeout=3
        )

        circuit_breaker.record_success(service_name)

        # 更新降级缓存
        redis_client.setex('news:hot:fallback', 300, json.dumps(data))

        return jsonify({
            'success': True,
            'data': data
        })

    except Exception as e:
        circuit_breaker.record_failure(service_name)

        # 降级处理
        cached_news = redis_client.get('news:hot:fallback')

        if cached_news:
            return jsonify({
                'success': True,
                'data': json.loads(cached_news),
                'source': 'cache',
                'message': 'Showing cached news due to service issues'
            })

        return jsonify({
            'success': False,
            'error': 'News service temporarily unavailable'
        }), 503

隔离策略

资源隔离

from concurrent.futures import ThreadPoolExecutor

# 为每个外部 API 创建独立的线程池
api_executors = {
    'weather': ThreadPoolExecutor(max_workers=10),
    'news': ThreadPoolExecutor(max_workers=5),
    'stock': ThreadPoolExecutor(max_workers=5),
    'ip_query': ThreadPoolExecutor(max_workers=10)
}

def handle_api_with_isolation(api_name, handler):
    """使用隔离的线程池处理 API"""

    executor = api_executors.get(api_name)

    if not executor:
        # 没有配置隔离,使用默认处理
        return handler(request)

    try:
        # 在独立的线程池中执行
        future = executor.submit(handler, request)
        result = future.result(timeout=5)
        return result

    except TimeoutError:
        return jsonify({
            'success': False,
            'error': f'{api_name} API timeout'
        }), 504

    except Exception as e:
        return jsonify({
            'success': False,
            'error': f'{api_name} API error: {str(e)}'
        }), 500

监控和告警

API 健康检查

def check_api_health(api_name):
    """检查 API 健康状态"""

    health_url = f'https://{api_name}-api.kuaiyizhi.cn/health'

    try:
        response = requests.get(health_url, timeout=3)
        return {
            'api': api_name,
            'status': 'healthy' if response.status_code == 200 else 'unhealthy',
            'response_time': response.elapsed.total_seconds()
        }
    except Exception as e:
        return {
            'api': api_name,
            'status': 'down',
            'error': str(e)
        }

@app.route('/admin/api-health')
def api_health_check():
    """API 健康检查接口"""

    apis = ['weather', 'news', 'stock', 'ip_query']

    health_status = {}
    for api in apis:
        health_status[api] = check_api_health(api)

    return jsonify(health_status)

效果验证

优化前

新闻 API 故障时:
- 所有 API 响应都变慢
- 服务器资源耗尽
- 用户投诉增加 20%

优化后

新闻 API 故障时:
- 新闻 API 快速失败(3 秒超时)
- 熔断器打开,停止调用
- 返回缓存数据或友好错误
- 其他 API 不受影响
- 用户体验基本正常

本节小结

✅ 完成的工作:

  • 实现了超时控制
  • 实现了熔断机制
  • 实现了降级方案
  • 实现了资源隔离

✅ 效果:

  • 单个 API 故障不影响其他 API
  • 提升了系统整体可用性
  • 用户体验更加稳定

⚠️ 下一步: 不同数据的更新频率不同,我需要设计缓存策略

🎯 下一步: 不同数据更新频率不同,如何设计缓存策略?

搜索