恶意网址识别

那封措辞严厉的邮件

那天上午,我收到一封邮件,标题是 “你们的短链接被用于钓鱼诈骗”

如果你们不立即处理这个链接,我们将向公安机关报案。

我们的客户通过你们的 short.url/xxxxx 访问了一个仿冒银行的网站,输入了银行卡号和密码。这是严重的违法行为。

我的心一下子凉了。

我立刻打开那个短链接——果然,跳转到了一个仿冒某银行的钓鱼网站。页面做得极其逼真,如果不是域名不对,我自己都分辨不出来。

这是我第一次直面黑产。


紧急处理

第一步:下线

我没有犹豫,直接下线了那条链接。

# 紧急下线
db.execute(
    "UPDATE urls SET status = 'blocked' WHERE short_code = ?", ('xxxxx',)
)
redis.delete("url:xxxxx")

然后给投诉者回邮件道歉,承诺 24 小时内给出完整的解决方案。

第二步:排查

我查了一下数据库,发现问题的严重性超出我的想象——

过去一周的可疑链接:
- 指向 .xyz/.top/.gq 域名的短链接:87 条
- 指向 IP 地址(而非域名)的短链接:23 条
- URL 中包含 "login" "verify" "account" 的:45 条
- 被多个用户举报的:12 条

87 条可疑链接。而我之前居然一条都没发现。


三层检测方案

“我不能靠人工一条条看。我需要一套自动检测系统。”

我花了一周时间,搭了三层防线。

第一层:黑名单检测(最快)

“最快的办法:维护一份已知恶意域名列表。”

class BlacklistChecker:
    """域名黑名单检查"""

    def __init__(self):
        self.blacklist = set()
        self._load_blacklist()

    def _load_blacklist(self):
        """加载黑名单数据"""
        # 1. 从数据库加载(我手动添加的已知恶意域名)
        domains = db.query("SELECT domain FROM malicious_domains")
        self.blacklist.update(d['domain'] for d in domains)

        # 2. 从公共黑名单加载
        public_lists = [
            'https://raw.githubusercontent.com/StevenBlack/hosts/master/hosts',
            'https://phishing.army/download/phishing_army_blocklist_extended.txt'
        ]

        for url in public_lists:
            try:
                resp = requests.get(url, timeout=10)
                for line in resp.text.splitlines():
                    line = line.strip()
                    if line and not line.startswith('#'):
                        domain = line.split()[-1] if '#' in line else line
                        self.blacklist.add(domain.lower())
            except Exception:
                pass

        logging.info(f'黑名单加载完成:{len(self.blacklist)} 个域名')

    def check(self, url):
        """检查 URL 是否命中黑名单"""
        from urllib.parse import urlparse
        domain = urlparse(url).netloc.lower()

        # 精确匹配
        if domain in self.blacklist:
            return True, 'exact_match'

        # 子域名匹配
        for blocked in self.blacklist:
            if domain.endswith('.' + blocked):
                return True, 'subdomain_match'

        return False, None

“黑名单检测非常快,只需一次内存查找。但缺点也明显:只能识别已知的恶意域名,新出现的钓鱼网站识别不了。“

第二层:特征检测(中等速度)

“对于新出现的钓鱼网站,我需要分析 URL 本身的特征。“

import re
from urllib.parse import urlparse

class FeatureDetector:
    """URL 特征检测"""

    def __init__(self):
        # 可疑关键词模式
        self.phishing_patterns = [
            r'login.*paypal',      # 仿冒 PayPal
            r'login.*apple',       # 仿冒 Apple
            r'account.*verify',    # "验证您的账户"
            r'secure.*update',     # "安全更新"
            r'bank.*confirm',      # "银行确认"
            r'password.*reset',    # "重置密码"
            r'winner.*claim',      # "中奖领取"
        ]

        # 高风险域名后缀
        self.suspicious_tlds = ['.xyz', '.top', '.gq', '.tk', '.ml', '.cf']

    def detect(self, url):
        """检测 URL 可疑特征,返回风险分数"""
        score = 0
        reasons = []
        parsed = urlparse(url)
        domain = parsed.netloc.lower()
        path = parsed.path.lower()

        # 检查 1:仿冒品牌关键词(+30 分)
        for pattern in self.phishing_patterns:
            if re.search(pattern, url, re.IGNORECASE):
                score += 30
                reasons.append(f'匹配可疑模式: {pattern}')
                break  # 只计一次

        # 检查 2:高风险域名后缀(+20 分)
        for tld in self.suspicious_tlds:
            if domain.endswith(tld):
                score += 20
                reasons.append(f'高风险域名后缀: {tld}')
                break

        # 检查 3:IP 地址代替域名(+40 分)
        if re.match(r'https?://\d+\.\d+\.\d+\.\d+', url):
            score += 40
            reasons.append('使用 IP 地址而非域名')

        # 检查 4:超长 URL(+10 分)
        if len(url) > 200:
            score += 10
            reasons.append('URL 异常长')

        # 检查 5:过多子域名(+15 分)
        subdomain_count = domain.count('.')
        if subdomain_count >= 3:
            score += 15
            reasons.append(f'子域名过多: {subdomain_count} 层')

        return {
            'score': score,
            'malicious': score >= 50,
            'reasons': reasons
        }

