ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

线程池

2020-02-28 14:04:40  阅读:158  来源: 互联网

标签:队列 创建 任务 线程 ctl null


线程池

线程池

  • 线程池的重要性
    • 重用线程,避免线程创建与销毁带来的性能开销。
    • 控制线程池的最大并发数,避免大量线程之间因为互相抢占系统资源而导致的阻塞现象。
    • 能够对线程进行简单管理,并且提供定时执行以及制定间隔循环执行等功能。
  • 线程池的好处

    • 加快响应速度,消除了线程创建带来的延迟
    • 合理利用CPU和内存,达到一个资源占用与效率之间的平衡。
    • 统一管理
  • 线程池的适用场合

    • 服务器接收到大量请求时,使用线程池技术是非常合适的,可以大大减少线程的创建和销毁次数,提高服务器的工作效率。
    • 实际开发中,如果需要创建5个以上的线程,那么就可以使用线程池来管理。

创建与停止线程池

  • 线程池构造函数主要参数

    • int corePoolSize:核心线程数,线程池在完成初始化以后,默认情况下,线程池中没有线程,线程池会等待有任务到来时,再创建新的线程去完成任务。

    • int maximumPoolSize:最大线程数,线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数会有一个上限,这就是最大量maximumPoolSize。

    • long keepAliveTime:存活时间,如果线程池当前的线程数多于corePoolSize,那么多于的线程空闲时间超过keepAliveTime,他们就会被终止。这种机制可以在线程池占用资源的过程中减少冗余消耗。默认情况下是只有多于corePoolSize的线程会被回收,除非是修改了allowCoreThreadTimeOut属性设置true,那么keepAliveTime也会同样作用于核心线程。

  • BlockingQueue workQueue:任务存储队列,3中常见的队列类型,(1)直接交接:SynchronousQueue,(2)无界队列:LinkedBlockingQueue,(3)有界的队列:ArrayBlockingQueue。

  • ThreadFactory threadFactory:当线程池需要新的线程时,会使用线程工厂来生成新的线程。默认使用 Executor.defaultThreadFacdtory(),创建出来的线程都在统一个线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程。如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否守护线程等等,只是一般用默认的就足够了。

  • RejectedExecutionHandler handler:由于线程池无法接受提交的任务而拒绝策略

所以线程的添加规则是:

  1. 如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来执行新任务。
  2. 如果线程数等于或者大于corePoolSize但是少于maximumPoolSize,就将任务放入队列workQueue
  3. 如果队列已满,并且线程数小于maximumPoolSize,就创建一个新的线程来执行任务。
  4. 如果队列已经满了,并且线程数大于或等于maximumPoolSize,就拒绝这个任务。

从而得出线程池中增减线程的特点

  • 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池。
    • 线程池希望保持较少数的线程数,并且只有在复杂变得很大时,才增加它。
    • 通过设置maximumPoolSize为很高的值,例如Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务。
    • 是只有在队列填满时才创建多余corePoolSize的线程,所以如果使用的是无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize。
  • 手动创建与自动创建

    手动创建更好,因为可以更加明确线程池的运行规则,避免资源耗尽的风险。相反,自动创建线程池,直接调用JDK封装好的构造函数,可能会带来一些问题。

    • newFixedThreadPool

      由于传进去的LinkedBlockingQueue是没有容量上限的,所以当传入的请求越来越多,并且无法及时处理完毕的时候,就是请求堆积的时候,会容易造成占用大量的内存,可能会导致OOM

    • newSingleThreadPool

      单线程的线程池,它只会用唯一的线程来执行任务。这个和FixedThreadPool的原理基本相同,只是把核心线程数和最大线程数都设置为了1,所以也会导致同样的问题,也就是请求堆积的时候,可能会占用大量的内存。

    • CachedThreadPool

      可缓存线程;无界线程池,具有自动回收多余线程的功能。它的弊端在于第二个参数maximumPoolSize被设置为Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至直接导致OOM。

    • ScheduleThreadPool

      它支持定时以及周期性任务执行的线程池。

    所以线程池的创建,最好的方式应该是手动创建,根据不同的业务场景,自己设置线程池参数,比如我们的内存有多大,想给线程取什么名字等等。

  • 线程池中线程数量的合适设定

    • CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的1-2倍左右。

    • 耗时IO型(读写数据库、文件、网络读写等等):最佳线程数一般会大于CPU核心数很多被,以JVM线程监控显示频繁情况为依据,保证线程空闲可以衔接上,参考BrainGoetz推荐的计算方法:

      线程数=CPU核心数*(1+平均等待时间/平均工作时间)

  • 停止线程池的正确方法

    1. shutdown:不会马上停止,已经提交的任务会执行完,新的任务无法再被提交了。
    2. isShutdown:返回是否shutdow,即是线程池中的任务还在执行。
    3. isTerminated:它可返回整个线程池是不是已经完全停止了。
    4. awaitTermination:测试一段时间内线程池是不是会完全停止,起到的作用是检测。这个方法在返回之前是阻塞的,只有当线程池中所有任务都执行完毕,或者等待时间到了,以及等待过程中被中断了抛出异常。
    5. shutdownNow:马上停止线程池,用interrupt信号去通知正在执行的线程停止,而队列当中等待的任务则直接返回runableList,可以用于再以后通过其他方式执行。

