ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

ConcurrentHashMap源码解析3.put()方法

2021-11-25 21:33:35  阅读:152  来源: 互联网

标签:初始化 ConcurrentHashMap key sizeCtl 源码 线程 当前 put table


1.putVal()方法

写数据大体流程

写前操作

1、ConcurrentHashMap 不允许key或value为NULL,会抛出异常

2、写数据前,会先对key的hash值进行一次加工spread()

写数据流程

整个写数据是一个 自旋(死循环) 的操作。

  • 情况一: 当前的table还没有被初始化。调用initTable()去尝试初始化table,然后继续自旋.
  • 情况二: 当前table已经被初始化,调用寻址算法(hash & (table.length - 1))得到的桶位上的元素为NULL,尝试使用CAS操作构造的节点写入桶位。成功退出循环,失败继续自旋
  • 情况三:当前table已经被初始化了,但是当前桶位的节点是FWD节点,说明此时table正在发生扩容操作,此时当前线程就去参与扩容。扩容后继续自旋
  • 情况四:说明当前桶位的形成了链表或者红黑树,先使用synchronized加锁锁住当前桶位(锁对象就是当前桶位的头节点),然后判断如果当前桶位形成了链表,就遍历链表中的节点去找是否存在和待插入的节点的key完全一致的节点,如果有的话就进行value替换,没有就将节点插入到链表末尾。 如果当前桶位已经形成了红黑树,就在树中找是否存在一个节点的key和待插入节点完全一致的节点,存在就将value替换。不存在就插入到树中。

写后操作

  • ①如果当前桶位是链表的话,会检查是否达到了树化标准然后去树化。
  • ②如果待插入节点跟桶位上的某一个节点冲突后进行替换了(无论是链表还是红黑树),直接将冲突节点的value返回,不执行③的逻辑 (因为这是一次替换操作,没有新增节点)
  • ③没有发生冲突,说明这是一次添加操作,需要调用 addCount() 方法,内部先对table中的节点进行计数,然后判断是否达到扩容标准执行扩容逻辑
   public V put(K key, V value) {
        //调用了putVal方法。
        return putVal(key, value, false);
    }

     /*
      *  @param key 元素的key
      *  @param value 元素value
      *  @param onlyIfAbsent 是否替换数据
      *    false --> 当put数据时,遇到Map中有相同k-v的数据,将其替换
      *    true  --> 遇到Map中有相同k-v的数据,不替换,不进行插入
      */
     final V putVal(K key, V value, boolean onlyIfAbsent) {
        /*   注意 这里跟HashMap不同,
         *   HashMap允许key为null或者value为null, ConcurrentHashMap不允许key或者		 
         *   value为null。
         */
        if (key == null || value == null) throw new NullPointerException();
        
        //通过spread方法重新计算hash值(具体参考源码解析2 spread方法)
        int hash = spread(key.hashCode());
        
        /*
         * binCount表示当前k-v 封装成node后插入到指定桶位后,在桶位中的所属链表的下标位置
         * 0 当前桶位为null,node可以直接放
         * 2 表示当前桶位可能是红黑树
         */
        int binCount = 0;
 
        /*
         * 自旋
         * tab 引用哈希表
         */ 
        for (Node<K,V>[] tab = table;;) {
            
            /*
             *  f 表示桶位的头节点
             *  n 表示散列表的长度
             *  i 表示key通过寻址计算后得到的桶位坐标  
             *  fh 桶位头节点的哈希值
             */
            Node<K,V> f; int n, i, fh;
  		    
            /*
             *  CASE1 :表示table尚未初始化
             *  
             *   进入initTable()方法初始化 尝试初始化table。最终返回初始化完成的table。
             */
            if (tab == null || (n = tab.length) == 0)
                tab = initTable(); 
             
            /*
             *  CASE2 :table已经初始化,且通过寻址后发现当前桶位头结点为null
             *  寻址算法: ((table.length - 1) & hash) 跟HashMap寻址算法一致
             *  
             *   进入casTabAt尝试使用CAS添加Node。
             */
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //期望tab的第i个位置是NULL,是NULL的话就将创建的Node赋值上去
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    //设置成功,结束自旋。失败(有其他线程写成功了) 走其他逻辑继续自旋
                    break;                  
            }
            
            /*
             * CASE3:table已经初始化,且通过寻址后发现当前桶位头结点为FWD节点,表示当前			   
             * map正处于扩容(迁移)过程中。 
             *   (MOVED == -1 表示当前节点是FWD节点)
             *   
             *   进入helpTransfer有义务帮助当前map对象完成迁移数据的工作。
             */
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            
            
            /*
             * CASE4:当前桶位(不为NULL)可能是链表 也可能是 红黑树代理类TreeBin 
             */
            else {
                //oldVal 当插入节点的key存在时,会将旧值返回。
                V oldVal = null;
                
                /*
                 * synchronized加锁给当前桶位的头节点(理论上是头结点)。(只对当前桶位加				 
                 *  锁,不锁整个Map)
                 */
                synchronized (f) {
                    
                    /* 
                     *  为什么这里又要对比一下,看看当前桶位的头几点,是否是之前获取的头					 
                     * 	结点?
                     *   (tabAt()就是获取i位置上的节点)
                     *  为了避免其他线程已经将该桶位的头结点改掉了,导致当前线程加错锁,					 
                     * 	就会出现问题,如果这里对比失败的话,就不需要在执行了。
                     *
                     *  如果条件成立,就可以进来造了!
                     */
                    if (tabAt(tab, i) == f) {
                        //条件成立,说明当前桶位形成了链表。
                        if (fh >= 0) {
                            
                            /*
                             * 1、
                             * 当前插入key与链表中的所有元素的key都不一致,当前的插入							 
                             *  操作就是将节点追加到链表的末尾,binCount表示链表长度。
                             * 2、
                             *  当前插入key与链表当中的某个元素的key一致时,当前插入操							  
                             *  作就是替换 binCount表示冲突位置(binCount-1)   
                             */
                            binCount = 1;
                            
                            /*
                             * 遍历链表,判断是否有节点的key与当前要插入元素的key完全							  
                             * 一致,存在的话替换value,不存在插入到链表末尾。
                             *
                             */
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                //插入到结尾
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //当前桶位已经变为了红黑树
                        else if (f instanceof TreeBin) {
                            //p表示红黑树中如果与你插入节点的key有冲突时,会将这个冲突的节点赋给p
                            Node<K,V> p;
                            
                            //强制设置binCount为2,因为binCount <= 1时有其他含义,
                            binCount = 2;
                            
                            //p不为null,说明真的发生了冲突。
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value; //覆盖原key的值
                            }
                        }
                    }
                }
                //说明当前桶位不为null,可能是红黑树,也可能是链表
                if (binCount != 0) {
                    //binCount >= 8,说明当前桶位一定是链表
                    if (binCount >= TREEIFY_THRESHOLD)
                        //可能树化(还得看table长度)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        //返回冲突节点的value。
                        return oldVal;
                    break;
                }
            }
        }
        /*
         *  退出自旋之后,调用addCount
         * 1、统计当前table一共有多少节点
         * 2、判断是够达到扩容标准。 
         */
        addCount(1L, binCount);
        return null;
    }

2.initTable()方法

initTable方法流程。

当前线程尝试去初始化table,此时有两种情况

  • sizeCtl的值为-1,表示当前有其他线程正在初始化table,当前线程先释放CPU执行权,然后再次尝试直到table被初始化完成退出循环
  • sizeCtl的值大于等于0。当前线程使用CAS的方式尝试去初始化table。初始化table后**(只有一个线程可以真正的初始化table),将sizeCtl的值变为当前table长度的0.75,即扩容阈值**。

源码解析

    private final Node<K,V>[] initTable() {
        
        /*
         *  tab 表示table的引用                          
         *  sc 表示临时sizeCtl的值
         */
        Node<K,V>[] tab; int sc;
        
        
        /*
         * 带条件(当前table尚未初始化)的自旋,当table被初始化后就退出。
         */
        while ((tab = table) == null || tab.length == 0) {
            
            /*
             *  先判断sizeCtl的值,
             *  1、sizeCtl < 0 
             *     1.1 sizeCtl = -1 表示有线程正在创建table数组,当前线程需要自旋等待
             *     1.2 其他表示当前table数组正在扩容,高16位表示扩容的时间戳,低16位表示(1 + nThread)个线程正在并发扩容
             *  2、sizeCtl = 0
             *    表示创建table数组时,使用DEFAULT_CAPACITY(16)大小
             *
             *  3、sizeCtl > 0 
             *     两种情况   
             *     1.如果table未初始化,表示初始化大小
             *     2.如果table已经初始化,表示下次扩容时的触发条件(阈值)  
             */
            if ((sc = sizeCtl) < 0)
                //sizeCtl的值大概率为-1,即当前有其他线程正在创建table数组
                //yield表示释放CPU执行权。
                Thread.yield(); // lost initialization race; just spin
            
            
             /*
             *  有两种情况会走到这里
             *  2、sizeCtl = 0
             *    表示创建table数组时,使用DEFAULT_CAPACITY(16)大小
             *
             *  3、sizeCtl > 0 
             *     两种情况   
             *     1.如果table未初始化,表示初始化大小
             *     2.如果table已经初始化,表示下次扩容时的触发条件(阈值)      
             *  
             *     通过调用UnSafe的CAS操作获取锁去修改sizeCtl的值
             *     注意: 这里的CAS方法的SIZECTL常量就是sizeCtl变量的内存地址,即修改的就是sizeCtl变量的值。
             */
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    //仍然判断table是否为null,防止多线程情况下多次初始化。
                    if ((tab = table) == null || tab.length == 0) {
                        /*
                         * sc大于0,创建table时使用sc为指定大小,
                         * 否则使用默认值16初始化table
                         */
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        //创建Node[].
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        //赋值给table
                        table = tab = nt;
                        // sc变为0.75n,即扩容阈值。
                        sc = n - (n >>> 2);
                    }
                } finally {
                    /*
                     * 此时有两种情况
                     * 1、当前线程是第一次初始化table的线程,此时的sc就是扩容阈值
                     * 2、当前线程不是第一次初始化table的线程,所以不会进入初始化table
                     * 的逻辑,但是sizeCtl的值已经被改为-1了,最后需要将它的值改回来
                     */
                    sizeCtl = sc;
                }
                break;
            } 
        }
        //最后返回tab。
        return tab;
    }

