技术挑战
那次惨痛的事故回顾
在开始深入技术方案之前,让我先回顾一下我们系统遇到的所有问题。这些问题后来都成了我们设计延时队列时的核心挑战。
挑战 1:任务丢失
问题的发现
2024 年 4 月的一天,客服收到了 200 多条投诉:
“我下单后支付了,为什么第二天订单被取消了?!”
我慌了,赶紧查日志。发现:
- 用户 14:00 下单
- 用户 14:05 支付成功
- 服务器 14:30 重启
- 重启后,Redis 中的延时任务数据丢失(我们没有开启 AOF)
- 14:35,系统重新启动,数据库查询时没找到这个任务的记录
- 任务永远丢失了
后果:用户支付成功,但订单还是被取消了,愤怒要求退款。
根本原因分析
任务丢失的原因
1. 内存存储,无持久化
├─ Redis RDB 默认策略:可能丢失最近 5-15 分钟的数据
├─ 服务器重启:内存数据全部清空
└─ 恢复机制缺失:没有从数据库恢复任务的机制
2. 任务创建和执行分离
├─ 创建时:只写入内存(Redis)
├─ 支付时:取消任务(删除内存记录)
├─ 服务器重启:内存清空,任务记录消失
└─ 执行时:无从知晓任务曾经存在
3. 缺少补偿机制
├─ 任务丢失后无法发现
├─ 没有定期校验任务状态
└─ 没有死信队列记录失败任务解决方案思路
class ReliableDelayQueue:
"""可靠的延时队列:持久化 + 补偿"""
def __init__(self):
self.redis = redis.Redis() # 内存层:高性能
self.db = get_database() # 持久层:可靠
def add_task(self, task_id, task_data, delay_seconds):
"""添加任务:双层写入"""
execute_at = time.time() + delay_seconds
# 1. 先写入数据库(保证持久化)
self.db.execute("""
INSERT INTO delay_tasks (task_id, task_data, execute_at, status)
VALUES (%s, %s, %s, 'pending')
""", (task_id, json.dumps(task_data), datetime.fromtimestamp(execute_at)))
# 2. 再写入 Redis(保证高性能)
self.redis.zadd(
'delay_queue',
{task_id: execute_at}
)
def pop_task(self):
"""弹出任务:先删后查"""
# 1. 从 Redis 弹出
task_id = self._atomic_pop_from_redis()
if not task_id:
return None
# 2. 从数据库查询
task = self.db.query("""
SELECT * FROM delay_tasks WHERE task_id = %s AND status = 'pending'
""", [task_id])
if task:
# 3. 标记为处理中
self.db.execute("""
UPDATE delay_tasks SET status = 'processing' WHERE task_id = %s
""", [task_id])
return task
else:
# 😱 Redis 有任务但数据库没有,说明数据不一致
# 记录到死信队列,人工介入
self.db.execute("""
INSERT INTO dead_letter_queue (task_id, reason, created_at)
VALUES (%s, 'redis_db_inconsistency', NOW())
""", [task_id])
return None
def recover_tasks(self):
"""补偿机制:定期检查"""
# 查询超过执行时间但状态还是 pending 的任务
tasks = self.db.query("""
SELECT * FROM delay_tasks
WHERE execute_at <= NOW()
AND status = 'pending'
AND created_at > NOW() - INTERVAL '1 hour'
""")
for task in tasks:
# 重新添加到 Redis
self.redis.zadd(
'delay_queue',
{task['task_id']: task['execute_at'].timestamp()}
)
def start_recovery_worker(self):
"""启动补偿线程"""
def recovery_loop():
while True:
self.recover_tasks()
time.sleep(60) # 每分钟检查一次
threading.Thread(target=recovery_loop, daemon=True).start()关键要点:
- ✅ 双层存储:Redis + 数据库,性能和可靠性兼顾
- ✅ 先持久化后内存:保证数据不丢
- ✅ 补偿机制:定期检查数据一致性
- ✅ 死信队列:记录异常任务,便于排查
挑战 2:任务重复执行
问题的发现
某天运营发现:库存扣减有问题。
正常情况下:
- 用户下单:库存 -1
- 订单取消:库存 +1
但实际数据显示:
- 某个商品的总订单量:1000
- 实际库存扣减:1500
- 多扣了 500 件!
我查日志,发现了问题:同一个”取消订单”任务被执行了多次。
根本原因分析
# 有问题的代码
def cancel_order_task(order_id):
"""取消订单任务"""
# 😱 没有幂等性检查!
order = Order.get(order_id)
# 如果订单已经是 cancelled 状态,还会再执行一次
order.status = 'cancelled'
order.save()
# 库存也会再释放一次
Inventory.release(order.product_id, order.quantity)任务重复执行的原因:
重复执行的场景
1. 消费者重启
├─ 任务刚从队列取出,还没执行完
├─ 消费者挂了,任务没有标记为完成
└─ 消费者重启,又把任务取出来执行
2. 网络问题
├─ 任务执行成功,但确认 ACK 丢失
├─ 队列认为任务还没完成
└─ 又把任务投递给其他消费者
3. 并发竞争
├─ 两个消费者同时弹出同一个任务
├─ 由于锁机制失效
└─ 两个消费者同时执行
4. 重试机制
├─ 任务执行抛异常
├─ 进入重试队列
└─ 但实际上任务已经执行了一部分解决方案:幂等性设计
class IdempotentTaskExecutor:
"""幂等性任务执行器"""
def __init__(self):
self.redis = redis.Redis()
def execute_with_idempotency(self, task_id, handler):
"""带幂等性的任务执行"""
# 1. 检查任务是否已执行
lock_key = f"executed:{task_id}"
# SETNX:如果任务已执行,直接返回
executed = self.redis.set(
lock_key, '1',
nx=True, # 不存在时才设置
ex=3600 # 1 小时后自动过期
)
if not executed:
# 任务已执行,直接返回
logger.info(f"任务 {task_id} 已执行过,跳过")
return
# 2. 执行任务
try:
handler()
logger.info(f"任务 {task_id} 执行成功")
except Exception as e:
# 3. 执行失败,删除执行标记,允许重试
self.redis.delete(lock_key)
logger.error(f"任务 {task_id} 执行失败: {e}")
raise
# 使用示例
def cancel_order_task(order_id):
"""取消订单任务(幂等版本)"""
task_id = f"cancel_order_{order_id}"
def handler():
# 业务逻辑
order = Order.get(order_id)
# 二次检查:订单状态
if order.status == 'cancelled':
return # 已取消,无需再处理
order.status = 'cancelled'
order.cancel_reason = 'payment_timeout'
order.save()
# 释放库存(也要幂等)
Inventory.release(order.product_id, order.quantity)
# 使用幂等性执行器
executor = IdempotentTaskExecutor()
executor.execute_with_idempotency(task_id, handler)幂等性设计的三个层次:
# 层次 1:任务级别(最外层)
def execute_task(task):
# 使用分布式锁确保任务只执行一次
if not acquire_lock(task.id):
return # 被其他消费者执行了
try:
# 层次 2:状态检查
if task.status == 'completed':
return # 任务已完成
# 层次 3:业务状态检查
order = Order.get(task.order_id)
if order.status == 'cancelled':
return # 订单已取消,无需再处理
# 执行业务逻辑
cancel_order(order)
task.status = 'completed'
finally:
release_lock(task.id)挑战 3:执行精度
问题的发现
产品经理跑来找我:
“用户投诉了,他们下单后 29 分钟订单就被取消了,不是说 30 分钟吗?”
我查了一下日志:
- 用户 14:00:30 下单
- 定时任务 14:30:05 执行(应该 14:30:30 执行,但 cron 是每分钟的第 0 秒执行)
- 订单超时判定:14:30:05 - 30 分钟 = 14:00:05
- 因为 14:00:30 > 14:00:05,所以被判定为超时
误差接近 1 分钟!
用户觉得被骗了——说好的 30 分钟,怎么 29 分钟就取消了?
精度问题分析
不同方案的精度对比:
| 方案 | 精度 | 误差来源 | 适用场景 |
|---|---|---|---|
| 数据库轮询 | 分钟级 | 轮询间隔(如 1 分钟) | 低精度要求 |
| Redis ZSet | 毫秒级 | 消费者轮询间隔 | 高精度要求 |
| 时间轮 | 毫秒级 | 时间轮粒度 | 固定间隔延时 |
| 消息队列 | 毫秒级 | 网络延迟、消费延迟 | 高精度要求 |
误差来源分解:
延时执行的实际延时 = 业务指定的延时 + 系统误差
系统误差 = Δ1 + Δ2 + Δ3 + Δ4
Δ1: 调度延迟
└─ 调度器检查队列的间隔(如 1 秒)
Δ2: 消费延迟
└─ 消费器处理任务的耗时
Δ3: 网络延迟
└─ 任务从队列传输到消费者的时间
Δ4: 时钟偏差
└─ 不同服务器的时钟不一致提高精度的方法
方法 1:时间补偿
class CompensatedDelayQueue:
"""带时间补偿的延时队列"""
def __init__(self):
self.redis = redis.Redis()
self.ntp_sync = NTPSync() # NTP 时间同步
def add_task(self, task_id, task_data, delay_seconds):
"""添加任务:计算补偿后的执行时间"""
# 1. 获取系统时间偏差
time_offset = self.ntp_sync.get_offset()
# 2. 计算实际执行时间(补偿系统时间偏差)
now = time.time()
compensated_now = now + time_offset
execute_at = compensated_now + delay_seconds
# 3. 对于短延时任务,增加额外缓冲
if delay_seconds < 3600: # 1 小时内
execute_at += 30 # 增加 30 秒缓冲
# 4. 存储任务
self.redis.zadd(
'delay_queue',
{task_id: execute_at}
)
return execute_at方法 2:优先级队列
class PriorityDelayQueue:
"""优先级队列:紧急任务优先处理"""
def __init__(self):
self.redis = redis.Redis()
def pop_ready_tasks(self, batch_size=100):
"""批量弹出到期任务,按紧急程度排序"""
now = time.time()
# 查询所有到期任务
tasks = self.redis.zrangebyscore(
'delay_queue',
0,
now,
start=0,
num=batch_size
)
# 按剩余时间排序(过期越久的越紧急)
tasks_with_urgency = []
for task in tasks:
task_data = json.loads(task)
execute_at = task_data['execute_at']
overdue = now - execute_at # 过期时间
tasks_with_urgency.append((overdue, task))
# 排序:过期时间长的优先
tasks_with_urgency.sort(key=lambda x: x[0], reverse=True)
# 返回按紧急程度排序的任务列表
return [task for _, task in tasks_with_urgency]方法 3:动态调整轮询间隔
class AdaptivePollingQueue:
"""自适应轮询队列:根据任务量动态调整"""
def __init__(self):
self.redis = redis.Redis()
self.current_interval = 1.0 # 当前轮询间隔(秒)
self.last_task_count = 0
def consume(self, handler):
"""消费任务,动态调整轮询间隔"""
while True:
now = time.time()
# 1. 查询到期任务数量
ready_count = self.redis.zcount('delay_queue', 0, now)
# 2. 根据任务量调整轮询间隔
if ready_count > 100:
# 任务多,加快轮询
self.current_interval = max(0.1, self.current_interval / 2)
elif ready_count < 10:
# 任务少,降低轮询频率
self.current_interval = min(5.0, self.current_interval * 1.5)
# 3. 取出并执行任务
tasks = self.pop_tasks(min(ready_count, 100))
for task in tasks:
handler(task)
# 4. 休眠(动态间隔)
time.sleep(self.current_interval)精度优化效果:
| 优化方法 | 精度提升 | 适用场景 | 成本 |
|---|---|---|---|
| 时间补偿 | ±100ms → ±10ms | 时钟偏差大的环境 | 需要时间同步服务 |
| 优先级队列 | 平均延迟降低 30% | 高并发场景 | 需要排序开销 |
| 动态轮询 | 响应速度提高 50% | 波动明显的场景 | 复杂度增加 |
挑战 4:性能瓶颈
问题的发现
2024 年 6 月 18 日,我们做了一次”618 大促”活动。
活动开始前,我们做了充分的准备:
- 数据库扩容到 32 核 128GB
- Redis 集群扩容到 3 主 3 从
- 消费者增加到 20 个节点
但活动开始 10 分钟后,系统还是挂了。
监控显示:
- 延时任务创建 QPS:8000
- 消费者处理能力:5000
- 任务积压:每秒增加 3000
- Redis 内存占用:90%
- 消费者 CPU:100%
结论:生产速度 > 消费速度,系统雪崩。
性能瓶颈分析
性能瓶颈的三个层面
1. 存储层瓶颈
├─ 数据库:连接池耗尽
├─ Redis:带宽和内存不足
└─ 消息队列:分区不足
2. 网络层瓶颈
├─ 消费者和存储之间的高频交互
├─ 序列化/反序列化开销
└─ 网络带宽限制
3. 计算层瓶颈
├─ 单个消费者处理能力有限
├─ 锁竞争导致性能下降
└─ GC 或内存问题性能优化方案
优化 1:批量处理
class BatchDelayQueue:
"""批量处理:减少 I/O 次数"""
def __init__(self):
self.redis = redis.Redis()
def add_tasks_batch(self, tasks):
"""批量添加任务"""
pipe = self.redis.pipeline()
for task in tasks:
execute_at = time.time() + task['delay']
pipe.zadd(
'delay_queue',
{task['id']: execute_at}
)
pipe.execute()
def consume_batch(self, handler, batch_size=100):
"""批量消费任务"""
while True:
now = time.time()
# 1. 批量查询到期任务
tasks = self.redis.zrangebyscore(
'delay_queue',
0,
now,
start=0,
num=batch_size
)
if not tasks:
time.sleep(0.1)
continue
# 2. 批量删除
pipe = self.redis.pipeline()
for task in tasks:
pipe.zrem('delay_queue', task)
removed_tasks = pipe.execute()
# 3. 批量处理
for i, task in enumerate(tasks):
if removed_tasks[i]:
try:
handler(json.loads(task))
except Exception as e:
# 失败的任务单独处理
self.handle_failed_task(task, e)性能提升:
- 查询次数:1 次 / 100 个任务(减少 100 倍)
- 删除次数:1 次 / 100 个任务(减少 100 倍)
- 网络往返:1 次批量 vs 100 次单次(减少 100 倍)
优化 2:多队列分片
class ShardedDelayQueue:
"""多队列分片:并行处理"""
def __init__(self, shard_count):
self.shard_count = shard_count
def _get_shard(self, task_id):
"""计算任务所属的分片"""
return int(task_id) % self.shard_count
def add_task(self, task_id, task_data, delay_seconds):
"""添加任务到对应分片"""
shard = self._get_shard(task_id)
queue_name = f'delay_queue:shard:{shard}'
execute_at = time.time() + delay_seconds
self.redis.zadd(queue_name, {task_id: execute_at})
def consume(self, shard_id, handler):
"""消费指定分片的任务"""
queue_name = f'delay_queue:shard:{shard_id}'
while True:
now = time.time()
tasks = self.redis.zrangebyscore(queue_name, 0, now)
for task in tasks:
# 尝试删除(原子操作)
removed = self.redis.zrem(queue_name, task)
if removed:
handler(json.loads(task))
time.sleep(0.1)
# 启动多个消费者,每个消费者负责一个分片
shard_count = 10
for shard_id in range(shard_count):
consumer = ShardedDelayQueue(shard_count)
threading.Thread(
target=consumer.consume,
args=(shard_id, task_handler),
daemon=True
).start()性能提升:
- 10 个分片,理论吞吐量提升 10 倍
- 分片之间无竞争,充分利用多核 CPU
- 每个分片独立扩容,灵活调整
优化 3:缓存热点数据
class CachedDelayQueue:
"""缓存优化:减少热点任务的查询压力"""
def __init__(self):
self.redis = redis.Redis()
self.local_cache = {} # 本地缓存
def get_ready_tasks(self):
"""获取到期任务(优先从缓存)"""
now = time.time()
# 1. 先查本地缓存
if 'cached_tasks' in self.local_cache:
cached_tasks = self.local_cache['cached_tasks']
ready_tasks = [
task for task in cached_tasks
if task['execute_at'] <= now
]
if ready_tasks:
return ready_tasks
# 2. 缓存为空,批量加载
tasks = self.redis.zrangebyscore(
'delay_queue',
0,
now + 300, # 提前 5 分钟加载
start=0,
num=1000
)
self.local_cache['cached_tasks'] = [json.loads(t) for t in tasks]
# 3. 返回当前到期的任务
return self.local_cache['cached_tasks']性能提升:
- 热点任务查询减少 99% 的网络 I/O
- 本地缓存命中时,延迟从毫秒级降到微秒级
- 适用于延时时间集中的场景
挑战 5:监控与运维
问题的发现
2024 年 8 月,我们的系统突然出现大量订单超时未取消。
但监控大盘显示:
- 延时队列服务状态:✅ 正常
- 任务创建成功率:✅ 99.9%
- 任务执行成功率:✅ 99.9%
一切都看起来正常!
但实际情况:
- 消费者进程虽然活着,但死锁了
- 任务虽然创建成功,但被错误地路由到错误的队列
- 任务虽然执行成功,但数据库更新失败了
结论:监控指标太粗粒度,无法发现问题。
监控指标设计
我们需要更细粒度的监控:
class DelayQueueMonitor:
"""延时队列监控"""
def __init__(self):
self.metrics = PrometheusMetrics()
self.redis = redis.Redis()
def collect_metrics(self):
"""收集关键指标"""
now = time.time()
# 指标 1:任务积压数
pending_count = self.redis.zcount('delay_queue', now, float('inf'))
self.metrics.gauge('delay_queue_pending', pending_count)
# 指标 2:到期任务数
ready_count = self.redis.zcount('delay_queue', 0, now)
self.metrics.gauge('delay_queue_ready', ready_count)
# 指标 3:任务积压率
backlog_ratio = ready_count / (pending_count + 1)
self.metrics.gauge('delay_queue_backlog_ratio', backlog_ratio)
# 指标 4:平均延时
avg_delay = self._calculate_avg_delay()
self.metrics.gauge('delay_queue_avg_delay', avg_delay)
# 指标 5:P99 延时
p99_delay = self._calculate_p99_delay()
self.metrics.gauge('delay_queue_p99_delay', p99_delay)
# 指标 6:任务处理速率
process_rate = self._calculate_process_rate()
self.metrics.gauge('delay_queue_process_rate', process_rate)
# 指标 7:任务失败率
failure_rate = self._calculate_failure_rate()
self.metrics.gauge('delay_queue_failure_rate', failure_rate)
# 指标 8:消费者健康度
consumer_health = self._check_consumer_health()
self.metrics.gauge('delay_queue_consumer_health', consumer_health)
def _calculate_avg_delay(self):
"""计算平均任务延时"""
now = time.time()
tasks = self.redis.zrangebyscore('delay_queue', 0, now, withscores=True)
if not tasks:
return 0
total_delay = sum(now - score for _, score in tasks)
return total_delay / len(tasks)
def _check_consumer_health(self):
"""检查消费者健康度"""
# 检查消费者最近心跳
last_heartbeat = self.redis.get('consumer:last_heartbeat')
if last_heartbeat:
last_heartbeat_time = float(last_heartbeat)
if time.time() - last_heartbeat_time > 60:
return 0 # 消费者不健康
return 1 # 消费者健康
# 告警规则
alert_rules = [
# 规则 1:任务积压过多
Alert(
name='delay_queue_backlog',
condition='delay_queue_backlog_ratio > 0.5',
severity='warning',
message='任务积压率超过 50%,请检查消费者'
),
# 规则 2:消费者不健康
Alert(
name='delay_queue_consumer_down',
condition='delay_queue_consumer_health == 0',
severity='critical',
message='消费者不健康或已停止,请立即检查'
),
# 规则 3:任务失败率过高
Alert(
name='delay_queue_high_failure_rate',
condition='delay_queue_failure_rate > 0.01',
severity='warning',
message='任务失败率超过 1%,请检查任务执行逻辑'
),
]核心监控指标:
| 指标 | 含义 | 告警阈值 | 严重程度 |
|---|---|---|---|
| pending_count | 待处理任务数 | > 10000 | Warning |
| ready_count | 到期待执行任务数 | > 1000 | Critical |
| backlog_ratio | 积压率 | > 50% | Warning |
| avg_delay | 平均执行延迟 | > 60s | Warning |
| p99_delay | P99 执行延迟 | > 300s | Critical |
| process_rate | 任务处理速率 | < 创建速率的 80% | Warning |
| failure_rate | 任务失败率 | > 1% | Warning |
| consumer_health | 消费者健康度 | 0(不健康) | Critical |
想一想
思考 1
如果你的系统要求任务绝对不能丢,你会采用什么方案?请从持久化、补偿、重试三个维度设计。
参考答案
需求分析:
“绝对不能丢”意味着:
- 服务器重启时,任务不能丢
- 数据库故障时,任务不能丢
- 网络故障时,任务不能丢
- 消费者崩溃时,任务不能丢
方案设计:三层可靠性保障
class UltimateReliableDelayQueue:
"""终极可靠的延时队列"""
def __init__(self):
# L1: 主存储(数据库)
self.primary_db = MySQLDatabase()
# L2: 高性能存储(Redis)
self.cache_redis = redis.Redis()
# L3: 备份存储(消息队列)
self.backup_mq = KafkaProducer()
# 监控和告警
self.monitor = DelayQueueMonitor()
def add_task(self, task_id, task_data, delay_seconds):
"""添加任务:三层写入"""
execute_at = datetime.now() + timedelta(seconds=delay_seconds)
# L1: 写入数据库(强事务保证)
try:
with self.primary_db.transaction():
self.primary_db.execute("""
INSERT INTO delay_tasks
(task_id, task_data, execute_at, status, created_at)
VALUES (%s, %s, %s, 'pending', NOW())
""", (task_id, json.dumps(task_data), execute_at))
except Exception as e:
# 数据库写入失败,立即告警
self.monitor.alert_critical(f"数据库写入失败: {task_id}, {e}")
raise
# L2: 写入 Redis(异步,失败不影响主流程)
try:
self.cache_redis.zadd(
'delay_queue',
{task_id: execute_at.timestamp()}
)
except Exception as e:
# Redis 失败,记录日志但不阻断
logger.warning(f"Redis 写入失败: {task_id}, {e}")
# L3: 发送到备份 MQ(异步,最终一致性)
try:
self.backup_mq.send(
topic='delay_queue_backup',
value={
'task_id': task_id,
'task_data': task_data,
'execute_at': execute_at.isoformat()
}
)
except Exception as e:
logger.warning(f"MQ 发送失败: {task_id}, {e}")
def pop_task(self):
"""弹出任务:L1 + L2 协同"""
# 1. 优先从 Redis 弹出(高性能)
task_id = self._atomic_pop_from_redis()
if task_id:
# 2. 从数据库查询确认
task = self.primary_db.query("""
SELECT * FROM delay_tasks
WHERE task_id = %s AND status = 'pending'
""", [task_id])
if task:
# 3. 标记为处理中
self.primary_db.execute("""
UPDATE delay_tasks
SET status = 'processing',
consumer_id = %s,
updated_at = NOW()
WHERE task_id = %s
""", [get_consumer_id(), task_id])
return task
else:
# 😱 数据不一致:Redis 有但数据库没有
self.monitor.alert_warning(
f"数据不一致: task_id={task_id}, Redis 有但数据库无"
)
# 4. Redis 没有任务,从数据库查询(兜底)
task = self.primary_db.query("""
SELECT * FROM delay_tasks
WHERE execute_at <= NOW()
AND status = 'pending'
ORDER BY execute_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
""")
if task:
# 标记为处理中
self.primary_db.execute("""
UPDATE delay_tasks
SET status = 'processing',
consumer_id = %s,
updated_at = NOW()
WHERE task_id = %s
""", [get_consumer_id(), task['task_id']])
return task
return None
def complete_task(self, task_id):
"""完成任务"""
self.primary_db.execute("""
UPDATE delay_tasks
SET status = 'completed',
completed_at = NOW()
WHERE task_id = %s
""", [task_id])
def fail_task(self, task_id, error_message):
"""任务失败"""
task = self.primary_db.query("""
SELECT * FROM delay_tasks WHERE task_id = %s
""", [task_id])
if task['retry_count'] >= task['max_retries']:
# 超过重试次数,标记为失败
self.primary_db.execute("""
UPDATE delay_tasks
SET status = 'failed',
error_message = %s,
updated_at = NOW()
WHERE task_id = %s
""", [error_message, task_id])
# 发送到死信队列
self._send_to_dead_letter_queue(task)
else:
# 增加重试次数,重新排队
self.primary_db.execute("""
UPDATE delay_tasks
SET retry_count = retry_count + 1,
status = 'pending',
execute_at = NOW() + INTERVAL '5' MINUTE,
updated_at = NOW()
WHERE task_id = %s
""", [task_id])
# 重新添加到 Redis
self.cache_redis.zadd(
'delay_queue',
{task_id: task['execute_at'].timestamp()}
)
def reconcile(self):
"""定期校验:发现和修复数据不一致"""
# 1. 检查数据库中有但 Redis 中没有的任务
orphaned_tasks = self.primary_db.query("""
SELECT * FROM delay_tasks
WHERE status IN ('pending', 'processing')
AND created_at > NOW() - INTERVAL '1 hour'
""")
for task in orphaned_tasks:
# 检查 Redis 中是否存在
if not self.cache_redis.zscore('delay_queue', task['task_id']):
# Redis 中没有,说明丢失了,重新添加
logger.warning(f"发现孤立任务: {task['task_id']}")
self.cache_redis.zadd(
'delay_queue',
{task['task_id']: task['execute_at'].timestamp()}
)
# 2. 检查超时未处理的任务
timeout_tasks = self.primary_db.query("""
SELECT * FROM delay_tasks
WHERE status = 'processing'
AND updated_at < NOW() - INTERVAL '10' MINUTE
""")
for task in timeout_tasks:
# 处理超时,可能消费者挂了
logger.error(f"处理超时: {task['task_id']}")
self.fail_task(
task['task_id'],
f"处理超时: last_update={task['updated_at']}"
)
def start_reconciliation_worker(self):
"""启动校验线程"""
def reconcile_loop():
while True:
try:
self.reconcile()
except Exception as e:
logger.error(f"校验失败: {e}")
time.sleep(60) # 每分钟校验一次
threading.Thread(target=reconcile_loop, daemon=True).start()可靠性保障总结:
| 保障层次 | 技术 | 可靠性 | 性能 |
|---|---|---|---|
| L1: 数据库 | 事务 + 日志 | ⭐⭐⭐⭐⭐ | ⭐⭐ |
| L2: Redis | ZSet + 持久化 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| L3: 备份 MQ | 持久化队列 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ |
| 补偿机制 | 定期校验 | ⭐⭐⭐⭐ | ⭐⭐ |
| 重试机制 | 指数退避 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ |
最佳实践:
- 数据永远先写数据库,再写缓存
- 所有操作都要有日志,便于故障排查
- 定期进行数据校验,发现不一致及时修复
- 设置合理的重试策略,避免无限重试
- 监控所有关键指标,及时发现问题