可靠性
那次惨痛的教训
2024 年 4 月 1 日(愚人节,但对我们来说不是玩笑),我们的延时队列服务发生了一起严重事故。
事故经过
时间线:
14:00 - 用户 A 下单,创建 30 分钟后取消订单的任务
14:05 - 用户 A 支付成功,取消超时任务
14:30 - 延时队列服务重启(部署新版本)
14:31 - 用户 A 收到订单被取消的通知 🤯根因分析
经过排查,问题出在这里:
# 😱 有问题的代码
class UnreliableDelayQueue:
def __init__(self):
self.tasks = [] # 内存存储
def add_task(self, task_id, task_data, delay_seconds):
"""添加任务到内存"""
execute_at = time.time() + delay_seconds
self.tasks.append({
'id': task_id,
'data': task_data,
'execute_at': execute_at
})
def cancel_task(self, task_id):
"""取消任务"""
# 😱 从内存删除
self.tasks = [t for t in self.tasks if t['id'] != task_id]
def service_restart(self):
"""服务重启"""
# 😱 内存数据全部丢失!
self.tasks = []问题根源:
- ❌ 任务只存储在内存,无持久化
- ❌ 服务重启后,任务全部丢失
- ❌ 取消任务只是从内存删除,没有记录
- ❌ 重启后,任务不存在,但用户的期望还在
可靠性的三个维度
1. 任务不丢失(No Loss)
任务创建后,无论发生什么情况(服务器崩溃、网络故障、停电),都不能丢失。
2. 任务不重复(No Duplicate)
同一个任务不能被多次执行。如果执行失败需要重试,要有明确的控制。
3. 任务不遗漏(No Miss)
到期的任务必须被执行,不能因为任何原因被遗漏。
可靠性三角
不丢失
/ \
/ \
/_____\
不遗漏 不重复任务不丢失的保障
方案 1:双层存储(数据库 + 内存)
class ReliableDelayQueue:
"""可靠的延时队列:双层存储"""
def __init__(self):
# L1: 数据库(持久化层)
self.db = MySQLDatabase()
# L2: Redis(性能层)
self.redis = redis.Redis()
def add_task(self, task_id, task_data, delay_seconds):
"""添加任务:先持久化,再内存"""
execute_at = datetime.now() + timedelta(seconds=delay_seconds)
# ✅ 第 1 步:写入数据库(强事务保证)
try:
with self.db.transaction():
self.db.execute("""
INSERT INTO delay_tasks
(task_id, task_data, execute_at, status, created_at)
VALUES (%s, %s, %s, 'pending', NOW())
""", (task_id, json.dumps(task_data), execute_at))
except Exception as e:
# 数据库写入失败,任务创建失败
logger.error(f"数据库写入失败: {task_id}, {e}")
raise
# ✅ 第 2 步:写入 Redis(异步,失败不影响主流程)
try:
self.redis.zadd(
'delay_queue',
{task_id: execute_at.timestamp()}
)
except Exception as e:
# Redis 写入失败,记录日志但不阻断
logger.warning(f"Redis 写入失败: {task_id}, {e}")
# 依赖补偿机制定期将数据库任务同步到 Redis
self._mark_for_sync(task_id)
def pop_task(self):
"""弹出任务:先从 Redis,再到数据库"""
# ✅ 第 1 步:从 Redis 弹出
task_id = self._atomic_pop_from_redis()
if task_id:
# ✅ 第 2 步:从数据库查询确认
task = self.db.query("""
SELECT * FROM delay_tasks
WHERE task_id = %s AND status = 'pending'
""", [task_id])
if task:
# ✅ 第 3 步:标记为处理中
self.db.execute("""
UPDATE delay_tasks
SET status = 'processing',
consumer_id = %s,
updated_at = NOW()
WHERE task_id = %s
""", [get_consumer_id(), task_id])
return task
else:
# 😱 数据不一致:Redis 有但数据库没有
logger.warning(f"数据不一致: task_id={task_id}")
self._record_dead_letter(task_id, "redis_db_mismatch")
# ✅ 第 4 步:Redis 没有任务,从数据库查询(兜底)
task = self.db.query("""
SELECT * FROM delay_tasks
WHERE execute_at <= NOW()
AND status = 'pending'
ORDER BY execute_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
""")
if task:
self.db.execute("""
UPDATE delay_tasks
SET status = 'processing',
consumer_id = %s,
updated_at = NOW()
WHERE task_id = %s
""", [get_consumer_id(), task['task_id']])
return task
return None
def _atomic_pop_from_redis(self):
"""从 Redis 原子弹出任务"""
lua_script = """
local tasks = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, 1)
if #tasks > 0 then
local removed = redis.call('ZREM', KEYS[1], tasks[1])
if removed > 0 then
return tasks[1]
end
end
return nil
"""
return self.redis.eval(
lua_script,
1,
'delay_queue',
time.time()
)双层存储的优势:
| 层级 | 存储 | 优点 | 缺点 |
|---|---|---|---|
| L1: 数据库 | 磁盘 | ✅ 强持久化 ✅ 事务保证 ✅ 可靠性高 | ⚠️ 性能较低 |
| L2: Redis | 内存 | ✅ 高性能 ✅ 快速查询 | ⚠️ 需要持久化配置 |
方案 2:WAL(Write-Ahead Log)
使用写前日志确保数据不丢失:
class WALDelayQueue:
"""带 WAL 的延时队列"""
def __init__(self):
self.wal_file = open('/data/delay_queue.wal', 'a')
self.redis = redis.Redis()
# 启动时恢复 WAL
self._recover_from_wal()
def add_task(self, task_id, task_data, delay_seconds):
"""添加任务:先写 WAL"""
execute_at = time.time() + delay_seconds
# ✅ 第 1 步:写 WAL(顺序写,性能高)
log_entry = {
'timestamp': time.time(),
'operation': 'ADD',
'task_id': task_id,
'task_data': task_data,
'execute_at': execute_at
}
self.wal_file.write(json.dumps(log_entry) + '\n')
self.wal_file.flush() # 强制刷盘
# ✅ 第 2 步:写入 Redis
self.redis.zadd('delay_queue', {task_id: execute_at})
# ✅ 第 3 步:WAL 标记为完成
log_entry['status'] = 'COMMITTED'
self.wal_file.write(json.dumps(log_entry) + '\n')
self.wal_file.flush()
def _recover_from_wal(self):
"""从 WAL 恢复数据"""
with open('/data/delay_queue.wal', 'r') as f:
for line in f:
entry = json.loads(line)
if entry['operation'] == 'ADD':
# 检查任务是否已经完成
if entry.get('status') != 'COMMITTED':
# 任务未完成,需要重新添加
self.redis.zadd(
'delay_queue',
{entry['task_id']: entry['execute_at']}
)
logger.info(f"恢复任务: {entry['task_id']}")方案 3:消息队列持久化
使用 Kafka 等持久化消息队列:
class KafkaDelayQueue:
"""基于 Kafka 的延时队列"""
def __init__(self):
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
# ✅ 启用持久化
acks='all', # 等待所有副本确认
retries=3,
# ✅ 启用幂等性
enable_idempotence=True
)
self.consumer = KafkaConsumer(
'delay_queue',
bootstrap_servers=['localhost:9092'],
# ✅ 从最早的消息开始消费(容错)
auto_offset_reset='earliest',
# ✅ 手动提交 offset(可靠性)
enable_auto_commit=False
)
def add_task(self, task_id, task_data, delay_seconds):
"""添加任务到 Kafka"""
message = {
'task_id': task_id,
'task_data': task_data,
'execute_at': time.time() + delay_seconds
}
# ✅ 同步发送,确认成功
future = self.producer.send(
'delay_queue',
value=json.dumps(message).encode(),
key=task_id.encode()
)
# 等待确认
try:
record_metadata = future.get(timeout=10)
logger.info(
f"任务 {task_id} 已发送: "
f"partition={record_metadata.partition}, "
f"offset={record_metadata.offset}"
)
except Exception as e:
logger.error(f"发送失败: {task_id}, {e}")
raise任务不重复的保障
幂等性设计
确保同一个任务即使被多次调用,结果也是一样的。
class IdempotentExecutor:
"""幂等性执行器"""
def __init__(self):
self.redis = redis.Redis()
def execute(self, task_id, handler):
"""执行任务(带幂等性)"""
# ✅ 第 1 步:检查是否已执行
executed_key = f"executed:{task_id}"
if self.redis.exists(executed_key):
logger.info(f"任务 {task_id} 已执行,跳过")
return
# ✅ 第 2 步:获取分布式锁
lock_key = f"lock:task:{task_id}"
lock_acquired = self.redis.set(
lock_key, '1',
nx=True,
ex=30 # 30 秒超时
)
if not lock_acquired:
logger.info(f"任务 {task_id} 正在被其他消费者处理")
return
try:
# ✅ 第 3 步:再次检查(双重检查)
if self.redis.exists(executed_key):
return
# ✅ 第 4 步:执行业务逻辑
handler()
# ✅ 第 5 步:标记为已执行
self.redis.setex(executed_key, '1', 86400) # 1 天
logger.info(f"任务 {task_id} 执行成功")
finally:
# ✅ 第 6 步:释放锁
self.redis.delete(lock_key)
# 使用示例
def cancel_order_task(order_id):
"""取消订单任务(幂等版本)"""
task_id = f"cancel_order_{order_id}"
def handler():
# 业务逻辑
order = Order.get(order_id)
# ✅ 业务层幂等检查
if order.status == 'cancelled':
return
order.status = 'cancelled'
order.save()
# 释放库存(也要幂等)
Inventory.release(order.product_id, order.quantity)
executor = IdempotentExecutor()
executor.execute(task_id, handler)三层幂等检查
def execute_with_three_layer_idempotency(task_id, handler):
"""三层幂等检查"""
# ✅ 第 1 层:分布式锁(防止并发执行)
lock_key = f"lock:task:{task_id}"
if not acquire_distributed_lock(lock_key):
return
try:
# ✅ 第 2 层:执行标记(防止重复执行)
executed_key = f"executed:{task_id}"
if redis.exists(executed_key):
return
# ✅ 第 3 层:业务状态检查(防止状态不一致)
if not check_business_state(task_id):
return
# 执行业务逻辑
handler()
# 标记为已执行
redis.setex(executed_key, '1', 86400)
finally:
release_distributed_lock(lock_key)任务不遗漏的保障
补偿机制
定期检查是否有遗漏的任务:
class CompensationMechanism:
"""补偿机制"""
def __init__(self):
self.db = MySQLDatabase()
self.redis = redis.Redis()
def compensate(self):
"""补偿:发现并修复遗漏的任务"""
# ✅ 第 1 步:查询数据库中到期但未执行的任务
orphaned_tasks = self.db.query("""
SELECT * FROM delay_tasks
WHERE execute_at <= NOW()
AND status = 'pending'
AND created_at > NOW() - INTERVAL '1 hour'
""")
for task in orphaned_tasks:
logger.warning(f"发现遗漏任务: {task['task_id']}")
# ✅ 第 2 步:重新添加到 Redis
self.redis.zadd(
'delay_queue',
{task['task_id']: task['execute_at'].timestamp()}
)
# ✅ 第 3 步:记录到监控
self.monitor.alert_warning(
f"任务遗漏: task_id={task['task_id']}, "
f"execute_at={task['execute_at']}"
)
def check_stuck_tasks(self):
"""检查卡住的任务"""
# 查询长时间处于 processing 状态的任务
stuck_tasks = self.db.query("""
SELECT * FROM delay_tasks
WHERE status = 'processing'
AND updated_at < NOW() - INTERVAL '10' MINUTE
""")
for task in stuck_tasks:
logger.error(f"任务卡住: {task['task_id']}")
# 重置为 pending,重新执行
self.db.execute("""
UPDATE delay_tasks
SET status = 'pending',
consumer_id = NULL,
retry_count = retry_count + 1,
updated_at = NOW()
WHERE task_id = %s
""", [task['task_id']])心跳机制
消费者定期发送心跳,证明自己还活着:
class ConsumerWithHeartbeat:
"""带心跳的消费者"""
def __init__(self, consumer_id):
self.consumer_id = consumer_id
self.redis = redis.Redis()
self.heartbeat_interval = 10 # 10 秒
def send_heartbeat(self):
"""发送心跳"""
heartbeat_key = f"consumer:heartbeat:{self.consumer_id}"
self.redis.setex(
heartbeat_key,
'1',
self.heartbeat_interval * 3 # 3 倍间隔超时
)
def consume(self, queue, handler):
"""消费任务"""
def heartbeat_loop():
while True:
self.send_heartbeat()
time.sleep(self.heartbeat_interval)
# 启动心跳线程
threading.Thread(target=heartbeat_loop, daemon=True).start()
# 消费任务
while True:
task = queue.pop_task()
if task:
# 检查消费者是否还活着
if not self._is_alive():
logger.error("消费者已超时,停止消费")
break
try:
handler(task)
except Exception as e:
logger.error(f"任务执行失败: {e}")
else:
time.sleep(0.1)
def _is_alive(self):
"""检查消费者是否还活着"""
heartbeat_key = f"consumer:heartbeat:{self.consumer_id}"
return self.redis.exists(heartbeat_key)想一想
思考 1
如果消费者在执行任务时挂了,如何保证任务能被重新执行?
参考答案
问题分析:
消费者在执行任务时挂了,会导致:
- 任务状态停留在
processing - 任务没有完成
- 其他消费者不知道该任务需要重试
解决方案:心跳 + 超时检测
class FailoverAwareDelayQueue:
"""支持故障转移的延时队列"""
def __init__(self):
self.db = MySQLDatabase()
self.redis = redis.Redis()
def pop_task(self):
"""弹出任务"""
task = self.db.query("""
SELECT * FROM delay_tasks
WHERE execute_at <= NOW()
AND status = 'pending'
ORDER BY execute_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED
""")
if task:
# ✅ 标记为处理中,记录消费者 ID 和心跳时间
self.db.execute("""
UPDATE delay_tasks
SET status = 'processing',
consumer_id = %s,
heartbeat_at = NOW(),
updated_at = NOW()
WHERE task_id = %s
""", [get_consumer_id(), task['task_id']])
return task
return None
def complete_task(self, task_id):
"""完成任务"""
self.db.execute("""
UPDATE delay_tasks
SET status = 'completed',
completed_at = NOW()
WHERE task_id = %s
""", [task_id])
def send_heartbeat(self, task_id):
"""发送心跳"""
self.db.execute("""
UPDATE delay_tasks
SET heartbeat_at = NOW()
WHERE task_id = %s AND status = 'processing'
""", [task_id])
def detect_dead_tasks(self):
"""检测死掉的任务"""
# 查询心跳超时的任务
dead_tasks = self.db.query("""
SELECT * FROM delay_tasks
WHERE status = 'processing'
AND heartbeat_at < NOW() - INTERVAL '30' SECOND
""")
for task in dead_tasks:
logger.warning(f"检测到死任务: {task['task_id']}")
# 重置为 pending,允许重新执行
self.db.execute("""
UPDATE delay_tasks
SET status = 'pending',
consumer_id = NULL,
heartbeat_at = NULL,
retry_count = retry_count + 1,
updated_at = NOW()
WHERE task_id = %s
""", [task['task_id']])
# 消费者使用
class RobustConsumer:
def __init__(self):
self.queue = FailoverAwareDelayQueue()
def consume(self):
while True:
task = self.queue.pop_task()
if not task:
time.sleep(0.1)
continue
# ✅ 启动心跳线程
stop_event = threading.Event()
def heartbeat_loop():
while not stop_event.is_set():
self.queue.send_heartbeat['task_id']
time.sleep(10)
heartbeat_thread = threading.Thread(
target=heartbeat_loop,
daemon=True
)
heartbeat_thread.start()
try:
# 执行任务
handler(task)
self.queue.complete_task(task['task_id'])
except Exception as e:
logger.error(f"任务执行失败: {e}")
# 不手动重置,依赖心跳超时检测
finally:
stop_event.set()关键要点:
- ✅ 记录心跳时间:
heartbeat_at字段 - ✅ 定期检测死任务:心跳超时 > 30 秒
- ✅ 重置任务状态:从
processing改回pending - ✅ 增加重试次数:记录失败次数