ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

线程池_ThreadPoolExecutor原理分析

2022-02-07 18:36:01  阅读:153  来源: 互联网

标签:队列 private int 任务 线程 ctl 原理 ThreadPoolExecutor


java.uitl.concurrent.ThreadPoolExecutor 类是 Executor 框架中最核心的类。

线程池简介

什么是线程池

线程池就是创建若干个可执行的线程放入一个池(容器)中,有任务需要处理时,会提交到线程池中的任务队列,处理完之后线程并不会被销毁,而是仍然在线程池中等待下一个任务。

为什么要使用线程池

因为 Java 中创建一个线程,需要调用操作系统内核的 API,操作系统要为线程分配一系列的资源,成本很高,所以线程是一个重量级的对象,应该避免频繁创建和销毁。

使用线程池就能很好地避免频繁创建和销毁,所以有必要引入线程池。使用 线程池的好处 有以下几点:

  • 降低资源消耗:通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。但是要做到合理的利用线程池,必须对其原理了如指掌。

线程池状态和工作线程数量

这本来是两个不同的概念,但是在ThreadPoolExecutor中我们使用一个变量ctl来存储这两个值,这样我们只需要维护这一个变量的并发问题,提高运行效率。

/**
 * 记录线程池中Worker工作线程数量和线程池的状态
 * int类型是32位,它的高3位,表示线程池的状态,低29位表示Worker的数量
 */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS 29位,
private static final int COUNT_BITS = Integer.SIZE - 3;
// 表示线程池中创建Worker工作线程数量的最大值。即 0b0001.....1(29位1)
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

怎么使用一个变量ctl存储两个值呢?

就是利用int变量的高3位来储存线程池状态,用int变量的低29位来储存工作线程数量。

这样就有两个需要注意的地方:

  • 工作线程数量最大值不能超过int类型29位的值CAPACITY 即0b0001.....1(29位1)
  • 因为线程池状态都是高3位储存的,所以工作线程数量不会影响状态值大小关系。

线程池状态

// 高3位值是111
private static final int RUNNING    = -1 << COUNT_BITS;
// 高3位值是000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 高3位值是001
private static final int STOP       =  1 << COUNT_BITS;
// 高3位值是010
private static final int TIDYING    =  2 << COUNT_BITS;
// 高3位值是011
private static final int TERMINATED =  3 << COUNT_BITS;

线程池状态分析:

① RUNNING:运行状态。接受新任务,并且也能处理阻塞队列中的任务。

② SHUTDOWN:关闭状态。不接受新任务,但可以处理阻塞队列中的任务。

  • 在线程池处于 RUNNING 状态时,调用 shutdown 方法会使线程池进入到该状态。
  • finalize 方法在执行过程中也会调用 shutdown 方法进入该状态。

③ STOP:停止状态。不接受新任务,也不处理队列中的任务。会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow 方法会使线程池进入到该状态。

④ TIDYING:整理状态。如果所有的任务都已终止了,workerCount (有效线程数) 为 0,线程池进入该状态后会调用 terminated 方法进入 TERMINATED 状态。

⑤ TERMINATED:已终止状态。在 terminated 方法执行完后进入该状态。默认 terminated 方法中什么也没有做。进入 TERMINATED 的条件如下:

  • 线程池不是 RUNNING 状态;
  • 线程池状态不是 TIDYING 状态或 TERMINATED 状态;
  • 如果线程池状态是 SHUTDOWN 并且 workerQueue 为空;
  • workerCount 为 0;
  • 设置 TIDYING 状态成功。

操作ctl的方法

获取线程池的状态

/**
 * 获取线程池的状态。因为线程池的状态是使用高3位储存,所以屏蔽低29位就行了。
 * 所以就c与~CAPACITY(0b1110..0)进行&操作,屏蔽低29位的值了。
 * 注意:这里是屏蔽低29位的值,而不是右移29位。
 */
private static int runStateOf(int c)     { return c & ~CAPACITY; }

获取工作线程数量

/**
 * 获取线程池中Worker工作线程的数量,
 * 因为只使用低29位保存Worker的数量,只要屏蔽高3位的值就行了
 * 所以就c与CAPACITY(0b0001...1)进行&操作,屏蔽高3位的值了。
 */
private static int workerCountOf(int c)  { return c & CAPACITY; }

合并ctl的值

/**
 * 得到ctl的值。
 * 接受两个参数rs和wc。rs表示线程池的状态,wc表示Worker工作线程的数量。
 * 对于rs来说我们只需要高3位的值,对于wc来说我们需要低29位的值。
 * 所以我们将rs | wc就可以得到ctl的值了。
 */
private static int ctlOf(int rs, int wc) { return rs | wc; }

其他方法

// 因为RUNNING状态高三位是111,所以状态值rs与工作线程数量ws相与的结果值c一定是个负数,
// 而其他状态值都是大于等于0的数,所以c是负数,那么表示当前线程处于运行状态。
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

/**
 * 使用CAS函数将ctl值自增
 */
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

/**
 * 使用CAS函数将ctl值自减
 */
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}
/**
 * 使用CAS函数加循环方法这种乐观锁的方式,解决并发问题。
 * 保证使ctl值减一
 */
