防滥用机制
那天创建量暴增 10 倍
一个普通的周二上午,我打开监控面板,差点从椅子上跳起来。
⚠️ 异常告警
- 当前 QPS:3,500(正常约 300)
- 今日新建链接:52,317 条(日均约 5,000 条)
- 服务器 CPU:89%我查了一下创建日志——一个用户,过去 1 小时内,创建了 50,000 条短链接。全部指向同一个垃圾营销网站。
“这不是正常使用,这是在薅我的资源。“
行为分析
正常用户 vs 滥用者
我花了半天时间,对比了正常用户和滥用者的行为模式。
def analyze_user_behavior(user_id):
"""分析用户行为"""
links = db.query("""
SELECT long_url, created_at
FROM urls
WHERE user_id = ?
AND created_at >= DATE_SUB(NOW(), INTERVAL 24 HOUR)
ORDER BY created_at
""", (user_id,))
from urllib.parse import urlparse
from collections import Counter
domains = [urlparse(l['long_url']).netloc for l in links]
domain_counts = Counter(domains)
# 关键指标
return {
'total_links': len(links),
'unique_domains': len(set(domains)),
'top_domain_ratio': domain_counts.most_common(1)[0][1] / len(domains) if domains else 0,
'links_per_hour': len(links) / max(1, (datetime.now() - links[0]['created_at']).seconds / 3600),
'creation_intervals': [
(links[i+1]['created_at'] - links[i]['created_at']).total_seconds()
for i in range(min(len(links)-1, 99))
]
}分析结果
| 指标 | 正常用户 | 滥用者 |
|---|---|---|
| 24h 创建量 | 5-50 条 | 50,000 条 |
| 域名集中度 | < 30% | > 95% |
| 创建间隔 | 10-300 秒 | < 0.1 秒(脚本) |
| 活跃时段 | 工作时间 | 凌晨 2-5 点 |
| 账户年龄 | 30+ 天 | 当天注册 |
“特征很明显:量大、集中、速度快、凌晨活跃。“
风控规则
“分析清楚后,我设计了多层风控规则。“
第一层:注册门槛
class RegistrationGate:
"""注册门槛"""
def check_before_create(self, user_id):
user = db.query("SELECT * FROM users WHERE id = ?", (user_id,))[0]
# 检查 1:邮箱是否已验证
if not user['email_verified']:
return {'allowed': False, 'reason': '请先验证邮箱'}
# 检查 2:账户注册时间
age_days = (datetime.now() - user['created_at']).days
if age_days < 1:
return {'allowed': False, 'reason': '新注册用户需等待 24 小时'}
return {'allowed': True}第二层:频率限制
class RateLimiter:
"""创建频率限制"""
def __init__(self):
self.redis = redis.Redis(host='localhost', port=6379, db=3)
def check_rate(self, user_id):
# 每分钟限制
minute_key = f'rate:minute:{user_id}'
minute_count = self.redis.incr(minute_key)
if minute_count == 1:
self.redis.expire(minute_key, 60)
if minute_count > 10:
return {'allowed': False, 'reason': f'1 分钟内最多创建 10 条(当前 {minute_count})'}
# 每小时限制
hour_key = f'rate:hour:{user_id}'
hour_count = self.redis.incr(hour_key)
if hour_count == 1:
self.redis.expire(hour_key, 3600)
if hour_count > 100:
return {'allowed': False, 'reason': f'1 小时内最多创建 100 条(当前 {hour_count})'}
# 每天限制
day_key = f'rate:day:{user_id}'
day_count = self.redis.incr(day_key)
if day_count == 1:
self.redis.expire(day_key, 86400)
if day_count > 500:
return {'allowed': False, 'reason': f'每天最多创建 500 条(当前 {day_count})'}
return {'allowed': True}第三层:异常检测
class AnomalyDetector:
"""异常行为检测"""
def check(self, user_id, url):
# 检测 1:目标域名集中度
recent = db.query("""
SELECT long_url FROM urls
WHERE user_id = ? AND created_at >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
""", (user_id,))
if len(recent) >= 10:
from urllib.parse import urlparse
from collections import Counter
domains = [urlparse(r['long_url']).netloc for r in recent]
domain_counts = Counter(domains)
top_domain, top_count = domain_counts.most_common(1)[0]
if top_count / len(domains) > 0.8:
return {
'abuse': True,
'reason': f'80% 以上链接指向同一域名: {top_domain}',
'level': 'high'
}
# 检测 2:凌晨异常活跃
current_hour = datetime.now().hour
if 2 <= current_hour <= 5:
hour_count = len(recent)
if hour_count > 50:
return {
'abuse': True,
'reason': f'凌晨时段异常活跃: {hour_count} 条/小时',
'level': 'medium'
}
# 检测 3:URL 参数操控
if len(recent) >= 20:
from urllib.parse import urlparse
base_urls = []
for r in recent:
parsed = urlparse(r['long_url'])
base_urls.append(f'{parsed.scheme}://{parsed.netloc}{parsed.path}')
base_counts = Counter(base_urls)
for base, count in base_counts.items():
if count > 15:
return {
'abuse': True,
'reason': f'通过参数变化创建大量链接',
'level': 'medium'
}
return {'abuse': False}验证码系统
“对于可疑操作,我不直接拒绝,而是触发验证码。这样既防滥用,又不误伤正常用户。“
class CaptchaGate:
"""验证码门控"""
def should_show_captcha(self, user_id):
"""判断是否需要验证码"""
# 条件 1:1 分钟内创建超过 5 条
minute_key = f'rate:minute:{user_id}'
minute_count = int(self.redis.get(minute_key) or 0)
if minute_count > 5:
return True
# 条件 2:新用户首次创建
user = db.query("SELECT created_at FROM users WHERE id = ?", (user_id,))[0]
if (datetime.now() - user['created_at']).days < 3:
link_count = db.query(
"SELECT COUNT(*) as cnt FROM urls WHERE user_id = ?", (user_id,)
)[0]['cnt']
if link_count < 3:
return True
return False
def verify_captcha(self, user_id, captcha_response):
"""验证滑块验证码"""
# 调用第三方验证码服务验证
result = verify_with_provider(captcha_response)
if result['success']:
# 验证通过,设置标记(1 小时内免验证)
self.redis.setex(f'captcha:passed:{user_id}', 3600, '1')
return True
return False黑名单与封禁
三层防线
class BanSystem:
"""封禁系统"""
def check_and_ban(self, user_id):
"""检查并封禁"""
# IP 黑名单
ip = get_user_ip(user_id)
if self._is_ip_banned(ip):
return {'action': 'ban', 'reason': 'IP 在黑名单中'}
# 邮箱黑名单
email = get_user_email(user_id)
if self._is_email_banned(email):
return {'action': 'ban', 'reason': '邮箱在黑名单中'}
# 用户封禁
user = db.query("SELECT status FROM users WHERE id = ?", (user_id,))[0]
if user['status'] == 'banned':
return {'action': 'ban', 'reason': '账户已被封禁'}
return {'action': 'allow'}
def ban_user(self, user_id, reason, duration='permanent'):
"""封禁用户"""
db.execute(
"UPDATE users SET status = 'banned', ban_reason = ? WHERE id = ?",
(reason, user_id)
)
# 封禁关联信息
ip = get_user_ip(user_id)
email = get_user_email(user_id)
db.execute(
"INSERT INTO ban_records (user_id, ip, email, reason, created_at) VALUES (?, ?, ?, ?, ?)",
(user_id, ip, email, reason, datetime.now())
)
# 删除该用户的所有短链接
db.execute("UPDATE urls SET status = 'banned' WHERE user_id = ?", (user_id,))
logging.warning(f'用户 {user_id} 已被封禁: {reason}')综合风控入口
class AntiAbuseSystem:
"""防滥用系统"""
def __init__(self):
self.reg_gate = RegistrationGate()
self.rate_limiter = RateLimiter()
self.anomaly = AnomalyDetector()
self.captcha = CaptchaGate()
self.ban = BanSystem()
def check(self, user_id, url):
"""创建前的完整检查"""
# 1. 封禁检查
ban_result = self.ban.check_and_ban(user_id)
if ban_result['action'] == 'ban':
return {'allowed': False, 'reason': ban_result['reason']}
# 2. 注册门槛
gate_result = self.reg_gate.check_before_create(user_id)
if not gate_result['allowed']:
return {'allowed': False, 'reason': gate_result['reason']}
# 3. 频率限制
rate_result = self.rate_limiter.check_rate(user_id)
if not rate_result['allowed']:
return {'allowed': False, 'reason': rate_result['reason']}
# 4. 验证码检查
if self.captcha.should_show_captcha(user_id):
return {'allowed': False, 'need_captcha': True}
# 5. 异常检测
anomaly_result = self.anomaly.check(user_id, url)
if anomaly_result.get('abuse'):
level = anomaly_result['level']
if level == 'high':
# 高风险:封禁
self.ban.ban_user(user_id, anomaly_result['reason'])
return {'allowed': False, 'reason': '账户因异常行为被封禁'}
else:
# 中风险:触发验证码
return {'allowed': False, 'need_captcha': True}
return {'allowed': True}在 API 中使用
@app.route('/api/shorten', methods=['POST'])
def create_short_link():
user_id = request.user['id']
url = request.get_json().get('url')
# 防滥用检查
anti_abuse = AntiAbuseSystem()
result = anti_abuse.check(user_id, url)
if not result['allowed']:
if result.get('need_captcha'):
return {'error': '请完成人机验证', 'need_captcha': True}, 403
return {'error': result['reason']}, 403
# 通过检查,正常创建
short_code = generate_short_code(url)
db.execute(
"INSERT INTO urls (short_code, long_url, user_id) VALUES (?, ?, ?)",
(short_code, url, user_id)
)
return {'short_url': f'https://short.url/{short_code}'}, 201效果
上线一周后的数据:
| 指标 | 数值 |
|---|---|
| 总创建请求 | 85,000 |
| 频率限制拦截 | 12,000(14%) |
| 验证码触发 | 3,200(3.8%) |
| 异常检测拦截 | 156 |
| 账户封禁 | 23 |
| 误判申诉 | 2(均已解封) |
“50,000 条/小时的垃圾链接降到了 0。风控系统起作用了。“
持续对抗
“风控不是一次性工程。黑产在不断进化,我也在持续优化规则。”
每周我会做一次风控复盘:
- 检查新出现的滥用模式
- 调整检测阈值
- 更新黑名单
- 评估误判率
“这是一场持续的猫鼠游戏。但至少现在,我不再是被动挨打的那一方了。”
“安全体系建立起来了。回头看这段旅程——从一台服务器到完整的系统……”