当前位置 博文首页 > JavaEdge全是干货的技术号:Java集合源码解析-ConcurrentHashMap

    JavaEdge全是干货的技术号:Java集合源码解析-ConcurrentHashMap

    作者:[db:作者] 时间:2021-07-07 12:43

    为并发而生的 ConcurrentHashMap

    数据结构

    Java 7为实现并发访问,引入了Segment这一结构,实现了分段锁,理论上最大并发度与Segment个数相等。

    Java 8取消了基于 Segment 的分段锁思想,改用CAS + synchronized 控制并发操作,在某些方面提升了性能。并且追随 1.8 版本的 HashMap 底层实现,使用数组+链表+红黑树进行数据存储。
    JAVA 8 ConcurrentHashMap

    • 和 HashMap 中的语义一样,代表整个哈希表。在第一次插入时才懒加载初始化。大小永远是 2 的次幂。被迭代器直接访问。

    • 一个连接表,用于哈希表扩容,扩容完成后会被重置为 null

    • 保存着整个哈希表中存储的所有的结点的个数总和,类似于 HashMap 的 size 属性。主要用于当没有线程竞争时使用,也会作为哈希表初始化过程中的反馈。通过CAS 更新。

    这是一个重要的属性,无论是初始化哈希表,还是扩容 rehash,都需要该依赖。有如下取值:

    • >0:相当于 HashMap 中的 threshold,表示阈值
    • 0:默认值
    • -1:代表哈希表正在进行初始化
    • <-1:代表有多个线程正在进行扩容
    • 构造函数的实现也和HashMap类似

      若传入 32,实际大小 64。即最接近1.5n+1的 2的次幂。因为如果你想存入 15 个元素,那么 16 是存不下的,需要扩容,所以直接给你初始化为 32 的容量。

    寻址方式

    同样是通过Key的哈希值与数组长度取模确定该Key在数组中的索引;
    同样为了避免不太好的Key的hashCode设计,它通过如下方法计算得到Key的最终哈希值.

    // usable bits of normal node hash
    static final int HASH_BITS = 0x7fffffff;
    

    不同的是,Java 8的ConcurrentHashMap作者认为引入红黑树后,即使哈希冲突比较严重,寻址效率也足够高,所以作者并未在哈希值的计算上做过多设计,只是将Key的hashCode值与其高16位作异或并保证最高位为0(从而保证最终结果为正整数)

    8.3 同步方式

    对于put操作,如果Key对应的数组元素为null,则通过CAS操作将其设置为当前值;
    如果Key对应的数组元素(也即链表表头或者树的根元素)不为null,则对该元素使用synchronized关键字申请锁,然后进行操作;
    如果该put操作使得当前链表长度超过一定阈值,则将该链表转换为树,从而提高寻址效率.

    对于读操作,由于数组被volatile修饰,因此不用担心数组的可见性问题;
    同时每个元素是一个Node实例(Java 7中每个元素是一个HashEntry),它的Key值和hash值都由final修饰,不可变更,无须关心它们被修改后的可见性问题;
    而其Value及对下一个元素的引用由volatile修饰,可见性也有保障.

    8.4 操作

    put方法和remove方法都会通过addCount方法维护Map的size;
    size方法通过sumCount获取由addCount方法维护的Map的size.

    下面我们主要来分析下 ConcurrentHashMap 的一个核心方法 put,我们也会一并解决掉该方法中涉及到的扩容、辅助扩容,初始化哈希表等方法。

    8.4.1 put

    对于 HashMap 来说,多线程并发添加元素会导致数据丢失等并发问题,那么 ConcurrentHashMap 又是如何做到并发添加的呢?

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        //对传入的参数进行合法性判断
        if (key == null || value == null) throw new NullPointerException();
        
        //计算键所对应的 hash 值
        int hash = spread(key.hashCode());
        
        int binCount = 0;
        
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            
            //如果哈希表还未初始化,那么初始化它
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
                
            //根据键的 hash 值找到哈希数组相应的索引位置
            //如果为空,那么以CAS无锁式向该位置添加一个节点
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   
            }
    

    这里需要详细说明的只有initTable 方法:初始化哈希表,它同时只允许一个线程进行初始化操作。

    /**
      * Initializes table, using the size recorded in sizeCtl.
      */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        
        // 如果表为空才进行初始化操作
        while ((tab = table) == null || tab.length == 0) {
        
            // sizeCtl 小于零说明已经有线程正在进行初始化操作
            // 当前线程应该放弃 CPU 的使用
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
                
            // 否则说明尚未有线程对表进行初始化,那么本线程就来做这个工作
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                
                //保险起见,再次判断下表是否为空
                try {
                    if ((tab = table) == null || tab.length == 0) {
                    
                        //至此, sc 大于零说明容量已经初始化了,否则使用默认容量,其他线程再也无法初始化!!!
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        
                        //根据容量构建数组
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //计算阈值,等效于 n*0.75
                        sc = n - (n >>> 2);
                    }
                } finally {
                    //设置阈值
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
    

    该方法的核心思想就是,只允许一个线程对表进行初始化;
    如果不巧有其他线程进来了,那么会让其他线程交出 CPU 等待下次系统调度;
    这样,保证了表同时只会被一个线程初始化.

    接着回到 putVal 方法,这样的话,我们第一部分的 putVal 源码就分析结束了
    下面我们看后一部分的源码

    //检测到桶结点是 ForwardingNode 类型,协助扩容
    else if ((fh = f.hash) == MOVED)
         tab = helpTransfer(tab, f);
    

    //桶结点是普通的结点,锁住该桶头结点并试图在该链表的尾部添加一个节点
    else {
           V oldVal = null;
           synchronized (f) {
               if (tabAt(tab, i) == f) {
               
                  //向普通的链表中添加元素,无需赘述
                  if (fh >= 0) {
                     binCount = 1;
                     for (Node<K,V> e = f;; ++binCount) {
                         K ek;
                         if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {
                             oldVal = e.val;
                             if (!onlyIfAbsent)
                                e.val = value;
                                break;
                          }
                          Node<K,V> pred = e;
                          if ((e = e.next) == null) {
                             pred.next = new Node<K,V>(hash, key,value, null);
                             break;
                          }
                     }
               }
               
               //向红黑树中添加元素,TreeBin 结点的hash值为TREEBIN(-2)
               else if (f instanceof TreeBin) {
                   Node<K,V> p;
                   binCount = 2;
                     if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                       oldVal = p.val;
                       if (!onlyIfAbsent)
                          p.val = value;
                    }
               }
           }
       }
       
      //binCount != 0 说明向链表或者红黑树中添加或修改一个节点成功
      //binCount  == 0 说明 put 操作将一个新节点添加成为某个桶的首节点
      if (binCount != 0) {
      
             //链表深度超过 8 转换为红黑树
             if (binCount >= TREEIFY_THRESHOLD)
                 treeifyBin(tab, i);
                 
             //oldVal != null 说明此次操作是修改操作
             //直接返回旧值即可,无需做下面的扩容边界检查
             if (oldVal != null)
                 return oldVal;
               break;
            }
        }
    }
    
    //CAS 式更新baseCount,并判断是否需要扩容
    addCount(1L, binCount);
    
    //程序走到这一步说明此次 put 操作是一个添加操作,否则早就 return 返回了
    return null;
    

    这一部分的源码大体上已如注释所描述,至此整个 putVal 方法的大体逻辑实现相信你也已经清晰了,好好回味一下.

    下面我们对这部分中的某些方法的实现细节再做一些深入学习.
    首先需要介绍一下,ForwardingNode 这个节点类型

    这个节点内部保存了一个 nextTable 引用,它指向一张 hash 表;
    在扩容操作中,我们需要对每个桶中的结点进行分离和转移;
    如果某个桶结点中所有节点都已经迁移完成了(已经被转移到新表 nextTable);
    那么会在原 table 表的该位置挂上一个 ForwardingNode 结点,说明此桶已经完成迁移.

    ForwardingNode继承自 Node 结点,并且它唯一的构造函数将构建一个k/v/next 都为 null 的结点,反正它就是个标识,无需那些属性;
    但是 hash 值却为 MOVED.

    所以,我们在 putVal 方法中遍历整个 hash 表的桶结点,如果遇到 hash 值等于 MOVED,说明已经有线程正在扩容 rehash 操作,整体上还未完成,,过我们要插入的桶的位置已经完成了所有节点的迁移

    由于检测到当前哈希表正在扩容,于是让当前线程去协助扩容.

    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab !=