当前位置 : 主页 > 编程语言 > java >

Java8 ConcurrentHashMap实现原理

来源:互联网 收集:自由互联 发布时间:2023-03-22
前言 Java8 ConcurrentHashMap实现原理,基本和Java8的HashMap相同,不同于他的是前者保证了线程的安全性,和Java7的ConcurrentHashMap区别在于,两者保证线程安全性的机理不同,Java7中采用了“分

前言

Java8 ConcurrentHashMap实现原理,基本和Java8的HashMap相同,不同于他的是前者保证了线程的安全性,和Java7的ConcurrentHashMap区别在于,两者保证线程安全性的机理不同,Java7中采用了“分段”锁的概念,每一个分段都有一把锁,锁内存储的着数据,锁的个数在初始化之后不能扩容,Java7 ConcurrentHashMap原理

 

但是Java8摒弃了这种设计理念,和Java8一样采用数组+链表+红黑树的概念来实现,那么要保证是线程安全(synchronized+CAS)的,所以比起Java8的HashMap,代码更加的复杂。

ConcurrentHashMap源码解析

继承结构

一、基本属性

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> implements ConcurrentMap<K,V>, Serializable { // 最大容量,默认是2的30次方 private static final int MAXIMUM_CAPACITY = 1 << 30; // 默认table初始容量大小, 默认容量是2的4次方 private static final int DEFAULT_CAPACITY = 16; // 默认支持并发更新的线程数量 private static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 负载因子 private static final float LOAD_FACTOR = 0.75f; // 链表转换为红黑树的节点数阈值,超过这个值,当确定数组容量>64进行树化, 链表转换为红黑树 static final int TREEIFY_THRESHOLD = 8; // 在扩容期间,由红黑树转换为链表的阈值,小于这个值,resize期间红黑树就会转为链表 static final int UNTREEIFY_THRESHOLD = 6; // 转为红黑树时,红黑树中节点的最小个数 static final int MIN_TREEIFY_CAPACITY = 64; // 扩容时,并发转移节点(transfer方法)时,每次转移的最小节点数 private static final int MIN_TRANSFER_STRIDE = 16; //sizectl位移位数 private static int RESIZE_STAMP_BITS = 16; //最大迁移时辅助线程数 private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; //记录ctrl位移位数 private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // 以下常量定义了特定节点类hash字段的值 static final int MOVED = -1; // ForwardingNode类对象的hash值 // 表示已经转换成树 static final int TREEBIN = -2; // TreeBin类对象的hash值 static final int RESERVED = -3; // ReservationNode类对象的hash值 static final int HASH_BITS = 0x7fffffff; // 表示2^31-1,int的最大正整数 /* ---------------- Fields -------------- */ // table数组 transient volatile Node<K,V>[] table; //执行迁移时,用于临时存储扩容之后的数据,扩容结束后,nextTable赋值给table private transient volatile Node<K,V>[] nextTable; // 记录基本计数器,用于统计节点个数,每次插入/删除数据时更新, // 当没冲突时候,则累加到该变量上,冲突剧烈则添加到counterCells数组里面。 private transient volatile long baseCount; /** * 用来控制table初始化和扩容的,默认值为0,当在初始化的时候指定了大小,这会将这个大小保存在sizeCtl中,大小为数组的0.75 * 当为负的时候,说明表正在初始化或扩张, * 0:默认状态,表示数组还没有被初始化。 * -1:初始化数组 * -(1+n):n:表示活动的扩容线程 * sizeCtl>0:记录下一次需要扩容的大小。为3/4数组最大长度 */ private transient volatile int sizeCtl; // 扩容时,下一个节点转移的bucket索引下标 private transient volatile int transferIndex; // 一种自旋锁,是专为防止多处理器并发而引入的一种锁,用于创建CounterCells时使用, // 主要用于size方法计数时,有并发线程插入而计算修改的节点数量, // 这个数量会与baseCount计数器汇总后得出size的结果。 private transient volatile int cellsBusy; // 主要用于size方法计数时,有并发线程插入而计算修改的节点数量, // 这个数量会与baseCount计数器汇总后得出size的结果。 private transient volatile CounterCell[] counterCells; }

二、数据存储结构

  • 结构1、采用是的是Node内部类,每一个类都持有next的引用,实际就是链表的结构
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; volatile V val; volatile Node<K,V> next; }
  • 结构2、采用是的TreeNode内部类,继承了Node,这就是红黑树了
static final class TreeNode<K,V> extends Node<K,V> { TreeNode<K,V> parent; // red-black tree links TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; // needed to unlink next upon deletion boolean red; TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) { super(hash, key, val, next); this.parent = parent; } }
  • 结构3、内部类TreeBin 同样继承了Node,在转换成红黑树的时候使用
static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock }

三、初始化方法

//默认无参构造 public ConcurrentHashMap() { } //指定初始化大小 public ConcurrentHashMap(int initialCapacity) { //确保容量正数 if (initialCapacity < 0) throw new IllegalArgumentException(); //当初始容量>最大容量/2,则为最大容量 int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); //控制表初始化和扩容 this.sizeCtl = cap; } //有初始元素的初始化 public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); } //指定初始化大小和负载因子 public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, 1); } //指定初始化容量、负载因子、并行级别 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) // Use at least as many bins initialCapacity = concurrencyLevel; // as estimated threads //下次扩容的阈值 long size = (long)(1.0 + (long)initialCapacity / loadFactor); //阈值不能超过最大限度 int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); this.sizeCtl = cap; }

