ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

线程池 ThreadPoolExecutor 源码详细分析

2022-02-10 17:03:35  阅读:162  来源: 互联网

标签:0000 worker 源码 线程 当前 ctl null ThreadPoolExecutor


1、线程池的作用

一方面当执行大量一步任务的时候线程池能够提供较好的性能,在不使用线程池的时候,每当需要执行异步的时候都是直接 new 一线程进行运行,而线程的创建和销毁都是需要开销的。使用线程池的时候,线程池里面的线程是可复用的,不会每次执行异步任务的时候都重新创建和销毁线程。

另一方面线程池提供了一种资源限制和管理的手段,比如可以限制线程的个数,动态新增线程等,每个 ThreadPoolExecutor 也保留了一些基本的统计数据,比如当前线程池完成的任务数目等。

2、ThreadPoolExecutor 继承体系

  • Executor:只定义了一个方法 execute(),用于执行提交的任务
  • ExecutorService:定义了一些线程池管理、任务提交、线程检测的方法
  • AbstractExecutorService:提供了 ExecutorService 接口执行方法的默认实现,用于同一处理 Callable 任务和 Runnable 任务。

下面介绍一下线程池的拒绝策略

当线程池的任务缓存队列已经满了并且线程池中的线程数目达到 maximumPoolSize 的时候,如果还有任务到来就会采取任务拒绝策略,通常有以下四种拒绝策略:

  • AbortPolicy:直接抛出一个 RejectedExecutionException 的异常,让你感知到任务被拒绝了,下面你可以根据业务逻辑选择重试或者放弃提交。
  • DisCardPolicy:当新任务被提交后直接被丢弃,也不会给你任何的通知,可能造成数据丢失的现象。
  • DisCardOldestPolicy:如果线程池没有关闭且没有能力执行,则会丢弃任务队列中的头结点,然后重新提交被拒绝的任务。通常是存活时间最长的任务,这种策略与第二种不同之处在于它丢弃的任务不是最新提交的,而是队列中存活时间最长的,这样就可以腾出资源给新提交的任务,但是同理也存在一定的数据丢失风险。
  • CallerRunsPolicy:当有新任务提交的时候,如果线程池没被关闭且没有能力执行,则把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。这样做主要有两点好处:
    • 提交的新任务不会被丢弃,这样也就不会造成业务损失。
    • 由于谁提交任务谁就要负责执行,这样提交任务的线程就得负责执行任务,而执行任务又是比较耗时得,在这段期间,提交任务得线程被占用,也就不会提交新的任务,减缓了任务提交得速度,相当于是一个负反馈。在此期间,线程池中的线程也可以充分利用这段时间来执行掉一部分任务,腾出一定的空间,相当于是给了线程池一定的缓冲期。

阻塞队列

BlockingQueue

BlockingQueuejava.util.concurrent 包下,其他阻塞类都实现自 BlockingQueue 接口,BlockingQueue 提供了线程安全的队列访问方式,当向队列中插入数据的时候,如果队列已经满了,线程则会阻塞等待队列中的元素被取出后再插入;当从队列中取数据的时候,如果队列为空,则线程会阻塞等待队列中有新元素再获取。

LinkedBlockingQueue

LinkedBlockingQueue 是一个由链表实现的线程安全的有界阻塞队列,容量默认值为 Integer.MAX_VALUE,也可以自定义容量,建议指定容量大小,默认大小在添加速度大于删除速度的情况下有造成内存溢出的风险,LinkedBlockingQueue 是先进先出的方式存储元素。

ArrayBlockingQueue

ArrayBlockingQueue 是一个有边界的阻塞队列,它的内部实现是一个数组。它的容量是有限的,我们在其初始化的时候指定它的容量大小,容量大小一旦指定就不可改变。ArrayBlockingQueue 也是先进先出的方式存储数据,ArrayBlockingQueue 内部的阻塞队列通过 ReenterLockCondition 条件队列实现的,因此 ArrayBlockingQueue 中的元素存在公平访问和非公平访问的区别,对于公平访问队列,被阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞的线程先访问队列。而非公平队列,当队列可用的时候,阻塞的线程将进入争夺访问资源的竞争中,也就是说谁先抢到谁就执行,没有固定的先后顺序。

DelayQueue

DelayQueue 是一个支持延时获取元素的无界阻塞队列,队列中的元素必须实现 Delayed 接口,在创建元素的时候可以指定延迟时间,只有到达了延迟的时间之后,才能获取到该元素。实现了 Delayed 接口必须重写两个方法,getDelay(TimeUnit)compareTo(Delayed)

PriorityQueue

PriorityQueue 是一个基于优先级堆的无界优先级队列。优先级队列的元素按照其自然顺序进行排序,或者根据构造队列的时候提供的 Comparator 进行排序,具体取决于所使用的构造方法。优先级队列不允许使用 null 元素。

PriorityQueue 需要注意的点

  • PriorityQueue 是非线程安全的,在多线程情况下可使用 PriorityBlockingQueue 类替代。
  • PriorityQueue 不允许插入 null 元素。

LinkedTransferQueue

LinkedTransferQueue 是一个由链表结构组成的无界阻塞队列,相对于其他阻塞队列,LinkedTransferQueue 采用一种预占模式。意思就是消费者线程取元素的时候,如果队列不为空,则直接取走数据,若是队列为空,那就生成一个节点(节点元素为 null)入队,然后消费者线程被等待在这个节点上,后面生产者线程入队的时候发现有一个元素为 null 的节点,生产者线程就不入队了,直接就将元素填充到该节点,并唤醒该节点等待的线程,被唤醒的消费者线程取走元素,从调用的方法返回。我们称这种节点操作为 匹配 方式。

