ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

并发编程六、线程池原理解析

2022-08-30 11:30:59  阅读:125  来源: 互联网

标签:int null 队列 编程 并发 任务 线程 执行


前言:

  1. 文章内容:线程与进程、线程生命周期、线程中断、线程常见问题总结
  2. 本文章内容来源于笔者学习笔记,内容可能与相关书籍内容重合
  3. 偏向于知识核心总结,非零基础学习文章,可用于知识的体系建立,核心内容复习,如有帮助,十分荣幸
  4. 相关文献:并发编程实战、计算机原理

 为何要用线程池?

  1. 创建/销毁线程,是个重资源的操作,为了避免频繁的创建和销毁线程,让创建的线程进行复用,就用了线程池
  2. 在一些需要多线程和缓冲任务的场景下,线程池能够提供缓冲和线程管理机制

在实际项目中使用线程池的场景?

  1. 根据系统的承受能力,调整线程池中工作线线程的数目,防止因为消耗过多的内存,而把服务器累趴下
  2. 定时任务系统里用线程池:在可以预计任务执行时间的情况下,可以设置每个任务错开时间执行,保证多个线程池高峰期不重合。
  3. 业务用线程池:根据内存大小和任务的大小,合理的给一个阻塞队列值,然后根据业务类型来走拒绝策略,回到正常的业务系统

线程池的七大核心参数:

  • 核心线程数:默认情况下一直存在于线程池中,即使没有任务执行
  • 线程池最大线程数:非核心线程+核心线程数总和。如果队列已经满了,则会创建新的线程,如果线程数超过了该值,会执行拒绝策略(如果是无界队列,该参数就失效了)。注意:在ThreadPoolExecutor内部,并不区分哪个线程是核心,哪个是非核心,超过了核心线程数量的线程会被销毁,那么剩下的就是核心线程了。
  • 非核心线程闲置超时时长:非核心线程处于闲置状态时间超过该值,就被销毁。设置allowCoreThreadTimeOut(true)也可作用于核心线程
  • 超时时长单位:微毫秒(NANOSECONDS)、微秒(MICROSECONDS)、毫秒(MILLSECONDS)、秒(MINUTES)
  • 阻塞队列:存放等待执行的线程任务对象
  • 线程工厂:创建线程的工厂,批量创建线程。统一在创建线程时设置一些参数。如果不指定,会新建一个默认的线程工厂
  • 拒绝处理策略:线程数量大于最大线程数就会采用拒绝处理策略
    • AbortPolicy:默认策略,丢弃任务并抛出异常
    • DiscardPolicy:丢弃新任务,但不抛异常
    • DiscardOlderstPolicy:丢弃队列中最旧的任务,在执行execute方法
    • CallerRunsPolicy:直接用当前业务线程来执行任务

线程池状态:

  • RUNNING:线程池创建后处于RUNNING状态
  • SHUTDOWN:调用shutdown方法进入该状态,线程池不接受新任务,清除空闲工作线程,等待阻塞队列中的任务完成
  • STOP:调用shutdownNow方法进入该状态,线程池不接受新任务,中断所有线程,阻塞队列中未执行的任务全部丢弃
  • TIDYING:所有任务已终止,ctl记录的任务数量为0,进入该状态,然后执行terminated函数
  • TERMINATED:执行完terminated函数后进入该状态

线程池execute执行过程:

四种常用线程池:

  • newCachedThreadPool:核心池大小为0,最大线程数是Integer.MAX_VALUE。任务创建就进入SynchronousQueue中。超时设置60s,适用于执行短时间任务,线程复用率较高,不会占用很多资源。
  • newFixedThreadPool:核心池与最大线程数大小一致,创建的都是核心线程,采用LinkedBlockingQueue大小Integer.MAX_VALUE。由于创建核心线程,没有任务时在getTask方法中会阻塞在take方法上,线程不会被回收,这时占用资源也更多。几乎不会触发拒绝策略,阻塞队列很大。CachedThreadPool因为最大线程数超大,也几乎不会触发拒绝策略。
  • newSingleThreadExecutor:有且仅有一个核心线程,使用LinkedBlockingQueue,不会创建非核心线程,任务按照先来先执行顺序,唯一线程不空闲,任务在队列中等待。
  • newScheduledThreadPool:定长线程池,支持定时及周期性任务执行。

