ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

20211018-ThreadPoolExecutor

2022-05-22 21:04:20  阅读:149  来源: 互联网

标签:int 20211018 worker private ctl null final ThreadPoolExecutor


成员变量

ctl变量

/**
* The main pool control state, ctl, is an atomic integer packing
    * two conceptual fields
    *   workerCount, indicating the effective number of threads
    *   runState,   indicating whether running, shutting down etc
   
* RUNNING: Accept new tasks and process queued tasks
    *   SHUTDOWN: Don't accept new tasks, but process queued tasks
    *   STOP:     Don't accept new tasks, don't process queued tasks,
    *             and interrupt in-progress tasks
    *   TIDYING: All tasks have terminated, workerCount is zero,
    *             the thread transitioning to state TIDYING
    *             will run the terminated() hook method
    *   TERMINATED: terminated() has completed
    */
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;// 32-3=29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// 2的29次方-1,0001... 低29位表示线程数最大数,高3位表示executors状态

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }// 运行状态,即上面的 RUNNING等
private static int workerCountOf(int c) { return c & CAPACITY; } // worker即工人数量
private static int ctlOf(int rs, int wc) { return rs | wc; }// runState 与 workerCount的和

mainLock+works

private final ReentrantLock mainLock = new ReentrantLock();// 访问works的锁
private final HashSet<Worker> workers = new HashSet<Worker>();

 

 

ThreadPoolExecutor

execute

1、worker<coreSize,新增worker

2、worker>=coreSize,queue未满,加入任务队列

3、worker>=coreSize,queue满了,但是worker<maxSize,新增worker

4、worker>=maxSize,queue满了,拒绝策略拒绝

public void execute(Runnable command) {
   if (command == null)
       throw new NullPointerException();
   int c = ctl.get();
   if (workerCountOf(c) < corePoolSize) {
       if (addWorker(command, true))
           return;
       c = ctl.get();
  }
   if (isRunning(c) && workQueue.offer(command)) {
       int recheck = ctl.get();
       if (! isRunning(recheck) && remove(command))
           reject(command);
       else if (workerCountOf(recheck) == 0)
           addWorker(null, false);
  }
   else if (!addWorker(command, false))
       reject(command);
}

 

addWorker

1、第一个for

自旋+CAS:增加 ctl内 worker数量

2、第二个for

new Worker,再加入 works

worker.start

private boolean addWorker(Runnable firstTask, boolean core) {
       retry:
       for (;;) {
           int c = ctl.get();
           int rs = runStateOf(c);

           // Check if queue empty only if necessary.
           if (rs >= SHUTDOWN &&
               ! (rs == SHUTDOWN &&
                  firstTask == null &&
                  ! workQueue.isEmpty()))
               return false;

           for (;;) {
               int wc = workerCountOf(c);
               if (wc >= CAPACITY ||
                   wc >= (core ? corePoolSize : maximumPoolSize))// 数量限制与workers数量比较,决定能否新增worker
                   return false;
               if (compareAndIncrementWorkerCount(c))
                   break retry;
               c = ctl.get();  // Re-read ctl
               if (runStateOf(c) != rs)
                   continue retry;
               // else CAS failed due to workerCount change; retry inner loop
          }
      }

       boolean workerStarted = false;
       boolean workerAdded = false;
       Worker w = null;
       try {
           w = new Worker(firstTask);
           final Thread t = w.thread;
           if (t != null) {
               final ReentrantLock mainLock = this.mainLock;
               mainLock.lock();
               try {
                   // Recheck while holding lock.
                   // Back out on ThreadFactory failure or if
                   // shut down before lock acquired.
                   int rs = runStateOf(ctl.get());

                   if (rs < SHUTDOWN ||
                      (rs == SHUTDOWN && firstTask == null)) {
                       if (t.isAlive()) // precheck that t is startable
                           throw new IllegalThreadStateException();
                       workers.add(w);
                       int s = workers.size();
                       if (s > largestPoolSize)
                           largestPoolSize = s;
                       workerAdded = true;
                  }
              } finally {
                   mainLock.unlock();
              }
               if (workerAdded) {
                   t.start();
                   workerStarted = true;
              }
          }
      } finally {
           if (! workerStarted)
               addWorkerFailed(w);
      }
       return workerStarted;
  }

 

Worker

public void run() {
runWorker(this);
}

 

runWorker

1、取任务,来自 firstTask 或者 getTask()

2、有任务,task.run(),进入下一个while

3、无任务,processWorkerExit

