Redis 计数器

Snowflake 实现了一周,我累了

上节我花了整整一周实现 Snowflake,调试时钟回拨、分配机器 ID、保证线程安全…虽然最终跑起来了,但我心里总有点不安:时钟回拨的隐患始终存在

那天凌晨三点,我盯着代码突然想:我只是想生成一个自增 ID,真的需要这么复杂吗?

Snowflake 的"重":
- 64 位二进制拆分,心累
- 时钟回拨处理,头大
- 机器 ID 分配,麻烦
- 线程安全保证,复杂

有没有更简单的方案?

我打开 Redis 文档,突然眼睛一亮 —— INCR 命令

原来 Redis INCR 天生就是做这个的

Redis 的 INCR 命令将 key 中存储的数字值加 1,并返回新值。关键在于:它是原子操作

我迫不及待地测试了一下:

Redis: SET url_counter 0
Redis: INCR url_counter → 1
Redis: INCR url_counter → 2
Redis: INCR url_counter → 3

  原子操作!多台机器同时 INCR 也不会冲突!

为什么 INCR 是原子的?

我深入研究了一下,发现原理很简单:Redis 是单线程执行命令的,即使多个客户端同时执行 INCR,也是排队一个个加的:

客户端 A: INCR url_counter
客户端 B: INCR url_counter  ← 等待 A 执行完
客户端 C: INCR url_counter  ← 等待 B 执行完

结果:1, 2, 3  ← 完美自增,没有冲突

这不就是我想要的分布式计数器吗?不需要时钟同步,不需要机器 ID,只需要一个 Redis 命令!

极简实现:代码量只有 Snowflake 的十分之一

我立刻写了一个实现,惊讶地发现代码量只有 Snowflake 的十分之一:

import redis

BASE62 = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

class RedisIDGenerator:
    """基于 Redis 计数器的 ID 生成器"""

    def __init__(self, redis_client, counter_key="url:counter"):
        self.redis = redis_client
        self.counter_key = counter_key

    def next_id(self):
        """获取下一个自增 ID"""
        # INCR 是原子操作,线程安全
        new_id = self.redis.incr(self.counter_key)
        return new_id

    def next_short_code(self):
        """获取下一个自增 ID 并转为短链接码"""
        new_id = self.next_id()
        return self._to_base62(new_id)

    def _to_base62(self, num):
        """将数字转为 62 进制"""
        if num == 0:
            return BASE62[0]
        result = ""
        while num > 0:
            result = BASE62[num % 62] + result
            num //= 62
        return result


# 使用示例
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
generator = RedisIDGenerator(r)

# 生成 5 个短链接码
for _ in range(5):
    short_code = generator.next_short_code()
    print(f"短链接码: {short_code}")

# 输出:
# 短链接码: 1
# 短链接码: 2
# 短链接码: 3
# 短链接码: 4
# 短链接码: 5

就这么简单!没有时钟回拨的烦恼,没有机器 ID 的分配,一个 INCR 命令搞定一切。

集成到短链接服务

我把这个方案集成到了短链接服务中:

from flask import Flask, request, jsonify, redirect, abort
import redis
import hashlib

app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

BASE62 = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

def to_base62(num):
    if num == 0:
        return BASE62[0]
    result = ""
    while num > 0:
        result = BASE62[num % 62] + result
        num //= 62
    return result


@app.route('/shorten', methods=['POST'])
def create_short_link():
    """创建短链接"""
    long_url = request.json.get('url')
    if not long_url:
        return jsonify({"error": "URL is required"}), 400

    # 1. 检查是否已存在(去重)
    cache_key = f"url:lookup:{hashlib.md5(long_url.encode()).hexdigest()}"
    existing = redis_client.get(cache_key)
    if existing:
        return jsonify({"short_url": f"https://s.url/{existing}"})

    # 2. 生成新 ID(一行代码!)
    new_id = redis_client.incr("url:counter")

    # 3. 转为短码
    short_code = to_base62(new_id)

    # 4. 存储映射关系
    # 数据库持久化
    db.execute(
        "INSERT INTO url_mapping (id, short_code, long_url) VALUES (?, ?, ?)",
        (new_id, short_code, long_url)
    )

    # Redis 缓存(加速查询)
    redis_client.setex(f"url:{short_code}", 86400, long_url)

    # 去重索引
    redis_client.setex(cache_key, 86400, short_code)

    return jsonify({
        "short_url": f"https://s.url/{short_code}",
        "id": new_id
    }), 201