3.addCount()方法

addCount()方法的执行逻辑

1.先调用类似LongAdder的add逻辑,先尝试去写数据,

如果cells数组已经初始化了 或 尝试CAS写数据到base中失败了,就会继续判断cells数组的情况然后可能进入fullAddCount (就是LongAdder中的longAccumulate()方法)

如果写成功,就会求一次节点个数总和。

注意:如果有线程真的调用了longAccumulate(),那么直接后return。既不求和也不参与table数组扩容的逻辑

2.进入扩容的逻辑,先计算出来一个扩容戳,然后自旋进行判断当前线程是否可以参与扩容的逻辑**(四个条件见源码解析)**,可以参与的话,使用CAS修改sizeCtl的值(此时sizeCtl为负数,高16位表示时间戳,低16位表示参与扩容的线程数),CAS尝试将扩容线程+1,成功的话进入扩容逻辑,失败继续自旋。

 private final void addCount(long x, int check) {
     
     	/*
     	 * 这一部分代码就是LongAdder中的代码,先add()然后调用sum()去求和。
     	 * as 表示LongAdder中的cells数组
     	 * b 表示LongAdder中的base 
     	 * s 表示table中的节点个数
     	 */
        CounterCell[] as; long b, s;
      	
     	/*
     	 * 进入if的两个条件(或的关系)
     	 * 条件一: cells数组已经初始化了
     	 * 条件二: cells数组未初始化,但是CAS尝试写base失败(发生了竞争),
     	 */   
        if ((as = counterCells) != null ||
           !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            
            /*
             * a 表示当前线程寻址后命中的cell
             * v 表示当前线程写cell时的期望值
             * m 表示当前cells数组的长度
             */
            CounterCell a; long v; int m;
            
            //true表示未竞争,false表示发生了竞争
            boolean uncontended = true;
            
            /*
             * fullAddCount()方法就是LongAdder中的longAccumulate()方法。
             *  关于详细的longAccumulate()方法参考LongAdder源码解析
             *  这里有三种情况会进入longAccumulate()方法
             *  1、cells数组为NULL
             *  2、cells数组不为NULL,但是当前线程寻址后命中的Cell为NULL。
             *  3、cells数组不为NULL,并且命中的Cell也不为NULL,但是尝试CAS向Cell中写				
             *  数据失败
             */
            
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                //详细讲解参考 [https://blog.csdn.net/qq_46312987/article/details/121499330]
                fullAddCount(x, uncontended);
                
                /*考虑到fullAddCount()事情比较累,直接让当前线程不参与扩容的逻辑了。
                直接返回了。*/
                return;
            }
            if (check <= 1)
                return;
            //求和(就是LongAdder中的sum方法) 是一个期望值,(最终一致性)
            s = sumCount();
        }
