Redis分布式锁

一、 Redis锁基础概念

1. 为什么需要Redis锁

Redis锁是一种基于内存数据库Redis实现的分布式锁机制,主要解决分布式系统中的资源竞争问题。相比数据库锁,Redis锁具有以下优势

  • 高性能:基于内存操作,响应速度快
  • 原子性保证:Redis单线程模型天然支持原子操作
  • 分布式支持:可跨多台服务器使用
  • 丰富的数据结构:支持多种锁实现方式

2. 基本实现原理

最简单的Redis锁实现方式:

SET resource_name my_random_value NX PX 30000
  • NX:仅当key不存在时设置
  • PX:设置过期时间(毫秒)
  • my_random_value:唯一标识,用于安全释放锁

二、Redis锁的实现方式

1. SETNX实现(基本方式)

实现步骤:

  1. 尝试获取锁:SETNX lock_key 1
  2. 获取成功则设置过期时间:EXPIRE lock_key 30
  3. 执行业务逻辑
  4. 释放锁:DEL lock_key

问题: 非原子操作,SETNX和EXPIRE之间可能崩溃导致死锁

2. SET扩展参数实现(推荐)

Redis 2.6.12+版本支持扩展参数,可原子性完成设置

SET lock_key unique_value NX PX 30000

PHP实现示例:

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$lockKey = 'order_lock_123';
$uniqueId = uniqid();
$expire = 30000; // 30秒

// 尝试获取锁
$acquired = $redis->set($lockKey, $uniqueId, ['NX', 'PX' => $expire]);

if ($acquired) {
    try {
        // 执行业务逻辑
        processOrder();
    } finally {
        // 使用Lua脚本保证原子性释放
        $script = "
            if redis.call('get', KEYS[1]) == ARGV[1] then
                return redis.call('del', KEYS[1])
            else
                return 0
            end
        ";
        $redis->eval($script, [$lockKey, $uniqueId], 1);
    }
} else {
    // 获取锁失败
    handleLockFailure();
}

3. RedLock算法(分布式环境)

Redis官方推荐的分布式锁算法,适用于多Redis节点场景:

实现步骤:

  1. 获取当前时间(毫秒)
  2. 依次尝试从N个Redis节点获取锁
  3. 计算获取锁总耗时,当且仅当从多数节点获取成功且耗时小于锁有效期时才算成功
  4. 锁的实际有效时间 = 初始有效时间 - 获取锁耗时
  5. 释放锁时向所有节点发送释放请求

PHP实现:

use RedLock\RedLock;

$servers = [
    ['127.0.0.1', 6379, 0.01],
    ['127.0.0.1', 6380, 0.01],
    ['127.0.0.1', 6381, 0.01]
];

$redLock = new RedLock($servers);

$lock = $redLock->lock('resource_name', 1000);

if ($lock) {
    try {
        // 执行业务逻辑
    } finally {
        $redLock->unlock($lock);
    }
}

三、Redis锁的关键特性

1. 互斥性

同一时刻只有一个客户端能持有锁,通过Redis的单线程模型和NX参数保证。

2. 避免死锁

通过过期时间保证即使客户端崩溃锁也能自动释放:

SET lock_key unique_value NX PX 30000

3. 释放安全性

只有锁的持有者才能释放锁,通过Lua脚本保证原子性:

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

4. 容错性

RedLock算法能在部分Redis节点宕机时继续工作,只要多数节点存活。

四、Redis锁的应用场景

1. 秒杀系统实现

function seckill($productId, $userId) {
    $redis = new Redis();
    $lockKey = "seckill🔒$productId";
    $uniqueId = uniqid();
    $expire = 1000; // 1秒
    
    // 尝试获取锁
    $locked = $redis->set($lockKey, $uniqueId, ['NX', 'PX' => $expire]);
    
    if (!$locked) {
        return ['success' => false, 'message' => '系统繁忙,请重试'];
    }
    
    try {
        // 检查库存
        $stockKey = "seckill:stock:$productId";
        $stock = $redis->get($stockKey);
        
        if ($stock <= 0) {
            return ['success' => false, 'message' => '已售罄'];
        }
        
        // 扣减库存
        $redis->decr($stockKey);
        
        // 创建订单
        createOrder($productId, $userId);
        
        return ['success' => true, 'message' => '秒杀成功'];
    } finally {
        // 释放锁
        $script = "if redis.call('get', KEYS[1]) == ARGV[1] then 
                     return redis.call('del', KEYS[1]) 
                   else 
                     return 0 
                   end";
        $redis->eval($script, [$lockKey, $uniqueId], 1);
    }
}

