ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

【并发】锁

2022-01-15 02:31:46  阅读:21  来源: 互联网

标签:结点 同步 获取 队列 并发 int 线程


《Java并发编程的艺术》读书笔记

锁的作用

锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源。

Lock接口

在Lock接口出现之前,Java程序是靠synchronized关键字实现锁功能的,而Java SE 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功能,只是在使用时需要显式地获取和释放锁

Lock接口与synchronized关键字相比:

  • 缺点:缺少了隐式获取释放锁的便捷性
  • 优点:拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性

Lock接口提供的synchronized关键字所不具备的主要特性如下

特性 描述
尝试非阻塞地获取锁 当前线程尝试获取锁,如果这一时刻锁没有被其他线程获取到,则成功获取并持有锁
能被中断地获取锁 与synchronized不同,获取到锁的线程能够响应中断,当获取到锁的线程被中断时,中断异常将会被抛出,同时锁会被释放
超时获取锁 在指定的截止时间之前获取锁,如果截止时间到了仍旧无法获取锁,则返回

尝试非阻塞地获取锁:比如同步组件直接调用AQS::tryAcquire

能被中断地获取锁:比如同步组件通过调用AQS::acquireInterruptibly

超时获取锁:比如同步组件通过调用AQS::tryAcquireNanos

Lock是一个接口,它定义了锁获取和释放的基本操作,Lock的API如下

方法名称 描述
void lock() 获取锁,调用该方法当前线程将会获取锁,当锁获得后,从该方法返回
void lockInterruptibly() throws InterruptedException 可中断地获取锁,和lock()方法的不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程
boolean tryLock() 尝试非阻塞地获取锁,调用该方法后立刻返回,如果能够获取则返回true,否则返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException 超时地获取锁,当前线程在以下3种情况下会返回:①当前线程在超时时间内获得了锁 ②当前线程在超时时间内被中断 ③ 超时时间结束,返回false
void unlock() 释放锁
Condition newCondition() 获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件的wait()方法,而调用后,当前线程释放锁

Lock接口的实现类基本都是通过聚合了一个同步器AbstractQueuedSynchronizer的子类来完成线程访问控制的。

AQS

AQS:AbstractQueuedSynchronizer,队列同步器,简称同步器。

AQS是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。模型如下

image-20211224103706680

AQS定义两种资源共享方式:

  • Exclusive(独占,同一时刻只有一个线程能执行,如ReentrantLock)。
  • Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。

接口

同步器的设计是基于模板方法模式的,也就是说,使用者需要写个类继承同步器并重写指定的方法(称为自定义同步器),随后将自定义同步器组合在同步组件的实现中,同步组件需要调用自定义同步器提供的模板方法,而这些模板方法将会调用自定义同步器重写的方法。

自定义同步器在重写指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态:

  • getState():获取当前同步状态。
  • setState(int newState):设置当前同步状态。
  • compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性。

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程同步队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryReleasetryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock

同步器可重写的方法如下

方法名称 描述
protected boolean tryAcquire(int arg) 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态。成功则返回true,失败则返回false。
protected boolean tryRelease(int arg) 独占式释放同步状态,等待获取同步状态的线程将会有机会获取同步状态。成功则返回true,失败则返回false。
protected int tryAcquireShared(int arg) 共享式获取同步状态,负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
protected int tryReleaseShared(int arg) 共享式释放同步状态,如果释放后允许唤醒后续等待结点返回true,否则返回false。
protected boolean isHeldExclusively() 当前AQS是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占

实现自定义同步组件时,将会调用同步器AQS提供的模板方法,这些(部分)模板方法如下

方法名称 描述
void acquire(int arg) 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的tryAcquire(int arg)方法
void acquireInterruptibly(int arg) 与acquire(int arg)相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException并返回
boolean tryAcquireNanos(int arg, long nanos) 在acquireInterruptibly(int arg)基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回false,如果获取到了返回true
void acquireShared(int arg) 共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态
void acquireSharedInterruptibly(int arg) 与acquireShared(int arg)相同,该方法响应中断
boolean tryAcquireSharedNanos(int arg, long nanos) 在acquireSharedInterruptibly(int arg)基础上增加了超时限制
boolean release(int arg) 独占式的释放同步状态,放方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒
boolean releaseShared(int arg) 共享式的释放同步状态
Collection getQueuedThreads() 获取等待在同步队列上的线程集合

同步器提供的模板方法基本上分为3类:

  1. 独占式获取与释放同步状态
  2. 共享式获取与释放同步状态
  3. 查询同步队列中的等待线程情况

自定义同步组件将使用同步器提供的模板方法来实现自己的同步语义。

实现分析

接下来分析同步器是如何完成线程同步的,主要包括:

  1. 同步队列(核心数据结构)
  2. 独占式同步状态获取与释放(同步器的模板方法)
  3. 共享式同步状态获取与释放(同步器的模板方法)

同步队列

同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

结点Node包含的信息:获取同步状态失败的线程引用、等待状态、前驱和后继节点

    static final class Node {

        static final Node SHARED = new Node();
    
        static final Node EXCLUSIVE = null;
        
		// 表示当前结点已取消调度。由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列
		// 中取消等待,节点进入该状态将不会变化  
        static final int CANCELLED =  1;
      
        // 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继
        // 节点,使后继节点的线程得以运行。后继结点入队时,会将前继结点的状态更新为SIGNAL。
        static final int SIGNAL    = -1;
       
        // 节点在等待队列中,节点线程等待在Condition 上,当其他线程对Condition调用了signal()
        // 方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中
        static final int CONDITION = -2;

        // 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
        static final int PROPAGATE = -3;

        // 等待状态,初始值为 INITIAL(值为0)
        volatile int waitStatus;

        // 前驱节点,当节点接入同步队列时被设置(尾部添加)
        volatile Node prev;

        // 后继结点
        volatile Node next;

        // 获取同步状态的线程
        volatile Thread thread;

        // 等待队列中的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARED常量,也就是说
        // 节点类型(独占和共享)和等待队列中的后继节点共用同一个字段
        Node nextWaiter;
		...
    }

waitStatus意为等待状态,它的取值可以是CANCELLEDSIGNALCONDITIONPROPAGATEINITIAL。注意,负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常

