Java--ConcurrentHashMap & HashTable 详解

tech2024-08-02  66

文章目录

一、HashTable二、Colletions.synchronizedMap三、1.7的ConcurrentHashMap1.7 初始化源码解析1.7 putensureSegementscanAndLockForPutput 正统方法rehash 1.7 get1.7 size 四、1.8的ConcurrentHashMap1.8 初始化1.8 putCAS 与 ABA 1.8 get1.8 size 五、快速失败(fail-fast) & 安全失败(fail—safe)六、参考资料

一、HashTable

HashTable是线程安全的HashMap,里面的方法都是用synchronized修饰过的,跟HashMap有以下不同之处:

初始容量不同,HashMap是16,HashTable是11HashMap可以存储值为null的元素,而HashTable不可以HashMap的迭代器是基于快速失败(modCount)机制,fail-fast,而HashTable的迭代器不是快速失败的。

二、Colletions.synchronizedMap

除HashTable外,利用Colletions.synchronizedMap也可以得到线程安全的map,synchronizedMap是Collections的一个静态内部类,里面的方法都是用synchronized代码块去修饰的,因此也是线程安全的。

三、1.7的ConcurrentHashMap

在JDK 1.7的ConcurrentHashMap,底层是segement数组+HashEntry数组+链表这样的数据结构。

第一层就是segement数组,每一个segement里都有一个HashEntry数组,ConcurrentHashMap一旦初始化之后,segement数组默认长度是16,也就是所能支持的并发程度是16,但是segement数组是不能动态扩容的,而每一个segement里面的HashEntry是可以动态扩容的。

1.7的ConcurrentHashMap基于segement分段锁这样的机制,segement是继承了ReentranLock,通过ReentranLock来进行一个并发控制,操作数据时,会首先判断当前的key属于哪个segement,拿到当前的segement锁之后才能进行操作,写操作不用获取segement锁,因为value是用valotile修饰的。

1.7 初始化源码解析

如果使用空的构造方法,则会赋予三个默认值,初始容量(16)、负载因子(0.75)、并发程度(16)。最后使用默认构造函数执行有参构造方法。

public ConcurrentHashMap() { this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); } @SuppressWarnings("unchecked") public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) { ... //上面都是一些合法性校验 //Segment 中的类似于 HashMap 的容量至少是2或者2的倍数 while (cap < c) cap <<= 1; // create segments and segments[0] // 创建 Segment 数组,设置 segments[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }

传入默认参数执行有参构造方法,前面都是一些参数的合法性校验,最重要的下面几句。

以默认的capacity创建Segement数组,然后创建第0号位置的segement,第0号位置的segement里的HashEntry数组大小为2,负载因子是0.75,也就是说往第0号位置的segement put第二个元素时,第0号位置的segement的HashEntry数组才会进行扩容,segement数组不会扩容。

为什么只创建第0号位置的segement?因为在put的时候是以此为原型的,可以看put方法。

1.7 put

public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); // hash 值无符号右移 28位(初始化时获得),然后与 segmentMask=15 做与运算 // 其实也就是把高4位与segmentMask(1111)做与运算 int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment // 如果查找到的 Segment 为空,初始化 s = ensureSegment(j); return s.put(key, hash, value, false); }

进入put方法,会首先获取到key的hash值,通过右移偏移量和segementMask进行与运算得到当前key属于哪个segement,如果当前segement为空则进行segement的初始化–ensureSegement方法。

ensureSegement

private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; // 判断 u 位置的 Segment 是否为null if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { Segment<K,V> proto = ss[0]; // use segment 0 as prototype // 获取0号 segment 里的 HashEntry<K,V> 初始化长度 int cap = proto.table.length; // 获取0号 segment 里的 hash 表里的扩容负载因子,所有的 segment 的 loadFactor 是相同的 float lf = proto.loadFactor; // 计算扩容阀值 int threshold = (int)(cap * lf); // 创建一个 cap 容量的 HashEntry 数组 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck // 再次检查 u 位置的 Segment 是否为null,因为这时可能有其他线程进行了操作 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); // 自旋检查 u 位置的 Segment 是否为null while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 使用CAS 赋值,只会成功一次 if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }

在ensureSegement中会判断当前的segement为空,那么就会进行初始化。

还记得之前在构造函数初始化segement数组的时候,它初始化了第0号位置的segement吗?这里就用到了,这里用了segement数组的第0号位置的segement作为一个原型prototype,创建当前位置的segement,生成的segement中,里面的HashEntry数组跟原型一样,都是长度为2,负载因子是0.75。

