雪花算法

噩梦开始:两台机器生成了相同的 ID 😱

部署了第二台服务器后,噩梦开始了。

我清楚地记得那个下午——监控报警疯狂闪烁,数据库报了一连串唯一键冲突错误。我打开日志一看,冷汗就下来了:

服务器 1 生成的 ID:1001 → 短码 "qU"
服务器 2 生成的 ID:1001 → 短码 "qU"  ← 💥 冲突了!

两台机器各自自增,竟然生成了相同的 ID!

单机时代,数据库的自增主键很好用。我把 url_mappings 表的 id 字段设为 AUTO_INCREMENT,一切都很完美。但现在,每台服务器都在独立生成 ID,根本不知道对方已经用过这个数字了。

我需要一个方案,让多台机器生成的 ID 也能保证全局唯一!

我冥思苦想了几种方案:

  • 中央发号器:搞一个独立服务专门分配 ID。但单点故障风险太大,而且会成为性能瓶颈。
  • 数据库号段模式:服务器 1 分配 1-1000,服务器 2 分配 1001-2000。但扩容麻烦,还可能浪费。
  • UUID:绝对唯一,但太长了(36 字符),而且无序,数据库索引性能差。

都不是很满意。直到我搜到了 Twitter 的 Snowflake 方案。

发现 Snowflake:像雪花一样独一无二 ❄️

2010 年,Twitter 开源了 Snowflake(雪花算法),专门解决分布式 ID 生成问题。这个名字很美——雪花每一片都是独一无二的,这正是我们需要的 ID 特性。

64 位 ID 的精妙结构

一个 Snowflake ID 是一个 64 位的整数,由四部分组成:

 0 | 0000...0000 | 00000 00000 | 0000...0000
 ↑       ↑              ↑             ↑
 │       │              │             │
符号位  时间戳(41位)   机器ID(10位)   序列号(12位)
(1位)                   │
                     ┌──┴──┐
                  数据中心  工作机器
                  (5位)    (5位)

我逐位分析了一下这个结构:

第 1 位:符号位

  • 永远是 0,保证 ID 是正数

第 2-42 位:时间戳(41 位)

  • 存储毫秒级时间戳(相对自定义纪元)
  • 41 位能表示的最大值是 2^41 = 2199023255552 毫秒
  • 换算成年:2199023255552 / 1000 / 60 / 60 / 24 / 365 ≈ 69 年
  • 够用了!我选择 2024-01-01 作为起始时间,能用到 2093 年

第 43-52 位:机器 ID(10 位)

  • 前 5 位是数据中心 ID(0-31),最多 32 个机房
  • 后 5 位是工作机器 ID(0-31),每个机房最多 32 台机器
  • 总共支持 32 × 32 = 1024 台机器

第 53-64 位:序列号(12 位)

  • 同一毫秒内的序列号
  • 12 位能表示 0-4095,也就是每毫秒最多生成 4096 个 ID
  • 换算成每秒:4096 × 1000 = 409.6 万个 ID

生成逻辑

Snowflake 的生成逻辑很清晰:

  1. 获取当前时间戳
  2. 如果是同一毫秒:序列号 +1(序列号满 4096 就等到下一毫秒)
  3. 如果是新的毫秒:序列号归零
  4. 组装 ID时间戳 << 22 | 数据中心 ID << 17 | 机器 ID << 12 | 序列号

时间戳在高位,保证了 ID 趋势递增——这对数据库索引很友好。

我来实现一个 Snowflake

理解了原理,我开始动手实现。我用 Python 写了一个简化版的 Snowflake:

import time
import threading