2. 分布式定时任务

function runDistributedCron() {
    $redis = new Redis();
    $lockKey = "cron:report_generation";
    $uniqueId = gethostname().'-'.getmypid();
    $expire = 3600000; // 1小时
    
    // 尝试获取锁
    $locked = $redis->set($lockKey, $uniqueId, ['NX', 'PX' => $expire]);
    
    if ($locked) {
        try {
            generateDailyReport();
        } finally {
            $script = "if redis.call('get', KEYS[1]) == ARGV[1] then 
                         return redis.call('del', KEYS[1]) 
                       end";
            $redis->eval($script, [$lockKey, $uniqueId], 1);
        }
    }
}

3. 防止重复提交

function submitOrder($orderData) {
    $redis = new Redis();
    $lockKey = "order:submit:".md5(json_encode($orderData));
    $uniqueId = session_id();
    $expire = 5000; // 5秒
    
    if ($redis->set($lockKey, $uniqueId, ['NX', 'PX' => $expire])) {
        try {
            return processOrder($orderData);
        } finally {
            $script = "if redis.call('get', KEYS[1]) == ARGV[1] then 
                         return redis.call('del', KEYS[1]) 
                       end";
            $redis->eval($script, [$lockKey, $uniqueId], 1);
        }
    } else {
        throw new Exception('请勿重复提交订单');
    }
}

五、Redis锁的最佳实践

1. 锁的粒度控制

  • 不要太粗:避免大范围锁导致性能下降
    // 不好 - 锁整个系统
    $lockKey = "global_order_lock";
    
    // 好 - 按订单ID锁定
    $lockKey = "order_lock_".$orderId;
    
  • 不要太细:避免过多锁增加系统复杂度

2. 锁的过期时间设置

  • 根据业务操作合理设置:
    // 简单操作 - 短时间
    $expire = 1000; // 1秒
    
    // 复杂操作 - 适当延长
    $expire = 30000; // 30秒
    
  • 设置自动续期机制(看门狗):
function keepAlive($redis, $lockKey, $uniqueId, $expire) {
    $script = "if redis.call('get', KEYS[1]) == ARGV[1] then 
                 return redis.call('pexpire', KEYS[1], ARGV[2]) 
               else 
                 return 0 
               end";
    return $redis->eval($script, [$lockKey, $uniqueId, $expire], 1);
}

// 每10秒续期一次
Swoole\Timer::tick(10000, function() use ($redis, $lockKey, $uniqueId, $expire) {
    keepAlive($redis, $lockKey, $uniqueId, $expire);
});

3. 异常处理与重试机制

function acquireLockWithRetry($redis, $lockKey, $uniqueId, $expire, $maxRetries = 3) {
    $retries = 0;
    while ($retries < $maxRetries) {
        if ($redis->set($lockKey, $uniqueId, ['NX', 'PX' => $expire])) {
            return true;
        }
        
        $retries++;
        usleep(rand(100000, 500000)); // 随机等待0.1-0.5秒
    }
    return false;
}

六、Redis锁的常见问题与解决方案

1. 锁提前过期问题

问题:业务执行时间超过锁过期时间,导致其他客户端获取锁 解决方案:

  • 合理评估业务执行时间,设置足够长的过期时间
  • 实现锁续期机制(看门狗)

2. 时钟漂移问题

**问题:**多台服务器时钟不一致影响RedLock算法 解决方案:

  • 使用NTP服务同步时钟
  • 适当延长锁的有效期

3. 客户端阻塞问题

**问题:**客户端长时间GC停顿导致锁失效 解决方案:

  • 监控GC情况,优化应用内存使用
  • 缩短锁的有效期,增加续期频率

七、总结

Redis分布式锁是构建高并发分布式系统的重要组件,正确使用需要注意:

  1. 使用SET NX PX命令原子性获取锁
  2. 为每个锁设置唯一的随机值
  3. 使用Lua脚本原子性释放锁
  4. 合理设置锁的粒度和过期时间
  5. 在分布式环境考虑使用RedLock算法
  6. 实现适当的重试和异常处理机制

Redis锁虽然强大,但并非万能,对于需要强一致性的场景,可能需要结合数据库事务或其他分布式协调服务(如ZooKeeper)来实现。