Node是构成同步队列的基础,它是同步器AQS的静态内部类,同步器拥有首节点head和尾节点tail没有成功获取同步状态的线程将会成为节点加入该队列的尾部,同步队列的基本结构如下

image-20211212011905425

没有成功获取同步状态的线程作为节点加入队列尾部的时候,需要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Nodeupdate),只有设置成功后,当前节点才正式与之前的尾节点建立关联。

同步器将节点加入到同步队列的过程如下

image-20211212012605325

同步队列遵循FIFO首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点,该过程如下

image-20211212012444961

设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可。

独占式同步状态获取与释放

获取

acquire(int arg)方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入同步队列,直到获取到资源为止,且整个过程忽略中断的影响,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。

acquire(int arg)方法如下

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现

acquireQueued方法中,如果线程在等待过程中被中断过,它是不响应的,只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

tryAcquire方法如下

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

此方法需要自定义同步器实现,该方法尝试去获取独占资源,如果获取成功,则直接返回true,否则直接返回false。这也正是tryLock()的语义,但不只限于tryLock()。

Node.EXCLUSIVENode的静态成员变量

    static final class Node {
    	...
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;    
        ...    
    }

addWaiter方法如下

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 快速尝试在尾部添加
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // CAS操作将结点添加到尾部,成功则直接返回node
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 快速尝试失败,调用enq方法
        enq(node);
        return node;
    }

此方法会创建结点并且将其加入同步队列的队尾中,结点包含的信息有 当前线程 和 模式modemode有两种可能的取值:

  • Node.EXCLUSIVE:独占式
  • Node.SHARED:共享式

在这里是Node.EXCLUSIVE

