锁机制
为什么需要分布式锁?
在分布式环境下,多个消费者可能同时获取同一个任务。
# 😱 没有锁的情况
consumer_1: pop_task() → 获取 task_1
consumer_2: pop_task() → 也获取 task_1
# 两个消费者都执行了 task_1!Redis 分布式锁
基本实现
class RedisDistributedLock:
"""Redis 分布式锁"""
def __init__(self, redis_client, lock_timeout=10):
self.redis = redis_client
self.lock_timeout = lock_timeout
def acquire(self, lock_key):
"""获取锁"""
# ✅ SETNX + 过期时间(原子操作)
lock_value = str(uuid.uuid4())
acquired = self.redis.set(
lock_key,
lock_value,
nx=True, # 不存在时才设置
ex=self.lock_timeout # 过期时间
)
if acquired:
logger.info(f"获取锁成功: {lock_key}")
return lock_value
return None
def release(self, lock_key, lock_value):
"""释放锁"""
# ✅ Lua 脚本:确保只释放自己的锁
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
result = self.redis.eval(
lua_script,
1,
lock_key,
lock_value
)
if result:
logger.info(f"释放锁成功: {lock_key}")
else:
logger.warning(f"释放锁失败: {lock_key}")
def __enter__(self):
"""上下文管理器支持"""
lock_key = f"lock:task:{self.task_id}"
self.lock_value = self.acquire(lock_key)
if not self.lock_value:
raise LockAcquireError(lock_key)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器支持"""
lock_key = f"lock:task:{self.task_id}"
self.release(lock_key, self.lock_value)使用示例
class SafeTaskExecutor:
"""带锁的任务执行器"""
def __init__(self):
self.redis = redis.Redis()
self.lock = RedisDistributedLock(self.redis)
def execute_task(self, task_id):
"""执行任务(带锁)"""
lock_key = f"lock:task:{task_id}"
# ✅ 获取锁
lock_value = self.lock.acquire(lock_key)
if not lock_value:
logger.info(f"任务 {task_id} 正在被处理,跳过")
return
try:
# ✅ 检查任务是否已执行
if self._is_task_executed(task_id):
logger.info(f"任务 {task_id} 已执行,跳过")
return
# ✅ 执行任务
self._do_execute(task_id)
# ✅ 标记为已执行
self._mark_executed(task_id)
finally:
# ✅ 释放锁
self.lock.release(lock_key, lock_value)
def execute_task_with_context(self, task_id):
"""使用上下文管理器"""
with self.lock.task_id == task_id:
# 在上下文中自动获取和释放锁
self._do_execute(task_id)数据库悲观锁
SKIP LOCKED
-- 查询并锁定(跳过已锁定的行)
SELECT * FROM delay_tasks
WHERE execute_at <= NOW()
AND status = 'pending'
ORDER BY execute_at ASC
LIMIT 10
FOR UPDATE SKIP LOCKED;class DatabaseLockExecutor:
"""数据库锁执行器"""
def __init__(self, db, consumer_id):
self.db = db
self.consumer_id = consumer_id
def pop_tasks(self, limit=10):
"""弹出任务(使用悲观锁)"""
with transaction.atomic():
# ✅ 查询并锁定
tasks = self.db.query("""
SELECT * FROM delay_tasks
WHERE execute_at <= NOW()
AND status = 'pending'
ORDER BY execute_at ASC
LIMIT %s
FOR UPDATE SKIP LOCKED
""", [limit])
# ✅ 标记为处理中
task_ids = [task['id'] for task in tasks]
if task_ids:
self.db.execute("""
UPDATE delay_tasks
SET status = 'processing',
consumer_id = %s,
updated_at = NOW()
WHERE id IN %s
""", [self.consumer_id, tuple(task_ids)])
return tasks