内容审核

第二个危机 🚨

解决了恶意 URL 跳转问题后,我松了一口气。

直到那个周二的下午,我接到一通电话:

“您好,我们是网信办工作人员。” “接到举报,您的短链接平台被用于传播违法违规内容。” “请立即整改,否则可能暂停服务。”

我脑子一片空白。

打开后台查看,果然有几个短链接被大量举报:

举报记录
────────────────────────────────────────
链接:k.yz/a8x3k  举报数:127  原因:色情内容
链接:k.yz/m9p2q  举报数:89   原因:赌博网站
链接:k.yz/r4t7w  举报数:64   原因:诈骗信息
────────────────────────────────────────

我点开其中一个,跳转到充斥着违规内容的页面。

恶意 URL 识别只解决了”链接去哪”的问题。但链接指向的内容,可能是违规的。

后果不堪设想:

  • ❌ 监管处罚,甚至关停
  • ❌ 用户流失,声誉受损
  • ❌ 法律诉讼风险

我必须立刻行动。


方案调研

当晚,我开始调研内容审核方案。

方案一:自建审核系统

class ContentAuditor:
    """自建内容审核"""
    
    def __init__(self):
        self.sensitive_words = self.load_sensitive_words()
        self.blacklisted_domains = self.load_blacklisted_domains()
    
    def audit_url(self, url):
        from urllib.parse import urlparse
        
        # 检查域名黑名单
        parsed = urlparse(url)
        if parsed.netloc in self.blacklisted_domains:
            return {'pass': False, 'reason': '域名在黑名单', 'risk_level': 'high'}
        
        # 检查敏感词
        for word in self.sensitive_words:
            if word in url.lower():
                return {'pass': False, 'reason': f'包含敏感词:{word}', 'risk_level': 'medium'}
        
        # 检查可疑模式
        if any(kw in url for kw in ['赌博', '色情', '诈骗', '钓鱼']):
            return {'pass': False, 'reason': 'URL 特征可疑', 'risk_level': 'medium'}
        
        return {'pass': True, 'risk_level': 'low'}

优点:可控、成本低、毫秒级响应

缺点:准确率仅 70%,无法识别图片视频,易被谐音绕过

测试发现误判率高达 20%。自建方案,pass。


方案二:第三方 API

国内主要有阿里云和腾讯云内容安全。

class AliyunContentAuditor:
    """阿里云内容安全审核"""
    
    def __init__(self, access_key_id, access_key_secret):
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret
        self.endpoint = 'https://green-cip.cn-shanghai.aliyuncs.com'
    
    def audit_url(self, url):
        # 1. 获取 URL 内容
        content = self.fetch_url_content(url)
        if not content:
            return {'pass': False, 'reason': '无法获取内容', 'risk_level': 'high'}
        
        # 2. 调用 API
        result = self.call_aliyun_api(content)
        return {
            'pass': result['pass'],
            'reason': result.get('reason', ''),
            'risk_level': result.get('risk_level', 'low')
        }
    
    def fetch_url_content(self, url, timeout=5):
        import requests
        try:
            resp = requests.get(url, timeout=timeout)
            return resp.text[:10240] if resp.status_code == 200 else None
        except:
            return None
    
    def call_aliyun_api(self, content):
        import json, hashlib, hmac, base64, time
        from datetime import datetime
        
        body = json.dumps({
            'Tasks': [{'DataId': f'audit_{int(time.time())}', 'Content': base64.b64encode(content.encode()).decode(), 'Type': 'text'}],
            'Scenes': ['porn', 'terrorism', 'politics', 'ad']
        })
        
        timestamp = datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
        sig = hmac.new(self.access_key_secret.encode(), f'POST\n\napplication/json\n{timestamp}\n/green/text/scan'.encode(), hashlib.sha1).digest()
        auth = f'DATA {self.access_key_id}:{base64.b64encode(sig).decode()}'
        
        import requests
        resp = requests.post(self.endpoint + '/green/text/scan', 
            headers={'Content-Type': 'application/json', 'Date': timestamp, 'Authorization': auth},
            data=body, timeout=10)
        
        result = resp.json()
        if result.get('Code') == 200:
            for scene in result['Data']['Results'][0]['SceneResults']:
                if scene['Suggestion'] == 'block':
                    return {'pass': False, 'reason': f'违规:{scene["Scene"]}', 'risk_level': 'high'}
            return {'pass': True, 'risk_level': 'low'}
        return {'pass': False, 'reason': 'API 失败', 'risk_level': 'high'}

