当前位置 博文首页 > Admol:源码分析:Exchanger之数据交换器

    Admol:源码分析:Exchanger之数据交换器

    作者:Admol 时间:2021-02-20 12:33

    简介

    Exchanger是Java5 开始引入的一个类,它允许两个线程之间交换持有的数据。当Exchanger在一个线程中调用exchange方法之后,会阻塞等待另一个线程调用同样的exchange方法,然后以线程安全的方式交换数据,之后线程继续执行。

    官方示例

    在JDK的源码注释中,提供了一个简单的示例demo,稍加修改后就可以运行

    public class FillAndEmpty {
        Exchanger<Integer> exchanger = new Exchanger<Integer>();
        Integer initialEmptyBuffer = 1;
        Integer initialFullBuffer = 2;
    
         class FillingLoop implements Runnable {
            public void run() {
                Integer currentBuffer = initialEmptyBuffer;
                try {
                    while (currentBuffer != 2) {
                         currentBuffer = exchanger.exchange(currentBuffer);
                    }
                    System.out.println("FillingLoop:"+currentBuffer);
                } catch (InterruptedException ex) {
    
                }
            }
        }
    
         class EmptyingLoop implements Runnable {
            public void run() {
                Integer currentBuffer = initialFullBuffer;
                try {
                    while (currentBuffer != 1) {
                        currentBuffer = exchanger.exchange(currentBuffer);
                    }
                    System.out.println("EmptyingLoop:"+currentBuffer);
                } catch (InterruptedException ex) {
    
                }
            }
        }
    
        void start() {
            new Thread(new FillingLoop()).start();
            new Thread(new EmptyingLoop()).start();
        }
    
        public static void main(String[] args){
            FillAndEmpty f = new FillAndEmpty();
            f.start();
        }
    }
    

    源码分析

    内部类

    Exchanger 中定义了两个内部类:Node、Participant

    // 使用 @sun.misc.Contended 注解避免出现伪共享
    @sun.misc.Contended static final class Node {
        int index;              // Arena 中的索引
        int bound;              // Exchanger.bound的最后记录值
        int collides;           // 当前 bound 的CAS 失败数
        int hash;               // Pseudo-random for spins
        Object item;            // 线程的当前数据项
        volatile Object match;  // 由释放线程提供的项目
        volatile Thread parked; // 当阻塞(parked)时,设置此线程,否则为null
    }
    
    /** 继承了ThreadLocal,并初始化了Node对象 */
    static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }
    

    重要的属性

    /** 每个线程的状态 */
    private final Participant participant;
    /** 消除数组;在启用(在slotExchange中)之前为空。元素访问使用volatile get和CAS */
    private volatile Node[] arena;
    /** 在检测到争用之前一直使用的插槽,可以理解为先到的线程的数据项 */
    private volatile Node slot;
    /** 每次更新时,将最大有效竞技场位置的索引与高位SEQ号进行“或”运算。 */
    private volatile int bound;
    

    exchange()方法

    等待另一个线程到达交换点(除非当前线程被中断),然后将给定的对象传递给它,作为回报接收另一个的对象。

    public V exchange(V x) throws InterruptedException {
        // 交换后的对象v
        Object v; 
        // item 为交换出去的对象,如果为null则换成NULL_ITEM对象
        Object item = (x == null) ? NULL_ITEM : x; // translate null args
        // 1.1构造方法没有初始化arena,所以第一个进来的线程看见的arena肯定为null
        // 1.2第一个进来的线程继续调用slotExchange(item, false, 0L)方法
        if ((arena != null || (v = slotExchange(item, false, 0L)) == null) &&
            // 2.1 Thread.interrupted(): 检测线程是否有被中断
            // 2.2 arenaExchange(item, false, 0L):slotExchange方法 返回了null时会进入到这个方法
            ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
        return (v == NULL_ITEM) ? null : (V)v;
    }
    

    arenaExchange()方法总结:

    1. 调用exchange方法的线程等待另一个线程到达交换点完成交换数据
    2. 如果交换的数据为null,会被转换成一个NULL_ITEM 的Object对象作为转换的数据项
    3. 构造方法未初始化arena对象,所以会先调用slotExchange方法借用slot插槽来交换对象
    4. 如果slotExchange方法成功返回了另一个交换到的对象,则直接返回交换到的数据项
    5. 如果slotExchange方法成功返回了null,会继续调用arenaExchange方法完成数据交换并返回

    slotExchange()方法

    /**
     * item:要交换的项目
     * timed:是否有设置超时
     * ns: 设置的超时时间
     * return: 返回另一个线程的数据项;如果启用arena或线程在完成之前被中断,则为null;如果超时,则为TIMED_OUT
     */
    private final Object slotExchange(Object item, boolean timed, long ns) {
        // 获取当前线程node节点对象
        Node p = participant.get();
        Thread t = Thread.currentThread(); // 当前线程
        if (t.isInterrupted()) // preserve interrupt status so caller can recheck
            return null;
        // 自旋
        for (Node q;;) {
            if ((q = slot) != null) { // 两个线程先到的线程,slot肯定为null,一般后到的线程会进入到这个if分支
                // 如果在当前线程之前已经有线程调用了exchange方法,slot就肯定不为null,条件成立
                if (U.compareAndSwapObject(this, SLOT, q, null)) {// 后来的线程会调用CAS吧slot再置为null
                    // q.item 是较早的线程的数据项
                    Object v = q.item;
                    // item 是当前线程的数据项;by: https://jinglingwang.cn
                    q.match = item;
                    // 之前阻塞(park)的线程
                    Thread w = q.parked;
                    if (w != null) //可能另一个线程还在自旋,没有阻塞,所以这里可能会为null
                        // 唤醒之前被阻塞的线程
                        U.unpark(w);
                    // 返回之前的线程的数据项
                    return v;
                }
                // create arena on contention, but continue until slot null
                // 上面CAS修改slot失败后,会进入到这里;https://jinglingwang.cn
                // SEQ = MMASK + 1 = 256
                if (NCPU > 1 && bound == 0 && U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    // if条件成立,初始化arena数组
                    // 我8核的CPU,计算的length是 (4+2) << 7 == 768
                    arena = new Node[(FULL + 2) << ASHIFT];
            }
            else if (arena != null) 
                 // 如果上面的if条件成立并且初始化了arena数组,会进入到arenaExchange方法
                return null; // caller must reroute to arenaExchange
            else {
                p.item = item; // p节点的item设置为当前项item
                if (U.compareAndSwapObject(this, SLOT, null, p)) // CAS 修改slot的值,修改成功退出自旋
                    break;
                p.item = null; //CAS 修改失败没有退出自旋,重置p节点的item为null
            }
        }
        // 理论上第一个先到的线程会进入到下面,会阻塞自己,等待另一个线程的数据项到来
        // await release
        int h = p.hash;
        long end = timed ? System.nanoTime() + ns : 0L;  // 超时时间
        // 根据CPU的核数确定自旋的次数1024 or 1
        int spins = (NCPU > 1) ? SPINS : 1;
        Object v;
        while ((v = p.match) == null) { // 先到的线程 p.match 可能会为null,下面开始自旋等待另一个线程交换的数据设置到match
            if (spins > 0) { **// 至少先自旋 1024 次,等待match数据项,自旋后才阻塞自己**
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0)
                    h = SPINS | (int)t.getId(); // 重新计算hash
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    // 减少自旋次数
                    Thread.yield(); // 让出CPU的使用权
            } else if (slot != p) // 上面自旋次数已经减到0了,并且slot != p,没有冲突的话理论上slot 应该是等于 p 的
                spins = SPINS; // 重置自旋次数
            else if (!t.isInterrupted() && arena == null &&  (!timed || (ns = end - System.nanoTime()) > 0L)) {
                U.putObject(t, BLOCKER, this);
                p.parked = t;
                if (slot == p)
                    U.park(false, ns); // 调用底层阻塞最早的线程
                // 线程被唤醒了,回到上面再次判断while自旋,p.match理论上不会是null了,p.match是后到的线程的数据项,是需要返回给当前线程的项
                p.parked = null;
                U.putObject(t, BLOCKER, null);
            } else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                // 如果线程阻塞超时了,还是没等待要交换的数据项,会进入到这里,返回一个TIMED_OUT 对象或null
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        // 将 当前线程p 的 match 属性设置成 null
        U.putOrderedObject(p, MATCH, null);
        p.item = null;
        p.hash = h;
        // 返回匹配后的数据项v
        return v;
    }
    

    slotExchange()方法总结:

    1. 线程进入该方法后,会先拿到[Exchanger](https://jinglingwang.cn)Participant,也就是Node数据节点p
    2. 检查线程的状态,是否有被中断,如果是返回null,会进入到下面的arenaExchange方法逻辑
    3. 先调用slotExchange()方法的线程会使用CAS的方式线程安全的占用slot插槽
    4. 然后会自旋至少1024次并不断让出CPU使用权,期间如果成功等待到了另外的线程的数据项(p.match != null),则直接返回交换到的数据(v = p.match
    5. 如果自旋后没有等到交换的数据项,调用U.park阻塞当前线程,等待另一个线程的到来将其唤醒或者超时
    6. 另一个线程进入slotExchange()方法后,发现slot插槽已经被占用(已经有线程在等它交换数据了),取出slot插槽中的item数据(第一个线程的数据),并设置自己的数据到插槽的match项,然后唤醒另一个线程,成功换反交换到的数据。
    7. 被唤醒的线程成功获得match数据,并返回交换后的match数据

    slotExchange方法返回null的2种情况:

    1. 线程被中断,会返回null
    2. 设置了超时时间,并且时间超时,会返回TIMED_OUT
    3. 第一个线程超时了,把slot从p置为null的同事第二个线程刚好调用CAS也在把slot从q修改为null,这时候第二个线程会修改失败,然后就会去初始化arena数组,然后第二个线程就可能返回null

    arenaExchange()方法

    exchange()方法实现中可以看到,只有当slotExchange()方法返回null之后才会执行到arenaExchange()方法,而线程中断的情况是不会进入到该方法的,所以只有另一种情况,但是要进入的几率太小了,断点调试的话难以构造这种情况。

    private final Object arenaExchange(Object item, boolean timed, long ns) {
        // 实质上就是个Node数组
        Node[] a = arena;
        // 获取当前线程node节点对象
        Node p = participant.get();
        // p.index 访问插槽的索引位置,初始值为0
        for (int i = p.index;;) {                      // access slot at i
            // j是原始数组偏移量 https://jinglingwang.cn
            int b, m, c; long j;                       // j is raw array offset
            // ABASE:返回Node数组中第一个元素的偏移地址+128; i << ASHIFT : i<<7
            // getObjectVolatile:获取obj对象中offset偏移地址对应的object型field的值,支持volatile load语义
            // q节点就是通过CAS获取arena数组偏移(i + 1) *  128个地址位上的node
            Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
            // 如果获取到的节点不为空,并且再次吧j位置的q元素置为null
            if (q != null && U.compareAndSwapObject(a, j, q, null)) { // 整个条件成立,代表线程获得了交换的数据
                Object v = q.item;                     // release
                q.match = item;
                Thread w = q.parked;
                if (w != null)  // 有阻塞的线程就唤醒
                    U.unpark(w);
                return v; // 返回交换的数据
            } else if (i <= (m = (b = bound) & MMASK) && q == null) {  // i 没有越界,并且q==null
                // 把当前线程的数据赋予给p节点的item
                p.item = item;                         // offer
                if (U.compareAndSwapObject(a, j, null, p)) { // 再使用CAS的方式把p节点安全的放入到数组的j位置上
                    // CAS 修改成功
                    // 计算超时时间
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                    Thread t = Thread.currentThread(); // wait   当前线程
                    // 自旋 1024
                    for (int h = p.hash, spins = SPINS;;) {
                        Object v = p.match;  //交换的数据
                        if (v != null) {  // 交换的数据不为null,说明有其他线程把交换的数据送进来了
                            U.putOrderedObject(p, MATCH, null);
                            // 将match和item置为null
                            p.item = null;             // clear for next use
                            p.hash = h; 
                            return v;// 返回数据
                        } else if (spins > 0) {
                            // 异或移位
                            h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                            if (h == 0)                // initialize hash  初始化hash
                                h = SPINS | (int)t.getId();
                            else if (h < 0 &&          // approx 50% true
                                     (--spins & ((SPINS >>> 1) - 1)) == 0) // 减少自旋次数
                                Thread.yield();        // two yields per wait  让出CPU使用权
                        } else if (U.getObjectVolatile(a, j) != p) // 和slotExchange方法中的类似
                            // 重置自旋次数
                            spins = SPINS;       // releaser hasn't set match yet  
                        else if (!t.isInterrupted() && m == 0 &&
                                 (!timed || // 超时时间设置
                                  (ns = end - System.nanoTime()) > 0L)) {
                            U.putObject(t, BLOCKER, this); // emulate LockSupport
                            p.parked = t;              // minimize window
                            if (U.getObjectVolatile(a, j) == p)
                                U.park(false, ns);  // 阻塞当前线程,等待被唤醒
                            p.parked = null; // 线程被唤醒了
                            U.putObject(t, BLOCKER, null);
                        } else if (U.getObjectVolatile(a, j) == p && U.compareAndSwapObject(a, j, p, null)) {
                            // m会跟着bound变化,初始会是0
                            if (m != 0)                // try to shrink
                                U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1); // 修改b
                            p.item = null;
                            p.hash = h;
                            // i = p.index无符号右移1位
                            i = p.index >>>= 1;        // descend
                            if (Thread.interrupted()) //线程被中断
                                return null;
                            if (timed && m == 0 && ns <= 0L) // 超时,返回TIME_OUT
                                return TIMED_OUT;
                            break;                     // expired; restart
                        }
                    }
                } else // 使用CAS的方式把p节点安全的放入到数组的j位置上失败(可能有其他线程已经捷足先登),重置p节点的item
                    p.item = null;                     // clear offer
            } else { // 上面两个if条件都没成立:比如q!=null,compareAndSwapObject失败,数组未越界
                if (p.bound != b) {                    // stale; reset
                    p.bound = b; // b变化了,重置bond
                    p.collides = 0; // 当前 bound 的CAS 失败数
                    i = (i != m || m == 0) ? m : m - 1; // 确定索引i
                } else if ((c = p.collides) < m || m == FULL ||  !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                    p.collides = c + 1; // bound 的CAS 失败数+1
                    // 确定循环遍历i,继续回到上面最初的地方自旋
                    i = (i == 0) ? m : i - 1;          // cyclically traverse
                } else
                    // 此时表示bound值增加了SEQ+1
                    i = m + 1;                         // grow
                p.index = i; // 设置下标,继续自旋
            }
        }
    }
    

    Exchanger总结:

    1. Exchanger 可以以线程安全的方式完成两个线程之间数据的交换工作
    2. By: http://jinglingwang.cn
    3. Exchanger 主要是使用了自旋和CAS来保证数据的原子性
    4. 一般情况下,slotExchange()方法即可完成数据交换的工作
    5. JDK8 版本的Exchanger 使用了 @sun.misc.Contended注解来避免伪共享
    6. 数据交换过程可以总结为:A、B线程交换数据 ,A发现slot为空就把自己的数据放入到slot插槽中的item项,自旋或阻塞等待B线程的数据,B线程进来发现A线程的数据后取走数据并设置自己的数据到match,然后再唤醒A线程取走B线程的match数据。多个线程交换时,需要用到slot数组。
    bk