当前位置 博文首页 > throwable:冷饭新炒:理解Redisson中分布式锁的实现

    throwable:冷饭新炒:理解Redisson中分布式锁的实现

    作者:throwable 时间:2021-01-16 22:20

    前提

    在很早很早之前,写过一篇文章介绍过Redis中的red lock的实现,但是在生产环境中,笔者所负责的项目使用的分布式锁组件一直是RedissonRedisson是具备多种内存数据网格特性的基于Java编写的Redis客户端框架(Redis Java Client with features of In-Memory Data Grid),基于Redis的基本数据类型扩展出很多种实现的高级数据结构,具体见其官方的简介图:

    本文要分析的R(ed)Lock实现,只是其中一个很小的模块,其他高级特性可以按需选用。下面会从基本原理、源码分析和基于Jedis仿实现等内容进行展开。本文分析的Redisson源码是2020-01左右Redisson项目的main分支源码,对应版本是3.14.1

    基本原理

    red lock的基本原理其实就"光明正大地"展示在Redis官网的首页文档中(具体链接是https://redis.io/topics/distlock):

    摘录一下简介进行翻译:在许多环境中不同进程必须以互斥方式使用共享资源进行操作时,分布式锁是一个非常有用的原语。此试图提供一种更规范的算法来实现Redis的分布式锁。我们提出了一种称为Redlock的算法,它实现了DLM(猜测是Distributed Lock Manager的缩写,分布式锁管理器),我们认为它比普通的单实例方法更安全。

    算法的三个核心特征(三大最低保证):

    • Safety property(安全性):互斥。确保在任何给定时刻下,只有一个客户端可以持有锁
    • Liveness property A(活性A):无死锁。即使存在曾经锁定资源的客户端崩溃或者出现网络分区异常,确保锁总是能够成功获取
    • Liveness property B(活性B):容错性。只要大多数Redis节点处于正常运行状态,客户端就可以获取和释放锁

    文档中还指出了目前算法对于故障转移的实现还存在明显的竞态条件问题(描述的应该是Redis主从架构下的问题):

    • 客户端A获取Redis主节点中的锁(假设锁定的资源为X
    • Redis主节点把KEY同步到Redis从节点之前,Redis主节点崩溃
    • Redis从节点因为故障晋升为主节点
    • 此时,客户端B获取资源X的锁成功,问题是资源X的锁在前面已经被客户端A获取过,这样就出现了并发问题

    算法的实现很简单,单个Redis实例下加锁命令如下:

    SET $resource_name $random_value NX PX $ttl
    

    这里的NxPXSET命令的增强参数,自从Redis2.6.12版本起,SET命令已经提供了可选的复合操作符:

    • EX:设置超时时间,单位是秒
    • PX:设置超时时间,单位是毫秒
    • NXIF NOT EXIST的缩写,只有KEY不存在的前提下才会设置K-V,设置成功返回1,否则返回0
    • XXIF EXIST的缩写,只有在KEY存在的前提下才会设置K-V,设置成功返回1,否则返回0

    单个Redis实例下解锁命令如下:

    # KEYS[1] = $resource_name
    # ARGV[1] = $random_value
    if redis.call("get",KEYS[1]) == ARGV[1] then
        return redis.call("del",KEYS[1])
    else
        return 0
    end
    

    使用Redisson中的RLock

    使用RLock要先实例化RedissonRedisson已经适配了Redis的哨兵、集群、普通主从和单机模式,因为笔者本地只安装了单机Redis,所以这里使用单机模式配置进行演示。实例化RedissonClient

    static RedissonClient REDISSON;
    
    @BeforeClass
    public static void beforeClass() throws Exception {
        Config config = new Config();
        // 单机
        config.useSingleServer()
                .setTimeout(10000)
                .setAddress("redis://127.0.0.1:6379");
        REDISSON = Redisson.create(config);
    //        // 主从
    //        config.useMasterSlaveServers()
    //                .setMasterAddress("主节点连接地址")
    //                .setSlaveAddresses(Sets.newHashSet("从节点连接地址"));
    //        REDISSON = Redisson.create(config);
    //        // 哨兵
    //        config.useSentinelServers()
    //                .setMasterName("Master名称")
    //                .addSentinelAddress(new String[]{"哨兵连接地址"});
    //        REDISSON = Redisson.create(config);
    //        // 集群
    //        config.useClusterServers()
    //                .addNodeAddress(new String[]{"集群节点连接地址"});
    //        REDISSON = Redisson.create(config);
    }
    

    加锁和解锁:

    @Test
    public void testLockAndUnLock() throws Exception {
        String resourceName = "resource:x";
        RLock lock = REDISSON.getLock(resourceName);
        Thread threadA = new Thread(() -> {
            try {
                lock.lock();
                process(resourceName);
            } finally {
                lock.unlock();
                System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
            }
        }, "threadA");
        Thread threadB = new Thread(() -> {
            try {
                lock.lock();
                process(resourceName);
            } finally {
                lock.unlock();
                System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
            }
        }, "threadB");
        threadA.start();
        threadB.start();
        Thread.sleep(Long.MAX_VALUE);
    }
    
    private void process(String resourceName) {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("线程%s获取到资源%s的锁", threadName, resourceName));
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ignore) {
        }
    }
    
    // 某次执行的输出结果
    线程threadB获取到资源resource:x的锁
    线程threadB释放资源resource:x的锁
    线程threadA获取到资源resource:x的锁
    线程threadA释放资源resource:x的锁
    

    更多的时候,我们会选用带等待时间周期和锁最大持有时间的API

    @Test
    public void testTryLockAndUnLock() throws Exception {
        String resourceName = "resource:x";
        int waitTime = 500;
        int leaseTime = 1000;
        Thread threadA = new Thread(() -> {
            process(resourceName, waitTime, leaseTime);
        }, "threadA");
        Thread threadB = new Thread(() -> {
            process(resourceName, waitTime, leaseTime);
        }, "threadB");
        threadA.start();
        threadB.start();
        Thread.sleep(Long.MAX_VALUE);
    }
    
    private void process(String resourceName, int waitTime, int leaseTime) {
        RLock lock = REDISSON.getLock(resourceName);
        try {
            String threadName = Thread.currentThread().getName();
            boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
            if (tryLock) {
                try {
                    System.out.println(String.format("线程%s获取到资源%s的锁", threadName, resourceName));
                    Thread.sleep(800);
                } finally {
                    lock.unlock();
                    System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
                }
            } else {
                System.out.println(String.format("线程%s获取资源%s的锁失败,等待时间:%d ms", threadName, resourceName, waitTime));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    // 某次执行的输出结果
    线程threadA获取到资源resource:x的锁
    线程threadB获取资源resource:x的锁失败,等待时间:500 ms
    线程threadA释放资源resource:x的锁
    

    为了使用的时候更加简单,可以参考spring-tx中的编程式事务那样进行轻度封装:

    @RequiredArgsConstructor
    private static class RedissonLockProvider {
    
        private final RedissonClient redissonClient;
    
        public <T> T executeInLock(String resourceName, LockAction lockAction) {
            RLock lock = redissonClient.getLock(resourceName);
            try {
                lock.lock();
                lockAction.onAcquire(resourceName);
                return lockAction.doInLock(resourceName);
            } finally {
                lock.unlock();
                lockAction.onExit(resourceName);
            }
        }
    
        public <T> T executeInLock(String resourceName, int waitTime, int leaseTime, LockAction lockAction) throws InterruptedException {
            RLock lock = redissonClient.getLock(resourceName);
            boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
            if (tryLock) {
                try {
                    lockAction.onAcquire(resourceName);
                    return lockAction.doInLock(resourceName);
                } finally {
                    lock.unlock();
                    lockAction.onExit(resourceName);
                }
            }
            return null;
        }
    
        public void executeInLockWithoutResult(String resourceName, int waitTime, int leaseTime, LockActionWithoutResult lockAction) throws InterruptedException {
            RLock lock = redissonClient.getLock(resourceName);
            boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
            if (tryLock) {
                try {
                    lockAction.onAcquire(resourceName);
                    lockAction.doInLock(resourceName);
                } finally {
                    lock.unlock();
                    lockAction.onExit(resourceName);
                }
            }
        }
    
        public void executeInLockWithoutResult(String resourceName, LockActionWithoutResult lockAction) {
            RLock lock = redissonClient.getLock(resourceName);
            try {
                lock.lock();
                lockAction.onAcquire(resourceName);
                lockAction.doInLock(resourceName);
            } finally {
                lock.unlock();
                lockAction.onExit(resourceName);
            }
        }
    }
    
    @FunctionalInterface
    interface LockAction {
    
        default void onAcquire(String resourceName) {
    
        }
    
        <T> T doInLock(String resourceName);
    
        default void onExit(String resourceName) {
    
        }
    }
    
    @FunctionalInterface
    interface LockActionWithoutResult {
    
        default void onAcquire(String resourceName) {
    
        }
    
        void doInLock(String resourceName);
    
        default void onExit(String resourceName) {
    
        }
    }
    

    使用RedissonLockProvider(仅供参考):

    @Test
    public void testRedissonLockProvider() throws Exception {
        RedissonLockProvider provider = new RedissonLockProvider(REDISSON);
        String resourceName = "resource:x";
        Thread threadA = new Thread(() -> {
            provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() {
    
                @Override
                public void onAcquire(String resourceName) {
                    System.out.println(String.format("线程%s获取到资源%s的锁", Thread.currentThread().getName(), resourceName));
                }
    
                @Override
                public void doInLock(String resourceName) {
                    try {
                        Thread.sleep(800);
                    } catch (InterruptedException ignore) {
    
                    }
                }
    
                @Override
                public void onExit(String resourceName) {
                    System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
                }
            });
        }, "threadA");
        Thread threadB = new Thread(() -> {
            provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() {
    
                @Override
                public void onAcquire(String resourceName) {
                    System.out.println(String.format("线程%s获取到资源%s的锁", Thread.currentThread().getName(), resourceName));
                }
    
                @Override
                public void doInLock(String resourceName) {
                    try {
                        Thread.sleep(800);
                    } catch (InterruptedException ignore) {
    
                    }
                }
    
                @Override
                public void onExit(String resourceName) {
                    System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
                }
            });
        }, "threadB");
        threadA.start();
        threadB.start();
        Thread.sleep(Long.MAX_VALUE);
    }
    // 某次执行结果
    线程threadA获取到资源resource:x的锁
    线程threadA释放资源resource:x的锁
    线程threadB获取到资源resource:x的锁
    线程threadB释放资源resource:x的锁
    

    Redisson中RLock的实现原理

    RedissonRLock的实现是基本参照了Redisred lock算法进行实现,不过在原始的red lock算法下进行了改良,主要包括下面的特性:

    • 互斥
    • 无死锁
    • 可重入,类似于ReentrantLock,同一个线程可以重复获取同一个资源的锁(一般使用计数器实现),锁的重入特性一般情况下有利于提高资源的利用率
    • 续期,这个是一个比较前卫解决思路,也就是如果一个客户端对资源X永久锁定,那么并不是直接对KEY生存周期设置为-1,而是通过一个守护线程每隔固定周期延长KEY的过期时间,这样就能实现在守护线程不被杀掉的前提下,避免客户端崩溃导致锁无法释放长期占用资源的问题
    • 锁状态变更订阅,依赖于org.redisson.pubsub.LockPubSub,用于订阅和通知锁释放事件
    • 不是完全参考red lock算法的实现,数据类型选用了HASH,配合Lua脚本完成多个命令的原子性

    续期或者说延长KEY的过期时间在Redisson使用watch dog实现,理解为用于续期的守护线程,底层依赖于Netty的时间轮HashedWheelTimer和任务io.netty.util.Timeout实现,俗称看门狗,下面会详细分析。

    先看RLock的类图:

    这里有一个疑惑点,RedissonRedLock(RedissonMultiLock的子类)的注释中提到RedLock locking algorithm implementation for multiple locks. It manages all locks as one. 但从直观上看,RedissonLock才是整个锁体系的核心,里面的实现思路也是遵从red lock算法的。

    RedissonLock就是RLock的直接实现,也是分布式锁实现的核心类,从源码中看到Redisson#getLock()就是直接实例化RedissonLock

    public class Redisson implements RedissonClient {
        
        // ...... 省略其他代码
    
        @Override
        public RLock getLock(String name) {
            return new RedissonLock(connectionManager.getCommandExecutor(), name);
        }
    
        // ...... 省略其他代码
    }
    

    因此只需要围绕RedissonLock的源码进行分析即可。RedissonLock的类继承图如下:

    这里需要有几点认知:

    • RedissonLock实现了java.util.concurrent.locks.Lock接口中除了newCondition()方法外的所有方法,也就是可以基本无缝适配Lock接口,对于习惯Lock接口的API的使用者来说是一个福音
    • RedissonLock基本所有同步API都依赖于异步API的实现,也就是RLock的实现依赖于RLockAsync的实现,底层依赖的是Nettyio.netty.util.concurrent.Promise,具体见RedissonPromise,如果用过JUC中的Future的开发者应该比较熟悉Future#get(),这里的做法类似
    • 右边的几个父类的简单功能描述如下:
      • RObjectAsync:所有Redisson对象的基础接口,提供一些内存测量、对象拷贝、移动等的异步方法
      • RObjectRObjectAsync的同步版本
      • RExpirableAsync:提供对象TTL相关的异步方法
      • RExpirableRExpirableAsync的同步版本
      • RedissonObject:直接实现类RObject接口中的方法
      • RedissonExpirable:主要是实现了RExpirable接口中的方法

    接着先看RedissonLock的构造函数和核心属性:

    // 存放entryName -> ExpirationEntry,用于获取当前entryName的线程重入计数器和续期任务
    private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
    
    // 内部的锁持有的最大时间,来源于参数Config#lockWatchdogTimeout,用于控制续期的周期
    protected long internalLockLeaseTime;
    
    // ID,唯一标识,是一个UUID
    final String id;
    
    // 
    final String entryName;
    
    // 锁释放事件订阅发布相关
    protected final LockPubSub pubSub;
    
    // 命令异步执行器实例
    final CommandAsyncExecutor commandExecutor;
    
    /**
     * CommandAsyncExecutor是命令的异步执行器,里面的方法是相对底层的面向通讯框架的方法,包括异步写、异步读和同步结果获取等
     * name参数就是getLock()时候传入的参数,其实就是最终同步到Redis中的KEY
     */
    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        // 这里的ID为外部初始化的UUID实例,调用toString()
        this.id = commandExecutor.getConnectionManager().getId();
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        // 这里的entryName = uuid值 + : + 外部传进来的name(KEY),如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x
        this.entryName = id + ":" + name;
        // 初始化LockPubSub实例,用于订阅和发布锁释放的事件
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }
    
    // RedissonLock内部类ExpirationEntry,存放着线程重入的计数器和续期的Timeout任务
    public static class ExpirationEntry {
        
        // 线程ID -> 线程重入的次数
        private final Map<Long, Integer> threadIds = new LinkedHashMap<>();
        private volatile Timeout timeout;
        
        public ExpirationEntry() {
            super();
        }
        
        // 这个方法主要记录线程重入的计数
        public void addThreadId(long threadId) {
            Integer counter = threadIds.get(threadId);
            if (counter == null) {
                counter = 1;
            } else {
                counter++;
            }
            threadIds.put(threadId, counter);
        }
    
        public boolean hasNoThreads() {
            return threadIds.isEmpty();
        }
    
        public Long getFirstThreadId() {
            if (threadIds.isEmpty()) {
                return null;
            }
            return threadIds.keySet().iterator().next();
        }
    
        public void removeThreadId(long threadId) {
            Integer counter = threadIds.get(threadId);
            if (counter == null) {
                return;
            }
            counter--;
            if (counter == 0) {
                threadIds.remove(threadId);
            } else {
                threadIds.put(threadId, counter);
            }
        }
        
        public void setTimeout(Timeout timeout) {
            this.timeout = timeout;
        }
        public Timeout getTimeout() {
            return timeout;
        }
    }
    

    这里需要关注一下Config中的lockWatchdogTimeout参数:

    翻译一下大意:lockWatchdogTimeout参数只有在没有使用leaseTimeout参数定义的成功获取到锁的场景(简单来说就是不设置时限的加锁)下生效,如果看门狗在下一个lockWatchdogTimeout周期内不进行续期,那么锁就会过期释放(从源码上看,每三分之一lockWatchdogTimeout就会执行一次续期任务,每次通过pexpireKEY的存活周期延长lockWatchdogTimeout),lockWatchdogTimeout的默认值为30000,也就是30秒。

    这里先列举一下RedissonLock中获取名称的方法,以便后面分析这些名称作为K-V结构的KEY时候使用:

    • id:由配置实例化时候实例化的UUID实例生成,从源码上分析每个连接方式的Redisson实例有唯一的UUIDConnectionManager初始化的时候会调用UUID id = UUID.randomUUID(),笔者认为可以理解为Redisson实例在某个应用程序进程中的唯一标识,毕竟一般情况下,一个应用程序应该只会应用一种Redisson的连接方式
    • getEntryName():返回的是UUID + : + $KEY,例如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x
    • getName():返回的是$KEY,例如resource:x
    • getChannelName():返回的是redisson_lock__channel:{$KEY},例如redisson_lock__channel:{resource:x}
    • getLockName(long threadId):返回的是UUID + : + $threadId,例如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1

    接着看加锁的方法,核心实现主要是:

    • private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedExceptionlock方法体系
    • public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedExceptiontryLock方法体系

    先看只包含锁最大持有时间的lock()方法体系:

    /**
     * 获取锁,不指定等待时间,只指定锁的最大持有时间
     * 通过interruptibly参数配置支持中断
     */
    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        // 尝试获取锁,返回的ttl为空代表获取锁成功,返回的ttl代表已经存在的KEY的剩余存活时间
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return;
        }
        // 订阅redisson_lock__channel:{$KEY},其实本质的目的是为了客户端通过Redis的订阅发布,感知到解锁的事件
        // 这个方法会在LockPubSub中注册一个entryName -> RedissonLockEntry的哈希映射,RedissonLockEntry实例中存放着RPromise<RedissonLockEntry>结果,一个信号量形式的锁和订阅方法重入计数器
        // 下面的死循环中的getEntry()或者RPromise<RedissonLockEntry>#getNow()就是从这个映射中获取的
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        // 同步订阅执行,获取注册订阅Channel的响应,区分是否支持中断
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }
        // 走到下面的for循环说明返回的ttl不为空,也就是Redis已经存在对应的KEY,有其他客户端已经获取到锁,此客户端线程的调用需要阻塞等待获取锁
        try {
            while (true) {
                // 死循环中尝试获取锁,这个是后面会分析的方法
                ttl = tryAcquire(leaseTime, unit, threadId);
                // 返回的ttl为空,说明获取到锁,跳出死循环,这个死循环或者抛出中断异常,或者获取到锁成功break跳出,没有其他方式
                if (ttl == null) {
                    break;
                }
    
                // 这个ttl来源于等待存在的锁的KEY的存活时间,直接使用许可为0的信号量进行阻塞等待,下面的几个分支判断都是大同小异,只是有的支持超时时间,有的支持中断
                // 有的是永久阻塞直到锁释放事件订阅LockPubSub的onMessage()方法回调激活getLatch().release()进行解锁才会往下走
                // 这里可以学到一个特殊的技巧,Semaphore(0),信号量的许可设置为0,首个调用acquire()的线程会被阻塞,直到其他线程调用此信号量的release()方法才会解除阻塞,类似于一个CountDownLatch(1)的效果
                if (ttl >= 0) {
                    try {
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        future.getNow().getLatch().acquire();
                    } else {
                        future.getNow().getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            // 获取到锁或者抛出中断异常,退订redisson_lock__channel:{$KEY},不再关注解锁事件
            unsubscribe(future, threadId);
        }
    }
    
    // 这是一个异步转同步的方法,类似于FutureTask#get(),关键看调用的tryAcquireAsync()方法
    private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }
    
    /**
     * 通过传入锁持有的最大时间和线程ID异步获取锁
     */
    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
        // 锁持有最大时间不为-1,也就是明确锁的持有时间,不是永久持有的场景
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        // 走到这里说明是leaseTime == -1,KEY不设置过期时间的分支,需要启动看门狗机制。尝试内部异步获取锁,注意这里的lockWatchdogTimeout是从配置中获取传进去,不是内部的internalLockLeaseTime属性,这里的默认值还是30000毫秒
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            // 执行异常场景直接返回
            if (e != null) {
                return;
            }
    
            // 成功获取到锁的场景,需要基于线程ID启用看门狗,通过时间轮指定定时任务进行续期
            if (ttlRemaining == null) {
                // 定时调度进行续期操作
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }
    
    /**
     * 转换锁持有最大时间,通过参数进行加锁的LUA脚本调用 
     * getName()就是传入的KEY,如resource:x getLockName()就是锁的名称,形式是:UUID + : + threadId,如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
     * internalLockLeaseTime在leaseTime != -1的前提下使用的是原值,在leaseTime == -1的前提下,使用的是lockWatchdogTimeout
     */
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        // 时间转换为毫秒,注意一点这里的internalLockLeaseTime是类内的属性,被重新赋值了
        internalLockLeaseTime = unit.toMillis(leaseTime);
        // 底层向Redis服务执行LUA脚本
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    

    先留意一下属性internalLockLeaseTime,它在tryLockInnerAsync()方法内被重新赋值,在leaseTime == -1L的前提下,它被赋值为lockWatchdogTimeout,这个细节很重要,决定了后面续期方法(看门狗)的调度频率。另外,leaseTime != -1L不会进行续期,也就是不会启动看门狗机制。

    接着需要仔细分析一下tryLockInnerAsync()中执行的LUA脚本,笔者把它提取出来通过注释进行描述:

    -- KEYS[1] == getName() --> $KEY --> resource:x
    -- ARGV[1] == internalLockLeaseTime --> 30000
    -- ARGV[2] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
    -- 第一段代码是判断锁定的资源KEY不存在的时候进行相应值的设置,代表资源没有被锁定,首次获取锁成功
    if (redis.call('exists', KEYS[1]) == 0) then
        -- 这里是设置调用次数,可以理解为延长KEY过期时间的调用次数
        redis.call('hset', KEYS[1], ARGV[2], 1);
        -- 设置KEY的过期时间
        redis.call('pexpire', KEYS[1], ARGV[1]);
        return nil;
    end;
    -- 第二段代码是判断HASH的field是否存在,如果存在说明是同一个线程重入的情况,这个时候需要延长KEY的TTL,并且HASH的field对应的value加1,记录延长ttl的次数
    if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
        -- 这里是增加调用次数,可以理解为增加延长KEY过期时间的调用次数
        redis.call('hincrby', KEYS[1], ARGV[2], 1);
        -- 延长KEY的过期时间
        redis.call('pexpire', KEYS[1], ARGV[1]);
        return nil;
    end;
    -- 第三段代码是兜底的,走到这里说明当前线程获取锁失败,锁已经被其他(进程中的)线程占有,返回当前KEY被占用资源的ttl,用来确定需要休眠的最大时间
    return redis.call('pttl', KEYS[1]);
    

    这里画一个图演示一下这个Lua脚本中三段代码出现的逻辑:

    剩下一个scheduleExpirationRenewal(threadId)方法还没有分析,里面的逻辑就是看门狗的定期续期逻辑:

    // 基于线程ID定时调度和续期
    private void scheduleExpirationRenewal(long threadId) {
        // 如果需要的话新建一个ExpirationEntry记录线程重入计数,同时把续期的任务Timeout对象保存在属性中
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            // 当前进行的当前线程重入加锁
            oldEntry.addThreadId(threadId);
        } else {
            // 当前进行的当前线程首次加锁
            entry.addThreadId(threadId);
            // 首次新建ExpirationEntry需要触发续期方法,记录续期的任务句柄
            renewExpiration();
        }
    }
    
    // 处理续期
    private void renewExpiration() {
        // 根据entryName获取ExpirationEntry实例,如果为空,说明在cancelExpirationRenewal()方法已经被移除,一般是解锁的时候触发
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        // 新建一个定时任务,这个就是看门狗的实现,io.netty.util.Timeout是Netty结合时间轮使用的定时任务实例
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                // 这里是重复外面的那个逻辑,
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                // 获取ExpirationEntry中首个线程ID,如果为空说明调用过cancelExpirationRenewal()方法清空持有的线程重入计数,一般是锁已经释放的场景
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                // 向Redis异步发送续期的命令
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    // 抛出异常,续期失败,只打印日志和直接终止任务
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    // 返回true证明续期成功,则递归调用续期方法(重新调度自己),续期失败说明对应的锁已经不存在,直接返回,不再递归
                    if (res) {
                        // reschedule itself
                        renewExpiration();
                    }
                });
            }
        }, 
        // 这里的执行频率为leaseTime转换为ms单位下的三分之一,由于leaseTime初始值为-1的情况下才会进入续期逻辑,那么这里的执行频率为lockWatchdogTimeout的三分之一
        internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); 
        
        // ExpirationEntry实例持有调度任务实例
        ee.setTimeout(task);
    }
    
    // 调用Redis,执行Lua脚本,进行异步续期
    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return 0;",
            Collections.<Object>singletonList(getName()), 
            //  这里根据前面的分析,internalLockLeaseTime在leaseTime的值为-1的前提下,对应值为lockWatchdogTimeout
            internalLockLeaseTime, getLockName(threadId));  
    }
    
    
    下一篇:没有了