ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

SpringBoot使用异步线程池实现生产环境批量数据推送

2022-01-30 13:35:00  阅读:177  来源: 互联网

标签:异步 SpringBoot 上报 线程 executor import public


 

前言

SpringBoot使用异步线程池:

1、编写线程池配置类,自定义一个线程池;

2、定义一个异步服务;

3、使用@Async注解指向定义的线程池;

 

这里以我工作中使用过的一个案例来做描述,我所在公司是医疗行业,敏感数据需要上报到某监管平台,所以有一个定时任务在流量较小时(一般是凌晨后)执行上报行为。但特殊时期会存在一定要在工作时间大批量上报数据的情况,且要求短时间内就要完成,此时就考虑写一个专门的异步上报接口手动执行,利用线程池上报,极大提高了速度。

 


 

  • 编写线程池配置类

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * 类名称:ExecutorConfig
     * ********************************
     * <p>
     * 类描述:线程池配置
     *
     * @author guoj
     * @date 2021-09-07 09:00
     */
    @Configuration
    @EnableAsync
    @Slf4j
    public class ExecutorConfig {
        /**
         * 定义数据上报线程池
         * @return
         */
        @Bean("dataCollectionExecutor")
        public Executor dataCollectionExecutor() {
    
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    
            // 核心线程数量:当前机器的核心数
            executor.setCorePoolSize(
                    Runtime.getRuntime().availableProcessors());
    
            // 最大线程数
            executor.setMaxPoolSize(
                    Runtime.getRuntime().availableProcessors() * 2);
    
            // 队列大小
            executor.setQueueCapacity(Integer.MAX_VALUE);
    
            // 线程池中的线程名前缀
            executor.setThreadNamePrefix("sjsb-");
    
            // 拒绝策略:直接拒绝
            executor.setRejectedExecutionHandler(
                    new ThreadPoolExecutor.AbortPolicy());
    
            // 执行初始化
            executor.initialize();
    
            return executor;
        }
    
    }

     PS:

    1)、需要注意,这里一定要自己定义ThreadPoolTaskExecutor线程池,否则springboot的异步注解会执行默认线程池,存在线程阻塞导致CPU飙高及内存溢出的风险。这一点可以参考阿里开发手册,线程池定义这块明确提到了这一点;

    2)、在@Bean注解中定义线程池名称,后面异步注解会用到。

     


     

  • 编写异步服务

    /**
     * 异步方法的服务, 不影响主程序运行。
     */
    @Service
    public class AsyncService {
    
        private final Logger log = LoggerFactory.getLogger(AsyncService.class);
    
        /**
         * 发送短信
         */
        @Async("sendMsgExecutor")
        public void sendMsg(String access_token, Consult item, Map<String, String> configMap) {
            // 此处编写发送短信业务
            // 1、buildConsultData();
            // 2、sendMsg();
        }
    
        /**
         * 发送微信订阅消息
         */
        @Async
        public void sendSubscribeMsg(String access_token, Consult item, Map<String, String> configMap) {
            // 此处编写发送微信订阅消息业务
            // 1、buildConsultData();
            // 2、sendSubscribeMsg();
        }
    
        /**
         * 数据并上报
         */
        @Async("dataCollectionExecutor")
        public void buildAndPostData(String access_token, Consult item, Map<String, String> configMap) {
            // 此处编写上报业务,如拼接数据,然后执行上报。
            // 1、buildConsultData();
            // 2、postData();
        }
    }
PS:
1)、以上是代码片段,个人经验认为专门定义一个异步service存放各个异步方法最佳,这样可以避免编码时一些误操作比如异步方法不是void或者是private修饰,导致@Async注解失效的情况,同时可以安排每个注解指向不同的自定义线程池更加灵活;
2)、@Async注解中的名称就是上面定义的自定义线程池名称,这样业务执行时就会从指定线程池中获取异步线程。

 

  • 异步批量上报数据

    @Autowired
    private AsyncService asyncService;
    
    /**
     * 手动上报问诊记录,线程池方式。
     */
    public void manualUploadConsultRecordsAsync(String channel, Date startTime, Date endTime) {
    
        // 查询指定时间内的问诊记录
       List<Consult> consultList = consultService
           .findPaidListByChannelAndTime(channel, startTime, endTime, configMap.get("serviceId"));
    
       if (!CollectionUtils.isEmpty(consultList)) {
    
           log.debug("[SendWZDataService][manualUploadConsultRecordsAsync]>>>> 手动上报问诊记录, 一共[{}]条", consultList.size());
    
           consultList.forEach((item) -> {
               try {
                   // 异步调用,使用线程池。
                   asyncService.buildAndPostData(access_token, item, configMap);
               } catch (Exception ex) {
                   log.error("[SendWZDataService][manualUploadConsultRecordsAsync]>>>> 手动上报问诊记录发生异常: ", ex);
               }
           });
    
       }
    }

     


     

    •  总结

      以上方式已经在生产环境运行,在工作时间内执行过很多次,一次数万条记录基本是几分钟内就全部上报完毕,而正常循环遍历时一次大概需要半个小时左右。
      线程池的使用方式往往来源于业务场景,如果类似的业务不存在紧急处理的情况,大体还是以任务调度执行为主,因为更安全。如果存在紧急处理的情况,那么使用SpringBoot+线程池的方式不仅能节省非常多的时间,且不占用主线程的执行空间。
      喜欢就点个关注吧~~

标签:异步,SpringBoot,上报,线程,executor,import,public
来源: https://www.cnblogs.com/fulongyuanjushi/p/15856660.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有