总结

队列有界性数据结构
ArrayBlockingQueuebounded加锁arraylist
LinkedBlockingQueueoptionally-bounded加锁linkedlist
ConcurrentLinkedQueueunbounded无锁linkedlist
LinkedTransferQueueunbounded无锁linkedlist
PriorityBlockingQueueunbounded加锁heap
DelayQueueunbounded加锁heap

五种常用线程池

  • newCachedThreadPool:可缓存的线程池
    • newCachedThreadPool 用于创建一个可缓存的线程池,之所以叫可缓存线程池,是因为它在创建新线程的时候如果有可重用的线程,则重用它们,否则创建一个新线程并将其添加到线程池中。
    • 在线程池的 keepAliveTime 时间超过默认的 60秒后,该线程会被终止并从缓存中移除,因此在没有线程任务运行的时候,newCachedThreadPool 将不会占用系统的线程资源
  • newFixedThreadPool:固定大小的线程池
    • newFixedThreadPool 可用于创建一个固定的线程数量的线程池
    • 如果任务数量大于等于指定线程池中线程的数量,则新提交的任务将在阻塞队列中排队,直到有可用的线程资源。
  • newScheduledThreadPool:可做任务调度的线程池
    • newScheduledThreadPool 用于创建可定时调度的线程池,可设置在给定延迟时间后执行或者定期执行某个线程任务。
  • newSingleThreadPool :单个线程的线程池
    • newSingleThreadPool:创建的线程池会确保池中永远有且只有一个可用的线程
    • 在该线程停止或者发生异常的时候,newSingleThreadPool 线程池会启动一个新的线程代替该线程继续执行任务
  • newWorkStealingPool:足够大小的线程池
    • newWorkStealingPool 用于创建持有足够多线程的线程池来达到快速运算的目的。
    • 在内部通过使用多个队列来减少各个线程调度产生的竞争
    • 足够的线程指的是 JDK 根据当前线程的运行需求向操作系统中申请足够多的线程,以保障线程的快速执行

3、ThreadPoolExecutor 源码分析

3.1、成员属性

// 高三位:表示当前线程池运行状态,除去高三位的低位29位:表示当前线程池中所拥有的线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 表示在ctl 中,低 COUNT_BITS 位是用于存放当前线程数量的位数
// 这里为什么不直接使用29,而是使用 Integer.SIZE - 3 表示?
// 小概率情况,防止 Integer在JDK版本变更中所占的字节位数不是4个字节了
private static final int COUNT_BITS = Integer.SIZE - 3; // 29

// 低 COUNT_BITS 位 所能表达的最大数值
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits  在ctl中的高三位所代表的运行状态
// -1 负数在计算机中以补码的形式存在 1110 0000 0000 0000 0000 0000 0000 0000 转换成整数是一个负数
private static final int RUNNING    = -1 << COUNT_BITS;

// 0000 0000 0000 0000 0000 0000 0000 000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

// 0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP       =  1 << COUNT_BITS;

// 0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING    =  2 << COUNT_BITS;

// 0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED =  3 << COUNT_BITS;

// 任务队列,当线程池中的线程达到核心线程数的时候,再提交的时候就会直接提交到 workQueue
// workQueue instanceof ArrayBrokingQueue LinkedBrokingQueue 同步队列
private final BlockingQueue<Runnable> workQueue;

// 线程池的全局锁,增加worker,减少worker 的时候需要持有mainLock,修改线程池运行状态的时候也需要
private final ReentrantLock mainLock = new ReentrantLock();

// 线程池真正存放worker -> thread的地方
private final HashSet<Worker> workers = new HashSet<Worker>();

// 当外部线程调用 awaitTermination() 方法的时候,外部线程会等待当前线程池状态为 Terminated 为止
// 等待是如何实现的?就是将外部线程封装成waitNode放入到 Condition 队列中了,waitNode.thread 就是外部线程,会被park掉(处于WAITING 状态)
// 当线程池状态变为 Termination的时候,会去唤醒这些线程,通过 termination.signAll() ,唤醒之后这些线程会进入到阻塞队列,头结点会去抢占mainLock
// 抢到锁的线程会继续执行 awaitTermin() 后面的程序,这些线程最后都会正常执行
// 简单理解,termination.await() 会将线程阻塞在这里
//         termination.signAll() 会将阻塞在这里的线程依次唤醒
private final Condition termination = mainLock.newCondition();

// 记录线程池生命周期内线程数最大值
private int largestPoolSize;

// 记录线程池所完成的任务总数,当worker 退出的时候会将 worker 完成的任务累计到completedTaskCount
private long completedTaskCount;

// 创建线程的时候会使用线程工厂,当我们使用 Executors.newFix.. newCache.. 创建线程池的时候使用的是 DefaultThreadFactory
// 一般不建议使用 DefaultThreadFactory,推荐自己实现 ThreadFactory,为什么不推荐使用DefaultThreadFactory?
// 因为DefaultThreadFactory创建线程的时候给赋值给线程的名字是 namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"
// 当线程池中的某个线程出现问题的时候,并不能通过该线程名字去定位到哪个地方出现了问题,这是很浪费时间的
private volatile ThreadFactory threadFactory;

// 拒绝策略:juc包中默认提供了四种方式,默认采用 AbortPolicy,直接抛出异常的方式
private volatile RejectedExecutionHandler handler;