final void runWorker(Worker w) {
   Thread wt = Thread.currentThread();
   Runnable task = w.firstTask;
   w.firstTask = null;
   w.unlock(); // allow interrupts
   boolean completedAbruptly = true;// 突然中断,如果while条件未满足则非突然的,其他都是突然的
   try {
       while (task != null || (task = getTask()) != null) {
           w.lock();
           // If pool is stopping, ensure thread is interrupted;
           // if not, ensure thread is not interrupted. This
           // requires a recheck in second case to deal with
           // shutdownNow race while clearing interrupt
           if ((runStateAtLeast(ctl.get(), STOP) ||
                (Thread.interrupted() &&
                 runStateAtLeast(ctl.get(), STOP))) &&
               !wt.isInterrupted())
               wt.interrupt();
           try {
               beforeExecute(wt, task);
               Throwable thrown = null;
               try {
                   task.run();
              } catch (RuntimeException x) {
                   thrown = x; throw x;
              } catch (Error x) {
                   thrown = x; throw x;
              } catch (Throwable x) {
                   thrown = x; throw new Error(x);
              } finally {
                   afterExecute(task, thrown);
              }
          } finally {
               task = null;
               w.completedTasks++;
               w.unlock();
          }
      }
       completedAbruptly = false;
  } finally {
       processWorkerExit(w, completedAbruptly);
  }
}

 

getTask

1、是否淘汰(核心线程运行超时 或 worker数量大于corePoolSize)

2、是淘汰-超时时间内获取任务

3、不淘汰-不限时阻塞获取任务

private Runnable getTask() {
   boolean timedOut = false; // Did the last poll() time out?

   for (;;) {
       int c = ctl.get();
       int rs = runStateOf(c);

       // Check if queue empty only if necessary.
       if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
           decrementWorkerCount();
           return null;
      }

       int wc = workerCountOf(c);

       // Are workers subject to culling?
       boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 是否淘汰

       if ((wc > maximumPoolSize || (timed && timedOut))
           && (wc > 1 || workQueue.isEmpty())) {
           if (compareAndDecrementWorkerCount(c))
               return null;
           continue;
      }

       try {
           Runnable r = timed ?
               workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :// 需要淘汰,超时时间内获取任务,此处使用空闲时间
           workQueue.take();
           if (r != null)
               return r;
           timedOut = true;
      } catch (InterruptedException retry) {
           timedOut = false;
      }
  }
}

 

 

processWorkerExit

1、完成任务计数 更新

2、移除当前worker

3、非正常完成,新增worker

4、正常完成,worker数量满足最小要求,直接退出;不满足min要求,新增worker

private void processWorkerExit(Worker w, boolean completedAbruptly) {
   if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
       decrementWorkerCount();// 是突然的,ctl的work计数未调整,此处调整

   final ReentrantLock mainLock = this.mainLock;
   mainLock.lock();
   try {
       completedTaskCount += w.completedTasks;// 完成任务计数 更新
       workers.remove(w);// 移除当前worker
  } finally {
       mainLock.unlock();
  }

   tryTerminate();// 不太清楚有什么用

   int c = ctl.get();
   if (runStateLessThan(c, STOP)) {
       if (!completedAbruptly) {// 正常完成
           int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 允许核心线程空闲超时时死亡,则线程池最小线程数为0;否则最小线程数是corePoolSize
           if (min == 0 && ! workQueue.isEmpty())
               min = 1;
           if (workerCountOf(c) >= min)
               return; // replacement not needed
      }
       addWorker(null, false);
  }
}

 

参数配置

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 

1、需要配置哪些

int corePoolSize		核心线程数
int maximumPoolSize		最大线程数
queueSize				队列长度

 

2、如何配置

依照

每秒请求数(QPS,如 100~1000)+

每个请求耗时(COST,0.5s)+

系统最大响应时间(MAXRSP,2s)

corePoolSize = QPS/(1/COST) = QPS/2 = 50~500

其中 (1/COST) 可以理解为单个线程 1s内可以完成的请求数 n(0<n<无限大),此处为 2,即 1s内一个线程能完成 2个请求

哦,网上还说了个什么 8020原则,貌似希望核心线程数满足 80% 的最大请求数,那么此处应该就是 400

 

queueSize = (MAXRSP-COST) * (max(QPS)-corePoolSize*(1/COST)) = 1.5 * (1000-800) = 300

太大:接入了无法满足最大响应时间的请求

太小:能满足最大响应时间的请求又拒绝了

 

队列大小应该满足最大响应时间,目前看是队列满时,最后一个任务出队完成刚好满足最大响应时间

最大响应时间 2s - 请求耗时 0.5s = 最长待 1.5秒,即 1.5s内核心线程数可以堆积的任务数

 

maximumPoolSize = max(QPS)/(1/COST) = 500

太大,创建过多线程,OOM;应该大于corePoolSize=400 但是小于最大 QPS 所需线程数=500

原则上最大线程数与队列都满负荷运作,应该满足最大请求数,此处QPS=1000

3、问

进入队列的请求与下一秒新的请求,谁会先执行

队列内的请求由以往work完成

新的请求看情况是

1、入队-新请求后执行

2、新增worker-新请求应该会先执行

3、拒绝

标签:int,20211018,worker,private,ctl,null,final,ThreadPoolExecutor
来源: https://www.cnblogs.com/zpq5935/p/16298960.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有