ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

CyclicBarrier使用详解及源码解读

2020-12-14 19:34:01  阅读:195  来源: 互联网

标签:Thread barrier await 源码 线程 CyclicBarrier public 详解


概述

当一组线程到达一个同步点(wait方法调用出)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被拦截的线程才会继续运行。
值得注意的是同步点有多个,当线程到达各自的同步点先会被阻塞,当都到达同步点,就会在各自的同步点处往下执行。
构造方法可以设置在所有线程都到达同步点之前执行另一个线程,wait()方法可以设置等待时间。

实例

package cyclicbarrier;

import java.util.concurrent.CyclicBarrier;

/**
 * 字面意思是可循环使用的屏障
 * 让一组线程到达时被阻塞,直到最后一个线程到达时才开门(简单的说就是人齐了就开门)
 * 每个线程调用await方法告诉CyclicBarrier我已到达并阻塞
 * 适用于多线程计算数据,最后合并计算结果的场景
 * 提供一个更加高级的构造函数用于处理复杂的业务场景
 * CyclicBarrier(int parties,Runnable barrierAction)
 * 在线程到达屏障时,优先执行barrierAction,便于执行更复杂的业务场景
 * 
 * countDownLatch调用的就一个await(),其他线程都在这个await处等待
 * cyclicbarrier各自线程都有一个await(),等待所有线程都达到await时,各自向下执行
 * @author bamboo
 *
 */
public class TestCyclicBarrier implements Runnable{
	
	public static void main(String[] args) {
		CyclicBarrier barrier = new CyclicBarrier(2,new TestCyclicBarrier());
		for(int i = 0;i < 2;i++) {     // 如果是3的话,barrier.await后的部分永远不会执行
			new ThreadA(barrier).start();
		}

		
	}

	@Override
	public void run() {
		System.out.println(Thread.currentThread().getName() + ": 本线程始终优先于同步点后的代码执行");
	}

}

package cyclicbarrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class ThreadA extends Thread{

	CyclicBarrier barrier;
	
	ThreadA(CyclicBarrier barrier){
		this.barrier = barrier;
	}
	
	public void run() {
		try {
			Thread.sleep(3000);
			System.out.println(Thread.currentThread().getName() + ": 我到了,等他们到齐");
			barrier.await();
			System.out.println(Thread.currentThread().getName() + ": 到齐了,走了走了");
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			e.printStackTrace();
		}
	}
}

结果:

Thread-0: 我到了,等他们到齐
Thread-1: 我到了,等他们到齐
Thread-1: 本线程始终优先于同步点后的代码执行
Thread-1: 到齐了,走了走了
Thread-0: 到齐了,走了走了

源码

先来看看内部,结构还是很简单的,就一个静态内部类,还有一些其他的参数。
主要还是关注构造方法,一个传整形,另一个额外需要runnable实现类,这个可以在CountDownLatch设置的同步点通过之前先执行。
await方法可以在指定数量线程都抵达之前都阻塞,或者设置一个等待时间,达到一定时间停止阻塞,继续向下执行。
reset
niceHot
构造函数

    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

wait

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

reset
打破当前已存在的屏障,使之前的同步点失效。

    /**
     * Resets the barrier to its initial state.  If any parties are
     * currently waiting at the barrier, they will return with a
     * {@link BrokenBarrierException}. Note that resets <em>after</em>
     * a breakage has occurred for other reasons can be complicated to
     * carry out; threads need to re-synchronize in some other way,
     * and choose one to perform the reset.  It may be preferable to
     * instead create a new barrier for subsequent use.
     */
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

使当前屏障这一系列线程停止并使他们清醒起来,仅当他们持有锁时生效。

    /**
     * Sets current barrier generation as broken and wakes up everyone.
     * Called only while holding lock.
     */
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
    /**
     * Updates state on barrier trip and wakes up everyone.
     * Called only while holding lock.
     */
    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

标签:Thread,barrier,await,源码,线程,CyclicBarrier,public,详解
来源: https://blog.csdn.net/qq_40970962/article/details/111183034

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有