The JDK 1.7 implementation
The data structure of ConcurrentHashMap consists of a Segment array plus multiple HashEntry chains, as shown in the figure:
The point of the Segment array is to split one large table into many small tables that are locked independently. A Segment's structure resembles HashMap's: an array of linked lists. Each Segment contains a HashEntry array, each HashEntry is a node in a linked list, and each Segment guards the elements of its own HashEntry array. To modify data in a HashEntry array, a thread must first acquire the corresponding Segment's lock.
1. Initialization
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
    ++sshift;
    ssize <<= 1;
}
int segmentShift = 32 - sshift;
int segmentMask = ssize - 1;
The length of the segments array, ssize, is derived from concurrencyLevel by repeated left-shifting, so ssize is always a power of two. The maximum concurrencyLevel is 65536, which means the maximum length of the segments array is 65536, i.e. 16 binary bits.
sshift is the number of times 1 was shifted left to reach ssize. With the default concurrencyLevel of 16, 1 must be shifted left 4 times, so sshift is 4. segmentShift locates the bits that participate in the segment hash; here it equals 28, because the hash method in ConcurrentHashMap outputs at most 32 bits, so 32 - 4 = 28. segmentMask is the mask applied to the hash; every binary bit of the mask is 1.
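These numbers are easy to reproduce in isolation. The sketch below (class and method names `SegmentSizing`/`sizing` are mine, not the JDK's) mirrors the constructor logic quoted above: round concurrencyLevel up to a power of two and derive the shift and mask used to pick a segment.

```java
public class SegmentSizing {
    // Mirrors the JDK 1.7 constructor snippet: returns {ssize, segmentShift, segmentMask}
    static int[] sizing(int concurrencyLevel) {
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;                 // ssize is always a power of two
        }
        return new int[] { ssize, 32 - sshift, ssize - 1 };
    }

    public static void main(String[] args) {
        int[] r = sizing(16);            // the default concurrencyLevel
        System.out.println(r[0]);        // 16 segments
        System.out.println(r[1]);        // segmentShift = 28
        System.out.println(r[2]);        // segmentMask = 15 (binary 1111)
    }
}
```

Note how a non-power-of-two concurrencyLevel such as 17 is rounded up: ssize becomes 32, segmentShift 27, segmentMask 31.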
2. get
ConcurrentHashMap's get is similar to HashMap's, except that it first hashes to locate the Segment, then hashes again to locate the HashEntry, and traverses that HashEntry's list comparing keys; it returns the value on a match and null otherwise.
As you can see, the algorithm used to locate the Segment is not the same as the one used to locate the HashEntry.
public V get(Object key) {
    Segment<K,V> s; // manually integrate access methods to reduce overhead
    HashEntry<K,V>[] tab;
    int h = hash(key);
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
    if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
        (tab = s.table) != null) {
        for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                 (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    return null;
}
3. put
put writes to shared state, so it must take a lock. The method first locates the Segment, then performs the insertion inside that Segment. The insertion takes two steps: first, decide whether the Segment's HashEntry array needs to be resized; second, locate the slot for the new element and place it in the HashEntry array.
(1) Deciding whether to resize
Before inserting an element, the Segment checks whether its HashEntry array has exceeded the threshold, and resizes if so. Note that the Segment's resize check is more sensible than HashMap's: HashMap checks capacity after inserting an element and resizes once the threshold is reached, but it is quite possible that no new element arrives after that resize, in which case HashMap has performed a wasted resize.
(2) How resizing works
Resizing first creates an array twice the original capacity, then rehashes the old array's elements into the new one. For efficiency, ConcurrentHashMap never resizes the whole container, only the individual Segment.
static final class Segment<K,V> extends ReentrantLock implements Serializable {
public V put(K key, V value) {
    Segment<K,V> s;
    if (value == null)
        throw new NullPointerException();
    int hash = hash(key);
    int j = (hash >>> segmentShift) & segmentMask;
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
         (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}
When inserting at the target HashEntry slot, the Segment first calls the tryLock method inherited from ReentrantLock to attempt to acquire the lock. If it succeeds, the insertion proceeds directly. If another thread already holds the Segment's lock, the current thread spins, repeatedly calling tryLock; once the retry count exceeds a fixed limit it blocks (lock) and waits to be woken up.
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    HashEntry<K,V> node = tryLock() ? null :
        scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            }
            else {
                if (node != null)
                    node.setNext(first);
                else
                    node = new HashEntry<K,V>(hash, key, value, first);
                int c = count + 1;
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                oldValue = null;
                break;
            }
        }
    } finally {
        unlock();
    }
    return oldValue;
}
Acquiring the lock via tryLock:
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    int retries = -1; // negative while locating node
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            if (e == null) {
                if (node == null) // speculatively create node
                    node = new HashEntry<K,V>(hash, key, value, null);
                retries = 0;
            }
            else if (key.equals(e.key))
                retries = 0;
            else
                e = e.next;
        }
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        else if ((retries & 1) == 0 &&
                 (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}
4. size
ConcurrentHashMap first makes two attempts to sum the sizes of all Segments without locking them; if the container's count changes during those attempts, it falls back to locking every Segment and summing again.
How does it know the container changed while it was counting? Via the modCount variable: put, remove and clean all increment it before touching elements, so comparing the sums of modCount before and after reading the sizes reveals whether the container changed.
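Stripped of the JDK internals, the pattern is "sum optimistically, retry while the modification counters keep moving, and lock everything as a last resort". A minimal sketch (the `SegmentedCounter` class and its fields are mine, purely illustrative, not JDK code):

```java
import java.util.concurrent.locks.ReentrantLock;

// Simplified sketch of the JDK 1.7 size() strategy: sum per-segment counts
// optimistically, and fall back to locking every segment when the modCounts
// keep changing between two passes.
class SegmentedCounter {
    static final int RETRIES_BEFORE_LOCK = 2;
    final ReentrantLock[] locks;
    final int[] counts;     // per-segment element count
    final int[] modCounts;  // per-segment modification count

    SegmentedCounter(int segments) {
        locks = new ReentrantLock[segments];
        counts = new int[segments];
        modCounts = new int[segments];
        for (int i = 0; i < segments; i++) locks[i] = new ReentrantLock();
    }

    int size() {
        long last = -1L;
        for (int retries = 0; ; retries++) {
            boolean locked = retries > RETRIES_BEFORE_LOCK;
            if (locked)
                for (ReentrantLock l : locks) l.lock();
            try {
                long sum = 0;   // sum of modCounts: detects concurrent changes
                int size = 0;
                for (int i = 0; i < counts.length; i++) {
                    sum += modCounts[i];
                    size += counts[i];
                }
                // stable across two passes, or fully locked: answer is reliable
                if (locked || sum == last)
                    return size;
                last = sum;
            } finally {
                if (locked)
                    for (ReentrantLock l : locks) l.unlock();
            }
        }
    }
}
```

In the quiet case the answer is produced on the second unlocked pass; only a map under continuous modification pays the cost of locking all segments.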
public int size() {
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    final Segment<K,V>[] segments = this.segments;
    int size;
    boolean overflow; // true if size overflows 32 bits
    long sum;         // sum of modCounts
    long last = 0L;   // previous sum
    int retries = -1; // first iteration isn't retry
    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j)
                    ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }
            if (sum == last)
                break;
            last = sum;
        }
    } finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}
5. Resizing (rehash)
The principle is similar to HashMap's resize, and it avoids copying every node. Because resizing operates on powers of two, if a HashEntry sits at index i of a Segment array of capacity `capacity` before the resize, after the resize it can only land at index i or i + capacity, so most HashEntry nodes keep their index. Building on this, rehash locates the first node from which all subsequent nodes map to the same slot in the new array, reuses that whole tail in one step, and only re-links the nodes before it.
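The "i or i + capacity" claim follows directly from indexing with `hash & (capacity - 1)`: doubling the capacity adds exactly one more hash bit to the index. A small check (the `RehashIndex` class is mine, for illustration):

```java
public class RehashIndex {
    // Verifies the resize invariant: with a power-of-two capacity, doubling the
    // table moves an entry either nowhere or exactly `capacity` slots forward.
    static boolean holdsFor(int hash, int capacity) {
        int oldIndex = hash & (capacity - 1);     // index before the resize
        int newIndex = hash & (2 * capacity - 1); // index after doubling
        // the new index depends on one extra hash bit: 0 keeps the entry at
        // oldIndex, 1 moves it to oldIndex + capacity
        return newIndex == oldIndex || newIndex == oldIndex + capacity;
    }

    public static void main(String[] args) {
        for (int hash = 0; hash < 1_000_000; hash++)
            if (!holdsFor(hash, 16))
                throw new AssertionError("invariant violated for hash " + hash);
        System.out.println("invariant holds");
    }
}
```

The same property is what lets JDK 1.8's transfer split each bin into a "low" list (stays at i) and a "high" list (moves to i + capacity).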
The JDK 1.8 implementation
First, the Segment-based segmented locking is gone, replaced by an array of bins holding linked lists (or red-black trees), with the lock granularity reduced to a single array element. Second, the hash used to locate a node is simplified, which worsens hash collisions; to compensate, when a bin's list grows beyond 8 nodes it is converted to a red-black tree, bringing lookups in that bin from O(n) down to O(log n).
1. Key fields
// load factor
private static final float LOAD_FACTOR = 0.75f;
// threshold for converting a list to a red-black tree: more than 8 nodes triggers treeification
static final int TREEIFY_THRESHOLD = 8;
// threshold for converting a red-black tree back to a list
static final int UNTREEIFY_THRESHOLD = 6;
// sizeCtl controls table initialization and resizing; its value encodes the state:
// -1 means the table is being initialized; -N means N-1 threads are resizing
// non-negative: if the table is uninitialized, the size to initialize it to;
// once initialized, the resize threshold, by default 0.75 * capacity
private transient volatile int sizeCtl;
The constructor: creating a ConcurrentHashMap does not initialize the table array; it only records the map capacity, the concurrency level and so on.
public ConcurrentHashMap(int initialCapacity,
                         float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (initialCapacity < concurrencyLevel)   // Use at least as many bins
        initialCapacity = concurrencyLevel;   // as estimated threads
    long size = (long)(1.0 + (long)initialCapacity / loadFactor);
    int cap = (size >= (long)MAXIMUM_CAPACITY) ?
        MAXIMUM_CAPACITY : tableSizeFor((int)size);
    this.sizeCtl = cap;
}
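The constructor leans on tableSizeFor to round the requested capacity up to the next power of two. A self-contained sketch of that helper (the bit-smearing trick shared by HashMap and ConcurrentHashMap; the demo class name is mine):

```java
public class TableSizeForDemo {
    static final int MAXIMUM_CAPACITY = 1 << 30;

    // Propagates the highest set bit of c-1 into every lower position, so n
    // becomes a run of ones, then n+1 is the smallest power of two >= c.
    static int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }

    public static void main(String[] args) {
        System.out.println(tableSizeFor(10)); // 16
        System.out.println(tableSizeFor(16)); // 16: already a power of two
        System.out.println(tableSizeFor(17)); // 32
    }
}
```

Starting from c - 1 rather than c is what keeps an exact power of two (like 16) from being doubled unnecessarily.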
2. Node
Node is the basic storage unit of ConcurrentHashMap; it only allows data to be looked up, not modified through the Entry view.
static class Node<K,V> implements Map.Entry<K,V> {
    // the linked-list structure
    final int hash;
    final K key;
    // val and next can change during a resize, so they are volatile
    // to guarantee visibility and forbid reordering
    volatile V val;
    volatile Node<K,V> next;

    Node(int hash, K key, V val, Node<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.val = val;
        this.next = next;
    }

    public final K getKey()     { return key; }
    public final V getValue()   { return val; }
    public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
    public final String toString() { return key + "=" + val; }

    // updating the value is not allowed
    public final V setValue(V value) {
        throw new UnsupportedOperationException();
    }

    public final boolean equals(Object o) {
        Object k, v, u; Map.Entry<?,?> e;
        return ((o instanceof Map.Entry) &&
                (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                (v = e.getValue()) != null &&
                (k == key || k.equals(key)) &&
                (v == (u = val) || v.equals(u)));
    }

    /**
     * Virtualized support for map.get(); overridden in subclasses.
     */
    Node<K,V> find(int h, Object k) {
        Node<K,V> e = this;
        if (k != null) {
            do {
                K ek;
                if (e.hash == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
            } while ((e = e.next) != null);
        }
        return null;
    }
}
3. TreeNode
TreeNode extends Node, but the structure becomes a binary tree: it is the storage node of the red-black tree. When ConcurrentHashMap converts a list to a tree, it does not do so directly; it wraps the nodes as TreeNodes, puts them into a TreeBin, and the TreeBin builds the red-black tree.
static final class TreeNode<K,V> extends Node<K,V> {
    // the tree-structure fields
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;           // marks a red node in the red-black tree

    TreeNode(int hash, K key, V val, Node<K,V> next,
             TreeNode<K,V> parent) {
        super(hash, key, val, next);
        this.parent = parent;
    }

    Node<K,V> find(int h, Object k) {
        return findTreeNode(h, k, null);
    }

    /**
     * Looks up a key starting from the root, returning the matching TreeNode
     */
    final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
        if (k != null) {
            TreeNode<K,V> p = this;
            do {
                int ph, dir; K pk; TreeNode<K,V> q;
                TreeNode<K,V> pl = p.left, pr = p.right;
                if ((ph = p.hash) > h)
                    p = pl;
                else if (ph < h)
                    p = pr;
                else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                    return p;
                else if (pl == null)
                    p = pr;
                else if (pr == null)
                    p = pl;
                else if ((kc != null ||
                          (kc = comparableClassFor(k)) != null) &&
                         (dir = compareComparables(kc, k, pk)) != 0)
                    p = (dir < 0) ? pl : pr;
                else if ((q = pr.findTreeNode(h, k, kc)) != null)
                    return q;
                else
                    p = pl;
            } while (p != null);
        }
        return null;
    }
}
4. TreeBin
TreeBin is the container wrapping the TreeNodes; it provides the conditions for the tree conversion and the lock control. What ConcurrentHashMap actually stores in the table is the TreeBin object, not a TreeNode.
static final class TreeBin<K,V> extends Node<K,V> {
    // root of the red-black tree, and head of the linked TreeNode list
    TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    volatile Thread waiter;
    volatile int lockState;      // state of the bin's read/write lock
    static final int WRITER = 1; // set while holding the write lock
    static final int WAITER = 2; // set while waiting for the write lock
    static final int READER = 4; // increment value for readers adding data

    /**
     * Tie-breaking utility for ordering insertions when equal
     * hashCodes and non-comparable. We don't require a total
     * order, just a consistent insertion rule to maintain
     * equivalence across rebalancings. Tie-breaking further than
     * necessary simplifies testing a bit.
     */
    static int tieBreakOrder(Object a, Object b) {
        int d;
        if (a == null || b == null ||
            (d = a.getClass().getName().
             compareTo(b.getClass().getName())) == 0)
            d = (System.identityHashCode(a) <= System.identityHashCode(b) ?
                 -1 : 1);
        return d;
    }

    /**
     * Builds the red-black tree from the list of TreeNodes headed by b
     */
    TreeBin(TreeNode<K,V> b) {
        super(TREEBIN, null, null, null);
        this.first = b;
        TreeNode<K,V> r = null;
        for (TreeNode<K,V> x = b, next; x != null; x = next) {
            next = (TreeNode<K,V>)x.next;
            x.left = x.right = null;
            if (r == null) {
                x.parent = null;
                x.red = false;
                r = x;
            }
            else {
                K k = x.key;
                int h = x.hash;
                Class<?> kc = null;
                for (TreeNode<K,V> p = r;;) {
                    int dir, ph;
                    K pk = p.key;
                    if ((ph = p.hash) > h)
                        dir = -1;
                    else if (ph < h)
                        dir = 1;
                    else if ((kc == null &&
                              (kc = comparableClassFor(k)) == null) ||
                             (dir = compareComparables(kc, k, pk)) == 0)
                        dir = tieBreakOrder(k, pk);
                    TreeNode<K,V> xp = p;
                    if ((p = (dir <= 0) ? p.left : p.right) == null) {
                        x.parent = xp;
                        if (dir <= 0)
                            xp.left = x;
                        else
                            xp.right = x;
                        r = balanceInsertion(r, x);
                        break;
                    }
                }
            }
        }
        this.root = r;
        assert checkInvariants(root);
    }
5. The put operation
Notice that the locking in this code uses the synchronized keyword rather than ReentrantLock as in 1.7, which suggests that synchronized has been optimized in newer JDKs to the point where it performs about as well as ReentrantLock.
When computing a key's hash, ConcurrentHashMap hashes in two steps, spreading the high bits into the low bits so that entries distribute evenly across the bins:
static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }
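The effect of spread is easy to see on concrete values. In this sketch (class name `SpreadDemo` is mine; HASH_BITS is 0x7fffffff as in the JDK source):

```java
public class SpreadDemo {
    static final int HASH_BITS = 0x7fffffff; // usable hash bits; the sign bit is reserved

    // Same computation as ConcurrentHashMap.spread: fold the high 16 bits
    // into the low 16 bits, then clear the sign bit.
    static int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    public static void main(String[] args) {
        // the high bits now influence the low bits used for bin indexing
        System.out.println(Integer.toHexString(spread(0x12345678))); // 1234444c
        // the result is never negative, because negative hashes (MOVED,
        // TREEBIN, ...) mark special control nodes in the table
        System.out.println(spread(0x80000000) >= 0); // true
    }
}
```

Clearing the sign bit matters because in the table a negative node hash is reserved for control nodes (ForwardingNode, TreeBin), as the put and get code below relies on.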
public V put(K key, V value) {
    return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // hash twice (spread) to reduce collisions and distribute entries evenly
    int hash = spread(key.hashCode());
    int binCount = 0;
    // iterate over the table; like while(true), loop until the insert succeeds
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // the constructor above did not initialize the table; initialize
        // it lazily here when it is still null
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // bin i is empty: insert directly, without locking, via CAS
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED) // a resize is in progress; help finish it first
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // otherwise there is a hash collision: lock the head node of the
            // bin's list or red-black tree
            synchronized (f) {              // lock the head of the colliding bin
                if (tabAt(tab, i) == f) {   // recheck; other threads may have changed the bin
                    if (fh >= 0) {          // the bin holds a linked list
                        binCount = 1;
                        // walk the list: if the key already exists, update its
                        // value and break; otherwise append a node at the tail
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // a put with an existing key overwrites the old value
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) { // insert at the list tail
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) { // the bin holds a red-black tree
                        Node<K,V> p;
                        binCount = 2;
                        // insert into the red-black tree, rebalancing as needed
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // after a list insert, check whether the bin should become a tree:
            // if the list has grown past 8 nodes, convert it to a red-black tree
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}
The code shows that put proceeds roughly as follows:
- If the table is not initialized, call initTable to initialize it first
- If there is no hash collision at the target bin, insert directly with CAS
- If a resize is in progress, help with the resize first
- If there is a hash collision, lock the bin to guarantee thread safety; there are two cases: a linked list, which is traversed to the tail before inserting, or a red-black tree, which is inserted into tree-style
- If the list's length exceeds the threshold of 8, convert it to a red-black tree first, then break back into the loop
- If the insertion succeeded, call addCount to update the size and check whether a resize is needed
The treeifyBin method checks whether the table length has reached 64 (MIN_TREEIFY_CAPACITY). If not, it calls tryPresize to double the table instead of building a tree; if it has, the list at table[i] is converted into a red-black tree.
private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // capacity < 64: double the table instead
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) { // lock the bin's head node
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}
Before the structure conversion, however, the array length is checked:
private final void tryPresize(int size) {
    // if the requested size is at least half of MAXIMUM_CAPACITY, jump straight
    // to the maximum; otherwise round up with tableSizeFor
    int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
        // tableSizeFor(count) finds the smallest power of two >= count
        tableSizeFor(size + (size >>> 1) + 1);
    int sc;
    while ((sc = sizeCtl) >= 0) { // only a non-negative sizeCtl lets this thread resize
        Node<K,V>[] tab = table; int n;
        if (tab == null || (n = tab.length) == 0) { // the table has not been initialized
            n = (sc > c) ? sc : c;
            // if no other thread touched the table meanwhile, CAS sizeCtl
            // to -1 to mark "initializing"
            if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if (table == tab) { // recheck
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc; // publish the new resize threshold
                }
            }
        }
        else if (c <= sc || n >= MAXIMUM_CAPACITY)
            break;
        else if (tab == table) {
            int rs = resizeStamp(n);
            if (sc < 0) {
                Node<K,V>[] nt;
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
        }
    }
}
After a new node is inserted, addCount records the element count and checks whether a resize is needed; once the number of elements reaches the threshold, the transfer method is triggered to move the nodes to their new positions.
6. The get operation
ForwardingNode is a transient node used only during resizing:
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;

    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }

    Node<K,V> find(int h, Object k) {
        // loop to avoid arbitrarily deep recursion on forwarding nodes
        outer: for (Node<K,V>[] tab = nextTable;;) {
            Node<K,V> e; int n;
            if (k == null || tab == null || (n = tab.length) == 0 ||
                (e = tabAt(tab, (n - 1) & h)) == null)
                return null;
            for (;;) {
                int eh; K ek;
                if ((eh = e.hash) == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                if (eh < 0) {
                    if (e instanceof ForwardingNode) {
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        continue outer;
                    }
                    else
                        return e.find(h, k);
                }
                if ((e = e.next) == null)
                    return null;
            }
        }
    }
}
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode()); // locate the index i in the table
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) { // read the bin's head node
        if ((eh = e.hash) == h) { // if the head node matches, return it
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // a negative hash means the bin is being moved; delegate to the
        // ForwardingNode's find method, which looks the key up in nextTable
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) { // otherwise walk down the list
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
- Compute the hash and locate the bin in the table; if the head node matches, return it
- If a resize is underway, call find on the ForwardingNode that marks the bin being moved, look the key up there, and return it on a match
- Otherwise, walk down the list and return on a match, or finally return null
7. size
// size() has been part of the Map API since JDK 1.2
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

// mappingCount is a new API in JDK 1.8
public long mappingCount() {
    long n = sumCount();
    return (n < 0L) ? 0L : n; // ignore transient negative values
}

final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}
JDK 1.8 adds the mappingCount API; its difference is that it returns a long, so it is not limited by Integer.MAX_VALUE. Both methods call sumCount, and both return an estimate (JDK 1.7 falls back to locking for an exact answer, while 1.8 trades some accuracy for higher efficiency).
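The baseCount + CounterCell[] scheme that sumCount reads is essentially the same striped-counter idea as java.util.concurrent.atomic.LongAdder (which shipped alongside it in JDK 1.8): contending threads update independent cells instead of fighting over one CAS variable, and the total is added up on demand as a snapshot. A sketch using LongAdder directly:

```java
import java.util.concurrent.atomic.LongAdder;

// Demonstrates the striped-counting idea behind ConcurrentHashMap's size:
// writers touch separate cells, readers sum them on demand.
public class StripedCountDemo {
    public static void main(String[] args) throws InterruptedException {
        LongAdder count = new LongAdder();
        Runnable add = () -> {
            for (int i = 0; i < 100_000; i++)
                count.increment();   // cheap even under heavy contention
        };
        Thread t1 = new Thread(add), t2 = new Thread(add);
        t1.start(); t2.start();
        t1.join(); t2.join();
        // sum() is exact here only because all writers have finished;
        // while writers run, it is just a moving estimate
        System.out.println(count.sum()); // 200000
    }
}
```

This is why size() in 1.8 cannot be exact under concurrent writes: there is no lock to freeze all cells at once, and the map accepts that trade-off.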
Summary
From ReentrantLock + Segment + HashEntry in JDK 1.7 to synchronized + CAS + Node + red-black tree in JDK 1.8:
1. JDK 1.8 lowers the lock granularity. In JDK 1.7 the lock granularity is the Segment, which contains multiple HashEntry chains; in JDK 1.8 the lock covers only a single bin's head Node.
2. JDK 1.8's data structure is simpler and its operations are clearer: since synchronization is done with synchronized, the notion of a segment lock is no longer needed. But because the granularity is finer, the implementation itself becomes correspondingly more complex.
3. JDK 1.8 uses red-black trees to optimize the linked lists: traversing a very long list is slow, while traversing a red-black tree is fast, so replacing a list with a tree past a fixed threshold gives the best of both.
References
The Art of Java Concurrency Programming (《Java并发编程的艺术》)
Reprinted from: https://blog.csdn.net/Autumn03/article/details/81141728