ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

ThreadPoolExecutor之RejectedExecutionHandler

2020-01-16 15:53:40  阅读:185  来源: 互联网

标签:Task start ............ put ThreadPoolExecutor RejectedExecutionHandler BlockQue


最近工作种常用到ThreadPoolExecutor这个对象, 这是一个并发编程中非常常用的对象。因为和并发编程相关所以它存在于java.util.concurrent这包中。

创建这个对象的基本方法如下:

 

 今天主要想研究一下最后一个参数RejectedExecutionHandler对整个线程池的影响。首先写出需要用到测试代码如下:

import java.io.Serializable;
import java.util.concurrent.*;

public class ThreadPoolExecutorTest {
    private static int produceTaskSleepTime = 1000;     // 一秒add一个任务
    private static int consumeTaskSleepTime = 60000;   //每个任务停留10秒这样结果更明显清楚
    private static BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);  //缓存队列数
    private static int  produceTaskMaxNumber = 51;      //总线程数
    
    private static int corePoolSize = 2;                //
    private static int maximumPoolSize = 4;
    private static int keepAliveTime = 5;
    
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExcutor = new ThreadPoolExecutor(corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                TimeUnit.SECONDS,
                queue,
                new ThreadPoolExecutor.DiscardPolicy()); //调整策略

        for(int i=1; i< produceTaskMaxNumber;i++){
            try{
                String  work = "Task@ " + i;
                System.out.println("put : " + work);
                threadPoolExcutor.execute(new ThreadPoolTask(work));

                System.out.println("BlockQueue Size is " + queue.size());    //打印出缓存队列中线程数
                //等待一段时间方便看清楚线程处理顺序
                Thread.sleep(produceTaskSleepTime);
            }catch (Exception e){
                e.printStackTrace();
            }
        }

    }
    
    public static class ThreadPoolTask implements  Runnable, Serializable{
        private static final long serialVersionUID = 0;
        private  Object threadPoolTaskData;

        ThreadPoolTask(Object work){
            this.threadPoolTaskData = work;
        }

        @Override
        public void run() {
            System.out.println("start............" + threadPoolTaskData);  //标记任务开始
            try {
                Thread.sleep(consumeTaskSleepTime);
            }catch (Exception e){
                e.printStackTrace();
            }
            System.out.println("end.............." + threadPoolTaskData);   //标记任务结束
            threadPoolTaskData = null;
        }

        public  Object getTask(){
            return this.threadPoolTaskData;
        }
    }
}

我的想法是模拟50个任务,每个任务执行60s,这样就可以不断有任务阻塞到缓存队列,并在一定时间内达到线程池最大线程数,进而发现不同拒绝策略是在线程数超出线程池最大允许数量后是如何处理。

1.DiscardPolicy

put : Task@ 1
BlockQueue Size is 0
start............Task@ 1
put : Task@ 2
BlockQueue Size is 0
start............Task@ 2
put : Task@ 3
BlockQueue Size is 1
put : Task@ 4
BlockQueue Size is 2
put : Task@ 5
BlockQueue Size is 2
start............Task@ 5
put : Task@ 6
BlockQueue Size is 2
start............Task@ 6
put : Task@ 7
BlockQueue Size is 2
..........
.......... put : Task@ 50 BlockQueue Size is 2 end..............Task@ 1 start............Task@ 3 end..............Task@ 2 start............Task@ 4 end..............Task@ 5 end..............Task@ 6 end..............Task@ 3 end..............Task@ 4

从结果可以看出,线程装载顺序是

核心线程数(1,2满)->缓存队列数(3,4 满)->最大线程数(5,6满)->不执行。

结论:大于最大线程数以后的任务完全不执行。

2.AbortPolicy

put : Task@ 7
java.util.concurrent.RejectedExecutionException: Task com.dangkei.ThreadPoolExecutorTest$ThreadPoolTask@7cf10a6f rejected from java.util.concurrent.ThreadPoolExecutor@2ff4f00f[Running, pool size = 4, active threads = 4, queued tasks = 2, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
    at com.dangkei.ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:28)

 结论:运行结果前后相同但是 在put Task 7时,也就是超过最大线程数后每个线程都抛出RejectedExecutionException