优点:准确率 95%+,支持多种违规类型,能识别图片视频

缺点:成本高,延迟 300-500ms,依赖外部服务

测试对比:

方案响应时间准确率成本误判率
自建10ms70%¥020%
阿里云500ms95%¥5000/月5%
腾讯云450ms94%¥4500/月6%

但审核服务挂了怎么办?


方案三:混合方案(最终选择)⭐

本地规则 + 第三方 API + 人工审核

class HybridContentAuditor:
    """混合内容审核"""
    
    def __init__(self):
        self.local = ContentAuditor()
        self.aliyun = AliyunContentAuditor(os.getenv('ALIYUN_KEY'), os.getenv('ALIYUN_SECRET'))
        self.cache = redis.Redis(host='localhost', port=6379, db=2)
    
    def audit_url(self, url):
        # 检查缓存
        cached = self.cache.get(f'audit:{hashlib.md5(url.encode()).hexdigest()}')
        if cached:
            return json.loads(cached)
        
        # 本地规则快速拦截
        local_result = self.local.audit_url(url)
        if not local_result['pass'] and local_result['risk_level'] == 'high':
            self._cache(url, local_result, 86400)
            return local_result
        
        # 第三方深度审核
        try:
            aliyun_result = self.aliyun.audit_url(url)
        except Exception as e:
            aliyun_result = {'pass': True, 'risk_level': 'pending_review', 'reason': str(e)}
        
        # 合并结果
        final = local_result if not local_result['pass'] else (aliyun_result if not aliyun_result['pass'] else {'pass': True, 'risk_level': 'low'})
        
        self._cache(url, final, 3600 if final['pass'] else 86400)
        return final
    
    def _cache(self, url, result, ttl):
        self.cache.setex(f'audit:{hashlib.md5(url.encode()).hexdigest()}', ttl, json.dumps(result))

优势

  • 70% 请求本地完成,节省 70% 成本
  • 毫秒级响应 + 缓存
  • 服务不可用时降级
  • 误判率<4%

异步审核实现

同步审核体验差,我采用分级异步审核

  • 高风险:同步拦截
  • 中低风险:先允许,异步审核
class AsyncContentAuditor:
    def __init__(self):
        self.auditor = HybridContentAuditor()
        self.queue = RabbitMQ()
        self.db = get_db()
    
    def audit_url_async(self, url, short_code=None):
        # 快速审核
        result = self.auditor.local.audit_url(url)
        
        # 高风险直接拒绝
        if not result['pass'] and result['risk_level'] == 'high':
            return {'status': 'rejected', 'reason': result['reason']}
        
        # 异步深度审核
        audit_id = str(uuid.uuid4())
        self.db.query("INSERT INTO audit_tasks (audit_id, url, short_code, status) VALUES (%s, %s, %s, 'pending')",
            (audit_id, url, short_code))
        self.queue.publish('content_audit', {'audit_id': audit_id, 'url': url, 'short_code': short_code})
        
        return {'status': 'pending', 'audit_id': audit_id}
    
    def process_task(self, task):
        result = self.auditor.aliyun.audit_url(task['url'])
        
        self.db.query("UPDATE audit_tasks SET status=%s, result=%s WHERE audit_id=%s",
            ('completed' if result['pass'] else 'failed', json.dumps(result), task['audit_id']))
        
        if not result['pass'] and task.get('short_code'):
            self.db.query("UPDATE short_links SET status='blocked', block_reason=%s WHERE short_code=%s",
                (result['reason'], task['short_code']))


# 消费者
def start_worker():
    auditor = AsyncContentAuditor()
    def callback(ch, method, properties, body):
        auditor.process_task(json.loads(body))
        ch.basic_ack(method.delivery_tag)
    auditor.queue.consume('content_audit', callback)


