java.util.concurrent.ConcurrentHashMap类

java.util.concurrent.ConcurrentHashMap类


(一)数据结构

数组 + 链表 + 红黑树


1.JDK7及之前

ReentrantLock分段锁:

  1. Segment数组
  2. HashEntry数组+链表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final class Segment<K, V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;

//HashEntry数组,是用于存放真正数据的桶
//【和HashMap很类似,内部是数组+链表,不同的是它用volatile去修饰它的数据value以及下一个节点next】
transient volatile HashEntry<K, V>[] table;

transient int count;

//集合修改次数,用于fail-fast
transient int modCount;

//容量
transient int threshold;

//负载因子
transient float loadFactor;
}
  1. 一个线程占用锁访问一个Segment时,不会影响到其他的Segment
  2. get()不需要加锁;put()首先尝试获取锁,如果失败会调用scanAndLockForPut()自选获取锁,如果重试次数达到了MAX_SCAN_RETRIES则改为阻塞获取锁,保证获取成功

2.JDK8中的优化

  1. 数组 + 链表/红黑树:链表长度超过8需要转化为红黑树,降低遍历耗时
  2. 取消分段锁,采用CAS + synchronized,享受synchronized的锁优化

(二)并发安全性

  1. synchronized每个数组Entry元素(或者说bucket)设置为synchronized,这样如果一个Entry只允许一个线程持有,但是与此同时,其他Entry不受影响
  2. CAS:①允许多线程初始化元素数组,不加锁而是使用CAS机制,保证只有一个线程可以初始化成功;②put()时如果对应的Entry数组元素为null,则允许多线程插入数组,但是通过CAS保证只有一个线程插入数组成功

ConcurrentHashMap的“线程安全”】

所有人都会告诉你ConcurrentHashMap是线程安全的,因为这是真的。
可是读到下面的Spring的源码,你是否和我一样,大大的眼睛写满了满满的疑惑?ε(┬┬﹏┬┬)3当然不是每个人都和我一样有一双卡姿兰大眼睛的,你小小的眼睛写满了满满的疑惑也是可以的。
什么?这个synchronized是干嘛使的?ConcurrentHashMap不是线程安全的吗?
恭喜你,和我钻了同样的牛角尖。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//singletonObjects是一个ConcurrentHashMap<String, Object>实例
public void registerSingleton(String beanName, Object singletonObject) throws IllegalStateException{
//Bean不可无名称
Assert.notNull(beanName, "'beanName' must not be null");

//来了来了!用synchronized锁住了ConcurrentHashMap实例
//不是说ConcurrentHashMap是线程安全的吗?为什么还要这么做呢?
synchronized(this.singletonObjects){
//ConcurrentHashMap # get(K key):读
Object oldObject = this.singletonObjects.get(beanName);
//Bean已存在
if(oldObject != null){
throw new IllegalStateException("...");
}
//Bean不存在,则注册
addSingleton(beanName, singletonObject);
//如果不加synchronized,get之后put之前可能被截胡
}
}
  • ConcurrentHashMap的线程安全,指的是:多个线程的读(get)和写(put)是线程安全的,它可以支持完全并发的读,并且保证多个线程写只有一个会成功
  • 如果你多个线程同时读写,想要保证自己的数据准确性,这不是ConcurrentHashMap所能保证的!需要你自己做出额外的同步,比如synchronized。请看下面的代码:
1
2
3
4
5
6
ConcurrentHashMap<String, String> map;
String value = map.get(key);
if(value == null) {
value = "newValue";
map.put(key, x);
}
  • 上述代码中,试想一下:如果一个线程调用了map.get(key)获得了value,而此时value就是null,所以它会调用map.put(key, x)尝试写入;
  • 但是,在该线程get之后、put之前,有一个线程调用了map.put(key, y),那么该线程调用put(key, x)必然就会覆盖yx有可能就是脏数据(但不绝对,取决于版本号)。

原因

ConcurrentHashMap只保证读写线程安全,但是不会通过互斥措施保证数据的准确性

解决措施

自己实现ConcurrentHashMap的互斥措施,比如对整个ConcurrentHashMap实例加synchronized

