ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java 线程池 ThreadPoolExecutor -01

2021-04-13 21:51:40  阅读:191  来源: 互联网

标签:01 Java int worker 任务 线程 workQueue null


感谢 参考:
http://www.cnblogs.com/trust-freedom/p/6681948.html
https://www.jianshu.com/p/ae67972d1156

一、为什么使用线程池

  1. 创建和销毁线程伴随着系统的开销,过于频繁的创建/销毁x线程 会很大程度上影响处理效率
  2. 线程并发数过多,抢占系统资源可能会导致阻塞
  3. 想对线程进行简单的管理

二、线程池作用

线程池作用是针对于为什么使用线程池来说的:

  1. 降低资源消耗,通过重复利用已创建的线程降低线程创建和销毁造成的消耗
  2. 提高响应速度,当任务到达时,任务可以不需要创建新的线程就能立即执行
  3. 提高线程的可管理性

三、使用 Executor ThreadPoolExecutor

3.1

Java中 ,线程池的概念是Executorz这个接口,具体实现是ThreadPollExecutor

3.2 线程池构造函数简介
  1. 5个参数
 public ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,  long keepAliveTime,  TimeUnit unit,  BlockingQueue<Runnable> workQueue)

  1. 6个参数
public ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,  long keepAliveTime,  TimeUnit unit,  BlockingQueue<Runnable> workQueue,  ThreadFactory threadFactory)

  1. 6个参数2
public ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,  long keepAliveTime,  TimeUnit unit,  BlockingQueue<Runnable> workQueue,  RejectedExecutionHandler handler)

  1. 7个参数
public ThreadPoolExecutor(int corePoolSize,  int maximumPoolSize,  long keepAliveTime,  TimeUnit unit,  BlockingQueue<Runnable> workQueue,  ThreadFactory threadFactory,  RejectedExecutionHandler handler)

参数解释:
int corePoolSize
(1)核心线程池中核心线程的最大数
(2)核心线程默认情况下会一直存活在线程池中,即使闲置好长时间
(3) 设置allowCoreThreadTimeOut = true 闲置线程到达keepAliveTime时间就会被销毁

int maximumPoolSize
(1)线程总数 线程池中线程的最大数
(2)线程总数(maximumPoolSize) = 核心线程数(corePoolSize)+ 非核心线程数

long keepAliveTime
(1)闲置线程保留时长 当线程数超过核心线程数(corePoolSize) ,如果有闲置的线程,机会被保留keepAliveTime时长 如果超过这个时长还没有任务执行 那就销毁

TimeUnit unit
(1)keepAliveTime的单位
(2)

		TimeUnit.HOURS; //小时
		TimeUnit.MINUTES;//分钟
		TimeUnit.SECONDS;//秒//等等

BlockingQueue workQueue
(1)队列 存储将被执行的任务
(2)它只保存通过execute()方法提交的任务
(3)workQueue 常见队列类型

  • SynchronousQueue (newCachedThreadPool 使用)
    点击查看推荐文章
    这个队列接受到任务时,会直接提交给线程处理,而不保留,如果所有线程在工作的话,那就新建一个线程来执行,所以为了保证线程数不达到线程总数(maximumPoolSize) ,maximumPoolSize的值被设置为Integer.MAX_VALUE
//new ThreadPoolExecutor( 第二个参数 设置为了Integer.MAX_VALUEpublic static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  60L, TimeUnit.SECONDS,  new SynchronousQueue<Runnable>());}

  • LinkedBlockingQueue (newFixedThreadPool ,newSingleThreadExecutor 使用 )

  • DelayedWorkQueue (newScheduledThreadPool 使用 )
    队列接收到任务时先入队,只有达到指定延时时间,才会执行任务

ThreadFactory threadFactory
(1)线程池内线程的创建方式 (这个工厂类),一般不自定义 使用默认DefaultThreadFactory
(2) 如果自定义需要实现ThreadFactory 重写newThread方法

