ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

JUC学习 - 深入剖析线程池(ThreadPoolExecutor)(补)

2021-12-05 15:59:48  阅读:144  来源: 互联网

标签:JUC task Worker 任务 Runnable 线程 null ThreadPoolExecutor


接上一篇博客 https://blog.csdn.net/qq_43605444/article/details/121727738?spm=1001.2014.3001.5501

6、Worker 类

下面的是在 Worker 类上的官方的一段注释:

/**
  * Class Worker mainly maintains interrupt control state for
  * threads running tasks, along with other minor bookkeeping.
  * This class opportunistically extends AbstractQueuedSynchronizer
  * to simplify acquiring and releasing a lock surrounding each
  * task execution.  This protects against interrupts that are
  * intended to wake up a worker thread waiting for a task from
  * instead interrupting a task being run.  We implement a simple
  * non-reentrant mutual exclusion lock rather than use
  * ReentrantLock because we do not want worker tasks to be able to
  * reacquire the lock when they invoke pool control methods like
  * setCorePoolSize.  Additionally, to suppress interrupts until
  * the thread actually starts running tasks, we initialize lock
  * state to a negative value, and clear it upon start (in
  * runWorker).
  */

从注释中我们可以了解到 Worker 类是内部类,既实现了Runnable,又继承了AbstractQueuedSynchronizer(以下简称AQS),所以其既是一个可执行的任务,又可以达到锁的效果。从下面的代码也可以看出:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable

Worker 类主要维护正在运行任务的线程的中断控制状态,以及其他次要的记录。

这个类继承了AbstractQueuedSynchronizer类,以简化获取和释放锁(该锁作用于每个任务执行代码)的过程。这样可以防止去中断正在运行中的任务,只会中断在等待从任务队列中获取任务的线程。

我们实现了一个简单的不可重入互斥锁,而不是使用可重入锁(ReentrantLock),因为我们不希望工作任务在调用setCorePoolSize之类的池控制方法时能够重新获取锁。另外,为了在线程真正开始运行任务之前禁止中断,我们将锁状态初始化为负值,并在启动时清除它(在runWorker中)。

我们来看一下它的部分代码:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
    final Thread thread;//Worker持有的线程
    Runnable firstTask;//初始化的任务,可以为null
}

Worker 这个工作线程,实现了Runnable接口,并持有一个线程thread,一个初始化的任务firstTask。thread是在调用构造方法时通过ThreadFactory来创建的线程,可以用来执行任务;firstTask用它来保存传入的第一个任务,这个任务可以有也可以为null。如果这个值是非空的,那么线程就会在启动初期立即执行这个任务,也就对应核心线程创建时的情况;如果这个值是null,那么就需要创建一个线程去执行任务列表(workQueue)中的任务,也就是非核心线程的创建。

Worker 执行任务的模型如下:
在这里插入图片描述
关于 addWorker 方法的介绍在上一篇博客中有介绍,需要的看【https://blog.csdn.net/qq_43605444/article/details/121727738?spm=1001.2014.3001.5501

