防滥用机制

那天创建量暴增 10 倍

一个普通的周二上午,我打开监控面板,差点从椅子上跳起来。

⚠️ 异常告警
- 当前 QPS:3,500(正常约 300)
- 今日新建链接:52,317 条(日均约 5,000 条)
- 服务器 CPU:89%

我查了一下创建日志——一个用户,过去 1 小时内,创建了 50,000 条短链接。全部指向同一个垃圾营销网站。

“这不是正常使用,这是在薅我的资源。“


行为分析

正常用户 vs 滥用者

我花了半天时间,对比了正常用户和滥用者的行为模式。

def analyze_user_behavior(user_id):
    """分析用户行为"""
    links = db.query("""
        SELECT long_url, created_at 
        FROM urls 
        WHERE user_id = ? 
          AND created_at >= DATE_SUB(NOW(), INTERVAL 24 HOUR)
        ORDER BY created_at
    """, (user_id,))

    from urllib.parse import urlparse
    from collections import Counter

    domains = [urlparse(l['long_url']).netloc for l in links]
    domain_counts = Counter(domains)

    # 关键指标
    return {
        'total_links': len(links),
        'unique_domains': len(set(domains)),
        'top_domain_ratio': domain_counts.most_common(1)[0][1] / len(domains) if domains else 0,
        'links_per_hour': len(links) / max(1, (datetime.now() - links[0]['created_at']).seconds / 3600),
        'creation_intervals': [
            (links[i+1]['created_at'] - links[i]['created_at']).total_seconds()
            for i in range(min(len(links)-1, 99))
        ]
    }

分析结果

指标正常用户滥用者
24h 创建量5-50 条50,000 条
域名集中度< 30%> 95%
创建间隔10-300 秒< 0.1 秒(脚本)
活跃时段工作时间凌晨 2-5 点
账户年龄30+ 天当天注册

“特征很明显:量大、集中、速度快、凌晨活跃。“


风控规则

“分析清楚后,我设计了多层风控规则。“

第一层:注册门槛

class RegistrationGate:
    """注册门槛"""

    def check_before_create(self, user_id):
        user = db.query("SELECT * FROM users WHERE id = ?", (user_id,))[0]

        # 检查 1:邮箱是否已验证
        if not user['email_verified']:
            return {'allowed': False, 'reason': '请先验证邮箱'}

        # 检查 2:账户注册时间
        age_days = (datetime.now() - user['created_at']).days
        if age_days < 1:
            return {'allowed': False, 'reason': '新注册用户需等待 24 小时'}

        return {'allowed': True}

第二层:频率限制

class RateLimiter:
    """创建频率限制"""

    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379, db=3)

    def check_rate(self, user_id):
        # 每分钟限制
        minute_key = f'rate:minute:{user_id}'
        minute_count = self.redis.incr(minute_key)
        if minute_count == 1:
            self.redis.expire(minute_key, 60)
        if minute_count > 10:
            return {'allowed': False, 'reason': f'1 分钟内最多创建 10 条(当前 {minute_count})'}

        # 每小时限制
        hour_key = f'rate:hour:{user_id}'
        hour_count = self.redis.incr(hour_key)
        if hour_count == 1:
            self.redis.expire(hour_key, 3600)
        if hour_count > 100:
            return {'allowed': False, 'reason': f'1 小时内最多创建 100 条(当前 {hour_count})'}

        # 每天限制
        day_key = f'rate:day:{user_id}'
        day_count = self.redis.incr(day_key)
        if day_count == 1:
            self.redis.expire(day_key, 86400)
        if day_count > 500:
            return {'allowed': False, 'reason': f'每天最多创建 500 条(当前 {day_count})'}

        return {'allowed': True}

第三层:异常检测

class AnomalyDetector:
    """异常行为检测"""

    def check(self, user_id, url):
        # 检测 1:目标域名集中度
        recent = db.query("""
            SELECT long_url FROM urls 
            WHERE user_id = ? AND created_at >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
        """, (user_id,))

        if len(recent) >= 10:
            from urllib.parse import urlparse
            from collections import Counter
            domains = [urlparse(r['long_url']).netloc for r in recent]
            domain_counts = Counter(domains)
            top_domain, top_count = domain_counts.most_common(1)[0]

            if top_count / len(domains) > 0.8:
                return {
                    'abuse': True,
                    'reason': f'80% 以上链接指向同一域名: {top_domain}',
                    'level': 'high'
                }

        # 检测 2:凌晨异常活跃
        current_hour = datetime.now().hour
        if 2 <= current_hour <= 5:
            hour_count = len(recent)
            if hour_count > 50:
                return {
                    'abuse': True,
                    'reason': f'凌晨时段异常活跃: {hour_count} 条/小时',
                    'level': 'medium'
                }

        # 检测 3:URL 参数操控
        if len(recent) >= 20:
            from urllib.parse import urlparse
            base_urls = []
            for r in recent:
                parsed = urlparse(r['long_url'])
                base_urls.append(f'{parsed.scheme}://{parsed.netloc}{parsed.path}')
            base_counts = Counter(base_urls)
            for base, count in base_counts.items():
                if count > 15:
                    return {
                        'abuse': True,
                        'reason': f'通过参数变化创建大量链接',
                        'level': 'medium'
                    }

        return {'abuse': False}

验证码系统

“对于可疑操作,我不直接拒绝,而是触发验证码。这样既防滥用,又不误伤正常用户。“