# API
@app.route('/api/shorten', methods=['POST'])
def create_short_link():
    url = request.get_json().get('url')
    result = AsyncContentAuditor().audit_url_async(url)
    
    if result['status'] == 'rejected':
        return {'error': '未通过审核', 'reason': result['reason']}, 403
    
    short_code = generate_short_code(url)
    db.query("INSERT INTO short_links (short_code, original_url) VALUES (%s, %s)", (short_code, url))
    return {'short_code': short_code, 'audit_status': result['status']}

人工审核队列

边界情况需要人工判断:

class ManualReviewQueue:
    def __init__(self):
        self.db = get_db()
    
    def add(self, url, short_code, reason):
        review_id = str(uuid.uuid4())
        self.db.query("INSERT INTO manual_reviews (review_id, url, short_code, reason) VALUES (%s, %s, %s, %s)",
            (review_id, url, short_code, reason))
        return review_id
    
    def get_pending(self, limit=20):
        return self.db.query("SELECT * FROM manual_reviews WHERE status='pending' ORDER BY created_at LIMIT %s", (limit,))
    
    def submit(self, review_id, passed, reviewer_id, comment=None):
        review = self.db.query_one("SELECT * FROM manual_reviews WHERE review_id=%s", (review_id,))
        self.db.query("UPDATE manual_reviews SET status='completed', passed=%s, reviewer_id=%s WHERE review_id=%s",
            (passed, reviewer_id, review_id))
        
        if passed:
            self.db.query("UPDATE short_links SET status='active' WHERE short_code=%s", (review['short_code'],))
        else:
            self.db.query("UPDATE short_links SET status='blocked' WHERE short_code=%s", (review['short_code'],))

审核标准:

类型处理
色情/赌博/诈骗立即封禁
政治敏感封禁 + 上报
广告营销首次警告

合规建设

律师朋友给了几个建议:

1. 用户协议

### 禁止内容
不得创建指向色情、暴力、赌博、诈骗等内容的短链接。

### 违规处理
平台有权下架违规链接、封禁账号、上报相关部门。

2. 举报机制

@app.route('/api/report', methods=['POST'])
def report():
    data = request.get_json()
    db.query("INSERT INTO reports (short_code, reason) VALUES (%s, %s)",
        (data['short_code'], data['reason']))
    
    # 24 小时 5 次举报自动下架
    count = db.query_one("SELECT COUNT(*) as c FROM reports WHERE short_code=%s AND created_at>DATE_SUB(NOW(), INTERVAL 24 HOUR)",
        (data['short_code'],))['c']
    if count >= 5:
        db.query("UPDATE short_links SET status='under_review' WHERE short_code=%s", (data['short_code'],))
    return {'success': True}

3. 数据留存

日志至少留存 6 个月,建立 7x24 小时应急响应。


效果

首月数据:

总链接:128,456
自动拦截:3,241 (2.5%)
人工审核:487 (0.38%)
准确率:96.2%
成本:¥7,347/月

再也没接到监管电话了。


新问题

我以为可以放心了,但监控又告警:

创建频率
──────────────────────────────
正常用户:5 次/小时
异常账户 A:1,247 次/小时 ❗
异常账户 B:892 次/小时 ❗
──────────────────────────────

有人用脚本批量创建短链接,消耗资源。

大量 API 调用让成本飙升,攻击者还可能用合法请求掩盖违规内容。

这是资源滥用攻击。

我需要防滥用机制。

但那是下一个故事了。


小结

方案演进

自建规则 → 准确率低
第三方 API → 成本高
混合方案 → 平衡

要点

✅ 多层防护:本地 + 第三方 + 人工

✅ 异步处理:高风险同步,中低风险异步

✅ 降级策略:服务不可用时保守处理

✅ 合规:用户协议、举报、数据留存

✅ 成本:节省 70% API 费用

待解决

⚠️ 资源滥用 ⚠️ 审核绕过 ⚠️ 成本压力

(下一节:防滥用机制)