导航菜单

定时更新

场景

缓存策略上线后,发现新问题:

用户反馈:
- "为什么你们的股票数据是 5 分钟前的?"
- "新闻更新太慢了"

技术分析:
- 被动缓存:用户访问时才更新
- 热点数据:应该主动更新
- 定时任务:需要任务调度系统

解决方案

引入任务调度系统,主动更新缓存。

任务调度器选择

方案对比

| 方案 | 优势 | 劣势 |
| --- | --- | --- |
| cron | 简单、系统自带 | 不支持分布式 |
| Celery | 功能强大、分布式 | 复杂、依赖多 |
| APScheduler | 轻量、易用 | 单机 |
| 云函数 | 免运维 | 成本高、有延迟 |

选择 APScheduler(轻量、满足需求)

APScheduler 实现

安装

pip install apscheduler

基础配置

import functools

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

# Create the scheduler (runs jobs on a background thread, non-blocking)
scheduler = BackgroundScheduler()

# 添加任务
def update_stock_data():
    """Proactively refresh the cached quote for each popular stock symbol.

    For every symbol, fetches a fresh quote from the external stock API and
    stores it in Redis under ``stock:<symbol>`` with a 5-minute TTL.
    Failures are logged per symbol and do not stop the remaining updates.
    """

    # Symbols treated as "hot" and refreshed ahead of user requests.
    popular_symbols = ('AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA')

    for ticker in popular_symbols:
        try:
            quote = call_external_api(
                'https://stock-api.kuaiyizhi.cn/quote',
                params={'symbol': ticker},
            )
            # TTL matches the 5-minute refresh cadence of the scheduler.
            redis_cache.set(f'stock:{ticker}', quote, ttl=300)
            logging.info(f'Updated stock data for {ticker}')
        except Exception as exc:
            # Best-effort: log and move on to the next symbol.
            logging.error(f'Failed to update {ticker}: {exc}')

# Register the refresh job: the cron trigger fires every 5 minutes
scheduler.add_job(
    update_stock_data,
    CronTrigger(minute='*/5'),
    id='update_stocks',
    name='Update Stock Data'
)

# Start the scheduler (returns immediately; jobs run on worker threads)
scheduler.start()

任务类型

1. 固定频率任务

# Fixed-interval task: run every 10 seconds
scheduler.add_job(
    health_check,
    'interval',
    seconds=10,
    id='health_check'
)

def health_check():
    """Periodic health check (stub) — intended to probe component status."""
    # Placeholder: the actual per-component checks are not implemented here.
    pass

2. Cron 表达式任务

# Daily report: every day at 02:00
scheduler.add_job(
    daily_report,
    CronTrigger(hour=2, minute=0),
    id='daily_report'
)

# Weekly report: every Monday at 09:00
scheduler.add_job(
    weekly_report,
    CronTrigger(day_of_week='mon', hour=9, minute=0),
    id='weekly_report'
)

# Monthly billing: the 1st of every month at 03:00
scheduler.add_job(
    monthly_billing,
    CronTrigger(day=1, hour=3, minute=0),
    id='monthly_billing'
)

3. 一次性任务

# One-off task: run exactly once, 5 minutes from now
scheduler.add_job(
    one_time_task,
    'date',
    run_date=datetime.now() + timedelta(minutes=5),
    id='one_time'
)

任务管理 API

查看任务列表

@app.route('/admin/scheduled-jobs')
def list_scheduled_jobs():
    """Admin endpoint: return a JSON summary of every registered job."""

    def describe(job):
        # next_run_time is None while a job is paused / not yet scheduled.
        next_run = job.next_run_time
        return {
            'id': job.id,
            'name': job.name,
            'next_run_time': next_run.isoformat() if next_run else None,
            'trigger': str(job.trigger),
        }

    return jsonify([describe(job) for job in scheduler.get_jobs()])

手动触发任务

@app.route('/admin/jobs/<job_id>/trigger', methods=['POST'])
def trigger_job(job_id):
    """Admin endpoint: force a scheduled job to run immediately.

    Returns 404 when no job with *job_id* exists, 500 on any other error.
    """
    try:
        target = scheduler.get_job(job_id)
        if not target:
            return jsonify({'error': 'Job not found'}), 404
        # Pull the next fire time forward to "now" so the scheduler
        # picks the job up on its next wakeup.
        target.modify(next_run_time=datetime.now())
        return jsonify({'message': f'Job {job_id} triggered'})
    except Exception as e:
        return jsonify({'error': str(e)}), 500

高级任务

