实时统计

问题:“你的统计是昨天的数据?”

日志追踪系统上线后,我以为一切都很完美。直到有一天,一个用户给我发了封邮件:

“你的统计页面显示的是昨天的数据?我刚才点了十几次,数字一点都没变。能不能实时更新啊?”

我打开统计页面看了看,确实如此。点击数据是批量写入数据库的,每隔 5 秒才刷新一次。更糟糕的是,即使数据写入了数据库,SQL 的 COUNT(*) 查询也越来越慢——日志表已经有十几万条记录了。

用户要的是实时的数字,每点一次就能看到变化。

我需要一个能实时计数的方案。Redis 再合适不过了。

Redis 实时计数

我开始研究 Redis 的数据结构。对于计数这个需求,Redis 提供了几种非常强大的工具:

  • String + INCR:最简单的计数器,原子性递增
  • Hash + HINCRBY:多维度统计,比如按设备、按国家
  • HyperLogLog + PFADD:海量去重统计,12KB 内存就能统计百万独立用户
  • Sorted Set + ZINCRBY:排行榜,比如热门短链接 Top 10

我先从最简单的开始——用 INCR 统计 PV(总点击量):

import redis
from datetime import datetime, timedelta

class RealtimeCounter:
    """简单实时计数器"""

    def __init__(self, redis_client):
        self.redis = redis_client

    def increment(self, short_code):
        """点击计数 +1"""
        self.redis.incr(f"clicks:total:{short_code}")

    def get_count(self, short_code):
        """获取总点击数"""
        count = self.redis.get(f"clicks:total:{short_code}")
        return int(count) if count else 0

太简单了。每次点击调用一次 increment,查询时调用一次 get_count,速度飞快。

但实际业务中,用户需要的不只是总点击数。他们想知道:

  • 今天点击了多少次?
  • 移动端和桌面端的比例?
  • 访客来自哪些国家?
  • 过去 24 小时的点击趋势?

我设计了多维度计数器:

class MultiDimensionCounter:
    """多维度实时计数器"""

    def __init__(self, redis_client):
        self.redis = redis_client

    def track(self, short_code, click_data):
        """记录一次点击的多维度数据"""
        pipe = self.redis.pipeline()

        # 1. 总点击数
        pipe.incr(f"clicks:total:{short_code}")

        # 2. 今日点击数(每天自动重置)
        today = datetime.now().strftime("%Y-%m-%d")
        pipe.incr(f"clicks:daily:{short_code}:{today}")

        # 3. 按设备统计
        device = click_data.get('device', 'unknown')
        pipe.hincrby(f"clicks:device:{short_code}", device, 1)

        # 4. 按国家统计
        country = click_data.get('country', 'unknown')
        pipe.hincrby(f"clicks:country:{short_code}", country, 1)

        # 5. 按浏览器统计
        browser = click_data.get('browser', 'unknown')
        pipe.hincrby(f"clicks:browser:{short_code}", browser, 1)

        # 6. 按小时统计(用于趋势图)
        hour_key = datetime.now().strftime("%Y-%m-%d:%H")
        pipe.incr(f"clicks:hourly:{short_code}:{hour_key}")

        pipe.execute()

    def get_realtime_stats(self, short_code):
        """获取实时统计数据"""
        pipe = self.redis.pipeline()

        # 总点击
        pipe.get(f"clicks:total:{short_code}")

        # 今日点击
        today = datetime.now().strftime("%Y-%m-%d")
        pipe.get(f"clicks:daily:{short_code}:{today}")

        # 设备分布
        pipe.hgetall(f"clicks:device:{short_code}")

        # 国家分布
        pipe.hgetall(f"clicks:country:{short_code}")

        # 浏览器分布
        pipe.hgetall(f"clicks:browser:{short_code}")

        # 最近 24 小时趋势
        now = datetime.now()
        for i in range(24):
            hour = (now - timedelta(hours=i)).strftime("%Y-%m-%d:%H")
            pipe.get(f"clicks:hourly:{short_code}:{hour}")

        results = pipe.execute()

        return {
            'total_clicks': int(results[0] or 0),
            'today_clicks': int(results[1] or 0),
            'devices': results[2],
            'countries': results[3],
            'browsers': results[4],
            'hourly_trend': [int(v or 0) for v in results[5:29]],
        }

Redis 的数据结构大概是这样:

Redis Key 设计:
├── clicks:total:{short_code}        → String    总点击数
├── clicks:daily:{short_code}:{date} → String    日点击数
├── clicks:hourly:{short_code}:{h}   → String    小时点击数
├── clicks:device:{short_code}       → Hash      设备分布
│   ├── mobile → 3500
│   ├── desktop → 2800
│   └── tablet → 700
├── clicks:country:{short_code}      → Hash      国家分布
│   ├── China → 5000
│   ├── USA → 1200
│   └── Japan → 800
└── clicks:browser:{short_code}      → Hash      浏览器分布
    ├── Chrome → 4000
    ├── Safari → 2000
    └── Firefox → 1000

