ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

ThreadPoolExecutor线程池原理+源码,了解,字节跳动技术整理

2021-09-10 13:07:30  阅读:128  来源: 互联网

标签:task return 源码 线程 new null public ThreadPoolExecutor


    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    // 线程池状态 >= SHUTDOWN
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
        return false;

    for (;;) {
    	// 内层自旋
        int wc = workerCountOf(c);
        if (wc >= CAPACITY ||
            wc >= (core ? corePoolSize : maximumPoolSize))
            // 工作中的线程数大于线程池的容量,或者已经大于等于核心线程数,或者大于等于最大线程数
            // core为true,表示要创建核心线程,false表示要创建非核心线程
            // 为什么大于等核心线程数的时候要返回false,因为要添加到缓冲队列,或者创建非核心线程来执行,不能创建核心线程了
            return false;
        if (compareAndIncrementWorkerCount(c))
        	// 以CAS的方式尝试把线程数加1
        	// 注意这里只是把线程池中的线程数加1,并没有在线程池中真正的创建线程
        	// 成功后跳出内层自旋
            break retry;
        // CAS失败,再次获取ctl,检查线程池状态    
        c = ctl.get();  // Re-read ctl
        if (runStateOf(c) != rs)
        	// 线程池状态被改变了,从外层自旋开始再次执行之前的逻辑
            continue retry;
        // else CAS failed due to workerCount change; retry inner loop
    }
}
// 可以看到两层自旋 + CAS,仅仅是为了把线程池中的线程数加1,还没有新建线程

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
	// 把task包装成Worker
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        // 加锁
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            // 获取锁之后,再次检查线程池的状态
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                	// 检查线程状态
                    throw new IllegalThreadStateException();
                // 添加到worders
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                	// 维护largestPoolSize变量
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
        	// 解锁
            mainLock.unlock();
        }
        if (workerAdded) {
        	// 添加成功
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
    	// 执行worker的线程启动失败
        addWorkerFailed(w);
}
return workerStarted;

}


可以看到`addWorker`方法前一部分,用了外层**自旋**判断线程池的状态,内层**自旋 + CAS**给线程池中的线程数加1。后半部分用了`ReentrantLock`保证创建`Worker`对象,以及启动线程的线程安全。一个方法中三次获取了线程池的状态(不包含该方法调用的其他方法),因为每两次之间,线程池的状态都有可能被改变。

### runWorker

前文在介绍`Worker`内部类时说过,`Worker`会把自己传递给`ThreadFactory`创建的线程执行,最终执行`Worker`的`run`方法,而`Worker`类的`run`方法只有一行代码:

runWorker(this);


所以接下来看看`runWorker`方法是如何实现了

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 允许外部中断
w.unlock(); // allow interrupts
// 记录worker是不是异常退出的
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// 自旋,如果task不为空,或者能从缓冲队列(阻塞队列)中获取任务就继续执行,不能就一直阻塞
// 加锁
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
// 如果线程池正在停止,并且当前线程没有被中断,就中断当前线程
wt.interrupt();
try {
// 钩子函数,处理task执行前的逻辑
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子函数,处理task执行后的逻辑
afterExecute(task, thrown);
}
} finally {
task = null;
// 完成的任务数加1
w.completedTasks++;
// 解锁
w.unlock();
}
}
// 运行到这里,说明worker没有异常退出
completedAbruptly = false;
} finally {
// 自旋操作被打断了,说明线程需要被回收
processWorkerExit(w, completedAbruptly);
}
}


第10行代码中,task为null时,会通过`getTask()`方法从缓冲队列中取任务,因为缓冲队列是阻塞队列,所以如果获取不到任务会一直被阻塞,接下来看看`getTask`方法的内部实现

### getTask

`getTask`用于**阻塞**式的从缓冲队列中获取任务。

