前言
jdk1.5之前,线程安全的容器类有HashTable
,Vector
等实现。但是其内部使用的是synchronized
内置锁这样重量级的实现,在高并发的情况下大大影响性能。 jdk1.5之后,j.u.c
中有了基于CAS
实现的更加高并发的实现来极大的提高伸缩性。
ConcurrentHashMap
ConcurrentHashMap
是当前最常使用的并发容器之一,它同样也实现了AbstractMap
接口,所以使用它就像是使用HashMap
等线程不安全的容器一样使用。
存储结构
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
}
和HashMap
类似,也是基于哈希桶的结构,最主要的区别在于,ConcurrentHashMap
采用了分段锁Segment
来加锁,并不是整个结构都加锁,使得多个线程可以访问不同分段锁上的桶,从而实现更高的并发。
关于并发等级
,即分段锁的个数,在源码里默认的并发等级是16:
/**
* The default concurrency level for this table. Unused but
* defined for compatibility with previous versions of this class.
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
Segment
的结构:
/**
* Stripped-down version of helper class used in previous version,
* declared for the sake of serialization compatibility
*/
static class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
final float loadFactor;
Segment(float lf) { this.loadFactor = lf; }
}
所以其大致的结构是这样的:
重要的方法
put(K key, V value)
public V put(K key, V value) {
return putVal(key, value, false);
}
如果key存在也替换value值:
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
简诉以上代码逻辑:
- 判断key,value均不能为null。
- 计算key的hash值。
- 开始遍历整个table。
- table为null时初始化table
- 计算节点位置,如果该位置没有别的节点则直接插入,不需要加锁
(fh = f.hash) == MOVED
,如果有线程正在扩容,则先帮助扩容 4 如果该节点位置有别的节点,加锁处理。- 如果该节点位置上目前
fh >= 0
,则为链表结构,遍历链表,如果key节点相同则替换value,否则插入链表尾部。 - 如果该位置节点上是
TreeBin
类型,则以红黑树的方式赠加节点。
- 如果该节点位置上目前
- ConcurrentHashMap 的 size + 1。
get(Object key)
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
简诉以上代码逻辑:
- 计算key的hash值。
- 判断table是否有值,没值直接返回null。
- 获取table中的node节点,如果是链表则遍历查找到相同key的value,如果是红黑树则调用
e.find(h, key)
找到value。
size()
size()操作是一个费劲的操作,在1.7以及之前的版本中,size()是需要获取每个Segment
中的count值累加获取的,由于可能存在size的同时有线程正在put或者remove操作, 所以其返回的值可能不精确。
而1.8之后,ConcurrentHashMap提供了baseCount
、counterCells
两个辅助变量和一个CounterCell
辅助内部类来计算size()。
具体代码暂不展示了,在实际操作中实际上很少用到size()
方法。
CopyOnWriteArrayList
CopyOnWriteArrayList
是用于替代同步List,它提供了更好的并发性。同理的CopyOnWriteArrayList
是替代同步Set。
写入时复制(Copy-On-Write)
,这一类容器的特点是,访问该对象时不需要再进行一次同步,但每次修改时都会创建并重新发布一个新的容器副本从而实现可变性。
读写分离,写在一个新的副本数组上执行,此处需加锁;读操作直接读原始数组。写操作完成后会把原始数组指向新数组:
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
final void setArray(Object[] a) {
array = a;
}
因此,增删元素等都是花费很大的开销,典型的空间换时间的方式,只有当迭代操作远远多于修改操作时,才应当使用Copy-On-Write
容器。
总结
除此之外,还有一些有各自应用场景的并发集合类比如ConcurrentSkipListMap
和ConcurrentSkipListSet
。 最主要的重点集合类便是ConcurrentHashMap
了,其代码在jdk1.7和1.8之间还有不同。建议阅读源码顺序:1.7的HashMap
->1.7的ConcurrentHashMap
->1.8的HashMap
->1.8的ConcurrentHashMap
。