3.DiscardOldestPolicy

put : Task@ 1
BlockQueue Size is 0
start............Task@ 1
put : Task@ 2
BlockQueue Size is 0
start............Task@ 2
put : Task@ 3
BlockQueue Size is 1
put : Task@ 4
BlockQueue Size is 2
put : Task@ 5
BlockQueue Size is 2
start............Task@ 5
put : Task@ 6
BlockQueue Size is 2
start............Task@ 6
put : Task@ 7
BlockQueue Size is 2
put : Task@ 8
BlockQueue Size is 2
......
......
put : Task@ 49
BlockQueue Size is 2
put : Task@ 50
BlockQueue Size is 2
end..............Task@ 1
start............Task@ 49
end..............Task@ 2
start............Task@ 50
end..............Task@ 5
end..............Task@ 6
end..............Task@ 49
end..............Task@ 50

开始觉得这个结果比较奇怪,后来反应过来。 所谓1,2 ,5,6最终都执行结束了但是3,4线程没有开始。最后还有49,50也都结束.

结论,废弃最旧的线程是废弃掉缓存队列里最旧的线程对 核心队列(corePoolSize),和 (核心队列+缓存队列)<  线程队列 < 最大线程数 中间这部分线程(5,6 )也没影响。

由于新的线程任务不断替换掉 缓存队列里的任务, 所以 位于 blockingqueue里的任务始终无法执行。 只有最后两个线程(499,50) 没有被新的任务替换正常执行了。

4.CallerRunsPolicy

