导航菜单

防止雪崩

场景

某个繁忙的周五下午,发生了系统雪崩。

事件经过:
14:30 - 新闻 API 外部数据源响应慢
14:35 - 新闻 API 占用大量连接
14:40 - 数据库连接池耗尽
14:45 - 所有 API 开始超时
14:50 - 整个系统不可用

影响:
- 所有用户都无法访问
- 持续时间:30 分钟
- 用户投诉激增

根本原因

雪崩效应:

某个服务变慢

占用资源(连接、线程)

其他服务无资源可用

整体系统崩溃

防护措施

1. 熔断器(Circuit Breaker)

from circuitbreaker import circuit

@circuit(failure_threshold=5, recovery_timeout=60)
def call_external_api(url):
    """调用外部 API(带熔断)"""
    response = requests.get(url, timeout=3)
    response.raise_for_status()
    return response.json()

# 使用时
try:
    data = call_external_api(url)
except circuit.CircuitBreakerError:
    # 熔断器打开,使用降级方案
    return get_cached_data()

2. 限流(Rate Limiting)

# 系统级限流(保护整体)
system_limiter = RateLimiter(rate=1000, capacity=10000)

@app.before_request
def check_system_limit():
    """请求前检查系统限流"""
    if not system_limiter.is_allowed():
        return jsonify({'error': 'System overloaded'}), 503

3. 超时控制

# 设置严格的超时
@app.route('/api/news')
def handle_news():
    try:
        data = call_with_timeout(
            fetch_news,
            timeout=2  # 最多 2 秒
        )
        return jsonify(data)
    except TimeoutError:
        return jsonify({'error': 'Request timeout'}), 504

4. 资源隔离

# 为每个服务分配独立的线程池
executor = ThreadPoolExecutor(max_workers=10)

def handle_in_isolation(func, *args):
    """在独立线程中执行"""
    future = executor.submit(func, *args)
    return future.result(timeout=3)

5. 降级策略

def handle_news_with_fallback():
    """带降级的新闻 API"""

    # 策略 1: 返回缓存
    cached = get_news_from_cache()
    if cached:
        return jsonify({'data': cached, 'source': 'cache'})

    # 策略 2: 返回推荐内容
    recommended = get_recommended_news()
    if recommended:
        return jsonify({'data': recommended, 'source': 'recommended'})

    # 策略 3: 返回友好错误
    return jsonify({
        'error': 'News service temporarily unavailable',
        'message': 'Please try again later'
    }), 503

监控和告警

系统健康监控

def check_system_health():
    """检查系统健康状态"""

    health = {
        'overall': 'healthy',
        'services': {}
    }

    # 检查各个服务
    services = ['news', 'weather', 'stock', 'database', 'redis']

    for service in services:
        try:
            # 尝试调用服务
            response = ping_service(service, timeout=1)
            health['services'][service] = {
                'status': 'healthy' if response.ok else 'degraded',
                'response_time': response.elapsed.total_seconds()
            }
        except Exception as e:
            health['services'][service] = {
                'status': 'down',
                'error': str(e)
            }
            health['overall'] = 'degraded'

    return health

@app.route('/health')
def health_check():
    return jsonify(check_system_health())

应急预案

自动降级

def emergency_degrade():
    """应急降级"""

    # 1. 关闭非核心功能
    disable_feature('news')
    disable_feature('stock')

    # 2. 降低缓存时间
    reduce_cache_ttl(all_apis=0.5)

    # 3. 增加限流
    increase_rate_limit(factor=0.5)

    # 4. 启用维护模式
    enable_maintenance_mode()

    send_alert('Emergency degradation activated')

效果验证

优化前

雪崩事件:
- 触发条件:单个 API 故障
- 影响范围:整个系统
- 恢复时间:30 分钟

优化后

隔离后:
- 单个 API 故障:仅影响该 API
- 其他 API:正常运行
- 自动恢复:2 分钟

本节小结

✅ 完成的工作:

  • 实现了熔断机制
  • 实现了资源隔离
  • 实现了自动降级
  • 完善了监控告警

✅ 效果:

  • 单点故障不影响整体
  • 系统可用性提升到 99.95%
  • 快速自动恢复

🎯 完成!我已经学会了构建高可用系统

搜索