Java-Concurrent-ConcurrentHashMap
我们知道哈希表是一种非常高效的数据结构,设计优良的哈希函数可以使其上的增删改查操作达到O(1)级别。Java为我们提供了一个现成的哈希结构,那就是HashMap类,在前面的文章中我曾经介绍过HashMap类,知道它的所有方法都未进行同步,因此在多线程环境中是不安全的。为此,Java为我们提供了另外一个HashTable类,它对于多线程同步的处理非常简单粗暴,那就是在HashMap的基础上对其所有方法都使用synchronized关键字进行加锁。这种方法虽然简单,但导致了一个问题,那就是在同一时间内只能由一个线程去操作哈希表。即使这些线程都只是进行读操作也必须要排队,这在竞争激烈的多线程环境中极为影响性能。本篇介绍的ConcurrentHashMap就是为了解决这个问题的,它的内部使用分段锁将锁进行细粒度化,从而 使得多个线程能够同时操作哈希表,这样极大的提高了性能。下图是其内部结构的示意图。

1. ConcurrentHashMap有哪些成员变量?
//默认初始化容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
//默认加载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//默认并发级别
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//集合最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;
//分段锁的最小数量
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//分段锁的最大数量
static final int MAX_SEGMENTS = 1 << 16;
//加锁前的重试次数
static final int RETRIES_BEFORE_LOCK = 2;
//分段锁的掩码值
final int segmentMask;
//分段锁的移位值
final int segmentShift;
//分段锁数组
final Segment<K,V>[] segments;
在阅读完本篇文章之前,相信读者不能理解这些成员变量的具体含义和作用,不过请读者们耐心看下去,后面将会在具体场景中一一介绍到这些成员变量的作用。在这里读者只需对这些成员变量留个眼熟即可。但是仍有个别变量是我们现在需要了解的,例如Segment数组代表分段锁集合,并发级别则代表分段锁的数量(也意味有多少线程可以同时操作),初始化容量代表整个容器的容量,加载因子代表容器元素可以达到多满的一种程度。
2. 分段锁的内部结构是怎样的?
//分段锁
static final class Segment<K,V> extends ReentrantLock implements Serializable {
//自旋最大次数
static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
//哈希表
transient volatile HashEntry<K,V>[] table;
//元素总数
transient int count;
//修改次数
transient int modCount;
//元素阀值
transient int threshold;
//加载因子
final float loadFactor;
//省略以下内容
...
}
Segment是ConcurrentHashMap的静态内部类,可以看到它继承自ReentrantLock,因此它在本质上是一个锁。它在内部持有一个HashEntry数组(哈希表),并且保证所有对该数组的增删改查方法都是线程安全的,具体是怎样实现的后面会讲到。所有对ConcurrentHashMap的增删改查操作都可以委托Segment来进行,因此ConcurrentHashMap能够保证在多线程环境下是安全的。又因为不同的Segment是不同的锁,所以多线程可以同时操作不同的Segment,也就意味着多线程可以同时操作ConcurrentHashMap,这样就能避免HashTable的缺陷,从而极大的提高性能。
3. ConcurrentHashMap初始化时做了些什么?
//核心构造器
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) {
throw new IllegalArgumentException();
}
//确保并发级别不大于限定值
if (concurrencyLevel > MAX_SEGMENTS) {
concurrencyLevel = MAX_SEGMENTS;
}
int sshift = 0;
int ssize = 1;
//保证ssize为2的幂, 且是最接近的大于等于并发级别的数
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//计算分段锁的移位值
this.segmentShift = 32 - sshift;
//计算分段锁的掩码值
this.segmentMask = ssize - 1;
//总的初始容量不能大于限定值
if (initialCapacity > MAXIMUM_CAPACITY) {
initialCapacity = MAXIMUM_CAPACITY;
}
//获取每个分段锁的初始容量
int c = initialCapacity / ssize;
//分段锁容量总和不小于初始总容量
if (c * ssize < initialCapacity) {
++c;
}
int cap = MIN_SEGMENT_TABLE_CAPACITY;
//保证cap为2的幂, 且是最接近的大于等于c的数
while (cap < c) {
cap <<= 1;
}
//新建一个Segment对象模版
Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]);
//新建指定大小的分段锁数组
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
//使用UnSafe给数组第0个元素赋值
UNSAFE.putOrderedObject(ss, SBASE, s0);
this.segments = ss;
}