当前位置 博文首页 > Yanci丶:Java并发之AQS原理剖析

    Yanci丶:Java并发之AQS原理剖析

    作者:Yanci丶 时间:2021-06-02 18:23

    概述:

    AbstractQueuedSynchronizer,可以称为抽象队列同步器。

    AQS有独占模式和共享模式两种:

    • 独占模式:

    公平锁:

    非公平锁:

    • 共享模式:

     

    数据结构:

    • 基本属性:
    /**
     * 同步等待队列的头结点
     */
    private transient volatile Node head;
    
    /**
     * 同步等待队列的尾结点
     */
    private transient volatile Node tail;
    
    /**
     * 同步资源状态
     */
    private volatile int state;
    • 内部类:
    static final class Node {
        /**
         * 标记节点为共享模式
         */
        static final Node SHARED = new Node();
        /**
         * 标记节点为独占模式
         */
        static final Node EXCLUSIVE = null;
    
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
    
        /**
         *   CANCELLED:  值为1,表示当前的线程被取消
         *   SIGNAL: 值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
         *   CONDITION:  值为-2,表示当前节点在等待condition,也就是在condition队列中;
         *   PROPAGATE:  值为-3,表示当前场景下后续的acquireShared能够得以执行;
         *   0:  表示当前节点在sync队列中,等待着获取锁。
         *  表示当前节点的状态值
         */
        volatile int waitStatus;
    
        /**
         * 前置节点
         */
        volatile Node prev;
    
        /**
         * 后继节点
         */
        volatile Node next;
    
        /**
         * 节点同步状态的线程
         */
        volatile Thread thread;
    
        /**
         * 存储condition队列中的后继节点
         */
        Node nextWaiter;
    
        /**
         * 是否为共享模式
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
    
        /**
         * 获取前驱结点
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
    
        Node() {    // Used to establish initial head or SHARED marker
        }
    
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
    
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    主要方法解析:

    • tryAcquire/tryAcquireShared(int arg)

      独占/共享模式获取锁;由子类实现,仅仅获取锁,获取锁失败时不进行阻塞排队。

    • tryRelease/tryReleaseShared(int arg)

      独占/共享模式释放锁;由子类实现,仅仅释放锁,释放锁成功不对后继节点进行唤醒操作。

    • acquire/acquireShared(int arg)

      独占/共享模式获取锁,如果线程被中断唤醒,会返回线程中断状态,不会抛异常中止执行操作(忽略中断)。

    • acquireInterruptibly/acquireSharedInterruptibly(int arg)

      独占/共享模式获取锁,线程如果被中断唤醒,则抛出InterruptedException异常(中断即中止)。

    • tryAcquireNanos/tryAcquireSharedNanos(int arg, long nanosTimeout)

      独占/共享时间中断模式获取锁,线程如果被中断唤醒,则抛出InterruptedException异常(中断即中止);如果超出等待时间则返回加锁失败。

    • release/releaseShared(int arg)

      独占/共享模式释放锁。

    • addWaiter(Node mode)

      将给定模式节点进行入队操作。

     1     private Node addWaiter(Node mode) {
     2         // 根据指定模式,新建一个当前节点的对象
     3         Node node = new Node(Thread.currentThread(), mode);
     4         // Try the fast path of enq; backup to full enq on failure
     5         Node pred = tail;
     6         if (pred != null) {
     7             // 将当前节点的前置节点指向之前的尾结点
     8             node.prev = pred;
     9             // 将当前等待的节点设置为尾结点(原子操作)
    10             if (compareAndSetTail(pred, node)) {
    11                 // 之前尾结点的后继节点设置为当前等待的节点
    12                 pred.next = node;
    13                 return node;
    14             }
    15         }
    16         enq(node);
    17         return node;
    18     }
    • enq(final Node node)

      将节点设置为尾结点。注意这里会进行自旋操作,确保节点设置成功。因为等待的线程需要被唤醒操作;如果操作失败,当前节点没有与其他节点没有引用指向关系,一直就不会被唤醒(除非程序代码中断线程)。

     1     private Node enq(final Node node) {
     2         for (;;) {
     3             Node t = tail;
     4             // 判断尾结点是否为空,尾结点初始值是为空
     5             if (t == null) { // Must initialize
     6                 // 尾结点为空,需要初始化
     7                 if (compareAndSetHead(new Node()))
     8                     tail = head;
     9             } else {
    10                 // 设置当前节点设置为尾结点
    11                 node.prev = t;
    12                 if (compareAndSetTail(t, node)) {
    13                     t.next = node;
    14                     return t;
    15                 }
    16             }
    17         }
    18     }
    • acquireQueued(final Node node, int arg)

      已经在队列当中的节点,准备阻塞获取锁。在阻塞前会判断前置节点是否为头结点,如果为头结点;这时会尝试获取下锁(因为这时头结点有可能会释放锁)。

     1     final boolean acquireQueued(final Node node, int arg) {
     2         boolean failed = true;
     3         try {
     4             boolean interrupted = false;
     5             for (;;) {
     6                 // 当前节点的前置节点
     7                 final Node p = node.predecessor();
     8                 // 入队前会先判断下该节点的前置节点是否是头节点(此时头结点有可能会释放锁);然后尝试去抢锁
     9                 // 在非公平锁场景下有可能会抢锁失败,这时候会继续往下执行 阻塞线程
    10                 if (p == head && tryAcquire(arg)) {
    11                     //如果抢到锁,将头节点后移(也就是将该节点设置为头结点)
    12                     setHead(node);
    13                     p.next = null; // help GC
    14                     failed = false;
    15                     return interrupted;
    16                 }
    17                 // 如果前置节点不是头结点,或者当前节点抢锁失败;通过shouldParkAfterFailedAcquire判断是否应该阻塞
    18                 // 当前置节点的状态为SIGNAL=-1,才可以安全被parkAndCheckInterrupt阻塞线程
    19                 if (shouldParkAfterFailedAcquire(p, node) &&
    20                     parkAndCheckInterrupt())
    21                     // 该线程已被中断
    22                     interrupted = true;
    23             }
    24         } finally {
    25             if (failed)
    26                 cancelAcquire(node);
    27         }
    28     }
    • shouldParkAfterFailedAcquire(Node pred, Node node)

      检查和更新未能获取锁节点的状态,返回是否可以被安全阻塞。

     1     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     2         int ws = pred.waitStatus;   // 获取前置节点的状态
     3         if (ws == Node.SIGNAL)
     4             /*
     5              * 前置节点的状态waitStatus为SIGNAL=-1,当前线程可以安全的阻塞
     6              */
     7             return true;
     8         if (ws > 0) {
     9             /*
    10              * 如果前置节点的状态waitStatus>0,即waitStatus为CANCELLED=1(无效节点),需要从同步状态队列中取消等待(移除队列)
    11              */
    12             do {
    13                 node.prev = pred = pred.prev;
    14             } while (pred.waitStatus > 0);
    15             pred.next = node;
    16         } else {
    17             /*
    18              * 将前置状态的waitStatus修改为SIGNAL=-1,然后当前节点才可以被安全的阻塞
    19              */
    20             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    21         }
    22         return false;
    23     }
    • parkAndCheckInterrupt()

      阻塞当前节点,返回当前线程的中断状态。

    1     private final boolean parkAndCheckInterrupt() {
    2         LockSupport.park(this); //阻塞
    3         return Thread.interrupted();
    4     }
    • cancelAcquire(Node node)

      取消进行的获取锁操作,在非忽略中断模式下,线程被中断唤醒抛异常时会调用该方法。

     1     //  将当前节点的状态设置为CANCELLED,无效的节点,同时移除队列       
     2     private void cancelAcquire(Node node) {
     3         if (node == null)
     4             return;
     5 
     6         node.thread = null;
     7         Node pred = node.prev;
     8         while (pred.waitStatus > 0)
     9             node.prev = pred = pred.prev;
    10 
    11         Node predNext = pred.next;
    12         node.waitStatus = Node.CANCELLED;
    13         if (node == tail && compareAndSetTail(node, pred)) {
    14             compareAndSetNext(pred, predNext, null);
    15         } else {
    16             int ws;
    17             if (pred != head &&
    18                 ((ws = pred.waitStatus) == Node.SIGNAL ||
    19                  (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
    20                 pred.thread != null) {
    21                 Node next = node.next;
    22                 if (next != null && next.waitStatus <= 0)
    23                     compareAndSetNext(pred, predNext, next);
    24             } else {
    25                 unparkSuccessor(node);
    26             }
    27 
    28             node.next = node; // help GC
    29         }
    30     }    
    • hasQueuedPredecessors()

      判断当前线程是否应该排队。

      1.第一种结果——返回true:(1.1和1.2同时存在,1.2.1和1.2.2有一个存在)

        1.1  h != t为true,说明头结点和尾结点不相等,表示队列中至少有两个不同节点存在,至少有一点不为null。

        1.2  ((s = h.next) == null || s.thread != Thread.currentThread())为true

          1.2.1   (s = h.next) == null为true,表示头结点之后没有后续节点。

          1.2.2   (s = h.next) == null为false,s.thread != Thread.currentThread()为true