导航菜单

敏感内容识别

审核不只是”鉴黄”

鉴黄检测上线后,我以为审核问题解决了。直到有一天,一个用户上传了一张看似正常的风景照——画面上是一条安静的街道,但街道旁的广告牌上写着一句违禁标语。

AI 鉴黄模型判定:安全。

因为鉴黄模型只看图像的视觉内容,完全看不到图片里的文字

我又翻了翻后台数据,发现了更多被鉴黄模型放过的”漏网之鱼”:

已通过鉴黄但可能违规的图片:
1. 风景照 + 广告牌上的违禁标语     → 文字违规
2. 暴力场景截图(电影/新闻截图)    → 暴力血腥
3. 政治人物合影                    → 政治敏感
4. 假冒伪劣商品照片                → 广告违规
5. 含有二维码的图片                → 引流/诈骗

鉴黄只解决了色情这一个维度。一个完整的内容审核系统,需要多维度、多模态的检测能力。

维度一:OCR 文字识别

图片中的文字是审核的重要维度。很多违规内容不以视觉形式呈现,而是以文字形式”藏”在图片里。

OCR 方案对比

# 方案一:开源 Tesseract
import pytesseract
from PIL import Image

def ocr_tesseract(image_path: str) -> str:
    """使用 Tesseract 做文字识别"""
    img = Image.open(image_path)
    
    # 优化识别准确率:预处理
    # 1. 转灰度
    img = img.convert('L')
    # 2. 二值化
    threshold = 128
    img = img.point(lambda x: 255 if x > threshold else 0)
    
    text = pytesseract.image_to_string(img, lang='chi_sim+eng')
    return text.strip()


# 方案二:云 OCR API
class AliyunOCR:
    """阿里云 OCR"""
    
    def __init__(self, access_key, secret_key):
        self.access_key = access_key
        self.secret_key = secret_key
    
    def recognize(self, image_url: str) -> dict:
        """调用阿里云通用 OCR"""
        body = {
            'tasks': [{
                'dataId': str(uuid.uuid4()),
                'url': image_url,
            }],
            'scenes': ['antispam'],  # 文字反垃圾
        }
        
        result = self._call_api('/green/image/scan', body)
        
        return {
            'text': result.get('results', [{}])[0].get('ocr', ''),
            'words': result.get('results', [{}])[0].get('ocrDetail', []),
            'risk_words': result.get('results', [{}])[0].get('hit', []),
        }

对比测试:

OCR 方案对比(100 张含文字的图片)

方案              识别准确率   速度      成本
──────────────────────────────────────────────
Tesseract         72%        800ms     免费
阿里云 OCR         96%        200ms     0.8元/千次
腾讯云 OCR         95%        250ms     0.7元/千次

结论:Tesseract 对中文的识别准确率太低(72%),特别是手写体、艺术字体、小字。云 API 是更好的选择。

文字内容审核

识别出文字后,还需要判断文字内容是否违规:

import re

class TextContentAuditor:
    """文字内容审核"""
    
    def __init__(self):
        # 敏感词库(从云服务同步)
        self.sensitive_words = self._load_sensitive_words()
        self.sensitive_patterns = self._compile_patterns()
    
    def audit_text(self, text: str) -> dict:
        """审核文字内容"""
        risks = []
        
        # 1. 敏感词匹配
        for word in self.sensitive_words:
            if word in text:
                risks.append({
                    'type': 'sensitive_word',
                    'word': word,
                    'severity': 'high',
                })
        
        # 2. 正则匹配(手机号、微信号、URL)
        patterns = {
            'phone': r'1[3-9]\d{9}',
            'wechat': r'[Ww][Cc][Hh]?[Aa][Tt]?\s*[::]\s*\w{5,20}',
            'url': r'https?://\S+',
            'qq': r'[Qq][Qq]\s*[::]?\s*\d{5,12}',
        }
        
        for ptype, pattern in patterns.items():
            matches = re.findall(pattern, text)
            if matches:
                risks.append({
                    'type': ptype,
                    'matches': matches,
                    'severity': 'medium',
                })
        
        # 3. 调用 NLP 模型做语义理解
        nlp_result = self._nlp_audit(text)
        if nlp_result['risk']:
            risks.append({
                'type': 'semantic',
                'label': nlp_result['label'],
                'severity': nlp_result['severity'],
            })
        
        overall_severity = max(
            (r['severity'] for r in risks),
            key=lambda s: {'high': 3, 'medium': 2, 'low': 1}.get(s, 0),
            default='none'
        )
        
        return {
            'text': text,
            'has_risk': len(risks) > 0,
            'risks': risks,
            'action': 'reject' if overall_severity == 'high' else
                      'review' if overall_severity == 'medium' else 'pass',
        }
    
    def _nlp_audit(self, text):
        """调用 NLP 服务做语义审核"""
        # 使用云服务的文本审核 API
        result = text_audit_api.scan(text)
        return result
    
    def _load_sensitive_words(self):
        """加载敏感词库"""
        # 从数据库或配置文件加载
        # 注意:敏感词库需要定期更新
        words = set()
        for source in ['default', 'politics', 'pornography', 'violence']:
            batch = redis_client.smembers(f'sensitive_words:{source}')
            words.update(w.decode() for w in batch)
        return words
    
    def _compile_patterns(self):
        """编译正则表达式模式"""
        return {}  # 在 __init__ 中已硬编码