RejectedExecutionHandler handler
(1)发生异常的时候使用

3.3 ThreadPoolExecutor添加任务

execute方法

public static void main(String[] args) {
	Executors.newFixedThreadPool(10).execute(new MyRunnablee(1));}class MyRunnablee implements Runnable {int flag ;public MyRunnablee(int flag) {this.flag = flag;}public void run() {
		System.out.println(Thread.currentThread().getName() + ":执行run--"+flag);}}

/**
      执行被提交的任务 可能会创建一个新线程或者是使用已有的线程进行执行
      也可能被拒绝,如果线程池已经关闭shutdown 或者是线程池内线任务数达到最大值
     */public void execute(Runnable command) {//如果任务为null 抛出异常if (command == null)throw new NullPointerException();/*
         * 三个步骤:
         *1. 如果现有线程少于核心线程数(corePoolSize) 会试着去创建一个新的线程,
         并把当前人任务作为新线程的第一个任务去执行  
         它会调用addWorker方法 addWorker方法内会进行运行状态和线程总数的校验  
         防止在不能添加的时候添加了线程     
         * 2.如果第1步成功 还需要再次检测是否应该添加线程,因为上次添加后 现在可能已经有死亡的了 或者进入方法后线程池关闭了,所以我们重新检测状态 如果需要回滚队列,如果没有线程的话 新建一个线程 

		*3. 如果没有添加成功,我们试着去创建一个新线程 如果失败了,可能是线程池已经关闭或者已经饱和 我们就拒绝新任务的添加
         *  
         *///获取当前线程数int c = ctl.get();//如果当前线程数< 核心线程数  添加新的线程 把任务作为线程的第一个任务执行if (workerCountOf(c) < corePoolSize) {//成功if (addWorker(command, true))return;//不成功,重新获取 每次使用ctl的时候都需要重新获取 //不成功可能是因为://1. 线程池关闭了//2. 并发情况下别的线程 优先创建了worker   导致  workerCountOf(c) > corePoolSizec = ctl.get();}//第一步失败&&如果是运行状态 //把任务添加任务队列中if (isRunning(c) && workQueue.offer(command)) {//添加成功之后 再进行一次判断  int recheck = ctl.get();//如果线程不是运行状态了 需要从workQueue中删除添加的任务 人后拒绝任务if (! isRunning(recheck) && remove(command))reject(command);//如果是运行状态 或删除失败的话(有线程在执行要删除的任务)  //如果没有线程执行任务了(worker数量为0) 那么新建一个新的线程(addWorker(null, false)),任务为null  确保有线程执行任务else if (workerCountOf(recheck) == 0)addWorker(null, false);}//如果线程池不是runing状态 或者入队失败 尝试开启新线程  扩容到maxPoolSize else if (!addWorker(command, false))reject(command);}

流程:
借用别人一张图(自己还没学会怎么画)
在这里插入图片描述

  1. 如果当前线程数少于核心线程数(corePoolSize),就addWorker(command, true),如果创建成功就返回,否则执行后续
    创建失败的原因可能有:
    (1) 线程池已经关闭(shutdown) 不能再接受任务
    (2)workerCountOf© > corePoolSize ,在进行了workerCountOf© < corePoolSize 判断之后,犹豫并发原因,别的线程优先创建了worker 导致workerCountOf© > corePoolSize

  2. 如果线程是running状态将task加入workQueue队列中,如果成功进行双重校验,如果失败可能是队列已满 则执行后续步骤
    为什么要进行双重校验呢:主要是判断刚加入的task是否有线程进行执行
    (1)如果线程不是running状态,应该拒绝添加任务,把刚添加的任务从workQueue中移除
    (2)如果是running状态,或者从workQueue中移除失败(刚好有一个执行完的线程接受了这个新任务),要确保还有线程执行任务(创建一个不带任务的worker)

3.如果线程池不是running状态,或者无法入队列,尝试开启新的线程,扩容至maxPoolSize 如果添加失败了 那就拒绝任务

3.4 ThreadPoolExecutor addWorker方法
/**
*1. 检查根据给定的边界(corePoolSize 或maximumPoolSize  )
*core==true 的时候是 corePoolSize   否则是maximumPoolSize  
* 2. 如果条件符合创建一个新的workder并把任务作为线程的第一个任务执行
*/
 private boolean addWorker(Runnable firstTask, boolean core) {
		   //外层循环 判断线程池状态retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.//1.如果状态大于 SHUTDOWN 也就是STOP  TIDYING  TERMINATED 不能添加worker//2.如果rs== SHUTDOWN  firstTask不为空  不能添加新的worker 因为SHUTDOWN的线程不能接受新任务//3.workQueue为空 不用添加新worker 因为这个新worker就是为了处理task 如果没有task 那添加有啥意义if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&   firstTask == null &&   ! workQueue.isEmpty()))return false;//内层循环 负责worker+1  +1成功之后 才会真正的new Worker然后添加到wokers中for (;;) {int wc = workerCountOf(c);//判断长度是否大于最大要求长度 如果core是true 就用corePoolSize  如果是false 就用maximumPoolSizeif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//+1 如果成功 结束循环 如果成功 结束外层循环   if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctl//如果+1  不成功 并且状态改变不等于之前获取的状态 继续外层循环if (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}
	 
	 
	 		//========+1成功 开始创建新的Worker
	 		//worker开启状态boolean workerStarted = false;//worker添加状态boolean workerAdded = false;Worker w = null;try {//新建worker//1.设置worker  AQS同步状态state = -1//2.设置成员变量firstTask的值 第一个任务//3.利用ThreadFactory 创建一个线程  把当前worker传入构造函数 因为worker本身就继承了Runnable 我们在worker的run方法中执行runWorker() //runWorker方法也是传递当前对象进去 因为什么呢? 因为任务在当前对象的firstTask属性种存储着 到哪儿都要带着  厉害了/*Worker(Runnable firstTask) {
	                 setState(-1); // inhibit interrupts until runWorker
	                 this.firstTask = firstTask;
	                 this.thread = getThreadFactory().newThread(this);
	            }*/w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;//========================获取锁==================mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());//添加之前 还要进行判断 //1.rs 状态是小于SHUTDOWN 也就是线程池没有关闭呢  //2.状态是SHUTDOWN 并且 firstTask==null 因为SHUTDOWN的线程池不能添加新workerif (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//如果线程已经开启 就不能再添加worker了 胡闹吗不是 已经开启了 咋还能再添加 再开启if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();//添加workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;//设置添加成功状态为trueworkerAdded = true;}} finally {//依旧是这样 在finally中释放锁mainLock.unlock();//=========================释放锁===============================}//如果添加成功了 那就是风风雨雨都过去了 开启线程  线程会在还行worker的run worker的run中会调用runWorkerif (workerAdded) {t.start();workerStarted = true;}}} finally {//如果没有开启成功   移除worker  worker总数-1   判断如果线程池能终止的话就终止if (! workerStarted)addWorkerFailed(w);}return workerStarted;}

执行流程:

  1. 判断当前线程池是否可以添加worker
    1.1 线程池状态大于SHUTDOWN 可能为STOP TIDYING TERMINATED 不能添加worker
    1.2 如果线程池状态为SHUTDOWN 并且任务不为null 不能添加worker 因为关闭的线程不能添加新任务
    1.3 如果workQueue为空 不用添加worker 因为添加worker是为了处理workQueue中的task 它都没有了了处理个啥

  2. worker数量+1

  3. 添加新的worker 把任务添加到worker属性中

  4. 开启woker的thread 线程
    这就有意思了 worker本身就是一个Runnable子类
    worker有一个属性是thread 而他的runnable参数用的是this 也就是worker本身 这里启动thread 实质就是开始执行worker的run方法 而run方法中调用了runWorker() runWorker方法的参数也是this 因为实际任务时worker的一个属性 所以必须传入worker

3.5 ThreadPoolExecutor Worker内部类
//内部类 worker 继承AbstractQueuedSynchronizer  Runnable//继承AbstractQueuedSynchronizer是简化执行任务时获取和释放锁 //在这里看到一个问题:为什么不直接执行execute(commond) 提交的commond 而是用worker包一下呢?//答:为了控制中断//用什么控制呢?//用AQS 锁 运行时上锁就不能中断  //worker实现了一个简单的不可重入锁  不是用ReentrantLock 可重入锁 //这里有很多东西不懂 可能需要以后回过头来 才会领悟private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in.  Null if factory fails. */final Thread thread;/** Initial task to run.  Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */Worker(Runnable firstTask) {setState(-1); // 设置状态 大于0表示锁已经被获取 this.firstTask = firstTask;//把任务给这个属性this.thread = getThreadFactory().newThread(this);//创建一个线程}//run方法public void run() {runWorker(this);}//返回是否被锁 0表示没被锁  1表示被锁protected boolean isHeldExclusively() {return getState() != 0;}//尝试获取锁 protected boolean tryAcquire(int unused) {//尝试将状态从0改变为1 每次都是由0到1不是+1  那么说明是不可 重入锁if (compareAndSetState(0, 1)) {//如果获取成功 设置exclusiveOwnerThread为当前线程 setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}//尝试释放锁protected boolean tryRelease(int unused) {//设置exclusiveOwnerThread=nullsetExclusiveOwnerThread(null);//设置状态为0    state=0setState(0);return true;}//这几个方法是AbstractQueuedSynchronizer的抽象方法 需要实现  就是用这几个方法来实现AQS 不可重入锁public void lock()        { acquire(1); }//尝试获取锁public boolean tryLock()  { return tryAcquire(1); }//尝试释放锁public void unlock()      { release(1); }//判断是否被锁public boolean isLocked() { return isHeldExclusively(); }//中断 void interruptIfStarted() {Thread t;//符合 : state>0  t!=null  && t没有被中断 //worker刚创建的时候state给了-1 就是为了不让中断 机智if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}

worker类说明:

  1. new Worker
    1.1 设置state = -1 不让中断
    1.2 设置firstTask 为execute(任务) 传入的任务
    1.3 创建 线程getThreadFactory().newThread(this)

worker控制中断:
2. 初始化state = -1 此时不允许中断 只有在runWoker中将state设置为0是 才能中断
2.1 线程池shutdown的时候回获取锁tryLock 如果当前线程worker在执行 不能被中断
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
2.2 shutdownNow线程池时 不用获取锁 但是shutdownNow-》interruptIfStarted方法也有判断getState() >= 0 才能中断

  1. 为了防止在某种情况下worker被中断 runWorker每次运行任务的时候都会获取锁 这样防止其他中断获取锁而中断当前worker 使任务丢失

这里用不可重入锁 是为了在worker获取锁的情况下 不再进入一些其他需要加锁的方法

3.6 ThreadPoolExecutor Worker内部类 的 runWorker 方法

盗图:
在这里插入图片描述
这里需要注意 没有任务->processWorkerExit 不一定是说没有任务就马上会执行processWorkerExit 这就说到getTask获取任务这个方法了 如果worker总数小于核心线程数 没有指定线程闲置超时时间的话 队列会调用take 阻塞方法 也就是说worker会一直等待有任务进来 如果worker总数超过核心线程数,或者指定了allowCoreThreadTimeOut 那就会调用poll 方法 会在指定时间后返回null(如果没有获取任务)

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;//先解锁  解锁了 这个时候是可以被中断的 w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//无限循环 //如果firstTask为null 获取任务 getTask方法 //不为的话空先执行firstTask//如果getTask 也为null 结束循环 销毁worker while (task != null || (task = getTask()) != null) {//获取锁 获取锁后可就不能被中断了 w.lock();//1. ctl.get() > STOP  不是 RUNNING/SHUTDOWN/STOP 中断 //或者 线程中断&&ctl.get() > STOP	             //并且 2. 线程不被中断 //中断!!! 线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() &&  runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//执行任务之前beforeExecute(wt, task);Throwable thrown = null;try {//神奇了  在这里手动调用run    这才是真正的业务逻辑 //我理解  就是为了顺序执行 执行完了 我就知道是完了 如果放到线程 让线程直接start()  这个run是否执行完很难控制 判断 task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//执行任务之后  可以看到 这里用了thrown  这可是在catch中初始化的哦  所以说额  catch是优先于finally 执行的 哈哈 课外话了 afterExecute(task, thrown);}} finally {//任务设置为nulltask = null;//完成线程数+1w.completedTasks++;//解锁 这时候可以中断了w.unlock();}}//是不是因为用户异常终止  true是  false不是   如果是用户异常那么就在while中异常了 直接走finnaly了 就不会走completedAbruptly = false 所以completedAbruptly = false就表示 没有异常  completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}}

3.7 ThreadPoolExecutor 的 getTask 方法

盗图:
在这里插入图片描述

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?//for (;;) {int c = ctl.get();int rs = runStateOf(c);// shutdown && workQueue 为null  //stop状态 (sutdownNow 会导致stop)if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// worker-1 decrementWorkerCount();//成功返回null return null;}int wc = workerCountOf(c);//allowCoreThreadTimeOut  允许闲置线程销毁 也就是说没有task后 不会阻塞  超时会返回null  然后worker就会被销毁 //wc > corePoolSize 大于核心线程数 跟上边逻辑一样 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//(worker总数大于最大线程数 或者 需要超时并且从队列获取已经是null)//并且(wc>1  或者 worker没有任务)if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//根据是否需要超时 获取任务  Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take(); //如果不为空 返回rif (r != null)return r;//如果是null  设置标志位timedOut = true timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}

3.8 ThreadPoolExecutor processWorkerExit方法

worker线程退出 销毁

 private void processWorkerExit(Worker w, boolean completedAbruptly) {   	//如果是突然中止 也就是异常了 需要这里-1    	//如果不是突然终止(没有异常) 就不需要-1了  getTask()已经-1 了 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {//worker完成的任务数 +到线程池完成的总任务数中 completedTaskCount += w.completedTasks;//从workers中移除workers.remove(w);} finally {mainLock.unlock();}//判断线程是否满足终止条件 然后尝试终止 tryTerminate();//是否需要增加worker int c = ctl.get();//状态是 running、shutdownif (runStateLessThan(c, STOP)) {//如果是突然终止 那就可能还有任务没有被完成 if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;//如果min==0 就是allowCoreThreadTimeOut =true 就是不需要维护核心线程池//不需要维护核心线程池 并且任务队列不是空 if (min == 0 && ! workQueue.isEmpty())min = 1;//如果总的worker大于最小min  返回  否则创建workerif (workerCountOf(c) >= min)return; // replacement not needed}//增加新的workeraddWorker(null, false);}}

processWorkerExit流程:

  1. worker数量-1
    A、如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1
    B、如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了

  2. 从Workers Set中移除worker,删除时需要上锁mainlock

  3. tryTerminate():在对线程池有负效益的操作时,都需要“尝试终止”线程池,大概逻辑:
    判断线程池是否满足终止的状态
    A、如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程
    B、没有线程了,更新状态为tidying->terminated

  4. 是否需要增加worker线程,如果线程池还没有完全终止,仍需要保持一定数量的线程
    线程池状态是running 或 shutdown
    A、如果当前线程是突然终止的,addWorker()
    B、如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
    故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程

标签:01,Java,int,worker,任务,线程,workQueue,null
来源: https://blog.51cto.com/u_12198094/2704234

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有