计算阈值与HashMap中一致

//结果为2的幂次方,大于且最靠近指定的值 private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; }

3.1 putAll() 复制所有的元素

public void putAll(Map<? extends K, ? extends V> m) { //扩容 tryPresize(m.size()); //循环复制元素 for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) putVal(e.getKey(), e.getValue(), false); }

3.2 tryPresize() 扩容

  • 扩容的容量是原来容量的两倍
// 首先要说明的是,方法参数 size 传进来的时候就已经翻了倍了 private final void tryPresize(int size) { // c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; // 这个 if 分支和之前说的初始化数组的代码基本上是一样的,在这里,我们可以不用管这块代码 if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); // 0.75 * n } } finally { sizeCtl = sc; } } } else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { // 我没看懂 rs 的真正含义是什么,不过也关系不大 int rs = resizeStamp(n); if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; // 2. 用 CAS 将 sizeCtl 加 1,然后执行 transfer 方法 // 此时 nextTab 不为 null if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } // 1. 将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2) // 我是没看懂这个值真正的意义是什么?不过可以计算出来的是,结果是一个比较大的负数 // 调用 transfer 方法,此时 nextTab 参数为 null else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null);//具体的扩容后面说 } } }

四、put 方法

4.1 put方法简要流程

image.png

4.2 put方法的逻辑

  • 1、计算key的hash值。
  • 2、通过自旋,将key/value添加到map中
    • 1、倘若tab没初始化,则初始化。
    • 2、tab初始化了,当前数组下标中没元素,则新建一个链表元素,CAS添加到数组中。成功则结束循环体,失败则继续自旋。
    • 3、当前节点有元素,且节点处于迁移状态,则辅助去迁移。
    • 4、节点有元素,说明hash冲突了,用synchronized锁住头结点,添加数据到链表or红黑树中。
      • 节点个数达到8以上,则进行扩容或者树化节点。
      • 若只是修改数据,则直接返回。
    • 5、对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容
  • 3、累加元素个数

4.3 put方法源码

public V put(K key, V value) { /* * onlyIfAbsent * false:这个value一定会设置 * true:只有当这个key的value为空的时候才会设置 */ return putVal(key, value, false); } /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { //不允许key/value为null,否则及时失败 if (key == null || value == null) throw new NullPointerException(); //获取key的hashCode int hash = spread(key.hashCode()); //用来计算在这个节点总共有多少个元素,用来控制扩容或者转移为树 int binCount = 0; //通过自旋将value添加到map for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //1:初始化tab if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ( //2:当前位置没有元素的话,则通过cas的方式尝试添加,注意这个时候是没有加锁的 //(n-1)&hash:计算元素索引,这里绝对不会超过表下标。 (f = tabAt(tab, i = (n - 1) & hash)) == null) { //2.1:添加失败则继续自旋,成功则结束自旋。 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) /* * 3:如果检测到某个节点的hash值是MOVED,则表示正在进行数组扩张的数据复制阶段, * 则当前线程也会参与去复制,通过允许多线程复制的功能,一次来减少数组的复制所带来的性能损失 */ tab = helpTransfer(tab, f); else { //4:走到这里说明hash冲突了,则将value添加到链表或者红黑树结构下 V oldVal = null; synchronized (f) { //4.1:再次取出要存储的位置的元素,跟前面取出来的比较,相同则处理,否则继续自旋添加。 if (tabAt(tab, i) == f) { //4.1.1:取出来的元素的hash值大于0,说明是链表。当转换为树之后,hash值为-2 if (fh >= 0) { //统计遍历个链表节点个数 binCount = 1; //4.1.2:遍历每一个节点 for (Node<K,V> e = f;; ++binCount) { K ek; //4.1.3:节点hash值一样且key一样 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; //4.1.4:判定是否存在,存在则进行修改。 if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; //4.1.4:下一个节点为null,说明遍历结束,则新建对象. if ((e = e.next) == null) { //添加链表尾端 pred.next = new Node<K,V>(hash, key, value, null); break; } } } //对红黑树的处理 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } //5:链表节点其中元素个数达到8同时数组长度>64,则转成树。否则仅扩容。 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); //说明仅是修改,则直接返回值。 if (oldVal != null) return oldVal; break; } } } //6:累加元素个数,传入添加的数量1,以及链表节点个数,用于控制是否执行扩容操作。 addCount(1L, binCount); return null; }

4.3.1 initTable(初始化)

  • 1、自旋创建数组
    • 1、sizeCtl <0 则说明其他线程在扩容或者初始化,则礼让。
    • 2、CAS修改sizeCtl=1,表示初始化中。
      • 1、执行初始化数组。没指定大小,则默认16
      • 2、设置sizeCtl为数组3/4
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //1:sizeCtl初始值为0,当小于0的时候表示在别的线程在初始化表或扩展表 //则暂停当前正在执行的线程对象,并执行其他线程。 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if ( //2.1:如果当前内存偏移量SIZECTL的值为sc,则将sizeCtl原子修改为-1,表示处于初始化状态 //修改成功则初始化数组,否则继续自旋。搭配第一步则能确保同一时间只有一个创建数组。 U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //双重检查加锁,避免其他方法新建tab时覆盖,增加容错 if ((tab = table) == null || tab.length == 0) { //2.2:指定了大小的时候就创建指定大小的Node数组,否则创建指定大小(16)的Node数组 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; //2.3:初始化table完成,并赋值给tab用于返回 table = tab = nt; //2.4:sc=n*3/4 sc = n - (n >>> 2); } } finally { //2.5:初始化后,sizeCtl长度为数组长度的3/4 sizeCtl = sc; } break; } } return tab; }

4.3.2 helpTransfer(辅助扩容)

  • 1、tab不为空,且节点为Move类型,且节点下一个元素不为空。(为空则说明是空节点,直接返回)
  • 2、当处于迁移状态时(即sizeCtl<0则说明处于迁移态),自旋。
    • 1、扩容容量改变,或扩容结束,或辅助扩容线程达到最大限制,或扩容任务已经分配完,则退出。
    • 2、修改sizeCtl线程数+1,并辅助迁移数据。辅助完则退出。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { //辅助去扩容 Node<K,V>[] nextTab; int sc; //1:tab不为空,同时f为头节点(当节点处于move状态时,头节点为ForwardingNode) //同时头节点的下一个节点不为空则进入循环体。为空说明仅仅只是标记当前节点已经迁移,但节点内部没数据。所以无需辅助迁移。 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); //2:判断待迁移数据,和临时存储迁移数据数组是否有改变,没改变则添加到迁移任务中。 //sizeCtl<0则表示处于迁移状态中。 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { //2.1:如果 sc 的高 16 位也就是当前n扩容标识,不等于标识符,这说明扩容的容量变化了,不是当前原容量扩容,则退出。 //走到这里,sc的第32位为1,高16~31位表示数组的长度转成2进制前面有多少个0。低位第二位为1,这是第一个线程扩容时位移后执行了+2操作。 //当sc位移16位会覆盖低位的数,一般情况下值会和resizeStamp(tab.length)一样。这里可以理解为解码比较是否相等。 if ((sc >>> RESIZE_STAMP_SHIFT) != rs //2.2:如果 sc == 标识符 + 1 ,说明扩容结束了,不再有线程进行扩容。则退出 //(默认第一个线程设置 sc ==rs 左移 16 位 + 2, //当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1) || sc == rs + 1 || //2.3:如果 sc == 标识符 + 65535(帮助线程数已经达到最大),则退出。 sc == rs + MAX_RESIZERS || //2.4:说明已经扩容完成又或者有足够的线程扩容,则退出。 transferIndex <= 0) break; //3:线程数+1,帮助一起转换 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; }

4.3.3 transfer(转移节点)

  • 1、设置步长,即每个线程每次任务迁移多少个步长节点数据。
  • 2、新建扩容之后的数组,容量扩容一倍。设置transferIndex表示迁移到哪儿了。
  • 3、循环迁移节点
    • 1、新建循环体用于控制从(transferIndex-1)到(transferIndex-stride),执行数据的迁移且重新分配transferIndex的值,用于不停向前推进更新迁移数据。
    • 2、对迁移完数据的后置处理。包括重新检查一遍迁移数据,以及归还线程。
    • 3、把数组中null的元素设置为ForwardingNode节点(hash值为MOVED),让循环体处理下一个节点。后续辅助线程发现节点为Move则会直接跳过。
    • 4、锁住节点,进行迁移。

ConcurrentHashMap 是多线程协助进行扩容的,如下所示 image.png

图中数字表示table的下标,把每个下标处的位置比作一个桶,那么每个线程负责对一定数量的桶进行扩容操作,每个线程最少负责16个桶,每个线程负责的桶数目是通过计算得出的,如下代码所示

 

当cpu核数为1时,线程负责处理所有的桶,当大于1时,计算 (n >>> 3) / NCPU 的值, 当这个值小于16时,把每个线程负责的桶设置成16,否则使用算出的数值

int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range

重温一下每个桶扩容后是如何分布的,在执行put方法时,是通过(n - 1) & hash定位table下标的,hash是通过key的hashCode计算出来的hash,n 是 2 的倍数,假设n = 2的k次方,那么数组下标只与二进制的低k位有关

(n - 1) & hash 假设 n = 32 n 对应的二进制为 00000000000000000000000000100000 n-1对应的二进制位为 00000000000000000000000000011111 (n - 1) & hash 后二进制只有低 5 位有值 扩容后n成倍增长,那么新的n = 64, n-1 对应二进制为 00000000000000000000000000111111 这里为了方便浏览,把扩容后的n用 nd 表示, nd = 2*n 假设 hash1 为 00000000010000000000000100010001 那么扩容前 (n - 1) & hash 为 00000000000000000000000000010001 扩容前后(nd - 1) & hash 为 00000000000000000000000000010001 假设hash2 为 00000000010000000000000100110001 那么扩容前 (n - 1) & hash 为 00000000000000000000000000010001 扩容后(nd - 1) & hash 为 00000000000000000000000000110001 留意到,如果扩容前,n&hash == 0 , 那么扩容前后 (n - 1) & hash 都相等 如果扩容前, n&hash > 0 , 那么扩容后 (nd - 1) & hash = 扩容前 (n - 1) & hash + n

如下图例子所示,红黑树也类似 image.png

简单理解完扩容的逻辑后,我们进入扩容的代码实现中

//转移节点 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; //1:数组长度为n,表示n个任务,每个线程处理多少个任务。 //如果为单核,则处理所有的任务。多核则处理n>>>3/核心数个任务,最少处理16个任务。用stride记录每个处理多少任务。 //当分配第一个线程处理任务时,transferIndex累加stride。 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //2:当第一个线程进此方法的时候,nextTab才会为null,辅助线程直接跳过。初始化扩容之后的数组,容量扩大一倍。 //并初始化转移的索引,为待迁移数组的最大长度。 if (nextTab == null) { // initiating try { //扩容一倍数组容量 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; /* * 3:创建一个fwd节点,这个是用来控制并发的,当一个节点为空或已经被转移之后,就设置为fwd节,表示处于move状态。 */ ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); //是否继续向前查找的标志位 boolean advance = true; //在完成之前重新在扫描一遍数组,看看有没完成的没 boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; //4:该循环体用于控制从(transferIndex-1)到(transferIndex-stride),执行数据的迁移 //且重新分配transferIndex的值,用于不停向前推进更新迁移数据。 while (advance) { int nextIndex, nextBound; //4.1:i每次自减,小于范围(表示从(transferIndex-1)到(transferIndex-stride),执行完数据的迁移) //或者 当前线程执行完成 则标记不需要再向前查找 if (--i >= bound || finishing) advance = false; //4.2:transferIndex<=0,则说明目前每个桶位置都有线程在进行处理,跳出循环 //每次执行一次任务,transferIndex减少一次步长stride //transferIndex值第一次为待迁移表长度 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //4.3:更新transferIndex的值,减少一个步长的值。 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { //这里分别设置i和bound,分别表示从(transferIndex-1)开始递减遍历到(transferIndex-stride),执行数据的迁移。 //假设当前数组长度为32,stride=16 //则nextIndex=32 //transferIndex=nextBound= nextIndex - stride=16 //bound=16 bound = nextBound; //i=31 i = nextIndex - 1; //设置不用向前推进 advance = false; } } //5.1:对迁移完数据的后置处理。包括重新检查一遍迁移数据,以及归还线程。 //i<0:说明从(transferIndex-1)到(transferIndex-stride)节点数据迁移结束。 //i>=n和i + n >= nextn:说明出现了扩容并发修改了transferIndex,造成修改,则结束修改 if (i < 0 || i >= n || i + n >= nextn) { int sc; //5.1.1:已经完成转移,更新数组的值 if (finishing) { nextTable = null; //这里完成nextTab=>table转换 table = nextTab; //为扩容后的0.75 sizeCtl = (n << 1) - (n >>> 1); return; } //5.1.2:正在工作的线程数-1,并返回 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //第一个线程转移节点的时候,会sc-2,后续的线程参与转移时会,sc+1。 //当sc-2时值不一样,说明不是最后一个线程,是辅助线程在执行,则直接返回。 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; //说明是最后一个线程,则重新check一遍,在返回。 //执行到这里,如果是单线程进行迁移,则从0~数组最大长度,重新检查一遍。 //如果是多线程进行迁移,也极大最后 finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) //5.2:把数组中null的元素设置为ForwardingNode节点(hash值为MOVED),让循环体处理下一个节点。后续辅助线程发现节点为Move则会直接跳过。 advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) //5.3:表示已有线程正在处理,让循环体处理下一个节点。 advance = true; // already processed else { //5.4:锁住节点,进行迁移. synchronized (f) { //双重检查加锁 if (tabAt(tab, i) == f) { Node<K,V> ln, hn; //5.4.1:>=0说明是node节点 if (fh >= 0) { //为0则表示放在扩容后数组当前索引下,否则放在n+之前位置索引下 int runBit = fh & n; Node<K,V> lastRun = f; /* 循环结束之后,runBit就是最后不变的hash&n的值 也就是说由lastRun节点后的hash&n的值一样,这样就可以直接保存,而不需要处理后面的节点 */ for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } //说明之后的节点都是低位 if (runBit == 0) { ln = lastRun; hn = null; } else { //说明之后的节点都是高位 hn = lastRun; ln = null; } //前面的节点不确定高低位,所以遍历f~lastRun范围的所有节点 //分别逆序存入ln或hn链表中 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } //存入之前的位置 setTabAt(nextTab, i, ln); //存入改变后的位置 setTabAt(nextTab, i + n, hn); //设置fwd,这样其他线程执行的时候,会跳过去. setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { //5.4.2:红黑树处理 TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } /* * 在复制完树节点之后,判断该节点处构成的树还有几个节点, * 如果≤6个的话,就转为一个链表 */ ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; //低位链表存储i处 setTabAt(nextTab, i, ln); //高位存储i+n处 setTabAt(nextTab, i + n, hn); //原来tab中存储fwd,标识该桶扩容完成 setTabAt(tab, i, fwd); advance = true; } } } } } }

nextTable

  • 扩容期间,将table数组中的元素 迁移到 nextTable。

sizeCtl属性

private transient volatile int sizeCtl;
  • 多线程之间,以volatile的方式读取sizeCtl属性,来判断ConcurrentHashMap当前所处的状态。通过cas设置sizeCtl属性,告知其他线程ConcurrentHashMap的状态变更。

不同状态,sizeCtl所代表的含义也有所不同

  • 未初始化:sizeCtl=0:表示没有指定初始容量。
  • sizeCtl>0:表示初始容量。
  • 初始化中:sizeCtl=-1,标记作用,告知其他线程,正在初始化。
  • 正常状态:sizeCtl=0.75n,扩容阈值。
  • 扩容中:sizeCtl < 0 : 表示有其他线程正在执行扩容。
  • sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT)+2 表示此时只有一个线程在执行扩容。

transferIndex 扩容索引

  • 扩容索引,表示已经分配给扩容线程的table数组索引位置。主要用来协调多个线程,并发安全地获取迁移任务(hash桶)。
private transient volatile int transferIndex; private static final int MIN_TRANSFER_STRIDE = 16; //扩容线程每次最少要迁移16个hash桶
  • 1、在扩容之前,transferIndex 在数组的最右边 。此时有一个线程发现已经到达扩容阈值,准备开始扩容。
  • 2、扩容线程,在迁移数据之前,首先要将transferIndex右移(以CAS的方式修改 transferIndex=transferIndex-stride(要迁移hash桶的个数)),获取迁移任务。每个扩容线程都会通过for循环+CAS的方式设置transferIndex,因此可以确保多线程扩容的并发安全。

换个角度,我们可以将待迁移的table数组,看成一个任务队列,transferIndex看成任务队列的头指针。而扩容线程,就是这个队列的消费者。扩容线程通过CAS设置transferIndex索引的过程,就是消费者从任务队列中获取任务的过程。为了性能考虑,我们当然不会每次只获取一个任务(hash桶),因此ConcurrentHashMap规定,每次至少要获取16个迁移任务(迁移16个hash桶,MIN_TRANSFER_STRIDE = 16)

也就是说transferIndex其实代表当前剩下需要进行扩容的桶的个数。如果transferIndex == 0 那就代表当前没有桶需要进行扩容了。每次一个线程参加扩容transferIndex就需要减MIN_TRANSFER_STRIDE。

ForwardingNode节点

  • 1、标记作用,表示其他线程正在扩容,并且此节点已经扩容完毕
  • 2、关联了nextTable,扩容期间可以通过find方法,访问已经迁移到了nextTable中的数据

扩容过程分析

  • 1、线程执行put操作,发现容量已经达到扩容阈值,需要进行扩容操作,例如此时transferindex=tab.length=32
  • 2、扩容线程A 以CAS的方式修改transferindex=31-16=15,然后按照降序迁移table[31]至table[16]这个区间的hash桶(逆序遍历扩容)
  • 3、迁移hash桶时,会将桶内的链表或者红黑树,按照一定算法,拆分成2份,将其插入nextTable[i]和nextTable[i+n](n是table数组的长度)。 迁移完毕的hash桶,会被设置成ForwardingNode节点,以此告知访问此桶的其他线程,此节点已经迁移完毕。
  • 4、此时线程2访问到了ForwardingNode节点,如果线程2执行的put或remove等写操作,那么就会先帮其扩容。如果线程2执行的是get等读方法,则会调用ForwardingNode的find方法,去nextTable里面查找相关元素。

注意

  • 多线程无锁扩容的关键就是通过CAS设置sizeCtl与transferIndex变量,协调多个线程对table数组中的node进行迁移。

4.3.4 treeifyBin扩容以及红黑树化节点数据

/** * Replaces all linked nodes in bin at given index unless table is * too small, in which case resizes instead. * 数组长度<64,则扩容一倍 * 否则转成树 */ private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { //1:数组长度<64则扩容一倍 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1); else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { //2:转成红黑树 synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); //把Node组成的链表,转化为TreeNode的链表,头结点依然放在相同的位置 if ((p.prev = tl) == null) hd = p; else tl.next = p; tl = p; } //把TreeNode的链表放入容器TreeBin中,内部将单节点树转换成红黑树 setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }

4.3.5 tryPresize(尝试扩容)

  • 1、寻找贴近扩容的大小容量
  • 2、自旋扩容,<0时,说明在扩容,则不扩容。
    • 1、如果数组还没有初始化,则执行初始化
    • 2、扩容后的大小<=sizeCtl,说明当前数组已经满足需要扩容的容量。或者当前数组长度>容量上限,没法分配容量了,则退出
    • 3、执行扩容,如果处于正在扩容则添加当前线程一起扩容。
//尝试扩容 private final void tryPresize(int size) { //1:寻找贴近扩容的大小容量 // 扩容大小>=最大的一半,直接设置成最大容量 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : //当size < 0.5*size时,size最高位<<1位,其余补0 //size >= 0.5*size时,size最高位<<2位,其余补0 tableSizeFor(size + (size >>> 1) + 1); int sc; //2:自旋扩容,<0时,说明在扩容,则不扩容。 while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; //2.1:如果数组还没有初始化,putAll的时候,会执行这儿 if (tab == null || (n = tab.length) == 0) { //2.2:sizeCtl可能会发生修改,所以这里在判断一次。 n = (sc > c) ? sc : c; //2.3:扩容,并更新sc //SIZECTL设置-1,表示正在初始化 if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //双重检查 if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; //sc=3/4*n sc = n - (n >>> 2); } } finally { //2.3:更新扩容后的大小 sizeCtl = sc; } } } //3:扩容后的大小<=sizeCtl,说明当前数组已经满足需要扩容的容量。或者当前数组长度>容量上限,没法分配容量了,则退出 else if (c <= sc || n >= MAXIMUM_CAPACITY) break; else if (tab == table) { //4: rs高16为都是0,第16位为1,低15位记录n为二进制时前面有多少0 int rs = resizeStamp(n); //4.1:表示正在扩容,这里方法好像永远不会进来。 if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //transfer线程数+1,当前线程将加入对transfer的处理 //transfer的时候,sc表示在transfer工作的线程数 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //4.2:没有在初始化或扩容,则开始扩容,sizectl目前第32位为1表示负数,高16~31位记录n为二进制时前面有多少0。 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) //4.2.1:移动节点 transfer(tab, null); } } }

4.3.6 addCount(累加元素个数)

  • 1、累加节点个数
    • 1、counterCells数组已创建了,或者CAS累加基本计数器失败,则更新元素个数。
      • 1、没开启容量检查则直接返回
      • 2、重新计算CounterCell总个数
  • 2、检查容量,执行扩容
    • 1、自旋扩容,需要满足数组节点元素个数达到扩容的阀值,且table不为空,且数组长度小于最大容量条件
      • 1、sizeCtl<0,则辅助扩容
      • 2、初始化扩容
      • 3、更新数组节点元素总个数
private final void addCount(long x, int check) { CounterCell[] as; long b, s; /* 1:累加节点个数 as不为空,说明counterCells数组已创建了,进入条件体继续执行 若为空,则说明数组还没创建,预测竞争线程少,直接cas操作baseCount,更新元素个数 如果成功,则执行下一步,若失败,则进入条件体继续执行 */ if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; //标记未发生竞争 boolean uncontended = true; //1.1:数组为空 if (as == null || (m = as.length - 1) < 0 || //1.2:给当前线程随机生成一个数,获取counterCells对应值,若为null,则进入条件体 (a = as[ThreadLocalRandom.getProbe() & m]) == null || //1.3:条件语句执行到这里,说明counterCells不为空,且有值.则尝试修改个数 //修改失败,则标记有很多线程竞争 !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //1.4:继续累加次数 fullAddCount(x, uncontended); return; } //1.5没开启容量检查则直接返回 if (check <= 1) return; //1.6:计算CounterCell总个数 s = sumCount(); } //走到这里,s赋值为表中所有节点个数总和。 //2:检查容量,执行扩容 if (check >= 0) { Node<K,V>[] tab, nt; int n, sc; //2.1:数组元素总个数达到扩容的阀值,且table不为空,且数组长度小于最大容量 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { //根据length获取一个标识符,高16位置0,第16位为1,低15位存当前n扩容标识 int rs = resizeStamp(n); //2.2:sc<0说明正在扩容,则去辅助扩容 if (sc < 0) { // 如果 sc 的高 16 位也就是当前n扩容标识,不等于标识符,这说明扩容的容量变化了,不是当前原容量扩容 // 如果 sc == 标识符 + 1 // (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2, // 当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1) // 如果 sc == 标识符 + 65535(帮助线程数已经达到最大) // 如果 nextTable == null(结束扩容了) // 如果 transferIndex <= 0 (已经有足够线程分配迁移数据啦,不需要参与迁移啦。) // 结束循环 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //sc+1,帮助扩容 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //2.3:初始化扩容 else if (U.compareAndSwapInt(this, SIZECTL, sc, //高第16位为1,显示负数 //高15位容量n扩容标志 //低16位,并行扩容线程数+1 (rs << RESIZE_STAMP_SHIFT) + 2)) //扩容,第二个参数代表新表,传入null,表示第一次初始化新表 transfer(tab, null); //2.4:再次更新节点次数,用于下一次遍历扩容。 s = sumCount(); } } }

4.3.7 fullAddCount

  • 1、如果当前线程随机数为0,则强制初始一个值,用于访问CounterCell数组中固定的元素。
  • 2、自旋累加节点次数
    • 1、counterCells不为null
      • 1、当前线程所在格子为空,抢占锁并添加元素值,成功则退出。
      • 2、当前线程格子直接CAS累加次数,成功则退出。
      • 3、counterCells数组没变化,且有碰撞,则需要扩容
        • CAS加锁成功后,扩容。扩容成功,则跳过当前循环,继续执行下一次循环。
      • 4、线程出现争抢,则更改当前线程的探针哈希值,进行下次循环
    • 2、counterCells为空,则尝试加锁
      • 加锁成功后,再次判断counterCells是否有变化,没变化则初始化counterCells,并添加节点次数,跳出循环。
    • 3、数组为空,抢锁失败,则尝试直接累加baseCount值
private final void fullAddCount(long x, boolean wasUncontended) { //h作用:将线程和数组中的不用元素对应起来,尽量避免线程争用同一数组元素。 //可理解为创建一个hash值与当前线程绑定,这样当前线程每次访问的只会是CounterCell数组中固定的一个位置的数。 //当出现争抢时,ThreadLocalRandom.advanceProbe(h);重新更新线程的探针哈希值,让线程去使用另一个数组元素 int h; //1:如果当前线程随机数为0,则强制初始一个值 if ((h = ThreadLocalRandom.getProbe()) == 0) { //初始化当前线程的探针哈希值 ThreadLocalRandom.localInit(); // force initialization h = ThreadLocalRandom.getProbe(); //设置无竞争 wasUncontended = true; } //标记是否发生碰撞 boolean collide = false; // True if last slot nonempty //2:自旋累加节点次数。 for (;;) { CounterCell[] as; CounterCell a; int n; long v; //2.1:counterCells不为null if ((as = counterCells) != null && (n = as.length) > 0) { //2.1.1:当前线程所在格子为空 if ((a = as[(n - 1) & h]) == null) { //锁未占用 if (cellsBusy == 0) { // Try to attach new Cell //新建CounterCell CounterCell r = new CounterCell(x); // Optimistic create //再次检查锁未占用,尝试加锁 if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { //表示创建CounterCell是否成功状态 boolean created = false; try { // Recheck under lock CounterCell[] rs; int m, j; //再次检查counterCells不为空,且格子为占用 if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { //设置格子 rs[j] = r; //设置创建状态 created = true; } } finally { //释放锁 cellsBusy = 0; } //正确创建则退出循环 if (created) break; continue; // Slot is now non-empty } } //表明cellsBusy=1锁上发生竞争,则重新生成随机数,进行下次循环 collide = false; } //2.1.2:CAS 失败,重新继续 else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //执行到这里说明wasUncontended=true,认为无竞争 //且所在槽有值 //2.1.3:尝试直接累加 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break; //2.1.4:数组发生变化,说明发生了扩容或者数组长度>=cpu核心数, //则认为无碰撞 //当扩容超过限制后,则会不停的执行3和4,直到2成功 else if (counterCells != as || n >= NCPU) collide = false; // At max size or stale //2.1.5:数组没变化且数组长度<CPU核心数,且collide认为无碰撞,则设置有碰撞 else if (!collide) collide = true; //2.1.6:执行到这里,说明数组没变化,且有碰撞,则需要扩容 else if ( //无锁 cellsBusy == 0 && //尝试加锁 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { //2.1.6.1:counterCells没发生变化,扩容counterCells一倍容量 if (counterCells == as) {// Expand table unless stale CounterCell[] rs = new CounterCell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { //释放锁 cellsBusy = 0; } collide = false; continue; // Retry with expanded table } //2.1.6.2:说明线程出现争抢,则更改当前线程的探针哈希值,进行下次循环 h = ThreadLocalRandom.advanceProbe(h); } //2.2:counterCells为空,则尝试加锁 else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table //再次判断counterCells是否有变化 if (counterCells == as) { //新建长度2的数组 CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; //标记初始化完成 init = true; } } finally { //释放锁 cellsBusy = 0; } //跳出循环 if (init) break; } //2.3:数组为空,抢锁失败,则尝试直接累加baseCount值 else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }

4.3.8 sumCount(计算CounterCell总个数)

final long sumCount() { //遍历累加CounterCell数组值到baseCount CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }

五、get()方法

  • 1、计算 hash 值
  • 2、根据 hash 值找到数组对应位置: (n - 1) & h
  • 3、根据该位置处结点性质进行相应查找
    • 1、如果该位置为 null,那么直接返回 null 就可以了
    • 2、如果该位置处的节点key就是要获取的key,返回该节点的值即可
    • 3、如果该位置节点的 hash 值小于 0,说明正在扩容,或者是红黑树,调用find 方法
      • 1、如果该节点是TreeNode或TreeBin说明是红黑树,则调用find方法
      • 2、如果该节点是ForwardingNode说明正在扩容并且该节点已经扩容完成,并且ForwardingNode关联了nextTable,扩容期间可以通过find方法,访问已经迁移到了nextTable中的数据。
    • 4、如果以上 3 条都不满足,那就是链表,进行遍历比对即可。

image.png

public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; //获取key的hash值,方便计算在数组中索引 int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //1:如果节点第一个元素匹配key-value,则直接返回 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) //2:<0时,情况1:e目前处于迁移。情况2:e目前已经转成红黑树了。则直接调用节点方法查询。 return (p = e.find(h, key)) != null ? p.val : null; //3:说明是链表,则遍历查找 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }

5.1 find(查询节点)

static final class ForwardingNode<K,V> extends Node<K,V> { //持有nextTable引用 final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } Node<K,V> find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node<K,V>[] tab = nextTable;;) { Node<K,V> e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { //对ForwardingNode类型的node查询,这里没啥用。 if (e instanceof ForwardingNode) { tab = ((ForwardingNode<K,V>)e).nextTable; continue outer; } else //对红黑树的查询,调用红黑树的find方法查询 return e.find(h, k); } if ((e = e.next) == null) return null; } } } }

六、其他方法

6.1 replace

public V remove(Object key) { //参数2为空,表示删除 return replaceNode(key, null, null); } public V replace(K key, V value) { if (key == null || value == null) throw new NullPointerException(); return replaceNode(key, value, null); } /** * {@inheritDoc} * * @throws NullPointerException if any of the arguments are null * key映射有值,值为oldValue,则更新 */ public boolean replace(K key, V oldValue, V newValue) { if (key == null || oldValue == null || newValue == null) throw new NullPointerException(); return replaceNode(key, newValue, oldValue) != null; }

6.2 replaceNode

final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //空判断 if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) break; //正在扩容,则加入扩容队伍 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; boolean validated = false; synchronized (f) { //双重检查加锁,DCL if (tabAt(tab, i) == f) { //对链表的处理 if (fh >= 0) { validated = true; for (Node<K,V> e = f, pred = null;;) { K ek; //匹配key if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; //匹配value if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; //修改 if (value != null) e.val = value; //value=null,说明执行删除操作 else if (pred != null) pred.next = e.next; //当匹配的值是first节点的处理 else setTabAt(tab, i, e.next); } break; } //临时保存上一个节点,便于执行删除节点操作 pred = e; //执行最后一个节点,则退出 if ((e = e.next) == null) break; } } //红黑树的处理 else if (f instanceof TreeBin) { validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } if (validated) { if (oldVal != null) { if (value == null) //个数-1,参数二:标记不需要检测是否需要扩容 addCount(-1L, -1); return oldVal; } break; } } } return null; }

6.3 size(获取大小)

public int size() { //size和isEmpty一样,返回一个近似值, long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); }

6.4 mappingCount(获取大小,官方推荐)

public long mappingCount() { long n = sumCount(); //忽略瞬间的负值 return (n < 0L) ? 0L : n; // ignore transient negative values }

6.5 clear(清除节点)

public void clear() { //记录删除个数,负数表示。 long delta = 0L; // negative number of deletions int i = 0; Node<K,V>[] tab = table; //遍历节点删除 while (tab != null && i < tab.length) { int fh; Node<K,V> f = tabAt(tab, i); //1:为null直接跳过 if (f == null) ++i; else if ((fh = f.hash) == MOVED) { //2:若处于迁移态,则帮助去迁移。 tab = helpTransfer(tab, f); //3:迁移完成后,重置数组索引,重头开始遍历。 i = 0; // restart } else { synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> p = (fh >= 0 ? f : (f instanceof TreeBin) ? ((TreeBin<K,V>)f).first : null); //逐个删除元素 while (p != null) { --delta; p = p.next; } //最后一个元素置为null,自此该节点删除。 setTabAt(tab, i++, null); } } } } //倘若删除了节点,则修改元素个数,并不扩容。 if (delta != 0L) addCount(delta, -1); }

七、相关问题

7.1 扩容时机

  • 单节点容量>=8且整体容量<64,则扩容一倍。
  • 当数组中元素达到了sizeCtl的数量的时候,则会调用transfer方法来进行扩容。

7.2 节点树化和退化时机

  • 单个节点元素>=8个,且数组元素>=64,树化。
  • 元素节点<=6,则退化成链表。

7.3 jdk1.8的map实现

  • 和hashmap一样,jdk 1.8中ConcurrentHashmap采用的底层数据结构为数组+链表+红黑树的形式。数组可以扩容,链表可以转化为红黑树。

7.4 JDK1.8为什么放弃分段锁?

  • 段Segment继承了重入锁ReentrantLock,有了锁的功能,每个锁控制的是一段,当每个Segment越来越大时,锁的粒度就变得有些大了。
  • 分段锁的优势在于保证在操作不同段 map 的时候可以并发执行,操作同段 map 的时候,进行锁的竞争和等待。这相对于直接对整个map同步synchronized是有优势的。
  • 缺点在于分成很多段时会比较浪费内存空间(不连续,碎片化);操作map时竞争同一个分段锁的概率非常小时,分段锁反而会造成更新等操作的长时间等待;当某个段很大时,分段锁的性能会下降。

7.5 为什么不用ReentrantLock而用synchronized ?

  • 减少内存开销:如果使用ReentrantLock则需要节点继承AQS来获得同步支持,增加内存开销,而1.8中只有头节点需要进行同步。
  • 内部优化:synchronized则是JVM直接支持的,JVM能够在运行时作出相应的优化措施:锁粗化、锁消除、锁自旋等等。

7.6 ConcurrentHashMap多个线程又是如何同步处理的呢?

  • 1.同步处理主要是通过Synchronized和unsafe两种方式来完成的。
  • 2.在取得sizeCtl、某个位置的Node的时候,使用的都是unsafe的方法,来达到并发安全的目的,当需要在某个位置设置节点的时候,则会通过Synchronized的同步机制来锁定该位置的节点。
  • 3.在数组扩容的时候,则通过处理的步长和fwd节点来达到并发安全的目的,通过设置hash值为MOVED。
  • 4.当把某个位置的节点复制到扩张后的table的时候,也通过Synchronized的同步机制来保证现程安全。

7.7 为什么 key 和 value 不允许为 null?

在 HashMap 中,key 和 value 都是可以为 null 的,但是在 ConcurrentHashMap 中却不允许,这是为什么呢?

 

作者 Doug Lea 本身对这个问题有过回答,在并发编程中,null 值容易引来歧义, 假如先调用 get(key) 返回的结果是 null,那么我们无法确认是因为当时这个 key 对应的 value 本身放的就是 null,还是说这个 key 值根本不存在,这会引起歧义,如果在非并发编程中,可以进一步通过调用 containsKey 方法来进行判断,但是并发编程中无法保证两个方法之间没有其他线程来修改 key 值,所以就直接禁止了 null 值的存在。

 

而且作者 Doug Lea 本身也认为,假如允许在集合,如 map 和 set 等存在 null 值的话,即使在非并发集合中也有一种公开允许程序中存在错误的意思,这也是 Doug Lea 和 Josh Bloch(HashMap作者之一) 在设计问题上少数不同意见之一,而 ConcurrentHashMap 是 Doug Lea 一个人开发的,所以就直接禁止了 null 值的存在。

 

参考: https://blog.csdn.net/hu10131013/article/details/106841142/

https://blog.csdn.net/cx897459376/article/details/106427587/

https://blog.csdn.net/weixin_51676493/article/details/126621553

https://blog.csdn.net/wuyangyang555/article/details/82782859

https://blog.csdn.net/weixin_44844089/article/details/116028213

https://blog.csdn.net/qq_19636353/article/details/103137354

https://blog.csdn.net/nuan_feng/article/details/128986512

https://zhuanlan.zhihu.com/p/139746843

https://www.cnblogs.com/zaizhoumo/p/7709755.html

https://blog.csdn.net/varyall/article/details/81283231

上一篇:BOS中常用方法和类
下一篇:没有了
网友评论