维度二:暴力血腥检测

class ViolenceDetector:
    """暴力血腥内容检测"""
    
    def __init__(self):
        self.api_client = AliyunGreenClient()
    
    def detect(self, image_url: str) -> dict:
        """检测暴力血腥内容"""
        result = self.api_client.scan_image(
            url=image_url,
            scenes=['terrorism', 'violence'],  # 暴恐检测
        )
        
        return {
            'terrorism': self._parse_scene(result, 'terrorism'),
            'violence': self._parse_scene(result, 'violence'),
        }
    
    def _parse_scene(self, result, scene):
        for r in result.get('results', []):
            if r.get('scene') == scene:
                return {
                    'label': r.get('label'),        # normal/violence/terror
                    'confidence': r.get('rate', 0),
                    'suggestion': r.get('suggestion'),  # pass/review/block
                }
        return {'label': 'unknown', 'confidence': 0, 'suggestion': 'pass'}


# 测试
detector = ViolenceDetector()

test_cases = [
    ('sports_match.jpg', '正常体育比赛'),
    ('horror_movie.jpg', '恐怖电影截图'),
    ('accident_news.jpg', '新闻事故现场'),
    ('cooking_raw_meat.jpg', '烹饪生肉'),
    ('surgery_photo.jpg', '手术照片(医学)'),
]

for url, desc in test_cases:
    result = detector.detect(url)
    print(f"{desc}: {result}")

暴力检测的难点:

容易误判的场景:
- 烹饪节目(生肉、鲜血)      → 误判为暴力
- 医学手术照片               → 误判为血腥
- 体育比赛(拳击、摔跤)     → 误判为暴力
- 万圣节装扮                 → 误判为恐怖
- 新闻报道中的事故现场       → 误判为血腥

这些场景的处理策略:
- 医学/新闻类:加标签但不拦截
- 体育比赛:加"体育"标签后降低权重
- 用户标注的"恐怖电影截图":允许但加内容警告

维度三:政治敏感检测

这是最敏感也最必须的维度。国内的平台必须具备政治敏感内容的识别能力。

class PoliticalContentDetector:
    """政治敏感内容检测"""
    
    def detect(self, image_url: str) -> dict:
        """检测政治敏感内容"""
        result = self.api_client.scan_image(
            url=image_url,
            scenes=['ad'],  # 某些政治内容会以广告形式出现
        )
        
        # 同时做 OCR + 文字审核
        ocr_result = ocr_service.recognize(image_url)
        text_audit = text_auditor.audit_text(ocr_result['text'])
        
        return {
            'image_risk': result,
            'text_risk': text_audit,
            'action': self._decide_action(result, text_audit),
        }
    
    def _decide_action(self, image_result, text_result):
        """综合决策"""
        # 政治敏感内容:宁可误报不可漏报
        if image_result.get('label') == 'politics':
            return 'reject'
        if text_result.get('has_risk') and text_result['action'] == 'reject':
            return 'reject'
        if text_result.get('action') == 'review':
            return 'review'
        return 'pass'

维度四:广告和二维码检测

