ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

线程池原理与实践

2021-10-17 19:33:18  阅读:122  来源: 互联网

标签:Thread 队列 实践 任务 线程 原理 main pool


JUC的线程池架构

1.Executor

Executor是Java异步任务的执行者接口,目标是执行目标任务。Executor作为执行者角色,目的是提供一种将“任务提交者”与“任务执行者”分离的机制。它只有一个函数式方法:

public interface Executor {
    void execute(Runnable command);
}

2.ExecutorService

ExecutorService继承于Executor。它对外提供异步任务的接收服务。ExecutorService提供了“接受异步任务并转交给执行者”的方法,比如submit、invoke方法等。具体如下:

public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

3.AbstractExecutorService

AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。AbstractExecutorService存在的目的是为ExecutorService中的接口提供默认实现。(模板模式)

4.ThreadPoolExecutor

大名鼎鼎的线程池实现类,继承于AbstractExecutorService。它是核心实现类,它可以预先提供指定数量的可重用线程,可以对线程进行管理和监控。

5.ScheduledExecutorService

她继承于ExecutorService。是一个完成延时和周期性任务的接口。

6.Executors

是一个静态工厂类,内置的静态工厂方法可以理解为快捷创建线程池的方法。

image

Executors的4种快捷创建线程池的方法

newSingleThreadExecutor 创建只有一个线程的线程池

newFixedThreadPool 创建固定大小的线程池

newCachedThreadPool 创建一个不限制线程数量的线程池,任何提交的任务都立即执行,空闲线程会及时回收

newScheduledThreadPool 创建一个可定期或延时执行任务的线程池

  • newSingleThreadExecutor
public static void main(String[] args) {
       final AtomicInteger integer = new AtomicInteger(0);

       ExecutorService pool = Executors.newSingleThreadExecutor();

       for (int i = 0; i < 5; i++) {
           pool.execute(() -> {
               System.out.println(Thread.currentThread() + " :doing" + "-" + integer.incrementAndGet());
               try {
                   Thread.sleep(500);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }
           });
       }

       pool.shutdown();
}

Thread[pool-1-thread-1,5,main] :doing-1
Thread[pool-1-thread-1,5,main] :doing-2
Thread[pool-1-thread-1,5,main] :doing-3
Thread[pool-1-thread-1,5,main] :doing-4
Thread[pool-1-thread-1,5,main] :doing-5

场景:任务按照提交顺序,一个任务一个任务逐个执行。

以上代码最后调用shutdown来关闭线程池。执行shutdown方法后,线程池状态变为shutdown,线程池将拒绝新任务,不能再往线程池中添加新任务。此时,线程池不会立刻退出,直到线程池中的任务处理完成后才会退出。还有一个shutdownNow方法,执行这个后,线程状态变为stop,试图停止所有正在执行的线程,并且不再处理阻塞队列中等待的任务,会返回那些未执行的任务。

  • newFixedThreadPool
ExecutorService pool = Executors.newFixedThreadPool(3);

Thread[pool-1-thread-1,5,main] :doing-1
Thread[pool-1-thread-3,5,main] :doing-2
Thread[pool-1-thread-2,5,main] :doing-3
Thread[pool-1-thread-3,5,main] :doing-4
Thread[pool-1-thread-1,5,main] :doing-5

适用场景:需要任务长期执行的场景。“固定数量的线程池”能稳定的保证一个数,避免频繁 回收和创建线程,适用于CPU密集型的任务,在CPU被线程长期占用的情况下,能确保少分配线程。

弊端:内部使用无界队列存放任务,当有大量任务,队列无限增大,服务器资源迅速耗尽。

newFixedThreadPool工厂方法返回一个ThreadPoolExecutor实例,该线程池实例的corePoolSize数量为参数nThread,其maximumPoolSize数量也为参数nThread,其workQueue属性的值为LinkedBlockingQueue()无界阻塞队列。使用Executors创建“固定数量的线程池”的潜在问题主要存在于其workQueue上,其值为LinkedBlockingQueue(无界阻塞队列)。如果任务提交速度持续大于任务处理速度,就会造成队列中大量的任务等待。如果队列很大,很有可能导致JVM出现OOM(Out Of Memory)异常,即内存资源耗尽。

  • newCachedThreadPool

线程池内的某些线程无事可干成为空闲线程,可以灵活回收这些空闲线程。

ExecutorService pool = Executors.newCachedThreadPool();

Thread[pool-1-thread-5,5,main] :doing-5
Thread[pool-1-thread-1,5,main] :doing-1
Thread[pool-1-thread-2,5,main] :doing-2
Thread[pool-1-thread-3,5,main] :doing-3
Thread[pool-1-thread-4,5,main] :doing-4

特点:在执行任务时,如果池内所有线程忙,则会添加新线程来处理。不会限制线程的大小,完全依赖于操作系统能够创建的最大线程大小。如果存量线程超过了处理任务数量,就会回收线程。、