// 空闲线程存活时间,当allowCoreThreadTimeOut == false的时候,会维护核心线程数内的线程存活,超出时间的线程会超时
// allowCoreThreadTimeOut == true的时候,核心线程数量内的线程空闲的时候,也会被回收
private volatile long keepAliveTime;

// 控制核心线程数量内的线程是否可以被回收 true,可以,false 不可以
private volatile boolean allowCoreThreadTimeOut;

// 核心线程数量限制
private volatile int corePoolSize;

// 线程池最大线程数量限制
private volatile int maximumPoolSize;

// 缺省的拒绝策略:采用的是 AbortPolicy 直接抛出异常的方式
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

构造方法

// 构造方法
public ThreadPoolExecutor(int corePoolSize, // 核心线程数量限制
                          int maximumPoolSize, // 最大线程数限制
                          // 空闲线程存活时间,当allowCoreThreadTimeOut == false的时候,会维护核心线程数内的线程存活,超出时间的线程会超时
                          // allowCoreThreadTimeOut == true的时候,核心线程数量内的线程空闲的时候,也会被回收
                          long keepAliveTime,
                          TimeUnit unit, // 时间单位 seconds nano..
                          // 任务队列,当线程池中的线程达到核心线程数的时候,再提交的时候就会直接提交到 workQueue
                          // workQueue instanceof ArrayBrokingQueue LinkedBrokingQueue 同步队列
                          BlockingQueue<Runnable> workQueue,
                          // 创建线程的时候会使用线程工厂,当我们使用 Executors.newFix.. newCache.. 创建线程池的时候使用的是 DefaultThreadFactory
                          // 一般不建议使用 DefaultThreadFactory,推荐自己实现 ThreadFactory,为什么不推荐使用DefaultThreadFactory?
                          // 因为DefaultThreadFactory创建线程的时候给赋值给线程的名字是 namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"
                          // 当线程池中的某个线程出现问题的时候,并不能通过该线程名字去定位到哪个地方出现了问题,这是很浪费时间的
                          ThreadFactory threadFactory,
                          // 四种拒绝策略
                          RejectedExecutionHandler handler) {
    // 判断参数是否越界
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    // 工作队列和线程工厂和拒绝策略都不能为空
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

3.2、成员预热方法

// Packing and unpacking ctl
// 获取当前线程池运行状态
// ~CAPACITY & c = ctl => ~0001 1111 1111 1111 1111 1111 1111 1111 & 1110 0000 0000 0000 0000 0000 0000 0011(假设是RUNNING状态)
// ~CAPACITY =>      1110 0000 0000 0000 0000 0000 0000 0011
// ctl =>            1110 0000 0000 0000 0000 0000 0000 0000
// ~CAPACITY & c =>  1110 0000 0000 0000 0000 0000 0000 0000
private static int runStateOf(int c)     { return c & ~CAPACITY; }

// 获取当前线程池线程数量
// c == ctl =>      1110 0000 0000 0000 0000 0000 0000 0111
// CAPACITY =>      0001 1111 1111 1111 1111 1111 1111 1111
// c & CAPACITY =>  0000 0000 0000 0000 0000 0000 0000 0111
private static int workerCountOf(int c)  { return c & CAPACITY; }

// 用在重置当前线程池 ctl 值的时候会用到
// rs:代表线程池状态  wc:代表当前线程池 worker(线程)数量
// rs:     1110 0000 0000 0000 0000 0000 0000 0000 (假设位RUNNING状态)
// wc:     0000 0000 0000 0000 0000 0000 0000 0111
// rs | wc:1110 0000 0000 0000 0000 0000 0000 0111 == 重置为ctl的值
private static int ctlOf(int rs, int wc) { return rs | wc; }

// 比较当前线程池ctl所表示的状态,是否小于某个状态s
// c = 1110 0000 0000 0000 0000 0000 0000 0111 < 0000 0000 0000 0000 0000 0000 0000 0000 == true
// 所有情况下,RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

// 比较当前线程池 ctl 所表示的状态,是否大于等于某个状态
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

// 小于SHUTDOWN 一定是 RUNNING 状态 SGUTDOWN == 0
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

// 使用 CAS 的方式让 ctl 的值 + 1,成功返回true,失败返回false
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

// 使用 CAS 的方式让 ctl 的值 -1,成功返回true,失败返回false
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}

// 将ctl 的值减-1,这个方法一定成功
private void decrementWorkerCount() {
    // 这里会一直进行重试,直到成功为止
    do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}