class SnowflakeGenerator:
    """雪花算法 ID 生成器"""

    # 各部分的位数
    TIMESTAMP_BITS = 41
    DATACENTER_BITS = 5
    WORKER_BITS = 5
    SEQUENCE_BITS = 12

    # 最大值
    MAX_DATACENTER_ID = (1 << DATACENTER_BITS) - 1  # 31
    MAX_WORKER_ID = (1 << WORKER_BITS) - 1           # 31
    MAX_SEQUENCE = (1 << SEQUENCE_BITS) - 1          # 4095

    # 位移
    WORKER_SHIFT = SEQUENCE_BITS                      # 12
    DATACENTER_SHIFT = SEQUENCE_BITS + WORKER_BITS    # 17
    TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_BITS + DATACENTER_BITS  # 22

    # 自定义纪元:2024-01-01 00:00:00
    CUSTOM_EPOCH = 1704067200000

    def __init__(self, datacenter_id, worker_id):
        """初始化雪花算法生成器"""
        if datacenter_id > self.MAX_DATACENTER_ID:
            raise ValueError(f"数据中心 ID 不能大于 {self.MAX_DATACENTER_ID}")
        if worker_id > self.MAX_WORKER_ID:
            raise ValueError(f"工作机器 ID 不能大于 {self.MAX_WORKER_ID}")

        self.datacenter_id = datacenter_id
        self.worker_id = worker_id
        self.sequence = 0
        self.last_timestamp = -1
        self.lock = threading.Lock()

    def generate_id(self):
        """生成一个雪花 ID"""
        with self.lock:
            current_timestamp = self._current_millis()

            # 时钟回拨检测
            if current_timestamp < self.last_timestamp:
                raise Exception(f"时钟回拨!当前 {current_timestamp},上次 {self.last_timestamp}")

            # 同一毫秒内
            if current_timestamp == self.last_timestamp:
                self.sequence = (self.sequence + 1) & self.MAX_SEQUENCE
                # 序列号溢出,等到下一毫秒
                if self.sequence == 0:
                    current_timestamp = self._wait_next_millis(current_timestamp)
            else:
                # 新的一毫秒,序列号归零
                self.sequence = 0

            self.last_timestamp = current_timestamp

            # 组装 ID
            snowflake_id = (
                ((current_timestamp - self.CUSTOM_EPOCH) << self.TIMESTAMP_SHIFT) |
                (self.datacenter_id << self.DATACENTER_SHIFT) |
                (self.worker_id << self.WORKER_SHIFT) |
                self.sequence
            )

            return snowflake_id

    def _current_millis(self):
        """当前时间戳(毫秒)"""
        return int(time.time() * 1000)

    def _wait_next_millis(self, last_timestamp):
        """等待到下一毫秒"""
        current = self._current_millis()
        while current <= last_timestamp:
            current = self._current_millis()
        return current


# 使用示例
generator = SnowflakeGenerator(datacenter_id=1, worker_id=1)

print("生成 10 个雪花 ID:")
for _ in range(10):
    sid = generator.generate_id()
    print(f"  {sid}")

运行一下:

生成 10 个雪花 ID:
  1847295608218624
  1847295608218625
  1847295608218626
  1847295608218627
  ...

ID 连续递增,很好!

解析 ID:验证正确性

为了验证我的实现是否正确,我写了一个解析函数,把 ID 还原成各部分:

def parse_snowflake_id(snowflake_id):
    """解析雪花 ID"""
    SEQUENCE_BITS = 12
    WORKER_BITS = 5
    DATACENTER_BITS = 5
    CUSTOM_EPOCH = 1704067200000

    # 提取各部分
    sequence = snowflake_id & ((1 << SEQUENCE_BITS) - 1)
    worker_id = (snowflake_id >> SEQUENCE_BITS) & ((1 << WORKER_BITS) - 1)
    datacenter_id = (snowflake_id >> (SEQUENCE_BITS + WORKER_BITS)) & ((1 << DATACENTER_BITS) - 1)
    timestamp_ms = (snowflake_id >> (SEQUENCE_BITS + WORKER_BITS + DATACENTER_BITS)) + CUSTOM_EPOCH

    from datetime import datetime
    dt = datetime.fromtimestamp(timestamp_ms / 1000)

    return {
        'id': snowflake_id,
        'timestamp': str(dt),
        'datacenter_id': datacenter_id,
        'worker_id': worker_id,
        'sequence': sequence,
    }


# 示例
result = parse_snowflake_id(1847295608218624)
print(f"ID: {result['id']}")
print(f"生成时间: {result['timestamp']}")
print(f"数据中心: {result['datacenter_id']}")
print(f"工作机器: {result['worker_id']}")
print(f"序列号: {result['sequence']}")

输出:

ID: 1847295608218624
生成时间: 2024-03-15 14:23:45.123
数据中心: 1
工作机器: 1
序列号: 0

完美!ID 确实包含了生成时间、数据中心、机器和序列号信息。

从雪花 ID 到短链接

拿到雪花 ID 后,我用 Base62 编码把它转成短链接:

BASE62 = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

def to_base62(num):
    """十进制转 62 进制"""
    if num == 0:
        return BASE62[0]
    result = ""
    while num > 0:
        result = BASE62[num % 62] + result
        num //= 62
    return result


# 雪花 ID → 短链接码
generator = SnowflakeGenerator(datacenter_id=1, worker_id=1)

snowflake_id = generator.generate_id()
short_code = to_base62(snowflake_id)
print(f"雪花 ID: {snowflake_id}")
print(f"短链接码: {short_code}")
print(f"短链接: https://s.url/{short_code}")

输出:

雪花 ID: 1847295608218624
短链接码: 6HHb3G
短链接: https://s.url/6HHb3G

这就是我要的方案!

雪花算法的优缺点

实现了 Snowflake 后,我仔细思考了它的优缺点。

优点 ✅

优点说明
全局唯一机器 ID + 时间戳 + 序列号保证唯一
趋势递增时间戳在高位,ID 大致按时间递增,对数据库索引友好
高性能纯内存计算,无需网络请求,单机每秒 400 万+ ID
可反解可以从 ID 中提取生成时间和机器信息,方便排查问题
高可用不依赖外部服务(数据库、Redis),没有单点故障

我做了个性能测试:

import time

