防止雪崩
场景
某个繁忙的周五下午,发生了系统雪崩。
事件经过:
14:30 - 新闻 API 外部数据源响应慢
14:35 - 新闻 API 占用大量连接
14:40 - 数据库连接池耗尽
14:45 - 所有 API 开始超时
14:50 - 整个系统不可用
影响:
- 所有用户都无法访问
- 持续时间:30 分钟
- 用户投诉激增根本原因
雪崩效应:
某个服务变慢
↓
占用资源(连接、线程)
↓
其他服务无资源可用
↓
整体系统崩溃防护措施
1. 熔断器(Circuit Breaker)
from circuitbreaker import circuit
@circuit(failure_threshold=5, recovery_timeout=60)
def call_external_api(url):
"""调用外部 API(带熔断)"""
response = requests.get(url, timeout=3)
response.raise_for_status()
return response.json()
# 使用时
try:
data = call_external_api(url)
except circuit.CircuitBreakerError:
# 熔断器打开,使用降级方案
return get_cached_data()2. 限流(Rate Limiting)
# 系统级限流(保护整体)
system_limiter = RateLimiter(rate=1000, capacity=10000)
@app.before_request
def check_system_limit():
"""请求前检查系统限流"""
if not system_limiter.is_allowed():
return jsonify({'error': 'System overloaded'}), 5033. 超时控制
# 设置严格的超时
@app.route('/api/news')
def handle_news():
try:
data = call_with_timeout(
fetch_news,
timeout=2 # 最多 2 秒
)
return jsonify(data)
except TimeoutError:
return jsonify({'error': 'Request timeout'}), 5044. 资源隔离
# 为每个服务分配独立的线程池
executor = ThreadPoolExecutor(max_workers=10)
def handle_in_isolation(func, *args):
"""在独立线程中执行"""
future = executor.submit(func, *args)
return future.result(timeout=3)5. 降级策略
def handle_news_with_fallback():
"""带降级的新闻 API"""
# 策略 1: 返回缓存
cached = get_news_from_cache()
if cached:
return jsonify({'data': cached, 'source': 'cache'})
# 策略 2: 返回推荐内容
recommended = get_recommended_news()
if recommended:
return jsonify({'data': recommended, 'source': 'recommended'})
# 策略 3: 返回友好错误
return jsonify({
'error': 'News service temporarily unavailable',
'message': 'Please try again later'
}), 503监控和告警
系统健康监控
def check_system_health():
"""检查系统健康状态"""
health = {
'overall': 'healthy',
'services': {}
}
# 检查各个服务
services = ['news', 'weather', 'stock', 'database', 'redis']
for service in services:
try:
# 尝试调用服务
response = ping_service(service, timeout=1)
health['services'][service] = {
'status': 'healthy' if response.ok else 'degraded',
'response_time': response.elapsed.total_seconds()
}
except Exception as e:
health['services'][service] = {
'status': 'down',
'error': str(e)
}
health['overall'] = 'degraded'
return health
@app.route('/health')
def health_check():
return jsonify(check_system_health())应急预案
自动降级
def emergency_degrade():
"""应急降级"""
# 1. 关闭非核心功能
disable_feature('news')
disable_feature('stock')
# 2. 降低缓存时间
reduce_cache_ttl(all_apis=0.5)
# 3. 增加限流
increase_rate_limit(factor=0.5)
# 4. 启用维护模式
enable_maintenance_mode()
send_alert('Emergency degradation activated')效果验证
优化前
雪崩事件:
- 触发条件:单个 API 故障
- 影响范围:整个系统
- 恢复时间:30 分钟优化后
隔离后:
- 单个 API 故障:仅影响该 API
- 其他 API:正常运行
- 自动恢复:2 分钟本节小结
✅ 完成的工作:
- 实现了熔断机制
- 实现了资源隔离
- 实现了自动降级
- 完善了监控告警
✅ 效果:
- 单点故障不影响整体
- 系统可用性提升到 99.95%
- 快速自动恢复
🎯 完成!我已经学会了构建高可用系统
