导航菜单

数据分析

场景

数据量增长后,产品部门需要数据分析。

需求:
- 实时监控 API 调用量
- 统计用户增长趋势
- 分析 API 使用情况
- 生成数据报表

问题:
- MySQL 聚合查询太慢
- 数据量太大,统计困难
- 无法实时返回结果

解决方案

1. 实时统计系统

class RealTimeStats:
    """实时统计系统"""

    def __init__(self, redis_client):
        self.redis = redis_client

    def record_api_call(self, user_id, endpoint, status):
        """记录 API 调用"""

        # 使用 Redis 的 INCR 原子操作
        timestamp = int(time.time())
        minute_key = f'stats:minute:{timestamp // 60}'
        hour_key = f'stats:hour:{timestamp // 3600}'
        day_key = f'stats:day:{timestamp // 86400}'

        # 管道操作(批量执行)
        pipe = self.redis.pipeline()

        # 按时间统计
        pipe.incr(f'{minute_key}:total')
        pipe.incr(f'{hour_key}:total')
        pipe.incr(f'{day_key}:total')

        # 按 API 统计
        pipe.incr(f'{minute_key}:api:{endpoint}')
        pipe.incr(f'{hour_key}:api:{endpoint}')
        pipe.incr(f'{day_key}:api:{endpoint}')

        # 按状态统计
        pipe.incr(f'{day_key}:status:{status}')

        # 按用户统计
        pipe.incr(f'{day_key}:user:{user_id}')

        # 设置过期时间
        pipe.expire(minute_key, 86400 * 7)  # 保留 7 天
        pipe.expire(hour_key, 86400 * 30)   # 保留 30 天
        pipe.expire(day_key, 86400 * 365)   # 保留 1 年

        pipe.execute()

    def get_stats(self, time_range='hour'):
        """获取统计数据"""

        now = int(time.time())

        if time_range == 'minute':
            keys = [
                f'stats:minute:{(now // 60) - i}'
                for i in range(60)  # 最近 60 分钟
            ]
        elif time_range == 'hour':
            keys = [
                f'stats:hour:{(now // 3600) - i}'
                for i in range(24)  # 最近 24 小时
            ]
        elif time_range == 'day':
            keys = [
                f'stats:day:{(now // 86400) - i}'
                for i in range(30)  # 最近 30 天
            ]

        # 获取数据
        pipe = self.redis.pipeline()
        for key in keys:
            pipe.get(f'{key}:total')

        results = pipe.execute()

        # 处理数据
        stats = []
        for i, result in enumerate(results):
            if result:
                stats.append({
                    'time': keys[i].split(':')[-1],
                    'count': int(result)
                })

        return stats

stats = RealTimeStats(redis_client)

# 在 API 调用时记录
@app.route('/api/weather')
@require_api_key
def get_weather():
    # ... 业务逻辑 ...

    # 记录统计
    stats.record_api_call(
        request.user['id'],
        '/api/weather',
        'success'
    )

    return jsonify(result)

2. 数据分析 API

@app.route('/api/admin/stats/overview')
@require_admin
def get_stats_overview():
    """获取统计概览"""

    stats_manager = RealTimeStats(redis_client)

    # 当前时间
    now = int(time.time())

    # 今日数据
    today_key = f'stats:day:{now // 86400}'
    today_calls = int(redis_client.get(f'{today_key}:total') or 0)

    # 本小时数据
    hour_key = f'stats:hour:{now // 3600}'
    hour_calls = int(redis_client.get(f'{hour_key}:total') or 0)

    # 热门 API
    hot_apis = []
    for api in ['weather', 'news', 'stock', 'ip_query']:
        count = int(redis_client.get(f'{today_key}:api:{api}') or 0)
        hot_apis.append({'api': api, 'calls': count})

    hot_apis.sort(key=lambda x: x['calls'], reverse=True)

    return jsonify({
        'today': {
            'calls': today_calls,
            'date': datetime.now().date().isoformat()
        },
        'current_hour': {
            'calls': hour_calls,
            'hour': datetime.now().hour
        },
        'hot_apis': hot_apis[:5]
    })

@app.route('/api/admin/stats/trend')
@require_admin
def get_stats_trend():
    """获取趋势数据"""

    time_range = request.args.get('range', 'day')  # minute, hour, day

    stats_manager = RealTimeStats(redis_client)
    data = stats_manager.get_stats(time_range)

    return jsonify({
        'range': time_range,
        'data': data
    })

3. 数据聚合任务

def aggregate_daily_stats():
    """每日数据聚合"""

    yesterday = (datetime.now() - timedelta(days=1)).date()
    timestamp = int(time.mktime(yesterday.timetuple()))
    day_key = f'stats:day:{timestamp // 86400}'

    # 获取所有统计
    pipe = redis_client.pipeline()

    # 总调用量
    total_calls = pipe.get(f'{day_key}:total')

    # 按 API 统计
    api_stats = {}
    for api in ['weather', 'news', 'stock', 'ip_query']:
        api_stats[api] = pipe.get(f'{day_key}:api:{api}')

    # 按状态统计
    success_count = pipe.get(f'{day_key}:status:success')
    error_count = pipe.get(f'{day_key}:status:error')

    pipe.execute()

    # 生成报表
    report = {
        'date': yesterday.isoformat(),
        'total_calls': int(total_calls or 0),
        'api_breakdown': {
            api: int(count or 0)
            for api, count in api_stats.items()
        },
        'success_rate': (
            int(success_count or 0) /
            (int(success_count or 0) + int(error_count or 0))
            if (success_count or error_count) else 0
        )
    }

    # 存储报表
    redis_client.setex(
        f'report:daily:{yesterday}',
        86400 * 365,
        json.dumps(report)
    )

    return report

# 定时执行:每天凌晨 1 点
scheduler.add_job(
    aggregate_daily_stats,
    CronTrigger(hour=1, minute=0),
    id='daily_stats_aggregation'
)

效果验证

优化前(MySQL 查询)

统计今日调用量:
- 查询时间:5 秒
- 锁表:影响正常业务
- 无法实时返回

优化后(Redis 实时统计)

统计今日调用量:
- 查询时间:<100ms
- 无锁表
- 实时返回

本节小结

✅ 完成的工作:

  • 实现了实时统计系统
  • 实现了数据分析 API
  • 实现了数据聚合任务

✅ 效果:

  • 查询速度提升 50 倍
  • 支持实时数据分析
  • 不影响正常业务

⚠️ 下一步:冷热数据分离

🎯 下一步:冷热数据如何分离?

搜索