当前位置 博文首页 > 黄智霖的博客:Java并发编程(十):ReentrantLock-NonfairSync源

    黄智霖的博客:Java并发编程(十):ReentrantLock-NonfairSync源

    作者:[db:作者] 时间:2021-06-07 15:41

    ??我们在前文ReentrantLock-NonfairSync源码逐行深度分析(上)中分析了NonfairSync获取锁和阻塞线程的入队逻辑,也就是定义在AQS中的acquire方法:

    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    ??从前文中了解到,addWaiter方法会将Thread包装成一个Node对象,然后通过死循环(自旋)+CAS的方式保证node一定能够入队成功,入队成功之后会返回对应的node,然后当做入参传入acquireQueued方法中,该方法同样定义在AQS中:

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
            	//标识是否被中断
                boolean interrupted = false;
                //自旋
                for (;;) {
                	//获取node节点的前驱节点
                    final Node p = node.predecessor();
                    //如果前驱节点是head节点,那么这里在阻塞之前再次尝试去获取锁
                    if (p == head && tryAcquire(arg)) {
                    	//如果获取锁成功,那么将这个node节点设置为新的头节点
                    	//setHead方法会清空node的thread和prev属性,并将node赋值到head
                        setHead(node);
                        //把旧头结点的后驱指针设置为空
                        p.next = null; // help GC
                        //到这里相当于将旧的那个头结点孤立了,没有指针指向它,等待被GC回收
                        failed = false;
                        //返回是否被中断唤醒,明显这里是false
                        return interrupted;
                    }
                    //下面是阻塞和为唤醒做准备的操作
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    ??这里同样是自旋逻辑,首先会判断当前node的前驱节点是否是head节点,如果是,那么不会立即阻塞当前node代表的线程,而是再次尝试去获取锁。这个逻辑是什么意思呢?其实很好理解,如果这个条件成立,那就代表此时这个node是CLH的第一个真实排队节点,队列中的第二个节点(head头结点是一个空的Node),此时可能锁已经被持锁线程释放了,那么这个排在首位的线程其实不需要被阻塞,直接获取锁就行了。但是也有可能会获取锁失败,比如现在是非公平锁的场景,即使node是第一个排队的节点,但此时又有一个新的线程来竞争锁,由于是非公平的,那么这个锁就可能被这个新来的线程获取,node还是需要被阻塞。如果获取锁成功,那么成功节点就需要”出队“,但是这里的出队并不是真正的出队,其执行的操作是:将节点的thread和prev属性置空,把head节点设置为当前节点,将旧head节点的next指针断掉。假如原本是这样的状态(t0已经被阻塞,head节点的waitStatus为SIGNAL):
    在这里插入图片描述
    ??现在t0被唤醒获取锁成功,对应的node从CLH"出队"之后的状态:
    在这里插入图片描述
    ??可以看到,node节点并没有真正出队,而是通过一系列字段置空和断开指针操作,node节点变成了新的head节点,而旧的head节点由于没有引用,会被GC清理掉。当然,如果获取锁失败,那么还是需要进入阻塞状态。
    ??接下来看看阻塞的逻辑,这里涉及到了两个方法的调用:shouldParkAfterFailedAcquire和parkAndCheckInterrupt。如果这两个方法都返回true,那么会将interrupted字段设置为true,然后继续下一次循环。首先来看shouldParkAfterFailedAcquire方法,注意该方法调用传入的pred参数是node节点的前驱节点。

        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        	//pred为node节点的前驱节点
        	//获取前驱结点pred的waitStatus值
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
            	//ws == -1
                return true;
            if (ws > 0) {
                do {
                	//如果waitStatus为1(CANCELLED)
                	//那么需要移除pred节点,如果pred的前驱节点还是为CANCELLED
                	//那么继续移除,直到节点为非CANCELLED状态
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
            	//将pred的waitStatus字段设置为-1
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    ??该方法主要是针对waitStatus的一系列操作,关于Node对象的waitStatus字段,前面提到过,其定义了4个值:

    -1 :SIGNAL:可被唤醒
    1:CANCELLED:需要删除
    -2:CONDITION:条件等待
    -3:PROPAGATE:无条件传播

    ??当然还有一个默认的0值,关于1、-2、-3几个值,在本文中不用去关心,后续在总结其它AQS相关实现的工具时遇到再谈,这里只需要关注默认值0和-1。初始状态ws的值为默认值0,那么将会走到CAS修改waitStatus的逻辑,将前驱结点的ws的值修改为-1,然后这个方法返回false(关于ws大于0,当前只定义了一个大于0的值,就是CANCELLED,本文不用理会,这里做的逻辑处理在代码中也给出了注释,就不再多言~)。
    ??shouldParkAfterFailedAcquire方法返回false之后,紧接着的parkAndCheckInterrupt方法也就不会执行,那么在acquireQueued方法中会进入下一次循环,这里我们就先不考虑node是首个排队节点的情况,会再次进入shouldParkAfterFailedAcquire方法的逻辑,只是此时前驱节点的ws的值成了-1,ws == Node.SIGNAL返回true,那么这个方法此时就会返回true。那么就会接着执行parkAndCheckInterrupt方法,该方法很简单,定义如下:

    private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    ??通过LockSupport.park方法实现了线程的阻塞。方法体中关于返回当前线程中断状态的逻辑我们暂时不管,现在先来重新思考shouldParkAfterFailedAcquire和parkAndCheckInterrupt两个方法的逻辑。经过前面的分析,shouldParkAfterFailedAcquire会执行两次(不考虑pread==head并且获取锁成功的情况,这时节点不会再阻塞),第一次将node的前驱节点的waitStatus设置为SIGNAL(-1),方法返回false,第二次判断到waitStatus等于-1,方法返回true,进而进入parkAndCheckInterrupt方法通过park进行阻塞。所以是node节点的前驱节点的waitStatus的取值影响node节点是否会进入park阻塞,这样做的意义暂时不谈,先知道这么个事儿,等我们看了释放锁的逻辑之后再回过头来看。
    ??那现在就来看ReentrantLock的unlock方法:

    public void unlock() {
            sync.release(1);
    }
    

    ??调用的是AQS的release方法:

    public final boolean release(int arg) {
    		//尝试释放锁
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    ??该方法的布尔类型返回值表示释放锁是否成功,先不管失败的情况,直接看看tryReleaes方法是如何实现的。该方法在AQS中也是一个空方法,具体实现在子类中,这里我们看看ReentrantLock类中的实现(Sync):

    protected final boolean tryRelease(int releases) {
    			//releases传入的是1
    			//将当前state的值减去1
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                	//只能是持锁线程才能释放锁,否则抛出异常
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                	//如果state减为0才表示释放锁成功
                	//对于锁重入的场景,必须有对应重入次数的释放动作
                    free = true;
                    //锁释放成功,将exclusiveOwnerThread设置为空
                    setExclusiveOwnerThread(null);
                }
                //更新state属性
                setState(c);
                return free;
            }
    

    ??锁释放的逻辑很简单,看代码中的注释就行,这里不再赘述。假设现在锁释放成功,返回的free为true,那么就需要执行阻塞线程的唤醒动作:

    if (tryRelease(arg)) {
    	//获取头结点
    	Node h = head;
    	if (h != null && h.waitStatus != 0)
    		//这里要求头结点的waitStatus!=0才会去尝试唤醒阻塞线程
    		unparkSuccessor(h);
    	//返回true,表示锁释放成功,如果head为null,或head.waitStatus==0,那么不会去尝试唤醒阻塞线程
    	return true;
    }
    

    ??释放锁和唤醒阻塞线程虽然都由持锁线程处理,但是这两个是独立的动作,按照代码逻辑来看,它们允许不同时发生,判断依据就是waitStatus字段。对应到前面提到的,线程在阻塞之前,会有两次循环,第一次会先把前驱节点的waitStatus修改为SIGNAL(-1),然后第二次才会进入parkAndCheckInterrupt方法进行阻塞。注意这里唤醒线程是从CLH的头结点开始唤醒,所以需要判断head的waitStatus字段是否不为默认值0,以下状态表示t0线程可以尝试被唤醒:

    ??此时head节点的waitStatus==-1,那么可以进入unparkSuccessor方法尝试唤醒阻塞的线程,该方法同样定义在AQS中:

    private void unparkSuccessor(Node node) {
    		//此时传入的node是head节点
            int ws = node.waitStatus;
            if (ws < 0)
            	//如果waitStatus小于0,则需要重置其状态
                compareAndSetWaitStatus(node, ws, 0);
               
            //需要唤醒的是node的后驱节点 
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
            	//如果node没有后驱节点,或者后驱节点的waitStatus属性大于0 
            	//目前大于0的值就只定义了1,表示CANCELLED,需要被移除
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                	//从后往前找,找到最靠近队头的一个waitStatus<=0的节点
                    if (t.waitStatus <= 0)
                    	//CANCELLED状态的节点不能被唤醒
                        s = t;
            }
            if (s != null)
            	//找到了需要唤醒的节点,调用unpark唤醒节点对应的线程
                LockSupport.unpark(s.thread);
        }
    

    ??前面介绍了在shouldParkAfterFailedAcquire方法中有对应CANCELLED节点的移除逻辑,这里也暂时不用考虑太多,只需要知道我们找到了一个需要被唤醒的线程,然后通过unpark方法将其唤醒,而这里被唤醒的节点就是head节点的后驱节点。
    ??那么当对应阻塞的线程被唤醒之后呢?我们回到线程阻塞的代码段:

    private final boolean parkAndCheckInterrupt() {
    		//线程阻塞在这里
            LockSupport.park(this);
            //返回的是线程是否被中断,该方法会重置中断标志
            return Thread.interrupted();
        }
        
    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                	//阻塞线程被唤醒后,会继续循环,其前驱节点是head节点,才会尝试继续获取锁
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        //线程成功获取到锁,failed参数设置为false