参考文献

ConcurrentHashMap

JDK1.7

  • 在 JDK1.7 中,ConcurrentHashMap 使用了分段锁的机制,将哈希表分为多个 Segment,每个 Segment 上都有一把锁,不同的 Segment 可以被不同的线程同时访问,这样可以提高并发度,减少锁的争用。
  • 每个 Segment 内部使用一个 HashEntry 数组来存储元素,每个 HashEntry 都是一个链表的头节点,通过链表的方式解决哈希冲突。这种数据结构的设计使得并发情况下,只有访问同一个 Segment 上的元素才需要加锁,其他 Segment 上的元素可以并发访问,提高了并发度。

JDK1.8之后

  • 在 JDK1.8 中,ConcurrentHashMap 放弃了 Segment 分段锁的机制,采用了与HashMap相同的Node 数组+链表+红黑树的结构。这样可以减少了锁的竞争,提高了并发度,同时也减少了内存占用。

  • 在 JDK1.8 中,ConcurrentHashMap 使用了CAS +synchronized 实现更加细粒度的锁。在插入元素时,如果哈希桶上的元素只有一个,那么直接用 CAS 尝试修改节点。如果哈希桶上的元素有多个,那么先用 synchronized 加锁,然后再尝试修改节点。这样可以减少锁的竞争,提高了并发度。

    同时,在 JDK1.8 中,ConcurrentHashMap 也对哈希表扩容进行了改进。在哈希表扩容时,ConcurrentHashMap 采用了一种全新的算法,称为“流式扩容(Tree bins splitting)”算法,它可以更好地解决并发问题,同时也可以提高吞吐量和性能。具体来说,它会将链表转化为红黑树,然后再将红黑树拆分成两个链表,这样可以减少数据迁移的时间,缩短扩容的时间。

常量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
 /* ---------------- Constants -------------- */

/**
* The largest possible table capacity. This value must be
* exactly 1<<30 to stay within Java array allocation and indexing
* bounds for power of two table sizes, and is further required
* because the top two bits of 32bit hash fields are used for
* control purposes.
*/
// 表示ConcurrentHashMap允许的最大容量,如果指定的容量超过了该值,就会抛出异常。
private static final int MAXIMUM_CAPACITY = 1 << 30;

/**
* The default initial table capacity. Must be a power of 2
* (i.e., at least 1) and at most MAXIMUM_CAPACITY.
*/
// 表示ConcurrentHashMap默认的初始容量,如果在创建ConcurrentHashMap时没有指定初始容量,就会使用这个默认值。
private static final int DEFAULT_CAPACITY = 16;

/**
* The largest possible (non-power of two) array size.
* Needed by toArray and related methods.
*/
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
* The default concurrency level for this table. Unused but
* defined for compatibility with previous versions of this class.
*/
// 默认情况下,ConcurrentHashMap 的并发级别是 16,也就是说,它可以同时支持 16 个线程的并发更新操作。
// 如果需要更高的并发度,可以在创建 ConcurrentHashMap 对象时指定并发级别。
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
* The load factor for this table. Overrides of this value in
* constructors affect only the initial table capacity. The
* actual floating point value isn't normally used -- it is
* simpler to use expressions such as {@code n - (n >>> 2)} for
* the associated resizing threshold.
*/
// 表示ConcurrentHashMap默认的负载因子,如果在创建ConcurrentHashMap时没有指定负载因子,就会使用这个默认值。
private static final float LOAD_FACTOR = 0.75f;

/**
* The bin count threshold for using a tree rather than list for a
* bin. Bins are converted to trees when adding an element to a
* bin with at least this many nodes. The value must be greater
* than 2, and should be at least 8 to mesh with assumptions in
* tree removal about conversion back to plain bins upon
* shrinkage.
*/
static final int TREEIFY_THRESHOLD = 8;

/**
* The bin count threshold for untreeifying a (split) bin during a
* resize operation. Should be less than TREEIFY_THRESHOLD, and at
* most 6 to mesh with shrinkage detection under removal.
*/
static final int UNTREEIFY_THRESHOLD = 6;