@app.route('/<short_code>')
def redirect_to_original(short_code):
    """重定向到原始 URL"""
    # 先查缓存
    long_url = redis_client.get(f"url:{short_code}")
    if not long_url:
        # 缓存未命中,查数据库
        long_url = db.query(
            "SELECT long_url FROM url_mapping WHERE short_code = ?",
            short_code
        )
        if long_url:
            redis_client.setex(f"url:{short_code}", 86400, long_url)

    if not long_url:
        abort(404)

    return redirect(long_url, code=302)

整个实现清爽多了!我迫不及待地上线测试,一切正常。但是…

但 Redis 挂了怎么办?

上线第二天,我突然意识到一个问题:Redis 是单点,挂了怎么办?

我的短链接服务完全依赖 Redis 生成 ID,如果 Redis 挂了,整个服务就不可用了。这比 Snowflake 的时钟回拨问题还要严重!

方案一:Redis Sentinel(哨兵模式)

我研究了一下 Redis 的高可用方案,发现了 Sentinel:

┌─────────────┐
│ Redis Master │ ← 主节点,处理所有写操作
└──────┬──────┘
       │ 复制
┌──────┴──────┐
│Redis Slave 1│ ← 从节点,随时准备接管
│Redis Slave 2│
└─────────────┘
       ↑ 监控
┌──────┴──────┐
│  Sentinel   │ × 3  ← 哨兵,监控主节点健康
└─────────────┘

我配置了 Sentinel,代码改动也很小:

from redis.sentinel import Sentinel

# 配置哨兵
sentinel = Sentinel([
    ('sentinel-1', 26379),
    ('sentinel-2', 26379),
    ('sentinel-3', 26379),
], socket_timeout=0.1)

# 获取主节点连接
master = sentinel.master_for('mymaster', socket_timeout=0.1)

# 使用主节点执行 INCR(代码完全不变!)
new_id = master.incr("url:counter")

Sentinel 会自动监控主节点健康,如果主节点挂了,会自动选举一个从节点成为新的主节点。我的代码完全不用改!

方案二:Redis Cluster(集群模式)

如果数据量更大,我还研究了 Redis Cluster:

┌──────────┐  ┌──────────┐  ┌──────────┐
│ Master 1 │  │ Master 2 │  │ Master 3 │
│ slot 0-  │  │slot 5461-│  │slot 10923│
│  5460    │  │  10922   │  │ -16383   │
└────┬─────┘  └────┬─────┘  └────┬─────┘
     │              │              │
┌────┴────┐   ┌────┴────┐   ┌────┴────┐
│Slave 1  │   │Slave 2  │   │Slave 3  │
└─────────┘   └─────────┘   └─────────┘

不过对于短链接服务,单个 Redis + Sentinel 就够用了,Cluster 有点杀鸡用牛刀。

Redis 计数器的持久化问题

配置好高可用后,我又想到一个问题:如果 Redis 重启,计数器会不会丢?

我查了一下 Redis 的持久化机制:

RDB vs AOF

持久化方式数据安全性能影响恢复速度
RDB(快照)可能丢失几分钟数据
AOF(追加日志)最多丢 1 秒
RDB + AOF最多丢 1 秒

如果 Redis 配置了 AOF,最多只丢 1 秒的数据。但对于 ID 生成器来说,即使丢失,也只是少了一小段 ID,不会导致重复。

更安全的做法:启动时校准计数器

为了更保险,我加了一个校准逻辑:启动时从数据库查询最大 ID,确保 Redis 计数器不会比数据库小:

class SafeRedisCounter:
    """安全的 Redis 计数器——防止重启后 ID 重复"""

    def __init__(self, redis_client, db_connection):
        self.redis = redis_client
        self.db = db_connection
        self.initialized = False

    def next_id(self):
        """获取下一个 ID"""
        if not self.initialized:
            self._init_counter()
            self.initialized = True

        return self.redis.incr("url:counter")

    def _init_counter(self):
        """初始化计数器:取数据库最大 ID + 10000(安全余量)"""
        max_db_id = self.db.query("SELECT MAX(id) FROM url_mapping") or 0
        safe_start = max_db_id + 10000  # 加 1 万的安全余量

        # 只有当 Redis 计数器比安全值小时才设置
        current = self.redis.get("url:counter")
        if current is None or int(current) < safe_start:
            self.redis.set("url:counter", safe_start)
            print(f"⚠️ 计数器已校准到 {safe_start}(DB 最大 ID: {max_db_id})")

这样即使 Redis 完全丢失数据,重启后也能从数据库恢复,并且加 1 万的安全余量,绝对不会重复。

性能优化:批量预取

解决了高可用问题,我开始优化性能。Redis INCR 虽然快,但每次生成 ID 都要请求 Redis,网络开销不小。

我想到了一个优化方案:批量预取 ID

class BatchIDGenerator:
    """批量预取 ID,减少 Redis 请求次数"""

    def __init__(self, redis_client, batch_size=1000):
        self.redis = redis_client
        self.batch_size = batch_size
        self.current_id = 0
        self.max_id = 0

    def next_id(self):
        """获取下一个 ID"""
        if self.current_id >= self.max_id:
            self._fetch_batch()

        self.current_id += 1
        return self.current_id

    def _fetch_batch(self):
        """批量预取一批 ID"""
        # INCRBY 原子地增加 batch_size,返回新值
        self.max_id = self.redis.incrby("url:counter", self.batch_size)
        self.current_id = self.max_id - self.batch_size

        print(f"📦 预取了一批 ID:{self.current_id + 1} ~ {self.max_id}")


# 使用示例
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
batch_gen = BatchIDGenerator(r, batch_size=100)

for _ in range(250):
    sid = batch_gen.next_id()
    print(f"ID: {sid}, 短码: {to_base62(sid)}")

# 输出:
# 📦 预取了一批 ID:1 ~ 100
# ID: 1, 短码: 1
# ID: 2, 短码: 2
# ...
# ID: 100, 短码: 1c
# 📦 预取了一批 ID:101 ~ 200
# ID: 101, 短码: 1d
# ...

批量预取后,每 100 个 ID 只需要 1 次 Redis 请求,性能提升了近百倍!

不过这个方案也有个小问题:如果服务重启,预取但未使用的 ID 会丢失。但这只是浪费了一些 ID,不会导致重复,完全可以接受。

Redis 计数器 vs 雪花算法

上线一周后,我对比了两种方案:

特性Redis 计数器雪花算法
实现复杂度⭐ 非常简单⭐⭐⭐ 复杂
依赖外部服务Redis
性能10-50K QPS400 万+/秒
全局递增✅ 严格递增❌ 趋势递增
时钟依赖❌ 无依赖✅ 强依赖
高可用需要 Redis HA天然高可用
ID 长度从 1 开始,很短64 位,较长
机器 ID不需要需要分配
可读性✅ 短码短❌ 短码较长

我的最终选择

我决定在项目中同时支持两种方案

class HybridIDGenerator:
    """混合 ID 生成器:Redis + Snowflake"""

    def __init__(self, redis_client, use_redis=True):
        self.redis_gen = RedisIDGenerator(redis_client) if use_redis else None
        self.snowflake_gen = SnowflakeGenerator(worker_id=1) if not use_redis else None

    def next_id(self):
        if self.redis_gen:
            try:
                return self.redis_gen.next_id()
            except Exception as e:
                # Redis 挂了,降级到 Snowflake
                print(f"Redis failed: {e}, fallback to Snowflake")
                return self.snowflake_gen.next_id()
        else:
            return self.snowflake_gen.next_id()

对于小规模应用(日生成量 < 100 万),Redis 计数器足够了;对于超大规模应用,Snowflake 更合适。

想一想

  1. Redis INCR 的性能瓶颈在哪里?单节点最多能支持多少 QPS?
  2. 如果 Redis 计数器丢失了,怎么保证新生成的 ID 不和已有的冲突?
  3. 批量预取 ID 时,如果服务重启了,预取但未使用的 ID 会怎样?