ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

线程池相关参数、调度流程学习

2021-09-05 15:31:45  阅读:173  来源: 互联网

标签:execute code 流程 调度 param 任务 task 线程


线程池任务调度流程


线程池构造方法一览

/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {}

相关参数解析

/**
 * 
 * 核心线程数,如果没有设置allowCoreThreadTimeOut,线程一定存活(keep alive)
 * Core pool size is the minimum number of workers to keep alive
 * (and not allow to time out etc) unless allowCoreThreadTimeOut
 * is set, in which case the minimum is zero.
 */
private volatile int corePoolSize;

/**
 * 最大线程数 (但实际的最大值取决于/受限于CAPACITY)
 * Maximum pool size. Note that the actual maximum is internally
 * bounded by CAPACITY.
 */
private volatile int maximumPoolSize;

/**
 * 等待任务的闲置线程的超时时间(纳秒)
 * 什么情况下使用?
 *  1.有超过corePoolSize的线程存在时;
 *  2.allowCoreThreadTimeOut的值为true即允许核心线程超时
 * 其他情况下,线程会永远保持等待新任务的存活状态,即keep alive
 * Timeout in nanoseconds for idle threads waiting for work.
 * Threads use this timeout when there are more than corePoolSize
 * present or if allowCoreThreadTimeOut. Otherwise they wait
 * forever for new work.
 */
private volatile long keepAliveTime;

/**
 * keepAliveTime的时间单位
 * the time unit for the {@code keepAliveTime} argument
 */
TimeUnit unit;

/**
 * 用于持有任务并将任务切换给工作线程的队列
 * 当workQueue未满而有新任务进来时,会先将新任务放进这个队列。
 * 当workQueue已满且有新任务进来时,才会去创建大于corePoolSize而小于maximumPoolSize的这部分线程;
 * 当workQueue已满且线程数已达maximumPoolSize,再有新任务进来,就涉及到拒绝策略了。后面会讲到
 * 看一下上面构造方法里的注释"This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method."
 * 它仅仅用来存放被execute方法提交的Runnable任务
 * The queue used for holding tasks and handing off to worker
 * threads.  We do not require that workQueue.poll() returning
 * null necessarily means that workQueue.isEmpty(), so rely
 * solely on isEmpty to see if the queue is empty (which we must
 * do for example when deciding whether to transition from
 * SHUTDOWN to TIDYING).  This accommodates special-purpose
 * queues such as DelayQueues for which poll() is allowed to
 * return null even if it may later return non-null when delays
 * expire.
 */
private final BlockingQueue<Runnable> workQueue;

/**
 * 创建新线程的工厂
 * Factory for new threads. All threads are created using this
 * factory (via method addWorker).  All callers must be prepared
 * for addWorker to fail, which may reflect a system or user's
 * policy limiting the number of threads.  Even though it is not
 * treated as an error, failure to create threads may result in
 * new tasks being rejected or existing ones remaining stuck in
 * the queue.
 *
 * We go further and preserve pool invariants even in the face of
 * errors such as OutOfMemoryError, that might be thrown while
 * trying to create threads.  Such errors are rather common due to
 * the need to allocate a native stack in Thread.start, and users
 * will want to perform clean pool shutdown to clean up.  There
 * will likely be enough memory available for the cleanup code to
 * complete without encountering yet another OutOfMemoryError.
 */
private volatile ThreadFactory threadFactory;

/**
 * 拒绝策略。执行中遇到饱和或关闭情况调用的处理器
 * Handler called when saturated or shutdown in execute.
 */
private volatile RejectedExecutionHandler handler;

拒绝策略拓展

一共4种拒绝策略,这里仅关注juc包下的内容。

/**
 * 1.中止策略。拒绝任务,抛出RejectedExecutionException,这也是默认拒绝策略
 * A handler for rejected tasks that throws a
 * {@code RejectedExecutionException}.
 */
public static class AbortPolicy implements RejectedExecutionHandler {
    /**
     * Creates an {@code AbortPolicy}.
     */
    public AbortPolicy() { }