摄影社区里经常有人上传带二维码的图片做引流:

from pyzbar import pyzbar
from PIL import Image

class QRCodeDetector:
    """二维码检测"""
    
    def detect(self, image_path: str) -> dict:
        """检测图片中的二维码"""
        img = Image.open(image_path)
        
        # 解码二维码
        barcodes = pyzbar.decode(img)
        
        qr_codes = []
        for barcode in barcodes:
            if barcode.type == 'QRCODE':
                qr_codes.append({
                    'data': barcode.data.decode('utf-8'),
                    'rect': barcode.rect,
                })
        
        has_qr = len(qr_codes) > 0
        
        # 检查二维码内容是否安全
        unsafe_qr = False
        for qr in qr_codes:
            url_audit = self._audit_qr_url(qr['data'])
            if url_audit['unsafe']:
                unsafe_qr = True
        
        return {
            'has_qr_code': has_qr,
            'count': len(qr_codes),
            'is_unsafe': unsafe_qr,
            'codes': qr_codes,
        }
    
    def _audit_qr_url(self, url: str) -> dict:
        """审核二维码中的 URL"""
        # 检查是否是已知的恶意域名
        # 检查是否是竞品引流
        # 检查是否是违禁品交易
        parsed = urlparse(url)
        
        blocked_domains = ['xxx.com', 'gambling.xxx', ...]
        if parsed.hostname in blocked_domains:
            return {'unsafe': True, 'reason': 'blocked_domain'}
        
        return {'unsafe': False}


class WatermarkDetector:
    """水印检测——检查是否是其他平台的图"""
    
    SUSPICIOUS_WATERMARKS = [
        '小红书', '图虫', '500px', 'Getty Images',
        'Shutterstock', '微博', '抖音',
    ]
    
    def detect(self, image_path: str) -> dict:
        """检测是否含有其他平台的水印"""
        # 先 OCR 识别文字
        text = ocr_service.recognize(image_path)
        
        found_watermarks = []
        for wm in self.SUSPICIOUS_WATERMARKS:
            if wm in text:
                found_watermarks.append(wm)
        
        return {
            'has_watermark': len(found_watermarks) > 0,
            'watermarks': found_watermarks,
            'action': 'review' if found_watermarks else 'pass',
        }

多维度审核整合

把所有审核维度整合到一个服务中:

from dataclasses import dataclass
from typing import List, Optional

@dataclass
class AuditResult:
    photo_id: str
    action: str  # pass / review / reject
    scores: dict
    risks: List[dict]
    review_reason: Optional[str] = None

