实时统计
问题:“你的统计是昨天的数据?”
日志追踪系统上线后,我以为一切都很完美。直到有一天,一个用户给我发了封邮件:
“你的统计页面显示的是昨天的数据?我刚才点了十几次,数字一点都没变。能不能实时更新啊?”
我打开统计页面看了看,确实如此。点击数据是批量写入数据库的,每隔 5 秒才刷新一次。更糟糕的是,即使数据写入了数据库,SQL 的 COUNT(*) 查询也越来越慢——日志表已经有十几万条记录了。
用户要的是实时的数字,每点一次就能看到变化。
我需要一个能实时计数的方案。Redis 再合适不过了。
Redis 实时计数
我开始研究 Redis 的数据结构。对于计数这个需求,Redis 提供了几种非常强大的工具:
- String + INCR:最简单的计数器,原子性递增
- Hash + HINCRBY:多维度统计,比如按设备、按国家
- HyperLogLog + PFADD:海量去重统计,12KB 内存就能统计百万独立用户
- Sorted Set + ZINCRBY:排行榜,比如热门短链接 Top 10
我先从最简单的开始——用 INCR 统计 PV(总点击量):
import redis
from datetime import datetime, timedelta
class RealtimeCounter:
"""简单实时计数器"""
def __init__(self, redis_client):
self.redis = redis_client
def increment(self, short_code):
"""点击计数 +1"""
self.redis.incr(f"clicks:total:{short_code}")
def get_count(self, short_code):
"""获取总点击数"""
count = self.redis.get(f"clicks:total:{short_code}")
return int(count) if count else 0太简单了。每次点击调用一次 increment,查询时调用一次 get_count,速度飞快。
但实际业务中,用户需要的不只是总点击数。他们想知道:
- 今天点击了多少次?
- 移动端和桌面端的比例?
- 访客来自哪些国家?
- 过去 24 小时的点击趋势?
我设计了多维度计数器:
class MultiDimensionCounter:
"""多维度实时计数器"""
def __init__(self, redis_client):
self.redis = redis_client
def track(self, short_code, click_data):
"""记录一次点击的多维度数据"""
pipe = self.redis.pipeline()
# 1. 总点击数
pipe.incr(f"clicks:total:{short_code}")
# 2. 今日点击数(每天自动重置)
today = datetime.now().strftime("%Y-%m-%d")
pipe.incr(f"clicks:daily:{short_code}:{today}")
# 3. 按设备统计
device = click_data.get('device', 'unknown')
pipe.hincrby(f"clicks:device:{short_code}", device, 1)
# 4. 按国家统计
country = click_data.get('country', 'unknown')
pipe.hincrby(f"clicks:country:{short_code}", country, 1)
# 5. 按浏览器统计
browser = click_data.get('browser', 'unknown')
pipe.hincrby(f"clicks:browser:{short_code}", browser, 1)
# 6. 按小时统计(用于趋势图)
hour_key = datetime.now().strftime("%Y-%m-%d:%H")
pipe.incr(f"clicks:hourly:{short_code}:{hour_key}")
pipe.execute()
def get_realtime_stats(self, short_code):
"""获取实时统计数据"""
pipe = self.redis.pipeline()
# 总点击
pipe.get(f"clicks:total:{short_code}")
# 今日点击
today = datetime.now().strftime("%Y-%m-%d")
pipe.get(f"clicks:daily:{short_code}:{today}")
# 设备分布
pipe.hgetall(f"clicks:device:{short_code}")
# 国家分布
pipe.hgetall(f"clicks:country:{short_code}")
# 浏览器分布
pipe.hgetall(f"clicks:browser:{short_code}")
# 最近 24 小时趋势
now = datetime.now()
for i in range(24):
hour = (now - timedelta(hours=i)).strftime("%Y-%m-%d:%H")
pipe.get(f"clicks:hourly:{short_code}:{hour}")
results = pipe.execute()
return {
'total_clicks': int(results[0] or 0),
'today_clicks': int(results[1] or 0),
'devices': results[2],
'countries': results[3],
'browsers': results[4],
'hourly_trend': [int(v or 0) for v in results[5:29]],
}Redis 的数据结构大概是这样:
Redis Key 设计:
├── clicks:total:{short_code} → String 总点击数
├── clicks:daily:{short_code}:{date} → String 日点击数
├── clicks:hourly:{short_code}:{h} → String 小时点击数
├── clicks:device:{short_code} → Hash 设备分布
│ ├── mobile → 3500
│ ├── desktop → 2800
│ └── tablet → 700
├── clicks:country:{short_code} → Hash 国家分布
│ ├── China → 5000
│ ├── USA → 1200
│ └── Japan → 800
└── clicks:browser:{short_code} → Hash 浏览器分布
├── Chrome → 4000
├── Safari → 2000
└── Firefox → 1000HyperLogLog 统计 UV
用户还想知道”有多少独立访客”访问了短链接,而不仅仅是总点击数。
传统的做法是用 Set 存储每个 IP:
# ❌ 精确但内存消耗大
def track_unique_user(redis_client, short_code, ip):
redis_client.sadd(f"unique:{short_code}", ip)
def get_unique_count(redis_client, short_code):
return redis_client.scard(f"unique:{short_code}")我算了一笔账:100 万独立 IP ≈ 50MB 内存。如果有 10 万条短链接,那就是 5TB 内存。这显然不现实。
Redis 有个神奇的数据结构叫 HyperLogLog——它用概率算法,只需 12KB 内存就能统计百万级独立用户,误差只有 0.81%:
class UniqueVisitorCounter:
"""基于 HyperLogLog 的独立访客计数"""
def __init__(self, redis_client):
self.redis = redis_client
def track(self, short_code, user_id):
"""记录一个独立访客"""
# HyperLogLog 最多只占 12KB 内存!
self.redis.pfadd(f"unique:{short_code}", user_id)
def get_count(self, short_code):
"""获取独立访客数(误差约 0.81%)"""
return self.redis.pfcount(f"unique:{short_code}")
def merge(self, short_codes, target_key):
"""合并多个短链接的独立访客"""
keys = [f"unique:{code}" for code in short_codes]
self.redis.pfmerge(f"unique:{target_key}", *keys)
return self.redis.pfcount(f"unique:{target_key}")Set vs HyperLogLog:
| 特性 | Set | HyperLogLog |
|---|---|---|
| 精确度 | 100% 精确 | 0.81% 误差 |
| 内存消耗 | 100万用户 ≈ 50MB | 固定 12KB |
| 适合场景 | 精确统计 | 大规模近似统计 |
| 可回查 | ✅ 可以查具体 IP | ❌ 只知道数量 |
对于统计场景,0.81% 的误差完全可以接受。用 12KB 换取百万级统计能力,太划算了。
数据仪表盘
实时计数有了,接下来就是展示。我花了一个周末做了一个简单的统计仪表盘。
当我第一次看到数字实时跳动的那一刻,成就感爆棚。每有一次点击,图表就自动更新,那种即时反馈太爽了。
from flask import Flask, jsonify
app = Flask(__name__)
@app.route('/api/stats/<short_code>/realtime')
def realtime_stats(short_code):
"""获取实时统计数据"""
counter = MultiDimensionCounter(redis_client)
unique = UniqueVisitorCounter(redis_client)
ranking = HotLinkRanking(redis_client)
stats = counter.get_realtime_stats(short_code)
stats['unique_visitors'] = unique.get_count(short_code)
stats['rank'] = ranking.get_rank(short_code)
# 与昨日对比
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
yesterday_clicks = redis_client.get(f"clicks:daily:{short_code}:{yesterday}")
yesterday_clicks = int(yesterday_clicks) if yesterday_clicks else 0
if yesterday_clicks > 0:
growth = (stats['today_clicks'] - yesterday_clicks) / yesterday_clicks * 100
stats['daily_growth'] = f"{growth:+.1f}%"
else:
stats['daily_growth'] = "N/A"
return jsonify(stats)API 返回的数据:
{
"total_clicks": 52847,
"today_clicks": 1253,
"unique_visitors": 8921,
"daily_growth": "+15.3%",
"rank": 3,
"devices": {
"mobile": 6800,
"desktop": 4200,
"tablet": 1500
},
"countries": {
"China": 8000,
"USA": 2500,
"Japan": 1200
},
"hourly_trend": [45, 52, 38, 29, 15, 12, 18, 42, 89, 120, ...]
}我还加了一个热门排行榜功能,用 Redis 的 Sorted Set 实现:
class HotLinkRanking:
"""热门短链接排行榜"""
def __init__(self, redis_client):
self.redis = redis_client
def increment(self, short_code, amount=1):
"""增加点击数"""
self.redis.zincrby("ranking:daily", amount, short_code)
def get_top_links(self, limit=10):
"""获取 Top N 热门短链接"""
results = self.redis.zrevrange(
"ranking:daily", 0, limit - 1, withscores=True
)
return [
{'short_code': code, 'clicks': int(score)}
for code, score in results
]
def get_rank(self, short_code):
"""获取某个短链接的排名"""
rank = self.redis.zrevrank("ranking:daily", short_code)
return rank + 1 if rank is not None else -1每次点击时更新排行,查询时直接 ZREVRANGE,效率极高。
性能优化
仪表盘上线后,用户很满意。但我发现一个问题:每次查询都要从 Redis 读取 20 多个 Key,虽然 Redis 很快,但 Pipeline 请求也有开销。
而且,用户查询统计页面的频率远高于点击频率——大部分时候数字根本没变化,却还在重复查询。
我想到了一个办法:预聚合 + 缓存。
class StatsCache:
"""统计数据缓存"""
def __init__(self, redis_client):
self.redis = redis_client
self.cache_ttl = 10 # 缓存 10 秒
def get_cached_stats(self, short_code):
"""获取缓存的统计数据"""
cache_key = f"stats:cache:{short_code}"
cached = self.redis.get(cache_key)
if cached:
return json.loads(cached)
# 缓存未命中,实时计算
counter = MultiDimensionCounter(self.redis)
stats = counter.get_realtime_stats(short_code)
# 写入缓存
self.redis.setex(cache_key, self.cache_ttl, json.dumps(stats))
return stats
def invalidate_cache(self, short_code):
"""让缓存失效(点击时调用)"""
cache_key = f"stats:cache:{short_code}"
self.redis.delete(cache_key)流程变成:
用户查询 → 检查缓存 → 缓存命中直接返回
→ 缓存未命中 → 计算 → 缓存 → 返回
用户点击 → 更新计数器 → 清除缓存 → 下次查询重新计算这样,大部分查询都直接命中缓存,Redis 压力骤降。10 秒的延迟对于统计场景完全可以接受——用户感知不到差异。
我还设置了 Redis Key 的过期策略,避免无限增长:
class StatsCleaner:
"""统计数据清理器"""
def __init__(self, redis_client):
self.redis = redis_client
def set_ttl(self, short_code):
"""设置过期时间"""
pipe = self.redis.pipeline()
# 总点击数:永不过期
# pipe.expire(f"clicks:total:{short_code}", -1)
# 日点击数:保留 90 天
today = datetime.now().strftime("%Y-%m-%d")
pipe.expire(f"clicks:daily:{short_code}:{today}", 86400 * 90)
# 小时点击数:保留 7 天
hour_key = datetime.now().strftime("%Y-%m-%d:%H")
pipe.expire(f"clicks:hourly:{short_code}:{hour_key}", 86400 * 7)
# 设备/国家分布:保留 30 天
pipe.expire(f"clicks:device:{short_code}", 86400 * 30)
pipe.expire(f"clicks:country:{short_code}", 86400 * 30)
# 排行榜:每天重置
pipe.expire("ranking:daily", 86400)
pipe.execute()Redis 与数据库协同
Redis 存实时数据,数据库存历史数据。两者需要定期同步:
┌─────────────────────────┐
│ 实时数据(Redis) │
│ - 最近 24 小时点击量 │
│ - 实时设备/地域分布 │
│ - 独立访客数 │
│ - 热门排行榜 │
└─────────────┬───────────┘
│ 定期同步
┌─────────────┴───────────┐
│ 历史数据(数据库) │
│ - 每日汇总统计 │
│ - 详细点击日志 │
│ - 历史趋势数据 │
└─────────────────────────┘class StatsSyncService:
"""Redis → 数据库 同步服务"""
def daily_sync(self):
"""每日同步:将 Redis 统计数据落库"""
yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
# 获取所有活跃的短链接
active_links = self.redis.zrevrange("ranking:daily", 0, -1)
for short_code in active_links:
# 从 Redis 读取昨日统计
daily_key = f"clicks:daily:{short_code}:{yesterday}"
clicks = int(self.redis.get(daily_key) or 0)
devices = self.redis.hgetall(f"clicks:device:{short_code}")
countries = self.redis.hgetall(f"clicks:country:{short_code}")
# 写入数据库汇总表
db.execute("""
INSERT INTO daily_stats (short_code, date, clicks, devices, countries)
VALUES (?, ?, ?, ?, ?)
""", (short_code, yesterday, clicks,
json.dumps(devices), json.dumps(countries)))
# 清理过期的 Redis Key
self.redis.delete(daily_key)每天凌晨,脚本自动把前一天的统计数据从 Redis 同步到数据库,然后清理 Redis 中的过期 Key。这样既保证了实时性,又保留了历史数据。
下一个问题
实时统计系统运行了一个月,效果很好。用户终于能看到秒级更新的数据了,老板也很满意。
但问题又出现了——日志表已经有 1500 万条记录了。
即使 Redis 承担了实时统计的压力,原始日志数据仍然要写入数据库。查询历史数据越来越慢,磁盘空间也在告急。
我需要想办法解决这个大数据挑战……
想一想
- HyperLogLog 的 0.81% 误差在什么场景下可以接受?什么场景下不能?
- 如果 Redis 统计数据和数据库统计数据不一致,以哪个为准?
- 如何设计一个”秒级”更新的实时大屏?