enq方法内部是一个死循环,循环内部不断使用CAS操作尝试将结点添加到同步队列的尾部,直到添加成功

    private Node enq(final Node node) {
        // CAS自旋,直到成功加入队尾
        for (;;) {
            // t指向原来队列的尾部
            Node t = tail;
            // 原来的队列为空,那么就初始化,在队列头部插入一个结点
            if (t == null) { 
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 使用CAS操作尝试将node添加到队列的尾部,添加成功则返回node的前驱
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

这里使用了headtail指针,配合CAS操作确保节点能够被线程安全添加。不使用LinkedList来维护节点之间的关系,是因为多个线程并发地将节点添加到LinkedList中,难以保证线程安全,可能导致节点的数量有偏差、顺序混乱

接着查看acquireQueued方法是如何获取同步状态的

    final boolean acquireQueued(final Node node, int arg) {
        // failed变量表示"同步状态是否获取失败"
        boolean failed = true;
        try {
            // interrupted变量表示是否被中断
            boolean interrupted = false;
            // 自旋操作
            for (;;) {
                // 拿到前驱结点
                final Node p = node.predecessor();
                // 如果前驱结点是头结点,便尝试去获取资源
                if (p == head && tryAcquire(arg)) {
                    // 拿到资源后,将node设置为头结点,该方法还会将node.prev设置为null
                    setHead(node);
                    // 将之前head的next设置为null,方便GC回收上一个head结点,这也意味着
                    // 上一个head结点出队了
                    p.next = null; // help GC
                    // 成功获取到资源
                    failed = false;
                    // 返回等待过程是否被中断过
                    return interrupted;
                }
                
                // shouldParkAfterFailedAcquire:若当前线程可以进入等待态则返回true
                // parkAndCheckInterrupt:让线程进入等待态,并且检查线程是否被中断,中断则返回true
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 只要线程等待过程中被中断过,就将interrupted标记为true
                    interrupted = true;
            }
        } finally {
            // 如果等待过程中没有成功获取资源,那么取消结点在队列中的等待。
            if (failed)
                cancelAcquire(node);
        }
    }

parkAndCheckInterrupt中会通过park让线程进入等待状态,等待的过程中,如果被其他结点唤醒,或者等待的线程被中断,线程就会从park中醒来,再次尝试获取资源。

看下面这个图

image-20211213020349858

由于非首节点线程前驱节点出队或者被中断而从等待状态返回,随后检查自己的前驱是否是头节点,如果是则尝试获取同步状态。可以看到节点和节点之间在循环检查的过程中基本不相互通信,而是简单地判断自己的前驱是否为头节点,这样就使得节点的释放规则符合FIFO,并且也便于对过早通知的处理(过早通知是指前驱节点不是头节点的线程由于中断而被唤醒)。

我们查看AQS::shouldParkAfterFailedAcquire方法:

	# node:正在获取同步状态的结点
    # pred:node的前驱结点    
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 拿到前驱结点的状态
        int ws = pred.waitStatus;
        // 如果前驱的状态是SIGNAL,那么前驱结点在释放同步状态后会唤醒node结点,这意味node结点
        // 对应的线程可以安心休息了(进入等待态)。
        if (ws == Node.SIGNAL)
            return true;
        // ws > 0 表示前驱结点是CANCELLED,即前驱结点已经取消等待
        if (ws > 0) {
			// 一直往前找,直到找到一个距离node最近的处于正常等待状态的结点,并排在它的后边
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 前驱结点的等待状态正常,就将前驱的状态设置为SIGNAL,让它释放结点后通知node结点
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // 表示还不能进入等待态
        return false;
    }

该方法主要用于检查前驱结点的状态,判断等待获取同步状态的线程是否可以进入等待态。只要前驱结点的状态不是SIGNAL,那么当前获取同步状态的线程就不能安心休息(等待态),需要去找个安心的休息点,同时可以再尝试下看自己有没有机会获取到同步资源。

如果线程找到安全休息点后,就会调用AQS::parkAndCheckInterrupt方法,进入等待态:

    private final boolean parkAndCheckInterrupt() {
        // 线程进入阻塞状态
        LockSupport.park(this);
        // 如果被唤醒(前驱结点唤醒 或 被中断),返回自己是否是因为中断被唤醒的
        return Thread.interrupted();
    }

线程进入等待状态后,有两种方式可以唤醒线程:1)、被unpark;2)、被中断。Thread.interrupted方法会清除当前线程的中断标记位。

acquireQueued方法小结:

  1. 结点进入队尾后,检查状态,找到安全休息点
  2. 调用park进入waiting状态,等待unparkinterrupt唤醒自己;
  3. 被唤醒后,判断自己是否可以获取共享资源。若可以,head指向当前结点,并返回从入队~获取到共享资源的整个过程中是否被中断过;若不可以获取共享资源,则继续流程1

独占式同步状态获取流程,也就是acquire(int arg)方法调用流程,如下图

image-20211213021250328

当同步状态获取成功之后,当前线程从acquire(int arg)方法返回,如果对于锁这种并发组件而言,代表着当前线程获取了锁。如果线程在等待过程中被中断过,它是不响应的,只是获取资源后才再进行自我中断selfInterrupt(),将中断补上

释放

当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,此方法是独占模式下线程释放共享资源的顶层入口,它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒其后继节点(进而使后继节点重新尝试获取同步状态),这也正是unlock的语义,当然不仅仅只限于unlock

AQS::release如下

    public final boolean release(int arg) {
        // 尝试独占式释放同步状态
        if (tryRelease(arg)) {
            // 找到头结点
            Node h = head;
            // 唤醒头节点的后继节点线程
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

该方法先调用tryRelease尝试释放资源,如果释放成功,就调用unparkSuccessor(Node node)方法,唤醒头节点的后继节点线程。

AQS::tryRelease方法:

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

该方法需要独占模式的自定义同步器去实现,在该方法中,会去释放指定量的资源,实现的时候让state减去相应的资源量(arg)。

要注意的是,AQS::release根据tryRelease方法的返回值判断该线程是否已经完成释放资源的工作,从而决定是否要唤醒后继结点,自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false

几个思考的点:

  1. 为什么自定义同步器的tryRelease方法,要state = 0才返回true?因为这是独占式的,同一时刻只有一个线程可以获取同步状态。

  2. 自定义同步器的tryRelease方法,释放资源就释放资源,为什么要加上"彻底"两个字?这主要是基于锁是可重入的考量,即已经获取锁的线程可以再次获取锁(可重入),若锁是可重入的,那么获取多少次锁就要释放多么次锁,这样才能保证state能回到零态(即彻底释放资源)。

AQS::unparkSuccessor方法用于唤醒同步队列中的下一个线程,如下:

    private void unparkSuccessor(Node node) {
        // node一般为当前线程所在的结点
        int ws = node.waitStatus;
        // 若当前状态为SIGNAL、CONDITION、PROPAGATE,则设置为INITIAL(0)
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // 找到写下一个需要唤醒的结点
        Node s = node.next;
		// 下一个结点为null 或者 下一个结点已经取消(CANCELLED)
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 从后向前找,找到位于最前面的有效的结点
            for (Node t = tail; t != null && t != node; t = t.prev)
                // t.waitStatus <= 0表示该节点还是有效的
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 唤醒该节点
            LockSupport.unpark(s.thread);
    }

简单来说,该方法就是用unpark方法唤醒同步队列中最前边的那个未放弃线程。我们将该方法与acquireQueuedshouldParkAfterFailedAcquire方法联系起来,假设head结点要唤醒后继的结点,那么在unparkSuccessor方法会找到位于最前面的有效的结点,也就是s结点,然后唤醒该结点。

image-20211226134502629

这时候s结点会在acquireQueued方法中,在for里面,判断p == head条件不成立,接着会走到shouldParkAfterFailedAcquire方法,在该方法中,会将s结点调整为headnext,如下

image-20211226134846219

shouldParkAfterFailedAcquire方法会返回false,这时候s结点在acquireQueued方法中会再进行一轮for循环,这时候p == head条件成立,然后s结点调用tryAcquire方法尝试去获取资源,只要获取成功,s结点就把自己设置为head结点,并将自己和原先的head结点断开引用,表示s结点成功获取到资源了,这时候独占式获取资源的acquire方法也返回了。s结点表示的线程拿到锁之后就可以做自己想做的事了。

总结

在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。持有同步状态的线程在释放同步状态时,最终会调用到AQSrelease(int arg)方法来释放同步状态,并根据情况判断是否要唤醒头节点的后继节点。

共享式同步状态获取与释放

获取

共享式获取与独占式获取最主要的区别在于:同一时刻能否有多个线程同时获取到同步状态。如果因获取同步状态失败而加入同步队列,在等待的过程中,也是忽略中断的,在资源获取成功后,如果有被中断过则进行补中断的操作。

例子:以文件的读写为例,如果一个程序在对文件进行读操作,那么这一时刻对于该文件的写操作均被阻塞,而读操作能够同时进行。写操作要求对资源的独占式访问,而读操作可以是共享式访问,两种不同的访问模式在同一时刻对文件或资源的访问情况,如下图

image-20211213150723548

左半部分,共享式访问资源时,其他共享式的访问均被允许,而独占式访问被阻塞,右半部分是独占式访问资源时,同一时刻其他访问均被阻塞

AQSacquireShared(int arg)方法可以共享式地获取同步状态,它是共享模式下线程获取共享资源的顶层入口,它会获取指定量的资源,获取成功则直接返回,获取失败则进入同步队列,直到获取到资源为止,整个过程忽略中断,该方法如下:

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

tryAcquireShared需要自定义同步器实现。acquireShared执行的大致流程如下:

  1. 调用tryAcquireShared方法尝试获取资源,成功则直接返回。

  2. 失败则通过doAcquireShared方法,线程进入同步队列,直到获取到资源为止才返回。

更具体的过程如下:

  1. 当线程调用acquireShared方法申请获取同步状态时,如果成功,则进入临界区。
  2. 当获取同步状态失败时,就会创建一个共享类型的节点并加入同步队列,开始自旋。
  3. 当同步队列中的等待线程被唤醒后尝试获取同步状态,如果成功则唤醒后面还在等待的共享节点并把唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,然后进入临界区,否则继续挂起等待。

tryAcquireShared方法如下:

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

tryAcquireShared方法由用户自己实现,在实现的时候需要注意两点:

  1. 该方法必须自己检查当前上下文是否支持获取共享锁,如果支持再进行获取

  2. 该方法有三种可能的返回值:

  • 负数:获取同步状态失败。
  • 零:获取同步状态成功,但没有剩余资源。
  • 正数:获取同步状态成功,还有剩余资源,其他线程还可以去获取。

AQS::doAcquireShared方法会将当前线程加入同步队列尾部休息,同样有两种方式可以唤醒线程:1)、被unpark;2)、被中断。自己成功拿到相应量的资源后才返回,该方法如下:

    /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        // 将结点以共享的模式加入同步队列
        final Node node = addWaiter(Node.SHARED);
        // failed变量表示"同步状态是否获取失败"
        boolean failed = true;
        try {
            // interrupted变量表示是否被中断
            boolean interrupted = false;
            for (;;) {
                // 获取前驱结点
                final Node p = node.predecessor();
                // 若 前驱结点是头结点 并且 成功获取同步状态,则返回
                if (p == head) {
                    // 尝试获取资源
                    int r = tryAcquireShared(arg);
                    // r >= 0说明同步状态获取成功
                    if (r >= 0) {
                        // 将head指向自己,还有剩余资源可以再唤醒之后的线程
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        // 如果等待过程中被打断过,此时将中断补上
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                
                // shouldParkAfterFailedAcquire:若当前线程应该阻塞则返回true
                // parkAndCheckInterrupt:阻塞操作,并且检查线程是否被中断,中断则返回true
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

该方法以共享、不可中断的模式去获取同步状态。该方法和共享式的acquireQueued很相似,主要有下面的几点不同:

  • 共享式将补中断的selfInterrupt操作放到doAcquireShared里面了,而独占模式是将补中断的selfInterrupt操作放到acquireQueued之外。
  • 共享式在获取资源后,如果还有剩余的资源,会唤醒后续线程,而独占式在获取资源后,由于是"独占式"的,所以没有唤醒后续线程的动作。

AQS::setHeadAndPropagate方法如下:

    # propagate:tryAcquireShared方法的返回值 
    private void setHeadAndPropagate(Node node, int propagate) {
        // 记录当前头结点
        Node h = head;
        // 设置node为head,并且node.prev = null
        // 这里是获取到锁之后的操作(通过tryAcquireShared方法已获取锁了),所以不需要并发控制
        setHead(node);

        // 有两种情况需要执行唤醒操作:
        // 1. propagate > 0 表示调用方指明了后继节点需要被唤醒
        // 2. 头节点后面的节点需要被唤醒(waitStatus<0)
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            // 如果没有后继节点或者后继节点为共享类型,则唤醒
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

此方法在setHead方法的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,因为这是共享模式。

doReleaseShared方法后面分析。

释放

AQS::releaseShared方法是共享模式下线程释放共享资源的顶层入口,它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒同步队列里的其他线程来获取资源,该方法如下:

	# param arg:可以表示释放的资源量(也可以表示其他的,自己定义)
    # return:取决于tryReleaseShared的返回值    
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

简单来说,该方法就是释放掉资源后,唤醒后继。该方法与独占式释放同步状态的区别:

  • 独占模式下,由于独占可重入的特点,tryRelease需要完全释放掉资源(state=0)后,才会返回true去唤醒其他线程。
  • 共享模式实质就是控制一定量的线程并发执行。共享模式下,拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点,哪怕已有的资源不能满足后继结点的要求,也会让它去尝试获取同步状态。

共享模式的例子:例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行。

tryReleaseShared方法需要自定义同步器实现:

    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

AQS::doReleaseShared方法主要用于唤醒后继,它在两个地方会被调用到:

  • AQS::setHeadAndPropagate方法(结点获取资源后继续唤醒后面的结点)
  • AQS::releaseShared方法,某结点释放资源后,可能会调用该方法

该方法如下:

 	// 在共享模式下,释放同步状态,发送信号给后继结点并且确保其传播
    private void doReleaseShared() {
        for (;;) {
       		// 唤醒操作由头结点开始
            Node h = head;
            // 存在后继结点
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                // 表示后继节点需要被唤醒
                if (ws == Node.SIGNAL) {
                    // 为了保证线程安全需要进行CAS操作
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 唤醒后继结点
                    unparkSuccessor(h);
                }
                // 如果后继节点暂时不需要唤醒,则把当前节点状态设置为PROPAGATE确保以后可以传递下去
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }

            if (h == head)                   // loop if head changed
                break;
        }
    }

总结

acquireShared方法获取同步状态的流程:

  1. 调用tryAcquireShared方法尝试获取同步状态,成功则直接返回。
  2. 失败则通过doAcquireShared进入同步队列park(自旋),直到被unpark/interrupt并成功获取到资源才返回。整个等待过程也是忽略中断的。

跟独占式获取同步状态的acquire方法的流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继线程的操作(这才是共享嘛)

releaseShared方法释放同步状态的流程:

  1. 调用tryReleaseShared方法释放同步状态。
  2. 若成功释放同步状态且允许唤醒后续等待线程,则唤醒同步队列里的其他线程来获取资源。

小结

独占式和共享式获取同步状态的acquireacquireShared方法,线程在同步队列中都是忽略中断的

AQS也支持线程在等待过程中响应中断,它们是acquireInterruptibly/acquireSharedInterruptibly方法,它们的源码和acquire/acquireShared差不多,这里不再介绍。

实现一个同步组件

这里通过一个例子,来看AQS的简单应用。

下面的Mutex是一个不可重入的互斥锁(同步组件)实现,锁资源(AQS里的state)只有两种状态:0表示未锁定,1表示锁定。Mutex的核心源码如下:

class Mutex implements Lock, java.io.Serializable {
    // 自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 判断是否锁定状态
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 尝试获取资源,立即返回。成功则返回true,否则false。
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // 这里限定只能为1个量
            if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入!
                setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源
                return true;
            }
            return false;
        }

        // 尝试释放资源,立即返回。成功则为true,否则false。
        protected boolean tryRelease(int releases) {
            assert releases == 1; // 限定为1个量
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);//释放资源,放弃占有状态
            return true;
        }
    }

    // 同步组件的具体实现依赖于自定义同步器Sync
    private final Sync sync = new Sync();

    //lock<-->acquire。两者语义一样:获取资源,即便等待,直到成功才返回。
    public void lock() {
        sync.acquire(1);
    }

    //tryLock<-->tryAcquire。两者语义一样:尝试获取资源,要求立即返回。成功则为true,失败则为false。
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    //unlock<-->release。两者语义一样:释放资源。
    public void unlock() {
        sync.release(1);
    }

    //锁是否占有状态
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
}

同步组件会实现接口(例如Lock接口),对外提供服务接口的方法。同步组件一般会定义一个继承至AQS的自定义同步器作为内部类,供自己使用。同步组件对外提供的方法的实现依赖于自定义同步器(如Sync),而自定义同步器(如Sync)只用实现资源state的获取、释放,至于线程的排队、等待、唤醒等,上层的AQS都已经实现好了,我们不用关心。

同步组件的实现大多都是这种模式,不同的地方在于自定义同步器的tryAcquiretryRelease等方法中,是如何获取和释放资源的

重入锁

重入锁ReentrantLock:支持重进入的互斥(独占性)锁,它表示该锁能够支持一个线程对资源的重复加锁。

「实现一个同步组件」中的Mutex是不支持可重入的锁,当一个线程调用Mutexlock()方法获取锁之后,如果再次调用lock()方法,则该线程将会被自己所阻塞。

synchronized关键字隐式的支持重进入,比如一个synchronized修饰的递归方法,在方法执行时,执行线程在获取了锁之后仍能连续多次地获得该锁,而不像Mutex由于获取了锁,而在下一次获取锁时出现阻塞自己的情况。

重入锁ReentrantLock支持获取锁时的公平和非公平性选择。如果在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之,是不公平的。公平的获取锁,也就是等待时间最长的线程最优先获取锁,也可以说锁获取是顺序的。ReentrantLock提供了一个构造函数,能够控制锁是否是公平的。

  • 非公平锁
    • 优点:非公平锁使用时,产生的线程切换次数较少,所以非公平锁的效率往往比公平锁效率高,使用非公平锁时,系统会有更大的吞吐量。
    • 缺点:线程发生"饥饿"的概论较高。
  • 公平锁
    • 优点:保证了锁的获取按照FIFO原则,能够减少"饥饿"发生的概率,等待越久的请求越是能够得到优先满足。
    • 缺点:因为要保证公平地获取锁,所以线程切换的次数较多,效率较低,系统的吞吐量较小。

ReentrantLockstate初始值为0,表示未锁定状态。A线程lock时,会独占该锁并将state+1。此后,其他线程尝试获取锁就会失败,直到A线程unlockstate=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的

实现锁的可重入需要解决下面两个问题:

  1. 线程再次获取锁:需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取,而不是阻塞它。
  2. 锁的最终释放:线程重复n次获取了锁,前n-1次释放,其他线程不嫩获取锁,第n次释放该锁时,其他线程能够获取到该锁。

ReentrantLock类的结构

public class ReentrantLock implements Lock, java.io.Serializable {
	...
    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
		abstract void lock();
        
        final boolean nonfairTryAcquire(int acquires) {...}
    	protected final boolean tryRelease(int releases) {...}  
        protected final boolean isHeldExclusively() {...}
        ...
    }
    
    static final class NonfairSync extends Sync {
        ...
        final void lock() {...}
        protected final boolean tryAcquire(int acquires) {...}
    }    
    
    static final class FairSync extends Sync {
        ...
    	final void lock() {...}
        protected final boolean tryAcquire(int acquires) {...}        
    }
    
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    
    public void lock() {
        sync.lock();
    }
    ...
    public void unlock() {
        sync.release(1);
    }
}

