恶意网址识别
那封措辞严厉的邮件
那天上午,我收到一封邮件,标题是 “你们的短链接被用于钓鱼诈骗”。
如果你们不立即处理这个链接,我们将向公安机关报案。
我们的客户通过你们的 short.url/xxxxx 访问了一个仿冒银行的网站,输入了银行卡号和密码。这是严重的违法行为。
我的心一下子凉了。
我立刻打开那个短链接——果然,跳转到了一个仿冒某银行的钓鱼网站。页面做得极其逼真,如果不是域名不对,我自己都分辨不出来。
这是我第一次直面黑产。
紧急处理
第一步:下线
我没有犹豫,直接下线了那条链接。
# 紧急下线
db.execute(
"UPDATE urls SET status = 'blocked' WHERE short_code = ?", ('xxxxx',)
)
redis.delete("url:xxxxx")然后给投诉者回邮件道歉,承诺 24 小时内给出完整的解决方案。
第二步:排查
我查了一下数据库,发现问题的严重性超出我的想象——
过去一周的可疑链接:
- 指向 .xyz/.top/.gq 域名的短链接:87 条
- 指向 IP 地址(而非域名)的短链接:23 条
- URL 中包含 "login" "verify" "account" 的:45 条
- 被多个用户举报的:12 条87 条可疑链接。而我之前居然一条都没发现。
三层检测方案
“我不能靠人工一条条看。我需要一套自动检测系统。”
我花了一周时间,搭了三层防线。
第一层:黑名单检测(最快)
“最快的办法:维护一份已知恶意域名列表。”
class BlacklistChecker:
"""域名黑名单检查"""
def __init__(self):
self.blacklist = set()
self._load_blacklist()
def _load_blacklist(self):
"""加载黑名单数据"""
# 1. 从数据库加载(我手动添加的已知恶意域名)
domains = db.query("SELECT domain FROM malicious_domains")
self.blacklist.update(d['domain'] for d in domains)
# 2. 从公共黑名单加载
public_lists = [
'https://raw.githubusercontent.com/StevenBlack/hosts/master/hosts',
'https://phishing.army/download/phishing_army_blocklist_extended.txt'
]
for url in public_lists:
try:
resp = requests.get(url, timeout=10)
for line in resp.text.splitlines():
line = line.strip()
if line and not line.startswith('#'):
domain = line.split()[-1] if '#' in line else line
self.blacklist.add(domain.lower())
except Exception:
pass
logging.info(f'黑名单加载完成:{len(self.blacklist)} 个域名')
def check(self, url):
"""检查 URL 是否命中黑名单"""
from urllib.parse import urlparse
domain = urlparse(url).netloc.lower()
# 精确匹配
if domain in self.blacklist:
return True, 'exact_match'
# 子域名匹配
for blocked in self.blacklist:
if domain.endswith('.' + blocked):
return True, 'subdomain_match'
return False, None“黑名单检测非常快,只需一次内存查找。但缺点也明显:只能识别已知的恶意域名,新出现的钓鱼网站识别不了。“
第二层:特征检测(中等速度)
“对于新出现的钓鱼网站,我需要分析 URL 本身的特征。“
import re
from urllib.parse import urlparse
class FeatureDetector:
"""URL 特征检测"""
def __init__(self):
# 可疑关键词模式
self.phishing_patterns = [
r'login.*paypal', # 仿冒 PayPal
r'login.*apple', # 仿冒 Apple
r'account.*verify', # "验证您的账户"
r'secure.*update', # "安全更新"
r'bank.*confirm', # "银行确认"
r'password.*reset', # "重置密码"
r'winner.*claim', # "中奖领取"
]
# 高风险域名后缀
self.suspicious_tlds = ['.xyz', '.top', '.gq', '.tk', '.ml', '.cf']
def detect(self, url):
"""检测 URL 可疑特征,返回风险分数"""
score = 0
reasons = []
parsed = urlparse(url)
domain = parsed.netloc.lower()
path = parsed.path.lower()
# 检查 1:仿冒品牌关键词(+30 分)
for pattern in self.phishing_patterns:
if re.search(pattern, url, re.IGNORECASE):
score += 30
reasons.append(f'匹配可疑模式: {pattern}')
break # 只计一次
# 检查 2:高风险域名后缀(+20 分)
for tld in self.suspicious_tlds:
if domain.endswith(tld):
score += 20
reasons.append(f'高风险域名后缀: {tld}')
break
# 检查 3:IP 地址代替域名(+40 分)
if re.match(r'https?://\d+\.\d+\.\d+\.\d+', url):
score += 40
reasons.append('使用 IP 地址而非域名')
# 检查 4:超长 URL(+10 分)
if len(url) > 200:
score += 10
reasons.append('URL 异常长')
# 检查 5:过多子域名(+15 分)
subdomain_count = domain.count('.')
if subdomain_count >= 3:
score += 15
reasons.append(f'子域名过多: {subdomain_count} 层')
return {
'score': score,
'malicious': score >= 50,
'reasons': reasons
}第三层:Google Safe Browsing API(最准确)
“Google 维护了全球最大的恶意 URL 数据库。虽然调用有延迟,但准确率最高。“
class SafeBrowsingChecker:
"""Google Safe Browsing API 检测"""
def __init__(self, api_key):
self.api_key = api_key
self.base_url = 'https://safebrowsing.googleapis.com/v4/threatMatches:find'
def check(self, url):
"""调用 Google Safe Browsing API"""
payload = {
'client': {
'clientId': 'short-url-service',
'clientVersion': '1.0.0'
},
'threatInfo': {
'threatTypes': ['MALWARE', 'SOCIAL_ENGINEERING', 'UNWANTED_SOFTWARE'],
'platformTypes': ['ANY_PLATFORM'],
'threatEntryTypes': ['URL'],
'threatEntries': [{'url': url}]
}
}
try:
resp = requests.post(
f'{self.base_url}?key={self.api_key}',
json=payload,
timeout=5
)
data = resp.json()
if 'matches' in data:
threats = [m['threatType'] for m in data['matches']]
return {
'malicious': True,
'threats': threats,
'source': 'Google Safe Browsing'
}
return {'malicious': False}
except Exception as e:
# API 调用失败,降级为仅黑名单+特征检测
logging.warning(f'Safe Browsing API 调用失败: {e}')
return {'malicious': False, 'error': str(e)}整合:三层防线
class URLSecurityChecker:
"""URL 安全检测(三层防线)"""
def __init__(self):
self.blacklist = BlacklistChecker()
self.feature = FeatureDetector()
self.safe_browsing = SafeBrowsingChecker(api_key=settings.GOOGLE_SB_KEY)
def check(self, url):
"""综合安全检测"""
# 第一层:黑名单(1ms)
is_blocked, reason = self.blacklist.check(url)
if is_blocked:
return {
'safe': False,
'reason': f'黑名单命中: {reason}',
'layer': 'blacklist'
}
# 第二层:特征检测(5ms)
features = self.feature.detect(url)
if features['malicious']:
return {
'safe': False,
'reason': f'特征检测风险分: {features["score"]}',
'details': features['reasons'],
'layer': 'feature'
}
# 第三层:Safe Browsing API(100-500ms)
sb_result = self.safe_browsing.check(url)
if sb_result.get('malicious'):
return {
'safe': False,
'reason': f'Google 识别为: {sb_result["threats"]}',
'layer': 'safe_browsing'
}
return {'safe': True}实时拦截
“检测系统搭好了,但必须在创建短链接时就拦截,不能事后补救。“
@app.route('/api/shorten', methods=['POST'])
def create_short_link():
url = request.get_json().get('url')
# 创建前检测
checker = URLSecurityChecker()
result = checker.check(url)
if not result['safe']:
# 记录恶意 URL 尝试
log_malicious_attempt(url, result, request.user['id'])
return {
'error': '该 URL 被识别为不安全',
'reason': result['reason'],
'appeal': '如果误判,请联系 support@short.url 申诉'
}, 403
# 通过检测,正常创建
short_code = generate_short_code(url)
db.execute(
"INSERT INTO urls (short_code, long_url, user_id) VALUES (?, ?, ?)",
(short_code, url, request.user['id'])
)
return {'short_url': f'https://short.url/{short_code}'}, 201举报系统
“自动检测不可能 100% 准确。我还需要一个用户举报通道。“
@app.route('/api/report', methods=['POST'])
def report_url():
"""用户举报可疑短链接"""
data = request.get_json()
short_code = data.get('short_code')
reason = data.get('reason', '')
# 记录举报
db.execute(
"""INSERT INTO url_reports
(short_code, reporter_id, reason, status, created_at)
VALUES (?, ?, ?, 'pending', ?)""",
(short_code, request.user['id'], reason, datetime.now())
)
# 如果同一链接被举报 >= 3 次,自动下线
report_count = db.query(
"SELECT COUNT(*) as cnt FROM url_reports WHERE short_code = ?",
(short_code,)
)[0]['cnt']
if report_count >= 3:
db.execute(
"UPDATE urls SET status = 'auto_blocked' WHERE short_code = ?",
(short_code,)
)
redis.delete(f"url:{short_code}")
logging.warning(f'短链接 {short_code} 被举报 {report_count} 次,已自动下线')
return {'message': '感谢举报,我们会在 24 小时内审核'}检测效果
上线一周后的数据:
| 指标 | 数值 |
|---|---|
| 创建请求总数 | 12,345 |
| 被拦截的恶意 URL | 156 条(1.26%) |
| 黑名单拦截 | 89 条 |
| 特征检测拦截 | 48 条 |
| Safe Browsing 拦截 | 19 条 |
| 用户举报 | 7 条(其中 5 条已被自动拦截) |
| 误判申诉 | 3 条(审核后 2 条恢复) |
“156 条恶意链接被拦截在创建阶段。如果这些链接流出去了,平台信誉就毁了。”