Redis 计数器
Snowflake 实现了一周,我累了
上节我花了整整一周实现 Snowflake,调试时钟回拨、分配机器 ID、保证线程安全…虽然最终跑起来了,但我心里总有点不安:时钟回拨的隐患始终存在。
那天凌晨三点,我盯着代码突然想:我只是想生成一个自增 ID,真的需要这么复杂吗?
Snowflake 的"重":
- 64 位二进制拆分,心累
- 时钟回拨处理,头大
- 机器 ID 分配,麻烦
- 线程安全保证,复杂
有没有更简单的方案?我打开 Redis 文档,突然眼睛一亮 —— INCR 命令!
原来 Redis INCR 天生就是做这个的
Redis 的 INCR 命令将 key 中存储的数字值加 1,并返回新值。关键在于:它是原子操作!
我迫不及待地测试了一下:
Redis: SET url_counter 0
Redis: INCR url_counter → 1
Redis: INCR url_counter → 2
Redis: INCR url_counter → 3
↑
原子操作!多台机器同时 INCR 也不会冲突!为什么 INCR 是原子的?
我深入研究了一下,发现原理很简单:Redis 是单线程执行命令的,即使多个客户端同时执行 INCR,也是排队一个个加的:
客户端 A: INCR url_counter
客户端 B: INCR url_counter ← 等待 A 执行完
客户端 C: INCR url_counter ← 等待 B 执行完
结果:1, 2, 3 ← 完美自增,没有冲突这不就是我想要的分布式计数器吗?不需要时钟同步,不需要机器 ID,只需要一个 Redis 命令!
极简实现:代码量只有 Snowflake 的十分之一
我立刻写了一个实现,惊讶地发现代码量只有 Snowflake 的十分之一:
import redis
BASE62 = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
class RedisIDGenerator:
"""基于 Redis 计数器的 ID 生成器"""
def __init__(self, redis_client, counter_key="url:counter"):
self.redis = redis_client
self.counter_key = counter_key
def next_id(self):
"""获取下一个自增 ID"""
# INCR 是原子操作,线程安全
new_id = self.redis.incr(self.counter_key)
return new_id
def next_short_code(self):
"""获取下一个自增 ID 并转为短链接码"""
new_id = self.next_id()
return self._to_base62(new_id)
def _to_base62(self, num):
"""将数字转为 62 进制"""
if num == 0:
return BASE62[0]
result = ""
while num > 0:
result = BASE62[num % 62] + result
num //= 62
return result
# 使用示例
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
generator = RedisIDGenerator(r)
# 生成 5 个短链接码
for _ in range(5):
short_code = generator.next_short_code()
print(f"短链接码: {short_code}")
# 输出:
# 短链接码: 1
# 短链接码: 2
# 短链接码: 3
# 短链接码: 4
# 短链接码: 5就这么简单!没有时钟回拨的烦恼,没有机器 ID 的分配,一个 INCR 命令搞定一切。
集成到短链接服务
我把这个方案集成到了短链接服务中:
from flask import Flask, request, jsonify, redirect, abort
import redis
import hashlib
app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
BASE62 = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
def to_base62(num):
if num == 0:
return BASE62[0]
result = ""
while num > 0:
result = BASE62[num % 62] + result
num //= 62
return result
@app.route('/shorten', methods=['POST'])
def create_short_link():
"""创建短链接"""
long_url = request.json.get('url')
if not long_url:
return jsonify({"error": "URL is required"}), 400
# 1. 检查是否已存在(去重)
cache_key = f"url:lookup:{hashlib.md5(long_url.encode()).hexdigest()}"
existing = redis_client.get(cache_key)
if existing:
return jsonify({"short_url": f"https://s.url/{existing}"})
# 2. 生成新 ID(一行代码!)
new_id = redis_client.incr("url:counter")
# 3. 转为短码
short_code = to_base62(new_id)
# 4. 存储映射关系
# 数据库持久化
db.execute(
"INSERT INTO url_mapping (id, short_code, long_url) VALUES (?, ?, ?)",
(new_id, short_code, long_url)
)
# Redis 缓存(加速查询)
redis_client.setex(f"url:{short_code}", 86400, long_url)
# 去重索引
redis_client.setex(cache_key, 86400, short_code)
return jsonify({
"short_url": f"https://s.url/{short_code}",
"id": new_id
}), 201
@app.route('/<short_code>')
def redirect_to_original(short_code):
"""重定向到原始 URL"""
# 先查缓存
long_url = redis_client.get(f"url:{short_code}")
if not long_url:
# 缓存未命中,查数据库
long_url = db.query(
"SELECT long_url FROM url_mapping WHERE short_code = ?",
short_code
)
if long_url:
redis_client.setex(f"url:{short_code}", 86400, long_url)
if not long_url:
abort(404)
return redirect(long_url, code=302)整个实现清爽多了!我迫不及待地上线测试,一切正常。但是…
但 Redis 挂了怎么办?
上线第二天,我突然意识到一个问题:Redis 是单点,挂了怎么办?
我的短链接服务完全依赖 Redis 生成 ID,如果 Redis 挂了,整个服务就不可用了。这比 Snowflake 的时钟回拨问题还要严重!
方案一:Redis Sentinel(哨兵模式)
我研究了一下 Redis 的高可用方案,发现了 Sentinel:
┌─────────────┐
│ Redis Master │ ← 主节点,处理所有写操作
└──────┬──────┘
│ 复制
┌──────┴──────┐
│Redis Slave 1│ ← 从节点,随时准备接管
│Redis Slave 2│
└─────────────┘
↑ 监控
┌──────┴──────┐
│ Sentinel │ × 3 ← 哨兵,监控主节点健康
└─────────────┘我配置了 Sentinel,代码改动也很小:
from redis.sentinel import Sentinel
# 配置哨兵
sentinel = Sentinel([
('sentinel-1', 26379),
('sentinel-2', 26379),
('sentinel-3', 26379),
], socket_timeout=0.1)
# 获取主节点连接
master = sentinel.master_for('mymaster', socket_timeout=0.1)
# 使用主节点执行 INCR(代码完全不变!)
new_id = master.incr("url:counter")Sentinel 会自动监控主节点健康,如果主节点挂了,会自动选举一个从节点成为新的主节点。我的代码完全不用改!
方案二:Redis Cluster(集群模式)
如果数据量更大,我还研究了 Redis Cluster:
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Master 1 │ │ Master 2 │ │ Master 3 │
│ slot 0- │ │slot 5461-│ │slot 10923│
│ 5460 │ │ 10922 │ │ -16383 │
└────┬─────┘ └────┬─────┘ └────┬─────┘
│ │ │
┌────┴────┐ ┌────┴────┐ ┌────┴────┐
│Slave 1 │ │Slave 2 │ │Slave 3 │
└─────────┘ └─────────┘ └─────────┘不过对于短链接服务,单个 Redis + Sentinel 就够用了,Cluster 有点杀鸡用牛刀。
Redis 计数器的持久化问题
配置好高可用后,我又想到一个问题:如果 Redis 重启,计数器会不会丢?
我查了一下 Redis 的持久化机制:
RDB vs AOF
| 持久化方式 | 数据安全 | 性能影响 | 恢复速度 |
|---|---|---|---|
| RDB(快照) | 可能丢失几分钟数据 | 低 | 快 |
| AOF(追加日志) | 最多丢 1 秒 | 中 | 慢 |
| RDB + AOF | 最多丢 1 秒 | 中 | 中 |
如果 Redis 配置了 AOF,最多只丢 1 秒的数据。但对于 ID 生成器来说,即使丢失,也只是少了一小段 ID,不会导致重复。
更安全的做法:启动时校准计数器
为了更保险,我加了一个校准逻辑:启动时从数据库查询最大 ID,确保 Redis 计数器不会比数据库小:
class SafeRedisCounter:
"""安全的 Redis 计数器——防止重启后 ID 重复"""
def __init__(self, redis_client, db_connection):
self.redis = redis_client
self.db = db_connection
self.initialized = False
def next_id(self):
"""获取下一个 ID"""
if not self.initialized:
self._init_counter()
self.initialized = True
return self.redis.incr("url:counter")
def _init_counter(self):
"""初始化计数器:取数据库最大 ID + 10000(安全余量)"""
max_db_id = self.db.query("SELECT MAX(id) FROM url_mapping") or 0
safe_start = max_db_id + 10000 # 加 1 万的安全余量
# 只有当 Redis 计数器比安全值小时才设置
current = self.redis.get("url:counter")
if current is None or int(current) < safe_start:
self.redis.set("url:counter", safe_start)
print(f"⚠️ 计数器已校准到 {safe_start}(DB 最大 ID: {max_db_id})")这样即使 Redis 完全丢失数据,重启后也能从数据库恢复,并且加 1 万的安全余量,绝对不会重复。
性能优化:批量预取
解决了高可用问题,我开始优化性能。Redis INCR 虽然快,但每次生成 ID 都要请求 Redis,网络开销不小。
我想到了一个优化方案:批量预取 ID。
class BatchIDGenerator:
"""批量预取 ID,减少 Redis 请求次数"""
def __init__(self, redis_client, batch_size=1000):
self.redis = redis_client
self.batch_size = batch_size
self.current_id = 0
self.max_id = 0
def next_id(self):
"""获取下一个 ID"""
if self.current_id >= self.max_id:
self._fetch_batch()
self.current_id += 1
return self.current_id
def _fetch_batch(self):
"""批量预取一批 ID"""
# INCRBY 原子地增加 batch_size,返回新值
self.max_id = self.redis.incrby("url:counter", self.batch_size)
self.current_id = self.max_id - self.batch_size
print(f"📦 预取了一批 ID:{self.current_id + 1} ~ {self.max_id}")
# 使用示例
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
batch_gen = BatchIDGenerator(r, batch_size=100)
for _ in range(250):
sid = batch_gen.next_id()
print(f"ID: {sid}, 短码: {to_base62(sid)}")
# 输出:
# 📦 预取了一批 ID:1 ~ 100
# ID: 1, 短码: 1
# ID: 2, 短码: 2
# ...
# ID: 100, 短码: 1c
# 📦 预取了一批 ID:101 ~ 200
# ID: 101, 短码: 1d
# ...批量预取后,每 100 个 ID 只需要 1 次 Redis 请求,性能提升了近百倍!
不过这个方案也有个小问题:如果服务重启,预取但未使用的 ID 会丢失。但这只是浪费了一些 ID,不会导致重复,完全可以接受。
Redis 计数器 vs 雪花算法
上线一周后,我对比了两种方案:
| 特性 | Redis 计数器 | 雪花算法 |
|---|---|---|
| 实现复杂度 | ⭐ 非常简单 | ⭐⭐⭐ 复杂 |
| 依赖外部服务 | Redis | 无 |
| 性能 | 10-50K QPS | 400 万+/秒 |
| 全局递增 | ✅ 严格递增 | ❌ 趋势递增 |
| 时钟依赖 | ❌ 无依赖 | ✅ 强依赖 |
| 高可用 | 需要 Redis HA | 天然高可用 |
| ID 长度 | 从 1 开始,很短 | 64 位,较长 |
| 机器 ID | 不需要 | 需要分配 |
| 可读性 | ✅ 短码短 | ❌ 短码较长 |
我的最终选择
我决定在项目中同时支持两种方案:
class HybridIDGenerator:
"""混合 ID 生成器:Redis + Snowflake"""
def __init__(self, redis_client, use_redis=True):
self.redis_gen = RedisIDGenerator(redis_client) if use_redis else None
self.snowflake_gen = SnowflakeGenerator(worker_id=1) if not use_redis else None
def next_id(self):
if self.redis_gen:
try:
return self.redis_gen.next_id()
except Exception as e:
# Redis 挂了,降级到 Snowflake
print(f"Redis failed: {e}, fallback to Snowflake")
return self.snowflake_gen.next_id()
else:
return self.snowflake_gen.next_id()对于小规模应用(日生成量 < 100 万),Redis 计数器足够了;对于超大规模应用,Snowflake 更合适。
想一想
- Redis INCR 的性能瓶颈在哪里?单节点最多能支持多少 QPS?
- 如果 Redis 计数器丢失了,怎么保证新生成的 ID 不和已有的冲突?
- 批量预取 ID 时,如果服务重启了,预取但未使用的 ID 会怎样?