ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

2022.8.21 Forkjoin与异步回调

2022-08-21 22:02:44  阅读:140  来源: 互联网

标签:end 21 sum System Long start 2022.8 println Forkjoin


14、Forkjoin(分支合并)

什么是 ForkJoin

ForkJoin 在 JDK 1.7 , 并行执行任务!提高效率。在大数据量中!

大数据:Map Reduce (把大任务拆分为小任务)

Forkjoin 特点:工作窃取,这里面维护的是双端队列

接口

 

 

 

通过forkjoinPool来执行forkjoin

构造方法

 

 

使用forkjoin

 package com.xing.forkjoin;
 ​
 import java.util.concurrent.RecursiveTask;
 ​
 /**
  * 求和计算的任务
  * 3000 6000(frokjoin) 9000(Stream并行流)
  * 如何使用frokjoin
  * 1.forkjoinPool通过它来执行
  * 2.计算任务forkjoinPool.execute(ForkJoinTask task)
  * 3.计算类要继承RecursiveTask(递归任务有返回值)
  */
 //                                       重写方法的返回值类型
 public class ForkJoinDemo extends RecursiveTask<Long> {
     private Long start;
     private Long end;
     //临界值
     private Long temp = 10000L;
 ​
     public ForkJoinDemo(Long start, Long end) {
         this.start = start;
         this.end = end;
    }
 ​
     //重写的方法
     @Override
     protected Long compute() {
         //正常计算
         if((end-start) < temp){
             Long sum = 0L;
             for (long i = start; i <= end; i++) {
 ​
                 sum += i;
            }
             return sum;
        }else {
             //forkjoin 递归
             long middle = (start + end) / 2; //中间值
 ​
             // 将一个任务拆分成两个任务
             ForkJoinDemo task1 = new ForkJoinDemo(start,middle);
             task1.fork();//拆分任务,把任务压入线程队列
 ​
             ForkJoinDemo task2 = new ForkJoinDemo(middle,end);
             task2.fork();
 ​
             //返回结果
             return task1.join() + task2.join();
 ​
        }
    }
 }
 ​

不同方法的执行速度 package com.xing.forkjoin;

 ​
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinTask;
 import java.util.stream.LongStream;
 ​
 public class Test {
     public static void main(String[] args) {
         test1();
         try {
             test2();
        } catch (ExecutionException e) {
             e.printStackTrace();
        } catch (InterruptedException e) {
             e.printStackTrace();
        }
         test3();
    }
     //普通程序员
     public static void test1(){
         Long sum = 0L;
         long start = System.currentTimeMillis();
         for (long i = 1L; i <= 10_0000_0000; i++) {
             sum += i;
        }
         long end = System.currentTimeMillis();
         System.out.println("sum = " + sum + "时间:" + (end - start));
    }
     //使用forkjoin
     public static void test2() throws ExecutionException, InterruptedException {
         long start = System.currentTimeMillis();
 ​
         ForkJoinPool forkJoinPool = new ForkJoinPool();
         ForkJoinTask<Long> task = new ForkJoinDemo(0L,10_0000_0000L); //向下转型
 ​
         ForkJoinTask<Long> submit = forkJoinPool.submit(task);//提交任务
         Long sum = submit.get();//获得结果
 ​
 ​
         long end = System.currentTimeMillis();
         System.out.println("sum=" + sum +"时间:" + (end - start));
    }
     //用Stream并行流
     public static void test3(){
         long start = System.currentTimeMillis();
 ​
         //stream并行流                         包含10_0000_0000
         Long sum = LongStream.rangeClosed(0L, 10_0000_0000L)
                .parallel()//并行计算
                .reduce(0,Long::sum);//调用Long下面的sum方法 输出结果
 ​
         
         long end = System.currentTimeMillis();
         System.out.println("sum =" + sum + "时间:" + (end - start));
    }
 }
 ​
 ​

15、异步回调(Future)

Future 设计的初衷: 对将来的某个事件的结果进行建模

同步回调

我们常用的一些请求都是同步回调的,同步回调是阻塞的,单个的线程需要等待结果的返回才能继续执行。

 

 

 

异步回调

有的时候,我们不希望程序在某个执行方法上一直阻塞,需要先执行后续的方法,那就是这里的异步回调。我们在调用一个方法时,如果执行时间比较长,我们可以传入一个回调的方法,当方法执行完时,让被调用者执行给定的回调方法。

 

 

 

 

 

 

 package com.xing.future;
 ​
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 ​
 /**
  * 异步调用:CompletableFuture
  * 异步执行
  * 成功回调
  * 失败回调
  */
 public class Demo01 {
     public static void main(String[] args) throws ExecutionException, InterruptedException {
      /*
         //发起一个请求
         //异步回调 没有返回值的异步回调
         CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(()->{
             try {
                 TimeUnit.SECONDS.sleep(2);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
             System.out.println(Thread.currentThread().getName() + "runAsync=>void");
         });
         System.out.println("11111");
         completableFuture.get();//获取执行结果
         */
 ​
         //有返回值的异步回调
         //Ajax 成功和失败的回调
         //返回的是错误信息
         CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(()->{
             System.out.println(Thread.currentThread().getName() + "supplyAsync=>Integer");
             //int i = 10/0;
             return 1024;
        });
         System.out.println(completableFuture.whenComplete((t,u)->{//结果编译成功的时候返回  
             //成功的时候t为1024 u为null
             System.out.println("t=>" + t);//错的是时候t为null,
             System.out.println("u=>" + u);//错的时候u打印错误信息 java.util.concurrent.CompletionException:java.lang.ArithmeticException: / by zero
        }).exceptionally((e)->{//编译失败的时候返回
             System.out.println(e.getMessage());//打印异常信息 java.lang.ArithmeticException: / by zero
             return 2333;//可以获取错误的返回结果
        }).get());
    }
 ​
 }
 ​

 

标签:end,21,sum,System,Long,start,2022.8,println,Forkjoin
来源: https://www.cnblogs.com/shanzha/p/16611047.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有