ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

JUC:ConcurrentHashMap

2022-01-18 16:34:34  阅读:187  来源: 互联网

标签:Node JUC ConcurrentHashMap key int tab table null


ConcurrentHashMap

看前

本博客只是为了记录,学习流程,学习源码其大部分看的是ConcurrentHashMap 1.8 源码分析这位大佬的博客,写的十分详细,图文并茂非常易懂,十分推荐。我这篇博客只是在他的基础上加了点自己的理解(肯定有不少理解不到位的地方)以及查询了一些没讲到的函数。

介绍

源码: java.util.concurrent.ConcurrentHashMap
HashMap有线程安全问题,导致出现链表死循环的情况。HashTable和ConcurrentHashMap没有线程安全问题,但二者上锁的方式不同,一个是HashTable全局一把锁,使用的是synchronized,ConcurrentHashMap是分段锁只锁住一个SegMent对象,其Segment对象继承于ReentrantLock。
JDK7: segment+Hashentry+reentrantlock
JDK8: synchronized+CAS
底层数据结构JDK7是segment数组+hashentry链表,JDK8是Node,treebin(红黑树),导致了二者的遍历时间复杂度有时不同。

源码分析

有了前面的HashMap基础,我们在讲解ConcurrentHashMap时只讲解二者不同之处。

主要属性

private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;
static final int MOVED     = -1; // 表示正在转移
static final int TREEBIN   = -2; // 表示已经转换成树
static final int RESERVED  = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
transient volatile Node<K,V>[] table;//默认没初始化的数组,用来保存元素
private transient volatile Node<K,V>[] nextTable;//转移的时候用的数组
private transient volatile int sizeCtl;

我们可以发现相对于HashMap新加了一个sizeCtl,是一个非常重要的变量。它的意义可以解释为。

  • sizeCtl :
    • 默认为0,用来控制table的初始化和扩容操作
    • -1 代表table正在初始化
    • -N 表示有N-1个线程正在进行扩容操作
    • 其余情况:
      1. 如果table未初始化,表示table需要初始化的大小。
      2. 如果table初始化完成,表示table的扩容阈值,默认是table大小的0.75倍

希望在看源码前对基本的构成心中有大概的轮廓,看代码才会更加轻松
在这里插入图片描述

内部类

Node

static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }

        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }

        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }

        /**
         * Virtualized support for map.get(); overridden in subclasses.
         */
        Node<K,V> find(int h, Object k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }

相较于HashMap的Node实现,ConcurrentHashMap的Node需要在子类中实现setValue,并且实现了该节点往后节点值得寻找方法find。

TreeNode

static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;

        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }

        Node<K,V> find(int h, Object k) {
            return findTreeNode(h, k, null);
        }

        /**
         * Returns the TreeNode (or null if not found) for the given key
         * starting at given root.
         */
        final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
            if (k != null) {
                TreeNode<K,V> p = this;
                do  {
                    int ph, dir; K pk; TreeNode<K,V> q;
                    TreeNode<K,V> pl = p.left, pr = p.right;
                    if ((ph = p.hash) > h)
                        p = pl;
                    else if (ph < h)
                        p = pr;
                    else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
                        return p;
                    else if (pl == null)
                        p = pr;
                    else if (pr == null)
                        p = pl;
                    else if ((kc != null ||
                              (kc = comparableClassFor(k)) != null) &&
                             (dir = compareComparables(kc, k, pk)) != 0)
                        p = (dir < 0) ? pl : pr;
                    else if ((q = pr.findTreeNode(h, k, kc)) != null)
                        return q;
                    else
                        p = pl;
                } while (p != null);
            }
            return null;
        }
    }

与HashMap不同,HashMap的TreeNode继承于LinkedHashMap.Entry<K,V>,并且使用了上百行代码对其进行了完整实现。但在ConcurrentHashMap中的TreeNode只是继承于Node,并且重写了find方法。对于树的完整实现放到了哪里呢?在ConcurrentHashMap中放到了TreeBin中。

TreeBin

这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。由于代码过多我们只贴出来他的构造函数。

