点击追踪
第 10 封邮件 📧
周五下午,我收到本周第 10 封邮件,主题还是一样:
“我的链接到底被点了多少次?哪个渠道效果最好?我的推广预算花得值不值?”
这些问题我已经听了太多次。我看着自己的代码——创建短链接时只是存了个映射关系,重定向就是 302 跳转。完全没有记录任何访问信息!
# 当前实现:跳转就完了,什么都没记下来 😅
@app.route('/<short_code>')
def redirect(short_code):
long_url = get_long_url(short_code)
return redirect(long_url, code=302) # 跳走就不管了我知道,点击统计不能再拖了。营销用户需要知道他们的推广效果,我需要记录每次点击的详细信息。但我心里也很清楚——不能影响跳转速度。
要追踪什么?
我打开笔记本,开始列出需要追踪的数据:
基础信息
- 时间戳——什么时候点的
- 短链接——点了哪个链接
- 目标 URL——最终跳到哪里
用户信息
- IP 地址——知道大概位置
- User-Agent——解析出设备、浏览器、操作系统
- Referer——从哪里来的(搜索、社交媒体、直接访问)
- Accept-Language——用户语言偏好
渠道信息
- UTM 参数——营销人员最关心的来源标记
utm_source:来源(微信、微博、抖音)utm_medium:媒介(链接、二维码、Banner)utm_campaign:活动名称
地理位置
- 国家——IP 推算
- 城市——IP 推算
我写了数据收集函数:
def collect_click_data(request, short_code):
"""收集点击数据"""
click_data = {
# 基础信息
'short_code': short_code,
'timestamp': datetime.now(),
# 用户信息
'ip': request.remote_addr,
'user_agent': request.headers.get('User-Agent'),
'referer': request.headers.get('Referer', ''),
'accept_language': request.headers.get('Accept-Language', ''),
# 地理位置(根据 IP 推算)
'country': geoip.country(request.remote_addr),
'city': geoip.city(request.remote_addr),
# 设备信息(解析 User-Agent)
'device': parse_device(request.headers.get('User-Agent')),
'browser': parse_browser(request.headers.get('User-Agent')),
'os': parse_os(request.headers.get('User-Agent')),
# 渠道信息(通过 UTM 参数)
'utm_source': request.args.get('utm_source', ''),
'utm_medium': request.args.get('utm_medium', ''),
'utm_campaign': request.args.get('utm_campaign', ''),
}
return click_data解析 User-Agent
User-Agent 是个复杂的字符串,我写了几个解析函数:
import re
def parse_device(user_agent):
"""解析设备类型"""
if not user_agent:
return 'unknown'
ua = user_agent.lower()
if re.search(r'ipad|tablet', ua):
return 'tablet'
elif re.search(r'mobile|android|iphone', ua):
return 'mobile'
else:
return 'desktop'
def parse_browser(user_agent):
"""解析浏览器"""
if not user_agent:
return 'unknown'
browsers = [
(r'edg/', 'Edge'),
(r'chrome/', 'Chrome'),
(r'firefox/', 'Firefox'),
(r'safari/', 'Safari'),
(r'msie|trident/', 'IE'),
]
for pattern, name in browsers:
if re.search(pattern, user_agent.lower()):
return name
return 'other'
def parse_os(user_agent):
"""解析操作系统"""
if not user_agent:
return 'unknown'
ua = user_agent.lower()
os_patterns = [
(r'windows', 'Windows'),
(r'mac os', 'macOS'),
(r'linux', 'Linux'),
(r'android', 'Android'),
(r'iphone|ipad', 'iOS'),
]
for pattern, name in os_patterns:
if re.search(pattern, ua):
return name
return 'other'第一次尝试:同步写入 🐌
数据收集好了,接下来就是存储。我最开始的想法很简单——直接写数据库:
CREATE TABLE click_logs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
short_code VARCHAR(10) NOT NULL,
long_url VARCHAR(2048),
ip VARCHAR(45),
country VARCHAR(64),
city VARCHAR(64),
device VARCHAR(32),
browser VARCHAR(32),
os VARCHAR(32),
referer VARCHAR(2048),
user_agent VARCHAR(512),
utm_source VARCHAR(128),
utm_medium VARCHAR(128),
utm_campaign VARCHAR(128),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_short_code (short_code),
INDEX idx_created_at (created_at),
INDEX idx_country (country)
);def track_click(click_data):
"""直接写入数据库"""
db.execute("""
INSERT INTO click_logs (short_code, ip, country, city, device,
browser, os, referer, user_agent,
utm_source, utm_medium, utm_campaign)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", (
click_data['short_code'],
click_data['ip'],
click_data['country'],
click_data['city'],
click_data['device'],
click_data['browser'],
click_data['os'],
click_data['referer'],
click_data['user_agent'],
click_data.get('utm_source', ''),
click_data.get('utm_medium', ''),
click_data.get('utm_campaign', ''),
))我部署上线,开始监控性能。结果让我倒吸一口凉气:
之前:平均响应时间 20ms
现在:平均响应时间 100ms
跳转速度涨了 5 倍! 😱我一看数据库监控,每次跳转都要执行一次 INSERT,数据库 QPS 瞬间飙到 1000+。这不行——用户体验会严重下降。
同步写入是死路。我需要异步。
第二次尝试:异步队列 ⚡
我的新方案:缓冲区 + 批量写入 + 后台线程。
核心思路:
- 收集到点击数据后,不直接写数据库
- 放入内存缓冲区
- 后台线程定期批量写入数据库
- 跳转响应不等待写入完成
import queue
import threading
import time
class AsyncClickTracker:
"""异步点击追踪器"""
def __init__(self, db_connection, batch_size=100, flush_interval=5):
self.db = db_connection
self.batch_size = batch_size
self.flush_interval = flush_interval
self.buffer = []
self.lock = threading.Lock()
# 启动后台写入线程
self.running = True
self.writer_thread = threading.Thread(target=self._writer, daemon=True)
self.writer_thread.start()
def track(self, click_data):
"""异步记录点击——放入缓冲区"""
with self.lock:
self.buffer.append(click_data)
# 缓冲区满了,立即刷写
if len(self.buffer) >= self.batch_size:
self._flush()
def _writer(self):
"""后台线程:定期刷写"""
while self.running:
time.sleep(self.flush_interval)
with self.lock:
if self.buffer:
self._flush()
def _flush(self):
"""批量写入数据库"""
if not self.buffer:
return
batch = self.buffer[:self.batch_size]
self.buffer = self.buffer[self.batch_size:]
# 批量 INSERT
values = []
for click in batch:
values.append((
click['short_code'],
click['ip'],
click['country'],
click['city'],
click['device'],
click['browser'],
click['os'],
click['referer'],
click['user_agent'],
click.get('utm_source', ''),
click.get('utm_medium', ''),
click.get('utm_campaign', ''),
))
self.db.batch_insert("""
INSERT INTO click_logs (short_code, ip, country, city, device,
browser, os, referer, user_agent,
utm_source, utm_medium, utm_campaign)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", values)
print(f"📝 写入 {len(values)} 条点击记录")
def shutdown(self):
"""优雅关闭"""
self.running = False
with self.lock:
self._flush() # 把剩余的写完
# 使用示例
tracker = AsyncClickTracker(db, batch_size=200, flush_interval=3)
@app.route('/<short_code>')
def redirect(short_code):
long_url = get_long_url(short_code)
if long_url:
# 异步追踪,不阻塞响应
tracker.track(collect_click_data(request, short_code))
return redirect(long_url, code=302)
return not_found()再次部署上线,监控结果:
平均响应时间:22ms ✅
数据库 QPS:从 1000+ 降到 50(批量写入)完美!跳转速度恢复正常,数据库压力也大幅降低。
第三次尝试:Kafka 高吞吐 🚀
随着用户量增长,日点击量从 10 万涨到 100 万,再到 1000 万。内存缓冲区方案开始吃力——缓冲区太大占用内存,进程重启会丢失数据。
我引入了 Kafka:
用户点击
↓
[短链接服务] → [Kafka] → [消费者] → [数据库/ClickHouse]
↓
消息队列缓冲
峰值可抗 100K+/秒from kafka import KafkaProducer
import json
class KafkaClickTracker:
"""基于 Kafka 的点击追踪"""
def __init__(self, bootstrap_servers, topic='click_logs'):
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
# 异步发送,不等待确认
acks=0,
# 批量发送
batch_size=16384,
linger_ms=10,
)
self.topic = topic
def track(self, click_data):
"""发送点击事件到 Kafka"""
self.producer.send(self.topic, click_data)
def close(self):
self.producer.flush()
self.producer.close()
# 消费者端:从 Kafka 读取并写入数据库
from kafka import KafkaConsumer
class ClickLogConsumer:
def __init__(self, bootstrap_servers, db):
self.consumer = KafkaConsumer(
'click_logs',
bootstrap_servers=bootstrap_servers,
group_id='click-writer',
auto_offset_reset='latest',
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
self.db = db
self.batch = []
self.batch_size = 500
def consume(self):
for message in self.consumer:
self.batch.append(message.value)
if len(self.batch) >= self.batch_size:
self._write_batch()
def _write_batch(self):
"""批量写入"""
self.db.batch_insert("click_logs", self.batch)
self.batch = []三种方案对比
| 方案 | 响应延迟 | 吞吐量 | 可靠性 | 复杂度 |
|---|---|---|---|---|
| 同步写 DB | 10-50ms | 1K/s | 高 | ⭐ |
| 异步批量写 | <1ms | 10K/s | 中 | ⭐⭐ |
| Kafka | <1ms | 100K+/s | 高 | ⭐⭐⭐ |
建议:日点击 < 10 万用异步批量写,> 100 万上 Kafka。
数据模型优化 📊
我重新审视了数据表设计,做了些优化:
数据表设计
CREATE TABLE click_logs (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
short_code VARCHAR(10) NOT NULL,
ip VARCHAR(15),
country_code CHAR(2),
city_id INT,
device ENUM('desktop', 'mobile', 'tablet', 'unknown'),
browser VARCHAR(32),
os VARCHAR(32),
referer VARCHAR(512),
utm_source VARCHAR(64),
utm_medium VARCHAR(64),
utm_campaign VARCHAR(64),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_short_code_time (short_code, created_at),
INDEX idx_country_time (country_code, created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;数据量估算
每条记录约 200 字节
日访问量 50 万 = 每天 100MB = 每月 3GB
日访问量 1000 万 = 每天 2GB = 每月 60GB查询统计
数据存好了,用户要能查。我写了几个常用查询:
-- 总点击数
SELECT COUNT(*) as total_clicks
FROM click_logs
WHERE short_code = 'abc123';
-- 按日期统计点击量
SELECT DATE(created_at) as date, COUNT(*) as clicks
FROM click_logs
WHERE short_code = 'abc123'
GROUP BY DATE(created_at)
ORDER BY date;
-- 按设备统计
SELECT device, COUNT(*) as clicks
FROM click_logs
WHERE short_code = 'abc123'
GROUP BY device;
-- 按国家统计
SELECT country_code, COUNT(*) as clicks
FROM click_logs
WHERE short_code = 'abc123'
GROUP BY country_code
ORDER BY clicks DESC
LIMIT 10;
-- 按来源统计(哪个渠道带来的流量)
SELECT
COALESCE(utm_source, referer, 'direct') as source,
COUNT(*) as clicks
FROM click_logs
WHERE short_code = 'abc123'
GROUP BY source
ORDER BY clicks DESC;统计 API
@app.route('/api/stats/<short_code>')
def get_stats(short_code):
"""获取短链接统计信息"""
# 基础统计
total = db.query("""
SELECT COUNT(*) as total
FROM click_logs WHERE short_code = ?
""", short_code)
# 最近 7 天趋势
daily = db.query_all("""
SELECT DATE(created_at) as date, COUNT(*) as clicks
FROM click_logs
WHERE short_code = ? AND created_at > DATE('now', '-7 days')
GROUP BY DATE(created_at)
ORDER BY date
""", short_code)
# 设备分布
devices = db.query_all("""
SELECT device, COUNT(*) as count
FROM click_logs WHERE short_code = ?
GROUP BY device
""", short_code)
# 地域分布
countries = db.query_all("""
SELECT country_code, COUNT(*) as count
FROM click_logs WHERE short_code = ?
GROUP BY country_code
ORDER BY count DESC LIMIT 10
""", short_code)
return jsonify({
'short_code': short_code,
'total_clicks': total,
'daily_trend': daily,
'device_distribution': devices,
'country_distribution': countries,
})隐私考量 ⚖️
数据收集到什么程度?这是个问题。
我思考了很久,决定这条线:
我会收集
- ✅ IP(脱敏到前 3 段)
- ✅ 城市(不收集精确位置)
- ✅ 设备、浏览器、操作系统
- ✅ 来源渠道(UTM 参数)
我不会收集
- ❌ 完整 IP
- ❌ GPS 位置
- ❌ 个人身份信息
- ❌ Cookie 跟踪
IP 脱敏实现
class PrivacyCompliantTracker:
"""隐私合规的追踪器"""
def track(self, click_data):
# IP 脱敏(只保留前三位)
ip = click_data.get('ip', '')
click_data['ip'] = self._mask_ip(ip)
# 不存储精确位置和 User-Agent 原始值
click_data.pop('latitude', None)
click_data.pop('longitude', None)
click_data['user_agent'] = ''
return click_data
def _mask_ip(self, ip):
"""只保留前三位"""
parts = ip.split('.')
if len(parts) == 4:
return f"{parts[0]}.{parts[1]}.{parts[2]}.0"
return "masked"这样既能提供有价值的统计数据,又保护了用户隐私。这是我的底线。
想一想
点击追踪会影响重定向的速度吗?
会,如果同步写入的话。我改成异步队列后,响应时间从 100ms 降回 20ms。如果追踪系统暂时不可用(如 Kafka 挂了),应该怎么处理?
我的选择是放行——宁可丢数据,也不能影响用户体验。可以在本地文件缓存,等恢复后再补传。如何防止伪造点击数据(刷量)?
这是个难题。我做了几个防护:
- 限制同一 IP 的短时间点击频率
- 检测异常 User-Agent
- 识别代理 IP
- 统计异常模式(比如连续点击同一链接)