ReentrantLock内部有个抽象的同步器SyncSync有两个子类实现:FairSyncNonfairSync,分别表示公平锁和非公平锁。在ReentrantLock的构造函数中会根据所给的构造参数,决定是使用公平锁还是非公平锁,默认是使用非公平锁。

非公平锁的实现

非公平锁:sync = NonfairSync

获取

调用ReentrantLock::lock,内部调用到sync::lock,即NonfairSync::lock

    final void lock() {
        // 当前没有线程获取锁,直接获取
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        // 当前有线程获取锁,则调用AQS的acquire方法
        else
            acquire(1);
    }
  1. 若当前没有其他线程获取锁(state = 0),该线程直接state = 1,并设置OwnerThread,该线程成功获取锁。
  2. 若当前有线程获取锁,则调用AQS::acquire(注意参数是1),有两种情况
  • 正持有锁的线程就是该线程,这时候要可重入
  • 正持有锁的线程是其他线程,这时候会阻塞

AQS::acquire会调用到NonfairSync::tryAcquire

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }

调用父类Sync::nonfairTryAcquire

    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // 该线程再次尝试获取锁,获取成功则返回true
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 若该线程是正持有锁的线程
        else if (current == getExclusiveOwnerThread()) {
            // 计算重入后的资源量是多少
            int nextc = c + acquires;
            // 重入次数太多,导致值溢出,抛出异常
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
			// 设置重入后的资源量
            setState(nextc);
            return true;
        }
        return false;
    }
  1. 若线程是持有锁的线程,则重入,返回true,接着AQS::acquire会返回,ReentrantLock::lock会返回,重入获取锁成功。
  2. 若线程非持有锁的线程,直接返回false,然后在AQS::acquire方法中阻塞。

