Redis ZSet 基础
从数据库到 Redis
当我们的订单量突破 10 万/天后,数据库轮询方案开始吃力:
性能瓶颈:
- 数据库查询:每次 100-500ms
- 网络延迟:每次 10-50ms
- 并发限制:数据库连接池有限
- 总吞吐量:~100 任务/秒
目标:
- 查询时间:< 10ms
- 总吞吐量:> 1000 任务/秒经过调研,我们决定尝试 **Redis ZSet(有序集合)**方案。
什么是 ZSet?
Redis ZSet 是 Redis 的 5 种基础数据结构之一,全称为 Sorted Set(有序集合)。
ZSet 的特点
| 特性 | 说明 |
|---|---|
| 有序 | 元素按照 score 排序 |
| 唯一 | member 不重复 |
| 快速 | O(log n) 的插入、删除、查询 |
| 范围查询 | 支持按 score 范围查询 |
ZSet 的数据结构
ZSet 内部使用跳表(Skip List)实现:
跳表结构:
Level 3: 1 ─────────────────────────────► 5 ──────────────────► 9
│ │
Level 2: 1 ──────────► 3 ───────────────► 5 ──────────► 7 ───► 9
│ │ │ │
Level 1: 1 ──► 2 ──► 3 ──► 4 ──► 5 ──► 6 ──► 7 ──► 8 ──► 9
│ │ │ │ │ │ │ │ │
Level 0: 1 ──► 2 ──► 3 ──► 4 ──► 5 ──► 6 ──► 7 ──► 8 ──► 9时间复杂度:
- 插入:O(log n)
- 删除:O(log n)
- 查询范围:O(log n + m),m 为结果数量
ZSet 实现延时队列的原理
核心思想
延时队列 = ZSet + 时间戳
- member: 任务 ID
- score: 执行时间戳数据模型
# 任务数据结构
{
"task_id": "cancel_order_12345",
"task_type": "order_timeout",
"task_data": {
"order_id": 12345,
"user_id": 888
},
"execute_at": 1710505800 # 时间戳
}
# 存储到 ZSet
ZADD delay_queue 1710505800 "cancel_order_12345"基本实现
import redis
import json
import time
class RedisDelayQueue:
"""基于 Redis ZSet 的延时队列"""
def __init__(self, queue_name='delay_queue'):
self.redis = redis.Redis(
host='localhost',
port=6379,
decode_responses=True
)
self.queue_key = f"delay_queue:{queue_name}"
self.task_data_key = f"delay_queue_data:{queue_name}"
def add_task(self, task_id, task_type, task_data, delay_seconds):
"""添加任务"""
# ✅ 第 1 步:计算执行时间
execute_at = int(time.time()) + delay_seconds
# ✅ 第 2 步:存储任务数据
task_info = {
'task_id': task_id,
'task_type': task_type,
'task_data': task_data,
'execute_at': execute_at,
'created_at': int(time.time())
}
self.redis.hset(
self.task_data_key,
task_id,
json.dumps(task_info)
)
# ✅ 第 3 步:添加到 ZSet
self.redis.zadd(
self.queue_key,
{task_id: execute_at}
)
logger.info(
f"添加任务: {task_id}, "
f"类型: {task_type}, "
f"延时: {delay_seconds}s"
)
return task_id
def get_ready_tasks(self, batch_size=100):
"""获取到期任务(批量)"""
now = int(time.time())
# ✅ 查询 score <= now 的任务
task_ids = self.redis.zrangebyscore(
self.queue_key,
0,
now,
start=0,
num=batch_size
)
if not task_ids:
return []
# ✅ 批量获取任务数据
task_data_list = self.redis.hmget(
self.task_data_key,
task_ids
)
tasks = []
for task_id, task_data_json in zip(task_ids, task_data_list):
if task_data_json:
task = json.loads(task_data_json)
tasks.append(task)
return tasks
def pop_ready_task(self):
"""弹出一个到期任务(原子操作)"""
# ✅ 使用 Lua 脚本保证原子性
lua_script = """
local queue_key = KEYS[1]
local task_data_key = KEYS[2]
local now = tonumber(ARGV[1])
-- 查询最早到期的任务
local task_ids = redis.call('ZRANGEBYSCORE', queue_key, 0, now, 'LIMIT', 0, 1)
if #task_ids == 0 then
return nil
end
local task_id = task_ids[1]
-- 从队列中删除
redis.call('ZREM', queue_key, task_id)
-- 获取任务数据
local task_data = redis.call('HGET', task_data_key, task_id)
-- 删除任务数据
redis.call('HDEL', task_data_key, task_id)
return task_data
"""
result = self.redis.eval(
lua_script,
2, # key 的数量
self.queue_key,
self.task_data_key,
int(time.time())
)
if result:
return json.loads(result)
return None
def cancel_task(self, task_id):
"""取消任务"""
# ✅ 从队列删除
self.redis.zrem(self.queue_key, task_id)
# ✅ 删除任务数据
self.redis.hdel(self.task_data_key, task_id)
logger.info(f"取消任务: {task_id}")
def get_task_count(self):
"""获取待处理任务数"""
return self.redis.zcard(self.queue_key)
def get_ready_task_count(self):
"""获取到期任务数"""
now = int(time.time())
return self.redis.zcount(self.queue_key, 0, now)使用示例
# 初始化队列
queue = RedisDelayQueue('orders')
# 添加任务
queue.add_task(
task_id='cancel_order_12345',
task_type='order_timeout',
task_data={'order_id': 12345},
delay_seconds=30 * 60 # 30 分钟
)
# 查询任务数
print(f"待处理任务: {queue.get_task_count()}")
print(f"到期任务: {queue.get_ready_task_count()}")
# 消费任务
def consumer(queue, handler):
"""消费者"""
while True:
task = queue.pop_ready_task()
if task:
try:
handler(task)
logger.info(f"任务完成: {task['task_id']}")
except Exception as e:
logger.error(f"任务失败: {task['task_id']}, {e}")
# 可以将任务重新加入队列
queue.add_task(
task['task_id'],
task['task_type'],
task['task_data'],
60 # 1 分钟后重试
)
else:
time.sleep(0.1) # 没有任务时休眠
# 启动消费者
def order_timeout_handler(task):
"""订单超时处理器"""
order_id = task['task_data']['order_id']
order = Order.get(order_id)
if order.status == 'pending_payment':
order.status = 'cancelled'
order.cancel_reason = 'payment_timeout'
order.save()
# 释放库存
Inventory.release(order.product_id, order.quantity)
# 启动多个消费者
for i in range(10):
threading.Thread(
target=consumer,
args=(queue, order_timeout_handler),
daemon=True
).start()ZSet 延时队列的优缺点
优点
| 优点 | 说明 |
|---|---|
| ✅ 高性能 | O(log n) 的时间复杂度,查询快 |
| ✅ 高精度 | 可以精确到毫秒 |
| ✅ 范围查询 | 支持按时间范围查询 |
| ✅ 易扩展 | 可以增加 Redis 节点 |
| ✅ 支持取消 | O(log n) 删除,效率高 |
缺点
| 缺点 | 说明 | 解决方案 |
|---|---|---|
| ❌ 内存占用 | 所有任务都在内存中 | 分层存储,长延时任务用数据库 |
| ❌ 持久化 | Redis 默认不持久化 | 开启 AOF 或 RDB |
| ❌ 单点故障 | Redis 挂了怎么办 | 使用 Redis 集群或哨兵 |
| ❌ 容量限制 | 受内存大小限制 | 任务分层或使用 Redis Cluster |
性能对比
数据库轮询 vs Redis ZSet
| 维度 | 数据库轮询 | Redis ZSet | 提升 |
|---|---|---|---|
| 查询时间 | 100-500ms | 5-20ms | 10-50x |
| 插入时间 | 10-50ms | 1-5ms | 5-10x |
| 删除时间 | 20-100ms | 1-5ms | 10-100x |
| 并发能力 | ~100 QPS | ~10000 QPS | 100x |
| 精度 | 秒级 | 毫秒级 | - |
| 内存占用 | 低(磁盘) | 高(内存) | - |
性能测试
import time
def test_performance(queue_class, task_count=100000):
"""性能测试"""
queue = queue_class()
# 测试插入性能
start = time.time()
for i in range(task_count):
queue.add_task(
f"task_{i}",
'test',
{'id': i},
60
)
insert_time = time.time() - start
print(f"插入 {task_count} 个任务: {insert_time:.2f}s")
print(f"平均插入时间: {insert_time/task_count*1000:.2f}ms")
# 测试查询性能
time.sleep(60) # 等待任务到期
start = time.time()
count = 0
while True:
task = queue.pop_ready_task()
if task:
count += 1
if count >= task_count:
break
else:
break
query_time = time.time() - start
print(f"查询 {count} 个任务: {query_time:.2f}s")
print(f"平均查询时间: {query_time/count*1000:.2f}ms")
print(f"吞吐量: {count/query_time:.0f} 任务/秒")
# 测试结果(100000 任务)
# 数据库轮询:
# 插入: 15.3s (0.15ms/任务)
# 查询: 8.5s (0.085ms/任务)
# 吞吐量: ~11700 任务/秒
# Redis ZSet:
# 插入: 3.2s (0.032ms/任务)
# 查询: 2.1s (0.021ms/任务)
# 吞吐量: ~47600 任务/秒想一想
思考 1
如何处理 Redis 宕机时的数据丢失问题?
参考答案
问题分析:
Redis 是内存数据库,宕机可能导致数据丢失。
解决方案:
1. 开启 AOF 持久化
# redis.conf
appendonly yes
appendfsync everysec # 每秒同步一次优点: 数据丢失最多 1 秒 缺点: 性能略有下降
2. 双层存储(Redis + 数据库)
class HybridRedisDelayQueue:
"""双层存储的 Redis 延时队列"""
def __init__(self):
self.redis = redis.Redis()
self.db = MySQLDatabase()
def add_task(self, task_id, task_type, task_data, delay_seconds):
"""添加任务:双层写入"""
# ✅ 第 1 步:写入数据库(保证持久化)
self.db.execute("""
INSERT INTO delay_tasks
(task_id, task_type, task_data, execute_at, status)
VALUES (%s, %s, %s, %s, 'pending')
""", [
task_id,
task_type,
json.dumps(task_data),
datetime.now() + timedelta(seconds=delay_seconds)
])
# ✅ 第 2 步:写入 Redis(保证性能)
execute_at = int(time.time()) + delay_seconds
self.redis.zadd('delay_queue', {task_id: execute_at})
def recover_from_db(self):
"""从数据库恢复任务"""
# 查询未完成的任务
tasks = self.db.query("""
SELECT * FROM delay_tasks
WHERE status IN ('pending', 'processing')
""")
for task in tasks:
# 检查 Redis 中是否存在
if not self.redis.zscore('delay_queue', task['task_id']):
# Redis 中不存在,重新添加
self.redis.zadd(
'delay_queue',
{task['task_id']: task['execute_at'].timestamp()}
)
logger.info(f"恢复任务: {task['task_id']}")3. 使用 Redis 哨兵或集群
# Redis 哨兵模式
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 30000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 180000优点: 自动故障转移 缺点: 需要至少 3 个节点
4. 定期备份
class BackupRedisDelayQueue:
"""带备份的 Redis 延时队列"""
def __init__(self):
self.redis = redis.Redis()
self.backup_redis = redis.Redis(host='backup-server')
def add_task(self, task_id, task_type, task_data, delay_seconds):
"""添加任务:同步到备份"""
# 写入主 Redis
self.redis.zadd(
'delay_queue',
{task_id: int(time.time()) + delay_seconds}
)
# ✅ 异步写入备份 Redis
threading.Thread(
target=lambda: self.backup_redis.zadd(
'delay_queue',
{task_id: int(time.time()) + delay_seconds}
),
daemon=True
).start()最佳实践建议:
- ✅ 必须开启 AOF:保证数据持久化
- ✅ 双层存储:Redis + 数据库,性能和可靠性兼顾
- ✅ 使用哨兵/集群:保证高可用
- ✅ 定期备份:防止数据损坏