    /**
     * Always throws RejectedExecutionException.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     * @throws RejectedExecutionException always
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 从这里可以看到,抛RejectedExecutionException异常
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

/**
 * 默认拒绝策略就是AbortPolicy(中止策略,拒绝任务)
 * The default rejected execution handler
 */
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();
    
/**
 * 直接在调用execute方法的线程中运行被拒绝的任务
 * 除非执行器已被关闭,这种情况下任务会被丢弃。
 * A handler for rejected tasks that runs the rejected task
 * directly in the calling thread of the {@code execute} method,
 * unless the executor has been shut down, in which case the task
 * is discarded.
 */
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     * Executes task r in the caller's thread, unless the executor
     * has been shut down, in which case the task is discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 从这里可以看到,如果线程池执行器未关闭,直接在调用者线程执行该任务
        if (!e.isShutdown()) {
            r.run();
        }
    }
}   

/**
 * 丢弃最老的尚未处理的请求,然后执行新任务
 * 除非执行器已被关闭,这种情况下任务会被丢弃
 * A handler for rejected tasks that discards the oldest unhandled
 * request and then retries {@code execute}, unless the executor
 * is shut down, in which case the task is discarded.
 */
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 如果线程池执行器未关闭,丢弃最老的任务,然后执行新任务
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
} 

/**
 * 静静地丢弃任务,不做任何处理
 * 注:默认的AbortPolicy会抛RejectedExecutionException异常
 * A handler for rejected tasks that silently discards the
 * rejected task.
 */
public static class DiscardPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardPolicy}.
     */
    public DiscardPolicy() { }

    /**
     * Does nothing, which has the effect of discarding task r.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // 可以看到,此处啥也没做。
    }
}  
  • AbortPolicy : 直接抛异常处理,这是默认的拒绝策略
  • DiscardPolicy : 直接抛弃(任务)不处理
  • DiscardOldestPolicy : 丢弃队列中最老的任务
  • CallerRunsPolicy : 将任务分配给当前执行execute方法的线程来处理

参数部分的补充

关于参数allowCoreThreadTimeOut:

/**
 * 这个参数跟corePoolSize相关
 * If false (default), core threads stay alive even when idle.
 * If true, core threads use keepAliveTime to time out waiting
 * for work.
 */
private volatile boolean allowCoreThreadTimeOut;

核心线程数怎么设置?有什么评估标准不?

https://cloud.tencent.com/developer/article/1605149

假设机器有N个CPU,那么对于计算密集型的任务,应该设置线程数为N+1;对于IO密集型的任务,应该设置线程数为2N;对于同时有计算工作和IO工作的任务,应该考虑使用两个线程池,一个处理计算任务,一个处理IO任务,分别对两个线程池按照计算密集型和IO密集型来设置线程数。

计算密集型:N+1 IO密集型:2*N

 

CPU(计算)密集型举例:
我们可以把任务分为计算密集型和IO密集型。
计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。

线程池为何不允许使用Executors创建?

如上图,出自《Java开发手册嵩山版》
FixedThreadPool和SingleThreadPool:
允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。

CachedThreadPool:
允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

ThreadPoolExecutor的执行流程

一个线程池中的线程异常了,那么线程池会怎么处理这个线程?

需要说明一下,文中讨论的线程池都是Executors线程池。

当一个线程池里面的线程异常后:
当执行方式是execute时,可以看到堆栈异常的输出。
当执行方式是submit时,堆栈异常没有输出。但是调用Future.get()方法时,可以捕获到异常。
不会影响线程池里面其他线程的正常执行。
线程池会把这个线程移除掉,并创建一个新的线程放到线程池中。
不要背答案,要理解,要深入,上面说完后记得在问问面试官,需要我从源码的角度讲一讲吗?这逼装的,礼貌而不失风度。

https://www.cnblogs.com/thisiswhy/p/12221335.html  

参考资料

https://www.cnblogs.com/thisiswhy/p/12690630.html  https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html 美团技术团队文章 http://concurrent.redspider.group/article/01/1.html 深入浅出Java多线程

标签:execute,code,流程,调度,param,任务,task,线程
来源: https://www.cnblogs.com/xm66/p/15229531.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有