热点数据更新

def update_hot_data():
    """Refresh every category of hot cached data in one pass.

    NOTE(review): the per-category cadences from the original comments
    (hourly weather, 5-min stocks, 10-min news) are not enforced here —
    one scheduler trigger runs the whole function; confirm that is intended.
    """

    # (refresh function, items to refresh) — processed in this order.
    refresh_plan = (
        (update_weather_cache, ['北京', '上海', '深圳', '广州', '杭州']),
        (update_stock_cache, ['AAPL', 'GOOGL', 'MSFT']),
        (update_news_cache, ['general', 'technology', 'business']),
    )

    for refresh, items in refresh_plan:
        for item in items:
            refresh(item)

# Run the full hot-data refresh every 5 minutes.
# NOTE(review): this refreshes weather and news on the same 5-minute
# cadence as stocks, even though they could refresh less often — confirm.
scheduler.add_job(
    update_hot_data,
    CronTrigger(minute='*/5'),
    id='update_hot_data'
)

缓存预热

def warmup_cache():
    """Pre-populate the cache with data likely to be requested tomorrow.

    Fetches tomorrow's weather forecast for every known city and caches it
    under ``forecast:<city>:<date>`` for 24 hours. Per-city failures are
    logged and skipped so one bad city does not abort the warmup.
    """

    logging.info('Starting cache warmup...')

    # Tomorrow's date — the key's date component and the forecast target.
    target_date = (datetime.now() + timedelta(days=1)).date()

    for city in get_all_cities():
        try:
            forecast = get_weather_forecast(city, target_date)
            # 24-hour TTL: the entry stays valid through the target day.
            redis_cache.set(f'forecast:{city}:{target_date}', forecast, ttl=86400)
        except Exception as exc:
            logging.error(f'Failed to warmup {city}: {exc}')

    logging.info('Cache warmup completed')

# Cache warmup: every night at 23:00
scheduler.add_job(
    warmup_cache,
    CronTrigger(hour=23, minute=0),
    id='cache_warmup'
)

数据清理

def cleanup_old_data():
    """Delete API log rows older than the 90-day retention window.

    Runs weekly from the scheduler. Commits the deletion and logs the
    number of rows actually removed.
    """

    # Retention cutoff: anything created before this moment is purged.
    cutoff_date = datetime.now() - timedelta(days=90)

    with get_db_connection() as conn:
        cursor = conn.cursor()

        cursor.execute(
            'DELETE FROM api_logs WHERE created_at < ?',
            (cutoff_date,)
        )

        conn.commit()
        # BUG FIX: execute() returns the cursor object, not a count —
        # the original logged the cursor's repr. rowcount holds the
        # number of rows affected by the DELETE.
        logging.info(f'Deleted {cursor.rowcount} old log entries')

# 每周日凌晨 3 点执行
scheduler.add_job(
    cleanup_old_data,
    CronTrigger(day_of_week='sun', hour=3, minute=0),
    id='cleanup_logs'
)

任务监控

任务执行日志

class JobLogger:
    """Record start/end/error markers for scheduled-job runs in Redis.

    Key layout: ``job_log:<job_id>:<run_ts>:{start,end,error}``, where
    ``run_ts`` is the ISO-8601 timestamp captured when the run STARTED,
    plus ``job_current:<job_id>`` pointing at the in-flight run so the
    end marker pairs with its start. All entries expire after 24 hours.
    """

    # Expiry for every marker: 24 hours, in seconds.
    _TTL = 86400

    def __init__(self):
        # Redis client is provided by the surrounding module.
        self.redis = redis_client

    def log_job_start(self, job_id):
        """Record that a run of *job_id* has just started."""
        run_ts = datetime.now().isoformat()
        base = f'job_log:{job_id}:{run_ts}'
        # BUG FIX: redis-py setex takes (name, time, value); the original
        # passed (name, value, time), giving every marker a 1-second TTL.
        self.redis.setex(f'{base}:start', self._TTL, run_ts)
        # Remember the in-flight run so log_job_end() can pair with it.
        # BUG FIX: the original stamped start and end with two different
        # datetime.now() values, so the markers could never be correlated.
        self.redis.setex(f'job_current:{job_id}', self._TTL, run_ts)

    def log_job_end(self, job_id, success=True, error=None):
        """Record that the current run of *job_id* finished.

        Args:
            success: whether the run completed without raising.
            error: optional error text, stored under the ``:error`` marker.
        """
        run_ts = self.redis.get(f'job_current:{job_id}')
        if run_ts is None:
            # Start marker expired or was never written; fall back to a
            # fresh timestamp so the end marker is not silently dropped.
            run_ts = datetime.now().isoformat()
        base = f'job_log:{job_id}:{run_ts}'
        # BUG FIX: the original ignored the ``success`` flag entirely.
        self.redis.setex(f'{base}:end', self._TTL, 'ok' if success else 'failed')
        if error:
            self.redis.setex(f'{base}:error', self._TTL, error)

    def get_job_logs(self, job_id, limit=10):
        """Return up to *limit* most recent run records for *job_id*.

        Each record is a dict with ``time``/``start``/``end``/``error``.
        """
        # Match only the start markers. BUG FIX: the original matched
        # every suffix and then re-appended ':start', querying keys
        # that never existed (e.g. '...:start:start').
        prefix = f'job_log:{job_id}:'
        start_keys = self.redis.keys(f'{prefix}*:start')

        # NOTE(review): assumes the client decodes responses to str
        # (decode_responses=True) — confirm, or decode bytes first.
        # Redis KEYS returns arbitrary order; ISO timestamps sort
        # chronologically, so a lexicographic sort gives run order.
        logs = []
        for start_key in sorted(start_keys)[-limit:]:
            base = start_key[:-len(':start')]
            logs.append({
                'time': base[len(prefix):],
                'start': self.redis.get(f'{base}:start'),
                'end': self.redis.get(f'{base}:end'),
                'error': self.redis.get(f'{base}:error'),
            })

        return logs