//-------------------------------LongAdder Code ENd --------------------------    
//----------------------------------扩容逻辑 Start----------------------------- 	 
        /*
         *  check >=0 表示一定是一个put操作调用的addCount
     	 */
        if (check >= 0) {
            /*
             *  tab 引用table
             *  nt 表示nextTable(扩容的table)
             *  n  表示table数组的长度
             *  sc 表示sizeCtl的临时值
             */
            Node<K,V>[] tab, nt; int n, sc;
            
             /*
             *  先判断sizeCtl的值,走到这里sizeCtl可能的情况
             *  1、sizeCtl < 0 
             *   (不可能)1.1 sizeCtl = -1 表示有线程正在创建table数组,当前线程需要自旋等待
             *          1.2 其他表示当前table数组正在扩容,高16位表示扩容的时间戳,低16位表示 
             *          (1 + nThread)个线程正在并发扩容 (可能)	
             *
             *  2、sizeCtl = 0
             *    表示创建table数组时,使用DEFAULT_CAPACITY(16)大小 (不可能)
             *
             *  3、sizeCtl > 0 
             *     两种情况   
             *     1.如果table未初始化,表示初始化大小 (不可能)
             *     2.如果table已经初始化,表示下次扩容时的触发条件(阈值)   (可能)
             */
            
            /*
             *  自旋
             *  条件一:s >= (long)(sc = sizeCtl)
             * 		    true  -> 1.当前sizeCtl是一个负数,表示正在扩容	
             *			false -> 2.当前sizeCtl是一个正数,表示扩容阈值,并且当前的元				
             *	        素数量没有达到扩容阈值,无需扩容
             *
             *  条件二:(tab = table) != null 恒成立
             *  
             *  条件三:(n = tab.length) < MAXIMUM_CAPACITY)
             *         true表示当前table长度小于最大值限制,可以进行扩容
             */
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                
                /*
                 *  扩容批次唯一表示戳
                 *  假设当前16 -> 32 计算出来的 rs = 1000 000 0001 1011
                 */
                int rs = resizeStamp(n);
                
                /*
                 *   sc小于0 表示当前table正在扩容
                 *   当前线程CAS尝试应该协助table完成扩容
                 */
                if (sc < 0) {
                    
                    /*
                     * 当前线程是否可以进行扩容的条件。
                     * 
                     * 条件一: 
                     *  (sc >>> RESIZE_STAMP_SHIFT) != rs 
                     *   true表示当前线程获得的扩容唯一表示戳 非 本批次扩容 
                     *   false表示当前线程获得的扩容唯一表示戳 是 本批次扩容
                     *  
                     * 条件二:
                     *   sc == rs + 1 (这里有BUG,后续JDK版本已经修改)
                     *   实际上想表达的是 sc == rs << 16 + 1
                     *   true 表示扩容完毕,当前线程不需要参与
                     *   false 表示扩容还在进行中,当前线程可以参与
                     *
                     *  条件三:
                     *    sc == rs + MAX_RESIZERS (这里有BUG,后续JDK版本已修改)
                     *    实际想表达的是 sc == rs << 16 + MAX_RESIZERS
                     *   true 表示当前参与的扩容线程数已经到了最大数 当前线程不需要参与
                     *   false 表示当前当前参与的扩容线程数未到达最大数 当前线程可以参与
                     *   
                     *   条件四:
                     *     (nt = nextTable) == null 
                     *    true表示本次扩容结束(nextTable只有当在扩容时不为NULL,扩容结束就置为NULL)  
                     *    false表示本次扩容未结束  
                     */
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    
                    /*
                     * 前置条件:当前table正在扩容中,当前线程有机会参与扩容
                     * 因为当前table正在扩容,说明sizeCtl的值一定为负数,低16位表示的
                     * 就是当前正在参与扩容的线程数,所以尝试+1,进入扩容的逻辑中。
                     *  
                     *  尝试失败:
                     *  1.当前有很多线程都在此处尝试修改sizeCtl,有其他一个线程修改成
                     *  功,导致你的sc期望值与内存中的值不一致,修改失败
                     *  
                     *  2.transfer任务内部的线程也修改了sizeCtl。
                     *  失败继续自旋。
                     */
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        //协助扩容线程,持有nextTable参数
                        transfer(tab, nt);
                }
                
                /*
                 *  CAS修改sizeCtl的值,成功的话将sizeCtl变为负数,说明当前现场是触发
                 *  扩容的第一个线程,在transfer()方法中需要做一些扩容的额外工作
                 */
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    //触发扩容条件的线程,不持有nextTable。
                    transfer(tab, null); 
                //求和。
                s = sumCount();
            }
        }
    }

4.小结

  • (1)元素个数的存储方式类似于LongAdder类,存储在不同的段上,减少不同线程同时更新size时的冲突;

  • (2)计算元素个数时把这些段的值及baseCount相加算出总的元素个数;

  • (3)正常情况下sizeCtl存储着扩容门槛,扩容门槛为容量的 0.75倍

  • (4)扩容时sizeCtl(sizeCtl < 0)高位存储扩容戳(resizeStamp),低位存储扩容线程数加1(1+nThreads)

  • (5)其它线程添加元素后如果发现存在扩容,也会尝试加入的扩容行列中来;

标签:初始化,ConcurrentHashMap,key,sizeCtl,源码,线程,当前,put,table
来源: https://blog.csdn.net/qq_46312987/article/details/121547736

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有