ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

JUC-CAS和AQS

2022-06-11 23:34:53  阅读:159  来源: 互联网

标签:node JUC AQS CAS public int 线程 final Node


一、CAS

  CAS, compare and swap比较并替换。 CAS有三个参数:需要读写的内存位值(V)、进行比较的预期原值(A)和拟写入的新值(B)。当且仅当V的值等于A时,CAS才会通过原子方式用新值B来更新V的值,否则不会执行任何操作。程序在在某个变更新时,会有一个校验逻辑——认为V的值应该是A,如果是,那么将V的值更新为B,否则不修改并告诉V的值实际为多少。CAS是一项乐观的技术,它希望能成功地执行更新操作,并且如果有另一个线程在最近一次检查后更新了该变量,那么CAS能检测到这个错误。当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其他线程都将失败。但是,失败的线程并不会被挂起,而是被告知在这次竞争中失败,并可以多次尝试。这种灵活性可以减少与锁相关的活跃性风险,例如死锁,饥饿等问题。

AtomicInteger源码分析:

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    //是使用了Unsafe类来进行CAS操作,valueOffset表示的是value值的偏移地址,Unsafe会根据内存偏移地址获取数据的原值的, 
    //偏移量对应指针指向该变量的内存地址。
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            //内存偏移地址在静态代码块中通过Unsafe的objectFieldOffset方法获取。
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    
    //使用volatile修饰,直接从共享内存中操作变量,保证多线程之间看到的value值是同一份
    private volatile int value;

    /**
     * Creates a new AtomicInteger with the given initial value.
     *
     * @param initialValue the initial value
     */
    public AtomicInteger(int initialValue) {
        value = initialValue;
    }

    //....

   public final int get() {
        return value;  //return操作是一个原子操作,具有原子性
    }

    public final void set(int newValue) {
        value = newValue; //赋值操作是一个原子操作,具有原子性
    }

    /**
    *  更新对应的数据,返回更新前的数据
    *  AtomicInteger atomicInteger = new AtomicInteger(2);
    *  atomicInteger.incrementAndGet(i -> i * 3);
    */
    public final int getAndUpdate(IntUnaryOperator updateFunction) {
        int prev, next;
        do {
            //从内存中读取修改前的值prev,并执行给定函数式计算修改后的值next;
            prev = get();
            next = updateFunction.applyAsInt(prev);
          
          //调用compareAndSet修改value值(内部是调用了unsafe的compareAndSwapInt方法)。
          //如果此时有其他线程也在修改这个value值,那么CAS操作就会失败,继续进入do循环重新获取新值,再次执行CAS直到修改成功。
        } while (!compareAndSet(prev, next));  
        return prev;
    }
}

Unsafe

  Unsafe是实现CAS的核心类,Java无法直接访问底层操作系统,而是通过本地(native)方法来访问。Unsafe类提供了硬件级别的原子操作(java里面没有指针,但java的sun.misc.Unsafe这个类是一个特殊,使得可以使用指针偏移操作,拥有了类似C语言指针一样操作内存空间的能力)。

public final class Unsafe {
  
  //获取对象o中给定偏移地址(offset)的值。以下相关get方法作用相同
  public native int getInt(Object o, long offset);
  public native Object getObject(Object o, long offset);
  //在对象o的给定偏移地址存储数值x。以下set方法作用相同
  public native void putInt(Object o, long offset, int x);
  public native void putObject(Object o, long offset, Object x);
  
  //获取给定内存地址的一个本地指针
  public native long getAddress(long address);
  //在给定的内存地址处存放一个本地指针x
  public native void putAddress(long address, long x);

  ///------------------内存操作----------------------
  //在本地内存分配一块指定大小的新内存,内存的内容未初始化;它们通常被当做垃圾回收。
  public native long allocateMemory(long bytes);
 //重新分配给定内存地址的本地内存
  public native long reallocateMemory(long address, long bytes);
  //释放给定地址的内存
  public native void freeMemory(long address);
  //获取给定对象的偏移地址
  public native long staticFieldOffset(Field f);
  public native long objectFieldOffset(Field f);

  //------------------数组操作---------------------------------
  //获取给定数组的第一个元素的偏移地址
  public native int arrayBaseOffset(Class<?> arrayClass);
   //获取给定数组的元素增量地址,也就是说每个元素的占位数
  public native int arrayIndexScale(Class<?> arrayClass);

  ///--------------------锁指令(synchronized)-------------------------------
  //对象加锁
  public native void monitorEnter(Object o);
  //对象解锁
  public native void monitorExit(Object o);
  public native boolean tryMonitorEnter(Object o);
  //解除给定线程的阻塞
  public native void unpark(Object thread);
  //阻塞当前线程
  public native void park(boolean isAbsolute, long time);

  // CAS
  public final native boolean compareAndSwapObject(Object o, long offset,
                                                 Object expected,
                                                 Object x);
   //....
}

   

二、AbstractQueuedSynchronizer

AbstractQueuedSynchronizer是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来,如常用的ReentrantLock、CountDownLatch等。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
   
    //....
    static final class Node {
        /** 标识节点当前在共享模式下 */
        static final Node SHARED = new Node();
        /** 标识节点当前在独占模式下 */
        static final Node EXCLUSIVE = null;

        /** 值为1,当前节点由于超时或中断被取消。 */
        static final int CANCELLED =  1;
        /** 后继节点在等待当前节点唤醒 */
        static final int SIGNAL    = -1;
        /** 等待在 condition 上 */
        static final int CONDITION = -2;
        /**
         * 状态需要向后传播,针对共享锁
         */
        static final int PROPAGATE = -3;

        /**
         * 状态,默认 0  上面列出的几个常量状态  0代表没有被占用
         */
        volatile int waitStatus;

        /**
         * 前驱节点
         */
        volatile Node prev;

        /**
         * 后继节点
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;
        //...
    }

    /**
    *  队列维护的state值,
    */
    private volatile int state;

    /**
    *  队列维护的state值内存地址的偏移量,通过unsafe类修改(CAS)
    */
    private static final long stateOffset;
   
}
      
      

同步队列其内部都是一个双向链表结构,主要的内容包含thread + waitStatus + pre + next

 

 以ReentrantLock为例,进行代码阅读:

1、ReentrantLock下面有三个内部类:Sync同步器,FairSync同步器, NonfairSync同步器。
2、AQS(AbstractQueuedSynchronizer)继承AOS(AbstractOwnableSynchronizer)
2、Sync继承AQS(AbstractQueuedSynchronizer:设置和获取独占锁的拥有者线程)
3、NonfairSync(非公平锁)、FairSync(公平锁)分别继承Sync

 

 

public class ReentrantLock implements Lock, java.io.Serializable {
    
  
    private final Sync sync;
    
    //ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的。
    abstract static class Sync extends AbstractQueuedSynchronizer {
        //....
    }

    /**
     * 非公平锁
     */
    static final class NonfairSync extends Sync {
        //....
    }

    /**
     * 公平锁
     */
    static final class FairSync extends Sync {
        //....
    }
    
    //默认构造函数,生成非公平锁
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /**
     * 根据实际情况,构造公平锁和非公平锁
     */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    
    //....
}

  

2.1 公平锁

首先以公平锁FairSync为例

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
       
        //争锁
        final void lock() {
            acquire(1);
        }
        //代码来自父类AbstractQueuedSynchronizer,
        //如果tryAcquire(arg) 返回true, 也就结束了。
        // 否则,acquireQueued方法会将线程压到队列中
        public final void acquire(int arg) {
          //尝试获取suo
          if (!tryAcquire(arg) &&
            // addWaiter 获取资源/锁失败后,将当前线程加入同步队列的尾部,并标记为独占模式,返回新的同步队列;
            // 使线程在同步队列等待获取资源,一直获取到后才返回,如果在等待过程中被中断过,则返回true,否则返回false。
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //没有获取到锁,放到同步队列后,这个时候需要把当前线程挂起,
            selfInterrupt();
         }


        /**
         * 尝试直接获取锁,返回值是boolean
         * 返回true:1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // state == 0 此时此刻没有线程持有锁
            if (c == 0) {
                //虽然此时此刻锁是可以用的,但是这是公平锁,要求先来先到
                //hasQueuedPredecessors :当前线程是否还有前节点
                //compareAndSetState 通过CAS操作修改AQS类中stateOffset地偏移量对应的state值
                // 设置锁的持有者为当前线程。
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果当前锁状态state不为0,同时当前线程已经持有锁,由于锁是可重入(多次获取)的,则更新重入后的锁状态
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

       //判断当前线程是否还有前节点
       public final boolean hasQueuedPredecessors() {
          // The correctness of this depends on head being initialized
          // before tail and on head.next being accurate if the current
          // thread is first in queue.
          Node t = tail; // Read fields in reverse initialization order
          Node h = head;
          Node s;
          //队列不为空,且头结点的下一个元素不是当前节点
          return h != t &&
              ((s = h.next) == null || s.thread != Thread.currentThread());
      }
      //父类AbstractOwnableSynchronizer(AbstractQueuedSynchronizer继承了AbstractOwnableSynchronizer)中的方法 设置锁的持有者为当前线程。
      protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
      }

      //以尾差法的方式插入到同步队列中
      private Node addWaiter(Node mode) {
          Node node = new Node(Thread.currentThread(), mode);
          // Try the fast path of enq; backup to full enq on failure
          Node pred = tail;
          if (pred != null) {
              node.prev = pred;
              if (compareAndSetTail(pred, node)) {
                  pred.next = node;
                  return node;
              }
          }
          //pred==null(队列是空的) 或者 CAS失败(有线程在竞争入队)
          enq(node);
          return node;
      }
      //(队列是空的) 或者 CAS失败(有线程在竞争入队)
      //采用自旋的方式入队: CAS设置tail过程中,竞争一次竞争不到,我就多次竞争,总会排到的
      private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                //通过CAS操作,对双向链表的head和tail初始化
                if (compareAndSetHead(new Node()))
                    tail = head;
                //此时没有return,初始化链表的头尾节点,继续for循环,走到else分支
            } else {
                //将当前线程排到队尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    //...
  }

在公平锁的lock中,当没有获取到锁,将当前线程加入到同步队列后,会调用acquireQueued方法。使线程在同步队列等待获取资源,一直获取到后才返回,如果在等待过程中被中断过,则返回true,否则返回false。

//将线程放入阻塞队列中
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor(); //获取前驱节点
                //p == head 说明当前节点虽然进到了阻塞队列,但是是阻塞队列的第一个,因为它的前驱是head
                //所以当前节点可以去试抢一下锁(首先,它是队头,这个是第一个条件,其次,当前的head有可能是刚刚初始化的node(enq(node)))
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
             // tryAcquire() 方法抛异常的情况
            if (failed)
                cancelAcquire(node);
        }
    }
    
    //当前线程没有抢到锁,是否需要挂起当前线程
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //前驱节点的 waitStatus == -1 ,说明前驱节点状态正常,当前线程需要挂起,直接可以返回true
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        // 前驱节点 waitStatus大于0 ,说明前驱节点取消了排队。
        // 需要知道这点: 进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。
        // 将当前节点的prev指向waitStatus<=0的节点
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2(使用condition条件下),-3(针对共享锁)
            // 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0
            // 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0
            // 用CAS将前驱节点的waitStatus设置为Node.SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
       // 这个方法返回 false,那么会再走一次 acquireQueued 中的for循序
       // 此时会从第一个分支返回 true
       return false;
    }

    //将node节点作为头结点
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    
    //负责挂起线程,通过LockSupport.park(this)来挂起线程,然后就停在这里了,等待被唤醒
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

最后,就是还介绍下唤醒的动作unlock。正常情况下,如果线程没获取到锁,线程会被 LockSupport.park(this); 挂起停止,等待被唤醒。

    public void unlock() {
        sync.release(1);
    }

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); //唤醒队列中的节点
            return true;
        }
        return false;
    }


    protected final boolean tryRelease(int releases) {
      	int c = getState() - releases;
        //判断当前线程是否持有锁
      	if (Thread.currentThread() != getExclusiveOwnerThread())
        	throw new IllegalMonitorStateException();
        // 是否完全释放锁
      	boolean free = false;
      	if (c == 0) {
        	free = true;
        	setExclusiveOwnerThread(null);
     	 	}
      	setState(c);
      	return free;
    }
     
    //唤醒后继节点,从调用处知道参数node是head头结点
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        //如果head节点当前waitStatus<0, 将其修改为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * 唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)所以需要从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

总结公平锁的流程:

2.2 非公平锁

非公平锁NonfairSync代码如下,通过对比公平锁和非公平锁tryAcquire的代码可以看到,非公平锁的获取略去了!hasQueuedPredecessors()这一操作,也就是说它不会判断当前线程是否还有前节点(prev node)在等待获取锁,而是直接去进行锁获取操作。

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
    
   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //主要区别点
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

非公平锁和公平锁的两处不同: 

1. 非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了,若失败则调用tryAcquire;而公平锁则是直接调用tryAcquire方法。

2. 非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态(是否处在队列的头部位置),如果有则不去抢锁,乖乖排到后面。如果非公平锁在两次 CAS操作后都不成功,那么它和公平锁是一样的,都要进入到阻塞队列等待唤醒。

3、在非公平锁下,当同步队列中有节点时,这时候来了新的更多的线程来抢锁,这些新线程还没有加入到同步队列中去,新线程会跟排队中苏醒的线程进行锁争抢,失败的去同步队列中排队。这里的公平与否,针对的是苏醒线程与还未加入同步队列的线程。对于已经在同步队列中阻塞的线程而言,它们内部自身其实是公平的,因为它们是按顺序被唤醒的,

标签:node,JUC,AQS,CAS,public,int,线程,final,Node
来源: https://www.cnblogs.com/helloworldcode/p/16367133.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有