适用场景:快速处理突发性强、耗时短的任务场景,如Netty的NIO处理场景、REST API接口的瞬时削峰场景。

弊端:没有最大线程数量限制,如果大量的异步任务提交,服务器资源可能耗尽。

  • newScheduledThreadPool
public static void main(String[] args) {
        final AtomicInteger integer = new AtomicInteger(0);

        ScheduledExecutorService pool = Executors.newScheduledThreadPool(5);

        for (int i = 0; i < 5; i++) {
            pool.scheduleAtFixedRate(
                    () -> {
                        System.out.println(Thread.currentThread() + " :doing" + "-" + integer.incrementAndGet());

                    }, 0, 500, TimeUnit.MILLISECONDS);
            // 0表示首次执行任务的执行时间,500表示每次执行任务的间隔时间
        }
//        pool.shutdown();
}

因为可以周期性执行任务,所以不shutdown。

适用场景:周期性执行任务的场景。

线程池的标准创建方式

使用ThreadPoolExecutor构造方法创建,一个比较重要呃构造器如下:

public ThreadPoolExecutor(int corePoolSize,核心线程数
                              int maximumPoolSize, 最大线程数
                              long keepAliveTime, TimeUnit unit, 空闲时间
                              BlockingQueue<Runnable> workQueue, 阻塞队列
                              ThreadFactory threadFactory, 线程工厂(线程产生方式)
                              RejectedExecutionHandler handler 拒绝策略) {
    ...
}

1.核心和最大线程数量

接收新任务时,并且当前工作线程池数少于核心线程数量,即使有工作线程是空闲的,它也会创建新线程处理任务,直到达到核心线程数。

2.BlockingQueue

阻塞队列用于暂时接收任务。

3.KeepAliveTime

设置线程最大空闲时长,如果超过这个时间,非核心线程会被回收。当然,也可以调用allowCoreThreadTimeOut方法将超时策略应用到核心线程。

线程池的任务调度流程

  1. 工作线程数量小于核心线程数量,执行新任务时会优先创建线程,而不是获取空闲线程。
  2. 任务数量大于核心线程数量,新任务将被加入阻塞队列中。执行任务时,也是先从阻塞队列中获取任务。
  3. 在核心线程用完,阻塞队列已满的情况下,会创建非核心线程处理新任务。
  4. 在如果线程池总数超过maximumPoolSize,线程池会拒绝接收任务,为新任务执行拒绝策略。

image

ThreadFactory(线程工厂)

创建线程方式

阻塞队列

阻塞队列与普通度列相比:阻塞队列为空时,会阻塞当前线程的元素获取操作。当队列中有元素,被阻塞的线程会被自动唤醒。

BlockingQueue是JUC包的一个超级接口,比较常用的实现类有:

(1)ArrayBlockingQueue:数组队列

(2)LinkedBlockingQueue:链表队列

(3)PriorityBlockingQueue:优先级队列

(4)DelayQueue:延迟队列

(5)SynchronousQueue:同步队列

调度器的钩子方法

ThreadPoolExecutor为每个任务执行前后都提供了钩子方法。

// 任务执行之前的钩子方法(前钩子)
protected void beforeExecute(Thread t, Runnable r) { }
// 之后(后钩子)
protected void afterExecute(Runnable r, Throwable t) { }
// 终止(停止钩子)
protected void terminated() { }

beforeExecute:可用于重新初始化ThreadLocal线程本地变量实例、更新日志记录、计时统计等。

afterExecute:更新日志记录、计时统计等。

terminated:Executor终止时调用。

演示一下前钩子。

public class TestMain {

    public static void main(String[] args) {
        final ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,
                4,
                60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(2)) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("前钩子嗷 ~ ~ ~ ");
            }
        };

        for (int i = 0; i < 5; i++) {
            pool.execute(() -> {
                System.out.println("你谁啊");
            });
        }
    }
}

线程池拒绝策略

任务被拒绝有两种情况:

  1. 线程池已经关闭。
  2. 工作队列已满且最大线程数已满。

拒绝策略有以下实现:

  • AbortPolicy:拒绝策略。抛异常。
  • DiscardPolicy:抛弃策略。丢弃新来的任务。
  • DiscardOldestPolicy:抛弃最老任务策略。因为队列是队尾进对头出,所以每次都是移除队头元素后再入队。
  • CallerRunsPolicy:调用者执行策略。提交任务线程自己执行任务,不使用线程池中的线程。
  • 自定义策略。实现RejectExecutionHandler接口的rejectedExecution方法。

Re

《Java高并发编程》
(之后补源码)

标签:Thread,队列,实践,任务,线程,原理,main,pool
来源: https://www.cnblogs.com/bllbl/p/15417779.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有