/**
* The smallest table capacity for which bins may be treeified.
* (Otherwise the table is resized if too many nodes in a bin.)
* The value should be at least 4 * TREEIFY_THRESHOLD to avoid
* conflicts between resizing and treeification thresholds.
*/
static final int MIN_TREEIFY_CAPACITY = 64;

/**
* Minimum number of rebinnings per transfer step. Ranges are
* subdivided to allow multiple resizer threads. This value
* serves as a lower bound to avoid resizers encountering
* excessive memory contention. The value should be at least
* DEFAULT_CAPACITY.
*/
private static final int MIN_TRANSFER_STRIDE = 16;

/**
* The number of bits used for generation stamp in sizeCtl.
* Must be at least 6 for 32bit arrays.
*/
private static int RESIZE_STAMP_BITS = 16;

/**
* The maximum number of threads that can help resize.
* Must fit in 32 - RESIZE_STAMP_BITS bits.
*/
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

/**
* The bit shift for recording size stamp in sizeCtl.
*/
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;

/*
* Encodings for Node hash fields. See above for explanation.
*/
// 表示当前节点已经被移动到了新的数组位置,即正在做扩容操作
static final int MOVED = -1; // hash for forwarding nodes
// 表示当前节点所在的桶已经被转换为了红黑树
static final int TREEBIN = -2; // hash for roots of trees
// 表示当前节点所在的桶已经被其他线程暂时占用
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

/** Number of CPUS, to place bounds on some sizings */
static final int NCPU = Runtime.getRuntime().availableProcessors();

核心属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
/* ---------------- Fields -------------- */

/**
* The array of bins. Lazily initialized upon first insertion.
* Size is always a power of two. Accessed directly by iterators.
*/
// 默认没初始化的数组,用来保存元素
// 哈希桶数组table用 volatile 修饰主要是保证在数组扩容的时候保证可见性
transient volatile Node<K,V>[] table;

/**
* The next table to use; non-null only while resizing.
*/
// 转移的时候用的数组
private transient volatile Node<K,V>[] nextTable;

/**
* Base counter value, used mainly when there is no contention,
* but also as a fallback during table initialization
* races. Updated via CAS.
*/
// baseCount 变量的含义是 ConcurrentHashMap 中已经插入的键值对数量,它不一定等于 ConcurrentHashMap 的大小。
// 因为在插入键值对时,可能会发生扩容操作,扩容操作会导致表的大小增加,但是 baseCount 值不会随之增加。
// 因此,在使用 ConcurrentHashMap 时,不能只依赖于 baseCount 的值来判断 ConcurrentHashMap 的大小,需要使用 ConcurrentHashMap 的 size() 方法来获取准确的大小。
private transient volatile long baseCount;

/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
*/
/**
* sizeCtl 有四个含义:
* sizeCtl<-1 表示有 N-1 个线程正在执行扩容操作,如 -2 就表示有 2-1 个线程正在扩容。
* sizeCtl=-1 占位符,表示当前正在初始化数组。
* sizeCtl=0 默认状态,表示数组还没有被初始化。
* sizeCtl>0 记录下一次需要扩容的大小。
*
*/
private transient volatile int sizeCtl;

/**
* The next table index (plus one) to split while resizing.
*/
// 表示当前正在进行数据迁移的线程所处理的桶的索引。
// transferIndex 变量的作用是记录当前正在进行数据迁移的线程所处理的桶的索引。
// 当一个线程处理完一个桶中的数据后,它会将 transferIndex 的值加上 stride,然后处理下一个桶,直到处理完所有需要迁移的桶。
private transient volatile int transferIndex;

/**
* Spinlock (locked via CAS) used when resizing and/or creating CounterCells.
*/
// 表示当前cell数组是否在初始化或扩容中的CAS标志位
private transient volatile int cellsBusy;