生成segement之后,会自旋检查当前的segement是不是null,因为这段时间有可能其他线程再操作,确认为null后,通过CAS将新的segement放入到segements数组里。

scanAndLockForPut

final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 获取 ReentrantLock 独占锁,获取不到,scanAndLockForPut 获取。 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); //... }

进入put方法之后,会尝试去获取当前的segement锁,如果获取不到则进入scanAndLockForPut方法。

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node // 自旋获取锁 while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) { ... } else if (++retries > MAX_SCAN_RETRIES) { // 自旋达到指定次数后,阻塞等到只到获取到锁 lock(); break; } else if ... } return node; }

尝试获取锁,如果在指定次数获取锁成功,那么就表示获取到锁了,如果在指定次数没有获取到锁,那么会阻塞直到获取到锁。

put 正统方法

简单总结就是,获取到锁之后,通过hash值定位到当前key属于当前segement下HashEntry数组的哪一个下标index,拿到index之后。

如果当前Index位置没有值,直接put。

如果当前inde位置有值,则会遍历链表找到key相同的,如果找的到就更新value,找不到说明发生Hash冲突,通过头插法插入到链表的头部。

final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 获取 ReentrantLock 独占锁,获取不到,scanAndLockForPut 获取。 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { HashEntry<K,V>[] tab = table; // 计算要put的数据位置 int index = (tab.length - 1) & hash; // CAS 获取 index 坐标的值 HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { // 检查是否 key 已经存在,如果存在,则遍历链表寻找位置,找到后替换 value K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { // first 有值没说明 index 位置已经有值了,有冲突,链表头插法。 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; // 容量大于扩容阀值,小于最大容量,进行扩容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else // index 位置赋值 node,node 可能是一个元素,也可能是一个链表的表头 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }

rehash

如果当前的HashEntry数组大小超过了阈值,那么就会对当前segement里面的HashEntry数组进行一个扩容,并不是整一个segement数组,segement数组不会扩容。

具体的逻辑就是以旧HashEntry数组为模板,创建新的HashEntry数组,大小为原来的两倍,把旧HashEntry的数组的结点重新hash到新的数组,并把当前的segement里的hashEntry引用指向新的HashEntry数组。

private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; // 老容量 int oldCapacity = oldTable.length; // 新容量,扩大两倍 int newCapacity = oldCapacity << 1; // 新的扩容阀值 threshold = (int)(newCapacity * loadFactor); // 创建新的数组 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; // 新的掩码,默认2扩容后是4,-1是3,二进制就是11。 int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { // 遍历老数组 HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; // 计算新的位置,新的位置只可能是不便或者是老的位置+老的容量。 int idx = e.hash & sizeMask; if (next == null) // Single node on list // 如果当前位置还不是链表,只是一个元素,直接赋值 newTable[idx] = e; else { // Reuse consecutive sequence at same slot // 如果是链表了 HashEntry<K,V> lastRun = e; int lastIdx = idx; // 新的位置只可能是不便或者是老的位置+老的容量。 // 遍历结束后,lastRun 后面的元素位置都是相同的 for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } // ,lastRun 后面的元素位置都是相同的,直接作为链表赋值到新位置。 newTable[lastIdx] = lastRun; // Clone remaining nodes for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { // 遍历剩余元素,头插法到指定 k 位置。 V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } // 头插法插入新的节点 int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }

1.7 get

1.7中的get逻辑比较简单,get操作是不用加锁的,只需要将key通过hash之后找到对应的segement,再通过一次hash值定位到对应segement的hashEntry数组对应的位置上。

因为value是用volatile修饰的,因此能够保证每次获取到的value值都是最新的。

1.7 size

尝试用不加锁的形式多次计算Concurrent的size,最多三次,比较前后统计出来的size次数,如果前后统计的size相等的话,那么就说明当前没有元素加入,返回size。

如果统计三次发现前后结果不一致,会获取所有的分段锁进行size统计,最后返回size。

public int size() { final Segment<K,V>[] segments = this.segments; int size; boolean overflow; // true if size overflows 32 bits long sum; // sum of modCounts long last = 0L; // previous sum int retries = -1; // first iteration isn't retry try { for (;;) { //死循环中最多统计三次,三次以上前后结果不相同 //获取所有的分段锁统计size if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } //如果前后次数相等,说明没有元素加入,返回结果 if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); } } return overflow ? Integer.MAX_VALUE : size; }

四、1.8的ConcurrentHashMap

在JDK1.8中,完全摒弃了分段锁segement的概念,使用了CAS+Synchronized来保证其线程安全,底层的数据结构跟正常的HashMap一样,都是node数组+链表+红黑树的结构,之前的HashEntry也被改成了Node实现。

1.8 初始化

private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 如果 sizeCtl < 0 ,说明另外的线程执行CAS 成功,正在进行初始化。 if ((sc = sizeCtl) < 0) // 让出 CPU 使用权 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { ... } } return tab; }

可以看出initTable的初始化是通过CAS和一个辅助变量sizeCtl来完成的,如果当前辅助变量sizeCtl < 0,代表有别的线程正在初始化,会通过Thread.yield让出当前Cpu的执行权。

-1 说明正在初始化 -N 说明有N-1个线程正在进行扩容

1.8 put

如果当前table为空,则会执行InitTable初始化。

如果当前index位置上元素是空的,会通过CAS尝试写入。

CAS尝试写入失败之后,或者当前index结点有值,会通过Synchronized对当前结点进行加锁,并进行put操作,最后判断当前node要不要树化。

可以看到,相对于1.7 segement来说,synchronized锁范围更小,锁住也就只是当前结点的node,并不影响其他结点,锁颗粒度小,并发度更高。此外,synchronized是基于JVM的,JVM对synchronized有一个锁升级的过程,效率比较高。

CAS 与 ABA

CAS(Compare and swap),主要关联有三个值:

当前变量的值期望值新值

只有当当前变量的值 == 期望值时,那么就把当前变量的值设置为新值。

有可能出现ABA问题,也就是另外一个线程快速的把当前变量从A修改成B再修改成A,对于原线程来说,当前变量的值还是A,但是实际上是已经被修改过的。

解决ABA问题可以用到版本号、时间戳等(具体可看atomic原子类)

1.8 get

跟1.7 一样,value是用volatile修饰的,能够第一时间从主存拿到最新的值。

如果当前位置元素存在并且key相等,直接返回value。

如果当前位置元素的hash < 0,那么有可能正在扩容或者是红黑树查找。

如果当前元素是链表,那么就遍历链表查询。

public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; // key 所在的 hash 位置 int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 如果指定位置元素存在,头结点hash值相同 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) // key hash 值相等,key值相同,直接返回元素 value return e.val; } else if (eh < 0) // 头结点hash值小于0,说明正在扩容或者是红黑树,find查找 return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { // 是链表,遍历查找 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }

1.8 size

1.8的size就有点复杂了,size是等于baseCount和counterCell数组各子项的和。

size()和mappingCount()都可以返回size,但是size返回类型是int,而mappingCount返回的是long,推荐使用mappingcount,不会因为size方法限制最大值。

//size 和 mappingCount都可以获取size (int) concurrentHashMap.size(); (long) concurrentHashMap.mappingCount(); //主要方法sumCount() final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }

关键:baseCount、CounterCell[]两个辅助变量,sumCount方法就是通过计算baseCount和counterCell的值得到value,实际上在put的时候会调用addCount方法。

private final void addCount(long x, int check) { CounterCell[] as; long b, s; //通过CAS修改baseCount if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { //CAS修改baseCount失败,CAS修改counterCells CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //CAS修改counterCells失败,进入fullAddCount(),死循环直到成功。 fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); }

首先会通过CAS修改baseCount,CAS修改baseCount失败,再通过CAS修改counterCells数组中的值,如果失败就会fullAddCount()一直死循环直到成功。

五、快速失败(fail-fast) & 安全失败(fail—safe)

快速失败是针对迭代器遍历时,因为modCount != expectmodCount然后带来的concurrentModifycation异常,哪怕是单线程也会。

经典例子:

Iterator it = list.iterator(); while(it.hasNext()){ Integer number = iterator.next(); if (number % 2 == 0) { // 抛出ConcurrentModificationException异常 list.remove(number); } //it.remove() 就不会 }

安全失败,JUC下的容器都是安全失败的,因为他们在用迭代器遍历时,都不是直接遍历,是copy一份原内容进行遍历的。

六、参考资料

https://mp.weixin.qq.com/s/AixdbEiXf3KfE724kg2YIw https://juejin.im/post/5be62527f265da617369cdc8 https://zhuanlan.zhihu.com/p/40627259

最新回复(0)