ConcurrentHashMap

1.6

// 用来存放数据的数组
final Segment<K,V>[] segments;

1.8

// 存放数据的数组
transient volatile Node<K,V>[] table;
/**
 * 用来初始化数组大小或者进行扩容
 * 
 * 数组初始化前
 * -1:当前存在线程对数组进行初始化
 * n:ConcurrentHashMap构造函数传入了容量大小
 * 
 * 数组初始化后,该值为数组进行扩容的临界值
 */
private transient volatile int sizeCtl;
public V put(K key, V value) {
    return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
    // key 和 value 都不能为空
    if (key == null || value == null) throw new NullPointerException();
    // 计算 hash 值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // 如果 table 是空的,则进行初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 如果当前索引位置为空,则直接创建
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 使用cas操作在数组的i位置创建新节点
            // 如果能创建成功,则结束for自循,否则继续自旋
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 如果当前槽点是转移节点,表示该槽点正在扩容,就会一直等待扩容完成
        // 转移节点的 hash 值是固定的,都是 MOVED
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            V oldVal = null;
            // 锁定当前槽点,其余线程不能操作,保证了安全
            synchronized (f) {
                // 再次判断i索引位置的数据有没有被修改
                if (tabAt(tab, i) == f) {
                    // fh>=0则为链表
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 如果该key已经存在,则直接替换节点的value,并退出循环
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                // 将旧value赋值给oldVal
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // e.next == null, 说明达到了链表的尾节点
                            if ((e = e.next) == null) {
                                // 在链表的尾部插入新增节点,然后退出循环
                                pred.next = new Node<K,V>(hash, key,
                                                          value, null);
                                break;
                            }
                        }
                    }
                    // 红黑树,这里没有使用 TreeNode,使用的是 TreeBin,
                    // TreeNode 只是红黑树的一个节点
                    // TreeBin 持有红黑树的引用,并且会对其加锁,保证其操作的线程安全
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        // 使用TreeBin的putTreeVal方法新增元素
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                              value)) != null) {
                            // 如果p不为空,则说明链表中存在该key
                            // 将旧value赋值给oldVal
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            // 如果 binCount 不为0,则说明put操作成功了
            if (binCount != 0) {
                // 这里判断是否需要转换成红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                // 如果存在旧值,则需要返回旧值
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 这里面会检查是否需要进行扩容
    addCount(1L, binCount);
    return null;
}
// 初始化数组
// 通过对 sizeCtl 的变量赋值来保证数组只能被初始化一次
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    // 这里使用了自旋来保证数组初始化成功
    while ((tab = table) == null || tab.length == 0) {
        // 如果 sizeCtl < 0, 说明有线程在初始化数组
        // 此时需要释放CPU资源
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // CAS 赋值保证当前只有一个线程在初始化,-1 代表当前有一个线程正在初始化
        // 保证了数组的初始化的安全性
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                // 再次判断当前数组是否为空
                if ((tab = table) == null || tab.length == 0) {
                    // DEFAULT_CAPACITY 默认为16
                    // 初始化数组大小
                    // 如果 sc > 0说明ConcurrentHashMap构造函数传入了容量大小
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    sc = n - (n >>> 2);
                }
            } finally {
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

ConcurrentHashMap 的扩容时机和 HashMap 相同,都是在 put 方法的最后一步检查是否需要扩容,如果需要则进行扩容,但两者扩容的过程完全不同,ConcurrentHashMap 扩容的方法叫做 transfer,从 put 方法的 addCount 方法进去,就能找到 transfer 方法,transfer 方法的主要思路是:

  1. 首先需要把老数组的值全部拷贝到扩容之后的新数组上,先从数组的队尾开始拷贝;
  2. 拷贝数组的槽点时,先把原数组槽点锁住,保证原数组槽点不能操作,成功拷贝到新数组时,把原数组槽点赋值为转移节点;
  3. 这时如果有新数据正好需要 put 到此槽点时,发现槽点为转移节点,就会一直等待,所以在扩容完成之前,该槽点对应的数据是不会发生变化的;
  4. 从数组的尾部拷贝到头部,每拷贝成功一次,就把原数组中的节点设置成转移节点;
  5. 直到所有数组数据都拷贝到新数组时,直接把新数组整个赋值给数组容器,拷贝完成。
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!