/**
* 使用CounterCell的原因
* ConCurrentHashMap是采用CounterCell数组来记录元素个数的,
* 一般的集合记录集合大小,直接定义一个size的成员变量即可,当出现改变的时候只要更新这个变量就行。
* 但是ConcurrentHashMap不行。ConCurrentHashMap是并发集合,如果用一个成员变量来统计元素个数的话,
* 为了保证并发情况下共享变量的安全性,势必会需要通过加锁或者自旋来实现,
* 如果竞争比较激烈的情况下,size的设置上会出现比较大的冲突反而影响性能,所以在ConcurrentHashMap采用了分片的方法来记录大小
*/
/**
* Table of counter cells. When non-null, size is a power of 2.
*/
// counterCells数组,总数值的分值分别存在每个cell中
private transient volatile CounterCell[] counterCells;
  • sizeCtl 有四个含义:
    • sizeCtl<-1 表示有 N-1 个线程正在执行扩容操作,如 -2 就表示有 2-1 个线程正在扩容。
    •  `sizeCtl=-1` 占位符,表示当前正在初始化数组。
      
    •  `sizeCtl=0 `默认状态,表示数组还没有被初始化。
      
    •  `sizeCtl>0` 记录下一次需要扩容的大小。
      

Node节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
// 添加了volatile
volatile V val;
// 添加了volatile
volatile Node<K,V> next;

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
// ...略
}

get(Object key)方法

  • 根据key获取元素
  • get方法不需要加锁。因为Node的元素value和指针next是用volatile修饰的,在多线程环境下线程A修改节点的value或者新增节点的时候是对线程B可见的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Returns the value to which the specified key is mapped,
* or {@code null} if this map contains no mapping for the key.
*
* <p>More formally, if this map contains a mapping from a key
* {@code k} to a value {@code v} such that {@code key.equals(k)},
* then this method returns {@code v}; otherwise it returns
* {@code null}. (There can be at most one such mapping.)
*
* @throws NullPointerException if the specified key is null
*/
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 第一种情况: 当前桶位上的元素和查找的元素的key完全一致,表示找到了,将节点的值返回
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 第二种情况: 当前节点为红黑树
else if (eh < 0)
// 从红黑树中查找节点并返回
return (p = e.find(h, key)) != null ? p.val : null;
// 第三种情况: 当前节点为链表,遍历链表查询元素
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

put(K key, V value)方法

  • 添加元素
  • 大致步骤
    • 根据key计算出hash
    • 判断是否需要进行初始化,需要则进行初始化table
    • 根据hash计算出桶位,定位到Node,拿到首节点f,对首节点进行以下判断
      • 第一种情况: 若首节点fnul,表示该桶位没有任何节点(没有冲突),使用CAS方式添加元素
        • 如果插入成功,更新baseCount
        • 如果链表转化为红黑树或者扩容,则需要更新transferIndexcellsBusy
      • 第二种情况: 若首节点f满足f.hash == MOVED,表示当前桶位正在进行数组扩张的数据复制阶段,则当前线程也会参与去复制,通过允许多线程复制的功能,一次来减少数组的复制所带来的性能损失
      • 第三种情况: 首节点f即不为null且当前桶位没有进行扩容操作,则对当前桶位加上synchronized锁,做double-check检查首节点是否还是之前获取的,是的进行下面的操作
        • 若首节点fhash值>=0,表示当前结构为链表,遍历链表查找链表中是否有节点与当前需要添加的节点的hash值是否完全一致(后面提到的条件)
          • 完全一致,继而判断!onlyIfAbsent是否为true,条件成立,则更新找到的节点的value
          • 遍历到链表尾部都没找到符合条件的节点,则创建的节点插入到链表尾部
        • 反之(首节点fhash值<0,当转换为树之后,hash值为-2),结构为红黑树,和链表做类似操作,
          • 先找有没有满足条件的节点,满足了更新value
          • 没有满足新增节点插入红黑树
        • 最后根据当前桶位插入的元素的个数binCount判断链表是否需要转换为红黑树
          • 转换成红黑树的条件: 当前桶位上的链表长度 >= TREEIFY_THRESHOLD && 散列表table的长度 >= MIN_TREEIFY_CAPACITY
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
*
* <p>The value can be retrieved by calling the {@code get} method
* with a key that is equal to the original key.
*
* @param key key with which the specified value is to be associated
* @param value value to be associated with the specified key
* @return the previous value associated with {@code key}, or
* {@code null} if there was no mapping for {@code key}
* @throws NullPointerException if the specified key or value is null
*/
public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// K,V都不能为空,否则的话跑出异常
// 因为 ConcurrentHashMap 是用于多线程的,如果ConcurrentHashMap.get(key)得到了 null,这就无法判断,是映射的value是 null ,还是没有找到对应的key而为 null,就有了二义性。
if (key == null || value == null) throw new NullPointerException();
// 取得key的hash值
int hash = spread(key.hashCode());
// 用来计算在这个节点总共有多少个元素,用来控制扩容或者转移为树
int binCount = 0;
// 这里其实就是自旋操作,当出现线程竞争时不断自旋
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 第一次put的时候table没有初始化,则初始化table
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// tabAt方法通过hash值对应的数组下标得到第一个节点。以volatile读的方式来读取table数组中的元素,保证每次拿到的数据都是最新的
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果该下标返回的节点为空,则直接通过CAS将新的值封装成node插入即可,如果CAS失败,说明存在竞争,则进入下一次循环。
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
/*
* 如果检测到某个节点的hash值是MOVED,则表示正在进行数组扩张的数据复制阶段,
* 则当前线程也会参与去复制,通过允许多线程复制的功能,一次来减少数组的复制所带来的性能损失
*/
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 如果在这个位置有元素的话,通过synchronized关键字对table的index位置加锁,
// 需要注意的是,当前线程只会锁住table的index位置,其他位置上没有锁住,所以此时其他线程可以安全的获得其他的table位置来进行操作
synchronized (f) {
// 再次取出要存储的位置的元素,跟前面取出来的比较
if (tabAt(tab, i) == f) {
// 如果是链表的话(hash大于0),就对这个链表的所有元素进行遍历
if (fh >= 0) {
binCount = 1;
// 遍历这个链表
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// 如果 onlyIfAbsent 为 true,那么插入操作只有在 key 对应的值不存在时才会进行;
// 如果 onlyIfAbsent 为 false,那么无论 key 对应的值是否存在,都会进行插入操作,即如果 key 对应的值已经存在,会被新的 value 覆盖掉。
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 遍历到链表尾部也没找到与要添加的元素的hash相同的节点,则创建的新的节点插入到链表尾部
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 当转换为树之后,hash值为-2
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

initTable()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// sizeCtl 这个标志是在Node数组初始化或者扩容的时候一个控制位标识,负数代表正在进行初始化或者扩容操作。
// -1代表正在初始化
// -N代表有N-1个线程正在进行扩容操作,这里不是简单的理解成n个线程,sizeCtl就是-N
while ((tab = table) == null || tab.length == 0) {
// 被其他线程抢占了初始化的操作,则直接让出自己的CPU时间片
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// SIZECTL:表示当前对象的内存偏移量,sc表示期望值,-1表示要替换的值,设定为-1表示要初始化表了
// volatile的数组只针对数组引用具有volatile的语义,而不是它的元素
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
// 默认初始容量为16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 初始化数组,长度为16,或者初始化在构造ConcurrentHashMap的时候传入的长度
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 计算容器容量多少的时候触发扩容,实际就是当前容量的0.75倍,这里使用了右移来计算
sc = n - (n >>> 2);
}
} finally {
// 设置sizeCtl为sc,如果默认是16的话,那么这个时候sc = 16 * 0.75 = 12
sizeCtl = sc;
}
break;
}
}
return tab;
}

helpTransfer方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Helps transfer if a resize is in progress.
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// 判断此时是否仍然在执行扩容,nextTab = null 的时候说明扩容已经结束了
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
// 说明扩容还未完成的情况下不断循环来尝试将当前线程加入到扩容操作中
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// 下面部分的整个代码表示扩容结束,直接退出循环
// (sc >>> RESIZE_STAMP_SHIFT) != rs, 如果在同一轮扩容中,那么 sc 无符号右移比较高位和 rs 的值,那么应该是相等的。如果不相等,说明扩容结束了
// sc == rs + MAX_RESIZERS 表示扩容线程数达到最大扩容线程数
// sc == rs+1 表示扩容结束
// transferIndex <= 0 表示所有的 Node 都已经分配了线程
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
//在低16位 上增加扩容线程数
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

treeifyBin方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Replaces all linked nodes in bin at given index unless table is
* too small, in which case resizes instead.
*/
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
// MIN_TREEIFY_CAPACITY 64
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
// 数组扩容
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
// 把Node组成的链表,转化为TreeNode的链表,头结点任然放在相同的位置
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}

tryPresize方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* Tries to presize table to accommodate the given number of elements.
*
* @param size number of elements (doesn't need to be perfectly accurate)
*/
private final void tryPresize(int size) {
/*
* MAXIMUM_CAPACITY = 1 << 30
* 如果给定的大小大于等于数组容量的一半,则直接使用最大容量,
* 否则使用tableSizeFor算出来
* 后面table一直要扩容到这个值小于等于sizeCtrl(数组长度的3/4)才退出扩容
*/
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
else if (tab == table) {
int rs = resizeStamp(n);
if (sc < 0) {
Node<K,V>[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}

transfer方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 将(n>>>3相当于n/8)然后除以CPU核心数。如果得到的结果小于16,那么就使用16
// 这里的目的是让每个CPU处理的桶一样多,避免出现转移任务不均匀的现象,如果桶较少的话,默认一个CPU(一个线程)处理16个桶,也就是长度为16的时候,扩容的首只会有一个线程来扩容
// MIN_TRANSFER_STRIDE ==> 16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
// 新建一个n<<1原始table大小的nextTab,也就是32
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
// 赋值给nextTab
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 扩容失败,sizeCtl使用int的最大值
sizeCtl = Integer.MAX_VALUE;
return;
}
// 更新成员变量
nextTable = nextTab;
transferIndex = n;
}
// 新的tab长度
int nextn = nextTab.length;
// 创建一个fwd节点,表示一个正在被迁移的Node,并且它的hash值为-1(MOVED),也就是前面我们在讲putval方法的时候,会有一个判断MOVED的逻辑。
// 它的作用是用来占位,表示原数组中位置i处的节点完成迁移以后,就会在i位置设置一个fwd来告诉其他线程这个位置已经处理过了。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 首次推进为true,如果等于true,说明需要再次推进一个下标(i--),反之,如果是false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进
boolean advance = true;
// 判断是否已经扩容完成,完成了就return,退出循环
boolean finishing = false; // to ensure sweep before committing nextTab
// 通过自循环处理每个槽位中的链表元素,默认advace为真,通过CAS设置transferIndex属性值,并初始化i和bound值,i指当前处理的槽位序号,bound指需要处理的槽位边界,先处理槽位15的节点。
for (int i = 0, bound = 0;;) {
// 这个循环使用CAS不断尝试为当前线程分配任务
// 直到分配成功或任务队列已经被全部分配完毕
// 如果当前线程已经被分配过bucket区域
// 那么会通过--i指向下一个待处理bucket然后退出该循环
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
// --i表示下一个待处理bucket,如果它>=bound,表示当前线程已经分配过bucket区域
if (--i >= bound || finishing)
advance = false;
// 表示所有bucket已经分配完毕
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 通过CAS来修改TRANSFERINDEX,为当前线程分配任务,处理的节点区间为(nextBound,nextIndex) -> (0,15)
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// i < 0 说明已经遍历完整的数组,也就是当前线程已经处理完所有负责的bucket
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 如果完成了扩容
if (finishing) {
// 删除成员变量
nextTable = null;
// 更新table数组
table = nextTab;
// 更新阈值(32 * 0.75 = 24)
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// sizeCtl在迁移前会设置为(rs << RESIZE_STAMP_SHIFT)+2
// 然后,每增加一个线程参与迁移就会将sizeCtl加1
// 这里使用CAS操作对sizeCtl的低16位进行减1,代表偶昨晚了属于自己的任务
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 第一个扩容的线程,执行transfer方法之前,会设置sizeCtl=(resizeStamp(n)<<RESIZE_STAMP_SHIFT)+2,后续帮其扩容的线程,执行transfer方法之前,
// 会这只sizeCtl=sizeCtl+1每一个退出transfer的方法的线程,退出之前,会这只sizeCtl=sizeCtl-1
// 那么最后一个线程退出时,必然有sc == (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2),即 (sc - 2)== resizeStamp(n) << RESIZE_STAMP_SHIFT
// 如果sc-2不等于表示符左移16位。如果他们相等了,说明没有线程在帮助他们扩容了。也就是说扩容结束了。
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// 如果相等,扩容结束了,更新finising变量
finishing = advance = true;
// 再次循环检查一下整张表
i = n; // recheck before commit
}
}
// 如果位置i处是空的,没有任何节点,那么放入刚刚初始化的ForwardingNode”空节点“
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 表示该位置已经完成了迁移,也就是如果线程A已经处理过这个节点,那么线程B处理这个节点时,hash值一定为MOVED
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 对数组该节点位置加锁,开始处理数组该位置的迁移工作
synchronized (f) {
// 再做一次校验
if (tabAt(tab, i) == f) {
// ln表示低位,hn表示高位;
Node<K,V> ln, hn;
// 链表扩容节点
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
// 遍历当前bucket的链表,目的是尽量重用Node链表尾部的一部分
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 如果最后更新的runBit是0,设置低位节点
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
// 否则设置高位节点
hn = lastRun;
ln = null;
}
// 构造高位以及低位的链表
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 将低位的链表放在i位置也就是不动
setTabAt(nextTab, i, ln);
// 将高位链表放在i+n位置
setTabAt(nextTab, i + n, hn);
// 把旧的hash桶中放置转发节点(fwd),表明此hash桶已经处理
setTabAt(tab, i, fwd);
advance = true;
}
// 红黑树扩容阶段
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

addCount方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
* Adds to count, and if table is too small and not already
* resizing, initiates transfer. If already resizing, helps
* perform transfer if work is available. Rechecks occupancy
* after a transfer to see if another resize is already needed
* because resizings are lagging additions.
*
* @param x the count to add
* @param check if <0, don't check resize, if <= 1 only check if uncontended
*/
private final void addCount(long x, int check) {
// x表示这次需要在表中增加的元素个数,check参数表示是否需要进行扩容检查,大于等于0都需要进行检查
CounterCell[] as; long b, s;
// 判断counterCells是否为空
// 1,如果为空,就通过CAS操作尝试修改baseCount变量,对这个变量进行原子累加操作
//(做这个操作的意义是:如果在没有竞争的情况下,仍然采用baseCount来记录元素个数)
// 2,如果CAS失败说明存在竞争,这个时候不能在采用baseCount来累加,而是通过counterCell来记录
if ((as = counterCells) != null ||
// baseCount来表示当前的记录数量
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
// 是否冲突表示,默认为没有冲突
boolean uncontended = true;
// 1.as == null || (m = as.length - 1) < 0 ==> 计数表为空则直接调用fullAddCount
// 2.(a = as[ThreadLocalRandom.getProbe() & m]) == null ==> 从计数表中随机取出一个数组的位置为空,直接调用fullAddCount
// 3.!(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) ==>
// 通过CAS修改CounterCell随机位置的值,如果修改失败说明出现并发情况(这里又用到了一种巧妙的方法),调用fullAndCount
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 初始化数组counterCells
fullAddCount(x, uncontended);
return;
}
// 链表长度小于等于1,不需要考虑扩容
if (check <= 1)
return;
s = sumCount();
}
// 如果binCount>=0,表示需要检查扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// s标识集合大小,如果集合大小大于或等于扩容阈值(默认值的0.75)
// 并且table不为空并且table的长度小于最大容量
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
// sc<0,也就是sizeCtl<0,说明已经有别的线程在扩容了
if (sc < 0) {
// 这5个条件只要有一个条件为true,说明当前线程不能帮助进行此次扩容,直接跳出循环
// sc >>> RESIZE_STAMP_SHIFT != rs 表示比较高RESIZE_STAMP_BITS 位生成戳和rs是否相等,相同
// sc == rs+1 表示扩容结束
// sc == rs + MAX_RESIZERS 表示帮助线程已经达到最大值了
// (nt = nextTable) == null 表示扩容已经结束
// transferIndex<=0 表示所有的tranfer任务都被领完了,没有剩余的hash桶来给当前线程做

if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 当前线程尝试帮助此次扩容,如果成功,则调用transfer
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 如果当前没有在扩容,那么rs肯定是一个正数,通过rs<<RESIZE_STAMP_SHIFT将sc设置为一个负数,+2表示有一个线程在执行扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}