class ComprehensiveAuditor:
    """多维度综合审核"""
    
    def __init__(self):
        self.nsfw = NSFWDetector()              # 鉴黄
        self.ocr = AliyunOCR()                   # OCR
        self.text_audit = TextContentAuditor()   # 文字审核
        self.violence = ViolenceDetector()        # 暴力检测
        self.qrcode = QRCodeDetector()            # 二维码
        self.watermark = WatermarkDetector()      # 水印
    
    def audit(self, photo_id: str, image_url: str, local_path: str) -> AuditResult:
        """综合审核:并行调用所有维度"""
        import concurrent.futures
        
        results = {}
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = {
                executor.submit(self.nsfw.detect, image_url): 'nsfw',
                executor.submit(self.ocr.recognize, image_url): 'ocr',
                executor.submit(self.violence.detect, image_url): 'violence',
                executor.submit(self.qrcode.detect, local_path): 'qrcode',
                executor.submit(self.watermark.detect, local_path): 'watermark',
            }
            
            for future in concurrent.futures.as_completed(futures):
                dim = futures[future]
                try:
                    results[dim] = future.result()
                except Exception as e:
                    results[dim] = {'error': str(e)}
        
        # 文字审核(依赖 OCR 结果)
        if 'ocr' in results and not results['ocr'].get('error'):
            text = results['ocr'].get('text', '')
            if text:
                results['text'] = self.text_audit.audit_text(text)
        
        # 综合决策
        return self._decide(photo_id, results)
    
    def _decide(self, photo_id: str, results: dict) -> AuditResult:
        """综合所有维度的结果做最终决策"""
        risks = []
        action = 'pass'
        review_reasons = []
        
        # 维度 1:鉴黄
        nsfw = results.get('nsfw', {})
        if nsfw.get('label') == 'porn' and nsfw.get('confidence', 0) > 90:
            action = 'reject'
            risks.append({'dim': 'nsfw', 'level': 'high', 'detail': nsfw})
        elif nsfw.get('label') in ('porn', 'sexy') and nsfw.get('confidence', 0) > 60:
            if action != 'reject':
                action = 'review'
            review_reasons.append('色情风险')
            risks.append({'dim': 'nsfw', 'level': 'medium', 'detail': nsfw})
        
        # 维度 2:暴力
        violence = results.get('violence', {})
        if violence.get('terrorism', {}).get('suggestion') == 'block':
            action = 'reject'
            risks.append({'dim': 'violence', 'level': 'high', 'detail': violence})
        elif violence.get('violence', {}).get('suggestion') == 'review':
            if action != 'reject':
                action = 'review'
            review_reasons.append('暴力风险')
            risks.append({'dim': 'violence', 'level': 'medium', 'detail': violence})
        
        # 维度 3:文字
        text = results.get('text', {})
        if text.get('action') == 'reject':
            action = 'reject'
            risks.append({'dim': 'text', 'level': 'high', 'detail': text})
        elif text.get('action') == 'review':
            if action != 'reject':
                action = 'review'
            review_reasons.append('文字风险')
            risks.append({'dim': 'text', 'level': 'medium', 'detail': text})
        
        # 维度 4:二维码
        qrcode = results.get('qrcode', {})
        if qrcode.get('is_unsafe'):
            action = 'reject'
            risks.append({'dim': 'qrcode', 'level': 'high', 'detail': qrcode})
        elif qrcode.get('has_qr_code'):
            if action == 'pass':
                action = 'review'
            review_reasons.append('含二维码')
            risks.append({'dim': 'qrcode', 'level': 'low', 'detail': qrcode})
        
        # 维度 5:水印
        watermark = results.get('watermark', {})
        if watermark.get('has_watermark'):
            if action == 'pass':
                action = 'review'
            review_reasons.append(f"含平台水印: {', '.join(watermark['watermarks'])}")
            risks.append({'dim': 'watermark', 'level': 'low', 'detail': watermark})
        
        return AuditResult(
            photo_id=photo_id,
            action=action,
            scores=results,
            risks=risks,
            review_reason='; '.join(review_reasons) if review_reasons else None,
        )


# 使用
auditor = ComprehensiveAuditor()

# 在 Worker 中调用
result = auditor.audit(
    photo_id='12345',
    image_url='https://cdn.guangying.com/originals/2024/06/a3/abc.jpg',
    local_path='/tmp/process_12345_original',
)

print(f"图片 12345: {result.action}")
# reject: 直接拦截
# review: 转人工审核
# pass: 放行

性能优化

多维度审核意味着多次 API 调用,如何控制延迟?

# 审核延迟分析
"""
单次审核的 API 调用:

1. 鉴黄:     200ms
2. OCR:      250ms
3. 暴力检测:  200ms
4. 二维码:   150ms(本地)
5. 水印:     100ms(本地)

串行总计:    900ms
并行总计:    250ms(取最长)

结论:必须并行调用,否则单张图片审核耗时接近 1 秒。
"""

进一步的优化——分级审核

class TieredAuditor:
    """分级审核——先快审,再精审"""
    
    def audit(self, photo_id, image_url, local_path):
        # 第一级:快速审核(只做鉴黄 + 二维码,耗时 < 200ms)
        quick_result = self._quick_audit(photo_id, image_url, local_path)
        
        if quick_result.action == 'reject':
            # 快审直接拒绝,不需要精审
            return quick_result
        
        if quick_result.action == 'pass':
            # 快审通过,但可能需要精审
            # 异步发起精审
            queue.publish('full_audit', {
                'photo_id': photo_id,
                'image_url': image_url,
                'local_path': local_path,
            })
            # 先放行(如果后续精审发现问题会回调)
            return quick_result
        
        # review 需要等精审结果
        return self._full_audit(photo_id, image_url, local_path)
    
    def _quick_audit(self, photo_id, image_url, local_path):
        """快审:只做鉴黄和二维码"""
        nsfw = self.nsfw.detect(image_url)
        qrcode = self.qrcode.detect(local_path)
        
        if nsfw.get('label') == 'porn' and nsfw.get('confidence', 0) > 90:
            return AuditResult(photo_id, 'reject', {'nsfw': nsfw}, [])
        if qrcode.get('is_unsafe'):
            return AuditResult(photo_id, 'reject', {'qrcode': qrcode}, [])
        
        return AuditResult(photo_id, 'pass', {}, [])