再回过头来看Spring中Bean的注册,就能够理解了:
之所以有synchronized(this.singletonObjects),就是为了防止get(beanName)时发现Bean不存在、但是在put(beanName, obj)之前却被其他线程抢先创建了,导致出现脏数据。

结论

  • ConcurrentHashMap能够保证完全并发的读、同一时刻只有一个线程能写,所以它被认为是“线程安全”的
  • 但是如果有多个线程写,ConcurrentHashMap并不能保证写入的数据就是你想要的数据

    如果你觉得线程安全就是“不采用额外的同步措施就能保证获得的数据就是我想要的”,从而认定ConcurrentHashMap不是线程安全的,那么你一定是疯了( ̄ε(# ̄)那么多线程都去插入ConcurrentHashMap,它TM怎么能保证自己就一定是你的形状?你得自己给它加个锁啊

put之后再get数据不一致

  • 这个问题还可以换一种方式来问:在一个线程中操作ConcurrentHashMapput之后立马get,获得的数据一定是我想要的数据吗?
  • 现在我们可以回答这个问题了,put之后释放锁,另一个线程在当前线程get之前调用了put覆盖了当前线程写入的值,你获得的数据就不是你自己想要的。
  • 除非你实现自己的互斥措施,比如给ConcurrentHashMap对象加synchronized

(三)成员变量

  • Node<K, V>[] table存储ConcurrentHashMap节点元素的数组

  • Node<K, V>[] nextTable存储扩容后的ConcurrentHashMap节点元素的数组,数组扩容为原来的2倍

    使用的思想是Copy On Write

  • int sizeCtl多个线程共享的变量,是操作的控制标识符,在不同的地方有不同的值和用途

    -1已经有其他线程正在初始化table
    -N:N-1个线程正在进行扩容;
    0table数组尚未初始化;
    N:正数,表示下一次扩容时的容量大小

  • Node ForwardingNode一个特殊的Node节点,哈希值为-1,存储着nextTable的引用,在扩容时发挥作用,作为一个占位符放在table中表示当前节点为null或已被移动


(四)API

1.构造函数

  1. public ConcurrentHashMap():无参构造函数,默认容量大小16,sizeCtl == 0,表示尚未初始化
  2. public ConcurrentHashMap(int initialCapacity):传入容量大小的构造函数
  3. public ConcurrentHashMap(Map<? extends K, ? extends V> m):传入指定Map的构造函数
  4. public ConcurrentHashMap(int initialCapacity, float loadFactor):传入容量大小和负载因子的构造函数
  5. public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel):传入容量大小、负载因子和并发数大小的构造函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

//无参构造函数
//默认容量大小是16,默认sizeCtl为0,表示尚未初始化
public ConcurrentHashMap() {
}

//传入容量大小的构造函数
public ConcurrentHashMap(int initialCapacity) {
//如果传入的容量大小小于0 则抛出异常。
if (initialCapacity < 0)
throw new IllegalArgumentException();
//最大容量不得超过MAXIMUM_CAPACITY的一半,即2^29(原因是每次扩容1倍)
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
//tableSizeFor这个函数即实现了将一个整数取2的幂次方
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
//将上面计算出的cap 赋值给sizeCtl,注意此时sizeCtl为正数,代表进行扩容的容量大小
this.sizeCtl = cap;
}

//包含指定Map的构造函数
//置sizeCtl为默认容量大小,即16
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}

//传入容量大小和负载因子的构造函数
//默认concurrencyLevel是1
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}

//传入容量大小、负载因子和并发数大小的构造函数
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
//如果initialCapacity < concurrencyLevel,则容量大小取并发数大小,这样做的原因是确保每一个Node只会分配给一个线程
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
//size不得超过MAXIMUM_CAPACITY
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