HyperLogLog 统计 UV

用户还想知道”有多少独立访客”访问了短链接,而不仅仅是总点击数。

传统的做法是用 Set 存储每个 IP:

# ❌ 精确但内存消耗大
def track_unique_user(redis_client, short_code, ip):
    redis_client.sadd(f"unique:{short_code}", ip)

def get_unique_count(redis_client, short_code):
    return redis_client.scard(f"unique:{short_code}")

我算了一笔账:100 万独立 IP ≈ 50MB 内存。如果有 10 万条短链接,那就是 5TB 内存。这显然不现实。

Redis 有个神奇的数据结构叫 HyperLogLog——它用概率算法,只需 12KB 内存就能统计百万级独立用户,误差只有 0.81%:

class UniqueVisitorCounter:
    """基于 HyperLogLog 的独立访客计数"""

    def __init__(self, redis_client):
        self.redis = redis_client

    def track(self, short_code, user_id):
        """记录一个独立访客"""
        # HyperLogLog 最多只占 12KB 内存!
        self.redis.pfadd(f"unique:{short_code}", user_id)

    def get_count(self, short_code):
        """获取独立访客数(误差约 0.81%)"""
        return self.redis.pfcount(f"unique:{short_code}")

    def merge(self, short_codes, target_key):
        """合并多个短链接的独立访客"""
        keys = [f"unique:{code}" for code in short_codes]
        self.redis.pfmerge(f"unique:{target_key}", *keys)
        return self.redis.pfcount(f"unique:{target_key}")

Set vs HyperLogLog:

特性SetHyperLogLog
精确度100% 精确0.81% 误差
内存消耗100万用户 ≈ 50MB固定 12KB
适合场景精确统计大规模近似统计
可回查✅ 可以查具体 IP❌ 只知道数量

对于统计场景,0.81% 的误差完全可以接受。用 12KB 换取百万级统计能力,太划算了。

数据仪表盘

实时计数有了,接下来就是展示。我花了一个周末做了一个简单的统计仪表盘。

当我第一次看到数字实时跳动的那一刻,成就感爆棚。每有一次点击,图表就自动更新,那种即时反馈太爽了。

from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/api/stats/<short_code>/realtime')
def realtime_stats(short_code):
    """获取实时统计数据"""

    counter = MultiDimensionCounter(redis_client)
    unique = UniqueVisitorCounter(redis_client)
    ranking = HotLinkRanking(redis_client)

    stats = counter.get_realtime_stats(short_code)
    stats['unique_visitors'] = unique.get_count(short_code)
    stats['rank'] = ranking.get_rank(short_code)

    # 与昨日对比
    yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
    yesterday_clicks = redis_client.get(f"clicks:daily:{short_code}:{yesterday}")
    yesterday_clicks = int(yesterday_clicks) if yesterday_clicks else 0

    if yesterday_clicks > 0:
        growth = (stats['today_clicks'] - yesterday_clicks) / yesterday_clicks * 100
        stats['daily_growth'] = f"{growth:+.1f}%"
    else:
        stats['daily_growth'] = "N/A"

    return jsonify(stats)

API 返回的数据:

{
  "total_clicks": 52847,
  "today_clicks": 1253,
  "unique_visitors": 8921,
  "daily_growth": "+15.3%",
  "rank": 3,
  "devices": {
    "mobile": 6800,
    "desktop": 4200,
    "tablet": 1500
  },
  "countries": {
    "China": 8000,
    "USA": 2500,
    "Japan": 1200
  },
  "hourly_trend": [45, 52, 38, 29, 15, 12, 18, 42, 89, 120, ...]
}

我还加了一个热门排行榜功能,用 Redis 的 Sorted Set 实现:

class HotLinkRanking:
    """热门短链接排行榜"""

    def __init__(self, redis_client):
        self.redis = redis_client

    def increment(self, short_code, amount=1):
        """增加点击数"""
        self.redis.zincrby("ranking:daily", amount, short_code)

    def get_top_links(self, limit=10):
        """获取 Top N 热门短链接"""
        results = self.redis.zrevrange(
            "ranking:daily", 0, limit - 1, withscores=True
        )

        return [
            {'short_code': code, 'clicks': int(score)}
            for code, score in results
        ]

    def get_rank(self, short_code):
        """获取某个短链接的排名"""
        rank = self.redis.zrevrank("ranking:daily", short_code)
        return rank + 1 if rank is not None else -1

每次点击时更新排行,查询时直接 ZREVRANGE,效率极高。

性能优化

仪表盘上线后,用户很满意。但我发现一个问题:每次查询都要从 Redis 读取 20 多个 Key,虽然 Redis 很快,但 Pipeline 请求也有开销。