TreeBin(TreeNode<K,V> b) {
            super(TREEBIN, null, null, null);
            this.first = b;
            TreeNode<K,V> r = null;
            for (TreeNode<K,V> x = b, next; x != null; x = next) {
                next = (TreeNode<K,V>)x.next;
                x.left = x.right = null;
                if (r == null) {
                    x.parent = null;
                    x.red = false;
                    r = x;
                }
                else {
                    K k = x.key;
                    int h = x.hash;
                    Class<?> kc = null;
                    for (TreeNode<K,V> p = r;;) {
                        int dir, ph;
                        K pk = p.key;
                        if ((ph = p.hash) > h)
                            dir = -1;
                        else if (ph < h)
                            dir = 1;
                        else if ((kc == null &&
                                  (kc = comparableClassFor(k)) == null) ||
                                 (dir = compareComparables(kc, k, pk)) == 0)
                            dir = tieBreakOrder(k, pk);
                            TreeNode<K,V> xp = p;
                        if ((p = (dir <= 0) ? p.left : p.right) == null) {
                            x.parent = xp;
                            if (dir <= 0)
                                xp.left = x;
                            else
                                xp.right = x;
                            r = balanceInsertion(r, x);
                            break;
                        }
                    }
                }
            }
            this.root = r;
            assert checkInvariants(root);
        }

我们可以看到和HashMap一样熟悉的建立红黑树的操作。

ForwardingNode

在执行transfer操作期间,作为桶位的头结点。

三个原子操作

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }
  • tabAt():获得在i位置上的Node节点
  • casTabAt():利用CAS算法设置i位置上的Node节点。
  • setTabAt():设置节点位置的值。
    而casTabAt()与setTabAt()方法的区别:
    所以真正要进行有原子语义的写操作需要使用casTabAt()方法,setTabAt()是在锁定桶的状态下需要使用的方法,如此方法实现只是带保守性的一种写法而已。

讲解的主要方法

//无参构造
public ConcurrentHashMap(){}
//设定容量
public ConcurrentHashMap(int initialCapacity){}
//将map转化为CHM
public ConcurrentHashMap(Map<? extends K, ? extends V> m){}
//对table进行初始化
private final Node<K,V>[] initTable()
//增
public V put(K key, V value)
//在同一个节点的个数超过8个的时候,会调用treeifyBin方法来看看是扩容还是转化为一棵树
private final void treeifyBin(Node<K,V>[] tab, int index)
数组扩容的主要方法就是transfer方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab)
//查
public V get(Object key)
//删除
public V remove(Object key)

另外在讲解主要方法时会顺便讲解其调用的方法。

初始化

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
        //table初始化的操作只能有一个线程执行,sizectl<0说明有另外一个线程正在初始化。那么本线程的初始化资格就让出去,不再执行后序的初始化操作。
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
                //CAS将cs改为-1,表示本线程正在初始化。
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);//sizeCtl更新为容量的0.75
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

对于ConcurrentHashMap来说,调用它的构造方法仅仅是设置了一些参数而已。而整个table的初始化是在向ConcurrentHashMap中插入元素的时候发生的。如调用put、computeIfAbsent、compute、merge等方法的时候,调用时机是检查table==null。
初始化方法主要应用了关键属性sizeCtl 如果这个值<0,表示其他线程正在进行初始化,就放弃这个操作。在这也可以看出ConcurrentHashMap的初始化只能由一个线程完成。如果获得了初始化权限,就用CAS方法将sizeCtl置为-1,防止其他线程进入。初始化数组后,将sizeCtl的值改为0.75*n。

构造函数

public ConcurrentHashMap() {
    }
public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }

有了HashMap基础前两个好理解,不过第二个设定initialCapacity时,在最后将sizeCtl设置为cap。第三个将sizeCtl设置为初始容量。不用担心sizeCtl会在后序中进行改变。我们来看一下他调用的putAll方法。

public void putAll(Map<? extends K, ? extends V> m) {
        tryPresize(m.size());
        for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
            putVal(e.getKey(), e.getValue(), false);
    }

其中tryPresize

private final void tryPresize(int size) {
		//由于有容量2的次幂限制,将容量设为大于等于size的2的次幂值。与HashMap一样该功能由tableSizeFor实现。
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            //若table还未初始化,则进行初始化。
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                //此线程扩容期间,不允许其他线程进行扩容。
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            //c没有达到扩容阈值或本身长度已经到达最大值,则不扩容,直接退出。
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            //初始化了,准备将值放到新的table中。
            else if (tab == table) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

由于水平原因,笔者在这里遇到了个问题。
问题:31-39行代码永远不会执行。
解释:在6行对sc进行了赋值,进入while循环的都是sc>=0的情况。从while到执行sc<0之前的期间没有出现对sc重新赋值的情况,那么运行到sc<0的if判断时,一定为false。那么作者写这段代码的含义是什么?
下面讲解一下resizeStamp(n)函数,他执行之后,rs的二进制位表示中低16位为1,0-15位存储n二进制表示的最高非0位前0的个数值。不明白的看下面的博客讲解。
ConcurrentHashMap的源码分析-resizeStamp
结合上面的提示可以理解程序的执行流程,这里重点提一下40行的(rs << RESIZE_STAMP_SHIFT) + 2),CAS将SizeCtl的值变为(rs << RESIZE_STAMP_SHIFT) + 2),他的值一定为负数,因为rs的16位为1,在左移16位,那符号位为1,通过原子操作将SizeCtl变为相应的负数,并且创建一个两倍长度的新table,并赋值到nextTable上。下面我们只需要搞清楚一点如何将数据迁移到newtable中。
这是该类最难的transfer函数

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
    //若nextTab==null则建立newtable
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//扩张1倍的空间。
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            //老数据存在0-n-1,所以下一个索引为n
            transferIndex = n;
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
    //玄幻处理,i表示当前处理的桶下标。bound表示分得区间的下界。
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            //控制桶的序号
            while (advance) {
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            //判断是否结束
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            //当前桶位空。插入一个fwd
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            //当前桶已经迁移
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            //迁移桶内数据
            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        //处理节点是链表
                        if (fh >= 0) {
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        //处理节点是红黑树
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

transfer可以拆分为这几个部分

  • 根据CPU数划分table的步长:

    • 若为单核CPU,步长为table.length
    • 若为多核CPU,步长为max((n >>> 3) / NPC, MIN_TRANSFER_STRIDE)
  • 若传入的nextTab为null,创建nextTable

    • 将创建的nextTab赋给全局变量nextTable
    • 将用于控制扩容区间的分配transferIndex初始化为table.length
  • 开始循环处理区间内各个桶的迁移,由advance控制程序。

    • while循环对区间[bound, i]进行分配或调整(–i)(27-43)
    • 开始下面四个分支
      • i越界:判断当前区间的迁移是否结束;(45-59)
      • 当前桶位null,插入一个fwd
      • 当前桶已经迁移,跳过
      • 其他:迁移桶内数据
        • 链表(72-101)
        • 红黑树(103-137)
  • 27-43:控制区间

while (advance) {
    int nextIndex, nextBound;
    // 选择下一个桶或处理结束信号
    if (--i >= bound || finishing)
        advance = false;
    // 没有新的区间可以分配
    else if ((nextIndex = transferIndex) <= 0) {
        i = -1;
        advance = false;
    }
    // 分配区间
    else if (U.compareAndSwapInt
             (this, TRANSFERINDEX, nextIndex,
              nextBound = (nextIndex > stride ?
                           nextIndex - stride : 0))) {
        bound = nextBound;
        i = nextIndex - 1;
        advance = false;
    }
}

while循环求得本次这一次多线程处理的区间范围。第一次循环1,2判断语句都不成立,进入第三个,生成第一个区间[bound,i]其中bound为nextBound,nextBound在cas中被改为nextIndex-stride,所以第一次区间为[nextIndex-stride, nextIndex-1],更新。下一次循环进入第一个判断语句,得到当前处理的i后,退出while,最后一次循环进入第二条判断语句,从后往前遍历到头了。

  • 45-59:结束条件
// i<0时,已经完成扩容,暂不知道(i>=n || i+n>=nextn)的含义
if (i < 0 || i >= n || i + n >= nextn) {
    int sc;
    // finishing为true则结束
    if (finishing) {
        nextTable = null; // 置空
        table = nextTab; // 替换
        sizeCtl = (n << 1) - (n >>> 1); // sizeCtl设为0.75倍的table.length
        return;
    }
    // finishing为false,但是该线程的任务已经完成,sizeCtl中的线程数减1
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        // 判断sizeCtl是否回到刚开始扩容的状态
        // 是则说明所有用于迁移的线程都结束工作,否则直接返回
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
        // finishing置为true
        finishing = advance = true;
        // i置为table.length,全部扫描一遍,确定所有桶都已经完成迁移
        i = n; // recheck before commit
    }
}

当i<0时,当前线程已经完成了table最后一个区间,对finishing进行判断:

  • finishing为true:

    • 全局变量nextTable为空
    • 替换table
    • 设置sizeCtl
  • finishing为false:

    • 原子操作对sizeCtl中线程数-1
    • 判断是否所有线程都结束工作,否则返回,是则继续
    • finishing为true
    • i为table.length,下一个循环开始全局检查。
  • 72-101:迁移链表

// 上锁,与putVal()同步,f为当前桶的头节点
synchronized (f) {
    if (tabAt(tab, i) == f) {
        Node<K,V> ln, hn;
        // hash>0为链表,树节点的hash=-2
        if (fh >= 0) {
            // 决定移动到低位桶还是高位桶
            int runBit = fh & n;
            Node<K,V> lastRun = f;
            // 找到最后将迁移到同一个桶的所有节点,
            // 这部分不需要创建新的节点,而是直接迁移
            for (Node<K,V> p = f.next; p != null; p = p.next) {
                int b = p.hash & n;
                if (b != runBit) {
                    runBit = b;
                    lastRun = p;
                }
            }
            if (runBit == 0) {
                ln = lastRun;
                hn = null;
            }
            else {
                hn = lastRun;
                ln = null;
            }
            // 根据每个节点的hash值迁移,由于节点中的key是不可变的,需要创建新的节点
            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                int ph = p.hash; K pk = p.key; V pv = p.val;
                if ((ph & n) == 0)
                    // 头插法,创建新节点时把自己作为next节点传入
                    // 最后链表的顺序将会颠倒(除了lastRun之后的)
                    ln = new Node<K,V>(ph, pk, pv, ln); 
                else
                    // 同上
                    hn = new Node<K,V>(ph, pk, pv, hn);
            }
            setTabAt(nextTab, i, ln); // 原子操作把低位桶置入新表
            setTabAt(nextTab, i + n, hn); // 原子操作把高位桶置入新表
            setTabAt(tab, i, fwd); // 原子操作把fwd置入旧表表示已经迁移
            advance = true;
        }
        else if (f instanceof TreeBin) {
            ...
        }
    }
}