本节小结

我学到了什么

  • 内容审核不只是鉴黄——OCR 文字、暴力血腥、二维码、水印都需要检测
  • 多维度审核需要并行调用,否则延迟不可接受
  • 分级审核(快审 + 精审)可以在 200ms 内完成初步判断
  • 云 API 在 OCR 和暴力检测上远优于开源方案

⚠️ 踩过的坑

  • Tesseract 对中文的识别准确率只有 72%,手写体更差
  • 暴力检测容易误判医学照片和烹饪内容
  • OCR 审核需要先识别文字再审核文字,链路较长

🎯 下一步:审核能力都有了,但如何设计一套完整的审核流程?机审和人审怎么配合?

我的思考

思考 1

如果用户在图片中把敏感文字做了”变形处理”(比如”加 V:①③⑤②④⑥⑧⑨⑩”),OCR 和文字审核能识别吗?

参考答案

这种变形文字是内容审核的常见对抗手段。应对策略:

class AntiEvasionTextProcessor:
    """反变形文字处理"""
    
    # 常见变形映射
    CHAR_MAP = {
        # 全角数字 → 半角
        '①': '1', '②': '2', '③': '3', '④': '4', '⑤': '5',
        '⑥': '6', '⑦': '7', '⑧': '8', '⑨': '9', '⑩': '0',
        # 拼音替换
        '微': 'V', '薇': 'V', '威': 'V',
        # 同音字
        '加V': '加微', '加VX': '加微信',
        # 特殊字符拆字
        '徴信': '微信', '威芯': '微信',
        # 插入干扰符
        'V.X': 'VX', 'V-X': 'VX',
    }
    
    def normalize(self, text: str) -> str:
        """文字归一化——还原变形"""
        # 1. 全角转半角
        result = []
        for ch in text:
            if ch in self.CHAR_MAP:
                result.append(self.CHAR_MAP[ch])
            else:
                result.append(ch)
        normalized = ''.join(result)
        
        # 2. 去除干扰字符(空格、点、横线等)
        import re
        normalized = re.sub(r'[\s.\-·\-_]+', '', normalized)
        
        # 3. 对归一化后的文字再做审核
        return normalized
    
    def audit(self, raw_text: str) -> dict:
        normalized = self.normalize(raw_text)
        return text_auditor.audit_text(normalized)

更高级的对抗手段(如把文字做成图片拼接、用 emoji 替代关键字符等),需要结合上下文语义分析和用户行为分析来应对。

思考 2

如果一个认证摄影师上传了含二维码的图片(二维码指向他自己的摄影作品集),应该拦截吗?

参考答案

不应该一刀切拦截。需要根据上下文判断:

判断策略:
1. 二维码内容是什么?
   - 个人网站/作品集 → 允许
   - 竞品平台/违规网站 → 拦截
   - 未知链接 → 人工审核

2. 上传者身份
   - 认证摄影师 → 信任度更高
   - 新注册用户 → 更严格

3. 图片上下文
   - 个人主页头像/封面 → 允许
   - 作品详情页 → 允许(标注"含联系方式")
   - 评论/回复中的图片 → 更严格
def handle_qrcode_case(qr_result, uploader_info, context):
    if not qr_result['has_qr_code']:
        return 'pass'
    
    # 解析二维码内容
    for code in qr_result['codes']:
        url = code['data']
        
        if is_personal_portfolio(url) and uploader_info['is_verified']:
            return 'pass_with_tag'  # 允许,但标记
        elif is_known_safe_domain(url):
            return 'pass_with_tag'
        elif is_blocked_domain(url):
            return 'reject'
        else:
            return 'review'  # 未知链接,人工审核

原则:对可信用户 + 可信内容给予信任,对未知情况转人工审核。

搜索