ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

JDK8的异步处理方式-CompletableFuture的使用

2021-11-08 18:31:50  阅读:169  来源: 互联网

标签:异步 String System JDK8 CompletableFuture println dataMap out


一、背景

jdk8中加入了实现类CompletableFuture,用于异步编程。底层做任务使用的是ForkJoin, 顾名思义,是将任务的数据集分为多个子数据集,而每个子集,都可以由独立的子任务来处理,最后将每个子任务的结果汇集起来。它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。从api文档看,它实现了2个接口CompletionStage和Future。CompletionStage支持lambda表达式,接口的方法的功能都是在某个阶段得到结果后要做的事情。因此,CompletableFuture不仅拥有Future的所有特性,而且还内置了lambda表达式,支持异步回调,结果转换等功能,它有以下Future实现不了的功能:

  1. 合并两个相互独立的异步计算的结果

  2. 等待异步任务的所有任务都完成

  3. 等待异步任务的其中一个任务完成就返回结果

  4. 任务完成后调用回调方法

  5. 任务完成的结果可以用于下一个任务。

  6. 任务完成时发出通知提供原生的异常处理api

二、代码

     

package com.example.demo;


import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

public class CompletableFutureDemo {
     //CPU核数
    private static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
    private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(AVAILABLE_PROCESSORS,
            3 * AVAILABLE_PROCESSORS,
            3, TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(20));

    public static void main(String[] args) throws Exception {
        long startTime = System.currentTimeMillis();
        System.out.println("demo start....." + startTime);
        demo3();
        System.out.println("demo end.....costTime = " + (System.currentTimeMillis() - startTime));
    }

    /**
     * 基于allOf,并行处理多个任务,等待所有任务执行完毕后返回
     */

    public static void demo3() throws Exception {
       //用户整体接收各个任务的返回值
        Map<String,String> dataMap = new ConcurrentHashMap<>();
        List<CompletableFuture<String>> futureList = new ArrayList<>();
        futureList.add(doSomethingA("A", dataMap));
        futureList.add(doSomethingB("B", dataMap));
        futureList.add(doSomethingC("C", dataMap));
        CompletableFuture<Void> result = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0]));
        try {
                result.get(3, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("dataMap = " + dataMap);
       //结果为:{doSomeThingB=B, doSomeThingA=A}
    }

    /**
     * 基于thenCompose,第一个任务执行完后,第二个任务使用第一个任务的返回作为参数
     */
    public static void demo1() throws Exception {
        Map<String,String> dataMap = new HashMap<>();
        CompletableFuture<String> completableFuture = doSomethingA("A", dataMap)
                .thenCompose(id -> doSomethingB(id, dataMap));
        String result = completableFuture.get(3, TimeUnit.SECONDS);
        System.out.println("result = " + result);
        //结果为:A is done is done

    }

    /**
     * 基于thenCombine,当两个任务都完成后,使用两者的结果作为参数再执行一个异步任务
     */
    public static void demo2() throws Exception {
        Map<String,String> dataMap = new HashMap<>();
        CompletableFuture<String> completableFuture = doSomethingA("A", dataMap)
                .thenCombine(doSomethingB("B", dataMap), (a, b) -> a + " - " + b);
        String result = completableFuture.get(3, TimeUnit.SECONDS);
        System.out.println("result = " + result);
//结果为:A is done - B is done
    }

    /**
     * @param dataMap 用户整体接收方法的返回值
     * @return
     */
    public static CompletableFuture<String> doSomethingA(String taskId, Map<String,String> dataMap) {
        System.out.println("doSomethingA start....." + System.currentTimeMillis());
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            dataMap.put("doSomeThingA", "A");
            System.out.println(taskId + " is done and dataMap"+dataMap);
            return taskId + " is done";
        }, threadPoolExecutor);
    }

    public static CompletableFuture<String> doSomethingB(String taskId, Map<String,String> dataMap) {
        System.out.println("doSomethingB start....." + System.currentTimeMillis());
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            dataMap.put("doSomeThingB", "B");
            System.out.println(taskId + " is done and dataMap"+dataMap);
            return taskId + " -> B is done";
        }, threadPoolExecutor);
    }

    public static CompletableFuture<String> doSomethingC(String taskId, Map<String,String> dataMap) {
        System.out.println("doSomethingC start....." + System.currentTimeMillis());
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            dataMap.put("doSomeThingC", "C");
            System.out.println(taskId + " is done and dataMap"+dataMap);
            return taskId + " is done";
        }, threadPoolExecutor);

    }

}

三、效率比较

很明显,异步更快

package com.example.demo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * @author d00018641
 * @date 2021/11/4 15:10
 */
public class TestDemo2 {
    private static final String key = "llllllllllllllllllllllll";
    public static void main(String[] args) {

        List<String> requestList = new ArrayList<>();
        requestList.add("3");
        requestList.add("4");
        requestList.add("5");
        requestList.add("6");
        // 响应参数list
        String[] returnArray = new String[requestList.size()];
        // 异步查询每一列,定义响应列数的futures
        List<CompletableFuture<String>> futures = new ArrayList<>();
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < requestList.size(); i++) {
            final int a = i;
            CompletableFuture<String> tf = CompletableFuture.supplyAsync(() -> {
                return calc(requestList.get(a));
            }).whenComplete((m, e) -> returnArray[a] = m);
            futures.add(tf);
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        //CompletableFuture end.....costTime = 147
        System.out.println("CompletableFuture end.....costTime = " + (System.currentTimeMillis() - startTime));
        long startTime1 = System.currentTimeMillis();
        for(int i = 0; i < requestList.size(); i++){
            returnArray[i] = calc(requestList.get(i));
        }
        //连续 end.....costTime = 432
        System.out.println("连续 end.....costTime = " + (System.currentTimeMillis() - startTime1));
        System.out.println(Arrays.asList(returnArray));

    }

    private static String calc(String source) {
        int as = Integer.parseInt(source);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return String.valueOf(Math.pow(as, 3));
    }
}

 

标签:异步,String,System,JDK8,CompletableFuture,println,dataMap,out
来源: https://www.cnblogs.com/dukedu/p/15525460.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有