Redis ZSet 基础

从数据库到 Redis

当我们的订单量突破 10 万/天后,数据库轮询方案开始吃力:

性能瓶颈:

- 数据库查询:每次 100-500ms
- 网络延迟:每次 10-50ms
- 并发限制:数据库连接池有限
- 总吞吐量:~100 任务/秒

目标:
- 查询时间:< 10ms
- 总吞吐量:> 1000 任务/秒

经过调研,我们决定尝试 **Redis ZSet(有序集合)**方案。


什么是 ZSet?

Redis ZSet 是 Redis 的 5 种基础数据结构之一,全称为 Sorted Set(有序集合)。

ZSet 的特点

特性说明
有序元素按照 score 排序
唯一member 不重复
快速O(log n) 的插入、删除、查询
范围查询支持按 score 范围查询

ZSet 的数据结构

ZSet 内部使用跳表(Skip List)实现:

跳表结构:
Level 3:  1 ─────────────────────────────► 5 ──────────────────► 9
          │                                 │
Level 2:  1 ──────────► 3 ───────────────► 5 ──────────► 7 ───► 9
          │             │                    │             │
Level 1:  1 ──► 2 ──► 3 ──► 4 ──► 5 ──► 6 ──► 7 ──► 8 ──► 9
          │     │     │     │     │     │     │     │     │
Level 0:  1 ──► 2 ──► 3 ──► 4 ──► 5 ──► 6 ──► 7 ──► 8 ──► 9

时间复杂度:

  • 插入:O(log n)
  • 删除:O(log n)
  • 查询范围:O(log n + m),m 为结果数量

ZSet 实现延时队列的原理

核心思想

延时队列 = ZSet + 时间戳

- member: 任务 ID
- score: 执行时间戳

数据模型

# 任务数据结构
{
    "task_id": "cancel_order_12345",
    "task_type": "order_timeout",
    "task_data": {
        "order_id": 12345,
        "user_id": 888
    },
    "execute_at": 1710505800  # 时间戳
}

# 存储到 ZSet
ZADD delay_queue 1710505800 "cancel_order_12345"

基本实现

import redis
import json
import time

class RedisDelayQueue:
    """基于 Redis ZSet 的延时队列"""
    
    def __init__(self, queue_name='delay_queue'):
        self.redis = redis.Redis(
            host='localhost',
            port=6379,
            decode_responses=True
        )
        self.queue_key = f"delay_queue:{queue_name}"
        self.task_data_key = f"delay_queue_data:{queue_name}"
    
    def add_task(self, task_id, task_type, task_data, delay_seconds):
        """添加任务"""
        # ✅ 第 1 步:计算执行时间
        execute_at = int(time.time()) + delay_seconds
        
        # ✅ 第 2 步:存储任务数据
        task_info = {
            'task_id': task_id,
            'task_type': task_type,
            'task_data': task_data,
            'execute_at': execute_at,
            'created_at': int(time.time())
        }
        
        self.redis.hset(
            self.task_data_key,
            task_id,
            json.dumps(task_info)
        )
        
        # ✅ 第 3 步:添加到 ZSet
        self.redis.zadd(
            self.queue_key,
            {task_id: execute_at}
        )
        
        logger.info(
            f"添加任务: {task_id}, "
            f"类型: {task_type}, "
            f"延时: {delay_seconds}s"
        )
        
        return task_id
    
    def get_ready_tasks(self, batch_size=100):
        """获取到期任务(批量)"""
        now = int(time.time())
        
        # ✅ 查询 score <= now 的任务
        task_ids = self.redis.zrangebyscore(
            self.queue_key,
            0,
            now,
            start=0,
            num=batch_size
        )
        
        if not task_ids:
            return []
        
        # ✅ 批量获取任务数据
        task_data_list = self.redis.hmget(
            self.task_data_key,
            task_ids
        )
        
        tasks = []
        for task_id, task_data_json in zip(task_ids, task_data_list):
            if task_data_json:
                task = json.loads(task_data_json)
                tasks.append(task)
        
        return tasks
    
    def pop_ready_task(self):
        """弹出一个到期任务(原子操作)"""
        # ✅ 使用 Lua 脚本保证原子性
        lua_script = """
        local queue_key = KEYS[1]
        local task_data_key = KEYS[2]
        local now = tonumber(ARGV[1])
        
        -- 查询最早到期的任务
        local task_ids = redis.call('ZRANGEBYSCORE', queue_key, 0, now, 'LIMIT', 0, 1)
        
        if #task_ids == 0 then
            return nil
        end
        
        local task_id = task_ids[1]
        
        -- 从队列中删除
        redis.call('ZREM', queue_key, task_id)
        
        -- 获取任务数据
        local task_data = redis.call('HGET', task_data_key, task_id)
        
        -- 删除任务数据
        redis.call('HDEL', task_data_key, task_id)
        
        return task_data
        """
        
        result = self.redis.eval(
            lua_script,
            2,  # key 的数量
            self.queue_key,
            self.task_data_key,
            int(time.time())
        )
        
        if result:
            return json.loads(result)
        return None
    
    def cancel_task(self, task_id):
        """取消任务"""
        # ✅ 从队列删除
        self.redis.zrem(self.queue_key, task_id)
        
        # ✅ 删除任务数据
        self.redis.hdel(self.task_data_key, task_id)
        
        logger.info(f"取消任务: {task_id}")
    
    def get_task_count(self):
        """获取待处理任务数"""
        return self.redis.zcard(self.queue_key)
    
    def get_ready_task_count(self):
        """获取到期任务数"""
        now = int(time.time())
        return self.redis.zcount(self.queue_key, 0, now)