第三层:Google Safe Browsing API(最准确)

“Google 维护了全球最大的恶意 URL 数据库。虽然调用有延迟,但准确率最高。“

class SafeBrowsingChecker:
    """Google Safe Browsing API 检测"""

    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = 'https://safebrowsing.googleapis.com/v4/threatMatches:find'

    def check(self, url):
        """调用 Google Safe Browsing API"""
        payload = {
            'client': {
                'clientId': 'short-url-service',
                'clientVersion': '1.0.0'
            },
            'threatInfo': {
                'threatTypes': ['MALWARE', 'SOCIAL_ENGINEERING', 'UNWANTED_SOFTWARE'],
                'platformTypes': ['ANY_PLATFORM'],
                'threatEntryTypes': ['URL'],
                'threatEntries': [{'url': url}]
            }
        }

        try:
            resp = requests.post(
                f'{self.base_url}?key={self.api_key}',
                json=payload,
                timeout=5
            )
            data = resp.json()

            if 'matches' in data:
                threats = [m['threatType'] for m in data['matches']]
                return {
                    'malicious': True,
                    'threats': threats,
                    'source': 'Google Safe Browsing'
                }

            return {'malicious': False}

        except Exception as e:
            # API 调用失败,降级为仅黑名单+特征检测
            logging.warning(f'Safe Browsing API 调用失败: {e}')
            return {'malicious': False, 'error': str(e)}

整合:三层防线

class URLSecurityChecker:
    """URL 安全检测(三层防线)"""

    def __init__(self):
        self.blacklist = BlacklistChecker()
        self.feature = FeatureDetector()
        self.safe_browsing = SafeBrowsingChecker(api_key=settings.GOOGLE_SB_KEY)

    def check(self, url):
        """综合安全检测"""
        # 第一层:黑名单(1ms)
        is_blocked, reason = self.blacklist.check(url)
        if is_blocked:
            return {
                'safe': False,
                'reason': f'黑名单命中: {reason}',
                'layer': 'blacklist'
            }

        # 第二层:特征检测(5ms)
        features = self.feature.detect(url)
        if features['malicious']:
            return {
                'safe': False,
                'reason': f'特征检测风险分: {features["score"]}',
                'details': features['reasons'],
                'layer': 'feature'
            }

        # 第三层:Safe Browsing API(100-500ms)
        sb_result = self.safe_browsing.check(url)
        if sb_result.get('malicious'):
            return {
                'safe': False,
                'reason': f'Google 识别为: {sb_result["threats"]}',
                'layer': 'safe_browsing'
            }

        return {'safe': True}

实时拦截

“检测系统搭好了,但必须在创建短链接时就拦截,不能事后补救。“

@app.route('/api/shorten', methods=['POST'])
def create_short_link():
    url = request.get_json().get('url')

    # 创建前检测
    checker = URLSecurityChecker()
    result = checker.check(url)

    if not result['safe']:
        # 记录恶意 URL 尝试
        log_malicious_attempt(url, result, request.user['id'])

        return {
            'error': '该 URL 被识别为不安全',
            'reason': result['reason'],
            'appeal': '如果误判,请联系 support@short.url 申诉'
        }, 403

    # 通过检测,正常创建
    short_code = generate_short_code(url)
    db.execute(
        "INSERT INTO urls (short_code, long_url, user_id) VALUES (?, ?, ?)",
        (short_code, url, request.user['id'])
    )

    return {'short_url': f'https://short.url/{short_code}'}, 201

举报系统

“自动检测不可能 100% 准确。我还需要一个用户举报通道。“

@app.route('/api/report', methods=['POST'])
def report_url():
    """用户举报可疑短链接"""
    data = request.get_json()
    short_code = data.get('short_code')
    reason = data.get('reason', '')

    # 记录举报
    db.execute(
        """INSERT INTO url_reports 
           (short_code, reporter_id, reason, status, created_at)
           VALUES (?, ?, ?, 'pending', ?)""",
        (short_code, request.user['id'], reason, datetime.now())
    )

    # 如果同一链接被举报 >= 3 次,自动下线
    report_count = db.query(
        "SELECT COUNT(*) as cnt FROM url_reports WHERE short_code = ?",
        (short_code,)
    )[0]['cnt']

    if report_count >= 3:
        db.execute(
            "UPDATE urls SET status = 'auto_blocked' WHERE short_code = ?",
            (short_code,)
        )
        redis.delete(f"url:{short_code}")
        logging.warning(f'短链接 {short_code} 被举报 {report_count} 次,已自动下线')

    return {'message': '感谢举报,我们会在 24 小时内审核'}

检测效果

上线一周后的数据:

指标数值
创建请求总数12,345
被拦截的恶意 URL156 条(1.26%)
黑名单拦截89 条
特征检测拦截48 条
Safe Browsing 拦截19 条
用户举报7 条(其中 5 条已被自动拦截)
误判申诉3 条(审核后 2 条恢复)

“156 条恶意链接被拦截在创建阶段。如果这些链接流出去了,平台信誉就毁了。”