2.put:插入

  • public V put(K key, V value)
  1. ConcurrentHashMap不允许null key/null value,如果key/valuenull抛出NullPointerException

    HashMap允许null key
    HashTable不允许null key

  2. 对于非null key,调用扰动函数spread()处理key.hashCode()获取hash

    ConcurrentHashMap的扰动函数是(h ^ (h >>> 16)) & HASH_BITS

  3. h ^ (h >>> 16)HashMap相同,很好理解,就是让高16位和低16位异或,让元素分布更加均衡

  4. 那么& HASH_BITS的目的是什么呢?ConcurrentHashMap中,如果一个Entry数组元素的hash值为-1,表示该ConcurrentHashMap需要扩容,而& HASH_BITS可以保证得到的hash值高位为0,即正数,这样可以保证只有需要扩容时才会扩容,不会由于计算得到的hash == -1而导致误扩容

  5. 如果当前table为空,即ConcurrentHashMap未初始化,则允许多线程初始化元素数组,不加锁而是使用CAS机制,保证只有一个线程可以初始化成功

  6. hash & (n - 1)获取key对应的数组下标,若对应数组元素为null,则允许多线程put(),但是通过CAScasTabAt()方法保证只有一个线程插入数组成功,如果成功则完成put(),结束返回。插入失败(被其他线程截胡)则继续往下执行

  7. 如果该下标上的头结点(即数组Entry元素)的哈希地址为-1,调用helpTransfer()扩容

  8. 如果该下标的bucket非空,且不需要扩容,则进入bucket,通过synchronized锁住整个bucket

    注意只是锁住对应下标的bucket,其他bucket仍然是可以访问的,这也是一种分段锁的思想。

  9. 判断bucket存储的是链表节点还是红黑树节点 —— 如果是链表节点,遍历整个链表,如果有key相同的节点,则根据传入参数选择覆盖或不覆盖;如果没有则尾插。判断链表长度是否大于阈值(8),如果超过则需要转为红黑树

  10. 如果是红黑树节点,则直接插入节点

  11. addCount()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