释放

释放需要调用ReentrantLock::unlock

    public void unlock() {
        sync.release(1);
    }

调用到sync::release,也就是AQS::release,在AQS::release中会调用到子类的tryRelease,在抽象类Sync有实现,Sync::tryRelease如下:

    protected final boolean tryRelease(int releases) {
        // 计算释放资源后,state值会是多少
        int c = getState() - releases;
        // 若非持有锁的线程尝试释放锁,则抛异常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        // 表示是否彻底释放资源
        boolean free = false;
        // c==0说明彻底释放资源了,把OwnerThread设置为null
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        // 设置state值
        setState(c);
        return free;
    }
  1. 若非持有锁的线程尝试释放锁,则抛异常,也就是线程没有调用ReentrantLock::lock,直接调用ReentrantLock::unlock,抛出异常。
  2. 计算释放资源后state值是多少,若为0,说明彻底释放了(基于可重入的考量),设置OwnerThreadnull,随后会返回true
  3. 释放资源后state值不为0,说明没有彻底释放资源,只是设置下state值,然后返回false

Sync::tryRelease的返回值表示是否要唤醒后续等待的线程,若彻底释放了资源则返回true,唤醒后续等待的线程,若未彻底释放资源则返回false,暂时不唤醒后续等待的线程(前文介绍AQS::release时有相应的解释)。