阻塞队列:

  • ArrayBlockingQueue:数组结构有界队列,先进先出原则,新元素插入到队列尾部,获取从队列头开始获取元素
  • LinkedBlockingQueue:链表构成无界队列,默认大小Integer.MAX_VALUE,先进先出
  • DelayQueue:只有当元素指定延迟时间到了,才能从队列中获取元素,无大小限制
  • PriorityBlockingQueue:基于优先级的无界阻塞队列,生产不阻塞,消费阻塞。使用时生产数据速度必能快于消费速度,时间长会耗尽堆空间。放入元素必须实现Comparable接口,排序值高的永远排在前面
  • SynchronousQueue:没有内部容量,put一个元素必须等待take取数据,然后生产者会阻塞

手写阻塞队列:

private List<Integer> container = new ArrayList<>();
    private volatile int size;
    private volatile int capacity;
    private Lock lock = new ReentrantLock();
    //condition
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    MyBlockQueue(int cap){
        this.capacity = cap;
    }
    public void put(int data){
        try{
            lock.lock();
            try {
                while (size > capacity){
                    System.out.println("队列满了");
                    notEmpty.await();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            ++size;
            container.add(data);
            notFull.signal();
        }finally {
            lock.unlock();
        }
    }
    public int take(){
        try{
            lock.lock();
            try {
                while (size==0){
                    System.out.println("队列空了");
                    notFull.await();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            --size;
            Integer ret = container.get(0);
            container.remove(0);
            notEmpty.signal();
            return ret;
        }finally {
            lock.unlock();
        }
    }

如何针对不同类型的任务来使用线程池呢?

  1. 高并发、任务执行时间短:任务耗时短,要求线程尽量少,如果线程太多,有可能出现线程切换和管理的时间,大于任务执行的时间,那效率就低了
  2. 并发不高,任务执行时间长:如果业务时间长集中在IO操作,那就是IO密集型任务,IO操作不占用CPU,所以不要让所有CPU闲下来,可以加大线程池中线程数目,让CPU处理更多业务。如果业务时间长集中在计算上,那就是计算密集型任务,线程池中线程数设置少一点,减少线程上下文切换
  3. 并发高,任务执行时间长:考虑具体任务类型,是IO密集还是CPU密集,线程池这块要控制好线程数和队列容量,不然线程池很容易满。改善任务,对任务执行时间进行优化,考虑某些数据是否可以走缓存,不重要的业务是否可以用中间件拆分解耦,异步处理+回调降低执行时间。

源码分析


带着问题看源码:

  如何在线程执行前后额外操作、线程池如何复用线程,线程池如何保证线程不会销毁,超过核心线程数的线程是如何销毁,能够实现创建的线程数达到最大线程数再将任务放入队列吗

 核心属性ctl与线程池状态

//int类型,前三位标识线程池状态,后29位标识有效线程的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//29位,方便ctl进行位运算使用的常量
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池的线程最大容量
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

//线程池运行状态 前三位
private static final int RUNNING    = -1 << COUNT_BITS;//运行中:111
private static final int SHUTDOWN   =  0 << COUNT_BITS;//线程被shutdown,继续执行完剩下的任务 000
private static final int STOP       =  1 << COUNT_BITS;//线程被shutdownNow,线程池停止,中断所有任务 001
private static final int TIDYING    =  2 << COUNT_BITS;//shutdown或shutdownNow后,任务都被处理完,到这个过渡状态 010
private static final int TERMINATED =  3 << COUNT_BITS;//线程池停止 011

execute:执行方法

public void execute(Runnable command) {
    //非空判断
    if (command == null)
        throw new NullPointerException();
    //获取ctl变量    
    int c = ctl.get();
    //workerCountOf():获取线程池中正在工作的线程数  判断是否小于核心数
    if (workerCountOf(c) < corePoolSize) {
        //核心线程还有,创建核心线程,并执行任务。传入true代表是核心线程
        if (addWorker(command, true))
            return;
        //添加核心线程数失败,重新获取ctl
        c = ctl.get();
    }
    //如果线程池运行状态是运行中,将任务追加到阻塞队列
    if (isRunning(c) && workQu eue.offer(command)) {
        //添加阻塞队列成功
        int recheck = ctl.get();
        //二次校验线程池状态 如果不是运行中,并移除成功,就执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
        //如果工作线程为0,创建一个工作线程
            addWorker(null, false);
    }
    //没有核心线程且阻塞队列满了 添加有一个非核心线程 创建成功返回true,失败返回false
    else if (!addWorker(command, false))
        //添加最大线程数失败,执行拒绝策略
        reject(command);
}

addWorker():创建工作线程

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {//外循环判断线程池状态
        //获取ctl 和runStateOf():获取当前线程池运行状态
        int c = ctl.get();
        int rs = runStateOf(c);
        //rs >= SHUTDOWN:说明线程池执行了shutdown或shutdownNow
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN && //线程池停了
               firstTask == null && //任务为null
               ! workQueue.isEmpty())) //队列空了
            return false;//返回false,不创建工作线程
        for (;;) {//内存循环判断线程池工作线程数量
            //获取当前线程池中正在工作的线程数
            int wc = workerCountOf(c);
            //工作线程大于核心线程 或 工作线程大于最大线程 返回false,不创建工作线程
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //满足可以创建工作线程的条件,CAS操作:正在工作的线程数+1。这里没有真正的创建工作线程
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //重新获取ctl
            c = ctl.get();
            //CAS失败,就要判断是重新执行内循环,还是外循环。如果线程池状态改变,则执行外循环。否则执行内循环  
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    //声明两个标识 workerStarted:工作线程启动了吗  workerAdded:工作线程添加了吗
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //new一个worker,并传入任务
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //创建工作线程时,通过ReentrantLock来保证多线程并发创建,获取了线程池的全局锁
            final ReentrantLock mainLock = this.mainLock;
            //上锁,shutdown、shutdownNow也需要获取全局锁
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
                //线程池状态是否是运行状态
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {//或者线程池为shutdown并且任务是null
                    if (t.isAlive()) //线程正在运行,抛出异常。这里工作线程还没有创建和启动,不会是运行。
                        throw new IllegalThreadStateException();
                     //将工作线程追加到workers,worker是一个HaseSet<Worker>。  
                     workers.add(w);
                    int s = workers.size();
                    //如果现在工作线程数大于历史最大值,替换掉历史最大值
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //修改工作线程添加标记为true,标识工作线程创建好了
                    workerAdded = true;
                }
            } finally {
                //释放锁
                mainLock.unlock();
            }
            //如果工作线程添加成功率
            if (workerAdded) {
                //启动线程
                t.start();
                //修改工作线程启动标识为true
                workerStarted = true;
            }
        }
    } finally {
        //如果工作线程启动失败
        if (! workerStarted)
            //补救操作
            addWorkerFailed(w);
    }
    return workerStarted;//返回工作线程的启动结果
}

