ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java并发—辅助类CyclicBarrier

2022-04-25 16:35:16  阅读:219  来源: 互联网

标签:finish Java await 并发 线程 ready threadnum CyclicBarrier


一、概述

CyclicBarrier基于ReentrantLock和Condition等待唤醒的功能实现的,在构建CyclicBarrier时,会将count-1,操作count值是直接使用ReentrantLock来保证线程安全性,如果count不为0时,则添加condition队列中,如果等于0时,把节点从condition队列添加至aqs的队列中进行全部唤醒,并且将parties的值重新赋值为count的值来实现复用。

二、实现原理

2.1 构造函数

CyclicBarrier内部使用了ReentrantLockCondition两个类。它有两个构造函数:

//构造器1
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;//parties 指示计数器的初始值
    this.count = parties;
    this.barrierCommand = barrierAction;//所以线程到达屏障后会执行一次
}

//构造器2
public CyclicBarrier(int parties) {
    this(parties, null);
}

CyclicBarrier有两个构造器,其中构造器1是它的核心构造器,在这里你可以指定本局游戏的参与者数量(要拦截的线程数)以及本局结束时要执行的任务,还可以看到计数器count的初始值被设置为parties

2.2 await

CyclicBarrier类最主要的功能就是使先到达屏障点的线程阻塞并等待后面的线程,其中它提供了两种等待的方法,分别是定时等待和非定时等待。

//非定时等待
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe);
    }
}

//定时等待
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

可以看到不管是定时等待还是非定时等待,它们都调用了dowait方法,只不过是传入的参数不同而已。下面我们就来看看dowait方法都做了些什么。

//核心等待方法
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
    //显示锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //表示当前代
        final Generation g = generation;
        //检查当前栅栏是否被打破
        if (g.broken) {
            throw new BrokenBarrierException();
        }
        //检查当前线程是否被中断
        if (Thread.interrupted()) {
            //如果当前线程被中断会做以下三件事
            //1.打破当前栅栏
            //2.唤醒拦截的所有线程
            //3.抛出中断异常
            breakBarrier();
            throw new InterruptedException();
        }
        //每次都将计数器的值减1
        int index = --count;
        //计数器的值减为0则需唤醒所有线程并转换到下一代
        if (index == 0) {
            boolean ranAction = false;
            try {
                //唤醒所有线程前先执行指定的任务
                final Runnable command = barrierCommand;
                if (command != null) {
                    command.run();
                }
                ranAction = true;
                //唤醒所有线程并转到下一代
                nextGeneration();
                return 0;
            } finally {
                //确保在任务未成功执行时能将所有线程唤醒
                if (!ranAction) {
                    breakBarrier();
                }
            }
        }
        
        //如果计数器不为0则执行此循环
        for (;;) {
            try {
                //根据传入的参数来决定是定时等待还是非定时等待
                if (!timed) {
                    trip.await();
                }else if (nanos > 0L) {
                    nanos = trip.awaitNanos(nanos);
                }
            } catch (InterruptedException ie) {
                //若当前线程在等待期间被中断则打破栅栏唤醒其他线程
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    //若在捕获中断异常前已经完成在栅栏上的等待, 则直接调用中断操作
                    Thread.currentThread().interrupt();
                }
            }
            //如果线程因为打破栅栏操作而被唤醒则抛出异常
            if (g.broken) {
                throw new BrokenBarrierException();
            }
            //如果线程因为换代操作而被唤醒则返回计数器的值
            if (g != generation) {
                return index;
            }
            //如果线程因为时间到了而被唤醒则打破栅栏并抛出异常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
  }
}

上面贴出的代码中注释可以看到在dowait方法中每次都将count减1,减完后立马进行判断看看是否等于0,如果等于0的话就会先去执行之前指定好的任务,执行完之后再调用nextGeneration方法将栅栏转到下一代,在该方法中会将所有线程唤醒,将计数器的值重新设为parties,最后会重新设置栅栏代次,在执行完nextGeneration方法之后就意味着游戏进入下一局。如果计数器此时还不等于0的话就进入for循环,根据参数来决定是调用trip.awaitNanos(nanos)还是trip.await()方法,这两方法对应着定时和非定时等待。如果在等待过程中当前线程被中断就会执行breakBarrier方法,该方法叫做打破栅栏,意味着游戏在中途被掐断,设置generationbroken状态为true并唤醒所有线程。同时这也说明在等待过程中有一个线程被中断整盘游戏就结束,所有之前被阻塞的线程都会被唤醒。线程醒来后会执行下面三个判断,看看是否因为调用breakBarrier方法而被唤醒,如果是则抛出异常;看看是否是正常的换代操作而被唤醒,如果是则返回计数器的值;看看是否因为超时而被唤醒,如果是的话就调用breakBarrier打破栅栏并抛出异常。这里还需要注意的是,如果其中有一个线程因为等待超时而退出,那么整盘游戏也会结束,其他线程都会被唤醒。

2.3 nextGeneration和breakBarrier

下面贴出nextGeneration方法和breakBarrier方法的具体代码。

//切换栅栏到下一代
private void nextGeneration() {
    //唤醒条件队列所有线程
    trip.signalAll();
    //设置计数器的值为需要拦截的线程数
    count = parties;
    //重新设置栅栏代次
    generation = new Generation();
}
 
//打破当前栅栏
private void breakBarrier() {
    //将当前栅栏状态设置为打破
    generation.broken = true;
    //设置计数器的值为需要拦截的线程数
    count = parties;
    //唤醒所有线程
    trip.signalAll();
}

2.4 reset

重置栅栏

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

我们设想一下,如果初始化时,指定了线程parties = 4,前面有3个线程调用了await等待,在第4个线程调用await之前,我们调用reset方法,那么会发生什么?

