ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

AbstractQueuedSynchronizer(AQS)源码分析

2019-08-11 22:51:11  阅读:247  来源: 互联网

标签:node Node 结点 AQS pred AbstractQueuedSynchronizer 源码 线程 null


  学习是一个循序渐进的过程,在学习现场池相关知识的时候,注意到线程池使用了阻塞队列,阻塞队列LinkedBlockingQueue中又使用到了ReentrantLock可重入锁,因此先对ReentrantLock展开学习。

  ReentrantLock中很重要的一部分是Sync,他继承了AQS(AbstractQueuedSynchronizer),AQS提供一个框架,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关的同步器(信号量,事件等)。此类被设计为*是大多数类型的同步器的有用基础,这些同步器依赖于单个原子 int值来表示状态。子类必须定义更改此状态的受保护方法,以及*根据要获取或释放的此对象定义该状态的含义。鉴于这些,这个类中的其他方法带有out所有排队和阻塞机制。子类可以维护其他状态字段,但仅使用方法getState,setState和compareAndSetState操作的原子更新的 int值被跟踪同步。

  AQS是一种基于CAS(Compare And Swap)的并发算法,CAS是一种有名的无锁(lock-free)算法。也是一种现代 CPU 广泛支持的CPU指令级的操作,只有一步原子操作,所以非常快。而且CAS避免了请求操作系统来裁定锁的问题,不用麻烦操作系统,直接在CPU内部就搞定了。  

  参考链接:https://segmentfault.com/a/1190000015239603

  此文在学习过程中参考了@活在夢裡 的文章https://www.cnblogs.com/micrari/p/6937995.html。强烈推荐阅读该作者的文章,对于如何正确学习和理解AQS,需要考虑哪些问题,作者都给出了明确的方法。以下文章部分引用了原作者的内容,加上自己的分析。

  AQS则继承了AOS(AbstractOwnableSynchronizer),AOS中维护着一个独占线程,提供了基础的get/set方法。

  AQS中的内部类Node等待队列节点类。

     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+

  

 1         /** Marker to indicate a node is waiting in exclusive mode*/
 2         static final Node EXCLUSIVE = null;
 3 
 4         /** waitStatus值表示线程已取消. */
 5         static final int CANCELLED =  1;
 6         /**waitStatus值表示后继者的线程需要取消停放,一般由后续结点进行设置,当他之前的结点为阻塞状态,则他为阻塞状态 */
 7         static final int SIGNAL    = -1;
 8         /**waitStatus值表示线程正在等待. */
 9         static final int CONDITION = -2;
10         /**
11          * waitStatus值表示下一个acquireShared应
12          *无条件传播。
13          */
14         static final int PROPAGATE = -3;    
Node

  以上的状态是节点的重要组成部分,后续将根据结点状态进行判断结点是否进入同步队列。

  AQS的精妙之处就在于通过状态来进行线程的管理。接下来通过获得锁来展开AQS的具体实现。

  

1   public final void acquire(int arg) {
2         if (!tryAcquire(arg) &&
3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4             selfInterrupt();
5     }

  先尝试获得锁,如果获得锁失败则封装为Node放入同步队列。

 1     /**
 2      *  为当前线程和给定模式创建排队节点。
 3      */
 4     private Node addWaiter(Node mode) {
 5         Node node = new Node(mode);
 6 
 7         for (;;) {
 8             //如果不是新队列则将结点赋给尾结点
 9             Node oldTail = tail;
10             if (oldTail != null) {
11                 node.setPrevRelaxed(oldTail);
12                 if (compareAndSetTail(oldTail, node)) {
13                     oldTail.next = node;
14                     return node;
15                 }
16             } else {
17                 initializeSyncQueue();
18             }
19         }
20     }

  对特殊情况的头部结点与尾部结点处理

 1   /**
 2      * 在第一次争用时初始化头部和尾部字段。
 3      */
 4     private final void initializeSyncQueue() {
 5         Node h;
 6         if (HEAD.compareAndSet(this, null, (h = new Node())))
 7             tail = h;
 8     }
 9 
10     /**
11      * CAS处理尾部结点
12      */
13     private final boolean compareAndSetTail(Node expect, Node update) {
14         return TAIL.compareAndSet(this, expect, update);
15     }

  acquireQueued

 1      /**
 2      * 在队列中的节点通过此方法获取锁。
 3      * 
 4      */
 5     final boolean acquireQueued(final Node node, int arg) {
 6         boolean interrupted = false;
 7         try {
 8             for (;;) {
 9                 final Node p = node.predecessor();
10                 //如果结点是头结点则尝试获得锁
11                 //尝试成功则获得锁
12                 //尝试失败则继续等待 
13                 if (p == head && tryAcquire(arg)) {
14                     setHead(node);
15                     p.next = null; // help GC
16                     return interrupted;
17                 }
18                 if (shouldParkAfterFailedAcquire(p, node))
19                     interrupted |= parkAndCheckInterrupt();
20             }
21         } catch (Throwable t) {
22             cancelAcquire(node);
23             if (interrupted)
24                 selfInterrupt();
25             throw t;
26         }
27     }    

  shouldParkAfterFailedAcquire

 1     /**
 2      * 检查并更新无法获取的节点的状态。 如果线程应该阻塞,则返回    
 3      * true。
 4      */
 5     private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 6         int ws = pred.waitStatus;
 7         if (ws == Node.SIGNAL)
 8             /*
 9              * 如果结点的waitStatus是SINGAL则返回true
10              */
11             return true;
12         if (ws > 0) {
13             /*
14              * 前结点状态CANCEL。跳过前结点,直至前结点状态不为取消   
15              */
16             do {
17                 node.prev = pred = pred.prev;
18             } while (pred.waitStatus > 0);
19             pred.next = node;
20         } else {
21             /*
22              *  waitStatus必须为0或PROPAGATE(-3)。 设置前结点的等待状态为SIGNAL
23              *  
24              */
25             pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
26         }
27         return false;
28     }    

   阻塞线程。通过LockSupport进行线程的管理

1      /**
2      * 此处使用LockSupport的park方法进行线程管理。
3      * 除非条件允许,否则禁用当前线程以进行线程调度。
4      * 此部分在LockSupport中展开描述,在此不过多赘述
5      */
6     private final boolean parkAndCheckInterrupt() {
7         LockSupport.park(this);
8         return Thread.interrupted();
9     }
  取消正在进行的尝试获取锁的结点。
 1     /**
 2      *  取消一个正在尝试获得锁的结点的尝试
 3      *
 4      */
 5     private void cancelAcquire(Node node) {
 6         // 如果结点为空则忽略
 7         if (node == null)
 8             return;
 9 
10         node.thread = null;
11 
12         // 跳过状态是取消状态的前驱结点
13         Node pred = node.prev;
14         while (pred.waitStatus > 0)
15             node.prev = pred = pred.prev;
16  
17         // 因为我们无法处理node是head的情况,所以需要交给CAS进行   
18         // 处理,此处保存一份前驱结点的next结点
19         Node predNext = pred.next;
20 
21         // 设置node的状态为取消状态
22         node.waitStatus = Node.CANCELLED;
23 
24         // 如果node结点是tail,尝试快速设置,若快速设置删除自己
25         if (node == tail && compareAndSetTail(node, pred)) {
26             pred.compareAndSetNext(predNext, null);
27         } else {
28             // 如果前结点不为head,且状态为阻塞状态或者status<0的其他情况
29             // 并且前结点的thread属性不为空
30             // 设置前驱结点的状态为阻塞状态
31             // 否则unparkSuccessor唤醒当前结点
32             int ws;
33             if (pred != head &&
34                 ((ws = pred.waitStatus) == Node.SIGNAL ||
35                  (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
36                 pred.thread != null) {
37                 Node next = node.next;
38                 if (next != null && next.waitStatus <= 0)
39                     pred.compareAndSetNext(predNext, next);
40             } else {
41                 unparkSuccessor(node);
42             }
43 
44             node.next = node; // help GC
45         }
46     }

  如果CAS将tail从node置为pred节点了 ,则剩下要做的事情就是尝试用CAS将pred节点的next更新为null以彻底切断pred和node的联系。 这样一来就断开了pred与pred的所有后继节点,这些节点由于变得不可达,最终会被回收掉。 由于node没有后继节点,所以这种情况到这里整个cancel就算是处理完毕了。   

  在前一个状态不为取消的结点上进行判断,如果结点状态为阻塞,则设置切断当前结点与前驱结点的联系,并将当前结点的后继结点作为前驱结点的后继结点。

     如果状态为-2或者-3,则设置前驱节点状态为阻塞状态。

  若是head(此处不可能为取消状态),则唤醒线程。

 1    /**
 2      * 唤醒线程,如果结点不为null
 3      *
 4      * @param node the node
 5      */
 6     private void unparkSuccessor(Node node) {
 7         /*
 8          * 将node的状态设置为无状态,进行再一次的竞争,因为此时的 
 9          * node可能不是tail
10          */
11         int ws = node.waitStatus;
12         if (ws < 0)
13             node.compareAndSetWaitStatus(ws, 0);
14 
15         /*
16          * unpark的线程在后继中保存,通常只是下一个节点。但如果取消 
17          * 或者显然为空,
18          * 从尾部向后遍历以找到实际的*未取消的继承者。
19          */
20         Node s = node.next;
21         if (s == null || s.waitStatus > 0) {
22             s = null;
23             for (Node p = tail; p != node && p != null; p = p.prev)
24                 if (p.waitStatus <= 0)
25                     s = p;
26         }
27         if (s != null)
28             LockSupport.unpark(s.thread);
29     }

  此处参考了@活在夢裡的文章。引用他的一段描述https://www.cnblogs.com/micrari/p/6937995.html

    /*
     * 这里的逻辑就是如果node.next存在并且状态不为取消,则直接唤醒s即可
     * 否则需要从tail开始向前找到node之后最近的非取消节点。
     *
     * 这里为什么要从tail开始向前查找也是值得琢磨的:
     * 如果读到s == null,不代表node就为tail,参考addWaiter以及enq函数中的我的注释。
     * 不妨考虑到如下场景:
     * 1. node某时刻为tail
     * 2. 有新线程通过addWaiter中的if分支或者enq方法添加自己
     * 3. compareAndSetTail成功
     * 4. 此时这里的Node s = node.next读出来s == null,但事实上node已经不是tail,它有后继了!
     */

  

 

标签:node,Node,结点,AQS,pred,AbstractQueuedSynchronizer,源码,线程,null
来源: https://www.cnblogs.com/yi-zeng/p/11331192.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有