当前位置 博文首页 > JavaEdge全是干货的技术号:Java集合源码解析-ConcurrentHashMap
Java 7为实现并发访问,引入了Segment这一结构,实现了分段锁,理论上最大并发度与Segment个数相等。
Java 8取消了基于 Segment 的分段锁思想,改用CAS + synchronized 控制并发操作,在某些方面提升了性能。并且追随 1.8 版本的 HashMap 底层实现,使用数组+链表+红黑树进行数据存储。
和 HashMap 中的语义一样,代表整个哈希表。在第一次插入时才懒加载初始化。大小永远是 2 的次幂。被迭代器直接访问。
一个连接表,用于哈希表扩容,扩容完成后会被重置为 null
保存着整个哈希表中存储的所有的结点的个数总和,类似于 HashMap 的 size 属性。主要用于当没有线程竞争时使用,也会作为哈希表初始化过程中的反馈。通过CAS 更新。
这是一个重要的属性,无论是初始化哈希表,还是扩容 rehash,都需要该依赖。有如下取值:
同样是通过Key的哈希值与数组长度取模确定该Key在数组中的索引;
同样为了避免不太好的Key的hashCode设计,它通过如下方法计算得到Key的最终哈希值.
// usable bits of normal node hash
static final int HASH_BITS = 0x7fffffff;
不同的是,Java 8的ConcurrentHashMap作者认为引入红黑树后,即使哈希冲突比较严重,寻址效率也足够高,所以作者并未在哈希值的计算上做过多设计,只是将Key的hashCode值与其高16位作异或并保证最高位为0(从而保证最终结果为正整数)
对于put操作,如果Key对应的数组元素为null,则通过CAS操作将其设置为当前值;
如果Key对应的数组元素(也即链表表头或者树的根元素)不为null,则对该元素使用synchronized关键字申请锁,然后进行操作;
如果该put操作使得当前链表长度超过一定阈值,则将该链表转换为树,从而提高寻址效率.
对于读操作,由于数组被volatile
修饰,因此不用担心数组的可见性问题;
同时每个元素是一个Node实例(Java 7中每个元素是一个HashEntry),它的Key值和hash值都由final修饰,不可变更,无须关心它们被修改后的可见性问题;
而其Value及对下一个元素的引用由volatile修饰,可见性也有保障.
put方法和remove方法都会通过addCount方法维护Map的size;
size方法通过sumCount获取由addCount方法维护的Map的size.
下面我们主要来分析下 ConcurrentHashMap 的一个核心方法 put,我们也会一并解决掉该方法中涉及到的扩容、辅助扩容,初始化哈希表等方法。
对于 HashMap 来说,多线程并发添加元素会导致数据丢失等并发问题,那么 ConcurrentHashMap 又是如何做到并发添加
的呢?
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
//对传入的参数进行合法性判断
if (key == null || value == null) throw new NullPointerException();
//计算键所对应的 hash 值
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果哈希表还未初始化,那么初始化它
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//根据键的 hash 值找到哈希数组相应的索引位置
//如果为空,那么以CAS无锁式向该位置添加一个节点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break;
}
这里需要详细说明的只有initTable
方法:初始化哈希表,它同时只允许一个线程进行初始化操作。
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 如果表为空才进行初始化操作
while ((tab = table) == null || tab.length == 0) {
// sizeCtl 小于零说明已经有线程正在进行初始化操作
// 当前线程应该放弃 CPU 的使用
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 否则说明尚未有线程对表进行初始化,那么本线程就来做这个工作
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//保险起见,再次判断下表是否为空
try {
if ((tab = table) == null || tab.length == 0) {
//至此, sc 大于零说明容量已经初始化了,否则使用默认容量,其他线程再也无法初始化!!!
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//根据容量构建数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//计算阈值,等效于 n*0.75
sc = n - (n >>> 2);
}
} finally {
//设置阈值
sizeCtl = sc;
}
break;
}
}
return tab;
}
该方法的核心思想就是,只允许一个线程对表进行初始化;
如果不巧有其他线程进来了,那么会让其他线程交出 CPU 等待下次系统调度;
这样,保证了表同时只会被一个线程初始化.
接着回到 putVal 方法,这样的话,我们第一部分的 putVal 源码就分析结束了
下面我们看后一部分的源码
//检测到桶结点是 ForwardingNode 类型,协助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
//桶结点是普通的结点,锁住该桶头结点并试图在该链表的尾部添加一个节点
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
//向普通的链表中添加元素,无需赘述
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,value, null);
break;
}
}
}
//向红黑树中添加元素,TreeBin 结点的hash值为TREEBIN(-2)
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//binCount != 0 说明向链表或者红黑树中添加或修改一个节点成功
//binCount == 0 说明 put 操作将一个新节点添加成为某个桶的首节点
if (binCount != 0) {
//链表深度超过 8 转换为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
//oldVal != null 说明此次操作是修改操作
//直接返回旧值即可,无需做下面的扩容边界检查
if (oldVal != null)
return oldVal;
break;
}
}
}
//CAS 式更新baseCount,并判断是否需要扩容
addCount(1L, binCount);
//程序走到这一步说明此次 put 操作是一个添加操作,否则早就 return 返回了
return null;
这一部分的源码大体上已如注释所描述,至此整个 putVal 方法的大体逻辑实现相信你也已经清晰了,好好回味一下.
下面我们对这部分中的某些方法的实现细节再做一些深入学习.
首先需要介绍一下,ForwardingNode
这个节点类型
这个节点内部保存了一个 nextTable
引用,它指向一张 hash 表;
在扩容操作中,我们需要对每个桶中的结点进行分离和转移;
如果某个桶结点中所有节点都已经迁移完成了(已经被转移到新表 nextTable);
那么会在原 table 表的该位置挂上一个 ForwardingNode 结点,说明此桶已经完成迁移.
ForwardingNode
继承自 Node 结点,并且它唯一的构造函数将构建一个k/v/next 都为 null 的结点,反正它就是个标识,无需那些属性;
但是 hash 值却为 MOVED.
所以,我们在 putVal 方法中遍历整个 hash 表的桶结点,如果遇到 hash 值等于 MOVED,说明已经有线程正在扩容 rehash 操作,整体上还未完成,,过我们要插入的桶的位置已经完成了所有节点的迁移
由于检测到当前哈希表正在扩容,于是让当前线程去协助扩容.
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab !=