这个ConcurrentHashmap的设计非常精妙,如果有疑问的地方,欢迎大家在评论区进行激烈讨论!
一、静态工具方法
1 private static final int tableSizeFor(int c) {2 int n = c - 1;3 n |= n >>> 1;4 n |= n >>> 2;5 n |= n >>> 4;6 n |= n >>> 8;7 n |= n >>> 16;8 return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;9 }
此方法是对给定的int型数据c,返回一个值(比如叫x),则x满足x >=c且x是2的整数次幂。
首先为什么先将c-1,我们等下再说,先解释下从代码第3行到第7行的意思,第三行的意思是先将n与n无符号右移1位后的值做“或”运算,然后将值再赋给n,之后的以此类推。
为什么最后只到了右移16位呢?因为int数据在内存中只有32位。经过这一系列操作,就保证了n的二进制表示中将第一个出现1的位置的后面全部设置为1。然后再返回n+1就保证了
大于c并且是2的整数次幂。然后再解释下为什么开头先将c减去1,因为如果c本来就是2的m次幂的话,我们使用同样的方法最后会得到2的m+1次幂的结果。
二、初始化table:
1 private final Node<K,V>[] initTable() { 2 Node<K,V>[] tab; int sc; 3 while ((tab = table) == null || tab.length == 0) { 4 if ((sc = sizeCtl) < 0) 5 Thread.yield(); // lost initialization race; just spin 6 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 7 try { 8 if ((tab = table) == null || tab.length == 0) { 9 int n = (sc > 0) ? sc : DEFAULT_CAPACITY;10 @SuppressWarnings("unchecked")11 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];12 table = tab = nt;13 sc = n - (n >>> 2);14 }15 } finally {16 sizeCtl = sc;17 }18 break;19 }20 }21 return tab;22 }
当多个线程同事执行第6行时,只会有一个返回true。compareAndSwapInt方法会将堆上的字段sizeCtl改为-1.这样其他线程会继续在while循环中一直处于第4行的判断内。直到线程将使用sizeCtl将table初始化完。在初始化table后,sizeCtl会修改为下次需要扩容的阈值,即table容量乘以负载因子(n*0.75),这里使用位移的方法(如第13行)。从这里可以看出,其实这个负载因子是固定不变的。构造函数中的loadFactor,仅仅影响table初始化的容量:
1 public ConcurrentHashMap(int initialCapacity, 2 float loadFactor, int concurrencyLevel) { 3 if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) 4 throw new IllegalArgumentException(); 5 if (initialCapacity < concurrencyLevel) // Use at least as many bins 6 initialCapacity = concurrencyLevel; // as estimated threads 7 long size = (long)(1.0 + (long)initialCapacity / loadFactor); 8 int cap = (size >= (long)MAXIMUM_CAPACITY) ? 9 MAXIMUM_CAPACITY : tableSizeFor((int)size);10 this.sizeCtl = cap;11 }
三、新增数据(put方法)
1 final V putVal(K key, V value, boolean onlyIfAbsent) { 2 if (key == null || value == null) throw new NullPointerException(); 3 int hash = spread(key.hashCode()); 4 int binCount = 0; 5 for (Node<K,V>[] tab = table;;) { 6 Node<K,V> f; int n, i, fh; 7 if (tab == null || (n = tab.length) == 0) 8 tab = initTable(); 9 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {10 if (casTabAt(tab, i, null,11 new Node<K,V>(hash, key, value, null)))12 break; // no lock when adding to empty bin13 }14 else if ((fh = f.hash) == MOVED)15 tab = helpTransfer(tab, f);16 else {17 V oldVal = null;18 synchronized (f) {19 if (tabAt(tab, i) == f) {20 if (fh >= 0) {21 binCount = 1;22 for (Node<K,V> e = f;; ++binCount) {23 K ek;24 if (e.hash == hash &&25 ((ek = e.key) == key ||26 (ek != null && key.equals(ek)))) {27 oldVal = e.val;28 if (!onlyIfAbsent)29 e.val = value;30 break;31 }32 Node<K,V> pred = e;33 if ((e = e.next) == null) {34 pred.next = new Node<K,V>(hash, key,35 value, null);36 break;37 }38 }39 }40 else if (f instanceof TreeBin) {41 Node<K,V> p;42 binCount = 2;43 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,44 value)) != null) {45 oldVal = p.val;46 if (!onlyIfAbsent)47 p.val = value;48 }49 }50 }51 }52 if (binCount != 0) {53 if (binCount >= TREEIFY_THRESHOLD)54 treeifyBin(tab, i);55 if (oldVal != null)56 return oldVal;57 break;58 }59 }60 }61 addCount(1L, binCount);62 return null;63 }
方法的主要逻辑是:在key和数组长度计算出的值来确定在数组中插入的位置,如果此位置原来没有节点则构造一个节点插入该位置,如果以前有节点,则在节点所在的链表添加新的节点,如果此位置的节点是一颗树,则在树上添加新的节点。如果插入后的节点数量大于TREEIFY_THRESHOLD,则将该节点转化为树形结构。(该树为一颗红黑树)。其中第15行方法helpTransfer,第54行treeifyBin和第61行addCount方法,将会在后面进行说明。值得注意的是:在方法treefyBin中,会判断如果table的长度小于MIN_TREEIFY_CAPACITY的话,则不会将节点构造成树,而是将table扩容。
四、树化
1 private final void treeifyBin(Node<K,V>[] tab, int index) { 2 Node<K,V> b; int n, sc; 3 if (tab != null) { 4 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) 5 tryPresize(n << 1); 6 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { 7 synchronized (b) { 8 if (tabAt(tab, index) == b) { 9 TreeNode<K,V> hd = null, tl = null;10 for (Node<K,V> e = b; e != null; e = e.next) {11 TreeNode<K,V> p =12 new TreeNode<K,V>(e.hash, e.key, e.val,13 null, null);14 if ((p.prev = tl) == null)15 hd = p;16 else17 tl.next = p;18 tl = p;19 }20 setTabAt(tab, index, new TreeBin<K,V>(hd));21 }22 }23 }24 }25 }
真正的树化动作是在第20行的new TreeBin<K,V>(hd)此构造方法中进行的。其中TreeBin类是一个红黑树结构。
五、扩容方法(transfer)
1 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { 2 int n = tab.length, stride; 3 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) 4 stride = MIN_TRANSFER_STRIDE; // subdivide range 5 if (nextTab == null) { // initiating 6 try { 7 @SuppressWarnings("unchecked") 8 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; 9 nextTab = nt; 10 } catch (Throwable ex) { // try to cope with OOME 11 sizeCtl = Integer.MAX_VALUE; 12 return; 13 } 14 nextTable = nextTab; 15 transferIndex = n; 16 } 17 int nextn = nextTab.length; 18 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); 19 boolean advance = true; 20 boolean finishing = false; // to ensure sweep before committing nextTab 21 for (int i = 0, bound = 0;;) { 22 Node<K,V> f; int fh; 23 while (advance) { 24 int nextIndex, nextBound; 25 if (--i >= bound || finishing) 26 advance = false; 27 else if ((nextIndex = transferIndex) <= 0) { 28 i = -1; 29 advance = false; 30 } 31 else if (U.compareAndSwapInt 32 (this, TRANSFERINDEX, nextIndex, 33 nextBound = (nextIndex > stride ? 34 nextIndex - stride : 0))) { 35 bound = nextBound; 36 i = nextIndex - 1; 37 advance = false; 38 } 39 } 40 if (i < 0 || i >= n || i + n >= nextn) { 41 int sc; 42 if (finishing) { 43 nextTable = null; 44 table = nextTab; 45 sizeCtl = (n << 1) - (n >>> 1); 46 return; 47 } 48 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { 49 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) 50 return; 51 finishing = advance = true; 52 i = n; // recheck before commit 53 } 54 } 55 else if ((f = tabAt(tab, i)) == null) 56 advance = casTabAt(tab, i, null, fwd); 57 else if ((fh = f.hash) == MOVED) 58 advance = true; // already processed 59 else { 60 synchronized (f) { 61 if (tabAt(tab, i) == f) { 62 Node<K,V> ln, hn; 63 if (fh >= 0) { 64 int runBit = fh & n; 65 Node<K,V> lastRun = f; 66 for (Node<K,V> p = f.next; p != null; p = p.next) { 67 int b = p.hash & n; 68 if (b != runBit) { 69 runBit = b; 70 lastRun = p; 71 } 72 } 73 if (runBit == 0) { 74 ln = lastRun; 75 hn = null; 76 } 77 else { 78 hn = lastRun; 79 ln = null; 80 } 81 for (Node<K,V> p = f; p != lastRun; p = p.next) { 82 int ph = p.hash; K pk = p.key; V pv = p.val; 83 if ((ph & n) == 0) 84 ln = new Node<K,V>(ph, pk, pv, ln); 85 else 86 hn = new Node<K,V>(ph, pk, pv, hn); 87 } 88 setTabAt(nextTab, i, ln); 89 setTabAt(nextTab, i + n, hn); 90 setTabAt(tab, i, fwd); 91 advance = true; 92 } 93 else if (f instanceof TreeBin) { 94 TreeBin<K,V> t = (TreeBin<K,V>)f; 95 TreeNode<K,V> lo = null, loTail = null; 96 TreeNode<K,V> hi = null, hiTail = null; 97 int lc = 0, hc = 0; 98 for (Node<K,V> e = t.first; e != null; e = e.next) { 99 int h = e.hash;100 TreeNode<K,V> p = new TreeNode<K,V>101 (h, e.key, e.val, null, null);102 if ((h & n) == 0) {103 if ((p.prev = loTail) == null)104 lo = p;105 else106 loTail.next = p;107 loTail = p;108 ++lc;109 }110 else {111 if ((p.prev = hiTail) == null)112 hi = p;113 else114 hiTail.next = p;115 hiTail = p;116 ++hc;117 }118 }119 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :120 (hc != 0) ? new TreeBin<K,V>(lo) : t;121 hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :122 (lc != 0) ? new TreeBin<K,V>(hi) : t;123 setTabAt(nextTab, i, ln);124 setTabAt(nextTab, i + n, hn);125 setTabAt(tab, i, fwd);126 advance = true;127 }128 }129 }130 }131 }132 }
总体逻辑:
1.如果nextTable为null则初始化一个。(容量为以前的2倍)
2.然后将table中的节点按MIN_TRANSFER_STRIDE分为多个区间
3.对每个区间的节点数据进行转移,转以后在table中将节点(即使该节点为null)置为ForwardingNode,标记为已转移。
4.最后table = nextTable。
需要注意的细节:
问:为什么要分为多个区间?
答:可以允许多个线程同时进行扩容不同的区间而不受其他线程影响。关键代码位置:第27行到第37行。
第63行条件判断开始处理链表节点的转移:
runBit记录节点的hash属性与原数组长度n的“&”运算结果,lastRun记录的是链表中最后面位置中节点的runBit值相同的子链表的头结点。
因为n是原数组的长度,而数组长度必须是2的整数次幂(n == 2的x次方),所以n的二进制表示中,只有第x+1位是1,其他全为0。所以如果hash&n == 0,则表示hash中第x+1位为0,否则为1.
我们现在假设要转移的节点(node)所在数组中的位置为i,则i == node.hash&(n-1),如果node.hash&n == 0的话,那么node.hash&(2n-1) == node.hash&(n-1) == i。即扩容后node的位置不变。
如果node.hash&n > 0的话,则node.hash&(2n-1) == i + n。即扩容后node的位置向后移动n位。
上面的第64行到第87行是将当前节点分为2个子链表,分别是ln为hash&n == 0的子链表 和hn为hash&n > 0的子链表。根据我们之前讨论的结果ln应该还在当前的位置i,而hn则应该向后移动n位,即在i+n位置。
然后将旧数组的当前节点i位置用一个标志节点来标记此位置已经转移了。
转移树形结构的节点与链表类似,因为TreeNode是Node的子类,其本身也是一个链表,可以通过next属性遍历整个树。
再看下sizeCtl这个实例变量:
1 private transient volatile int sizeCtl;
当它为负值时,表示正在初始化或者扩容(-1表示初始化);
否则,当table为null时,它为默认初始值0,或者保存着table的初始化值;
当table初始化结束时,它保存着下次需要扩容的阈值。
六、计数方法
1 private final void addCount(long x, int check) { 2 CounterCell[] as; long b, s; 3 if ((as = counterCells) != null || 4 !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { 5 CounterCell a; long v; int m; 6 boolean uncontended = true; 7 if (as == null || (m = as.length - 1) < 0 || 8 (a = as[ThreadLocalRandom.getProbe() & m]) == null || 9 !(uncontended =10 U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {11 fullAddCount(x, uncontended);12 return;13 }14 if (check <= 1)15 return;16 s = sumCount();17 }18 if (check >= 0) {19 Node<K,V>[] tab, nt; int n, sc;20 while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&21 (n = tab.length) < MAXIMUM_CAPACITY) {22 int rs = resizeStamp(n);23 if (sc < 0) {24 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||25 sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||26 transferIndex <= 0)27 break;28 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))29 transfer(tab, nt);30 }31 else if (U.compareAndSwapInt(this, SIZECTL, sc,32 (rs << RESIZE_STAMP_SHIFT) + 2))33 transfer(tab, null);34 s = sumCount();35 }36 }37
该方法涉及的两个实例变量baseCount和counterCells是用来保存当前table中有多少数据的。
第17行之前是计算数据个数的,之后是检查是否需要扩容或者如果正在扩容则参与到扩容中。然后重新检查看看是否需要继续扩容。
其中的第24行判断,是错误的,在好几个方法中用到此种判断的都是同样的错误,请注意(这是jdk的一个BUG,在现在的已发布的JDK8-JDK11版本中都未解决,估计以后在JDK12发布版本中会fix掉,现在的openJDK12中已经fix了,参考https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427),其中的sc==rs + 1 || sc == rs + MAX_RESIZES应该改为sc == (rs << RESIZE_STAMP_SHIFT)+ 1 || sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZES,
用来判断扩容是否结束和扩容线程是否达到最大值。可以看下第31行,表示初始化扩容,所以开始扩容时, sizeCtl == (rs << RESIZE_STAMP_SHIFT) + 2,因为每个扩容线程在进行帮助扩容时都会使用CAS将sizeCtl+1,然后进入transfer,接着在transfer方法中,每当一个扩容线程结束时都会将sizeCtl - 1,所以当所有扩容线程结束时sizeCtl == (rs << RESIZE_STAMP_SHIFT)+ 1,因为第一个进入扩容的线程没有将sizeCtl+1.
七、最后看下size计算table中数据的个数的方法
1 public int size() { 2 long n = sumCount(); 3 return ((n < 0L) ? 0 : 4 (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : 5 (int)n); 6 } 7 8 9 10 final long sumCount() {11 CounterCell[] as = counterCells; CounterCell a;12 long sum = baseCount;13 if (as != null) {14 for (int i = 0; i < as.length; ++i) {15 if ((a = as[i]) != null)16 sum += a.value;17 }18 }19 return sum;20 }
因为size中调用sumCount方法来间接计算个数,所以直接看sumCount方法。
它是将baseCount和counterCells中每个节点的value属性值累加在一起得到最后的数量。