公平锁的实现

公平锁:sync = FairSync()

获取

获取锁要调用ReentrantLock::lock,该方法会调用sync.lock(),即调用FairSync::lock

    final void lock() {
        acquire(1);
    }

FairSync::lock调用父类的AQS::acquireAQS::acquire方法里面会调用子类的tryAcquire方法,即FairSync::tryAcquire

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // state为0说明当前没有线程持有锁
        if (c == 0) {
            // 若同步队列中没有线程在等待,或者自己是等待最久的线程,就CAS尝试获取锁资源,获取成功
            // 则返回true
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 若该线程是正持有锁的线程,则进行锁的重入
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            // 重入次数太多,导致值溢出,抛异常
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
			// 计算重入后的资源量
            setState(nextc);
            return true;
        }
        return false;
    }

该方法与非公平锁NonfairSync获取锁资源使用Sync::nonfairTryAcquire方法类似,唯一的区别是:如果当前没有线程持有锁,会先调用AQS::hasQueuedPredecessors方法:

    /**
     * Queries whether any threads have been waiting to acquire longer
     * than the current thread.
     * ...
     * @return {@code true} if there is a queued thread preceding the
     *         current thread, and {@code false} if the current thread
     *         is at the head of the queue or the queue is empty
     * @since 1.7
     */
    public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }     

它会判断同步队列中是否有等待获取锁的时间长于当前线程等待时间的线程,若有则返回true,否则返回false

FairSync::tryAcquire方法里,仅当 「当前没有线程持有锁(state = 0) 并且 没有等待时间更长的线程(hasQueuedPredecessors方法返回false)」,那么当前线程才会使用CAS尝试去获取锁。通过这种方式,保证锁的获取顺序符合请求的绝对时间顺序,也就是FIFO

释放

「公平锁释放锁的流程」和「非公平锁释放锁的流程」完全一样,不再赘述。

可重入的意义

ReentrantLocksynchronized都是互斥、可重入的。

  • 互斥:同一时刻最多只能有一个线程获取锁
  • 可重入:一个线程获取了某个锁,该线程在持有该锁的情况下可以再次获取该锁。

那么可重入有什么意义呢?比如有一个类似下面的方法:

Lock lock = new ReentrantLock();

public void fun(){
    lock.lock();
    操作...
    lock.unlock();    
}

出于某些原因,同一时刻只允许一个线程执行fun方法中的代码,通过lock的互斥性可以保证这一点,但是在fun方法中可能会有递归操作,也就是fun会调用自身,由于锁的可重入性,当前线程可以重复地获取该锁,避免了当前线程的阻塞(假如该锁是不可重入的,那么在重复获取该锁的时候,当前线程就会阻塞)。

读写锁

读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。

读写锁能够简化读写交互场景的编程方式。

读写锁能够保证数据的可见性。另外,当写锁被获取到时,后续(非当前写操作线程)的读写操作都会被阻塞。

一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。

Java并发包提供读写锁的实现是ReentrantReadWriteLock,它有如下特性:

特性 说明
公平性选择 支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平
重进入 该锁支持重进入,以读写线程为例:读线程在获取了读锁之后,能够再次获取读锁。而写线程在获取了写锁之后能够再次获取写锁,同时也可以获取读锁
锁降级 遵循获取写锁、获取读锁再释放写锁的次序,写锁能够降级成为读锁

ReentrantReadWriteLock结构

ReentrantReadWriteLock实现了ReadWriteLock接口

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {...}

ReadWriteLock定义了获取「读锁」和「写锁」的方法

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

ReentrantReadWriteLock整体结构如下:

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
	...
    // 读锁    
    private final ReentrantReadWriteLock.ReadLock readerLock;
    // 写锁
    private final ReentrantReadWriteLock.WriteLock writerLock;
    // 自定义同步器
    final Sync sync;
    
    public ReentrantReadWriteLock() {
        // 默认使用非公平锁
        this(false);
    }
    
    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }    
    
    public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
    public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
    	...
        protected final boolean tryRelease(int releases) {...}
        protected final boolean tryAcquire(int acquires) {...}
        protected final boolean tryReleaseShared(int unused) {...} 
        ...
        protected final int tryAcquireShared(int unused) {...}
        ...
        final boolean tryWriteLock() {...}
        final boolean tryReadLock() {...}
        protected final boolean isHeldExclusively() {...} 
        final ConditionObject newCondition() {...}     
        final int getReadLockCount() {...}
        final boolean isWriteLocked() {...}
        final int getWriteHoldCount() {...}
        final int getReadHoldCount() {...}
        private void readObject(java.io.ObjectInputStream s)
            throws java.io.IOException, ClassNotFoundException {...}
        final int getCount() { ... }        
    }
    
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }    
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }            
    }
    
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }
    
    public static class ReadLock implements Lock, java.io.Serializable {
        ...
    	private final Sync sync;
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        } 
        public void lock() {
            sync.acquireShared(1);
        }
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
        public boolean tryLock() {
            return sync.tryReadLock();
        }
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
        public void unlock() {
            sync.releaseShared(1);
        }
        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }
        ...
    }
    
    public static class WriteLock implements Lock, java.io.Serializable {
    	...
        private final Sync sync;
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
        public void lock() {
            sync.acquire(1);
        }
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
        public boolean tryLock( ) {
            return sync.tryWriteLock();
        }
        public boolean tryLock(long timeout, TimeUnit unit)
                throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }
        public void unlock() {
            sync.release(1);
        }
        public Condition newCondition() {
            return sync.newCondition();
        }
        ...
    }
    ...
}
  • FairSyncNonfairSync分别表示公平锁和非公平锁
  • ReadLock访问锁的方式是共享式,WriteLock访问锁的方式是独占式

读写锁的实现分析

主要包括:

  • 读写状态的设计
  • 写锁的获取与释放
  • 读锁的获取与释放
  • 锁降级

读写状态的设计

读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。