3.3、Worker 内部类分析

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    // Worker 采用了AQS 的独占模式
    // 独占模式:两个重要属性 state 和 ExclusiveOwnerThread
    // state:0的时候,表示未被占用,>0 的时候表示被占用,<0 的时候,表示初始状态,这种情况下不能被抢锁
    // ExclusiveOwnerThread:表示独占锁线程
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    // Worker 内部封装的工作线程
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    // 假设 firstTask 不为空,那么当 worker 启动后(内部的线程启动)会优先执行 firstTask,当执行完firstTask后,会当workQueue中去获取下一个任务
    // 初始化任务,只在worker 第一次执行任务的时候执行,之后都是从workqueue中获取任务执行
    Runnable firstTask;
    /** Per-thread task counter */
    // 记录当前所完成的任务数量
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    // firstTask 可以为 null,为null启动后会到 queue中获取
    Worker(Runnable firstTask) {
        // 设置 AQS 独占模式为初始化状态,不能被抢占锁
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 使用线程工厂创建了一个线程,并且将当前 worker 指定为 Runnable,也就是说当 thread启动的时候,会以worker.run()为入口
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    // 当worker 启动的时候,会执行run()方法
    public void run() {
        // ThreadPoolExector -> runWork(Worker w) 这个是核心方法,等后面分析worker启动后逻辑的时候会以这里为切入点
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.
    // 判断当前worker的独占锁是否被独占
    // 0 表示未被占用
    // 1 表示已经占用
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    // 尝试去占用当前 worker 的独占锁
    //
    protected boolean tryAcquire(int unused) {
        // 使用 CAS 修改 AQS 中的state,期望值为0(0的时候表示未被占用),修改成功表示当前线程修改成功
        // 那么设置 ExclusiveOwnerThread 为当前线程
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    // 尝试释放当前 worker 的独占锁
    // 外部不会直接调用该方法,这个方法是 AQS的,外部调用 unlock的时候,unlock -> AQS.release -> tryRelease
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    // 加锁,加锁失败的时候,会阻塞当前线程,直到获取到锁位置
    public void lock()        { acquire(1); }

    // 尝试去加锁,如果当前锁是未被持有状态,加锁成功后会返回true,否则不会阻塞当前线程,直接返回false
    public boolean tryLock()  { return tryAcquire(1); }

    // 一般情况下,咱们调用unlock 要保证当前线程是持有锁的
    // 特殊情况,当 worker 的 state == -1的时候,调用 unlock 表示初始化state,设置 state == 0
    // 启动worker 之前会先调用 unlock() 这个方法,会强制刷新ExclusiveOwnerThread == null 和 state == 0
    public void unlock()      { release(1); }

    // 就是返回当前 worker 的lock 是否被占用
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

3.4、execute(Runnable command) 方法分析

// 线程池提交方法
// Runnable command:可以是普通的 Runnable 实现类,也可以是 FutureTask
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 获取ctl 最新值 ctl:高三位 表示线程池状态,低位表示当前线程池线程数量
    int c = ctl.get();
    // workerCountOf(c):获取当前线程池数量
    // 条件成立:表示当前线程数量小于核心线程数量,此次提交任务,直接创建一个新的worker,对应线程池中多了一个新线程
    if (workerCountOf(c) < corePoolSize) {
        // addWorker 即为创建线程的过程,会创建worker对象,并且将command作为fistTask
        // core == true:表示采用核心线程数量限制,false:表示采用 maximumPoolSize
        if (addWorker(command, true))
            // 创建成功后直接返回,addWorker 方法里面会启动新创建的worker,将firstTask执行
            return;
        // 执行到这条语句,说明addWorker 一定是失败了
        // 有几种可能?
        // 1.存在并发现象,execute 方法是有可能有多个线程同时调用的,当workerCountOf(c)<corePoolSize成立后
        // 其他线程可能也成立了,并且向线程池中创建了worker,这个时候线程池中线程数量已经达到了核心线程数,所以当前线程失败了
        // 2.当前线程池状态发生改变了,RUNNING SHUTDOWN STOP TIDYING TERMINATION
        // 当线程池状态是非 RUNNING 状态的时候,addWorker(firstWorker != null,true | false) 一定会失败
        // SHUTDOWN 状态下,也有可能创建成功,前提 firstTask == null 而且当前queue不为空,特殊情况,在addWorker方法中有说明
        c = ctl.get();
    }

    // 执行到这里有几种情况?
    // 1.当前线程数量已经达到了 corePoolSize
    // 2.addWorker 失败,并发导致
    // 条件成立:说明当前线程池处于RUNNING 状态,则尝试将 task 放入到workqueue中
    if (isRunning(c) && workQueue.offer(command)) {
        // 进入到内部的前提条件:当前线程池中的线程数量大于等于corePoolSize,并且当前线程池处于RUNNING状态,而且当前线程将command放入到workqueue成功
        // 再次获取ctl的值
        int recheck = ctl.get();
        // 条件一:! isRunning(recheck) 成立:说明你提交到队列之后,线程池状态被外部线程给修改了,比如:shutdown() shutdownNow()
        // 这种情况成立的话,说明线程池状态被外部线程给修改了,需要把刚刚提交的任务给删除掉
        // 条件二:remove(command) :有可能成功,也有可能失败
        // 成功:提交之后,线程池中的线程还未消费(处理)
        // 失败:提交之后,在shutdown() shutdownNow() 之前,就被线程池中的线程给处理了
        if (! isRunning(recheck) && remove(command))
            // 提交之后,线程状态为非RUNNING了且移除任务队列成功,走个拒绝策略
            reject(command);

        // 有几种情况会走到这里?
        // 1.当前线程池是RUNNING 状态
        // 2.线程池状态是非 RUNNING 状态,但是 remove 提交的任务失败
        // 担心当前线程池是 RUNNING 状态,但是线程池中的存活数量是0,这个时候,会很尴尬,任务没线程去跑了
        // 这里其实是一个担保机制,保证线程池在RUNNING 状态下,最起码得有一个线程在工作
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 执行到这里,有几种情况?
    // 1.workqueue.offer(command)失败,说明当前队列满了
    // 2.当前线程是非RUNNING 状态

    // 1.offer失败,需要做什么?说明当前queue满了,这个时候,如果当前线程数量尚未达到maximumPoolSize得话,尝试创建新得worker,直接执行command
    // 假设线程数量达到 maximumPoolSize 得话,这里会也会失败,也会走拒绝策略
    // 2.线程池状态为非RUNNING 状态,这个时候因为 command != null addWorker 一定是返回false
    else if (!addWorker(command, false))
        reject(command);
}

3.4、addWorker(Runnable firstTask, boolean core) 方法分析(重点)

// 返回值总结:
// true:表示worker 创建成功,且线程启动成功
// false:表示创建失败
// 1.线程池状态 rs > SHUTDOWN (STOP TIDYING TERMINATION)
// 2.rs == shutdown 但是队列中已经没有任务了 或者 当前状态是SHUTDOWN 且队列未空,但是当前任务的firstTask不为null
// 3. 当前线程池已经达到指定指标(corePoolSize / maximumPoolSize)
// 4.threadFactory 实现类创建的线程为null
// Runnable firstTask:可以为null,表示启动worker之后,worker 自动到queue中去获取任务,如果不是null,则worker优先执行 firstTask
// boolean core:表示采用的线程池的线程数限制,true:采用核心线程数限制,false:采用maximumPoolSize线程数限制
private boolean addWorker(Runnable firstTask, boolean core) {
    // 自旋操作:判断当前线程池状态是否允许创建线程
    retry:
    for (;;) {
        // 获取当前ctl值保存到c中
        int c = ctl.get();
        // 获取当前线程池运行状态保存到rs中
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 条件一:rs >= SHUTDOWN 成立:说明当前线程池状态不是RUNNING 状态
        // 条件二:前置条件:当前线程池状态不是RUNNING 状态 ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
        // rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
        // 表示:当前线程池状态是SHUTDOWN状态 && 当前提交的任务是null addWorker这个方法可能不是execute去调用的 && 当前任务队列不是空
        // 排除掉这种情况,当前线程池状态是SHUTDOWN状态,但是队列里面还有任务尚未处理,这个时候是允许添加worker的,但是不允许再次提交task
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            // 什么情况下会返回false?
            // 线程池状态 rs >= SHUTDOWN
            // rs == SHUTDOWN 但是队列中已经没有任务了 或者 rs == SHUTDOWN 且 firstTask != null
            return false;

        // 上面这些代码,就是判断当前线程池状态是否允许添加worker

        // 内部自旋操作:获取创建线程令牌的过程
        for (;;) {
            // 获取当前线程池中的线程数量,保存到wc中
            int wc = workerCountOf(c);
            // 条件一:wc >= CAPACITY 永远不成立,因为CAPACITY是一个5亿多的数字
            // 条件二:wc >= (core ? corePoolSize : maximumPoolSize)
            // core == true,判断当前线程数量是否 >= corePoolSize,会拿核心线程数量做限制
            // core == false,判断当前线程数量是否 >= maximumPoolSize,会拿最大线程数量做限制
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                // 执行到这里,说明当前已经无法添加线程了,已经达到指定限制了
                return false;

            // 条件成立:说明记录线程数量已经加1成功,相当于申请到了一块令牌
            // 条件失败:说明可能有其他线程,修改过ctl这个值了,CAS 发生了冲突
            // 可能发生过什么冲突?
            // 1.其他线程execute() 申请过令牌了,在这之前,改变了ctl的值,期望值c与内存中的ctl的值不符合,导致CAS 失败
            // 2.外部线程可能调用过 shutdown() 或者 shutdownNow() 导致线程池状态发生了变化了,ctl高三位表示线程池状态
            // 线程池状态改变后,CAS 操作也会失败
            if (compareAndIncrementWorkerCount(c))
                // 进入到这里,一定是CAS 失败,申请到令牌了,跳出外部自旋操作
                break retry;
            // CAS 失败,没有成功的申请到令牌
            // 获取最新的 ctl 值
            c = ctl.get();  // Re-read ctl
            // 判断当前线程池状态是否发生过变化,如果外部在这之前调用过shutdown 或者shurdownNow会导致线程池状态发生变化
            if (runStateOf(c) != rs)
                // 线程池状态发生变化后,直接返回到外层循环,外层循环负责判断当前线程池状态,是否允许创建线程
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // CAS 成功,跳出外部自旋操作来到了这里

    // workerStarted:表示当前创建的worker是否已经启动 false:未启动 true:已经启动
    boolean workerStarted = false;
    // workerAdded:表示创建的worker是否添加到线程池中 false:未添加 true:已经添加
    boolean workerAdded = false;
    // w:表示后面创建worker 的一个引用
    Worker w = null;
    try {
        // 创建worker
        w = new Worker(firstTask);
        // 将新创建的worker 节点的线程赋值给t
        final Thread t = w.thread;
        // 为什么这里还要做 t != null 这个判断?
        // 为了防止ThreadFactory 实现类有bug,因为ThreadFactory 是一个接口,谁都可以实现
        // 防止程序员自己实现的ThreadFactory 实现类有bug,导致创建出来的Thread为null
        if (t != null) {
            // 将全局锁的引用保存到mainLock变量中
            final ReentrantLock mainLock = this.mainLock;
            // 持有全局锁,可能会阻塞,直到获取成功为止,同一时刻操作线程池内部相关的操作都必须持有锁
            mainLock.lock();

            // 从这里加锁之后,其他线程是无法修改线程池状态的
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // 获取最新的线程池运行状态保存到rs中
                int rs = runStateOf(ctl.get());

                // 条件一:rs < SHUTDOWN 成立:当前线程池处于RUNNING 状态,最正常的状态
                // 条件二: 前置条件:当前线程池状态不是RUNNING 状态
                // (rs == SHUTDOWN && firstTask == null):当前状态为SHUTDOWN 状态且firstTask为null
                // 其实判断的就是SHUTDOWN 状态下的特殊情况,只不过这里不再判断队列是否为空了
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // t.isAlive() 当前线程start 后,线程isAlive 会返回true
                    // 防止程序员在ThreadFactory实现类创建线程返回给外部之前,将线程给start了
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 将咱们创建的worker 添加到线程池中
                    workers.add(w);
                    // 获取最新当前线程池的线程数量
                    int s = workers.size();
                    // 条件成立:说明当前线程数量是一个新高,更新lagestPoolSize线程池中的线程最大数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 表示线程已经加入到线程池中了
                    workerAdded = true;
                }
            } finally {
                // 释放线程池全局锁
                mainLock.unlock();
            }
            // 条件成立:说明当前添加worker成功
            // 条件失败:说明线程池在lock之前,线程池状态发生了变化导致添加失败
            if (workerAdded) {
                // 成功后则将创建的worker启动
                t.start();
                // 启动标记设置为true
                workerStarted = true;
            }
        }
    } finally {
        // 条件成立:说明添加当前线程到线程池失败或者启动线程失败,需要做清理工作
        // 1.释放令牌
        // 2.将当前worker 清理出workers集合
        if (! workerStarted)
            addWorkerFailed(w);
    }
    // 返回新创建的线程是否启动
    return workerStarted;
}

addWorkerFailed(Worker w) 方法

// addWorker添加线程到线程池中失败或者线程启动失败的后续工作,两个操作
// 1.释放令牌
// 2.将当前worker 清理出workers集合
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    // 持有线程池全局锁,因为操作的是线程池相关的东西
    mainLock.lock();
    try {
        // 条件成立:需要将worker 在workers 中清理除去
        if (w != null)
            workers.remove(w);
        // 线程池计数恢复-1,前面+1获取令牌了,这里因为失败,所以要-1,相当于归还令牌
        decrementWorkerCount();
        // 回头讲,shutdown shutdownNow再说
        tryTerminate();
    } finally {
        // 释放线程池全局锁
        mainLock.unlock();
    }
}

3.5、runWorker(Worker w) 方法分析

// w:就是启动worker
final void runWorker(Worker w) {
    // wt == w.thread
    Thread wt = Thread.currentThread();
    // 将初始执行的task赋值给task
    Runnable task = w.firstTask;
    // 清空当前w.firstTask的引用
    w.firstTask = null;
    // 这里为什么先调用unlock? 就是为了初始化worker state = 0 和 ExclusiveOwnerThread = null
    // 启动worker 之前会先调用 unlock() 这个方法,会强制刷新ExclusiveOwnerThread == null 和 state == 0
    w.unlock(); // allow interrupts

    // 是否是突然退出?如果为true,表示发生异常了,当前线程突然退出的,回头需要做一些处理 false:表示正常退出的
    boolean completedAbruptly = true;

    try {
        // 条件一:task != null 指的就是firstTask 是否为null 如果是null,直接执行循环体里面
        // 条件二:(task = getTask()) != null 条件成立:说明当前线程在queue中获取任务成功,getTask是会阻塞线程的方法
        // getTask()如果返回null,说明当前线程需要执行退出逻辑
        while (task != null || (task = getTask()) != null) {
            // worker 设置独占锁为当前线程
            // 为什么要设置独占锁呢?就是怕shutdown()的时候会判断当前worker状态,根据独占锁是否空闲来判断当前worker是否正在工作
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt

            // 条件一:runStateAtLeast(ctl.get(), STOP) 说明线程池状态大于等于STOP,线程池目前处于STOP/TIDYING/TERMINATION,此时线程一定要给它一个中断信号
            // 条件一成立:(runStateAtLeast(ctl.get(), STOP) && !wt.isInterrupted()
            // 上面条件如果成立,说明当前线程池状态是大于等于STOP且当前线程是未设置中断状态的,此时需要进入到if里面,给当前线程一个中断

            // 假设:(runStateAtLeast(ctl.get(), STOP) == false
            // (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))
            // Thread.interrupted() 获取当前中断状态,且设置中断位为false,连续调用两次interrupted()方法,第二次一定是返回false
            // runStateAtLeast(ctl.get(), STOP):大概率这里还是false
            // 其实它在强制刷新当前线程的中断标记为false,因为有可能上一次执行task的时候,业务代码里面将线程的中断标记位设置为了true且没有做处理
            // 所以这里一定要强制刷新一下,不会再影响到后面的task了
            // 假如:Thread.interrupted() == true && runStateAtLeast(ctl.get(), STOP) == true
            // 这种情况有可能发生嘛?
            // 有可能,因为外部线程再当前线程第一次(runStateAtLeast(ctl.get(), STOP) == false后,有机会调用shutdown shutdownNow方法将线程池状态修改
            // 这个时候也会将当前线程的中断标记位再次设置为中断状态
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();

            try {
                // 钩子方法,留给子类实现的
                beforeExecute(wt, task);
                // 表示异常情况,如果thrown不为空,表示task运行过程中发生了异常,向上层抛出了异常
                Throwable thrown = null;
                try {
                    // task可能是firstTask,也可能是普通的runnable接口实现类
                    // 如果前面是通过submit()提交的runnable/callable会被封装成FutureTask对象
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 钩子方法,留给子类实现的
                    afterExecute(task, thrown);
                }
            } finally {
                // 将局部变量task设置为 null
                task = null;
                // 更新worker完成任务数量
                w.completedTasks++;
                // worker 处理完一个任务后会释放独占锁,然后再次到queue中获取任务
                // 1.正常情况下会再次回到getTask()那里获取任务 while(getTask())
                // 2.task.run()内部抛出异常了
                w.unlock();
            }
        } // while
        // 什么情况下会来到这里?
        // getTask()返回null的时候,说明当前线程应该执行退出逻辑了
        completedAbruptly = false;
    } finally {
        // task.run() 内部抛出异常的时候,当上面的finally执行完以后跳到这里
        // 正常提出 completedAbruptly = false 异常退出:completedAbruptly = true
        // 这个方法以后再说
        processWorkerExit(w, completedAbruptly);
    }
}

3.6、getTask() 方法分析(重点)

// 什么情况下会返回null?
// 1.rs >= STOP成立说明:当前线程池的状态最低也是STOP状态,一定要返回null了
// 2.说明当前线程池状态为SHUTDOWN状态且任务队列已经为null,此时一定要返回null
// 3.线程池中的线程数量超过最大限制的时候,会有一部分线程返回为null
// 4.线程池中的线程数超过corePoolSize的时候,会有一部分线程超时后返回null
// 到当前的任务队列queue中去获取任务
private Runnable getTask() {
    // 表示当前线程获取任务是否超时 默认false true表示已经超时
    boolean timedOut = false; // Did the last poll() time out?

    // 自旋
    for (;;) {

        // 获取最新ctl的值保存到c中
        int c = ctl.get();
        // 获取当前线程池的运行状态
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 条件一:rs >= SHUTDOWN 条件成立:说明当前线程池是非RUNNING 状态,可能是SHUTDOWN/STOP/TIDYING/TERMINATION//
        // 条件二:rs >= STOP || workQueue.isEmpty()
        // 2.1:rs >= STOP成立说明:当前线程池的状态最低也是STOP状态,一定要返回null了
        // 2.2:前置条件:状态是SHUTDOWN,workqueue.isEmpty() 条件成立:说明当前线程池状态为SHUTDOWN状态且任务队列已经为null,此时一定要返回null
        // 返回null,runworker()方法就会将返回null的线程执行退出线程池的逻辑
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 使用CAS + 死循环的方式将ctl的值-1
            decrementWorkerCount();
            return null;
        }

        // 执行到这里,有几种情况:
        // 1.线程池是RUNNING 状态
        // 2.线程池是SHUTDOWN 状态,但是队列queue还未为空,此时可以创建线程(添加worker)

        // 获取线程池中的线程数量
        int wc = workerCountOf(c);

        // timed == true:表示当前这个线程获取task的时候是支持超时机制的,使用queue.poll(时间单位,数量)
        // 当获取task超时的情况下,下一次自旋就可能返回null了
        // timed == false:表示当前这个线程获取task的时候是不支持超时机制的,当前线程会使用 queue.take()

        // 情况一:allowCoreThreadTimeOut == true 表示核心线程数量内的空闲线程可以被回收,所有线程都是使用queue.poll()超时机制这种方式获取task
        // 情况二:allowCoreThreadTimeOut == false 表示当前线程池会维护核心数量内的线程

        // wc > corePoolSize:条件成立:当前线程池中的线程数量是大于核心线程数的,此时让所有路过这里的线程都使用poll,超时的方式去获取任务
        // 这样就可能会有一部分线程获取不到任务,返回null,然后runworker会执行线程退出逻辑
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 条件一:(wc > maximumPoolSize || (timed && timedOut)
        // 1.1:wc > maximumPoolSize 为什么会成立?setMaximumPoolSize() 方法,可能外部线程将线程池最大线程数设置为比初始化时的要小
        // 1.2:timed && timedOut 条件成立:前置条件,当前线程使用poll()的方式从队列queue中获取worker执行,上一次循环的时候,使用poll的方式获取任务的时候,超时了
        // 条件一为true:表示线程可以被回收,达到回收的标准,当确实需要回收的时候再回收
        // 条件二:wc > 1 || workQueue.isEmpty()
        // 2.1:wc > 1:条件成立,说明当前线程池中还有其他线程,当前线程不需要从队列中获取任务可以直接回收
        // 2.2:workQueue.isEmpty():前置条件,条件成立:wc == 1 ,说明当前任务队列已经空了,最后一个线程,也可以放心的退出
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 使用 CAS 机制,将ctl的值减去1,减一成功的线程返回null
            // CAS 失败?为什么会CAS 失败?
            // 1.其他线程先你一步推出了
            // 2.线程池状态发生变化了
            if (compareAndDecrementWorkerCount(c))
                return null;
            // 再次自旋的时候,timed有可能就是false了,因为当前线程CAS失败,很有可能是因为其他线程成功退出导致的,再次查询的时候
            // 检查发现,当前线程就可能属于不需要回收范围内了
            continue;
        }

        // 获取任务的逻辑
        try {
            // poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
            // 队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取,返回失败。
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                // take 方法:获取队列中的第一个元素,如果被阻塞,则等待
                workQueue.take();

            // 条件成立:返回任务
            if (r != null)
                return r;
            // 说明当前线程超时了
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

3.7、processWorkerExit(Worker w, boolean completedAbruptly) 方法分析

// 线程退出的逻辑
// w:线程执行的worker要获取任务
// completedAbruptly:线程是否发生异常 true:表示线程发生了异常 false:表示线程没有发生异常
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 条件成立:代表当前w这个worker 是发生异常退出的 task任务执行过程中向上抛出了异常
    // 异常退出的时候,ctl的线程数量并没有-1
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    // 获取线程池的全局锁引用
    final ReentrantLock mainLock = this.mainLock;
    // 加锁
    mainLock.lock();
    try {
        // 将当前worker完成的task数量,汇总到线程池的completedTaskCount
        completedTaskCount += w.completedTasks;
        // 将worker 从任务中移除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 回头再说
    tryTerminate();

    // 获取最新的ctl值
    int c = ctl.get();
    // 条件成立:说明当前线程池状态为RUNNING 或者 SHUTDOWN 状态
    if (runStateLessThan(c, STOP)) {
        // 条件成立:说明当前线程是正常退出的
        if (!completedAbruptly) {
            // min:表示线程池最低持有的线程数量
            // allowCoreThreadTimeOut == true:说明核心线程数内的线程,也会超时被回收 min = 0
            // allowCoreThreadTimeOut == false:说明核心线程数内的线程,不会超时被回收 min = corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

            // 前提状态:线程池状态为RUNNING SHUTDOWN
            // 条件一: min == 0成立
            // 条件二:! workQueue.isEmpty() 说明任务队列中还有任务,最起码留一个线程
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;

            // 条件成立:线程池中还拥有足够的线程
            // 考虑一个问题:workerCountOf(c) >= min => (0 >= 0)
            // 有可能!
            // 什么情况下,当线程池中的核心线程数是可以被回收的,会出现这种情况,这种情况下,当前线程池中的线程数会变为0
            // 下次再提交任务的时候,会再次创建线程
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }

        // 1.当前线程再执行task的时候发生异常,这里一定要创建一个新的worker顶上去
        // 2.! workQueue.isEmpty() 说明任务队列中还有任务,最起码要留一个线程,当前状态为 RUNNING | SHUTDOWN
        // 3.线程数量小于corePoolSize值,此时会创建线程,维护线程池中的线程数量在corePollSize左右
        addWorker(null, false);
    }
}

3.8、shutdown() 方法分析

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    // 获取线程池的全局锁
    mainLock.lock();
    try {
        // 判断是否有权限
        checkShutdownAccess();

        // 设置线程池的状态为 SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中断空闲线程
        interruptIdleWorkers();
        // 空方法,子类可以扩展
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        // 释放线程池全局锁
        mainLock.unlock();
    }
    // 后面再说
    tryTerminate();
}

// 中断空闲线程
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

// 通过自旋操作设置线程池的状态为SHUTDWON
private void advanceRunState(int targetState) {
    // 自旋
    for (;;) {
        int c = ctl.get();
        // 条件一成立:假设targetState == SHUTDOWN,说明当前线程池状态是 >= SHUTDOWN
        // 条件一不成立:假设targetState == SHUTDOWN,说明当前线程池状态是 RUNNING
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

3.9、tryTerminate() 方法分析

final void tryTerminate() {
    // 自旋操作
    for (;;) {
        // 获取最新ctl的值
        int c = ctl.get();
        // 条件一:isRunning(c) 成立:直接返回就行了,线程池很正常
        // 条件二:runStateAtLeast(c, TIDYING) 说明已经有其他线程在执行 TIDYING 转到 TERMIATION 状态了,当前线程直接回去
        // 条件三:runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()
        // SHUTDOWN 特殊情况,如果是这种情况,直接回去,得等队列中得任务处理完后再转化状态
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;

        // 什么情况会执行到这里?
        // 1.线程池状态为 STOP 得时候
        // 2.线程池状态为 SHUTDWON并且队列已经空了
        // 条件成立:当前线程池中得线程数量 > 0
        if (workerCountOf(c) != 0) { // Eligible to terminate
            // 中断一个空闲线程
            // 空闲线程在哪空闲呢?queue.take() 或者 queue.poll()
            // 1.唤醒后的线程会在getTask方法返回null
            // 2.执行退出逻辑得时候,会再次调用 tryTerminate 方法,唤醒下一个空闲线程
            // 3.因为线程池状态是(线程池状态为 STOP 得时候 | 线程池状态为 SHUTDWON并且队列已经空了) 最终调用addWorker得时候会失败得
            // 最终空闲线程都会在这里退出,非空闲线程当执行完当前task得时候,也会调用tryTerminate方法,有可能会走到这里
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        // 执行到这里得线程是谁?
        // workerCountOf(c) == 0得时候,会来到这里
        // 最后一个退出得线程,咱们知道,在(线程池状态 == STOP | 线程池状态为 SHUTDWON并且队列已经空了)
        // 线程唤醒后,都会执行退出逻辑,退出过程中会先将 workerCount 计数 -1 => ctl中得计数-1
        // 调用 tryTerminate方法之前,已经减过了,所以 0 得时候,表示这是最后一个退出得线程了
        final ReentrantLock mainLock = this.mainLock;
        // 获取全局锁
        mainLock.lock();
        try {
            // 设置线程池状态为 TIDYING
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 调用钩子方法
                    terminated();
                } finally {
                    // 设置线程池状态为 TERMINATED 状态
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 唤醒调用 awaitTermination() 方法得线程
                    termination.signalAll();
                }
                return;
            }
        } finally {
            // 释放全局锁
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

3.10、shutdownNow() 方法分析

public List<Runnable> shutdownNow() {
    // 返回值引用
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    // 获取线程池的全局锁
    mainLock.lock();
    try {
        // 校验权限
        checkShutdownAccess();
        // 设置线程池状态为STOP
        advanceRunState(STOP);
        // 中断线程池中的所有线程
        interruptWorkers();
        // 导出未处理的task
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    // 返回当前任务队列中未处理的任务
    return tasks;
}

至此,线程池 ThreadPoolExecutor 类源码就告一段落了,下面会开始更新 AQS 部分的内容,如果有错误,请指正!

标签:0000,worker,源码,线程,当前,ctl,null,ThreadPoolExecutor
来源: https://blog.csdn.net/weixin_46410481/article/details/122864251

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有