Work对象:封装工作线程

  Worker继承了AQS,实现了Runnable。

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    private static final long serialVersionUID = 6138294804551838833L;
    //存放线程对象
    final Thread thread;
    //存放任务
    Runnable firstTask;
    volatile long completedTasks;
    Worker(Runnable firstTask) {
        setState(-1); //设置state 在runWorker之前禁止中断worker创建过程。
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
    //调用worker的start方法,实际上执行的worker中的run方法,即runWorker方法
    public void run() {
        runWorker(this);
    }

runWorker方法:

  • 实现线程执行前后额外增强:beforeExecute和afterExecute
  • 线程复用:runWorker方法中,有个while循环,会一直判断任务不为null或队列中是否有任务。有新任务就调用task.run执行。不会再去创建新线程,来达到复用的目的。

final void runWorker(Worker w) {
    //获取当前线程
    Thread wt = Thread.currentThread();
    //获取worker中具体任务
    Runnable task = w.firstTask;
    //将worker中任务设置为null
    w.firstTask = null;
    //这里unlock实际调用时aqs的release(1),释放构造方法设置的标识。代表当前线程可以被打断
    w.unlock(); 
    //设置标记为true 最后都为true表示:线程执行或获取任务过程发生异常
    boolean completedAbruptly = true;
    try {
        //如果worker中task有任务,直接执行。如果没有任务,就从队列中取任务getTask()
        while (task != null || (task = getTask()) != null) {
            //要执行任务,添加一个标记。即便shutdown也不会打断任务执行
            w.lock();
            //判断当前线程池状态以及线程状态,是否需要被中断。这时候线程池都不能执行任务了,就会执行中断
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //执行任务之前执行beforeExecute:前置增强
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {//这个的try是让线程执行出异常后,项目代码依旧会往后走
                    //执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //执行任务之后执行afterExecute:后置增强,这样可以让开发者可以在线程执行之前和之后添加额外处理。
                    afterExecute(task, thrown);
                }
            } finally {
                //线程执行完了,任务设置为null,任务处理完毕0
                task = null;
                //当前worker处理的任务数+1
                w.completedTasks++;
                //去掉标记
                w.unlock();
            }
        }
        //没有任务进来了,队列任务也执行完了,就是false
        completedAbruptly = false;
    } finally {
        //当任务没有了或队列中也没有任务了执行processWorkerExit。销毁线程
        processWorkerExit(w, completedAbruptly);
    }
}

