导航菜单

缓存策略

场景

新增多个 API 后,缓存策略需要优化。

问题分析:
- 新闻数据:10 分钟更新一次
- 股票数据:实时变化,需要 5 秒缓存
- IP 数据:几乎不变,可以缓存 24 小时
- 天气数据:每小时变化明显

当前问题:
- 所有 API 都使用相同的缓存时间
- 有些数据更新太慢
- 有些数据缓存时间浪费

数据特征分析

按更新频率分类

实时数据(秒级):
- 股票价格
- 汇率

准实时数据(分钟级):
- 新闻列表
- 天气温度

静态数据(小时级):
- IP 归属地
- 城市信息

不变数据(永久):
- 国家列表
- 货币代码

分层缓存策略

L1: 应用内存缓存(最快)

from functools import lru_cache
import threading

class MemoryCache:
    """应用内存缓存"""

    def __init__(self, max_size=1000):
        self.cache = {}
        self.lock = threading.Lock()
        self.max_size = max_size

    def get(self, key):
        with self.lock:
            return self.cache.get(key)

    def set(self, key, value, ttl=60):
        with self.lock:
            # 简单的 LRU
            if len(self.cache) >= self.max_size:
                self.cache.pop(next(iter(self.cache)))

            self.cache[key] = {
                'value': value,
                'expires': time.time() + ttl
            }

    def is_expired(self, key):
        with self.lock:
            data = self.cache.get(key)
            if not data:
                return True
            return time.time() > data['expires']

# 全局内存缓存
memory_cache = MemoryCache()

L2: Redis 缓存(共享)

class RedisCache:
    """Redis 缓存"""

    def __init__(self, redis_client):
        self.redis = redis_client

    def get(self, key):
        value = self.redis.get(key)
        if value:
            return json.loads(value)
        return None

    def set(self, key, value, ttl=3600):
        self.redis.setex(key, ttl, json.dumps(value))

    def delete(self, key):
        self.redis.delete(key)

# 全局 Redis 缓存
redis_cache = RedisCache(redis_client)

智能缓存策略

策略 1: 热点数据预热

def preload_hot_data():
    """预加载热点数据"""

    # 热门城市天气
    hot_cities = ['北京', '上海', '深圳', '广州', '杭州']

    for city in hot_cities:
        try:
            data = call_external_api(
                f'https://weather-api.kuaiyizhi.cn/weather',
                params={'city': city}
            )

            # 缓存到 Redis
            cache_key = f'weather:{city}'
            redis_cache.set(cache_key, data, ttl=3600)

            # 缓存到内存
            memory_cache.set(cache_key, data, ttl=300)

        except Exception as e:
            logging.error(f'Failed to preload {city}: {e}')

# 定时预热:每 10 分钟
# 0 */10 * * * * /usr/bin/python3 /path/to/preload.py

策略 2: 缓存穿透保护

def get_with_cache_protection(key, fetch_func, ttl=3600):
    """带缓存保护的查询"""

    # 查询缓存
    cached = redis_cache.get(key)
    if cached:
        return cached

    # 查询数据库/外部 API
    try:
        data = fetch_func()

        if data is None:
            # 防止缓存穿透:缓存空值
            redis_cache.set(key, {'empty': True}, ttl=60)
            return None

        # 缓存结果
        redis_cache.set(key, data, ttl)
        return data

    except Exception as e:
        # 如果查询失败,尝试返回过期缓存
        old_cached = redis_client.get(key)
        if old_cached:
            logging.warning(f'Using stale cache for {key}')
            return json.loads(old_cached)

        raise

策略 3: 缓存雪崩防护

import random

def set_with_random_ttl(key, value, base_ttl=3600):
    """设置随机 TTL,防止缓存雪崩"""

    # TTL = base_ttl ± 20%
    ttl = int(base_ttl * (0.8 + random.random() * 0.4))

    redis_cache.set(key, value, ttl)

# 使用示例
redis_cache.set('weather:北京', data, ttl=3600)
# 改为
set_with_random_ttl('weather:北京', data, base_ttl=3600)

按数据类型缓存

股票数据(短期缓存)

def handle_stock_api(request):
    """股票 API - 5 秒缓存"""

    symbol = request.args.get('symbol')

    # 先查内存缓存
    mem_key = f'stock:{symbol}'
    cached = memory_cache.get(mem_key)

    if cached and not memory_cache.is_expired(mem_key):
        return jsonify(cached['value'])

    # 再查 Redis 缓存
    redis_key = f'stock:{symbol}'
    redis_cached = redis_cache.get(redis_key)

    if redis_cached:
        # 更新内存缓存
        memory_cache.set(mem_key, redis_cached, ttl=5)
        return jsonify(redis_cached)

    # 调用外部 API
    data = call_external_api(
        f'https://stock-api.kuaiyizhi.cn/quote',
        params={'symbol': symbol}
    )

    # 缓存:Redis 5 秒,内存 5 秒
    redis_cache.set(redis_key, data, ttl=5)
    memory_cache.set(mem_key, data, ttl=5)

    return jsonify(data)

新闻数据(中期缓存)

def handle_news_api(request):
    """新闻 API - 10 分钟缓存"""

    category = request.args.get('category', 'general')

    cache_key = f'news:{category}'

    # 先查 Redis
    cached = redis_cache.get(cache_key)
    if cached:
        return jsonify(cached)

    # 调用外部 API
    data = call_external_api(
        'https://news-api.kuaiyizhi.cn/news',
        params={'category': category}
    )

    # 缓存 10 分钟
    redis_cache.set(cache_key, data, ttl=600)

    return jsonify(data)

IP 数据(长期缓存)

def handle_ip_query_api(request):
    """IP 查询 API - 24 小时缓存"""

    ip = request.args.get('ip') or request.remote_addr

    cache_key = f'ip_query:{ip}'

    # 查询 Redis
    cached = redis_cache.get(cache_key)
    if cached:
        return jsonify(cached)

    # 调用外部 API
    data = call_external_api(
        f'https://ip-api.kuaiyizhi.cn/json/{ip}'
    )

    # 缓存 24 小时
    redis_cache.set(cache_key, data, ttl=86400)

    return jsonify(data)

缓存监控

缓存命中率统计

class CacheMonitor:
    """缓存监控"""

    def __init__(self):
        self.hits = 0
        self.misses = 0

    def record_hit(self):
        self.hits += 1

    def record_miss(self):
        self.misses += 1

    def get_hit_rate(self):
        total = self.hits + self.misses
        if total == 0:
            return 0
        return self.hits / total

cache_monitor = CacheMonitor()

def get_with_monitoring(key, fetch_func, ttl=3600):
    """带监控的缓存查询"""

    # 查询缓存
    cached = redis_cache.get(key)
    if cached:
        cache_monitor.record_hit()
        return cached

    cache_monitor.record_miss()

    # 查询数据源
    data = fetch_func()
    redis_cache.set(key, data, ttl)

    return data

@app.route('/admin/cache-stats')
def cache_stats():
    """缓存统计"""
    return jsonify({
        'hits': cache_monitor.hits,
        'misses': cache_monitor.misses,
        'hit_rate': cache_monitor.get_hit_rate()
    })

本节小结

✅ 完成的工作:

  • 设计了分层缓存策略
  • 实现了智能缓存机制
  • 针对不同数据类型优化
  • 添加了缓存监控

✅ 效果:

  • 缓存命中率提升到 95%+
  • 响应时间降低 60%
  • 外部 API 调用量减少 80%

⚠️ 下一步: 有些数据需要定时更新

🎯 下一步: 需要定时更新缓存,引入任务调度系统

搜索