ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

java开发之ThreadPoolExecutor源码分析

2021-10-18 10:00:18  阅读:170  来源: 互联网

标签:java mainLock Worker try 源码 线程 ctl null ThreadPoolExecutor


线程池的状态

只有了解线程池的几个状态,才能读懂它的核心源码。所以先说说这几个状态

running:为线程池初始化时的默认状态,此状态会接收任务进行处理

shutdown: 该状态下的线程池不接收任何任务,但会等待正在运行的任务执行完。通常调用shutdown() 方法完成设置

stop: 该状态的线程池不接收任何任务,同时​​java培训​​不会等待正在运行的任务执行完毕。通常调用shutdownNow() 方法完成设置

tidying:该状态下的线程池内,没有任何线程和任务

terminated:该状态为线程池的终态,通常调用tryTerminate()方法完成设置

大多数情况下线程池的一个生命周期流转大概是 running -> (shutdown,stop)-> tidying -> terminated

这几个状态在ThreadPoolExecutor源码中,通过一个ctl的整型原子变量标识,高3位标识线程状态,低29位标识线程数量。翻看源码就能看到

核心源码分析

  • execute(Runnable command)

为线程池的核心方法,调用该方法任务就会执行,直接看下面代码注释吧

  public void execute(Runnable command) {
        if (command == null) throw new NullPointerException();
        int c = ctl.get();//获取ctl原子变量

        //如果当前线程池的线程数量小于corePoolSize,添加Worker对象。Worker对象是什么后面说
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true)) 
                return;//返回,结束
            c = ctl.get();
        }

        // 如果当前线程池的线程数量 > corePoolSize
        // 且当前线程是否处于running ,则添加任务到队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
           // 二次检查,当前线程不是处于running,则移除任务
            if (! isRunning(recheck) && remove(command))
            // 执行拒绝策略
                reject(command);
                //线程数量等于零,那就在添加Worker对象呗
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

        // 如果任务队列满,则添加Worker对象,如果添加失败执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

以上为核心源码的分析,无非就是根据线程池情况添加Worker、任务入队、执行拒绝策略。可以看看下面这个流程图,可能会更清晰

到这里,我们可以来讲讲addWorker 了。这个方法会封装成一个Worker对象,然后运行任务。看看Worker对象的类图:

Worker实现Runnable接口、继承AbstractQueuedSynchronizer,持有一个Thread的成员变量。所以可以把Worker对象看成一个线程,同时拥有AbstractQueuedSynchronizer的属性和方法,因此它能够进行加锁和释放锁的操作。

ok,逐步跟进来看看addWorker方法里面的逻辑。

  private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            
            //当前线程池状态
            int rs = runStateOf(c);

            // 如果当前线程池状态不合法就不让添加
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
   
            
            for (;;) {
               //获取当前线程数量
                int wc = workerCountOf(c);
                // 如果wc 大于ctl所能表示的最大线程数或者大于最大线程数则不让添加
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 通过CAS操作,增加线程池中的Worker数。如果添加成功结束双层循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //如果CAS操作失败,内层循环继续执行   
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
           //创建Worker对象,传入任务
            w = new Worker(firstTask);
            // 获取Worker对象的线程变量
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                //加mainLock锁,防并发
                mainLock.lock();
                try {
                    //当前线程池状态
                    int rs = runStateOf(ctl.get());
                     // 如果Worker对象的线程状态不合法,抛异常
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                         // 如果合法添加到workers集合
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                      // 一个变量标识,标明workers集合是否有添加新的worker对象      
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                   //启动线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

整体还不算复杂,核心就是根据传入的任务创建一个Worker对象,然后启动Worker。

下面来看看Worker启动的逻辑,前面说过了Worker实现Runnable接口,所以启动将会触发执行run方法,而run方法最终调的是runWorker()方法。

 final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
           //死循环获取任务,然后执行任务。这里getTask()方法会有阻塞情况的,我们这里知道一下就行,下面马上讲。
            while (task != null || (task = getTask()) != null) {
               //获取w锁。前面说过了,Worker对象继承AbstractQueuedSynchronizer,所以本身就内置了一把锁
                w.lock();
                // 判断同一个时刻当前线程和线程池的状态是否合法,不合法结束呗
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                   //任务执行前的处理逻辑
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                    //任务执行后的处理逻辑
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    //当前Worker完成的任务数量
                    w.completedTasks++;
                    //释放w锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //处理Worker退出的逻辑
            processWorkerExit(w, completedAbruptly);
        }
    }

整个方法的逻辑其实也不算复杂,就是当前Worker不断死循环获取队列里面是否有任务。有,就加锁然后执行任务。无,就阻塞等待获取任务。那什么情况下才会跳出整个死循环,执行processWorkerExit呢?这里就需要看下getTask() 方法逻辑了。

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 判断线程池状态和任务队列的情况,不满足条件直接返回 null,结束。
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);
            
            // 超时时间的标识,[是否设置了核心线程数的超时时间 或者 当前线程数量是否大于核心线程数 ],
 //因为我们知道线程池运行的线程数量如果大于核心线程数,多出来的那部分线程是需要被回收的。
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
            // 如果timed为false,则一直阻塞等待,直到获取到元素,然后返回
            // 如果timed为true,则一直阻塞等待keepAliveTime超时后返回,
            //到这里其实就知道如何结束runWorker方法的那个死循环了,也就意味着Worker它的线程生命周期结束了。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