getTask:

  在getTsak时,会判断工作线程数是否大于最大线程数、核心线程是否设置了可销毁、队列是否为空。来判断是否要CAS的减少工作线程数。如果线程池是非运行状态并且工作队列是空的会CAS的减少工作线程数,并销毁线程。

private Runnable getTask() {
    //设置超时标记
    boolean timedOut = false; 
    for (;;) {
        //获取运行状态
        int c = ctl.get();
        int rs = runStateOf(c);
        //如果线程池非运行状态 并且 (线程池已经停止或工作队列是空的)
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            //CAS的减少工作线程数
            decrementWorkerCount();
            return null;//返回null
        }
        //获取工作中的线程数
        int wc = workerCountOf(c);
        //allowCoreThreadTimeOut:false核心线程空闲也存活,ture核心线程使用keepAliveTime超时等待工作
        //工作线程数大于核心线程数或核心线程可销毁,timed就是true
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        //如果工作线程大于最大线程数 或者 工作线程数大于最大线程数了,或核心线程可销毁
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            //工作线程数至少大于1,是因为timed为true代表工作队列可能还有任务,所以至少要保留一条线程执行任务
            // 或者 工作队列空了那可以直接销毁 cas减少工作线程数,成功返回null
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;//CAS失败,再次循环
        }

        try {
            //如果timed为true 则超时的去队列获取任务,否则阻塞获取
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
            //如果返回不是null,说明队列有任务
                return r;
            timedOut = true;//超时标记为true,表示超时时间到了,队列中还没有任务。继续执行循环
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

processWorkerExit:

  1. 线程销毁时:会校验线程池状态试图终止线程池,然后看是否是正常退出,如果异常退出新增一个工作线程继续处理。
  2. 如果是正常退出,代表队列没任务且任务执行完了,然后判断是否设置了核心线程可销毁,设置了最小线程数为0,没有就核心线程数。
  3. 如果最小线程数为0了,但队列不为空,就保留一个线程继续执行任务

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    //为true 表示线程池在执行任务或取队列任务执行过程发生了异常
    if (completedAbruptly) 
        //CAS的把工作线程数减为0
        decrementWorkerCount();
    //获取全局锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //线程池完成的任务数进行叠加
        completedTaskCount += w.completedTasks;
        //将线程移除出worker的set
        workers.remove(w);
    } finally {
        //释放全局锁
        mainLock.unlock();
    }
    //校验线程池的状态,试图终止线程池
    tryTerminate();
    int c = ctl.get();
    //线程池状态小于stop 即运行或shutdown状态
    if (runStateLessThan(c, STOP)) {
        //线程池是正常退出,即队列没有任务,且任务执行完了
        if (!completedAbruptly) {
            //如果设置了核心线程可销毁 则最新线程为0,否则是核心线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            //如果min是0 并且队列不为空
            if (min == 0 && ! workQueue.isEmpty())
                //保留一个线程执行任务
                min = 1;
            if (workerCountOf(c) >= min)
                return;//如果工作线程数大于等于最小线程数,返回
        }
        //如果是异常退出走这里,创建一个线程
        //如果当前线程数量<最小最小线程数,也要创建一个线程
        addWorker(null, false);
    }
}

