ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

CompletableFuture 异步编排

2022-07-27 17:02:23  阅读:125  来源: 互联网

标签:异步 System 编排 CompletableFuture executor println public out


 

 

1、创建异步对象

CompletableFuture 提供了四个静态方法来创建一个异步操作

1 #runAsync 无返回值
2 public static CompletableFuture<Void> runAsync(Runable runable)
3 public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
4 
5 #supplyAsync 有返回值
6 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
7 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

2、Completable启动异步任务

runAsync(),无返回值

 1 package com.lian.gulimall.search.thread;
 2 
 3 import java.util.concurrent.CompletableFuture;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 
 7 public class ThreadTestAsyn {
 8 
 9     //创建线程池
10     public static ExecutorService executor = Executors.newFixedThreadPool(10);
11     
12     public static void main(String[] args) throws Exception {
13         //1、Async创建异步对象,没返回值
14         CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
15             //异步任务内容
16             System.out.println("current thread: " + Thread.currentThread().getId());
17             int i = 10 / 2;
18            System.out.println("run result: " + i);
19         }, executor);
20         System.out.println("main...end..");
21     }
22 }

supplyAsync(),有返回值

 1 package com.lian.gulimall.search.thread;
 2 
 3 import java.util.concurrent.CompletableFuture;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 
 7 public class ThreadTestAsyn {
 8 
 9     //创建线程池
10     public static ExecutorService executor = Executors.newFixedThreadPool(10);
11     
12     public static void main(String[] args) throws Exception {
13     
14         //2、Supply,有返回值
15         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
16             System.out.println("current thread: " + Thread.currentThread().getId());
17             int i = 10 / 2;
18             System.out.println("run result: " + i);
19             return i;
20         }, executor);
21         Integer integer = future.get();
22         System.out.println(integer);
23         System.out.println("main...end..");
24     }
25 }

3、Completable异步完成回调与异常感知

whenComplete:返回结果和异常
exceptionally:如果出现异常,返回默认值

 1 package com.lian.gulimall.search.thread;
 2 
 3 import java.util.concurrent.CompletableFuture;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 
 7 public class ThreadTestAsyn {
 8 
 9     //创建线程池
10     public static ExecutorService executor = Executors.newFixedThreadPool(10);
11     
12     public static void main(String[] args) throws Exception {
13         //1、Async创建异步对象,没返回值
14 //        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
15 //            //异步任务内容
16 //            System.out.println("current thread: " + Thread.currentThread().getId());
17 //            int i = 10 / 2;
18 //            System.out.println("run result: " + i);
19 //        }, executor);
20 
21         //2、Supply,有返回值
22         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
23             System.out.println("current thread: " + Thread.currentThread().getId());
24             int i = 10 / 0;
25             System.out.println("run result: " + i);
26             return i;
27             //参数(结果,异常),虽然能得到异常信息,但是没法修改返回数据
28         }, executor).whenComplete((res,exception)->{
29             System.out.println("async completed...result is: "+res+",exception is: "+exception);
30         //如果结果出现异常,就给一个默认返回值,感知异常,同时返回默认值
31         }).exceptionally((t)->{
32             return 4;
33         });
34         //获取异步结果
35         Integer integer = future.get();
36         System.out.println(integer);
37         System.out.println("main...end..");
38     }
39 }

4、handle

可对结果做异常处理, 可改变返回值

 1 package com.lian.gulimall.search.thread;
 2 
 3 import java.util.concurrent.CompletableFuture;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 
 7 public class ThreadTestAsyn {
 8 
 9     //创建线程池
10     public static ExecutorService executor = Executors.newFixedThreadPool(10);
11     public static void main(String[] args) throws Exception {
12         //1、Async创建异步对象,没返回值
13         //方法执行完成后得处理,无论成功与失败
14         CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
15             System.out.println("current thread: " + Thread.currentThread().getId());
16             int i = 10 / 4;
17             System.out.println("run result: " + i);
18             return i;
19             //参数(结果,异常),虽然能得到异常信息,但是没法修改返回数据
20         }, executor).handle((res,thr)->{
21             //如果返回结果不为空,就将结果×2
22             if (res != null){
23                 return res*2;
24             }
25             //如果异常不为空,就返回0
26             if (thr != null) {
27                 return 0;
28             }
29             //如果两个都不走就返回0
30             return 0;
31         });
32         //获取异步结果
33         Integer integer = future.get();
34         System.out.println(integer);
35     }
36 }

5、CompletableFuture线程串行化

 

 

 

1、thenRunAsync,不能获取到上一步的执行结果,无返回值

 1 public class ThreadTestAsyn {
 2 
 3     //创建线程池
 4     public static ExecutorService executor = Executors.newFixedThreadPool(10);
 5     public static void main(String[] args) throws Exception {
 6     //thenRun 不能获取到上一步的执行结果,无返回值
 7         CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
 8             System.out.println("current thread: " + Thread.currentThread().getId());
 9             int i = 10 / 4;
10             System.out.println("run result: " + i);
11             return i;
12             //参数(结果,异常),虽然能得到异常信息,但是没法修改返回数据
13         }, executor).thenRunAsync(() -> {
14             System.out.println("task2 startting");
15         }, executor);
16    }

2、thenAccept 能接收上一步的执行结果,但是无返回值

 1 public class ThreadTestAsyn {
 2 
 3     //创建线程池
 4     public static ExecutorService executor = Executors.newFixedThreadPool(10);
 5     public static void main(String[] args) throws Exception {
 6            //thenAccept 能接收上一步的执行结果,但是无返回值
 7         CompletableFuture.supplyAsync(() -> {
 8             System.out.println("current thread: " + Thread.currentThread().getId());
 9             int i = 10 / 4;
10             System.out.println("run result: " + i);
11             return i;
12             //参数(结果,异常),虽然能得到异常信息,但是没法修改返回数据
13         }, executor).thenAcceptAsync((res)->{
14             System.out.println("task2 startting"+res);
15         },executor);
16    }

3、thenApply 既能接收上一步的执行结果,还有返回值

 1 public class ThreadTestAsyn {
 2 
 3     //创建线程池
 4     public static ExecutorService executor = Executors.newFixedThreadPool(10);
 5     public static void main(String[] args) throws Exception {
 6         //thenApply 既能接收上一步的执行结果,还有返回值
 7         CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
 8             System.out.println("current thread: " + Thread.currentThread().getId());
 9             int i = 10 / 4;
10             System.out.println("run result: " + i);
11             return i;
12             //参数(结果,异常),虽然能得到异常信息,但是没法修改返回数据
13         }, executor).thenApplyAsync((res) -> {
14             System.out.println("task2 startting" + res);
15             return "hello" + res;
16         }, executor);
17         System.out.println("main...end...."+future2.get());
18     }

两任务组合-都要完成

 

 

 1 public class ThreadTestAsyn {
 2 
 3     //创建线程池
 4     public static ExecutorService executor = Executors.newFixedThreadPool(10);
 5     public static void main(String[] args) throws Exception {
 6         CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
 7             System.out.println("task1 thread start: " + Thread.currentThread().getId());
 8             int i = 10 / 4;
 9             System.out.println("task1 end");
10             return i;
11         }, executor);
12 
13         CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
14             System.out.println("task2 thread start: " + Thread.currentThread().getId());
15             System.out.println("task2 end");
16             return "hello";
17         }, executor);
18 
19         //组合两个future,不需要获取future的结果,只需两个future处理完任务后,处理该任务
20         future01.runAfterBothAsync(future02,()->{
21             System.out.println("task3 start...");
22         },executor);
23 
24         //组合两个future,获取两个future任务的返回结果,然后处理任务,没有返回值
25         future01.thenAcceptBothAsync(future02,(t,u)->{
26             System.out.println("task3 start...,result is:"+t+"--》"+u);
27         },executor);
28 
29         //组合两个future,获取两个future任务的返回结果,并返回当前任务的返回值
30         CompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {
31             return f1 + ":" + f2 + "--> haha";
32         }, executor);
33         System.out.println("main..end"+future.get());
34 
35     }

