ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Java线程池源码分析

2021-10-21 23:04:33  阅读:173  来源: 互联网

标签:Java int private 源码 线程 boolean ctl return


Java线程池,基于jdk1.8

一些属性

//线程数量和线程池状态  高三位是状态  低29位是数量
	private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
	//位移位数  29
    private static final int COUNT_BITS = Integer.SIZE - 3;
	//容量  2的29次方-1   00011111 11111111 11111111 11111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 线程池状态标记
	// 11100000 00000000 00000000 00000000
    private static final int RUNNING    = -1 << COUNT_BITS;
	// 00000000 00000000 00000000 00000000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
	// 00100000 00000000 00000000 00000000
    private static final int STOP       =  1 << COUNT_BITS;
	// 01000000 00000000 00000000 00000000
    private static final int TIDYING    =  2 << COUNT_BITS;
	// 01100000 00000000 00000000 00000000
    private static final int TERMINATED =  3 << COUNT_BITS;
	
	// 位运算 获取当前线程池的状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 位运算 获取当前线程池的线程数
	private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
	
	private final BlockingQueue<Runnable> workQueue;
	private final ReentrantLock mainLock = new ReentrantLock();
	private final HashSet<Worker> workers = new HashSet<Worker>();
	//是否 允许核心线程超时
	private volatile boolean allowCoreThreadTimeOut;
	
	private int largestPoolSize;

构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
	//参数依次是  核心线程数  最大线程数  线程存活时间  线程存活时间的单位  阻塞队列  线程工厂  拒绝策略
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

提交任务

public void execute(Runnable command) {
		//任务为空,抛异常
        if (command == null)
            throw new NullPointerException();
		//线程计数
		int c = ctl.get();
		//当前线程池的线程数 小于 核心线程数
		if (workerCountOf(c) < corePoolSize) {
			// 添加线程
			if (addWorker(command, true))
				return;
			//获取最新的线程数
			c = ctl.get();
		}
		//在运行中并且可以将任务添加到阻塞队列(阻塞队列未满)
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 线程池不是运行中 并且 将阻塞队列 里面的线程 移除
			if (! isRunning(recheck) && remove(command))
                //执行拒绝
				reject(command);
			//如果线程池中没有线程了,则创建一个线程执行任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
		// 阻塞队列满了,添加线程失败(达到最大线程数) 执行拒绝
        else if (!addWorker(command, false))
            reject(command);
    }
	
	private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }
	//添加线程方法	当前任务  是否核心线程
	private boolean addWorker(Runnable firstTask, boolean core) {
        retry://跳出标志
        for (;;) {
            int c = ctl.get();
			//获取线程池状态
            int rs = runStateOf(c);

            // 检查阻塞队列是否为空  阻塞队列不为空,说明里面有任务,说明线程数已经达到核心线程数的限制
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
				// 线程数 大于等于 最大线程容量  或者 线程数大于等于 核心线程数/最大线程数 不再添加线程
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
				//增加线程数 跳出循环,执行flag处的语句
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //线程池状态改变了,再次进行循环
				if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
		// flag		添加的线程 启动 和 添加成功标记
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
			//将当前任务封装成一个Worker线程
            w = new Worker(firstTask);
			//工作线程
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();//加锁
                try {
					//
                    int rs = runStateOf(ctl.get());

					// 当前线程池是运行状态  || 是SHUTDOWN并且当前任务为空
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 新创建的线程 已经存活  抛异常 刚创建还没启动怎么就启动了
						if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        //添加到工作线程集合
						workers.add(w);
						// 判断工作线程数是否大于醉倒线程数 出现过的最大线程数
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
						//标记添加线程成功
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
				// 添加线程成功 启动 设置启动标志
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
			//添加工作线程失败 将Worker从Worker集合中删除,并且减少工作线程数  因为上面对工作线程数+1了
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
	
	private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
	
	private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
	
	private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }
	//执行任务
	final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 允许中断
        boolean completedAbruptly = true;
        try {
			//当前任务不为空或者可以从阻塞队列里面获取到任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //(线程池状态为stop || (线程中断了 并且 线程池准备stop)) && 当前线程没有中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();//中断线程
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();//执行任务
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
					//置空任务 任务完成数加1 
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
			//获取不到任务了
            completedAbruptly = false;
        } finally {
			//销毁工作线程
            processWorkerExit(w, completedAbruptly);
        }
    }
	//获取任务方法
	private Runnable getTask() {
        boolean timedOut = false; 

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 阻塞队列为空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // 允许核心线程超时 或者 当前线程数大于核心线程数(有非核心线程)
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
				//获取任务 
                Runnable r = timed ?
					//允许核心线程超时 或者 有非核心线程 达到存活时间没有获取到任务也会返回 
					//再次循环 上面的判断队列为空的条件 会让方法返回空
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();//这里会阻塞 保持核心线程
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
	
	private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }
	
	private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
	
	private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
			//增加完成任务数量
            completedTaskCount += w.completedTasks;
			//移除Worker
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

		//终止线程
        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
			//检查是不是因为获取不到任务 进入该方法的
            if (!completedAbruptly) {
				//线程获取不到任务,就是该消亡线程了
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
			//给线程池保存一个工作线程
            addWorker(null, false);
        }
    }
	
	final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
	
//内部类
	private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {

        // 工作线程
        final Thread thread;
        // 工作任务
        Runnable firstTask;
        // 完成的任务数量
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); // aqs的状态 防止中断
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        // 执行任务  活跃线程执行的任务在这里
        public void run() {
            runWorker(this);
        }

        // 0 未上锁, 1 上锁了
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

标签:Java,int,private,源码,线程,boolean,ctl,return
来源: https://blog.csdn.net/xuwenjingrenca/article/details/120896772

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有