class CaptchaGate:
    """验证码门控"""

    def should_show_captcha(self, user_id):
        """判断是否需要验证码"""
        # 条件 1:1 分钟内创建超过 5 条
        minute_key = f'rate:minute:{user_id}'
        minute_count = int(self.redis.get(minute_key) or 0)
        if minute_count > 5:
            return True

        # 条件 2:新用户首次创建
        user = db.query("SELECT created_at FROM users WHERE id = ?", (user_id,))[0]
        if (datetime.now() - user['created_at']).days < 3:
            link_count = db.query(
                "SELECT COUNT(*) as cnt FROM urls WHERE user_id = ?", (user_id,)
            )[0]['cnt']
            if link_count < 3:
                return True

        return False

    def verify_captcha(self, user_id, captcha_response):
        """验证滑块验证码"""
        # 调用第三方验证码服务验证
        result = verify_with_provider(captcha_response)
        if result['success']:
            # 验证通过,设置标记(1 小时内免验证)
            self.redis.setex(f'captcha:passed:{user_id}', 3600, '1')
            return True
        return False

黑名单与封禁

三层防线

class BanSystem:
    """封禁系统"""

    def check_and_ban(self, user_id):
        """检查并封禁"""
        # IP 黑名单
        ip = get_user_ip(user_id)
        if self._is_ip_banned(ip):
            return {'action': 'ban', 'reason': 'IP 在黑名单中'}

        # 邮箱黑名单
        email = get_user_email(user_id)
        if self._is_email_banned(email):
            return {'action': 'ban', 'reason': '邮箱在黑名单中'}

        # 用户封禁
        user = db.query("SELECT status FROM users WHERE id = ?", (user_id,))[0]
        if user['status'] == 'banned':
            return {'action': 'ban', 'reason': '账户已被封禁'}

        return {'action': 'allow'}

    def ban_user(self, user_id, reason, duration='permanent'):
        """封禁用户"""
        db.execute(
            "UPDATE users SET status = 'banned', ban_reason = ? WHERE id = ?",
            (reason, user_id)
        )

        # 封禁关联信息
        ip = get_user_ip(user_id)
        email = get_user_email(user_id)
        db.execute(
            "INSERT INTO ban_records (user_id, ip, email, reason, created_at) VALUES (?, ?, ?, ?, ?)",
            (user_id, ip, email, reason, datetime.now())
        )

        # 删除该用户的所有短链接
        db.execute("UPDATE urls SET status = 'banned' WHERE user_id = ?", (user_id,))

        logging.warning(f'用户 {user_id} 已被封禁: {reason}')

综合风控入口

class AntiAbuseSystem:
    """防滥用系统"""

    def __init__(self):
        self.reg_gate = RegistrationGate()
        self.rate_limiter = RateLimiter()
        self.anomaly = AnomalyDetector()
        self.captcha = CaptchaGate()
        self.ban = BanSystem()

    def check(self, user_id, url):
        """创建前的完整检查"""
        # 1. 封禁检查
        ban_result = self.ban.check_and_ban(user_id)
        if ban_result['action'] == 'ban':
            return {'allowed': False, 'reason': ban_result['reason']}

        # 2. 注册门槛
        gate_result = self.reg_gate.check_before_create(user_id)
        if not gate_result['allowed']:
            return {'allowed': False, 'reason': gate_result['reason']}

        # 3. 频率限制
        rate_result = self.rate_limiter.check_rate(user_id)
        if not rate_result['allowed']:
            return {'allowed': False, 'reason': rate_result['reason']}

        # 4. 验证码检查
        if self.captcha.should_show_captcha(user_id):
            return {'allowed': False, 'need_captcha': True}

        # 5. 异常检测
        anomaly_result = self.anomaly.check(user_id, url)
        if anomaly_result.get('abuse'):
            level = anomaly_result['level']
            if level == 'high':
                # 高风险:封禁
                self.ban.ban_user(user_id, anomaly_result['reason'])
                return {'allowed': False, 'reason': '账户因异常行为被封禁'}
            else:
                # 中风险:触发验证码
                return {'allowed': False, 'need_captcha': True}

        return {'allowed': True}

在 API 中使用

@app.route('/api/shorten', methods=['POST'])
def create_short_link():
    user_id = request.user['id']
    url = request.get_json().get('url')

    # 防滥用检查
    anti_abuse = AntiAbuseSystem()
    result = anti_abuse.check(user_id, url)

    if not result['allowed']:
        if result.get('need_captcha'):
            return {'error': '请完成人机验证', 'need_captcha': True}, 403
        return {'error': result['reason']}, 403

    # 通过检查,正常创建
    short_code = generate_short_code(url)
    db.execute(
        "INSERT INTO urls (short_code, long_url, user_id) VALUES (?, ?, ?)",
        (short_code, url, user_id)
    )
    return {'short_url': f'https://short.url/{short_code}'}, 201

效果

上线一周后的数据:

指标数值
总创建请求85,000
频率限制拦截12,000(14%)
验证码触发3,200(3.8%)
异常检测拦截156
账户封禁23
误判申诉2(均已解封)

“50,000 条/小时的垃圾链接降到了 0。风控系统起作用了。“


持续对抗

“风控不是一次性工程。黑产在不断进化,我也在持续优化规则。”

每周我会做一次风控复盘:

  1. 检查新出现的滥用模式
  2. 调整检测阈值
  3. 更新黑名单
  4. 评估误判率

“这是一场持续的猫鼠游戏。但至少现在,我不再是被动挨打的那一方了。”

“安全体系建立起来了。回头看这段旅程——从一台服务器到完整的系统……”