而且,用户查询统计页面的频率远高于点击频率——大部分时候数字根本没变化,却还在重复查询。

我想到了一个办法:预聚合 + 缓存

class StatsCache:
    """统计数据缓存"""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.cache_ttl = 10  # 缓存 10 秒

    def get_cached_stats(self, short_code):
        """获取缓存的统计数据"""
        cache_key = f"stats:cache:{short_code}"

        cached = self.redis.get(cache_key)
        if cached:
            return json.loads(cached)

        # 缓存未命中,实时计算
        counter = MultiDimensionCounter(self.redis)
        stats = counter.get_realtime_stats(short_code)

        # 写入缓存
        self.redis.setex(cache_key, self.cache_ttl, json.dumps(stats))

        return stats

    def invalidate_cache(self, short_code):
        """让缓存失效(点击时调用)"""
        cache_key = f"stats:cache:{short_code}"
        self.redis.delete(cache_key)

流程变成:

用户查询 → 检查缓存 → 缓存命中直接返回
                     → 缓存未命中 → 计算 → 缓存 → 返回

用户点击 → 更新计数器 → 清除缓存 → 下次查询重新计算

这样,大部分查询都直接命中缓存,Redis 压力骤降。10 秒的延迟对于统计场景完全可以接受——用户感知不到差异。

我还设置了 Redis Key 的过期策略,避免无限增长:

class StatsCleaner:
    """统计数据清理器"""

    def __init__(self, redis_client):
        self.redis = redis_client

    def set_ttl(self, short_code):
        """设置过期时间"""
        pipe = self.redis.pipeline()

        # 总点击数:永不过期
        # pipe.expire(f"clicks:total:{short_code}", -1)

        # 日点击数:保留 90 天
        today = datetime.now().strftime("%Y-%m-%d")
        pipe.expire(f"clicks:daily:{short_code}:{today}", 86400 * 90)

        # 小时点击数:保留 7 天
        hour_key = datetime.now().strftime("%Y-%m-%d:%H")
        pipe.expire(f"clicks:hourly:{short_code}:{hour_key}", 86400 * 7)

        # 设备/国家分布:保留 30 天
        pipe.expire(f"clicks:device:{short_code}", 86400 * 30)
        pipe.expire(f"clicks:country:{short_code}", 86400 * 30)

        # 排行榜:每天重置
        pipe.expire("ranking:daily", 86400)

        pipe.execute()

Redis 与数据库协同

Redis 存实时数据,数据库存历史数据。两者需要定期同步:

           ┌─────────────────────────┐
           │     实时数据(Redis)     │
           │  - 最近 24 小时点击量     │
           │  - 实时设备/地域分布      │
           │  - 独立访客数             │
           │  - 热门排行榜             │
           └─────────────┬───────────┘
                         │ 定期同步
           ┌─────────────┴───────────┐
           │    历史数据(数据库)      │
           │  - 每日汇总统计           │
           │  - 详细点击日志           │
           │  - 历史趋势数据          │
           └─────────────────────────┘
class StatsSyncService:
    """Redis → 数据库 同步服务"""

    def daily_sync(self):
        """每日同步:将 Redis 统计数据落库"""
        yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

        # 获取所有活跃的短链接
        active_links = self.redis.zrevrange("ranking:daily", 0, -1)

        for short_code in active_links:
            # 从 Redis 读取昨日统计
            daily_key = f"clicks:daily:{short_code}:{yesterday}"
            clicks = int(self.redis.get(daily_key) or 0)

            devices = self.redis.hgetall(f"clicks:device:{short_code}")
            countries = self.redis.hgetall(f"clicks:country:{short_code}")

            # 写入数据库汇总表
            db.execute("""
                INSERT INTO daily_stats (short_code, date, clicks, devices, countries)
                VALUES (?, ?, ?, ?, ?)
            """, (short_code, yesterday, clicks,
                  json.dumps(devices), json.dumps(countries)))

            # 清理过期的 Redis Key
            self.redis.delete(daily_key)

每天凌晨,脚本自动把前一天的统计数据从 Redis 同步到数据库,然后清理 Redis 中的过期 Key。这样既保证了实时性,又保留了历史数据。

下一个问题

实时统计系统运行了一个月,效果很好。用户终于能看到秒级更新的数据了,老板也很满意。

但问题又出现了——日志表已经有 1500 万条记录了。

即使 Redis 承担了实时统计的压力,原始日志数据仍然要写入数据库。查询历史数据越来越慢,磁盘空间也在告急。

我需要想办法解决这个大数据挑战……


想一想

  1. HyperLogLog 的 0.81% 误差在什么场景下可以接受?什么场景下不能?
  2. 如果 Redis 统计数据和数据库统计数据不一致,以哪个为准?
  3. 如何设计一个”秒级”更新的实时大屏?