与HashMap扩容一样,链表中要分为两支队伍,根据更高一位是0 or 1来分队伍。在这里用头插法,然后将两队挂到不同的对应索引下。第一个for循环用来找到并构建其中一个新链表的最后位置(因为第二个for中用到的是头插法构建新链表),也是后面for循环的终止条件。

  • 103-137:迁移红黑树
synchronized (f) {
    if (tabAt(tab, i) == f) {
        Node<K,V> ln, hn;
        if (fh >= 0) {
            ...
        }
        // 节点为红黑树
        else if (f instanceof TreeBin) {
            TreeBin<K,V> t = (TreeBin<K,V>)f;
            TreeNode<K,V> lo = null, loTail = null;
            TreeNode<K,V> hi = null, hiTail = null;
            int lc = 0, hc = 0;
            // 遍历
            for (Node<K,V> e = t.first; e != null; e = e.next) {
                int h = e.hash;
                TreeNode<K,V> p = new TreeNode<K,V>
                    (h, e.key, e.val, null, null);
                // 和链表相同的判断,与运算==0的放在低位
                if ((h & n) == 0) {
                    if ((p.prev = loTail) == null)
                        lo = p;
                    else
                        loTail.next = p;
                    loTail = p;
                    ++lc;
                } // 不是0的放在高位
                else {
                    if ((p.prev = hiTail) == null)
                        hi = p;
                    else
                        hiTail.next = p;
                    hiTail = p;
                    ++hc;
                }
            }
            // 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
            (hc != 0) ? new TreeBin<K,V>(lo) : t;
            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
            (lc != 0) ? new TreeBin<K,V>(hi) : t;
            // 低位树
            setTabAt(nextTab, i, ln);
            // 高位数
            setTabAt(nextTab, i + n, hn);
            // 旧的设置成占位符
            setTabAt(tab, i, fwd);
            // 继续向后推进
            advance = true;
        }
    }
}

前面和HashMap中的split一致,逻辑也一致,只是使用了原子操作将两条链表或树挂到对应位置。