我们来看一下 Worker 类的完整的代码:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    // Worker持有的线程
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    // 初始化的任务,可以为null
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
    Worker(Runnable firstTask) {
        // 设置AQS的同步状态
        // 		- state:锁状态,-1为初始值,0为unlock状态,1为lock状态
        setState(-1); // inhibit interrupts until runWorker  在调用runWorker前,禁止中断
        this.firstTask = firstTask;
        // 线程工厂创建一个线程
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  
      *
      * 将主运行循环委托给外部 runWorker
      */
    public void run() {
        runWorker(this);   // runWorker()是ThreadPoolExecutor的方法
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.  0代表“没被锁定”状态
    // The value 1 represents the locked state.  1代表“锁定”状态

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
	
    /**
      * 尝试获取锁的方法
      * 	- 重写 AQS 的 tryAcquire()
      */
    protected boolean tryAcquire(int unused) {
        // 判断原值为0,且重置为1,所以state为-1时,锁无法获取。
        // 每次都是 0->1 ,保证了锁的不可重入性
        if (compareAndSetState(0, 1)) {
            // 设置exclusiveOwnerThread=当前线程
            // 独占模式
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

   /**
     * 尝试释放锁
     *     - 不是state-1,而是置为0
     */
    protected boolean tryRelease(int unused) {
    	// 清除当前占用的线程
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

   /**
     * 中断(如果运行)
     * shutdownNow时会循环对worker线程执行
     * 且不需要获取worker锁,即使在worker运行时也可以中断
     */
    void interruptIfStarted() {
        Thread t;
        // 如果state>=0、t!=null、且t没有被中断
        // new Worker()时state==-1,说明不能中断
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

关于 AQS 的描述看博主的另外一篇文章 【https://blog.csdn.net/qq_43605444/article/details/121705312?spm=1001.2014.3001.5501

7、runWorker 方法

  • 方法说明:可以说,runWorker(Worker w) 是线程池中真正处理任务的方法,前面的execute() 和 addWorker() 都是在为该方法做准备和铺垫。
  • 参数说明:
    1. Worker w:封装的Worker,携带了工作线程的诸多要素,包括 Runnable(待处理任务)、lock(锁)、completedTasks(记录线程池已完成任务数)
  • 下面是具体的代码分析:
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // new Worker()是state==-1,此处是调用Worker类的tryRelease()方法,将state置为0,而interruptIfStarted()中只有state>=0才允许调用中断
    w.unlock(); // allow interrupts
    
    // 线程退出的原因,true是任务导致,false是线程正常退出
    boolean completedAbruptly = true;
    try {
        // 当前任务和从任务队列中获取的任务都为空,方停止循环
        while (task != null || (task = getTask()) != null) {
            // 上锁可以防止在shutdown()时终止正在运行的worker,而不是应对并发
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
           /**
             * 判断1:确保只有在线程处于stop状态且wt未中断时,wt才会被设置中断标识
             * 条件1:线程池状态>=STOP,即STOP或TERMINATED
             * 条件2:一开始判断线程池状态<STOP,接下来检查发现Thread.interrupted()为true,
             * 即线程已经被中断,再次检查线程池状态是否>=STOP(以消除该瞬间shutdown方法生效,
             * 使线程池处于STOP或TERMINATED),
             * 条件1与条件2任意满足一个,且wt不是中断状态,则中断wt,否则进入下一步
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                // 当前线程调用interrupt()中断
                wt.interrupt();
            try {
                // 执行前(空方法,由子类重写实现)
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 执行后(空方法,由子类重写实现)
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                // 完成任务数+1
                w.completedTasks++;
                // 释放锁
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 处理worker的退出
        processWorkerExit(w, completedAbruptly);
    }
}

runWorker 方法的执行过程是:

  1. while循环不断地通过getTask()方法获取任务。
  2. getTask()方法从阻塞队列中取任务。
  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
  4. 执行任务。
  5. 如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。

8、getTask 方法

  • 方法说明:由函数调用关系图可知,在ThreadPoolExecutor类的实现中,Runnable getTask() 方法是为 void runWorker(Worker w)方法服务的,它的作用就是在任务队列(workQueue)中获取 task(Runnable)。
private Runnable getTask() {
    // 最新一次poll是否超时
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
       /**
         * 条件1:线程池状态SHUTDOWN、STOP、TERMINATED状态
         * 条件2:线程池STOP、TERMINATED状态或workQueue为空
         * 条件1与条件2同时为true,则workerCount-1,并且返回null
         * 注:条件2是考虑到SHUTDOWN状态的线程池不会接受任务,但仍会处理任务
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
       /**
         * 下列两个条件满足任意一个,则给当前正在尝试获取任务的工作线程设置阻塞时间限制
         *(超时会被销毁?不太确定这点),否则线程可以一直保持活跃状态
         * 1.allowCoreThreadTimeOut:当前线程是否以keepAliveTime为超时时限等待任务
         * 2.当前线程数量已经超越了核心线程数
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 两个条件全部为true,则通过CAS使工作线程数-1,即剔除工作线程
        // 条件1:工作线程数大于maximumPoolSize,或(工作线程阻塞时间受限且上次在任务队列拉取任务超时)
        // 条件2:wc > 1或任务队列为空
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 移除工作线程,成功则返回null,不成功则进入下轮循环
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        // 执行到这里,说明已经经过前面重重校验,开始真正获取task了
        try {
            // 如果工作线程阻塞时间受限,则使用poll(),否则使用take()
            // poll()设定阻塞时间,而take()无时间限制,直到拿到结果为止
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
            // r不为空,则返回该Runnable
            if (r != null)
                return r;
            // 没能获取到Runable,则将最近获取任务是否超时设置为true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 响应中断,进入下一次循环前将最近获取任务超时状态置为false
            timedOut = false;
        }
    }
}

9、processWorkerExit 方法

  • 方法说明:processWorkerExit(Worker w, boolean completedAbruptly),执行线程退出的方法
  • 参数说明:
    1. Worker w:要结束的工作线程。
    2. boolean completedAbruptly: 是否突然完成(异常导致),如果工作线程因为用户异常死亡,则completedAbruptly参数为 true。
  • 下面让我们看看 processWorkerExit 的源码:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
   /**
     * 1.工作线程-1操作
     * 1)如果completedAbruptly 为true,说明工作线程发生异常,那么将正在工作的线程数量-1
     * 2)如果completedAbruptly 为false,说明工作线程无任务可以执行,由getTask()执行worker-1操作
     */
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    // 2.从线程set集合中移除工作线程,该过程需要加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 将该worker已完成的任务数追加到线程池已完成的任务数
        completedTaskCount += w.completedTasks;
        // HashSet<Worker>中移除该worker
        workers.remove(w);
    } finally {
        // 释放锁
        mainLock.unlock();
    }

    // 3.根据线程池状态进行判断是否结束线程池
    tryTerminate();

   /**
     * 4.是否需要增加工作线程
     * 线程池状态是running 或 shutdown
     * 如果当前线程是突然终止的,addWorker()
     * 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
     * 故如果调用线程池shutdown(),直到workQueue为空前,
     * 线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程
     */
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

10、线程初始化

默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。

在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:

  • prestartCoreThread():boolean prestartCoreThread(),初始化一个核心线程
  • prestartAllCoreThreads():int prestartAllCoreThreads(),初始化所有核心线程,并返回初始化的线程数
public boolean prestartCoreThread() {
    return workerCountOf(ctl.get()) < corePoolSize &&
        addWorker(null, true);   // 注意传进去的参数是null
}

public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))   // 注意传进去的参数是null
        ++n;
    return n;
}

参考文章:

1、https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html 【美团技术团队博客】

大家可以去看一下美团技术团队写的关于线程池的文章,里面有线程池在业务中的实践。

标签:JUC,task,Worker,任务,Runnable,线程,null,ThreadPoolExecutor
来源: https://blog.csdn.net/qq_43605444/article/details/121730142

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有