点击追踪

第 10 封邮件 📧

周五下午,我收到本周第 10 封邮件,主题还是一样:

“我的链接到底被点了多少次?哪个渠道效果最好?我的推广预算花得值不值?”

这些问题我已经听了太多次。我看着自己的代码——创建短链接时只是存了个映射关系,重定向就是 302 跳转。完全没有记录任何访问信息!

# 当前实现:跳转就完了,什么都没记下来 😅
@app.route('/<short_code>')
def redirect(short_code):
    long_url = get_long_url(short_code)
    return redirect(long_url, code=302)  # 跳走就不管了

我知道,点击统计不能再拖了。营销用户需要知道他们的推广效果,我需要记录每次点击的详细信息。但我心里也很清楚——不能影响跳转速度

要追踪什么?

我打开笔记本,开始列出需要追踪的数据:

基础信息

  • 时间戳——什么时候点的
  • 短链接——点了哪个链接
  • 目标 URL——最终跳到哪里

用户信息

  • IP 地址——知道大概位置
  • User-Agent——解析出设备、浏览器、操作系统
  • Referer——从哪里来的(搜索、社交媒体、直接访问)
  • Accept-Language——用户语言偏好

渠道信息

  • UTM 参数——营销人员最关心的来源标记
    • utm_source:来源(微信、微博、抖音)
    • utm_medium:媒介(链接、二维码、Banner)
    • utm_campaign:活动名称

地理位置

  • 国家——IP 推算
  • 城市——IP 推算

我写了数据收集函数:

def collect_click_data(request, short_code):
    """收集点击数据"""

    click_data = {
        # 基础信息
        'short_code': short_code,
        'timestamp': datetime.now(),

        # 用户信息
        'ip': request.remote_addr,
        'user_agent': request.headers.get('User-Agent'),
        'referer': request.headers.get('Referer', ''),
        'accept_language': request.headers.get('Accept-Language', ''),

        # 地理位置(根据 IP 推算)
        'country': geoip.country(request.remote_addr),
        'city': geoip.city(request.remote_addr),

        # 设备信息(解析 User-Agent)
        'device': parse_device(request.headers.get('User-Agent')),
        'browser': parse_browser(request.headers.get('User-Agent')),
        'os': parse_os(request.headers.get('User-Agent')),

        # 渠道信息(通过 UTM 参数)
        'utm_source': request.args.get('utm_source', ''),
        'utm_medium': request.args.get('utm_medium', ''),
        'utm_campaign': request.args.get('utm_campaign', ''),
    }

    return click_data

解析 User-Agent

User-Agent 是个复杂的字符串,我写了几个解析函数:

import re

def parse_device(user_agent):
    """解析设备类型"""
    if not user_agent:
        return 'unknown'

    ua = user_agent.lower()

    if re.search(r'ipad|tablet', ua):
        return 'tablet'
    elif re.search(r'mobile|android|iphone', ua):
        return 'mobile'
    else:
        return 'desktop'


def parse_browser(user_agent):
    """解析浏览器"""
    if not user_agent:
        return 'unknown'

    browsers = [
        (r'edg/', 'Edge'),
        (r'chrome/', 'Chrome'),
        (r'firefox/', 'Firefox'),
        (r'safari/', 'Safari'),
        (r'msie|trident/', 'IE'),
    ]

    for pattern, name in browsers:
        if re.search(pattern, user_agent.lower()):
            return name

    return 'other'


def parse_os(user_agent):
    """解析操作系统"""
    if not user_agent:
        return 'unknown'

    ua = user_agent.lower()

    os_patterns = [
        (r'windows', 'Windows'),
        (r'mac os', 'macOS'),
        (r'linux', 'Linux'),
        (r'android', 'Android'),
        (r'iphone|ipad', 'iOS'),
    ]

    for pattern, name in os_patterns:
        if re.search(pattern, ua):
            return name

    return 'other'

第一次尝试:同步写入 🐌

数据收集好了,接下来就是存储。我最开始的想法很简单——直接写数据库:

CREATE TABLE click_logs (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    short_code VARCHAR(10) NOT NULL,
    long_url VARCHAR(2048),
    ip VARCHAR(45),
    country VARCHAR(64),
    city VARCHAR(64),
    device VARCHAR(32),
    browser VARCHAR(32),
    os VARCHAR(32),
    referer VARCHAR(2048),
    user_agent VARCHAR(512),
    utm_source VARCHAR(128),
    utm_medium VARCHAR(128),
    utm_campaign VARCHAR(128),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    INDEX idx_short_code (short_code),
    INDEX idx_created_at (created_at),
    INDEX idx_country (country)
);
def track_click(click_data):
    """直接写入数据库"""
    db.execute("""
        INSERT INTO click_logs (short_code, ip, country, city, device,
                                browser, os, referer, user_agent,
                                utm_source, utm_medium, utm_campaign)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        click_data['short_code'],
        click_data['ip'],
        click_data['country'],
        click_data['city'],
        click_data['device'],
        click_data['browser'],
        click_data['os'],
        click_data['referer'],
        click_data['user_agent'],
        click_data.get('utm_source', ''),
        click_data.get('utm_medium', ''),
        click_data.get('utm_campaign', ''),
    ))

我部署上线,开始监控性能。结果让我倒吸一口凉气:

之前:平均响应时间 20ms
现在:平均响应时间 100ms

跳转速度涨了 5 倍! 😱

我一看数据库监控,每次跳转都要执行一次 INSERT,数据库 QPS 瞬间飙到 1000+。这不行——用户体验会严重下降。

同步写入是死路。我需要异步。

第二次尝试:异步队列 ⚡

我的新方案:缓冲区 + 批量写入 + 后台线程

核心思路:

  1. 收集到点击数据后,不直接写数据库
  2. 放入内存缓冲区
  3. 后台线程定期批量写入数据库
  4. 跳转响应不等待写入完成
import queue
import threading
import time

class AsyncClickTracker:
    """异步点击追踪器"""

    def __init__(self, db_connection, batch_size=100, flush_interval=5):
        self.db = db_connection
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer = []
        self.lock = threading.Lock()

        # 启动后台写入线程
        self.running = True
        self.writer_thread = threading.Thread(target=self._writer, daemon=True)
        self.writer_thread.start()

    def track(self, click_data):
        """异步记录点击——放入缓冲区"""
        with self.lock:
            self.buffer.append(click_data)

            # 缓冲区满了,立即刷写
            if len(self.buffer) >= self.batch_size:
                self._flush()

    def _writer(self):
        """后台线程:定期刷写"""
        while self.running:
            time.sleep(self.flush_interval)
            with self.lock:
                if self.buffer:
                    self._flush()

    def _flush(self):
        """批量写入数据库"""
        if not self.buffer:
            return

        batch = self.buffer[:self.batch_size]
        self.buffer = self.buffer[self.batch_size:]

        # 批量 INSERT
        values = []
        for click in batch:
            values.append((
                click['short_code'],
                click['ip'],
                click['country'],
                click['city'],
                click['device'],
                click['browser'],
                click['os'],
                click['referer'],
                click['user_agent'],
                click.get('utm_source', ''),
                click.get('utm_medium', ''),
                click.get('utm_campaign', ''),
            ))

        self.db.batch_insert("""
            INSERT INTO click_logs (short_code, ip, country, city, device,
                                    browser, os, referer, user_agent,
                                    utm_source, utm_medium, utm_campaign)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, values)

        print(f"📝 写入 {len(values)} 条点击记录")

    def shutdown(self):
        """优雅关闭"""
        self.running = False
        with self.lock:
            self._flush()  # 把剩余的写完


# 使用示例
tracker = AsyncClickTracker(db, batch_size=200, flush_interval=3)

@app.route('/<short_code>')
def redirect(short_code):
    long_url = get_long_url(short_code)
    if long_url:
        # 异步追踪,不阻塞响应
        tracker.track(collect_click_data(request, short_code))
        return redirect(long_url, code=302)
    return not_found()

再次部署上线,监控结果:

平均响应时间:22ms ✅
数据库 QPS:从 1000+ 降到 50(批量写入)

完美!跳转速度恢复正常,数据库压力也大幅降低。

第三次尝试:Kafka 高吞吐 🚀

随着用户量增长,日点击量从 10 万涨到 100 万,再到 1000 万。内存缓冲区方案开始吃力——缓冲区太大占用内存,进程重启会丢失数据。

我引入了 Kafka:

用户点击

[短链接服务] → [Kafka] → [消费者] → [数据库/ClickHouse]

              消息队列缓冲
              峰值可抗 100K+/秒
from kafka import KafkaProducer
import json

class KafkaClickTracker:
    """基于 Kafka 的点击追踪"""

    def __init__(self, bootstrap_servers, topic='click_logs'):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            # 异步发送,不等待确认
            acks=0,
            # 批量发送
            batch_size=16384,
            linger_ms=10,
        )
        self.topic = topic

    def track(self, click_data):
        """发送点击事件到 Kafka"""
        self.producer.send(self.topic, click_data)

    def close(self):
        self.producer.flush()
        self.producer.close()


# 消费者端:从 Kafka 读取并写入数据库
from kafka import KafkaConsumer

class ClickLogConsumer:
    def __init__(self, bootstrap_servers, db):
        self.consumer = KafkaConsumer(
            'click_logs',
            bootstrap_servers=bootstrap_servers,
            group_id='click-writer',
            auto_offset_reset='latest',
            value_deserializer=lambda m: json.loads(m.decode('utf-8'))
        )
        self.db = db
        self.batch = []
        self.batch_size = 500

    def consume(self):
        for message in self.consumer:
            self.batch.append(message.value)

            if len(self.batch) >= self.batch_size:
                self._write_batch()

    def _write_batch(self):
        """批量写入"""
        self.db.batch_insert("click_logs", self.batch)
        self.batch = []

三种方案对比

方案响应延迟吞吐量可靠性复杂度
同步写 DB10-50ms1K/s
异步批量写&lt;1ms10K/s⭐⭐
Kafka&lt;1ms100K+/s⭐⭐⭐