public V put(K key, V value) {
return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
//不允许null key和null value
if(key == null || value == null) throw new NullPointerException();
//通过扰动函数获取hash值
int hash = spread(key.hashCode());
int binCount = 0;
for(Node<K, V> tab = table;;) {
Node<K, V> f; int n, i, fh;
//如果table为空,调用initTable()初始化
if(tab == null || (n = tab.length) == 0)
tab = initTable();
//数组下标对应位置尚未被插入元素
else if(f = tabAt(tab, i = (n - 1) & hash) == null){
//CAS插入:null表示期待值为null,如果是null则插入成功
if(casTabAt(tab, i, null,
new Node<K, V>(hash, key, value, null)))
bread;
}
//如果数组下标上的节点hash == -1,代表需要扩容
else if((fh = f.hash) == MOVED) {
tab = helpTransfer(tab, f);
}
//不需要初始化也不需要扩容,那么就直接插入
else{
V oldVal = null;
//锁住单个bucket
synchronized(f) {
if(tabAt(tab, i) == f){
//如果下标上的节点hash值大于0,表示这个是链表节点
if(fh >= 0){
binCount = 1;
//遍历链表
for(Node<K, V> e = f;;++binCount){
K ek;
//如果插入元素已存在
if(e.hash == hash
&& ((ek = e.key) == key
|| (ek != null && key.equals(ek)))){
oldVal = e.val;
//替换
if(!onlyIfAbsent)
e.val = value;
break;
}
Node<K, V> pred = e;
//尾插
if((e = e.next) == null){
pred.next = new Node<K, V>(hash, key, value, null);
break;
}
}
}
//红黑树节点,直接插入
else if(f instanceof TreeBin) {
Node<K, V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash,
key,value)) != null){
oldVal = p.val;
if(!onlyIfAbsent)
p.val = value;
}
}
}
}
//判断链表是否需要转红黑树
if(binCount != 0) {
if(binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if(oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

//扰动函数:
//1.高16位与低16位,使元素分布更均匀,这一点和HashMap一样
//2.& HASH_BITS(0x7fffffff)的目的是让高位为0,即保证hash值为正数,
//因为如果hash值为-1表示要扩容,必须要避免这种可能性出现
static final spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

//初始化table:允许多线程竞争,通过CAS保证只有一个线程成功
//1.判断table是否为null,如果非null表明已有其他线程初始化了,直接返回
//2.如果table为null,进入循环
//3.如果sizeCtl < 0,表明其他线程正在初始化,直接挂起线程
//4.如果sizeCtl >= 0,则CAS将sizeCtl设为-1,表示自己在初始化
//5.如果CAS设置成功,判断table是否为null,不为空则初始化table,
//默认初始容量大小为16
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sizeCtl < 0,已有其他线程正在初始化
if ((sc = sizeCtl) < 0)
Thread.yield(); //挂起
//CAS将sizeCtl设置为-1,表示自己想要初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//再次确认当前table为null即还未初始化
if ((tab = table) == null || tab.length == 0) {
//如果sc(sizeCtl)大于0,则n=sc,否则n=默认的容量大
16
//这里的sc=sizeCtl=0,即如果在构造函数没有指定容量
大小,
//否则使用了有参数的构造函数,sc=sizeCtl=指定的容量大小。
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//创建指定容量的Node数组(table)。
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//计算阈值,n - (n >>> 2) = 0.75n当ConcurrentHashMap储存的键值对数量
//大于这个阈值,就会发生扩容。
//这里的0.75相当于HashMap的默认负载因子,可以发现HashMap、Hashtable如果
//使用传入了负载因子的构造函数初始化的话,那么每次扩容,新阈值都是=新容
//量 * 负载因子,而ConcurrentHashMap不管使用的哪一种构造函数初始化,
//新阈值都是=新容量 * 0.75。
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

3.get:查找

  • public V get(Object key)
  1. 调用扰动函数spread()计算key.hashCode()得到hash
  2. hash & (n - 1)获得key对应的数组下标,如果非空则到bucket中查找
  3. 如果bucket的头结点hash < 0,代表这个bucket存储的是红黑树;如果hash > 0,代表这个bucket存储的是链表
  4. 查找key对应的value
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//扰动函数求key.hashCode()得到hash值
int h = spread(key.hashCode());
//如果table不为空 且 table长度大于0 且 计算出的下标上bucket不为空,
//则代表这个bucket存在,进入到bucket中查找,
//其中(n - 1) & h为计算出键key相对应的数组下标的算法。
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//如果哈希地址、键key相同则表示查找到,返回value,这里查找到的是头节点。
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//如果bucket头节点的哈希地址小于0,则代表bucket为红黑树,在红黑树中查找。
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//如果bucket头节点的哈希地址不小于0,则代表bucket为链表,遍历链表,在链表中查找。
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

4.remove:删除

public V remove(Object key)

  1. 调用扰动函数spread()得到hash
  2. hash & (n - 1)得到数组下标,如果table为空或bucket为空,返回null
  3. 如果该下标上的头结点(即数组Entry元素)的哈希地址为-1,调用helpTransfer()扩容
  4. 如果table和bucket都不为空,table也不处于在扩容状态,则锁住当前bucket,对bucket进行操作
  5. 判断节点是链表还是红黑树节点,调用对应的删除方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public V remove(Object key) {
return replaceNode(key, null, null);
}

final V replaceNode(Object key, V value, Object cv) {
//扰动函数求hash
int hash = spread(key.hashCode());
//遍历table
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//table或key所在的bucket为空,则表示无需要删除元素
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
//如果该下标上的头结点hash == -1,调用helpTransfer()扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
//将键key所在的bucket加锁
synchronized (f) {
if (tabAt(tab, i) == f) {
//bucket头节点hash >= 0,为链表。
if (fh >= 0) {
validated = true;
//遍历链表。
for (Node<K,V> e = f, pred = null;;) {
K ek;
//找到hash、key相同的节点,进行移除。
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
//如果bucket的头节点hash < 0,即为红黑树。
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
//找到节点,并且移除
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
//调用addCount方法
//将当前ConcurrentHashMap存储的键值对数量-1
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}

(五)extends

  • AbstractMap

(六)implements

  • ConcurrentMap
-------------本文结束感谢您的阅读-------------

本文标题:java.util.concurrent.ConcurrentHashMap类

文章作者:DragonBaby308

发布时间:2019年08月30日 - 18:39

最后更新:2020年01月16日 - 22:29

原始链接:http://www.dragonbaby308.com/java-util-concurrent-ConcurrentHashMap/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。

急事可以使用右下角的DaoVoice,我绑定了微信会立即回复,否则还是推荐Valine留言喔( ఠൠఠ )ノ
0%