使用示例

# 初始化队列
queue = RedisDelayQueue('orders')

# 添加任务
queue.add_task(
    task_id='cancel_order_12345',
    task_type='order_timeout',
    task_data={'order_id': 12345},
    delay_seconds=30 * 60  # 30 分钟
)

# 查询任务数
print(f"待处理任务: {queue.get_task_count()}")
print(f"到期任务: {queue.get_ready_task_count()}")

# 消费任务
def consumer(queue, handler):
    """消费者"""
    while True:
        task = queue.pop_ready_task()
        if task:
            try:
                handler(task)
                logger.info(f"任务完成: {task['task_id']}")
            except Exception as e:
                logger.error(f"任务失败: {task['task_id']}, {e}")
                # 可以将任务重新加入队列
                queue.add_task(
                    task['task_id'],
                    task['task_type'],
                    task['task_data'],
                    60  # 1 分钟后重试
                )
        else:
            time.sleep(0.1)  # 没有任务时休眠

# 启动消费者
def order_timeout_handler(task):
    """订单超时处理器"""
    order_id = task['task_data']['order_id']
    
    order = Order.get(order_id)
    if order.status == 'pending_payment':
        order.status = 'cancelled'
        order.cancel_reason = 'payment_timeout'
        order.save()
        
        # 释放库存
        Inventory.release(order.product_id, order.quantity)

# 启动多个消费者
for i in range(10):
    threading.Thread(
        target=consumer,
        args=(queue, order_timeout_handler),
        daemon=True
    ).start()

ZSet 延时队列的优缺点

优点

优点说明
高性能O(log n) 的时间复杂度,查询快
高精度可以精确到毫秒
范围查询支持按时间范围查询
易扩展可以增加 Redis 节点
支持取消O(log n) 删除,效率高

缺点

缺点说明解决方案
内存占用所有任务都在内存中分层存储,长延时任务用数据库
持久化Redis 默认不持久化开启 AOF 或 RDB
单点故障Redis 挂了怎么办使用 Redis 集群或哨兵
容量限制受内存大小限制任务分层或使用 Redis Cluster

性能对比

数据库轮询 vs Redis ZSet

维度数据库轮询Redis ZSet提升
查询时间100-500ms5-20ms10-50x
插入时间10-50ms1-5ms5-10x
删除时间20-100ms1-5ms10-100x
并发能力~100 QPS~10000 QPS100x
精度秒级毫秒级-
内存占用低(磁盘)高(内存)-

性能测试

import time