private void decrementWorkerCount() {
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

重要成员变量

// 记录线程池中Worker工作线程数量和线程池的状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 任务线程的阻塞队列,因为是阻塞队列,所以它是并发安全的
private final BlockingQueue<Runnable> workQueue;

// 独占锁,用来保证操作成员变量的并发安全问题
private final ReentrantLock mainLock = new ReentrantLock();

// 等待线程池完全终止的条件Condition,
private final Condition termination = mainLock.newCondition();

//-----------------  需要mainLock来保证并发安全-------------------------//
// 线程池中工作线程集合。Worker中持有线程thread变量
private final HashSet<Worker> workers = new HashSet<Worker>();

// 线程池中曾拥有过的最大工作线程个数
private int largestPoolSize;

// 线程池完成过任务的总个数
private long completedTaskCount;
//-----------------  需要mainLock来保证并发安全-------------------------//

// 创建线程的工厂类
private volatile ThreadFactory threadFactory;

// 当任务被拒绝时,用来处理这个被拒绝的任务
private volatile RejectedExecutionHandler handler;
// 工作线程空闲的超时时间keepAliveTime
private volatile long keepAliveTime;

// 是否允许核心池线程超时释放
private volatile boolean allowCoreThreadTimeOut;

// 线程池核心池线程个数
private volatile int corePoolSize;

// 线程池最大的线程个数
private volatile int maximumPoolSize;

成员变量说明:

  • mainLock:使用mainLock来保证会发生变化成员变量的并发安全问题。会发生的成员变量有5个:ctl、workQueue、workers、largestPoolSize和completedTaskCount。但是其中ctl和workQueue的类型本身就是多线程安全的,所以不用mainLock锁保护。
  • termination:等待线程池完全终止的条件,如果线程池没有完全终止,调用它的awaitNanos方法,让线程等待。当线程池完全终止后,调用它的signalAll方法,唤醒所有等待termination条件的线程。
  • workers:记录所有的工作线程Worker
  • workQueue:记录所有待执行的任务。使用阻塞队列BlockingQueue,可以在队列为空时,线程等待,队列有值时,唤醒等待的线程。
  • largestPoolSize:线程池中曾拥有过的最大工作线程个数
  • completedTaskCount:线程池完成过任务的总个数
  • threadFactory:创建线程的工厂类
  • handler:当任务被拒绝时,用来处理这个被拒绝的任务
  • keepAliveTime:工作线程允许空闲的超时时间,一般都是针对超过核心池数量的工作线程。
  • allowCoreThreadTimeOut: 是否允许核心池的工作线程超时释放。
  • corePoolSize:线程池核心池线程个数。
  • maximumPoolSize: 线程池最大的线程个数。

构造方法

ThreadPoolExecutor 有四个构造方法,前三个都是基于第四个实现。第四个构造方法定义如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
}

参数说明:

1、corePoolSize:核心线程数量。当有新任务通过 execute 方法提交时 ,线程池会执行以下判断:

  • 如果运行的线程数少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;
  • 如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当 workQueue 满时才创建新的线程去处理任务;
  • 如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建的线程池的大小是固定的。这时如果有新任务提交,若 workQueue 未满,则将请求放入 workQueue 中,等待有空闲的线程去从 workQueue 中取任务并处理;
  • 如果运行的线程数量大于等于 maximumPoolSize,这时如果 workQueue 已经满了,则使用 handler 所指定的策略来处理任务;

所以,任务提交时,判断的顺序为 corePoolSize => workQueue => maximumPoolSize。

2、maximumPoolSize:最大线程数量。

  • 如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。

值得注意的是:如果使用了无界的任务队列这个参数就没什么效果。

3、keepAliveTime:线程保持活动的时间(非核心线程的存活时间)。

当线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime。

所以,如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。

4、unit:keepAliveTime 的时间单位。有 7 种取值。可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

5、workQueue - 等待执行的任务队列。用于保存等待执行的任务的阻塞队列。 可以选择以下几个阻塞队列。

(1)ArrayBlockingQueue - 有界阻塞队列。

此队列是基于数组的先进先出队列(FIFO)。

此队列创建时必须指定大小。

(2)LinkedBlockingQueue - 无界阻塞队列。

此队列是基于链表的先进先出队列(FIFO)。
如果创建时没有指定此队列大小,则默认为 Integer.MAX_VALUE。
吞吐量通常要高于 ArrayBlockingQueue。
使用 LinkedBlockingQueue 意味着: maximumPoolSize 将不起作用,线程池能创建的最大线程数为 corePoolSize,因为任务等待队列是无界队列。
Executors.newFixedThreadPool 使用了这个队列。

(3)SynchronousQueue - 不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态。
吞吐量通常要高于 LinkedBlockingQueue。
Executors.newCachedThreadPool 使用了这个队列。

(4)PriorityBlockingQueue - 具有优先级的无界阻塞队列。

6、threadFactory - 线程工厂。可以通过线程工厂给每个创建出来的线程设置更有意义的名字。

7、handler - 饱和策略。它是 RejectedExecutionHandler 类型的变量。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。线程池支持以下策略:

AbortPolicy - 丢弃任务并抛出异常。这也是默认策略。
DiscardPolicy - 丢弃任务,但不抛出异常。
DiscardOldestPolicy - 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)。
CallerRunsPolicy - 只用调用者所在的线程来运行任务。

如果以上策略都不能满足需要,也可以通过实现 RejectedExecutionHandler 接口来定制处理策略。如记录日志或持久化不能处理的任务。

常用方法

 

标签:队列,private,int,任务,线程,ctl,原理,ThreadPoolExecutor
来源: https://www.cnblogs.com/xfeiyun/p/15868784.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有