常见线程池的特点和用法

  • newFixedThreadPool
  • newSingleThreadPool
  • CachedThreadPool
  • ScheduleThreadPool
  • 以及JDK1.8加入的WorkingStealingPool:这个线程池和其他的有很大的不同。加入的任务不是普通任务,而且是有子任务的情况下会适合这个场景,比如说二叉树遍历、矩阵的处理。使用这个线程池有一定的窃取能力,每一个线程之间是会合作的。(使用场景有限)

FixedThreadPool与SingleThreadPool的Queue是LinkedBlockingQueue

他们的线程数量已经固定了,添加的任务数量无法估计,所以是用了一个可以存储无限多任务的队列来帮助存储任务。

CachedThreadPool使用的Queue是SynchronousQueue?

SynchronousQueue内部实际上是不存储的,但是在CachedThreadPool这个线程池的情况下也根本不需要队列来存储,会直接启动新的线程,效率更高些,也不需要一个队列去中转了。

ScheduleThreadPool使用的是延迟队列DelayedWorkQueue。

拒绝任务

  • 拒绝时机
    1. Executor关闭时,提交新任务会被决绝
    2. 以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时。
  • 4种拒绝策略
    • AbortPolicy:直接抛出一个异常
    • DiscardPolicy:默默地把任务丢弃
    • DiscardOldestPolicy:丢弃队列中最老的任务,以便腾出空间存储添加的新任务。
    • CallerRunsPolicy:让提交任务的这个线程去执行任务。这种方式避免了任务损失,也是一种负反馈策略,给了线程池缓冲的时间。

钩子方法

  • 在有特殊用途或者需要在每个任务执行前后进行一些日志或者统计,等等。

线程池实现原理

线程池、ThreadPoolExecutor、ExecutorService、Executor、Executors等,这么多和线程池相关的类,大家都是什么关系?

首先它们的继承实现关系依次是这样的:Executor <-ExecutorService <- AbstractExecutorService <- ThreadPoolExecutor

线程池实现任务复用的原理:

相同线程执行不同任务。源码的分析如下:

executorService.execute(new Task());

public interface Executor {
    void execute(Runnable command);
}


class ThreadPoolExecutor {
    //其它省略...
    
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
     
        //ctl记录了线程池状态和线程数
        int c = ctl.get();
        //首先检查当前线程熟练是不是小于核心线程数量
        if (workerCountOf(c) < corePoolSize) {
            //如果不够,就创建新的线程,command就是新的任务
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //检查是不是running状态,如果是就继续放到工作队列中
        if (isRunning(c) && workQueue.offer(command)) {
            //由于在此期间线程可能会被终止了,所以再进行一次判断
            int recheck = ctl.get();
            //如果现在不是正在运行,就将这个任务删除,并且使用拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果线程已经减少至0,就需要创建新的线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //能够执行到这里,说明线程已经停止,
        //或者队列已经放不进去了线程数也大于核心线程数了
        //就要增加线程,直到达到最大线程数量。
        else if (!addWorker(command, false))
            reject(command);//如果已经不能再增加了,就拒绝
    }
    
    
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            // Check if queue empty only if necessary.
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP)
                    || firstTask != null
                    || workQueue.isEmpty()))
                return false;

            for (;;) {
                if (workerCountOf(c)
                    >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //这个Worker,就是主要的工作相关的类
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();

                    if (isRunning(c) ||
                        (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    
    //这个方法反应了线程的复用
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //获取到Runable类型的任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //只要这个任务不为空,就去执行,getTask()就是从阻塞队列中去取出任务
            //while循环意味着整个worker不会停止,不停的执行任务,取出新的任务执行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        //执行Runable的run方法
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
}
  • 线程池管理器
  • 工作线程
  • 任务队列
  • 任务接口(Task)

线程池的五种状态:

  • RUNNING:接受新任务并且处理排队任务
  • SHUTDOWN:不接受新任务,但处理排队任务
  • STOP:不接受新任务,也不处理排队任务
  • TIDYING:中文是整洁,理解了中文就容易理解这个状态,所有任务都已经终止,workerCount为零时,线程会转换到TIDYING状态,并且将运行terminate()钩子方法。
  • TERMINATED:terminate()方法运行完成。

使用线程池的注意点

  • 避免任务的堆积
  • 避免线程数过度增加
  • 排查线程泄露:线程执行完毕却不能回收,往往是任务逻辑有问题,导致线程结束不了。

标签:队列,创建,任务,线程,ctl,null
来源: https://www.cnblogs.com/chen-ying/p/12376905.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有