建议:日点击 < 10 万用异步批量写,> 100 万上 Kafka。

数据模型优化 📊

我重新审视了数据表设计,做了些优化:

数据表设计

CREATE TABLE click_logs (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    short_code VARCHAR(10) NOT NULL,
    ip VARCHAR(15),
    country_code CHAR(2),
    city_id INT,
    device ENUM('desktop', 'mobile', 'tablet', 'unknown'),
    browser VARCHAR(32),
    os VARCHAR(32),
    referer VARCHAR(512),
    utm_source VARCHAR(64),
    utm_medium VARCHAR(64),
    utm_campaign VARCHAR(64),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    INDEX idx_short_code_time (short_code, created_at),
    INDEX idx_country_time (country_code, created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

数据量估算

每条记录约 200 字节

日访问量 50 万 = 每天 100MB = 每月 3GB
日访问量 1000 万 = 每天 2GB = 每月 60GB

查询统计

数据存好了,用户要能查。我写了几个常用查询:

-- 总点击数
SELECT COUNT(*) as total_clicks
FROM click_logs
WHERE short_code = 'abc123';

-- 按日期统计点击量
SELECT DATE(created_at) as date, COUNT(*) as clicks
FROM click_logs
WHERE short_code = 'abc123'
GROUP BY DATE(created_at)
ORDER BY date;

-- 按设备统计
SELECT device, COUNT(*) as clicks
FROM click_logs
WHERE short_code = 'abc123'
GROUP BY device;

-- 按国家统计
SELECT country_code, COUNT(*) as clicks
FROM click_logs
WHERE short_code = 'abc123'
GROUP BY country_code
ORDER BY clicks DESC
LIMIT 10;

-- 按来源统计(哪个渠道带来的流量)
SELECT
    COALESCE(utm_source, referer, 'direct') as source,
    COUNT(*) as clicks
FROM click_logs
WHERE short_code = 'abc123'
GROUP BY source
ORDER BY clicks DESC;

统计 API

@app.route('/api/stats/<short_code>')
def get_stats(short_code):
    """获取短链接统计信息"""

    # 基础统计
    total = db.query("""
        SELECT COUNT(*) as total
        FROM click_logs WHERE short_code = ?
    """, short_code)

    # 最近 7 天趋势
    daily = db.query_all("""
        SELECT DATE(created_at) as date, COUNT(*) as clicks
        FROM click_logs
        WHERE short_code = ? AND created_at > DATE('now', '-7 days')
        GROUP BY DATE(created_at)
        ORDER BY date
    """, short_code)

    # 设备分布
    devices = db.query_all("""
        SELECT device, COUNT(*) as count
        FROM click_logs WHERE short_code = ?
        GROUP BY device
    """, short_code)

    # 地域分布
    countries = db.query_all("""
        SELECT country_code, COUNT(*) as count
        FROM click_logs WHERE short_code = ?
        GROUP BY country_code
        ORDER BY count DESC LIMIT 10
    """, short_code)

    return jsonify({
        'short_code': short_code,
        'total_clicks': total,
        'daily_trend': daily,
        'device_distribution': devices,
        'country_distribution': countries,
    })

隐私考量 ⚖️

数据收集到什么程度?这是个问题。

我思考了很久,决定这条线:

我会收集

  • ✅ IP(脱敏到前 3 段)
  • ✅ 城市(不收集精确位置)
  • ✅ 设备、浏览器、操作系统
  • ✅ 来源渠道(UTM 参数)

我不会收集

  • ❌ 完整 IP
  • ❌ GPS 位置
  • ❌ 个人身份信息
  • ❌ Cookie 跟踪

IP 脱敏实现

class PrivacyCompliantTracker:
    """隐私合规的追踪器"""

    def track(self, click_data):
        # IP 脱敏(只保留前三位)
        ip = click_data.get('ip', '')
        click_data['ip'] = self._mask_ip(ip)
        
        # 不存储精确位置和 User-Agent 原始值
        click_data.pop('latitude', None)
        click_data.pop('longitude', None)
        click_data['user_agent'] = ''
        return click_data

    def _mask_ip(self, ip):
        """只保留前三位"""
        parts = ip.split('.')
        if len(parts) == 4:
            return f"{parts[0]}.{parts[1]}.{parts[2]}.0"
        return "masked"

这样既能提供有价值的统计数据,又保护了用户隐私。这是我的底线。

想一想

  1. 点击追踪会影响重定向的速度吗?
    会,如果同步写入的话。我改成异步队列后,响应时间从 100ms 降回 20ms。

  2. 如果追踪系统暂时不可用(如 Kafka 挂了),应该怎么处理?
    我的选择是放行——宁可丢数据,也不能影响用户体验。可以在本地文件缓存,等恢复后再补传。

  3. 如何防止伪造点击数据(刷量)?
    这是个难题。我做了几个防护:

    • 限制同一 IP 的短时间点击频率
    • 检测异常 User-Agent
    • 识别代理 IP
    • 统计异常模式(比如连续点击同一链接)