def test_performance(queue_class, task_count=100000):
    """性能测试"""
    queue = queue_class()
    
    # 测试插入性能
    start = time.time()
    for i in range(task_count):
        queue.add_task(
            f"task_{i}",
            'test',
            {'id': i},
            60
        )
    insert_time = time.time() - start
    
    print(f"插入 {task_count} 个任务: {insert_time:.2f}s")
    print(f"平均插入时间: {insert_time/task_count*1000:.2f}ms")
    
    # 测试查询性能
    time.sleep(60)  # 等待任务到期
    
    start = time.time()
    count = 0
    while True:
        task = queue.pop_ready_task()
        if task:
            count += 1
            if count >= task_count:
                break
        else:
            break
    query_time = time.time() - start
    
    print(f"查询 {count} 个任务: {query_time:.2f}s")
    print(f"平均查询时间: {query_time/count*1000:.2f}ms")
    print(f"吞吐量: {count/query_time:.0f} 任务/秒")

# 测试结果(100000 任务)
# 数据库轮询:
#   插入: 15.3s (0.15ms/任务)
#   查询: 8.5s (0.085ms/任务)
#   吞吐量: ~11700 任务/秒

# Redis ZSet:
#   插入: 3.2s (0.032ms/任务)
#   查询: 2.1s (0.021ms/任务)
#   吞吐量: ~47600 任务/秒

想一想

思考 1

如何处理 Redis 宕机时的数据丢失问题?

参考答案

问题分析:

Redis 是内存数据库,宕机可能导致数据丢失。

解决方案:

1. 开启 AOF 持久化

# redis.conf
appendonly yes
appendfsync everysec  # 每秒同步一次

优点: 数据丢失最多 1 秒 缺点: 性能略有下降


2. 双层存储(Redis + 数据库)

class HybridRedisDelayQueue:
    """双层存储的 Redis 延时队列"""
    
    def __init__(self):
        self.redis = redis.Redis()
        self.db = MySQLDatabase()
    
    def add_task(self, task_id, task_type, task_data, delay_seconds):
        """添加任务:双层写入"""
        # ✅ 第 1 步:写入数据库(保证持久化)
        self.db.execute("""
            INSERT INTO delay_tasks 
            (task_id, task_type, task_data, execute_at, status)
            VALUES (%s, %s, %s, %s, 'pending')
        """, [
            task_id,
            task_type,
            json.dumps(task_data),
            datetime.now() + timedelta(seconds=delay_seconds)
        ])
        
        # ✅ 第 2 步:写入 Redis(保证性能)
        execute_at = int(time.time()) + delay_seconds
        self.redis.zadd('delay_queue', {task_id: execute_at})
    
    def recover_from_db(self):
        """从数据库恢复任务"""
        # 查询未完成的任务
        tasks = self.db.query("""
            SELECT * FROM delay_tasks 
            WHERE status IN ('pending', 'processing')
        """)
        
        for task in tasks:
            # 检查 Redis 中是否存在
            if not self.redis.zscore('delay_queue', task['task_id']):
                # Redis 中不存在,重新添加
                self.redis.zadd(
                    'delay_queue',
                    {task['task_id']: task['execute_at'].timestamp()}
                )
                logger.info(f"恢复任务: {task['task_id']}")

3. 使用 Redis 哨兵或集群

# Redis 哨兵模式
sentinel monitor mymaster 127.0.0.1 6379 2
sentinel down-after-milliseconds mymaster 30000
sentinel parallel-syncs mymaster 1
sentinel failover-timeout mymaster 180000

优点: 自动故障转移 缺点: 需要至少 3 个节点


4. 定期备份

class BackupRedisDelayQueue:
    """带备份的 Redis 延时队列"""
    
    def __init__(self):
        self.redis = redis.Redis()
        self.backup_redis = redis.Redis(host='backup-server')
    
    def add_task(self, task_id, task_type, task_data, delay_seconds):
        """添加任务:同步到备份"""
        # 写入主 Redis
        self.redis.zadd(
            'delay_queue',
            {task_id: int(time.time()) + delay_seconds}
        )
        
        # ✅ 异步写入备份 Redis
        threading.Thread(
            target=lambda: self.backup_redis.zadd(
                'delay_queue',
                {task_id: int(time.time()) + delay_seconds}
            ),
            daemon=True
        ).start()

最佳实践建议:

  1. 必须开启 AOF:保证数据持久化
  2. 双层存储:Redis + 数据库,性能和可靠性兼顾
  3. 使用哨兵/集群:保证高可用
  4. 定期备份:防止数据损坏