当前位置 博文首页 > 昔久:深入理解Java并发框架AQS系列(五):条件队列(Condition

    昔久:深入理解Java并发框架AQS系列(五):条件队列(Condition

    作者:昔久 时间:2021-04-28 12:20

    深入理解Java并发框架AQS系列(一):线程
    深入理解Java并发框架AQS系列(二):AQS框架简介及锁概念
    深入理解Java并发框架AQS系列(三):独占锁(Exclusive Lock)
    深入理解Java并发框架AQS系列(四):共享锁(Shared Lock)
    深入理解Java并发框架AQS系列(五):条件队列(Condition)

    一、前言

    AQS中的条件队列相比较前文中的“独占锁”、“共享锁”等比较独立,即便没有条件队列也丝毫不影响诸如ReentrantLockSemaphore类的实现,那如此说来条件队列是否就是一个可有可无的产物?答案是否定的,我们来看下直接或间接用到条件队列的JDK并发类:

    • ReentrantLock 独占锁经典类
    • ReentrantReadWriteLock 读写锁
    • ArrayBlockingQueue 基于数组的阻塞队列
    • CyclicBarrier 循环栅栏,解决线程同步问题
    • DelayQueue 延时队列
    • LinkedBlockingDeque 双向阻塞队列
    • PriorityBlockingQueue 支持优先级的无界阻塞队列
    • ThreadPoolExecutor 线程池构造器
    • ScheduledThreadPoolExecutor 可基于时间调度的线程池构造器
    • StampedLock 邮戳锁,1.8后引入,更高效的读写锁

    如此豪华的阵容,可见Condition的地位不可小觑

    我们简单描述下条件队列实现的功能:有3个线程A、B、C,分别调用wait/await方法后,线程进入阻塞,在没有其他线程去唤醒的情况下,3个线程将永远处于阻塞状态。此时如果有另外线程调用notify/signal,那么A、B、C线程中的某一个将被激活(根据其进入条件队列的顺序而定),从而执行后续的逻辑;如果调用notifyAll/signalAll的话,那么3个线程都将被激活,这可能是我们对条件队列的简单认识。这样的描述是否准确呢?可能不太严谨,我们引入JDK的条件队列来做说明

    统一话术:其实语法层面支持的wait/notify与AQS都属于JDK的范畴,但为了区分两者,我们定义如下:

    • JDK条件队列:语法层面提供支持的wait/notify,即Object类中的wait()/notify()方法
    • AQS条件队列:AQS提供的条件队列,即AQS内部的ConditionObject

    二、JDK中的条件队列(wait/notify)

    众所周知,在JDK中,wait/notify/notifyAll是根对象Object中内置的方法,且方法均被定义为native本地方法

    // 等待
    public final native void wait(long timeout) throws InterruptedException;
    // 唤醒
    public final native void notify();
    // 唤醒所有等待线程
    public final native void notifyAll();
    

    2.1、wait

    // 步骤1
    synchronized (obj) {
      // 步骤2
      before();
      // 步骤3
      obj.wait();
      // 步骤4
      after();
    }
    

    相信大家对上述代码并不陌生,我们将JDK的条件队列抽象为4步,逐一阐述

    • 步骤1: synchronized (obj)
      • 在jdk中如果想调用Object.wait()方法,必须首先获取该对象的synchronized锁,当前步骤,如果成功获取到锁,那么将进入“步骤2”,如果存在并发,当前线程将会进入阻塞(线程状态为BLOCKED),知道获取到锁为止
    • 步骤2: before()
      • 我们知道synchronized是独占锁,所以在执行步骤2代码时,程序是不存在并发的,即同一时刻,只有一个线程正在执行,此处也相对好理解
    • 步骤3: obj.wait()
      • 此步骤是将当前线程放入条件队列,同时释放obj的同步锁。此处跟我们对synchronized的认知有悖,我们一般认为synchronized (obj) {......}在大括号中的代码会一直持有锁,而事实情况却是,当程序执行wait()方法时,会释放obj的同步锁
    • 步骤4: after()
      • 此步骤是并发执行还是串行执行?假设我们现在有3个线程A、B、C都已经执行完毕wait()方法,并进入了条件队列,等待其他线程唤醒;此时另外一个线程执行了notifyAll()时,后续的激活流程是怎么样的?
        • 错误观点:有很多同学直观感受是,线程A、B、C同时被激活,所以步骤4是并发执行的;就像是百米赛跑,所有同学都准备就绪(wait),一声枪响后(notifyAll),所有人开始赛跑,并跑到终点(步骤4
        • 正确观点:其实“步骤4”是串行执行的,大家再检查下代码后便可发现,“步骤4”处于synchronized的大括号之间;还是拿上述赛跑举例,如果认为从听到枪响至跑到终点是“步骤4”的话,那真实的场景应该是这样的:一声枪响后,A起跑,B、C原地不动;A跑到终点后,B开始起跑,C原地不动;最后是C跑到终点

    由此我们断定,obj.wait()虽然是native方法,但其内部经历了释放锁、重新抢锁的两个大环节

    2.2、notify

    synchronized (obj) {
      obj.notify();
      // obj.notifyAll();
    }
    

    所有因obj.wait()阻塞的线程,都要通过notify来唤醒

    • notify() 唤醒条件队列中,队首节点
    • notifyAll() 唤醒条件队列中所有节点

    三、AQS中的条件队列(await/signal)

    我们初看AQS中的条件队列时,发现其提供了与JDK条件队列几乎一致的功能

    JDK AQS
    wait await
    notify singal
    notifyAll singalAll

    用法上也及其相似:

    await

    // 初始化
    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    try {
      lock.lock();
      condition.await();
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      lock.unlock();
    }
    

    singal

    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();
    try {
      lock.lock();
      condition.signal();
    } finally {
      lock.unlock();
    }
    

    3.1、条件队列

    我们知道在AQS内部维护了一个阻塞队列,数据结构如下:

    阻塞队列FIFO数据结构

    上图描述的是一个长度为 3 的FIFO阻塞队列,因为头结点常驻内存,所以不算在内;我们可以发现阻塞队列中每个节点都包含了前、后引用

    那AQS内部的另一个条件队列又是什么样的数据结构呢?

    条件队列数据结构

    可见,条件队列为单向列表,只有指向下一个节点的引用;没有被唤醒的节点全部存储在条件队列上。上图描述的是一个长度为 5 的条件队列,即有5个线程执行了await()方法;与阻塞队列不同,条件队列没有常驻内存的“head结点”,且一个处于正常状态节点的waitStatus -2 。当有新节点加入时,将会追加至队列尾部

    3.2、唤醒

    当我们调用signal()方法时,会发生什么?我们还是拿长度为 5 的条件队列举例说明,在AQS内部会经历队列转移,即由条件队列转移至阻塞队列

    signal条件队列向阻塞队列转移

    signalAll()执行时,具体执行流程与signal()类似,即会将条件队列中的所有节点全部转移至阻塞队列(并发度为1,按顺序依次激活)中,依靠阻塞队列自身依次唤醒的机制,达到激活所有线程的目的

    四、JDK vs AQS

    经过上文的介绍,似乎AQS做了与wait/notify相同的功能,相比较而言,甚至JDK的写法更简洁;那他们在性能上的表现如何呢?让我们来做个对比

    4.1、对比

    我们模拟这样的一个场景:启动10个线程,分别调用wait()方法,当所有线程都进入阻塞后,调用notifyAll(),10个线程均被唤醒并执行完毕后,方法结束。 上述方法执行10000次,对比JDK与AQS耗时

    JDK测试代码:

    public class ConditionCompareTest {
    
      @Test
      public void runTest() throws InterruptedException {
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 10000; i++) {
          if (i % 1000 == 0) {
            System.out.println(i);
          }
          jdkTest();
        }
        long cost = System.currentTimeMillis() - begin;
        System.out.println("耗时: " + cost);
      }
      
      public void jdkTest() throws InterruptedException {
        Object lock = new Object();
        List<Thread> list = Lists.newArrayList();
        // 步骤一:启动10个线程,并进入wait等待
        for (int i = 0; i < 10; i++) {
          Thread thread = new Thread(() -> {
            try {
              synchronized (lock) {
                lock.wait();
              }
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          });
          thread.start();
          list.add(thread);
        }
    
        // 步骤二:等待10个线程全部进入wait方法
        while (true) {
          boolean allWaiting = true;
          for (Thread thread : list) {
            if (thread.getState() != Thread.State.WAITING) {
              allWaiting = false;
              break;
            }
          }
          if (allWaiting) {
            break;
          }
        }
    
        // 步骤三:唤醒10个线程
        synchronized (lock) {
          lock.notifyAll();
        }
    
        // 步骤四:等待10个线程全部执行完毕
        for (Thread thread : list) {
          thread.join();
        }
      }
    }
    

    AQS测试代码:

    public class ConditionCompareTest {
      private ReentrantLock lock = new ReentrantLock();
      private Condition condition = lock.newCondition();
    
      @Test
      public void runTest() throws InterruptedException {
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 10000; i++) {
          if (i % 1000 == 0) {
            System.out.println(i);
          }
          aqsTest();
        }
        long cost = System.currentTimeMillis() - begin;
        System.out.println("耗时: " + cost);
      }
    
      @Test
      public void aqsTest() throws InterruptedException {
        AtomicInteger lockedNum = new AtomicInteger();
        List<Thread> list = Lists.newArrayList();
        // 步骤一:启动10个线程,并进入wait等待
        for (int i = 0; i < 10; i++) {
          Thread thread = new Thread(() -> {
            try {
              lock.lock();
              lockedNum.incrementAndGet();
              condition.await();
              lock.unlock();
            } catch (InterruptedException e) {
              e.printStackTrace();
            }
          });
          thread.start();
          list.add(thread);
        }
    
        // 步骤二:等待10个线程全部进入wait方法
        while (true) {
          if (lockedNum.get() != 10) {
            continue;
          }
          boolean allWaiting = true;
          for (Thread thread : list) {
            if (thread.getState() != Thread.State.WAITING) {
              allWaiting = false;
              break;
            }
          }
          if (allWaiting) {
            break;
          }
        }
    
        // 步骤三:唤醒10个线程
        lock.lock();
        condition.signalAll();
        lock.unlock();
    
        // 步骤四:等待10个线程全部执行完毕
        for (Thread thread : list) {
          thread.join();
        }
      }
    }
    
    条件队列 耗时1 耗时2 耗时3 耗时4 耗时5 平均耗时(ms)
    JDK 5000 5076 5054 5089 4942 5032
    AQS 5358 5440 5444 5473 5472 5437

    4.2、基准测试Q&A

    基于以上的测试我们还是有一些疑问的,不要小看这些疑问,通过这些疑问我们可以把之前的知识点全都串联起来

    • Q:AQS测试中的“步骤二”,为什么在判断“等待10个线程全部进入wait方法”时,要引入lockedNum.get() != 10的判断?直接通过判断所有线程是否均为waiting方法不可以吗?
    • A:如果真的删除lockedNum.get() != 10的判断,在多次并发测试时,会有较小的概率出现程序死锁的情况(作者电脑的环境是平均5万次调用会出现一次),为什么会出现死锁呢?我们追AQS源码就会发现,不管是调用lock()还是await,挂起线程使用的方法均为LockSupport.park()方法,此方法会将线程置为WAITING状态,也就是线程状态是WAITING状态时,有可能线程刚进入lock()方法,从而导致awaitthread.join()的死锁

    • Q:既然是这样,为什么JDK的测试没有出现死锁?
    • A:我们看到JDK的加锁是通过synchronized关键字完成的,而当线程因为等待synchronized资源而阻塞时,线程状态将变为BLOCKED,而进入wait()方法后,状态才会变为WAITING

    • Q:那看来只有通过引入AtomicInteger lockedNum变量才能解决死锁问题了
    • A:其实解决问题的方式有很多种,我们甚至可以简单将ReentrantLock lock置为公平锁,也能解决上述死锁问题;因为当前场景发生死锁的情况是,singalAll()先于await()发生,而当所有线程都变成WAITING状态后,公平锁则确保了singalAll()一定是在所有线程都调用了await()。但因为synchronized本身是非公平锁,故如果AQS使用公平锁的话,性能偏差较大

    • Q:那这样看来,AQS中的阻塞队列相对比JDK的没有优势可言啊,用法上没有JDK简洁,性能上还没人家快
    • A:的确,如果真是只是单纯的使用阻塞、唤醒功能的话,还是建议使用JDK内置的方式;但AQS的优势并不在此

    五、再说AQS条件队列

    AQS的优势在于,其提供了丰富的api可以查询条件队列的状态;例如当我们想看一下在条件队列中等待节点的个数时,使用JDK的wait/notify时,是无法做的;AQS提供的api如下:

    • boolean hasWaiters() 阻塞队列中是否有等待节点
    • int getWaitQueueLength() 获取阻塞队列长度
    • Collection<Thread> getWaitingThreads() 获取阻塞队列中线程对象

    这些api为程序提供了更灵活的控制,条件队列对于javaer已不是黑盒;当然使用AQS的条件队列必然要引入独占锁,例如ReentrantLock,自然地我们还可以通过它查看条件队列外围的一些指标,例如:

    • Interrupted 响应中断,借助独占锁,提供响应中断能力; wait/notify不提供,因为虽然wait方法响应中断,但是synchronized关键字是会一直阻塞的
    • boolean tryLock() 尝试获取锁; wait/notify不提供
    • int getHoldCount() 获取阻塞线程的数量
    • boolean isLocked() 是否持有锁
    • fair/nonFair 提供公平/非公平锁
    • ...

    可见整个AQS体系相比较Objectwait/notify方法是相当灵活的,提供了很多监控条件队列、阻塞队列的指标

    六、致谢

    这里要特别感谢一下神策数据的架构师金满仓,同时也是我私下的挚友。他功力深厚,对程序有着自己独到的见地,在整个AQS编写期间,不厌其烦地给我提供了很多理论及数据上的支持,帮我拓宽视野,再次感谢!

    bk