# Module-level singleton used by the log_job decorator.
job_logger = JobLogger()

# 装饰器:自动记录日志
def log_job(job_id):
    """Decorator factory: record start/success/failure of each run of a job.

    Args:
        job_id: identifier used as the key prefix in the job-log store.

    The decorated function's return value and exceptions pass through
    unchanged; failures are logged and then re-raised.
    """
    def decorator(func):
        # BUG FIX: without functools.wraps the wrapper clobbered the
        # job function's __name__/__doc__, which breaks introspection
        # (and APScheduler's default job naming).
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            job_logger.log_job_start(job_id)
            try:
                result = func(*args, **kwargs)
                job_logger.log_job_end(job_id, success=True)
                return result
            except Exception as e:
                job_logger.log_job_end(job_id, success=False, error=str(e))
                raise
        return wrapper
    return decorator

# Usage: decorating a job records each of its runs automatically.
@log_job('update_stocks')
def update_stock_data():
    # ... task logic goes here
    pass

失败重试

from apscheduler.executors.pool import ThreadPoolExecutor

# 配置执行器(支持重试)
# Executor pool: up to 20 jobs may run concurrently on worker threads.
executors = {
    'default': ThreadPoolExecutor(max_workers=20)
}

job_defaults = {
    'coalesce': True,  # collapse a backlog of missed runs into one run
    'max_instances': 3,  # at most 3 concurrent instances of the same job
    'misfire_grace_time': 60  # seconds of lateness tolerated before a run is skipped
}

scheduler = BackgroundScheduler(
    executors=executors,
    job_defaults=job_defaults
)

# 带重试的任务
def update_with_retry(job_id, max_retries=3):
    """Fetch-and-cache with exponential-backoff retries.

    Args:
        job_id: label used in log messages only.
        max_retries: total number of attempts before giving up.

    Returns:
        The result of fetch_and_cache_data() on the first success
        (None when max_retries <= 0 — no attempt is made at all).

    Raises:
        Exception: re-raises the last failure once all attempts are spent.
    """

    final_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            outcome = fetch_and_cache_data()
        except Exception as exc:
            logging.warning(f'{job_id} failed on attempt {attempt + 1}: {exc}')
            if attempt == final_attempt:
                # Out of attempts — surface the failure to the caller.
                logging.error(f'{job_id} failed after {max_retries} attempts')
                raise
            # Exponential backoff: 1s, 2s, 4s, ...
            time.sleep(2 ** attempt)
        else:
            logging.info(f'{job_id} succeeded on attempt {attempt + 1}')
            return outcome

本节小结

✅ 完成的工作:

  • 引入了 APScheduler 任务调度
  • 实现了多种定时任务
  • 添加了任务监控和日志
  • 实现了失败重试机制

✅ 效果:

  • 热点数据始终保持新鲜
  • 用户体验提升
  • 减少了用户等待时间

⚠️ 新挑战: 用户数突破百万,流量压力巨大

🎯 下一步: 扩展系统架构,以支撑百万级用户带来的海量请求

搜索