最后,来看下processWorkerExit() 方法处理了哪些逻辑

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        //获取mainLock锁
        mainLock.lock();
        try {
        //添加任务数量,然后移除worker
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
        // 释放mainLock锁
            mainLock.unlock();
        }
        //尝试将线程池状态设置为 terminate
        tryTerminate();
  
        //主要判断当前线程池的线程数是否小于corePoolSize,如果小于继续添加Worker对象
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

这个方法主要就是移除Worker对象,然后尝试将线程池的状态更改为terminate。这里需要讲一下tryTerminate方法逻辑,因为它和线程池awaitTermination()方法有一定的关联,来看看它的代码。

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            //判断线程池状态,还在运行或者已经是 terminate的状态直接结束了
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
             // 就是中断空闲的Worker,后面讲shutDown方法的时候聊
            if (workerCountOf(c) != 0) { 
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            

            final ReentrantLock mainLock = this.mainLock;
            //获取mainLock锁
            mainLock.lock();
            try {
            //线程池设置成TIDYING状态
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                    //钩子方法,线程池终止时执行的逻辑
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                    // termination为mainLock锁的condition实例,这个是来实现线程之间的通信。
                    //其实这里是来唤醒awaitTermination()方法,后面分析awaitTermination源码会看到。
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
            // 释放锁
                mainLock.unlock();
            }
           
        }
    }

到这里,线程池execute方法大致的逻辑就完了。可以再看看时序图,理清下几个方法和类之间的调用。

  • shutdown()

中断线程池的线程,会等待正在执行的线程结束执行,来看看源码它是怎么实现的

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        //获取mainLock锁,防止其他线程执行
        mainLock.lock();
        try {
        //检查权限,确保用户线程有关闭线程池的权限
            checkShutdownAccess();
        //通过CAS将线程池状态设置成 SHUTDOWN
            advanceRunState(SHUTDOWN);
          //中断所有空闲的Workers , 下面分析这个方法
            interruptIdleWorkers();
          //钩子方法,让子类进行收尾的逻辑
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
        // 释放mainLock锁
            mainLock.unlock();
        }
        //execute方法,我们分析过了,主要就是尝试将线程池的状态设置为terminate
        tryTerminate();
    }

该方法我们比较关注的点是 interruptIdleWorkers方法,是怎样中断空闲Worker,然后是如何保证Worker执行完毕的?看看代码就知道了

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        //获取mainLock锁
        mainLock.lock();
        try {
        //轮询workers逐一中断
            for (Worker w : workers) {
                Thread t = w.thread;
                //判断 如果当前线程未中断且能够获取w锁,则执行中断
                // 如果当前线程未中断但不能获取w锁,不进行中断。
                //这里的w锁,就是前面在分析execute时,有个死循环不断取任务,取到任务就会获取w锁。
                //所以这边如果获取不到w锁,就证明还有任务没有执行完。
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                    //中断线程
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

到这里,核心逻辑就是通过w这个锁来完成的。

  • shutdownNow
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
    }

源码和shutdown差不多,只不过将线程池状态设置为stop,然后调用interruptWorkers 方法,看看worker方法。

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

代码中并没有获取w锁的逻辑,所以这个方法会直接中断所有线程,并不会等待那些正在执行任务的worker把任务执行完。

  • awaitTermination

调用awaitTermination方法会一直阻塞等待线程池状态变为 terminated 才返回 或者等待超时返回。来看看代码就明白了

    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
            //如果已经是terminated状态直接返回
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                // (1)等待mainLock锁的condition实例来唤醒,不然持续阻塞。
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

(1)处的代码已经告诉了该方法什么时候返回,就是mainLock锁的termination条件变量被唤醒返回。在上面分析中termination条件变量被唤醒是在执行tryTerminate()时完成的,因为内部调用termination.signalAll()。而tryTerminate() 方法被shutDown() 和shutDownNow() 调用过,所以如果要让awaitTermination 返回,调用这2个方法就行。

标签:java,mainLock,Worker,try,源码,线程,ctl,null,ThreadPoolExecutor
来源: https://blog.csdn.net/m0_58371965/article/details/120820738

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有