ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

并发编程从零开始(十六)-ForkJoinPool

2021-11-03 23:33:07  阅读:215  来源: 互联网

标签:队列 ForkJoinPool 编程 阻塞 任务 从零开始 线程 窃取


并发编程从零开始(十六)-ForkJoinPool

第四部分:ForkJoinPool

15 ForkJoinPool用法

ForkJoinPool就是JDK7提供的一种“分治算法”的多线程并行计算框架。Fork意为分叉,Join意为合并,一分一合,相互配合,形成分治算法。此外,也可以将ForkJoinPool看作一个单机版的Map/Reduce,多个线程并行计算。

相比于ThreadPoolExecutor,ForkJoinPool可以更好地实现计算的负载均衡,提高资源利用率。

假设有5个任务,在ThreadPoolExecutor中有5个线程并行执行,其中一个任务的计算量很大,其余4个任务的计算量很小,这会导致1个线程很忙,其他4个线程则处于空闲状态。

利用ForkJoinPool,可以把大的任务拆分成很多小任务,然后这些小任务被所有的线程执行,从而实现任务计算的负载均衡。

例子1:快排

快排有2个步骤:

  1. 利用数组的第1个元素把数组划分成两半,左边数组里面的元素小于或等于该元素,右边数组里面的元素比该元素大;

  2. 对左右的两个子数组分别排序。

左右两个子数组相互独立可以并行计算。利用ForkJoinPool:

image-20211103093250826

image-20211103093311931

例子2:求1到n个数的和

image-20211103093352427

上面的代码用到了 RecursiveAction 和 RecursiveTask 两个类,它们都继承自抽象类ForkJoinTask,用到了其中关键的接口 fork()、join()。二者的区别是一个有返回值,一个没有返回值。

RecursiveAction/RecursiveTask类继承关系:

image-20211103093417372

在ForkJoinPool中,对应的接口如下:

image-20211103093431416


16 核心数据结构

与ThreadPoolExector不同的是,除一个全局的任务队列之外,每个线程还有一个自己的局部队列。

image-20211103093604844

核心数据结构如下所示:

image-20211103093618170

下面看一下这些核心数据结构的构造过程:

image-20211103093637628

image-20211103093653252


17 工作窃取队列

关于上面的全局队列,有一个关键点需要说明:它并非使用BlockingQueue,而是基于一个普通的数组得以实现。

这个队列又名工作窃取队列,为 ForkJoinPool 的工作窃取算法提供服务。在 ForkJoinPool开篇的注释中,Doug Lea 特别提到了工作窃取队列的实现,其陈述来自如下两篇论文:"Dynamic CircularWork-Stealing Deque" by Chase and Lev,SPAA 2005与"Idempotent work stealing" byMichael,Saraswat,and Vechev,PPoPP 2009。读者可以在网上查阅相应论文。

所谓工作窃取算法,是指一个Worker线程在执行完毕自己队列中的任务之后,可以窃取其他线程队列中的任务来执行,从而实现负载均衡,以防有的线程很空闲,有的线程很忙。这个过程要用到工作窃取队列。

image-20211103093740999

这个队列只有如下几个操作:

  1. Worker线程自己,在队列头部,通过对top指针执行加、减操作,实现入队或出队,这是单线程的。

  2. 其他Worker线程,在队列尾部,通过对base进行累加,实现出队操作,也就是窃取,这是多线程的,需要通过CAS操作。

这个队列,在Dynamic Circular Work-Stealing Deque这篇论文中被称为dynamic-cyclic-array。之所以这样命名,是因为有两个关键点:

  1. 整个队列是环形的,也就是一个数组实现的RingBuffer。并且base会一直累加,不会减小;top会累加、减小。最后,base、top的值都会大于整个数组的长度,只是计算数组下标的时候,会取top&(queue.length-1),base&(queue.length-1)。因为queue.length是2的整数次方,这里也就是对queue.length进行取模操作。

    当top-base=queue.length-1 的时候,队列为满,此时需要扩容;

    当top=base的时候,队列为空,Worker线程即将进入阻塞状态。

  2. 当队列满了之后会扩容,所以被称为是动态的。但这就涉及一个棘手的问题:多个线程同时在读写这个队列,如何实现在不加锁的情况下一边读写、一边扩容呢?

通过分析工作窃取队列的特性,我们会发现:在 base 一端,是多线程访问的,但它们只会使base变大,也就是使队列中的元素变少。所以队列为满,一定发生在top一端,对top进行累加的时候,这一端却是单线程的!队列的扩容恰好利用了这个单线程的特性!即在扩容过程中,不可能有其他线程对top进行修改,只有线程对base进行修改。

下图为工作窃取队列扩容示意图。扩容之后,数组长度变成之前的二倍,但top、base的值是不变的!通过top、base对新的数组长度取模,仍然可以定位到元素在新数组中的位置。

image-20211103093902177

下面结合WorkQueue扩容的代码进一步分析。

image-20211103093914450

image-20211103093921638

image-20211103093932906

image-20211103093943661


18 ForkJoinPool状态控制

18.1 状态变量ctl解析

类似于ThreadPoolExecutor,在ForkJoinPool中也有一个ctl变量负责表达ForkJoinPool的整个生命周期和相关的各种状态。不过ctl变量更加复杂,是一个long型变量,代码如下所示。

image-20211103101757636

image-20211103101807994

ctl变量的64个比特位被分成五部分:

  1. AC:最高的16个比特位,表示Active线程数-parallelism,parallelism是上面的构造方法传进去的参数;

  2. TC:次高的16个比特位,表示Total线程数-parallelism;

  3. ST:1个比特位,如果是1,表示整个ForkJoinPool正在关闭;

  4. EC:15个比特位,表示阻塞栈的栈顶线程的wait count(关于什么是wait count,接下来解释);

  5. ID:16个比特位,表示阻塞栈的栈顶线程对应的id。

