ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

【Java 并发编程】— AQS 源码探索之共享式

2022-06-08 18:31:10  阅读:224  来源: 互联网

标签:Node Java AQS int 获取 源码 arg 共享 节点


【Java 并发编程】——AQS 源码探索之独占式一文中从源码详细介绍了 AQS 独占式的实现方式。本文将介绍 AQS 的共享式,顾名思义,共享式就是允许多个线程同时访问同一个资源。

共享式实例

在独占式中,AQS 中的状态用来表示可获取或者已独占(比如 0 表示可获取,1 表示已被占用)。共享式中,状态已不再是具体数值,而是一个范围:大于等于 0 表示可获取,小于 0 表示已被占满。

下面是一个自定义共享式同步工具类 TwinsLock,同一时刻最多允许两个线程访问:


public class TwinsLock implements Lock {

    private Sync sync = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer {
        private Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must larger than 0");
            }
            // 初始化 state 的值,表示可同时访问的线程数量
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int reduceCount) {
            for(;;) {
                // 获取当前
                int currentCount = getState();
                // 计算剩余可用数量
                int newCount = currentCount - reduceCount;
                if (newCount < 0 || compareAndSetState(currentCount, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int returnCount) {
            for(;;) {
                int currentCount = getState();
                int newCount = currentCount + returnCount;
                if (compareAndSetState(currentCount, newCount)) {
                    return true;
                }
            }
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }
    
    // 省略其他方法
}

按照惯例,定义静态内部类实现 AbstractQueuedSynchronizer,共享式需要重写 tryAcquireShared() 和 tryReleaseShared() 方法。TwinsLock 类的作用是同时允许两个线程通过,其他线程需要等待。获取和释放的具体逻辑可以看上面代码注释,使用方式如下


TwinsLock lock = new TwinsLock();
lock.lock();
try {
    // do sth...
} finaly {
    lock.unlock();
}

使用方式上和 ReentrantLock 一毛一样有木有。

获取

接下来分析获取的流程,lock.lock() 调用的是 sync.acquireShared() 方法


public final void acquireShared(int arg) {
    // tryAcquireShared 需要子类重写
    if (tryAcquireShared(arg) < 0)
        // 获取失败后调用
        doAcquireShared(arg);
}

/**
 * 共享模式获取,不响应中断
 */
private void doAcquireShared(int arg) {
    // 入队
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取前驱节点
            final Node p = node.predecessor();
            // 为头节点
            if (p == head) {
                // 重新获取
                int r = tryAcquireShared(arg);
                // 获取成功
                if (r >= 0) {
                    // 设置头节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    // 如果检测到中断,就中断自己
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            
            // 判断是否应该阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                // 阻塞并检查中断
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

共享式获取失败后的操作 doAcquireShared() 和独占式中操作很相似,获取的前提是前驱节点必须是头节点,否则进行阻塞。获取成功后执行 setHeadAndPropagate() 方法,并检查中断,如果需要中断,那就中断当前线程,最后返回。看看 setHeadAndPropagate 方法


private void setHeadAndPropagate(Node node, int propagate) {
    // 把获取成功的节点设置为头头节点
    Node h = head; // Record old head for check below
    setHead(node);
    
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        // 获取下一个节点
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

重点瞅瞅 doReleaseShared()


/**
 * Release action for shared mode -- signal successor and ensure
 * propagation. (Note: For exclusive mode, release just amounts
 * to calling unparkSuccessor of head if it needs signal.)
 *
 * 共享模式下的唤醒动作 -- 唤醒后继者并保证这种方式传播下去
 */
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 1. h = null,队列还没初始化
        // 2. h == tail,队列刚初始化,就一个头节点
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 头节点的状态为 SIGNAL
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 唤醒后继节点
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果头节点没有发生变化,退出循环
        if (h == head)                   // loop if head changed
            break;
    }
}

这个方法是将头节点的的状态先从 SIGNAL 改为 0,再从 0 改为 PROPAGATE,至于为什么不一步到位,可以看看 unparkSuccessor() 中的代码


if (ws < 0)
    compareAndSetWaitStatus(node, ws, 0);

这里将状态小于 0 的改成 0,如果 doReleaseShared() 方法直接将头节点的状态改为 PROPAGATE,那这里相当于做了一次无用功。如果改变状态失败,说明头节点被改变了,那么进行下一次循环,重新获取头节点。从if (h == head) {break;}可以知道每次只会唤醒头节点的后继节点。

释放

释放代码


public final boolean releaseShared(int arg) {
    // tryReleaseShared() 由子类实现
    if (tryReleaseShared(arg)) {
        // 释放操作,分析如上
        doReleaseShared();
        return true;
    }
    return false;
}

共享式和独占式区别

状态

独占式中状态表示可获取已占用,比如 0 表示可以获取,获取成功后将状态改为 1,这种改变通过 CAS 实现,代码如下


if (compareAndSetState(0, 1)) {
    // 获取成功
}

释放的时候将状态改为 0 即可

setState(0);

而共享式中状态一般用来表示的可用许可数量,当许可大于或等于 0 表示允许获取,每次获取成功后减掉指定许可数量并改变状态,直到状态小于 0 表示不可获取


for(;;) {
    int currentCount = getState();
    int newCount = currentCount - reduceCount;
    if (newCount < 0 || compareAndSetState(currentCount, newCount)) {
        return newCount;
    }
}

可以看到,不管是独占式还是共享式,核心还是状态的改变。

唤醒

独占式中,队列中阻塞线程需要前驱节点唤醒,而只有前驱节点在释放操作是才会去唤醒。

而共享式中,除了释放的时候唤醒,重新获取成功的时候也会去唤醒后继节点。

入队

独占式中获取代码如下


public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

代码中可以知道,如果线程获取同步状态失败,那么就将加入队列尾部。

而共享式则不同,


public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

共享式中获取失败就直接返回,不会再加入队列。因为共享式一般用来允许指定数量的线程同时访问共享资源,当同步状态小于 0,则表示访问的线程数已达上限,后来的线程只能拒之门外。

标签:Node,Java,AQS,int,获取,源码,arg,共享,节点
来源: https://www.cnblogs.com/tailife/p/16356739.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有