ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

JUC 中 4 个常用的并发工具类

2022-01-17 10:01:32  阅读:173  来源: 互联网

标签:count JUC 常用 CyclicBarrier thread 并发 线程 执行 pool


JUC就是java.util.concurrent包,这个包俗称JUC,里面都是解决并发问题的一些东西。

该包的位置位于java下面的rt.jar包下面

4大常用并发工具类:

CountDownLatch

CountDownLatch是我目前使用比较多的类,CountDownLatch初始化时会给定一个计数,然后每次调用countDown() 计数减1,

当计数未到达0之前调用await() 方法会阻塞直到计数减到0;

使用场景:多用于划分任务由多个线程执行,例如:最近写个豆瓣爬虫,需要爬取每个电影的前五页短评,可以划分成五个线程来处理数据。通过latch.await()保证全部完成再返回。

    public void latch() throws InterruptedException {
        int count= 5;
        CountDownLatch latch = new CountDownLatch(count);
        for (int x=0;x<count;x++){
            new Worker(x*20,latch).start();
        }
        latch.await();
        System.out.println("全部执行完毕");
    }
    
    class Worker extends Thread{
        Integer start;
        CountDownLatch latch;
        public Worker(Integer start,CountDownLatch latch){
            this.start=start;
            this.latch=latch;
        }        @Override
        public void run() {
            System.out.println(start+" 已执行");
            latch.countDown();
        }
    }

输出如下:

20 已执行
0 已执行
40 已执行
60 已执行
80 已执行
全部执行完毕

CyclicBarrier

它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)也就是阻塞在调用cyclicBarrier.await()的地方。

看上去CyclicBarrier 跟CountDownLatch 功能上类似,java培训在官方doc上CountDownLatch的描述上就说明了,CountDownLatch 的计数无法被重置,

如果需要重置计数,请考虑使用CyclicBarrier。

CyclicBarrier初始时还可添加一个Runnable的参数, 此Runnable在CyclicBarrier的数目达到后,所有其它线程被唤醒前被最后一个进入 CyclicBarrier 的线程执行

使用场景:类似CyclicBarrier,但是 CyclicBarrier提供了几个countdownlatch 没有的方法以应付更复杂的场景,例如:

getNumberWaiting() 获取阻塞线程数量,

isBroken() 用来知道阻塞的线程是否被中断等方法。

reset() 将屏障重置为其初始状态。如果所有参与者目前都在屏障处等待,则它们将返回,同时抛出一个 BrokenBarrierException。

    public void latch() throws InterruptedException {
        int count = 5;
        CyclicBarrier cb = new CyclicBarrier(count, new Runnable() {
            @Override
            public void run() {
                System.out.println("全部执行完毕");
            }
        });
        ExecutorService executorService = Executors.newFixedThreadPool(count);
        while (true){
            for (int x=0;x<count;x++){
                executorService.execute(new Worker(x,cb));
            }
        }
    }    
    
    class Worker extends Thread {
        Integer start;
        CyclicBarrier cyclicBarrier;        public Worker(Integer start, CyclicBarrier cyclicBarrier) {
            this.start = start;
            this.cyclicBarrier = cyclicBarrier;
        }        @Override
        public void run() {
            System.out.println(start + " 已执行");
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

输出如下:

0 已执行
3 已执行
4 已执行
2 已执行
1 已执行
全部执行完毕
0 已执行
1 已执行
2 已执行
3 已执行
4 已执行
全部执行完毕

Semaphore

Semaphore 信号量维护了一个许可集,每次使用时执行acquire()从Semaphore获取许可,如果没有则会阻塞,每次使用完执行release()释放许可。北京java培训

使用场景:Semaphore对用于对资源的控制,比如数据连接有限,使用Semaphore限制访问数据库的线程数。

    public void latch() throws InterruptedException, IOException {
        int count = 5;
        Semaphore semaphore = new Semaphore(1);
        ExecutorService executorService = Executors.newFixedThreadPool(count);
            for (int x=0;x<count;x++){
                executorService.execute(new Worker(x,semaphore));
            }
        System.in.read();
    }    
    
    class Worker extends Thread {
        Integer start;
        Semaphore semaphore;        public Worker(Integer start, Semaphore semaphore) {
            this.start = start;
            this.semaphore = semaphore;
        }        @Override
        public void run() throws IllegalArgumentException {
            try {
                System.out.println(start + " 准备执行");
                TimeUnit.SECONDS.sleep(1);
                semaphore.acquire();
                System.out.println(start + " 已经执行");
                semaphore.release();
                System.out.println(start + " 已经释放");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }        }
    }

输出如下:

0 准备执行
2 准备执行
1 准备执行
3 准备执行
4 准备执行
2 已经执行
2 已经释放
4 已经执行
4 已经释放
1 已经执行
1 已经释放
0 已经执行
0 已经释放
3 已经执行
3 已经释放

Exchanger

Exchanger 用于两个线程间的数据交换,它提供一个同步点,在这个同步点两个线程可以交换彼此的数据。

使用场景:两个线程相互等待处理结果并进行数据传递。

    public void latch() throws InterruptedException, IOException {
        int count = 5;
        Exchanger<String> exchanger = new Exchanger<>();
        ExecutorService executorService = Executors.newFixedThreadPool(count);
            for (int x=0;x<count;x++){
                executorService.execute(new Worker(x,exchanger));
            }
        System.in.read();
    }    
    
    class Worker extends Thread {
        Integer start;
        Exchanger<String>  exchanger;        public Worker(Integer start, Exchanger<String> exchanger) {
            this.start = start;
            this.exchanger = exchanger;
        }        @Override
        public void run() throws IllegalArgumentException {
            try {
                System.out.println(Thread.currentThread().getName() + " 准备执行");
                TimeUnit.SECONDS.sleep(start);
                System.out.println(Thread.currentThread().getName() + " 等待交换");
                String value = exchanger.exchange(Thread.currentThread().getName());
                System.out.println(Thread.currentThread().getName() + " 交换得到数据为:"+value);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }        }
    }

输出如下:

pool-1-thread-1 准备执行
pool-1-thread-1 等待交换
pool-1-thread-3 准备执行
pool-1-thread-2 准备执行
pool-1-thread-5 准备执行
pool-1-thread-4 准备执行
pool-1-thread-2 等待交换
pool-1-thread-1 交换得到数据为:pool-1-thread-2
pool-1-thread-2 交换得到数据为:pool-1-thread-1
pool-1-thread-3 等待交换
pool-1-thread-4 等待交换
pool-1-thread-4 交换得到数据为:pool-1-thread-3
pool-1-thread-3 交换得到数据为:pool-1-thread-4
pool-1-thread-5 等待交换

Exchanger必须成对出现,否则会像上面代码执行结果那样,pool-1-thread-5一直阻塞等待与其交换数据的线程,为了避免这一现象,可以使用exchange(V x, long timeout, TimeUnit unit)设置最大等待时长。

标签:count,JUC,常用,CyclicBarrier,thread,并发,线程,执行,pool
来源: https://www.cnblogs.com/msjhw/p/15812277.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有