private Runnable getTask() {
// 线程是否超时
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
	// 自旋
	// 获取线程池状态
    int c = ctl.get();
    int rs = runStateOf(c);

    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    	// 线程池终止了,或者线程池停止了,且缓冲队列中没有任务了
    	// 自旋 + CAS方式减少线程计数
        decrementWorkerCount();
        return null;
    }

    int wc = workerCountOf(c);

    // 根据allowCoreThreadTimeOut参数来判断,要不要给核心线程设置等待超时时间
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
        // 当前线程数大于了maximumPoolSize(因为maximumPoolSize可以动态修改)或者当前线程设置了超时时间且已经超时了
        // 且线程数大于1或者缓冲队列为空
        // 这个条件的意思就是:当前线程需要被回收
        if (compareAndDecrementWorkerCount(c))
        	// 返回null后,上层runWorker方法中断循环,执行processWorkerExit方法回收线程
            return null;
        continue;
    }

    try {
    	// 从阻塞队列中获取任务
        Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();
        if (r != null)
        	// 成功获取任务
            return r;
        // 没有获取到任务,超时
        timedOut = true;
    } catch (InterruptedException retry) {
    	// 线程被中断,重试
        timedOut = false;
    }
}

}


理解该方法的前提,是要理解**阻塞队列**提供的阻塞式API。
这个方法重点关注两点:

*   从缓冲队列取任务时,`poll`非阻塞,`take`阻塞,调用哪个由当前线程需不需要被回收来决定
*   该方法返回null之后,上层方法会回收**当前线程**

除了这几个核心方法之外,往线程池提交任务还有一个方法叫`submit`

public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

public Future submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

public Future submit(Callable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task);
execute(ftask);
return ftask;
}


`submit`方法可以接收线程池返回的结果,也就是`Futrue`对象,可以接收`Runnable`对象和`Callable`对象。
至于`Future`、`FutureTask`、`Runnable`、`Callable`之间的关系此处不再赘述。

至此`ThreadPoolExecutor`的核心方法的源码以及执行逻辑已经讲解完毕,再来看一些非核心方法,了解一下即可

*   `public void shutdown()`:关闭线程池,已经提交过的任务还会执行(线程池中未运行完毕的,缓冲队列中的)
*   `public List<Runnable> shutdownNow()`:停止线程池,试图停止正在执行的任务,暂停缓冲队列中的任务,并且返回
*   `public void allowCoreThreadTimeOut(boolean value)`:设置**核心线程**是否允许回收
*   `protected void beforeExecute(Thread t, Runnable r)`:钩子函数,处理线程执行任务前的逻辑,这里是**空实现**
*   `protected void afterExecute(Runnable r, Throwable t)`:钩子函数,处理线程执行任务后的逻辑,这里是**空实现**
*   `public int getActiveCount()`:返回正在执行任务的线程的**大致数量**
*   `public long getCompletedTaskCount()`:返回执行完成的任务的**大致数量**

除此之外还需要了解的是,构造方法中的七个参数,除了`BlockingQueue`是不能动态设置外,其余六个参数都可以动态设置,分别调用对于的`setXxx`方法即可,当然也可以通过对于的`getXxx`方法获取对应的信息。

鉴于此,我们再来看一个常见的问题

> Java有几种线程池?

JDK(准确的说是`java.util.concurrent.Executors`工具类)提供了四种线程池:

*   `CachedThreadPool`:缓冲线程池

    ```
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    ```

*   `FixedThreadPool`:固定线程数的线程池

    ```
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    ```

*   `SingleThreadExecutor`:单线程的线程池

    ```
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    static class FinalizableDelegatedExecutorService extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
    }

    ```

*   `ScheduledThreadPool`:可定时调度的线程池

    ```
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    // ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以super()还是调用ThreadPoolExecutor的构造方法
    public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService
    	public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
        }
    }

    ```

仔细看下这四种线程池,最终都调用了ThreadPoolExecutor的构造方法,只是传递的参数有所不同。

*   `CachedThreadPool`和`ScheculedThreadPool`设置的最大线程数都是`Integer.MAX_VALUE`,可能线程数过多而产生OOM
*   `SingleThreadExecutor`和`FixedThreadPool`使用的都是无界队列,最大元素个数为`Integer.MAX_VALUE`,可能缓冲队列中堆积的任务过多,而产生OOM

这两点正是**阿里巴巴代码规范**里禁止使用这四种线程池的原因。
想要使用线程池,必须通过`ThreadPoolExecutor`的方法来**创建线程池**。


标签:task,return,源码,线程,new,null,public,ThreadPoolExecutor
来源: https://blog.csdn.net/m0_60707263/article/details/120219784

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有