锁机制

为什么需要分布式锁?

在分布式环境下,多个消费者可能同时获取同一个任务。

# 😱 没有锁的情况
consumer_1: pop_task() → 获取 task_1
consumer_2: pop_task() → 也获取 task_1
# 两个消费者都执行了 task_1!

Redis 分布式锁

基本实现

class RedisDistributedLock:
    """Distributed lock stored in a single Redis key.

    Acquiring the lock is one ``set(key, token, nx=True, ex=timeout)`` call,
    so creating the key and attaching its expiry cannot be separated.  The
    token is a fresh random UUID per acquisition; releasing deletes the key
    only while it still holds that token, so a client never removes a lock
    that has since been taken by someone else.

    The lock is never renewed here: once ``lock_timeout`` seconds pass the key
    expires, and a later ``release`` with the old token is reported as failed.

    Usage:

    * explicit: ``token = lock.acquire(key)`` then ``lock.release(key, token)``
    * context manager: give the lock a ``task_id`` (constructor argument or
      attribute) and use ``with lock:``; the key is ``lock:task:<task_id>``.

    ``logger`` and ``LockAcquireError`` are expected to be provided by the
    surrounding module; they are not defined in this class.
    """

    # Compare-and-delete in one server-side script: the key is removed only
    # when its current value equals the caller's token.
    _RELEASE_SCRIPT = """
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end
    """

    def __init__(self, redis_client, lock_timeout=10, task_id=None):
        """Create a lock helper.

        Args:
            redis_client: Client exposing ``set(key, value, nx=..., ex=...)``
                and ``eval(script, numkeys, *keys_and_args)``.
            lock_timeout: Expiry of the lock key, in seconds.
            task_id: Task this lock guards when used as a context manager.
                May be left as ``None`` when only ``acquire``/``release`` are
                used, or assigned later via the ``task_id`` attribute.
        """
        self.redis = redis_client
        self.lock_timeout = lock_timeout
        self.task_id = task_id
        # Token and key of the lock currently held through ``with``.
        self.lock_value = None
        self._held_key = None

    def acquire(self, lock_key):
        """Try once to take the lock; do not wait or retry.

        Args:
            lock_key: Redis key representing the lock.

        Returns:
            The token (a UUID string) to pass to ``release`` on success,
            or ``None`` when the key already exists.
        """
        lock_value = str(uuid.uuid4())

        acquired = self.redis.set(
            lock_key,
            lock_value,
            nx=True,  # only set when the key does not exist yet
            ex=self.lock_timeout,  # expiry, so a crashed holder cannot block forever
        )

        if acquired:
            logger.info(f"获取锁成功: {lock_key}")
            return lock_value

        return None

    def release(self, lock_key, lock_value):
        """Release the lock if it is still held with ``lock_value``.

        Args:
            lock_key: Redis key representing the lock.
            lock_value: Token returned by the matching ``acquire`` call.

        Returns:
            ``True`` when the key was deleted, ``False`` when it no longer
            held this token (for example because it expired).
        """
        result = self.redis.eval(
            self._RELEASE_SCRIPT,
            1,
            lock_key,
            lock_value,
        )

        if result:
            logger.info(f"释放锁成功: {lock_key}")
        else:
            logger.warning(f"释放锁失败: {lock_key}")
        return bool(result)

    def __enter__(self):
        """Acquire the lock for ``self.task_id``.

        Raises:
            ValueError: No ``task_id`` has been set on this lock.
            LockAcquireError: The lock is currently held elsewhere.
        """
        if self.task_id is None:
            raise ValueError(
                "task_id must be set before using the lock as a context manager"
            )

        lock_key = f"lock:task:{self.task_id}"
        token = self.acquire(lock_key)
        if not token:
            raise LockAcquireError(lock_key)

        self.lock_value = token
        # Remember the key so __exit__ releases exactly what was acquired,
        # even if task_id is reassigned inside the ``with`` body.
        self._held_key = lock_key
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Release the lock taken in ``__enter__``; exceptions propagate."""
        lock_key, token = self._held_key, self.lock_value
        self._held_key = None
        self.lock_value = None
        if token is not None:
            self.release(lock_key, token)

使用示例

class SafeTaskExecutor:
    """Task executor that guards each task with a Redis lock.

    The helpers ``_is_task_executed``, ``_do_execute`` and ``_mark_executed``
    are called here but not defined in this class body; they are expected to
    be supplied elsewhere (subclass or the rest of the module).
    """

    def __init__(self):
        """Connect to Redis with default settings and build the lock helper."""
        self.redis = redis.Redis()
        self.lock = RedisDistributedLock(self.redis)

    def execute_task(self, task_id):
        """Run a task at most once while holding its lock.

        The task is skipped (with an info log) when another holder has the
        lock or when ``_is_task_executed`` reports it as already done.  The
        lock is released in ``finally`` so a failing task cannot keep it.

        Args:
            task_id: Identifier of the task; the lock key is
                ``lock:task:<task_id>``.
        """
        lock_key = f"lock:task:{task_id}"

        lock_value = self.lock.acquire(lock_key)
        if not lock_value:
            logger.info(f"任务 {task_id} 正在被处理,跳过")
            return

        try:
            # Re-check under the lock: a previous holder may have finished
            # the task before this consumer obtained the lock.
            if self._is_task_executed(task_id):
                logger.info(f"任务 {task_id} 已执行,跳过")
                return

            self._do_execute(task_id)
            self._mark_executed(task_id)
        finally:
            self.lock.release(lock_key, lock_value)

    def execute_task_with_context(self, task_id):
        """Run a task using the lock's context-manager protocol.

        Unlike ``execute_task`` this does not skip silently: whatever the lock
        raises on a failed acquisition propagates to the caller, and no
        already-executed check or marking is performed.

        Args:
            task_id: Identifier of the task to execute.
        """
        # The lock derives its key from its own ``task_id`` attribute, so the
        # id must be assigned before entering the ``with`` block.  This keeps
        # per-task state on the shared lock object.
        self.lock.task_id = task_id
        with self.lock:
            self._do_execute(task_id)

数据库悲观锁

SKIP LOCKED

-- Select and row-lock up to 10 pending tasks whose execute_at has passed,
-- earliest first. SKIP LOCKED passes over rows that are already locked
-- instead of waiting for them, so concurrent consumers get different rows.
SELECT * FROM delay_tasks
WHERE execute_at <= NOW()
  AND status = 'pending'
ORDER BY execute_at ASC
LIMIT 10
FOR UPDATE SKIP LOCKED;
class DatabaseLockExecutor:
    """Claims due tasks from ``delay_tasks`` using pessimistic row locks."""

    def __init__(self, db, consumer_id):
        """Store the database handle and the id written onto claimed rows.

        Args:
            db: Object providing ``query(sql, params)`` and
                ``execute(sql, params)``.
            consumer_id: Value stored in ``consumer_id`` of claimed tasks.
        """
        self.db = db
        self.consumer_id = consumer_id

    def pop_tasks(self, limit=10):
        """Lock, claim and return up to ``limit`` due pending tasks.

        Inside one ``transaction.atomic()`` block the due rows are selected
        with ``FOR UPDATE SKIP LOCKED`` and then updated to ``processing``
        with this consumer's id.

        Args:
            limit: Maximum number of tasks to claim.

        Returns:
            The rows exactly as returned by the SELECT, i.e. as read before
            the status update was applied.
        """
        with transaction.atomic():
            select_sql = """
                SELECT * FROM delay_tasks
                WHERE execute_at <= NOW()
                  AND status = 'pending'
                ORDER BY execute_at ASC
                LIMIT %s
                FOR UPDATE SKIP LOCKED
            """
            rows = self.db.query(select_sql, [limit])

            # Ids of the rows this transaction now holds locks on.
            claimed_ids = tuple(row['id'] for row in rows)
            if claimed_ids:
                update_sql = """
                    UPDATE delay_tasks
                    SET status = 'processing',
                        consumer_id = %s,
                        updated_at = NOW()
                    WHERE id IN %s
                """
                self.db.execute(update_sql, [self.consumer_id, claimed_ids])

            return rows