ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

【并发编程系列7】CountDownLatch,springboot书籍推荐零基础

2021-12-14 14:05:41  阅读:198  来源: 互联网

标签:await springboot 编程 屏障 线程 CountDownLatch new CyclicBarrier 方法


共享模式和独占模式在对象中表现出来的区别我们可以进入Node类看一下:

在这里插入图片描述

在这里插入图片描述

所以独占和共享模式构建的节点唯一区别就是共享节点中的nextWaiter不为空(另外还有Condition队列中的nextWaiter也不为空)。

这个方法中前面的一些逻辑AQS中分析过来,这里就不重复分析,这时候我们进来r>=0肯定是不成立的,所以会走到后面的线程挂起,挂起之后线程就阻塞了,那么阻塞了就一定需要被唤醒,所以我们猜测上文示例中的countDown()不但是将计数器减1,肯定还会有判断当减少到0的时候需要唤醒线程。

CountDownLatch#countDown()

调用之后进入CountDownLatch

调用的是sync类中的方法releaseShared(arg),注意这里固定传的是1,因为调用一次countDown()方法计数减1。

AQS#releaseShared(arg)

在这里插入图片描述

这里做了一个if判断,尝试是否可以释放,如果可以释放之后再执行释放,我们进入tryReleaseShared(arg)方法中一窥究竟。

CountDownLatch#tryReleaseShared(releases)

在这里插入图片描述

注意上面是一个死循环,只有两种情况可以跳出循环,一种就是当前state已经等于0,另一种就是CAS成功,也就是说减1成功。

如果返回false,就说明还需要阻塞等待其他线程;如果返回的是true,就会直接后面的doReleaseShared()方法。

AQS#doReleaseShared()

这个方法主要就是通过一个循环将head节点唤醒,因为中途可能会被其他线程唤醒了或者也可能加入了新节点,所以需要通过一个死循环来确保释放成功

在这里插入图片描述

回到AQS#doAcquireSharedInterruptibly(arg)

在这里插入图片描述

上面await()方法的线程阻塞在1014这个if条件这里,唤醒之后如果没有被中断过,那么会继续执行for循环,这时候r>=0肯定成立了,所以会进入setHeadAndPropagate(Node,int)方法,去依次传播所有需要唤醒的节点

AQS#setHeadAndPropagate(Node,int)

在这里插入图片描述

这里注意到参数中的Node是head节点的下一个节点,所以这里要做的是把第二个节点替换成Node节点,然后执行同一个方法doReleaseShared()方法去唤醒头节点,唤醒之后会回到上面的for循环,继续唤醒后一个节点,直到全部线程均被唤醒。

CyclicBarrier

==========================================================================

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一 组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会 开门,所有被屏障拦截的线程才会继续运行。CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景

CyclicBarrier使用示例1


package com.zwx.concurrent.jucUtil;

import java.util.concurrent.CyclicBarrier;

public class CyclicbarrierDemo {

static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

public static void main(String[] args) {

new Thread(()-> {

try {

cyclicBarrier.await();

} catch (Exception e) {

e.printStackTrace();

}

System.out.println(“我是线程t1”);

},“t1”).start();

new Thread(()-> {

try {

cyclicBarrier.await();

} catch (Exception e) {

e.printStackTrace();

}

System.out.println(“我是线程t2”);

},“t2”).start();

System.out.println(“主线程==end”);

}

}

输出结果:

在这里插入图片描述

这个t1和t2的输出结果是随机的。

CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrier- Action),用于在线程到达屏障时,优先执行barrierAction。

CyclicBarrier使用示例2


package com.zwx.concurrent.jucUtil;

import java.util.concurrent.CyclicBarrier;

public class CyclicbarrierDemo2 {

static CyclicBarrier cyclicBarrier = new CyclicBarrier(2,new MyThread());

public static void main(String[] args) {

new Thread(()-> {

try {

cyclicBarrier.await();

} catch (Exception e) {

e.printStackTrace();

}

System.out.println(“我是线程t1”);

},“t1”).start();

new Thread(()-> {

try {

cyclicBarrier.await();

} catch (Exception e) {

e.printStackTrace();

}

System.out.println(“我是线程t2”);

},“t2”).start();

System.out.println(“主线程==end”);

}

static class MyThread extends Thread {

@Override

public void run() {

System.out.println(“do something”);

}

}

}

输出结果:

在这里插入图片描述

我们可以看到,在线程t1和t2输出前会先输出自定义线程的信息。

CyclicBarrier实现原理


CyclicBarrier 相比 CountDownLatch 来说,要简单很多,源码实现是基于 ReentrantLock 和 Condition 的组合使用

CyclicBarrier源码分析


CyclicBarrier(parties)

进入CyclicBarrier默认构造器:

在这里插入图片描述

可以发现,最终其实还是调用的CyclicBarrier(int parties,Runnable barrier- Action)构造器:

在这里插入图片描述

注意了,构造CyclicBarrier对象时,初始化了多少个parties,则必须对应有parties个线程调用await()方法,否则线程不会往后执行。

CyclicBarrier#await()

在这里插入图片描述

调用了dowait(timed,nanos)方法,第一个参数false表示未设置超时时间,后面表示纳秒数,因为await还有另一个对应的方法带上超时时间:await(long,timeunit),这个方法中调用dowait(timed,nanos)方法时第一个参数就会是true,然后带上超时时间,表示到了设定时间之后线程就不会被阻塞,会继续往后执行。

CyclicBarrier#dowait()

/**

  • Main barrier code, covering the various policies.

  • 主要屏障代码,覆盖了各种策略

*/

private int dowait(boolean timed, long nanos)

throws InterruptedException, BrokenBarrierException,

TimeoutException {

final ReentrantLock lock = this.lock;//定义一个重入锁:private final ReentrantLock lock = new ReentrantLock();

lock.lock();

try {

//同一个屏障初始进来时属于同一代或者说一个周期,构建一个"代"(Generation)对象,同一个Generation表示同一代

final Generation g = generation;//Generation中设置了broke=false,表示屏障没有损坏

if (g.broken)//如果broken=true表示当前屏障被损坏了,抛出异常

throw new BrokenBarrierException();

if (Thread.interrupted()) {//如果线程被中断过

breakBarrier();//设置屏障为损坏状态并唤醒所有持有锁的线程

throw new InterruptedException();//抛出中断异常

}

int index = --count;//未调用await()方法的线程计数-1

if (index == 0) {//如果屏障数为0,(表示所有线程都到达await()方法)

boolean ranAction = false;

try {

final Runnable command = barrierCommand;

if (command != null)

command.run();//表示到达屏障之后,如果我们有设置barrierCommand,则优先执行

ranAction = true;

//执行到这里的时候,说明所有线程都到了await()方法,且设置的barrierCommand也已经执行完了

//接下来要做的事情就是换代(所以CyclicBarrier是通过换代的方式实现重新计数的)

//换代之后相当于进入一个新的周期,所有线程在后续中又可以通过await()阻塞一次

nextGeneration();

return 0;

} finally {

if (!ranAction)//如果ranAction = f

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》

【docs.qq.com/doc/DSmxTbFJ1cmN1R2dB】 完整内容开源分享

alse说明当前屏障还有流程没执行完,所以需要屏障设置会损坏状态

breakBarrier();

}

}

// loop until tripped, broken, interrupted, or timed out

//死循环等到count=0,调用breakBarrier方法(表示屏障有问题的场景),中断或者超时

for (;

标签:await,springboot,编程,屏障,线程,CountDownLatch,new,CyclicBarrier,方法
来源: https://blog.csdn.net/m0_64867896/article/details/121925800

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有