首先,打破栅栏,那意味着所有等待的线程(3个等待的线程)会唤醒,await方法会通过抛出BrokenBarrierException异常返回。然后开启新的一代,重置了countgeneration,相当于一切归零了。

三、应用场景

CyclicBarrier 可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个 Excel 保存了用户所有银行流水,每个 Sheet 保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个 sheet 里的银行流水,都执行完之后,得到每个 sheet 的日均银行流水,最后,再用 barrierAction 用这些线程的计算结果,计算出整个 Excel 的日均银行流水。

public class CyclicBarrierExample {
  // 请求的数量
  private static final int threadCount = 550;
  // 需要同步的线程数量
  private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

  public static void main(String[] args) throws InterruptedException {
    // 创建线程池
    ExecutorService threadPool = Executors.newFixedThreadPool(10);

    for (int i = 0; i < threadCount; i++) {
      final int threadNum = i;
      Thread.sleep(1000);
      threadPool.execute(() -> {
        try {
          test(threadNum);
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        } catch (BrokenBarrierException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      });
    }
    threadPool.shutdown();
  }

  public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
    System.out.println("threadnum:" + threadnum + "is ready");
    try {
      /**等待60秒,保证子线程完全执行结束*/
      cyclicBarrier.await(60, TimeUnit.SECONDS);
    } catch (Exception e) {
      System.out.println("-----CyclicBarrierException------");
    }
    System.out.println("threadnum:" + threadnum + "is finish");
  }

}

运行结果,如下:

threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
threadnum:4is finish
threadnum:0is finish
threadnum:1is finish
threadnum:2is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
threadnum:9is finish
threadnum:5is finish
threadnum:8is finish
threadnum:7is finish
threadnum:6is finish
......

可以看到当线程数量也就是请求数量达到我们定义的 5 个的时候, await() 方法之后的方法才被执行。

另外,CyclicBarrier 还提供一个更高级的构造函数 CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景。示例代码如下:

/**
 *
 * @author SnailClimb
 * @date 2018年10月1日
 * @Description: 新建 CyclicBarrier 的时候指定一个 Runnable
 */
public class CyclicBarrierExample3 {
  // 请求的数量
  private static final int threadCount = 550;
  // 需要同步的线程数量
  private static final CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
    System.out.println("------当线程数达到之后,优先执行------");
  });

  public static void main(String[] args) throws InterruptedException {
    // 创建线程池
    ExecutorService threadPool = Executors.newFixedThreadPool(10);

    for (int i = 0; i < threadCount; i++) {
      final int threadNum = i;
      Thread.sleep(1000);
      threadPool.execute(() -> {
        try {
          test(threadNum);
        } catch (InterruptedException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        } catch (BrokenBarrierException e) {
          // TODO Auto-generated catch block
          e.printStackTrace();
        }
      });
    }
    threadPool.shutdown();
  }

  public static void test(int threadnum) throws InterruptedException, BrokenBarrierException {
    System.out.println("threadnum:" + threadnum + "is ready");
    cyclicBarrier.await();
    System.out.println("threadnum:" + threadnum + "is finish");
  }

}

运行结果,如下:

threadnum:0is ready
threadnum:1is ready
threadnum:2is ready
threadnum:3is ready
threadnum:4is ready
------当线程数达到之后,优先执行------
threadnum:4is finish
threadnum:0is finish
threadnum:2is finish
threadnum:1is finish
threadnum:3is finish
threadnum:5is ready
threadnum:6is ready
threadnum:7is ready
threadnum:8is ready
threadnum:9is ready
------当线程数达到之后,优先执行------
threadnum:9is finish
threadnum:5is finish
threadnum:6is finish
threadnum:8is finish
threadnum:7is finish
......

四、总结

当我们创建回环屏障对象时,传入的计数器值M,前M-1个线程调用await方法时,获得独占锁,串行话执行dowait方法,都将count递减1,并且将M-1个线程加入到trip的条件队列中去。当最后一个线程执行到await方法时,最终将count置为了0,同时唤醒trip条件队列中所有被阻塞的线程,使得所有的M个线程继续往下执行。

CyclicBarrier和CountDownLatch的区别

CountDownLatch基于AQS实现,countDown()方法利用casstate-1await方法让头节点一直在等待state0时,释放所有等待的线程。CyclicBarrier基于ReentrantLockCondition,自身维护countparties变量,每次调用awaitcount-1,并将线程加入到condition队列中,等到count为0时,将condition队列的节点移交到AQS队列中,并全部释放。CountDownLatch允许一个或多个线程一直等待,直到这些线程完成它们的操作,而CyclicBarrier则是当线程到达某状态后,暂停下来等待其他线程,等到所有线程均到达后,才继续执行。两者等待主体不同,CountDownLatch调用await()通常是主线程调用线程,而CyclicBarrier调用await()是在任务线程调用的,所以CyclicBarrier中的阻塞的是任务线程,主线程不受影响。

下面这个是国外一个大佬的回答:

CountDownLatch 是计数器,只能使用一次,而 CyclicBarrier 的计数器提供 reset 功能,可以多次使用。但是我不那么认为它们之间的区别仅仅就是这么简单的一点。我们来从 jdk 作者设计的目的来看,javadoc 是这么描述它们的:

CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;) CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。)

对于 CountDownLatch 来说,重点是“一个线程(多个线程)等待”,而其他的 N 个线程在完成“某件事情”之后,可以终止,也可以等待。而对于 CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。

CountDownLatch 是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而 CyclicBarrier 更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。

标签:finish,Java,await,并发,线程,ready,threadnum,CyclicBarrier
来源: https://www.cnblogs.com/ciel717/p/16190780.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有