读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态,如果在一个整型变量上维护多种状态,就一定需要"按位切割使用"这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写,划分方式如下

image-20220104102915645

当前同步状态表示一个线程已经获取了写锁,且重进入了两次,同时也连续获取了两次读锁。

读写锁通过位运算可以确定读和写的状态,假设当前同步状态值为S:

  • 写状态:等于S&0x0000FFFF(将高16位全部抹去)
  • 读状态:等于S>>>16(无符号补0右移16位)

一个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读状态(S>>>16)大于0,即读锁已被获取。

写锁的获取与释放

写锁是一个支持重进入的排它锁

获取

获取写锁会调用WriteLock::lock

    public void lock() {
        sync.acquire(1);
    }

调用了Sync::acquire,也就是AQS::acquire,里面会调用到Sync::tryAcquire

    protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        // 获取同步状态
        int c = getState();
        // 计算出「写状态」的值
        int w = exclusiveCount(c);
        // 若锁正在被使用
        if (c != 0) {
            // (Note: if c != 0 and w == 0 then shared count != 0)
            // 1.c != 0 && w == 0 -> 读状态大于0,即读锁已被获取,导致写锁无法被获取,当前线程
            // 进入等待状态。
            // 2.c != 0 && w != 0 && current != getExclusiveOwnerThread() -> 有其它线程
            // 获取了写锁,当前线程进入等待状态。
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            // 若获取写锁后,写状态的值超过最大值,则抛异常
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            // Reentrant acquire
            // 写锁重进入,不需要CAS操作
            setState(c + acquires);
            return true;
        }
        // writerShouldBlock:表示当前尝试获取写锁的线程(current)是否应该阻塞
        // compareAndSetState:更新同步状态的值,并发环境下,通过CAS操作保证线程安全
        if (writerShouldBlock() ||
            !compareAndSetState(c, c + acquires))
            return false;
        setExclusiveOwnerThread(current);
        return true;
    }

该方法执行流程如下:

  1. 若当前没有线程获取锁,则会调用writerShouldBlock方法,该方法返回当前线程是否需要阻塞,若不需要阻塞,则通过CAS设置同步状态,若设置成功,则表示写锁获取成功,方法返回。

  2. 若当前有线程获取锁,下面几种情况只要满足一个,当前线程就无法成功获取写锁,会在同步队列中等待

    • 其它线程持有读锁和写锁
    • 当前线程(自身)持有读锁

    仅当是当前线程(自身)已经持有写锁,并且重进入获取写锁后写状态的值不会超过最大值(超过最大值则抛出异常),那么就执行重进入获取写锁的操作

为什么读锁被某个线程获取的情况下,写锁无法被获取呢

读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。

writerShouldBlockNonfairSyncFairSync均有实现。

NonfairSync::writerShouldBlock

    final boolean writerShouldBlock() {
        return false; // writers can always barge
    }

直接返回false,表示写线程不需要阻塞,体现出非公平的特性

FairSync::writerShouldBlock

    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }

调用了AQS::hasQueuedPredecessors:该方法会查询同步队列里是否有等待获取锁的时间长于当前线程的线程,若有则返回true,否则返回false(当前线程在队列的前面或者队列是空的)。

所以,FairSync::writerShouldBlock方法通过查看同步队列里面,是否有等待时间更长的线程,有则表示当前线程不能抢先获取,writerShouldBlock返回true表示当前线程应该阻塞,若同步队列里面没有等待时间更长的线程,writerShouldBlock返回false表示当前线程不需要阻塞,可以去获取锁。FairSync::writerShouldBlock体现出了公平的特性

释放

写锁的释放与ReentrantLock的释放过程基本类似

写锁释放会调用WriteLock::unlock

    public void unlock() {
        sync.release(1);
    }

调用sync::release,其实就是AQS::release,然后会调用到Sync::tryRelease

    protected final boolean tryRelease(int releases) {
        // 若当前线程不是持有写锁的线程,则抛异常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        // 计算释放资源后,同步状态的值
        int nextc = getState() - releases;
        // 判断写状态是否完全释放(基于写锁可重入的考量)
        boolean free = exclusiveCount(nextc) == 0;
        // 若写状态完全释放,则把ExclusiveOwnerThread设置为null,并且方法最后会返回true
        if (free)
            setExclusiveOwnerThread(null);
        // 设置同步状态的值
        setState(nextc);
        // 返回的值表示是否要唤醒同步队列的后继结点
        return free;
    }

每次释放均减少写状态,当写状态为0时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见

读锁的获取与释放

读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态。

读状态是所有线程获取读锁次数的总和,而每个线程各自获取读锁的次数只能选择保存在ThreadLocal,由线程自身维护。

获取

获取读锁调用ReadLock::lock

    public void lock() {
        sync.acquireShared(1);
    }

调用了sync::acquireShared,也就是AQS::acquireShared,会调用到sync::tryAcquireShared

    protected final int tryAcquireShared(int unused) {
        Thread current = Thread.currentThread();
        int c = getState();
        // 写锁被其他线程持有,当前线程无法获取读锁
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return -1;
        // 获取读状态的值
        int r = sharedCount(c);
        // 通过CAS获取读锁
        if (!readerShouldBlock() &&
            r < MAX_COUNT &&
            compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            // 获取成功
            return 1;
        }
        return fullTryAcquireShared(current);
    }

tryAcquireShared方法中,如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠CAS保证)增加读状态,成功获取读锁。

释放

读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁)均减少读状态,减少的值是1<<16

锁降级

ReentrantReadWriteLock支持锁降级,锁降级指的是写锁降级成为读锁,过程为:线程获取写锁 -> 线程获取读锁 -> 线程释放写锁。

另外,ReentrantReadWriteLock不支持锁升级,也就是不支持「把持读锁、获取写锁,最后释放读锁」的过程,这一点从前面「写锁的获取」的源码分析中也可以得出,即当前线程(自身)持有读锁的时候再获取写锁,则当前线程就无法成功获取写锁,会在同步队列中等待。

问题1:为什么不支持锁升级?

