ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

ThreadPoolExecutor线程池初探

2021-12-19 21:01:16  阅读:188  来源: 互联网

标签:任务 线程 初探 workQueue 执行 pool ThreadPoolExecutor


1 ThreadPoolExecutor线程池初探

1.1 线程池的创建

  • ThreadPoolExecutor

基于Executors创建线程池的方法就不在赘述,本文主要介绍通过ThreadPoolExecutor对象创建线程池

ThreadPoolExecutor的参数共有七个

  1. corePoolSize – 线程池中的核心线程数, 除非设置allowCoreThreadTimeOut(后续会介绍改方法),否则核心线将会一直保留在线程池中,
  2. maximumPoolSize – 线程池中的最大线程个数
  3. keepAliveTime – 超过corePoolSize的线程的最大空闲时间
  4. unit – keepAliveTime的时间单位
  5. workQueue – 用来保存提交到线程池的任务
  6. threadFactory – 线程池工厂
  7. handler – 当maximumPoolSize都在执行任务,workQueue也被已提交任务填满时,多余任务提交到Pool的拒绝策略

值得注意的是,超过corePoolSize的任务被提交到Pool中时,并不会立即创建线程来执行,而是会先将任务防止的workQueue中,当workQueue被任务填满后,才会创建线程来执行新提交的任务,知道线程数达到maximunPoolSize。

如下代码所示

向线程池中提交6个任务(T1~T6)时,线程池的线程数一直为1,超过coreSize的任务会在workQueue中等待
但当第7个任务提交时,由于coreSize线程都在执行任务,且wokrQueue已经满了,线程池会创建新的线程(不大于最大线程数的情况下)来执行T7任务

public static void main(String[] args){
        System.out.println("begin");
        ThreadPoolExecutor pool=new ThreadPoolExecutor(1,2,30L, TimeUnit.SECONDS,new ArrayBlockingQueue<>(5),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
        for (int i = 0; i < 6; i++) {
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1*1000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("执行当前任务的线程名称:"+Thread.currentThread().getName());
                }
            });
        }

         pool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3*1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("第七个任务:"+Thread.currentThread().getName());
            }
        });

        System.out.println("当前线程池中运行的线程个数:"+pool.getActiveCount());
        pool.shutdown();
        System.out.println("end");
    }

可以看到上面的代码运行结果如下,最后提交的任务反而优先执行(pool.getActiveCount() 并不一定能获取准确的运行数,这个后续在介绍)

begin
当前线程池中运行的线程个数:2
end
执行当前任务的线程名称:pool-1-thread-1
不在阻塞被执行:pool-1-thread-2
执行当前任务的线程名称:pool-1-thread-1
执行当前任务的线程名称:pool-1-thread-2
执行当前任务的线程名称:pool-1-thread-1
执行当前任务的线程名称:pool-1-thread-2
执行当前任务的线程名称:pool-1-thread-1

从ThreadPoolExecutor#execute也可以看出如下逻辑

  int c = ctl.get();
  //当前运行worker数如果小于核心线程数
  if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
    }
  //pool的状态为RUNNING,则入队
  if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //再次检查,并发操作可能pool已经处于非RUNNING状态
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果work为0,新增work,防止workQueue中的task一直无法被执行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
   } 
   //入队失败,则尝试创建woeker执行,创建失败执行拒绝策略
   else if (!addWorker(command, false))
            reject(command);        
          

1.2 线程池如何从workQueue获取Task

线程中用来执行任务的其实是ThreadPoolExecutor的一个私有内部类Worker实现了AQS和Runnable接口
它内部通过ThreadFactory持有一个Thread

调用worker的start方法实际执行的是runWorker方法,该方法中会从阻塞队列中不断获取task来执行
同时线程的空闲销毁也是在该处来判断的

Worker#runWorker方法如下所示:

//该标志用来判断work是否被异常退出
 boolean completedAbruptly = true;
 try {
    //当前work的task不为空,或超时时间内从workqueue成功获取任务,则执行对应task
    while (task != null || (task = getTask()) != null) {
        //暂时省略               
}
    //从workQueue中获取task失败(超时或pool被关闭)
    completedAbruptly = false;        
} finally {
    //销毁worker
    processWorkerExit(w, completedAbruptly);        
}       

1.2 线程池中线程的销毁

Worker#getTask方法

// 是否允许核心线程池过期或当前worker数量是否大于核心线程池
 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

 //timedOut代表上次获取是否超时
//pool可以动态调整maximumPoolSize大小,因此需要判断是否大于最大线程池数量或 当前线程数量大于核心数量,且上次获取Task超时,获取Task设置的超时时间是keepAliveTime,代表worker空闲时间大于等于keepAliveTime ,此时返回null,runWorker中的循环会结束,执行finally中的方法
if ((wc > maximumPoolSize || (timed && timedOut))
    && (wc > 1 || workQueue.isEmpty())) {
    if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;            
    }
    try {
         Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;       
    } catch (InterruptedException retry) {
                timedOut = false;
    }         

Worker#processWorkerExit


//如果completedAbruptly为True,代表异常退出,需要workerCount数量减1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    decrementWorkerCount();       
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        //销毁线程worker
        workers.remove(w);
        } finally {
            mainLock.unlock();
        }       
    //判断pool是否需要关闭
    tryTerminate();
    int c = ctl.get();
        //如果线程池处于RUNNING或SHUTDOWN
        if (runStateLessThan(c, STOP)) {
            //未被异常退出,可能是获取超时,次数queue中可能还有剩余任务,
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                //若worker数量为0,需要创建worker来完成queue中剩余的任务
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            //创建woker,保证消费queue中的任务
            addWorker(null, false);
        }   

暂时粗略的介绍了线程池创建的参数,以及提交任务和线程池中线程的销毁策略,还有比较多的细节知识会在后续慢慢补充。

标签:任务,线程,初探,workQueue,执行,pool,ThreadPoolExecutor
来源: https://blog.csdn.net/haoyue__haoyue/article/details/122029789

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有