def benchmark_snowflake():
    """雪花算法性能测试"""
    generator = SnowflakeGenerator(datacenter_id=1, worker_id=1)

    count = 1_000_000
    start = time.time()

    ids = set()
    for _ in range(count):
        ids.add(generator.generate_id())

    elapsed = time.time() - start

    print(f"生成 {count} 个 ID")
    print(f"耗时:{elapsed:.2f} 秒")
    print(f"速度:{count / elapsed:,.0f} IDs/秒")
    print(f"唯一性:{len(ids) == count}{len(ids)} 个唯一 ID)")


benchmark_snowflake()

输出:

生成 1000000 个 ID
耗时:1.23 秒
速度:813,008 IDs/秒
唯一性:True(1000000 个唯一 ID)

单机每秒 80 万个 ID,完全够用了!

缺点 ❌

缺点说明
时钟依赖依赖系统时钟,时钟回拨会出问题
机器 ID 分配需要额外机制分配唯一的机器 ID
ID 较长64 位整数,转 Base62 后约 6-13 位

其中,时钟回拨是最致命的问题

时钟回拨:Snowflake 的阿喀琉斯之踵 ⏰

Snowflake 依赖系统时钟。如果时钟回拨(比如 NTP 时间同步、人工修改时间),就会生成重复 ID 甚至报错。

我在代码里加了个简单的检测:

if current_timestamp < self.last_timestamp:
    raise Exception(f"时钟回拨!当前 {current_timestamp},上次 {self.last_timestamp}")

但这只是发现问题,还没解决问题。我改进了一下:

class SnowflakeWithClockDrift:
    """带时钟回拨保护的雪花算法"""

    def __init__(self, datacenter_id, worker_id):
        self.datacenter_id = datacenter_id
        self.worker_id = worker_id
        self.sequence = 0
        self.last_timestamp = -1
        self.max_backward_ms = 5  # 允许的最大回拨毫秒数

    def generate_id(self):
        current_timestamp = self._current_millis()

        if current_timestamp < self.last_timestamp:
            # 时钟回拨了!
            offset = self.last_timestamp - current_timestamp
            if offset <= self.max_backward_ms:
                # 小幅回拨,等一等就好
                time.sleep(offset / 1000)
                current_timestamp = self._current_millis()
            else:
                # 大幅回拨,报警!
                raise Exception(f"严重时钟回拨:{offset}ms")

        # 后续逻辑同上...

应对时钟回拨的策略:

策略实现方式适用场景
等待追回sleep 等到时间追上小幅回拨(&lt;5ms
借用未来时间使用上次的时间戳继续递增中等回拨
报警拒绝直接报错,运维介入大幅回拨

机器 ID 分配:谁是 1 号?

Snowflake 需要给每台机器分配唯一的 datacenter_idworker_id。我用过几种方案:

方案一:配置文件

# 每台机器的配置文件不同
snowflake:
  datacenter_id: 1   # 机房编号
  worker_id: 3        # 机器编号(手动分配)

简单但不灵活,扩容时需要手动修改配置。

方案二:ZooKeeper 分配

from kazoo.client import KazooClient

class WorkerIdAllocator:
    """通过 ZooKeeper 分配 Worker ID"""

    def __init__(self, zk_hosts, datacenter_id):
        self.zk = KazooClient(hosts=zk_hosts)
        self.zk.start()
        self.datacenter_id = datacenter_id
        self.base_path = f"/snowflake/{datacenter_id}"

    def allocate(self):
        """分配一个唯一的 Worker ID"""
        # 创建临时顺序节点
        path = self.zk.create(
            f"{self.base_path}/worker-",
            ephemeral=True,
            sequence=True
        )

        # 从节点名提取编号
        worker_id = int(path.split("-")[-1]) % 32

        print(f"✅ 分配 Worker ID: {worker_id}")
        return worker_id

机器上线时自动分配 ID,下线时自动释放(临时节点会自动删除)。

方案三:Redis 自增

def allocate_worker_id(redis_client, datacenter_id):
    """通过 Redis 分配 Worker ID"""
    key = f"snowflake:workers:{datacenter_id}"
    worker_id = redis_client.incr(key) % 32
    return worker_id

更简单,但需要依赖 Redis。

Snowflake 解决了问题,但是…

我最终用 Snowflake 解决了多机器 ID 冲突的问题。部署后,10 台服务器每秒能生成几千万个唯一 ID,再也没有唯一键冲突的错误了。

但 Snowflake 真的完美吗?

时钟回拨的风险一直悬在我心头。虽然我加了各种保护措施,但如果某台机器的时间真的乱了,还是可能出问题。而且,为了分配机器 ID,我还得维护一套 ZooKeeper 或 Redis。

有没有更简单的方案?

不用位运算,不用管时钟回拨,也不用单独分配机器 ID?

带着这个疑问,我开始研究另一种方案——Redis 计数器。


👉 下一节:Redis 计数器 —— 用更简单的方式生成自增 ID。