线程池如何实现线程复用:

  线程池对Thread做了包装,不重复调用Thread.start。而是自己有一个Runnable.run,run方法在循环里跑。跑得过程中不断检查是否有新加入的子Runnable对象,有的话就调用自己的run。相当于把一个大run把其他小run串联起来。同一个Thread可以执行不同的runnable,线程池把线程和Runnable通过队列解耦了,线程从队列中不断获取新任务

线程池的线程是如何不保证不被销毁的:

  线程池的线程都被封装成了Worker,在Worker内部会循环的从队列获取任务,如果获取的任务为空,则销毁线程,如果任务不为空,则执行任务。从队列获取任务时,如果当前线程数量没有超过核心线程数量,会阻塞式的获取任务,超过了或者设置了核心线程可超过会超时时的获取任务,超时时间为KeepAliveTime,超时时间获取不到就会返回空个,则会结束线程。即通过一个循环保证线程不会结束运行,从而保证不被销毁。

线程池是如何动态调整参数的:

  • ThreadPoolExecutor.setCorePoolSize // 修改核心线程数
  • ThreadPoolExecutor.setMaximumPoolSize // 修改最大线程数
  • ThreadPoolExecutor.setKeepAliveTime // 修改空闲线程存活时间
  • ThreadPoolExecutor.setRejectedExecutionHandler // 修改拒绝策略
  • ThreadPoolExecutor.setThreadFactory // 修改线程工厂
  通过这些方法可以设置ThreadPoolExecutor的参数,要动态调整参数。可以将相关配置配置在配置中心,启动时从配置中心读,同时监听配置中心的配置变化。可以开放controller接口,在后台可以实时调用接口从而实现配置动态化

超过核心线程数的线程是如何销毁的:

  如果超过了核心线程数,在获取任务时会超时式的从队列获取任务,如果早keepAliveTime时间后没有任务返回,会返回null,会导致while循环退出,从而走到了销毁线程的逻辑。

能够实现创建的线程数达到最大线程数再将任务放入队列吗:

  在execute方法第二步,如果workQueue.offer返回了false,就会创建新线程来执行任务。只需要重写队列的offer方法。实现如果当前线程数量没有达到最大线程数量时,返回false,走创建线程逻辑,而不是将任务放入队列。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

标签:int,null,队列,编程,并发,任务,线程,执行
来源: https://www.cnblogs.com/zhangbLearn/p/16638665.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有