当前位置 博文首页 > 黄智霖的博客:Java并发编程(十):ReentrantLock-NonfairSync源
??我们在前文ReentrantLock-NonfairSync源码逐行深度分析(上)中分析了NonfairSync获取锁和阻塞线程的入队逻辑,也就是定义在AQS中的acquire方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
??从前文中了解到,addWaiter方法会将Thread包装成一个Node对象,然后通过死循环(自旋)+CAS的方式保证node一定能够入队成功,入队成功之后会返回对应的node,然后当做入参传入acquireQueued方法中,该方法同样定义在AQS中:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//标识是否被中断
boolean interrupted = false;
//自旋
for (;;) {
//获取node节点的前驱节点
final Node p = node.predecessor();
//如果前驱节点是head节点,那么这里在阻塞之前再次尝试去获取锁
if (p == head && tryAcquire(arg)) {
//如果获取锁成功,那么将这个node节点设置为新的头节点
//setHead方法会清空node的thread和prev属性,并将node赋值到head
setHead(node);
//把旧头结点的后驱指针设置为空
p.next = null; // help GC
//到这里相当于将旧的那个头结点孤立了,没有指针指向它,等待被GC回收
failed = false;
//返回是否被中断唤醒,明显这里是false
return interrupted;
}
//下面是阻塞和为唤醒做准备的操作
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
??这里同样是自旋逻辑,首先会判断当前node的前驱节点是否是head节点,如果是,那么不会立即阻塞当前node代表的线程,而是再次尝试去获取锁。这个逻辑是什么意思呢?其实很好理解,如果这个条件成立,那就代表此时这个node是CLH的第一个真实排队节点,队列中的第二个节点(head头结点是一个空的Node),此时可能锁已经被持锁线程释放了,那么这个排在首位的线程其实不需要被阻塞,直接获取锁就行了。但是也有可能会获取锁失败,比如现在是非公平锁的场景,即使node是第一个排队的节点,但此时又有一个新的线程来竞争锁,由于是非公平的,那么这个锁就可能被这个新来的线程获取,node还是需要被阻塞。如果获取锁成功,那么成功节点就需要”出队“,但是这里的出队并不是真正的出队,其执行的操作是:将节点的thread和prev属性置空,把head节点设置为当前节点,将旧head节点的next指针断掉。假如原本是这样的状态(t0已经被阻塞,head节点的waitStatus为SIGNAL):
??现在t0被唤醒获取锁成功,对应的node从CLH"出队"之后的状态:
??可以看到,node节点并没有真正出队,而是通过一系列字段置空和断开指针操作,node节点变成了新的head节点,而旧的head节点由于没有引用,会被GC清理掉。当然,如果获取锁失败,那么还是需要进入阻塞状态。
??接下来看看阻塞的逻辑,这里涉及到了两个方法的调用:shouldParkAfterFailedAcquire和parkAndCheckInterrupt。如果这两个方法都返回true,那么会将interrupted字段设置为true,然后继续下一次循环。首先来看shouldParkAfterFailedAcquire方法,注意该方法调用传入的pred参数是node节点的前驱节点。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//pred为node节点的前驱节点
//获取前驱结点pred的waitStatus值
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//ws == -1
return true;
if (ws > 0) {
do {
//如果waitStatus为1(CANCELLED)
//那么需要移除pred节点,如果pred的前驱节点还是为CANCELLED
//那么继续移除,直到节点为非CANCELLED状态
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//将pred的waitStatus字段设置为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
??该方法主要是针对waitStatus的一系列操作,关于Node对象的waitStatus字段,前面提到过,其定义了4个值:
-1 :SIGNAL:可被唤醒
1:CANCELLED:需要删除
-2:CONDITION:条件等待
-3:PROPAGATE:无条件传播
??当然还有一个默认的0值,关于1、-2、-3几个值,在本文中不用去关心,后续在总结其它AQS相关实现的工具时遇到再谈,这里只需要关注默认值0和-1。初始状态ws的值为默认值0,那么将会走到CAS修改waitStatus的逻辑,将前驱结点的ws的值修改为-1,然后这个方法返回false(关于ws大于0,当前只定义了一个大于0的值,就是CANCELLED,本文不用理会,这里做的逻辑处理在代码中也给出了注释,就不再多言~)。
??shouldParkAfterFailedAcquire方法返回false之后,紧接着的parkAndCheckInterrupt方法也就不会执行,那么在acquireQueued方法中会进入下一次循环,这里我们就先不考虑node是首个排队节点的情况,会再次进入shouldParkAfterFailedAcquire方法的逻辑,只是此时前驱节点的ws的值成了-1,ws == Node.SIGNAL返回true,那么这个方法此时就会返回true。那么就会接着执行parkAndCheckInterrupt方法,该方法很简单,定义如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
??通过LockSupport.park方法实现了线程的阻塞。方法体中关于返回当前线程中断状态的逻辑我们暂时不管,现在先来重新思考shouldParkAfterFailedAcquire和parkAndCheckInterrupt两个方法的逻辑。经过前面的分析,shouldParkAfterFailedAcquire会执行两次(不考虑pread==head并且获取锁成功的情况,这时节点不会再阻塞),第一次将node的前驱节点的waitStatus设置为SIGNAL(-1),方法返回false,第二次判断到waitStatus等于-1,方法返回true,进而进入parkAndCheckInterrupt方法通过park进行阻塞。所以是node节点的前驱节点的waitStatus的取值影响node节点是否会进入park阻塞,这样做的意义暂时不谈,先知道这么个事儿,等我们看了释放锁的逻辑之后再回过头来看。
??那现在就来看ReentrantLock的unlock方法:
public void unlock() {
sync.release(1);
}
??调用的是AQS的release方法:
public final boolean release(int arg) {
//尝试释放锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
??该方法的布尔类型返回值表示释放锁是否成功,先不管失败的情况,直接看看tryReleaes方法是如何实现的。该方法在AQS中也是一个空方法,具体实现在子类中,这里我们看看ReentrantLock类中的实现(Sync):
protected final boolean tryRelease(int releases) {
//releases传入的是1
//将当前state的值减去1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
//只能是持锁线程才能释放锁,否则抛出异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//如果state减为0才表示释放锁成功
//对于锁重入的场景,必须有对应重入次数的释放动作
free = true;
//锁释放成功,将exclusiveOwnerThread设置为空
setExclusiveOwnerThread(null);
}
//更新state属性
setState(c);
return free;
}
??锁释放的逻辑很简单,看代码中的注释就行,这里不再赘述。假设现在锁释放成功,返回的free为true,那么就需要执行阻塞线程的唤醒动作:
if (tryRelease(arg)) {
//获取头结点
Node h = head;
if (h != null && h.waitStatus != 0)
//这里要求头结点的waitStatus!=0才会去尝试唤醒阻塞线程
unparkSuccessor(h);
//返回true,表示锁释放成功,如果head为null,或head.waitStatus==0,那么不会去尝试唤醒阻塞线程
return true;
}
??释放锁和唤醒阻塞线程虽然都由持锁线程处理,但是这两个是独立的动作,按照代码逻辑来看,它们允许不同时发生,判断依据就是waitStatus字段。对应到前面提到的,线程在阻塞之前,会有两次循环,第一次会先把前驱节点的waitStatus修改为SIGNAL(-1),然后第二次才会进入parkAndCheckInterrupt方法进行阻塞。注意这里唤醒线程是从CLH的头结点开始唤醒,所以需要判断head的waitStatus字段是否不为默认值0,以下状态表示t0线程可以尝试被唤醒:
??此时head节点的waitStatus==-1,那么可以进入unparkSuccessor方法尝试唤醒阻塞的线程,该方法同样定义在AQS中:
private void unparkSuccessor(Node node) {
//此时传入的node是head节点
int ws = node.waitStatus;
if (ws < 0)
//如果waitStatus小于0,则需要重置其状态
compareAndSetWaitStatus(node, ws, 0);
//需要唤醒的是node的后驱节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
//如果node没有后驱节点,或者后驱节点的waitStatus属性大于0
//目前大于0的值就只定义了1,表示CANCELLED,需要被移除
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
//从后往前找,找到最靠近队头的一个waitStatus<=0的节点
if (t.waitStatus <= 0)
//CANCELLED状态的节点不能被唤醒
s = t;
}
if (s != null)
//找到了需要唤醒的节点,调用unpark唤醒节点对应的线程
LockSupport.unpark(s.thread);
}
??前面介绍了在shouldParkAfterFailedAcquire方法中有对应CANCELLED节点的移除逻辑,这里也暂时不用考虑太多,只需要知道我们找到了一个需要被唤醒的线程,然后通过unpark方法将其唤醒,而这里被唤醒的节点就是head节点的后驱节点。
??那么当对应阻塞的线程被唤醒之后呢?我们回到线程阻塞的代码段:
private final boolean parkAndCheckInterrupt() {
//线程阻塞在这里
LockSupport.park(this);
//返回的是线程是否被中断,该方法会重置中断标志
return Thread.interrupted();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//阻塞线程被唤醒后,会继续循环,其前驱节点是head节点,才会尝试继续获取锁
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
//线程成功获取到锁,failed参数设置为false