image-20211103101828774


18.2 阻塞栈Treiber Stack

什么叫阻塞栈呢?

要实现多个线程的阻塞、唤醒,除了park/unpark这一对操作原语,还需要一个无锁链表实现的阻塞队列,把所有阻塞的线程串在一起。

在ForkJoinPool中,没有使用阻塞队列,而是使用了阻塞栈。把所有空闲的Worker线程放在一个栈里面,这个栈同样通过链表来实现,名为Treiber Stack。前面讲解Phaser的实现原理的时候,也用过这个数据结构。

下图为所有阻塞的Worker线程组成的Treiber Stack。

image-20211103101917428

首先,WorkQueue有一个id变量,记录了自己在WorkQueue[]数组中的下标位置,id变量就相当于每个WorkQueue或ForkJoinWorkerThread对象的地址;

image-20211103101931154

其次,ForkJoinWorkerThread还有一个stackPred变量,记录了前一个阻塞线程的id,这个stackPred变量就相当于链表的next指针,把所有的阻塞线程串联在一起,组成一个Treiber Stack。

最后,ctl变量的最低16位,记录了栈的栈顶线程的id;中间的15位,记录了栈顶线程被阻塞的次数,也称为wait count。


18.3 ctl变量的初始值

构造方法中,有如下的代码:

image-20211103102023370

因为在初始的时候,ForkJoinPool 中的线程个数为 0,所以 AC=0-parallelism,TC=0-parallelism。这意味着只有高32位的AC、TC 两个部分填充了值,低32位都是0填充。


18.4 ForkJoinWorkerThread状态与个数分析

在ThreadPoolExecutor中,有corePoolSize和maxmiumPoolSize 两个参数联合控制总的线程数,而在ForkJoinPool中只传入了一个parallelism参数,且这个参数并不是实际的线程数。那么,ForkJoinPool在实际的运行过程中,线程数究竟是由哪些因素决定的呢?

要回答这个问题,先得明白ForkJoinPool中的线程都可能有哪几种状态?可能的状态有三种:

  1. 空闲状态(放在Treiber Stack里面)。

  2. 活跃状态(正在执行某个ForkJoinTask,未阻塞)。

  3. 阻塞状态(正在执行某个ForkJoinTask,但阻塞了,于是调用join,等待另外一个任务的结果返回)。

ctl变量很好地反映出了三种状态:

高32位:u=(int) (ctl >>> 32),然后u又拆分成tc、ac 两个16位;

低32位:c=(int) ctl。

  1. c>0,说明Treiber Stack不为空,有空闲线程;c=0,说明没有空闲线程;

  2. ac>0,说明有活跃线程;ac<=0,说明没有空闲线程,并且还未超出parallelism;

  3. tc>0,说明总线程数 >parallelism。

在提交任务的时候:

image-20211103102145022

image-20211103102151423

image-20211103102200608

image-20211103102208302

在通知工作线程的时候,需要判断ctl的状态,如果没有闲置的线程,则开启新线程:

image-20211103102237263


19 Worker线程的阻塞-唤醒机制

ForkerJoinPool 没有使用 BlockingQueue,也就不利用其阻塞/唤醒机制,而是利用了park/unpark原语,并自行实现了Treiber Stack。

下面进行详细分析ForkerJoinPool,在阻塞和唤醒的时候,分别是如何入栈的。

19.1 阻塞-入栈

当一个线程窃取不到任何任务,也就是处于空闲状态时就会阻塞入栈。

image-20211103211314035

image-20211103211337941

image-20211103211355712

image-20211103211404365

image-20211103211418326


19.2 唤醒-出栈

在新的任务到来之后,空闲的线程被唤醒,其核心逻辑在signalWork方法里面。

image-20211103211447532

image-20211103211501240


20 任务的提交过程分析

在明白了工作窃取队列、ctl变量的各种状态、Worker的各种状态,以及线程阻塞—唤醒机制之后,接下来综合这些知识,详细分析任务的提交和执行过程。

关于任务的提交,ForkJoinPool最外层的接口如下所示。

image-20211103212253505

image-20211103212259890

image-20211103212324732

如何区分一个任务是内部任务,还是外部任务呢?

可以通过调用该方法的线程类型判断。

如果线程类型是ForkJoinWorkerThread,说明是线程池内部的某个线程在调用该方法,则把该任务放入该线程的局部队列;

否则,是外部线程在调用该方法,则将该任务加入全局队列。

20.1 内部提交任务push

内部提交任务,即上面的q.push(task),会放入该线程的工作窃取队列中,代码如下所示。

image-20211103212424047

由于工作窃取队列的特性,操作是单线程的,所以此处不需要执行CAS操作。


20.2 外部提交任务

image-20211103212654856

image-20211103212706668

lockedPush(task)方法的实现:

image-20211103212716781

外部多个线程会调用该方法,所以要加锁,入队列和扩容的逻辑和线程内部的队列基本相同。最后,调用signalWork(),通知一个空闲线程来取。


21 工作窃取算法:任务的执行过程分析

全局队列有任务,局部队列也有任务,每一个Worker线程都会不间断地扫描这些队列,窃取任务来执行。下面从Worker线程的run方法开始分析:

image-20211103231648554

run()方法调用的是所在ForkJoinPool的runWorker方法,如下所示。

image-20211103231708571

下面详细看扫描过程scan(w, a)。

image-20211103231720552

image-20211103231727562

标签:队列,ForkJoinPool,编程,阻塞,任务,从零开始,线程,窃取
来源: https://www.cnblogs.com/yangchen-geek/p/15506272.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有