两任务组合-其中一个完成

 

 

 1 public class ThreadTestAsyn {
 2 
 3     //创建线程池
 4     public static ExecutorService executor = Executors.newFixedThreadPool(10);
 5     public static void main(String[] args) throws Exception {
 6 
 7     CompletableFuture<Object> future01 = CompletableFuture.supplyAsync(() -> {
 8             System.out.println("task1 thread start: " + Thread.currentThread().getId());
 9             int i = 10 / 4;
10             System.out.println("task1 end");
11             return i;
12         }, executor);
13 
14         CompletableFuture<Object> future02 = CompletableFuture.supplyAsync(() -> {
15             System.out.println("task2 thread start: " + Thread.currentThread().getId());
16             System.out.println("task2 end");
17             return "hello";
18         }, executor);
19 
20         //两个任务,只要有一个完成,就执行任务3,不感知结果,也没有返回值
21         future01.runAfterEitherAsync(future02,()->{
22             System.out.println("task3 start...");
23         },executor);
24 
25         //只接收上一次任务的返回结果,没有返回值,感知结果,没有返回值
26         future01.acceptEitherAsync(future02,(res)->{
27             System.out.println("task3 start..."+res);
28         },executor);
29 
30         //感知结果,也有返回值
31         future01.applyToEitherAsync(future02,(res)->{
32             System.out.println("task3 start..."+res);
33             return res.toString() + "--> haha";
34         },executor);
35     }

多任务组合

 

 

 1 public class ThreadTestAsyn {
 2 
 3     //创建线程池
 4     public static ExecutorService executor = Executors.newFixedThreadPool(10);
 5     public static void main(String[] args) throws Exception {
 6     CompletableFuture<String> futureImg = CompletableFuture.supplyAsync(() -> {
 7             System.out.println("product info");
 8             return "hello.jpg";
 9         }, executor);
10 
11         CompletableFuture<String> futureAttr = CompletableFuture.supplyAsync(() -> {
12             System.out.println("product info");
13             return "black 256g";
14         }, executor);
15 
16         CompletableFuture<String> futureDes = CompletableFuture.supplyAsync(() -> {
17             System.out.println("product info");
18             return "huawei";
19         }, executor);
20 
21         CompletableFuture<Void> future = CompletableFuture.allOf(futureImg, futureAttr, futureDes);
22         future.get();  //get方法阻塞等待所有结果完成
23 
24         System.out.println(futureImg.get()+futureAttr.get()+futureDes.get());
25     }

五、案例演示

在商品详情页时,为了提升效率,节省时间,采用异步编排方式

第1步、配置属性和类绑定赋值

package com.lian.gulimall.product.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@ConfigurationProperties(prefix = "gulimall.thread")
@Component
@Data
public class ThreadPoolConfigProperties {
    private Integer coreSize;      //核心线程数
    private Integer maxSize;       //最大线程数
    private Integer keepAliveTime; //存活时间
}

第2步、配置application.properties,给实体类赋值

gulimall.thread.core-size=20
gulimall.thread.max-size=200
gulimall.thread.keep-alive-time=10

第3步、自定义线程池

package com.lian.gulimall.product.config;

import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 自定义线程池
 * 注入到spring容器就不需要配置 自动导入配置属性注解了
 */
//@EnableConfigurationProperties(ThreadPoolConfigProperties.class)
@Configuration
public class MyThreadConfig {

    @Bean
    public ThreadPoolExecutor threadPoolExecutor(ThreadPoolConfigProperties pool){
        return new ThreadPoolExecutor(
                pool.getCoreSize(),
                pool.getMaxSize(),
                pool.getKeepAliveTime(),
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10000),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    }
}

第4步、商品详情页的控制层

package com.lian.gulimall.product.web;

import com.lian.gulimall.product.service.SkuInfoService;
import com.lian.gulimall.product.vo.SkuItemVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

/**
 * 详情页
 */
@Controller
public class ItemController {

    @Autowired
    SkuInfoService skuInfoService;

    /**
     * 展示当前sku的详情
     * @return
     */
    @GetMapping("/{skuId}.html")
    public String skuItem(@PathVariable("skuId") Long skuId, Model model) throws Exception{
        System.out.println("准备查询"+skuId+"的详情");
        //根据skuId查询到所有的属性都封装到 SkuItemVo 中
        SkuItemVo vos = skuInfoService.item(skuId);
        model.addAttribute("item",vos);
        return "item";
    }
}

第5步、业务层

@Service("skuInfoService")
public class SkuInfoServiceImpl extends ServiceImpl<SkuInfoDao, SkuInfoEntity> implements SkuInfoService {

    @Autowired
    SkuImagesService skuImagesService; //图片的sku

    @Autowired
    ProductAttrValueService productAttrValueService; //商品属性值

    @Autowired
    SpuInfoDescService spuInfoDescService; //商品描述

    @Autowired
    AttrGroupService attrGroupService;  //组信息

    @Autowired
    SkuSaleAttrValueService skuSaleAttrValueService;

    @Autowired
    ThreadPoolExecutor executor;   //注入线程池

    @Override
    public SkuItemVo item(Long skuId) throws Exception{
    
        //商品详情页返回数据都封装到 SkuItemVo
        SkuItemVo skuItemVo = new SkuItemVo();

        /**
         * 使用异步编排,节省时间提升效率,一起执行不阻塞等待
         * 第一种:supplyAsync 有返回值,其他任务可以用
         * 开启一个异步任务,创建异步对象
         * infoFuture 任务完成后,saleAttrFuture、descFuture、baseAttrFuture 才开始执行,因为他们都需要依赖任务1的数据结果
         */
        CompletableFuture<SkuInfoEntity> infoFuture = CompletableFuture.supplyAsync(() -> {
            //1、sku基本信息获取,标题、副标题、价格等 pms_sku_info
            SkuInfoEntity info = baseMapper.selectById(skuId);
            skuItemVo.setInfo(info);
            //因为其他任务要用基本信息,所以我们返回基本信息
            return info;
            //executor代表要放到自己的线程池里面
        }, executor);

        //第二种:接下来接收任务的返回结果,accept只是接收上一个任务的结果,自己不返回结果
        CompletableFuture<Void> saleAttrFuture = infoFuture.thenAcceptAsync((res) -> {
            //执行第二个任务
            //3、获取spu的销售属性组合
            List<SkuItemSaleAttrVo> saleAttrVos = skuSaleAttrValueService.getSaleAttrsBySpuId(res.getSpuId());
            skuItemVo.setSaleAttr(saleAttrVos);
        });

        //继续执行任务
        CompletableFuture<Void> descFuture = infoFuture.thenAcceptAsync((res) -> {
            //4、获取spu的介绍
            SpuInfoDescEntity spuInfoDescEntity = spuInfoDescService.getById(res.getSpuId());
            skuItemVo.setDesc(spuInfoDescEntity);
        }, executor);

        //继续执行任务,任务3、4、5都依赖任务1的结果 获取spuId
        CompletableFuture<Void> baseAttrFuture = infoFuture.thenAcceptAsync((res) -> {
            //5、获取spu的规格参数信息
            List<SpuItemAttrGroupVo> attrGroupVos = attrGroupService.getAttrGroupWithAttrsBySpuId(res.getSpuId(), res.getCatalogId());
            skuItemVo.setGroupAttrs(attrGroupVos);
        }, executor);

        /**
         * 任务2 不需要依赖任务1提供的结果数据,所以不需要等待任务1完成,直接和任务1同步执行,所以自己也开启一个异步任务
         * runAsync 代表不需要返回结果,因为也没有其他任务需要依赖任务2的数据
         */
        CompletableFuture<Void> imageFuture = CompletableFuture.runAsync(() -> {
            //2、sku图片信息 pms_sku_images
            List<SkuImagesEntity> images = skuImagesService.getImagesBySkuId(skuId);
            skuItemVo.setImages(images);
        }, executor);

        /**
         * 等待所有任务都完成,因为每一个任务都是在给 vo 中封装数据
         * get()方法就是阻塞等待所有任务都执行完
         * infoFuture 也可以不写,因为别人是依赖她的,如果别人都执行完了,那么她肯定也执行完了
         */
        CompletableFuture.allOf(infoFuture, saleAttrFuture, descFuture, baseAttrFuture, imageFuture).get();

        return skuItemVo;
    }

 

标签:异步,System,编排,CompletableFuture,executor,println,public,out
来源: https://www.cnblogs.com/caicz/p/16525466.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有