插入

主函数

public V put(K key, V value) {
        return putVal(key, value, false);
    }
final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

注意:hash值进行了更新操作为 hashcode ^ (hashcode >>> 16)) & HASH_BITS
函数流程:

  • table未被初始化,初始化table
  • table已经初始化,但是对应桶为空,新建Node加入,跳出
  • table已经初始化,且相应桶不为空,但桶正在迁移,当前线程帮助迁移。
  • table已经初始化,且相应桶不为空,且该桶未在迁移,将键值对加入桶中。
    • 桶中是链表,遍历这个链表:
      • 若存在:根据onlyIfAbsent修改value,跳出
      • 若不存在创建新的node挂到尾部,跳出
    • 桶中是红黑树: 通过TreeNode中putTreeValue写入。
      上面函数没有讲过的有treeifBin(),addCount()。二者都包含了扩容操作,但是扩容的结果是不同的。
      将treeifyBin中使用tryPresize进行的扩容为P扩容。
      将addCount进行的扩容称为C扩容。

treeifyBin

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // 扩容
            tryPresize(n << 1); 
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 转化红黑树
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

如果插入一个键值对后,链表长度大于等于 TREEIFY_THRESHOLD,就需要进行扩容或转化红黑树。treeifyBin() 是这一操作的入口,首先判断 table 容量是否小于 MIN_TREEIFY_CAPACITY,如果是则进行 P 扩容(由 tryPresize() 进行的,有别于下文中由 addCount() 进行的),否则进行红黑树转化。

addCount

