安全防护
场景
活动结束后,遭遇恶意攻击。
攻击特征:
- 短时间内大量请求
- 来自大量不同 IP
- 请求模式相似
- 目标:耗尽服务器资源
影响:
- 服务器 CPU 100%
- 数据库连接耗尽
- 正常用户无法访问攻击分析
日志分析:
- 10 分钟内 100 万次请求
- 来自 5000 个不同 IP
- 大部分请求没有有效 API Key
- 请求集中在几个接口
判断:DDoS 攻击防护策略:多层防护
第 1 层:网络层防护
# 使用 Nginx 限制请求频率
# /etc/nginx/nginx.conf
http {
# 限制单个 IP 的请求频率
limit_req_zone $binary_remote_addr zone=api_limit:10m rate=10r/s;
# 限制单个 IP 的连接数
limit_conn_zone $binary_remote_addr zone=conn_limit:10m;
server {
location /api/ {
# 限制请求频率
limit_req zone=api_limit burst=20 nodelay;
# 限制连接数
limit_conn conn_limit 10;
# 超限返回
limit_req_status 429;
limit_conn_status 429;
proxy_pass http://backend;
}
}
}第 2 层:应用层防护
class SecurityMiddleware:
"""安全中间件"""
def __init__(self):
self.blocked_ips = set()
self.ip_requests = defaultdict(list)
def check_ip_reputation(self, ip):
"""检查 IP 信誉"""
# 检查是否在黑名单
if ip in self.blocked_ips:
return False
# 检查请求频率
recent_requests = self.ip_requests[ip]
now = time.time()
# 清理旧记录(1 分钟前)
self.ip_requests[ip] = [
t for t in recent_requests if now - t < 60
]
# 检查频率(1 分钟内超过 100 次)
if len(self.ip_requests[ip]) > 100:
self.blocked_ips.add(ip)
send_alert(f'IP {ip} blocked due to high request rate')
return False
# 记录这次请求
self.ip_requests[ip].append(now)
return True
security = SecurityMiddleware()
@app.before_request
def security_check():
"""安全检查"""
ip = request.remote_addr
# 检查 IP 信誉
if not security.check_ip_reputation(ip):
return jsonify({'error': 'Too many requests'}), 429
# 检查 API Key
api_key = request.headers.get('X-API-Key')
if not api_key:
return jsonify({'error': 'API Key required'}), 401第 3 层:验证码防护
def check_captcha():
"""检查验证码"""
# 对于可疑请求,要求验证码
if is_suspicious_request():
captcha_token = request.headers.get('X-Captcha-Token')
if not captcha_token:
return jsonify({
'error': 'Captcha required',
'captcha_url': '/captcha'
}), 403
# 验证验证码
if not verify_captcha(captcha_token):
return jsonify({'error': 'Invalid captcha'}), 403
return None
def is_suspicious_request():
"""判断是否为可疑请求"""
# 没有 User-Agent
if not request.headers.get('User-Agent'):
return True
# 请求频率过高
ip = request.remote_addr
if len(security.ip_requests[ip]) > 50:
return True
return False第 4 层:CDN 防护
使用 Cloudflare 等 CDN 服务:
- DDoS 防护
- Web 应用防火墙 (WAF)
- 全球加速
- 自动屏蔽攻击 IP
配置:
1. 域名接入 CDN
2. 启用 DDoS 防护
3. 配置防火墙规则
4. 启用缓存监控和告警
实时监控
def monitor_security():
"""安全监控"""
# 检查系统指标
metrics = {
'qps': get_current_qps(),
'cpu': psutil.cpu_percent(),
'memory': psutil.virtual_memory().percent,
'connections': get_active_connections(),
'blocked_ips': len(security.blocked_ips)
}
# 异常检测
if metrics['qps'] > 1000:
send_alert(f'High QPS detected: {metrics["qps"]}')
if metrics['cpu'] > 90:
send_alert(f'High CPU usage: {metrics["cpu"]}%')
if len(metrics['blocked_ips']) > 100:
send_alert(f'Many IPs blocked: {len(metrics["blocked_ips"])}')
return metrics
# 实时监控(每 10 秒)
scheduler.add_job(
monitor_security,
'interval',
seconds=10,
id='security_monitor'
)应急响应
自动防护策略
class AutoDefense:
"""自动防护系统"""
def __init__(self):
self.protection_level = 0 # 0=正常,1=低防护,2=高防护
def adjust_protection(self):
"""根据威胁自动调整防护级别"""
threat_score = self.calculate_threat_score()
if threat_score > 80:
# 高威胁:启用所有防护
self.enable_high_protection()
elif threat_score > 50:
# 中等威胁:启用部分防护
self.enable_medium_protection()
else:
# 低威胁:正常模式
self.disable_protection()
def enable_high_protection(self):
"""启用高级防护"""
# 启用严格限流
set_rate_limit(requests_per_minute=10)
# 启用验证码
enable_captcha_for_all()
# 启用 CDN 防护模式
enable_cdn_under_attack_mode()
send_alert('High protection mode enabled')
auto_defense = AutoDefense()效果验证
攻击前
正常访问:
- QPS: 10
- 响应时间:50ms
- 用户体验:良好攻击时(无防护)
遭受攻击:
- QPS: 1000
- 响应时间:超时
- 用户体验:系统不可用攻击时(有防护)
多层防护:
- Nginx 层:拦截 90%
- 应用层:拦截 8%
- CDN 层:拦截 1.9%
- 最终到达后端:0.1%
效果:
- 正常用户仍可访问
- 系统保持稳定
- 攻击被有效拦截本节小结
✅ 完成的工作:
- 实现了多层防护策略
- 实现了 IP 信誉检查
- 实现了自动防护调整
- 集成了 CDN 防护
✅ 效果:
- 有效防御 DDoS 攻击
- 正常用户不受影响
- 系统保持可用
🎯 完成!我已经学会了处理海量流量
