ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

011Java并发包012AQS

2021-10-08 19:02:22  阅读:144  来源: 互联网

标签:012AQS Node 节点 int 011Java 线程 发包 final 资源


注意:本文基于JDK1.8进行记录。

1 简介

1.1 是什么

AQS是英文单词AbstractQueuedSynchronizer的缩写,翻译过来就是抽象的队列式的同步器,AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它,如常用的ReentrantLock、Semaphore、CountDownLatch等等。

AQS是用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个state整型变量表示持有锁的状态。

1.2 抽象

AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。

1.3 原理

抢到资源的线程直接使用处理业务逻辑,抢不到资源的必然涉及一种排队等候机制。既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢。

如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的Node结点,通过CAS、自旋以及LockSupport的凭证机制,维护state变量的状态,使并发达到同步的控制效果。

CLH:Craig、Landin、Hagersten(三个科学家名字)队列,原版是一个单向链表,AQS中的队列是CLH变体的虚拟双向队列FIFO,其头节点在初始化后变为空节点。

1.4 资源使用方式

AQS定义两种资源使用方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

2 体系架构

 1 public abstract class AbstractQueuedSynchronizer
 2     extends AbstractOwnableSynchronizer
 3     implements java.io.Serializable {
 4     ...
 5     // 内部封装Node节点
 6     static final class Node {
 7         // 标记线程以共享的模式等待锁
 8         static final Node SHARED = new Node();
 9         // 标记线程以独占的模式等待锁
10         static final Node EXCLUSIVE = null;
11         // waitStatus取值为1表示线程取消(超时、中断),被取消的节点不会阻塞
12         static final int CANCELLED =  1;
13         // waitStatus取值为-1表示后继节点已经准备完成,等待线程释放资源
14         static final int SIGNAL    = -1;
15         // waitStatus取值为-2表示线程在Condition队列中阻塞,当其他线程调用了Condition中的唤醒方法后,将节点从Condition队列转移到CLH等待队列(Condition中有使用)
16         static final int CONDITION = -2;
17         // waitStatus取值为-3表示线程及后续线程无条件传播(共享模式可用,CountDownLatch中有使用)
18         static final int PROPAGATE = -3;
19         // 线程的等待状态,初始值为0
20         volatile int waitStatus;
21         // 前驱节点
22         volatile Node prev
23         // 后继节点
24         volatile Node next;
25         // 线程对象
26         volatile Thread thread;
27         ...
28     }
29     // 头节点
30     private transient volatile Node head
31     // 尾节点
32     private transient volatile Node tail;
33     // 资源状态,0表示可获取,大于等于1表示已占用
34     private volatile int state;
35     // 获取资源状态
36     protected final int getState() {
37         return state;
38     }
39     // 设置资源状态
40     protected final void setState(int newState) {
41         state = newState;
42     }
43     // CAS设置资源状态
44     protected final boolean compareAndSetState(int expect, int update) {
45         // See below for intrinsics setup to support this
46         return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
47     }
48     ...
49 }

AQS使用了一个volatile修饰的整型变量state用来表示同步状态,通过内置的CLH同步队列来完成线程的排队工作。

其中,对state值的修改是通过CAS完成的,0表示资源可用,大于等于1表示资源不可用。AQS提供了三种操作state的方法:getState()、setState()、compareAndSetState()。

当前线程根据state的值判断能否获取资源,如果获取失败,AQS会将当前线程thread以及等待状态waitStatus等信息封装成Node节点,并将其加CLH入同步队列,同时阻塞当前线程。当state的值变为可获取资源后,会把Node节点中的线程唤醒,再次尝试获取资源。

3 Lock与AQS

Lock接口的实现类,基本都是通过聚合了一个队列同步器的子类完成线程访问控制的。

 1 public class ReentrantLock implements Lock, java.io.Serializable {
 2     ...
 3     abstract static class Sync extends AbstractQueuedSynchronizer {
 4         ...
 5         final boolean nonfairTryAcquire(int acquires) {
 6             final Thread current = Thread.currentThread();
 7             int c = getState();
 8             if (c == 0) {
 9                 if (compareAndSetState(0, acquires)) {
10                     setExclusiveOwnerThread(current);
11                     return true;
12                 }
13             }
14             else if (current == getExclusiveOwnerThread()) {
15                 int nextc = c + acquires;
16                 if (nextc < 0) // overflow
17                     throw new Error("Maximum lock count exceeded");
18                 setState(nextc);
19                 return true;
20             }
21             return false;
22         }
23         ...
24     }
25     static final class NonfairSync extends Sync {
26         ...
27         final void lock() {
28             if (compareAndSetState(0, 1))
29                 setExclusiveOwnerThread(Thread.currentThread());
30             else
31                 acquire(1);
32         }
33         protected final boolean tryAcquire(int acquires) {
34             return nonfairTryAcquire(acquires);
35         }
36     }
37     static final class FairSync extends Sync {
38         ...
39         final void lock() {
40             acquire(1);
41         }
42         protected final boolean tryAcquire(int acquires) {
43             final Thread current = Thread.currentThread();
44             int c = getState();
45             if (c == 0) {
46                 if (!hasQueuedPredecessors() &&
47                     compareAndSetState(0, acquires)) {
48                     setExclusiveOwnerThread(current);
49                     return true;
50                 }
51             }
52             else if (current == getExclusiveOwnerThread()) {
53                 int nextc = c + acquires;
54                 if (nextc < 0)
55                     throw new Error("Maximum lock count exceeded");
56                 setState(nextc);
57                 return true;
58             }
59             return false;
60         }
61     }
62     public ReentrantLock() {
63         sync = new NonfairSync();
64     }
65     public ReentrantLock(boolean fair) {
66         sync = fair ? new FairSync() : new NonfairSync();
67     }
68     ...
69 }

ReentrantLock类的内部聚合了一个Sync类,Sync类继承了AQS类,并且非公平锁NonfairSync和公平锁FairSync都继承自Sync,默认创建的是非公平锁NonfairSync。

4 分析ReentrantLock

4.1 概述

整个ReentrantLock的加锁过程,可以分为三个阶段:

1)尝试加锁。

2)加锁失败,线程入队列。

3)线程入队列后,进入阻赛状态。

4.2 场景举例

举例三个客户在银行办理业务,使用默认的非公平锁:

 1 public static void main(String[] args) {
 2     Lock lock = new ReentrantLock();
 3     new Thread(()->{
 4         lock.lock();
 5         try {
 6             System.out.println(Thread.currentThread().getName() + "-----办理业务");
 7             try {
 8                 TimeUnit.SECONDS.sleep(60);
 9             } catch (InterruptedException e) {
10                 e.printStackTrace();
11             }
12             System.out.println(Thread.currentThread().getName() + "-----离开");
13         } finally {
14             lock.unlock();
15         }
16     }, "A").start();
17     new Thread(()->{
18         lock.lock();
19         try {
20             System.out.println(Thread.currentThread().getName() + "-----办理业务");
21             try {
22                 TimeUnit.SECONDS.sleep(60);
23             } catch (InterruptedException e) {
24                 e.printStackTrace();
25             }
26             System.out.println(Thread.currentThread().getName() + "-----离开");
27         } finally {
28             lock.unlock();
29         }
30     }, "B").start();
31     new Thread(()->{
32         lock.lock();
33         try {
34             System.out.println(Thread.currentThread().getName() + "-----办理业务");
35             try {
36                 TimeUnit.SECONDS.sleep(60);
37             } catch (InterruptedException e) {
38                 e.printStackTrace();
39             }
40             System.out.println(Thread.currentThread().getName() + "-----离开");
41         } finally {
42             lock.unlock();
43         }
44     }, "C").start();
45 }

5 程序分析

5.1 线程A开始并执行

5.1.1 获取资源

线程A进入,调用lock()方法,查看实现:

1 final void lock() {
2     // 使用CAS设置state为1
3     if (compareAndSetState(0, 1))
4         // 表示获取资源成功,将当前线程设为占用线程
5         setExclusiveOwnerThread(Thread.currentThread());
6     else
7         // 表示获取资源失败,继续抢占资源
8         acquire(1);
9 }

因为线程A是第一个获取资源的线程,所以使用compareAndSetState()方法设置成功,继续调用setExclusiveOwnerThread()方法将当前线程设为占用线程,然后继续执行业务。

5.2 线程B开始并阻塞

5.2.1 获取资源

线程B进入,调用lock()方法。

因为线程B是第二个获取资源的线程,线程A已经将state从0改为了1,所以使用compareAndSetState()方法设置失败,继续调用acquire()方法获取资源,查看实现:

1 public final void acquire(int arg) {
2     // 抢占资源
3     if (!tryAcquire(arg) &&
4         // 加入等待队列
5         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
6         // 线程阻塞
7         selfInterrupt();
8 }

如果抢占资源成功,调用tryAcquire()方法返回true,判断条件结束,继续执行业务。

如果抢占资源失败,继续判断acquireQueued()方法返回。执行addWaiter()方法并传入参数表示使用独占模式将线程加入到等待队列。

5.2.2 抢占资源

线程B进入,继续调用acquire()方法获取资源,执行tryAcquire()方法,查看实现:

1 protected final boolean tryAcquire(int acquires) {
2     // 继续调用非公平锁的尝试抢占方法
3     return nonfairTryAcquire(acquires);
4 }

继续调用非公平锁的nonfairTryAcquire()方法,返回false表示占用失败:

 1 final boolean nonfairTryAcquire(int acquires) {
 2     // 记录当前线程
 3     final Thread current = Thread.currentThread();
 4     // 记录当前资源状态
 5     int c = getState();
 6     // 0表示当前资源可用
 7     if (c == 0) {
 8         // 使用CAS设置state为请求数
 9         if (compareAndSetState(0, acquires)) {
10             // 表示获取资源成功,将当前线程设为占用线程
11             setExclusiveOwnerThread(current);
12             return true;
13         }
14     }
15     // 大于等于1表示当前资源被占用,判断当前线程是否为占用线程(可重入锁的情况)
16     else if (current == getExclusiveOwnerThread()) {
17         // 当前线程为占用线程,记录资源状态
18         int nextc = c + acquires
19         // 判断是否溢出
20         if (nextc < 0) // overflow
21             throw new Error("Maximum lock count exceeded");
22         // 设置state为新的资源状态
23         setState(nextc);
24         return true;
25     }
26     return false;
27 }

5.2.3 进入等待

线程B进入,继续调用addWaiter()方法将当前线程加入等待队列,查看实现:

 1 private Node addWaiter(Node mode) {
 2     // 将当前线程和传入的独占模式封装为节点
 3     Node node = new Node(Thread.currentThread(), mode);
 4     // Try the fast path of enq; backup to full enq on failure
 5     Node pred = tail;
 6     // 尾节点不为空,表示CLH队列已经初始化,CAS操作将当前节点设为尾节点
 7     if (pred != null) {
 8         node.prev = pred;
 9         if (compareAndSetTail(pred, node)) {
10             pred.next = node;
11             return node;
12         }
13     }
14     // 尾节点为空,表示CLH队列还未初始化,初始化队列
15     enq(node);
16     return node;
17 }

因为线程B是第一个进入等待的线程,尾节点为空,继续查看enq()方法:

 1 private Node enq(final Node node) {
 2     for (;;) {
 3         Node t = tail;
 4         // 尾节点为空,通过CAS设置头节点和尾节点为空节点
 5         if (t == null) { // Must initialize
 6             if (compareAndSetHead(new Node()))
 7                 tail = head;
 8         } else {
 9             // 尾节点不为空,通过CAS将当前节点作为新的尾节点
10             node.prev = t;
11             if (compareAndSetTail(t, node)) {
12                 t.next = node;
13                 return t;
14             }
15         }
16     }
17 }

初始化CLH队列后,头节点为空节点,尾节点为当前节点。

5.2.4 阻塞线程

线程B得到当前节点后,作为参数传入acquireQueued()方法继续执行:

 1 final boolean acquireQueued(final Node node, int arg) {
 2     // 记录当前节点是否取消,默认为true,表示取消
 3     boolean failed = true;
 4     try {
 5         // 标记当前节点是否中断,默认为false,表示当前节点没有中断
 6         boolean interrupted = false
 7         // 自旋
 8         for (;;) {
 9             // 获取当前节点的上一节点
10             final Node p = node.predecessor();
11             // 如果当前节点是头节点,表示当前节点即将被唤醒,尝试抢占资源
12             if (p == head && tryAcquire(arg)) {
13                 // 将当前节点设为头节点,置空当前节点的上一节点,并取消当前节点同当前线程的绑定
14                 setHead(node);
15                 // 将原头节点的下一节点置空,方便GC回收
16                 p.next = null; // help GC
17                 // 标记当前节点为false,表示没有取消
18                 failed = false;
19                 // 返回false,表示当前节点没有中断
20                 return interrupted;
21             }
22             // 不管当前节点是不是头节点,执行到这里就表示获取资源失败,处理前置节点并阻塞当前节点
23             if (shouldParkAfterFailedAcquire(p, node) &&
24                 parkAndCheckInterrupt())
25                 // 标记为true,表示当前节点中断
26                 interrupted = true;
27         }
28     } finally {
29         // 当前节点如果被取消,执行取消操作
30         if (failed)
31             cancelAcquire(node);
32     }
33 }

因为线程B是第一个进入等待的线程,上一节点为头节点,尝试获取资源。获取成功则将当前节点作为头节点并移除当前线程,获取失败则进入判断。

在判断条件中调用shouldParkAfterFailedAcquire()方法,处理前置节点:

 1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2     // 记录上一节点的等待状态
 3     int ws = pred.waitStatus;
 4     if (ws == Node.SIGNAL)
 5         // 如果上一节点的等待状态为-1,表示当前线程可以被阻塞,返回true,执行parkAndCheckInterrupt()方法
 6         return true;
 7     if (ws > 0) {
 8         // 如果上一节点的等待状态为1,表示上一节点被取消,循环移除被取消的上一节点
 9         do {
10             node.prev = pred = pred.prev;
11         } while (pred.waitStatus > 0);
12         pred.next = node;
13     } else {
14         // 上述条件不满足,表示上一节点的等待状态为0或者-3,通过CAS将等待状态设置为-1
15         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
16     }
17     // 返回false,跳过parkAndCheckInterrupt()方法,重新进入自旋
18     return false;
19 }

因为线程B是第一个进入等待的线程,上一节点为头节点,头节点为空节点,等待状态为0,所以两次进入此方法。

第一次进入shouldParkAfterFailedAcquire()方法将上一节点的等待状态设置为-1后返回false,条件判断为false重新进入自旋。

第二次进入shouldParkAfterFailedAcquire()方法检测到上一节点的等待状态为-1,返回true,继续判断parkAndCheckInterrupt()方法。

在判断条件中调用parkAndCheckInterrupt()方法,阻塞当前节点:

1 private final boolean parkAndCheckInterrupt() {
2     // 使用LockSupport的park()方法阻塞当前节点
3     LockSupport.park(this);
4     // 返回线程的中断状态
5     return Thread.interrupted();
6 }

线程B在此被阻塞。

5.3 线程C开始并阻塞

5.3.1 获取资源

线程C进入,调用lock()方法。

因为线程C是第三个获取资源的线程,线程A已经将state从0改为了1,所以使用compareAndSetState()方法设置失败,继续调用acquire()方法获取资源。

如果抢占资源成功,调用tryAcquire()方法返回true,判断条件结束,继续执行业务。

如果抢占资源失败,继续判断acquireQueued()方法返回。执行addWaiter()方法并传入参数表示使用独占模式将线程加入到等待队列。

5.3.2 抢占资源

线程C进入,继续调用acquire()方法获取资源,执行tryAcquire()方法。

继续调用非公平锁的nonfairTryAcquire()方法,返回false表示占用失败。

5.3.3 进入等待

线程C进入,继续调用addWaiter()方法将当前线程加入等待队列。

因为线程C是第二个进入等待的线程,线程B已经完成了队列初始化,尾节点不为空,将当前节点作为新的尾节点。

5.3.4 阻塞线程

线程C得到当前节点后,作为参数传入acquireQueued()方法继续执行。

因为线程C是第二个进入等待的线程,上一节点不为头节点,直接进入判断。

在判断条件中调用shouldParkAfterFailedAcquire()方法,处理前置节点,将B节点的等待状态设为-1,返回true,继续判断parkAndCheckInterrupt()方法。

在判断条件中调用parkAndCheckInterrupt()方法,阻塞当前节点。

线程C在此被阻塞。

5.4 线程A结束

5.4.1 解锁资源

线程A执行完毕,调用unlock()方法释放资源并唤醒线程,查看实现:

1 public void unlock() {
2     sync.release(1);
3 }

继续查看release()方法:

 1 public final boolean release(int arg) {
 2     // 调用tryRelease()方法尝试释放资源
 3     if (tryRelease(arg)) {
 4         // 获取头节点
 5         Node h = head;
 6         // 如果头节点不为空,并且等待状态不为0,表示需要唤醒其他线程
 7         if (h != null && h.waitStatus != 0)
 8             // 调用unparkSuccessor()方法并传入头节点,唤醒线程
 9             unparkSuccessor(h);
10         return true;
11     }
12     // 释放失败返回false
13     return false;
14 }

5.4.2 释放资源

继续查看tryRelease()方法:

 1 protected final boolean tryRelease(int releases) {
 2     // 记录资源状态
 3     int c = getState() - releases;
 4     // 如果当前线程不为占用线程则抛出异常
 5     if (Thread.currentThread() != getExclusiveOwnerThread())
 6         throw new IllegalMonitorStateException();
 7     // 标记资源空闲,默认为false
 8     boolean free = false
 9     // 资源状态为0则标记资源空闲为true,并将占用线程置空
10     if (c == 0) {
11         free = true;
12         setExclusiveOwnerThread(null);
13     }
14     // 设置资源状态
15     setState(c);
16     // 返回资源空闲
17     return free;
18 }

线程A释放资源并返回true,继续执行。

5.4.3 唤醒线程

因为线程B和线程C已经进入等待队列,所以头节点不为空,继续查看unparkSuccessor()方法:

 1 private void unparkSuccessor(Node node) {
 2     // 记录头节点的等待状态
 3     int ws = node.waitStatus;
 4     // 如果头节点的等待状态小于0,则将头节点的等待状态设为0
 5     if (ws < 0)
 6         compareAndSetWaitStatus(node, ws, 0);
 7     // 记录头节点的下一节点
 8     Node s = node.next;
 9     // 判断下一节点是否为空或者下一节点的等待状态是否大于0
10     if (s == null || s.waitStatus > 0) {
11         s = null;
12         // 遍历下一节点,找到不为空并且等待状态小于等于0的节点,将其设为下一节点
13         for (Node t = tail; t != null && t != node; t = t.prev)
14             if (t.waitStatus <= 0)
15                 s = t;
16     }
17     // 如果下一节点不为空,则使用LockSupport的unpark()方法唤醒下一节点中的线程
18     if (s != null)
19         LockSupport.unpark(s.thread);
20 }

头节点的下一节点为线程B所在的节点,线程B被唤醒。

5.5 线程B执行并结束

5.5.1 抢占资源

线程B在parkAndCheckInterrupt()方法中被释放后,返回中断状态为false,重新进入自旋:

 1 final boolean acquireQueued(final Node node, int arg) {
 2     // 记录当前节点是否取消,默认为true,表示取消
 3     boolean failed = true;
 4     try {
 5         // 标记当前节点是否中断,默认为false,表示当前节点没有中断
 6         boolean interrupted = false
 7         // 自旋
 8         for (;;) {
 9             // 获取当前节点的上一节点
10             final Node p = node.predecessor();
11             // 如果当前节点是头节点,表示当前节点即将被唤醒,尝试抢占资源
12             if (p == head && tryAcquire(arg)) {
13                 // 将当前节点设为头节点,置空当前节点的上一节点,并取消当前节点同当前线程的绑定
14                 setHead(node);
15                 // 将原头节点的下一节点置空,方便GC回收
16                 p.next = null; // help GC
17                 // 标记当前节点为false,表示没有取消
18                 failed = false;
19                 // 返回false,表示当前节点没有中断
20                 return interrupted;
21             }
22             // 不管当前节点是不是头节点,执行到这里就表示获取资源失败,处理前置节点并阻塞当前节点
23             if (shouldParkAfterFailedAcquire(p, node) &&
24                 parkAndCheckInterrupt())
25                 // 标记为true,表示当前节点中断
26                 interrupted = true;
27         }
28     } finally {
29         // 当前节点如果被取消,执行取消操作
30         if (failed)
31             cancelAcquire(node);
32     }
33 }

因为线程B的上一节点为头节点,进入tryAcquire()方法抢占资源,抢占成功返回true并将当前节点设为头节点,同时解除同线程B的绑定。

5.5.2 解锁资源

线程B执行完毕,调用unlock()方法释放资源并唤醒线程。

头节点的下一节点为线程C所在的节点,线程C被唤醒。

5.6 线程C执行并结束

5.6.1 抢占资源

线程C在parkAndCheckInterrupt()方法中被释放后,返回中断状态为false,重新进入自旋。

因为线程C的上一节点为头节点,进入tryAcquire()方法抢占资源,抢占成功返回true并将当前节点设为头节点,同时解除同线程C的绑定。

5.6.2 解锁资源

线程C执行完毕,调用unlock()方法释放资源并唤醒线程。

头节点的下一节点为空,不会有任何线程被唤醒。

6 公平锁与非公平锁

6.1 非公平锁

非公平锁的线程在获取资源时,会尝试获取资源,如果成功则立刻占用资源,如果失败则尝试占用资源。

在资源可用时不会判断当前队列是否有线程在等待,也就是说刚加入的线程可以同唤醒的线程竞争资源。

 1 final void lock() {
 2     if (compareAndSetState(0, 1))
 3         setExclusiveOwnerThread(Thread.currentThread());
 4     else
 5         acquire(1);
 6 }
 7 ...
 8 public final void acquire(int arg) {
 9     if (!tryAcquire(arg) &&
10         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
11         selfInterrupt();
12 }
13 ...
14 protected final boolean tryAcquire(int acquires) {
15     return nonfairTryAcquire(acquires);
16 }
17 ...
18 final boolean nonfairTryAcquire(int acquires) {
19     final Thread current = Thread.currentThread();
20     int c = getState();
21     if (c == 0) {
22         if (compareAndSetState(0, acquires)) {
23             setExclusiveOwnerThread(current);
24             return true;
25         }
26     }
27     else if (current == getExclusiveOwnerThread()) {
28         int nextc = c + acquires;
29         if (nextc < 0) // overflow
30             throw new Error("Maximum lock count exceeded");
31         setState(nextc);
32         return true;
33     }
34     return false;
35 }

6.2 公平锁

公平锁的线程在获取资源时,不会尝试获取资源,而是尝试占用资源。

在资源可用时会判断当前队列是否有线程在等待,刚加入的线程不可用同唤醒的线程竞争资源。

 1 final void lock() {
 2     acquire(1);
 3 }
 4 ...
 5 public final void acquire(int arg) {
 6     if (!tryAcquire(arg) &&
 7         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 8         selfInterrupt();
 9 }
10 ...
11 protected final boolean tryAcquire(int acquires) {
12     final Thread current = Thread.currentThread();
13     int c = getState();
14     if (c == 0) {
15         if (!hasQueuedPredecessors() &&
16             compareAndSetState(0, acquires)) {
17             setExclusiveOwnerThread(current);
18             return true;
19         }
20     }
21     else if (current == getExclusiveOwnerThread()) {
22         int nextc = c + acquires;
23         if (nextc < 0)
24             throw new Error("Maximum lock count exceeded");
25         setState(nextc);
26         return true;
27     }
28     return false;
29 }

6.3 分析hasQueuedPredecessors()方法

比较NonfairSync和FairSync中tryAcquire()方法的实现,发现FairSync中的tryAcquire()方法中多了一项判断:

1 !hasQueuedPredecessors()

hasQueuedPredecessors()方法用于公平锁加锁时判断等待队列中是否存在有效节点的方法。该方法用于判断当前节点前是否有其他节点排队:

返回false表示没有,取反后为true表示当前节点不需要排队,需要继续执行占用资源的操作。

返回true表示有,取反后为false表示当前节点需要排队,需要执行加入等待队列的操作。

查看AQS中定义的hasQueuedPredecessors()方法:

 1 public final boolean hasQueuedPredecessors() {
 2     // The correctness of this depends on head being initialized
 3     // before tail and on head.next being accurate if the current
 4     // thread is first in queue.
 5     Node t = tail; // Read fields in reverse initialization order
 6     Node h = head;
 7     Node s;
 8     return h != t &&
 9         ((s = h.next) == null || s.thread != Thread.currentThread());
10 }

判断h是否不等于t,如果不成立,说明h等于t,说明头节点和尾节点相同,说明当前队列未初始化(头节点和尾节点都是空节点)或者当前队列只有一个节点(头结点和尾节点都是空节点),说明不需要排队,返回false,取反后为true,尝试占用资源。

判断h是否不等于t,如果成立,说明h不等于t,说明存在两个不同节点。继续判断头节点的下一节点是否为空节点,如果成立,说明下一节点为空,可能上个线程在执行初始化enq()方法,刚刚通过CAS操作compareAndSetHead()将头节点初始化,尚未给尾节点赋值,此时头节点不为空,尾节点为空,并且头节点的下一节点为空,返回true,取反后为false,需要排队。

判断h是否不等于t,如果成立,说明h不等于t,说明存在两个不同节点。继续判断头节点的下一节点是否为空节点,如果不成立,说明下一节点不为空。继续判断下一节点封装的线程是否不等于当前线程,如果成立,说明下一线程不为当前线程,返回true,取反后为false,需要排队。

判断h是否不等于t,如果成立,说明h不等于t,说明存在两个不同节点。继续判断头节点的下一节点是否为空节点,如果不成立,说明下一节点不为空。继续判断下一节点封装的线程是否不等于当前线程,如果不成立,说明下一线程为当前线程,返回false,取反后为true,尝试占用资源。

7 自定义同步器

7.1 实现方法

不同的自定义同步器争用共享资源的方式也不同,自定义同步器在实现时只需要实现共享资源state的获取与释放即可,至于具体线程等待队列的维护(如获取资源失败入队和唤醒出队等),AQS已经在底层实现好了。

自定义同步器实现时主要实现以下几种方法:

isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。

tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。

tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。

tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。

tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

7.2 举例说明

7.2.1 ReentrantLock

以ReentrantLock为例,state初始化为0,表示未锁定状态。

当线程A调用lock()方法获取资源时,会调用tryAcquire()占用资源,并将state的值加1。

此后,其他线程再tryAcquire()时就会失败,直到线程A调用unlock()方法释放资源,并将state的值减0,其它线程才有机会获取该锁。

当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多少次,这样才能保证state是能回到零态的。

7.2.2 CountDownLatch

再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(N与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后都会调用一次countDown()方法,使用CAS操作将state值减1。等到所有子线程都执行完后,state的值变为0,这时会调用unpark()方法唤醒主线程,然后主线程就会从await()方法唤醒,继续后余动作。

 

标签:012AQS,Node,节点,int,011Java,线程,发包,final,资源
来源: https://www.cnblogs.com/zhibiweilai/p/15381329.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有