为了保证数据可见性,如果读锁已被多个线程获取,假如现在又有一个线程获取到写锁,那么写锁对数据的更新对获取到读锁的线程是不可见的,因为此时获取读锁的线程的读操作是针对线程的工作内存的,它们根本无法意识到主内存更新数据了,这样就造成了读写不同步的问题。其实,从一个更简单的角度去看这个问题,锁升级会造成「同时有线程在读和写」的问题,从而造成「读写不同步」的问题,这显然是不被允许的。

问题2:锁降级有这个问题吗?

虽然锁降级也会造成「有线程获取了写锁的同时,又有线程获取了读锁」,但是注意获取到写锁的读锁的是同一个线程,而「问题1」的关键是「一个线程获取了写锁,另一个线程获取了读锁」。在「问题2」中,一个线程同时获取到了读锁和写锁并没有什么问题,因为一个线程只有一个工作内存,并不会造成「读写不同步」的问题,所以「锁降级」是被允许的

LockSupport工具

AQS内部使用到了LockSupport工具,实现对线程的阻塞。

LockSupport定义了一组的公共静态方法,这些方法提供了最基本的线程阻塞和唤醒功能。以park开头的方法用于阻塞当前线程,unpark(Thread thread)方法用于唤醒一个被阻塞的线程。这些方法如下:

image-20220109001018854

Java 6中,LockSupport增加了park(Object blocker)parkNanos(Object blocker,long nanos)parkUntil(Object blocker,long deadline) 3个方法,用于实现阻塞当前线程的功能,其中参数blocker是用来标识当前线程在等待的对象(以下称为阻塞对象),该对象主要用于问题排查和系统监控。

提供了blocker参数的park相关方法,在线程的dump结果中,可以传递给开发人员更多的现场信息,

Condition接口

Objectwait()、wait(long timeout)、notify()、notifyAll()方法,与synchronized同步关键字配合,可以实现等待/通知模式

Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。

通过对比Object的监视器方法和Condition接口,可以更详细地了解Condition的特性,对比如下:

image-20220109013013426

示例与API

Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock::newCondition方法)创建出来的,换句话说,Condition是依赖Lock对象的。

使用Condition前需要获取锁,简单示例如下:

    Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    public void conditionWait() throws InterruptedException {
        lock.lock();
        try {
            condition.await();
        } finally {
            lock.unlock();
        }
    }

    public void conditionSignal() throws InterruptedException {
        lock.lock();
        try {
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。

Condition定义的部分方法如下:

image-20220109151113480

实现分析

ConditionObject是同步器AQS的内部类,它是Condition的实现。每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。

下面将分析Condition的实现,主要包括:

  • 等待队列
  • 等待和通知

等待队列

等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。

事实上,节点的定义复用了AQS中节点的定义,也就是说,同步队列和等待队列中节点类型都是AQS的静态内部类AQS.Node

一个ConditionObject包含一个等待队列,ConditionObject拥有首节点firstWaiter和尾节点lastWaiter。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列,等待队列的基本结构如下

image-20220110134544594

Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的

Object的监视器模型上,一个对象拥有一个同步队列(即synchronized的那个队列)和等待队列(即Object.await的那个队列),而并发包中的Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列,如下:

image-20220110135134740

ConditionObjectAQS的内部类,因此每个ConditionObject实例都能够访问同步器提供的方法,相当于每个ConditionObject都拥有所属同步器的引用。

等待

调用Conditionawait系列的方法,会使当前线程进入Condition等待队列并释放锁,同时线程状态变为等待状态。当从await方法返回时,当前线程一定获取了Condition相关联的锁。

如果从队列(同步队列和等待队列)的角度看await方法,当调用await方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。如下:

image-20220110192959441

ConditionObject::await

    public final void await() throws InterruptedException {
        // 若线程已被中断,则抛异常
        if (Thread.interrupted())
            throw new InterruptedException();
        // 将当前线程构造为结点Node,加入等待队列的尾部,并获取该结点
        Node node = addConditionWaiter();
        // 释放同步状态,也就是释放锁
        int savedState = fullyRelease(node);  
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

该方法主要做的事情:

  1. addConditionWaiter:将当前线程构造成节点并加入等待队列中。
  2. fullyRelease:释放同步状态,唤醒同步队列中的后继节点。
  3. while循环:判断条件是结点node是否在同步队列上,刚加入到等待队列的时候肯定不在同步队列上,因此会被park,也就是线程会进入等待状态。
  4. 被唤醒时:由后面的「通知」的signal方法可知,当结点被通知(signal)的时候,该结点会从「等待队列」转移到同步队列(暂时不考虑中断),再次进入while循环条件判断,因为此时结点在同步队列中,所以不会进入while里面的语句,会通过acquireQueued方法加入到「获取同步状态的竞争」中。

成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已经成功地获取了锁。

  1. 等待时被中断:如果等待线程不是由其他线程调用Condition.signal方法唤醒,而是被中断,则会抛出InterruptedException异常。

通知

调用Condition.signal方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中

ConditionObject::signal

    public final void signal() {
        // 检测当前线程是否获取了锁,没有则抛出异常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        // 获取等待队列的首节点
        Node first = firstWaiter;
        if (first != null)
            // 将等待队列中的首节点移动到同步队列
            doSignal(first);
    }

ConditionObject::doSignal

    private void doSignal(Node first) {
        do {
            // 将first的后继结点设置为Condition等待队列的头结点
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
            // 将节点first移动到等待锁的同步队列中
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }

AQS::transferForSignal

    final boolean transferForSignal(Node node) {
        // 通过CAS设置结点的状态
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        // 使用CAS操作将node结点添加到锁的同步队列的尾部,并获取node的前驱
        Node p = enq(node);
        int ws = p.waitStatus;
        // 将前驱结点的状态设置为Node.SIGNAL(提示它要记得唤醒后面的结点)
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

所以,ConditionObject::signal就是将Condition等待队列的首节点转移到了锁的同步队列的尾部。图示:

image-20220111162552304

ConditionsignalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,加入到「获取同步状态的竞争」中。

参考

  1. Java并发之AQS详解 - waterystone - 博客园
  2. 《Java并发编程的艺术》

标签:结点,同步,获取,队列,并发,int,线程
来源: https://www.cnblogs.com/giagor/p/15806026.html

专注分享技术,共同学习,共同进步。侵权联系[admin#icode9.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有