put : Task@ 1
BlockQueue Size is 0
start............Task@ 1
put : Task@ 2
BlockQueue Size is 0
start............Task@ 2
put : Task@ 3
BlockQueue Size is 1
put : Task@ 4
BlockQueue Size is 2
put : Task@ 5
BlockQueue Size is 2
start............Task@ 5
put : Task@ 6
BlockQueue Size is 2
start............Task@ 6
put : Task@ 7
start............Task@ 7
end..............Task@ 1
start............Task@ 3
end..............Task@ 2
start............Task@ 4
end..............Task@ 5
end..............Task@ 6
end..............Task@ 7
BlockQueue Size is 0
put : Task@ 8
BlockQueue Size is 1
start............Task@ 8
put : Task@ 9
start............Task@ 9
BlockQueue Size is 0
put : Task@ 10
BlockQueue Size is 1
put : Task@ 11
BlockQueue Size is 2
put : Task@ 12
start............Task@ 12
end..............Task@ 3
start............Task@ 10
end..............Task@ 4
start............Task@ 11
end..............Task@ 8
end..............Task@ 9
end..............Task@ 12
BlockQueue Size is 0
put : Task@ 13
BlockQueue Size is 1
start............Task@ 13
put : Task@ 14
BlockQueue Size is 1
put : Task@ 15
BlockQueue Size is 2
put : Task@ 16
BlockQueue Size is 2
start............Task@ 16
put : Task@ 17
start............Task@ 17
end..............Task@ 10
start............Task@ 14
end..............Task@ 11
start............Task@ 15
end..............Task@ 13
end..............Task@ 16
end..............Task@ 17
BlockQueue Size is 0
put : Task@ 18
BlockQueue Size is 0
start............Task@ 18
put : Task@ 19
BlockQueue Size is 1
put : Task@ 20
BlockQueue Size is 2
put : Task@ 21
BlockQueue Size is 2
start............Task@ 21
put : Task@ 22
start............Task@ 22
end..............Task@ 14
start............Task@ 19
end..............Task@ 15
start............Task@ 20
end..............Task@ 18
end..............Task@ 21
end..............Task@ 22
BlockQueue Size is 0
put : Task@ 23
BlockQueue Size is 1
start............Task@ 23
put : Task@ 24
BlockQueue Size is 1
put : Task@ 25
BlockQueue Size is 2
put : Task@ 26
BlockQueue Size is 2
start............Task@ 26
put : Task@ 27
start............Task@ 27
end..............Task@ 19
start............Task@ 24
end..............Task@ 20
start............Task@ 25
end..............Task@ 23
end..............Task@ 26
end..............Task@ 27
BlockQueue Size is 0
put : Task@ 28
start............Task@ 28
BlockQueue Size is 0
put : Task@ 29
BlockQueue Size is 1
start............Task@ 29
put : Task@ 30
BlockQueue Size is 1
put : Task@ 31
BlockQueue Size is 2
put : Task@ 32
start............Task@ 32
end..............Task@ 24
start............Task@ 30
end..............Task@ 25
start............Task@ 31
end..............Task@ 28
end..............Task@ 29
end..............Task@ 32
BlockQueue Size is 0
put : Task@ 33
BlockQueue Size is 1
start............Task@ 33
put : Task@ 34
BlockQueue Size is 1
put : Task@ 35
BlockQueue Size is 2
put : Task@ 36
BlockQueue Size is 2
start............Task@ 36
put : Task@ 37
start............Task@ 37
end..............Task@ 30
start............Task@ 34
end..............Task@ 31
start............Task@ 35
end..............Task@ 33
end..............Task@ 36
end..............Task@ 37
BlockQueue Size is 0
put : Task@ 38
start............Task@ 38
BlockQueue Size is 0
put : Task@ 39
BlockQueue Size is 1
put : Task@ 40
BlockQueue Size is 2
put : Task@ 41
BlockQueue Size is 2
start............Task@ 41
put : Task@ 42
start............Task@ 42
end..............Task@ 34
start............Task@ 39
end..............Task@ 35
start............Task@ 40
end..............Task@ 38
end..............Task@ 41
end..............Task@ 42
BlockQueue Size is 0
put : Task@ 43
start............Task@ 43
BlockQueue Size is 0
put : Task@ 44
BlockQueue Size is 1
put : Task@ 45
BlockQueue Size is 2
put : Task@ 46
BlockQueue Size is 2
start............Task@ 46
put : Task@ 47
start............Task@ 47
end..............Task@ 39
start............Task@ 44
end..............Task@ 40
start............Task@ 45
end..............Task@ 43
end..............Task@ 46
end..............Task@ 47
BlockQueue Size is 0
put : Task@ 48
BlockQueue Size is 1
start............Task@ 48
put : Task@ 49
BlockQueue Size is 1
put : Task@ 50
BlockQueue Size is 2
end..............Task@ 44
start............Task@ 49
end..............Task@ 45
start............Task@ 50
end..............Task@ 48

这次贴出的是比较完整的打印结果:

结论:可以看到使用这种策略是不会丢失任何任务或者抛出任何异常的。 适应于对数据要求比较严谨的任务。

  从这段结果还有个有意思的发现 其实使用这种策略是可以同时执行最大线程数+1个线程。

 

通过这次实验得到以下总结:

假设我们可以把线程池执行看成三个队列, 核心执行队列, 缓存队列, 最大执行队列。 缓存队列不是执行队列。

一. 它们的装载顺序时这样的。

   核心-缓存-最大。

二. 但是执行顺序时这样的

  核心-最大-(然后缓存在核心最大线程中的任务执行完后进行补充)

三. DiscardPolicy策略

      超出最大线程数后的任务将都被废弃不会加入到缓存队列,不抛异常。

四. AbordPolicy策略

      超出最大线程数后的任务将被废弃,抛出异常。

五. DiscardOldestPolicy策略

 超出最大线程数后的任务将顶替缓存队列中最早的任务。不抛异常但是被顶替掉的任务将丢失不执行。

六. CallerRunsPolicy策略

   超出最大线程数后的任务将等待,缓存队列执行完毕清空后,进入缓存队列。

 

 以上属于个人理解, 有不对的地方请指正。 希望对大家学习能有帮助。

标签:Task,start,............,put,ThreadPoolExecutor,RejectedExecutionHandler,BlockQue
来源: https://www.cnblogs.com/dangkei/p/12200211.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有