private final void addCount(long x, int check) {
    // 计数部分,暂不考虑
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                //CAS失败进入fulladdCount(x,false),否则fullAddCount(x,true)
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
    // 判断是否C扩容
    if (check >= 0) { 
        Node<K,V>[] tab, nt; int n, sc;
        // 满足三个条件则C扩容:1.size达到sizeCtl;2.table非空;3.table长度非最大值
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
               (n = tab.length) < MAXIMUM_CAPACITY) {
            // 以下为C扩容,和tryPresize()中的一个分支完全一致
            int rs = resizeStamp(n);
            if (sc < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                    transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                         (rs << RESIZE_STAMP_SHIFT) + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

20行之前addCount通过CounterCell数组计算键值对个数,后面对check进行判断若其>=0,且

  1. 键值对数达到szieCtl
  2. table非空
  3. table长度并非最大值。
    进行C扩容。下面的结构与tryPresize中的一个分支完全一致。

计数

主要讲解的就是addCount前20行实现。
通过 size() 可以获得当前键值对的数量,它将 sumCount() 获得的 long 类型的值转化为 int 返回。
sumCount() 则计算 baseCount 字段与 counterCells 数组中所有非空元素的记录值的和。

// 未发生争用前都用它计数
private transient volatile long baseCount;
// 用于同步counterCells数组结构修改的乐观锁资源
private transient volatile int cellsBusy;
// 支持多个线程同时通过counterCells中的多个元素计数
private transient volatile CounterCell[] counterCells;
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 : 
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : 
            (int)n); 
}
// 计算 baseCount 字段与所有 counterCells 数组的非空元素的和
final long sumCount() {
    CounterCell[] as = counterCells; CounterCell a;
    long sum = baseCount;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

CHM将每个桶中的键值对数量保存在baseCount和CounterCell 数组 counterCells 中。在什么地方对baseCount 和 counterCells进行赋值的呢?在addCount前20行。
addCount()中有两层if,第一层:

  • counterCells不为空:跳转到第二层if;
  • counterCells为空:CAS操作增加baseCount,成功结束if,失败进入二层if
    第二层if创建counterCells
  • counterCells==null || couterCells[线程hash]==null,调用fullAddCount(x,true)
  • couterCells[线程hash]!=null,CAS更改cellvalue值
    • 成功:结束
    • 失败:调用fullAddCount(x, false)【线程冲突】
private final void fullAddCount(long x, boolean wasUncontended) {
        int h;
    	//线程Hash是否初始化
        if ((h = ThreadLocalRandom.getProbe()) == 0) {
            //初始化
            ThreadLocalRandom.localInit();      // force initialization
            h = ThreadLocalRandom.getProbe();
            //初始化后不会发生冲突
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            CounterCell[] as; CounterCell a; int n; long v;
            //counterCells是否可以用来计数,即判断他整体是否为空
            if ((as = counterCells) != null && (n = as.length) > 0) {
                //这个桶为空,初始化counterCells
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {            // Try to attach new Cell
                        //初始化,记录这个桶内的元素数量为要新建的元素个数。
                        CounterCell r = new CounterCell(x); // Optimistic create
                        //尝试拿锁
                        if (cellsBusy == 0 &&
                            U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                            boolean created = false;
                            try {               // Recheck under lock
                                //拿到锁了,将新建的counterCell元素放到指定位置。
                                CounterCell[] rs; int m, j;
                                if ((rs = counterCells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    //初始化创建完成
                                    created = true;
                                }
                            } finally {
                                //无论成功与否都释放锁。
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;//无需扩容
                }
                //存在竞争,重置wasUncontended后,下一句就是rehash,然后重试
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                //上面获取锁失败,说明有其他线程得到了锁,那么尝试将值加到BaseCount上
                else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
                    break;
                //counterCells是否扩容or其长度是否>=处理器数
                else if (counterCells != as || n >= NCPU)
                    collide = false;            // At max size or stale
                //没有扩容,数组长度<处理器数量,collide为true,计划给CounterCells数组扩容。
                else if (!collide)
                    collide = true;
                //尝试拿锁
                else if (cellsBusy == 0 &&
                         U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                    try {
                        //给counterCells扩容
                        if (counterCells == as) {// Expand table unless stale
                            CounterCell[] rs = new CounterCell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            counterCells = rs;
                        }
                    } finally {
                        //放锁
                        cellsBusy = 0;
                    }
                    //不需要扩容
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                //更改当前线程的hash值
                h = ThreadLocalRandom.advanceProbe(h);
            }
            //没有执行上面的if说明counterCells为null,初始化counterCells
            else if (cellsBusy == 0 && counterCells == as &&
                     U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
                boolean init = false;
                try {                           // Initialize table
                    if (counterCells == as) {
                        CounterCell[] rs = new CounterCell[2];//初始长度为2,最小的2次幂
                        rs[h & 1] = new CounterCell(x);
                        counterCells = rs;
                        init = true;//初始化成功
                    }
                } finally {
                    //放锁
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            //counterCells为空or长度为0,并且拿锁失败。
            //尝试将其加到basecount上
            else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
                break;                          // Fall back on using base
        }
    }

如何理解流程呢?
他的主体就是for,循环中三个if

  1. if ((as = counterCells) != null && (n = as.length) > 0)counterCells非空情况
  2. else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1))counterCells为null
  3. else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))counterCells为空且获取锁失败
  4. h = ThreadLocalRandom.advanceProbe(h);重新生成hash
    三种情况,具体的每种情况的逻辑看具体的注释。

读取

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            // 首节点即匹配
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // 该桶已经被迁移,则交由节点find()方法查找
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            // 搜索桶内所有节点
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

通过查看注释十分好理解,在首点匹配,桶迁移情况(调用find寻找)匹配失败后,搜索所有节点。

Node<K,V> find(int h, Object k) {
    // loop to avoid arbitrarily deep recursion on forwarding nodes
    outer: for (Node<K,V>[] tab = nextTable;;) {
        Node<K,V> e; int n;
        if (k == null || tab == null || (n = tab.length) == 0 ||
            (e = tabAt(tab, (n - 1) & h)) == null)
            return null;
        for (;;) {
            int eh; K ek;
            if ((eh = e.hash) == h &&
                ((ek = e.key) == k || (ek != null && k.equals(ek))))
                return e;
            if (eh < 0) {
                if (e instanceof ForwardingNode) {
                    tab = ((ForwardingNode<K,V>)e).nextTable;
                    continue outer;
                }
                else
                    return e.find(h, k);
            }
            if ((e = e.next) == null)
                return null;
        }
    }
}

参考

  1. ConcurrentHashMap源码分析(1.8)
  2. 源码级解读ConcurrentHashMap,妈妈再也不用担心我的学的学习
  3. ConcurrentHashMap底层原理剖析
  4. ConcurrentHashMap 1.8 源码分析

标签:Node,JUC,ConcurrentHashMap,key,int,tab,table,null
来源: https://blog.csdn.net/weixin_42559124/article/details/122562513

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有