ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

【Java】基于线程池的独立任务并发执行器

2022-06-11 11:35:34  阅读:141  来源: 互联网

标签:执行器 Java thread private candidateKey 线程 key paramMap final


目的:

对于多个独立的任务,可以以并发的方式执行任务,以提高 CPU 利用率,提高处理效率。

思路

在一个线程池中,开启指定数量的线程,每个线程从任务队列中获取任务执行。

执行的过程中,判断当前线程是否在执行任务的状态,如果没有执行任务,取一条任务执行,如果正在执行,则跳过,下轮再判断。

在所有任务执行完后,关闭线程池。

需要注意的是数据结构的选择,须选择并发类的数据结构,不然可能出现阻塞,死锁等情况。

(具体逻辑参考源码)

示例

/**
 * 并发执行器示例
 */
public class ConcurrentExecutorTest {

    /**
     * 测试
     */
    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            test();
        }
    }

    private static void test() {
        Map<String, String> paramMap = new LinkedHashMap<>();
        for (int i = 0; i < 10; i++) {
            paramMap.put("key:" + i, "value:" + i);
        }

        final ConcurrentExecutor<String, String, Integer> executor = new ConcurrentExecutor<>(5, paramMap,
                (k, v) -> {
                    ThreadUtil.sleep(10);
                    System.out.println(Thread.currentThread().getName() + "-" + v);
                    final int abs = Math.abs(Objects.hash(v));
                    if (abs % 3 == 0) {
                        int i = 1 / 0;
                    }
                    return abs;
                });
        executor.execute();
        System.out.println("success result: " + executor.getSuccessResultMap());
        System.out.println("error result: " + executor.getErrorResultMap());
    }
}

测试结果

pool-1-thread-1-value:0
pool-1-thread-2-value:1
pool-1-thread-4-value:3
pool-1-thread-3-value:2
pool-1-thread-3-value:8
pool-1-thread-1-value:5
pool-1-thread-5-value:4
pool-1-thread-2-value:6
pool-1-thread-4-value:7
pool-1-thread-3-value:9
success result: {key:2=231604360, key:0=231604358, key:6=231604364, key:5=231604363, key:3=231604361, key:9=231604367, key:8=231604366}
error result: {key:1=java.lang.ArithmeticException: / by zero, key:4=java.lang.ArithmeticException: / by zero, key:7=java.lang.ArithmeticException: / by zero}

源码


import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.ConcurrentHashSet;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.BooleanUtil;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.function.BiFunction;

/**
 * 并发执行器
 * <p>
 * 适用场景:每个任务是独立的,不耦合的
 *
 * @author lilou
 * @since 2022/6/9 9:05
 */
public class ConcurrentExecutor<K, V, R> {
    /**
     * 任务参数映射(K:key的类型,V:值的类型)
     */
    private final Map<K, V> paramMap;
    /**
     * 成功的任务结果映射(R:结果类型)
     */
    private final Map<K, R> successResultMap;

    /**
     * 失败的任务结果映射
     */
    private final Map<K, Throwable> errorResultMap;

    /**
     * 当前运行中的key集合
     */
    private final Set<K> runningKeySet;

    /**
     * 候选任务key队列
     */
    private final Queue<K> candidateKeyQueue;

    /**
     * 同时运行的最大线程数量
     */
    private final int maxThreadNum;

    /**
     * 执行器
     */
    private final ExecutorService executorService;

    /**
     * 具体任务策略
     */
    private final BiFunction<K, V, R> biFunction;

    /**
     * 当前index线程的运行状态,可依据此状态,判断是否立刻从任务参数中获取任务执行
     */
    private final Map<Integer, Boolean> currentIndexThreadRunningStatusMap;

    public ConcurrentExecutor(int maxThreadNum, Map<K, V> paramMap, BiFunction<K, V, R> biFunction) {
        Assert.notNull(paramMap, "paramMap不可为空");
        Assert.isTrue(maxThreadNum > 0, "maxThreadNum不可小于1");

        final int paramSize = paramMap.size();
        this.maxThreadNum = Math.min(maxThreadNum, paramSize);
        // tips: 须转换成同步类的map数据结构,如果错误地使用 this.paramMap = paramMap; 且外部使用了HashMap 或 LinkedHashMap,多测试几遍会发现,偶尔会陷入了阻塞
        this.paramMap = Collections.synchronizedMap(paramMap);
        this.candidateKeyQueue = new ConcurrentLinkedQueue<>(paramMap.keySet());
        this.runningKeySet = new ConcurrentHashSet<>(paramSize);
        this.biFunction = biFunction;
        this.executorService = ThreadUtil.newExecutor(this.maxThreadNum, this.maxThreadNum, Integer.MAX_VALUE);
        this.currentIndexThreadRunningStatusMap = new ConcurrentHashMap<>(this.maxThreadNum);
        this.successResultMap = new ConcurrentHashMap<>(this.paramMap.size());
        this.errorResultMap = new ConcurrentHashMap<>();
    }


    public void execute() {
        while (CollUtil.isNotEmpty(paramMap)) {

            // 最多同时有 maxRunningThreadNumber 同时消费 taskMap 中的数据
            for (int i = 0; i < this.maxThreadNum; i++) {
                int currentIndex = i;

                // 当前线程上次还未执行完,暂时跳过
                final Boolean isRunning = currentIndexThreadRunningStatusMap.getOrDefault(currentIndex, false);
                if (BooleanUtil.isTrue(isRunning)) {
                    continue;
                }

                // 每个线程只处理和自己相关的
                final K candidateKey = pickCandidateKey();
                // 当前没有对应key的任务
                if (Objects.isNull(candidateKey)) {
                    continue;
                }

                // 在线程池中运行任务
                executorService.submit(() -> {
                    try {
                        currentIndexThreadRunningStatusMap.put(currentIndex, true);
                        final V data = paramMap.get(candidateKey);

                        // 开始执行任务
                        final R result = biFunction.apply(candidateKey, data);

                        // 存入正常结果
                        successResultMap.put(candidateKey, result);
                    } catch (Exception e) {
                        // 存入异常结果
                        errorResultMap.put(candidateKey, e);
                    } finally {
                        paramMap.remove(candidateKey);
                        candidateKeyQueue.remove(candidateKey);
                        currentIndexThreadRunningStatusMap.remove(currentIndex);
                    }
                });
            }
        }
        executorService.shutdown();
    }


    /**
     * 从候选任务key队列中选择一个任务key
     */
    private K pickCandidateKey() {
        for (K candidateKey : candidateKeyQueue) {
            if (!runningKeySet.contains(candidateKey)) {
                runningKeySet.add(candidateKey);
                return candidateKey;
            }
        }
        return null;
    }

    public Map<K, R> getSuccessResultMap() {
        return successResultMap;
    }

    public Map<K, Throwable> getErrorResultMap() {
        return errorResultMap;
    }
}

标签:执行器,Java,thread,private,candidateKey,线程,key,paramMap,final
来源: https://www.cnblogs.com/lyloou/p/16365529.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有