ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

pyspark-combineByKey详解

2021-02-01 11:04:59  阅读:262  来源: 互联网

标签:combineByKey Combiner shuffle spill python self partition 详解 pyspark


最近学习Spark,我主要使用pyspark api进行编程,

网络上中文的解释不是很多,api官方文档也不是很容易明白,我结合自己的理解记录下来,方便别人参考,也方便自己回顾吧

本文介绍的是pyspark.RDD.combineByKey

combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None, partitionFunc=<function portable_hash at 0x7f1ac7340578>)

它是一个泛型函数,主要完成聚合操作,将输入RDD[(K,V)]转化为结果RDD[(K,C)]输出

例如:

[python] view plain copy    
  1. x = sc.parallelize([('B',1),('B',2),('A',3),('A',4),('A',5)])  
  2. createCombiner = (lambda el: [(el, el**2)])  
  3.   
  4. mergeVal = (lambda aggregated, el : aggregated + [(el, el**2)])  
  5. mergeComb = (lambda agg1, agg2 : agg1 + agg2)  
  6.   
  7. y = x.combineByKey(createCombiner, mergeVal, mergeComb)  
  8. print(x.collect())  
  9. print(y.collect())  

打印结果如下:

[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]
主要有三个参数需要自己实现,分别是

createCombiner:实现输入RDD[(K,V)]中V到结果RDD[(K,C)]中C的转换, V和C可能是相同类型,也可能是不同类型,如上例中的createCombiner

它会创建一个元素列表

mergeValue:将V合并到C中

它会将当前值添加到元素列表的末尾

mergeCombiners:对mergeValue产生的C进一步合并,即是reduce操作

它会将两个C合并到一起

combineByKey的处理流程如下:

遍历RDD[(K,V)]中每一个元素

1、如果当前K是一个新元素,使用createCombiner()创建K为键的累加器初始值,生成列表[('A',(3,9))], [('B',(1,1))]

2、如果当前K已经遇到过,使用mergeValue()将当前(K,V)合并进第1步生成的累加器列表,,

      生成[('A',(3,9)),('A',(4,16)),('A',(5,25)) , ('B',(1,1,)), ('B', (2,4))]

      否则执行第1步

3、将相同键值的累加器进行合并,得到[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]

因此得到如下结果:

[('A', [(3, 9), (4, 16), (5, 25)]), ('B', [(1, 1), (2, 4)])]

Spark的很多transformation方法是基于combineBy的,会导致shuffle过程,所以一般认为成本较大。那么具体过程如何?涉及到哪些spark配置,又该如何调整呢?数据倾斜问题又会对此造成什么影响呢?

本文暂不涉及调优分析,仅从代码层面基于pyspark分析combineBy的实现原理和其中的shuffle过程,期望抛砖引玉。

接口定义如下:

python|copycode|?1def combineByKey(self, createCombiner, mergeValue, mergeCombiners,2numPartitions=None)

约定Combiner和value含义为:

Value 是PariRDD中的value值Combiner是combineByKey完成后PairRDD的value值,可以与Value类型不一样

前3个参数都是callback方法:

Combiner createCombiner(Value),通过一个Value元素生成Combiner元素,会被多次调用Combiner mergeValue(Combiner, Value),将一个Value元素合并到Combiner里,会被多次调用Combiner mergeCombiners(Combiner, Combiner),将两个Combiners合并,也会被多次调用

从python/pyspark/rdd.py的combineBy代码看到,其处理过程分为3步:

locally_combined = self.mapPartitions(combineLocally),在python进程进行combineshuffled = locally_combined.partitionBy(numPartitions) ,进行python进程内部的shuffle和基于scala的worker nodes间shufflereturn shuffled.mapPartitions(_mergeCombiners, True) ,类似MR的reducer,聚合shuffle的结果
pyspark combineBy的实现原理与shuffle过程

下面来详细看每一个steps。

Step1,locally_combined = self.mapPartitions(combineLocally)。Python|copycode|?1def combineLocally(iterator):2merger = ExternalMerger(agg, memory * 0.9, serializer) \3if spill else InMemoryMerger(agg)4merger.mergeValues(iterator)5return merger.iteritems()

该方法根据spark.shuffle.spill方法决定是使用ExternalMerger还是InMemoryMerger,其中ExternalMerger的内存是由spark.python.worker.memory限定的。以下主要关注ExternalMerger,磁盘+内存的外部排序。另外,merger.mergeValues也是有可能调用全部3个callbacks方法的,不仅仅是mergeValue callback,不要被它的名称迷惑了。

ExternalMerger的关键成员变量:

data: dict of {K: V},unpartitioned merged data,还没有spill前的数据都是放在该dict里的pdata:list of dicts,list长度由self.partitions数目决定,该值与numPartitions不一样。partitioned merged data,如果已经发生过spill,则后续的数据都读入pdata并hash到不同的dict槽位里。ExternalMerger的外存文件路径是/path/to/localdir//

。其中localdir路径可以有多个,当位于不同的磁盘时,可以提高并发写入速度。spill_num是spill的轮次,partition_num是self.partitions对应的分片数。文件中的每一行是序列化后的Key-Combiner list。

再来看mergeValues的方法主体:

Python|copycode|?01def mergeValues(self, iterator):03""" Combine the items by creator and combiner """05iterator = iter(iterator)07# speedup attribute lookup09creator, comb = self.agg.createCombiner, self.agg.mergeValue11d, c, batch = self.data, 0, self.batch14for k, v in iterator:16d[k] = comb(d[k], v) if k in d else creator(v)18c += 121if c % batch == 0 and get_used_memory() > self.memory_limit:23self._spill() // 按实时计算的内部partition id把self.data中的数据 flush到disk,为了兼容pdata的flush,这里每行一个单元素的dict25self._partitioned_mergeValues(iterator, self._next_limit())27break // iterator指针已经交给_partitioned_mergeValues了,这里就直接跳出了针对当前mapPartition里的每一个元素,调用createCombiner或mergeValue callback生成新的Combiner对象,并存入self.data[k]里。定期(根据c%batch,注意batch的值会动态调整)检查内存,如果不超限就继续读入。直到内存不足时,首先调用_spill方法把self.data flush,然后重新计算memory_limit并把处理权交给_partitioned_mergeValues。Python|copycode|?01def _partitioned_mergeValues(self, iterator, limit=0):02""" Partition the items by key, then combine them """03# speedup attribute lookup04creator, comb = self.agg.createCombiner, self.agg.mergeValue05c, pdata, hfun, batch = 0, self.pdata, self._partition, self.batch07for k, v in iterator:08d = pdata[hfun(k)]09d[k] = comb(d[k], v) if k in d else creator(v)10if not limit:11continue13c += 114if c % batch == 0 and get_used_memory() > limit:15self._spill()16limit = self._next_limit()该方法也是继续读入元素,但不同的是,将新生成的Combiner存入self.pdata[hfun(k)][k]里,即根据内部partitions方法进行分片。如果内存不足,则调用_spill写入磁盘,并调整limit值。

上面两次调用了_spill方法,由于只有第一次是针对self.data的,故应只有spill_num=1时的partition file里,才有多行;后续spill_num的partition file都是一个大行。

mergeValues把当前partition里具有相同key的values合并到一个Combiner里了,但这些数据可能在内存和磁盘上,这就需要iteritems()来把它们归并起来返回给上层调用者。如果没用spill过,证明数据都在self.data里,就直接返回其迭代器即可。否则就要调用_external_items()来归并了。

_external_items()按partition处理文件,即每次处理多轮spill生成的一个partition id对应的文件。为了最大程度利用内存并降低复杂度,首先会把pdata里的数据也spill掉。

Python|copycode|?01try:02for i in range(self.partitions):03self.data = {}04for j in range(self.spills):05path = self._get_spill_dir(j)06p = os.path.join(path, str(i))07# do not check memory during merging08self.mergeCombiners(self.serializer.load_stream(open(p)),09False)11# limit the total partitions12if (self.scale * self.partitions < self.MAX_TOTAL_PARTITIONS13and j < self.spills - 114and get_used_memory() > hard_limit):15self.data.clear() # will read from disk again16gc.collect() # release the memory as much as possible17""" chengyi02: Because the yield values before are also using memory(in the caller),18so if now more than limit, the following partitions will almostly exceed too.19So recursive merged all the remaining partitions.2021for v in self._recursive_merged_items(i):22yield v23return25for v in self.data.iteritems():26yield v27self.data.clear()29# remove the merged partition30for j in range(self.spills):31path = self._get_spill_dir(j)32os.remove(os.path.join(path, str(i)))在调用self.mergeCombiner处理一个/

文件时,不检查内存是否超限,否则可能会继续向当前/path/to/localdir/下flush数据,就破坏文件数据结构了。而是在一个文件处理完成之后,检查内存若超限且分片数可控,则调用_recursive_merged_items进行递归的合并。

这里需要注意的是,_recursive_merged_items会所有>=i(当前partition id)的数据,而非仅处理当前partition。这是因为,当前内存消耗主要包含两块:rdd.py里调用者保存的yield返回内存块,以及当前partition已经读入的数据。假设partition比较平均,则后者数据量相对稳定;而前者不断增长。所以后续内存超限的几率会更大。

_recursive_merged_items会new 一个ExternalMerger对象m,将文件里的数据依次读入、merge,并按需spill(这时会spill到其他localdir目录,spill_num也重新开始计数,可以视为在不同的数据空间),最后通过m._external_items返回合并后的数据。在m._external_items里还会不会再次发生递归调用呢?几率很小,因为这个方法里的内存消耗基本等同于m的一个partition分片数据大小。而m的所有数据 == self的一个partition,故m的一个partition数据量非常小。即使再次发生递归调用,m的子m 分片数据量会依次递减,故会再次降低spill几率。

上面的描述基本上符合外部排序算法,但工程的世界里还需要考虑GC问题。在_spill的最后和_extern_items里都调用了python的gc.collect()方法,同步释放引用计数为0的内存块。但由于collect不一定能那么完美的释放,一些reachabled还是无法释放的,如果这部分存量较大(例如很逼近本次limit),那极端情况下一个元素就又会触发spill了,这显然逼近耗时也没用意义。所以ExternalMerger会动态调整limit值,max(self.memory_limit, get_used_memory() * 1.05)。所以,这能看出来了,python进程实际消耗内存可能会大于python.worker.memory值。

以上step1结束,返回的locally_combined_rdd的结构为:[(k, Combiner), ...]。k还是原来的k,但当前worker task下所有相同k的values都聚合到了Combiner中。step2,shuffled = locally_combined.partitionBy(numPartitions)

这里有一个问题,为什么不直接把rdd给scala的partitionBy,而在python代码里实现了一些内部shuffle的逻辑呢?首先,partitionBy的回调方法是可定制的,python里默认是portable_hash,如果用scala实现,如何回调python的分片函数呢?其次,python和scala间的通信是需要序列化的,一条一条的成本有点大,所以python shuffle后也做了batch。具体如下。


pyspark combineBy的实现原理与shuffle过程

(注:partitionBy完之后,数据量不会有变化,以上kv变化仅为了代表shuffle后一个task里只会有属于自己分片的key了。)

从代码可以看到python里计算了partition_id,所以scala仅根据确定的分片情况,进行shuffle。在深入之前,先看下python partitionBy里涉及到的几个rdds:

自身,即self,PairRDD(k,v),通过mapPartitions向下转化keyed,PairRDD(paritition_id, [(k,v),(k,v),...]),通过copy constructor向下转化pairRDD,JavaPairRDD,java会调用适配到scala,并通过scala的partitionBy向下转化jrdd,java代理的scala的ShuffledRDD,通过copy constructor向下转化rdd,PairRDD(k,v),分片完成

keyed的生成过程比较简单,由内部方法add_shuffle_key实现,完成k到partition_id的计算,并将一批kv打包、序列化作为keyed rdd的v。Every 1k items, will check used_memroy once,and if equal to batch+1, will also flush。并且batch的值会动态调整,尽量使生成的批量包大小在1M和10M之间。

在jrdd的构造过程中,会生成shuffleDependency,当stage提交时,DAG调度发现该dependency,就会发起ShuffleMapTask,生成shuffle数据(相当于MR里map端的shuffle工作):


pyspark combineBy的实现原理与shuffle过程

ExternalSorter是shuffle的关键,在该分支中生成ExternalSorter的参数:

Scala|copycode|?1sorter = new ExternalSorter[K, V, V](None, Some(dep.partitioner), None, dep.serializer)

即将aggregate=None;dep.partitioner此处是PythonPartitioner,直接使用rdd的key作为paritition_id,在该场景就是python里计算好的partition_id;ordering=None。其中aggregate和ordering为None即代表无需再combine和sort,只单纯shuffle就好,以下仅关注这个分支,由insertAll和writePartitionedFile合作完成。

insertAll的一个可能分支是bypassMergeSort == True,即如果partitions数目较少,且无需aggregate和ordering,则直接写numPartitions个文件,随后再在writePartitionedFile里简单concate。缺点是每个worker node上都同时打开numPartitions个文件,有额外内存消耗,且可能too many open files。

另一个可能分支是大数据时的常见场景(无需combine且大量写),针对每条数据:

Scala|copycode|?1buffer.insert((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])2maybeSpillCollection(usingMap = false)

由于python里做了批量打包,故这儿的一条数据的v对应python的一批数据的kv了。每插入一条数据,检查是否需要spill。这儿的实现思路与python的externalMerger也类似。简单介绍下,insertAll的maybeSpillCollection最终调用spillToMergeableFiles,根据partition_id和key对内容排序(这里partition_id==key),每次spill写入一个临时文件,并且把file信息记录在spills数组里。

随后writePartitionedFile得把之前分布在多个临时文件里的数据归并为最终输出的partition文件。与spilled bypassMergeSort对应的分支,因为一个partition的数据都在一个partition文件里,所以简单的concate即可。否则,数据分散在内存和多个文件里,整合的过程由PartitionedIterator迭代器触发完成,返回的iter依次写文件。

以上过程,将shuffled数据写入disk了,但怎样被下一个计算单元使用呢?

step3,return shuffled.mapPartitions(_mergeCombiners, True)

虽然spark1.2之后已默认使用sort based shuffle,但sort shuffle还是使用HashShuffleReader读取数据:SortShuffleManager -> HashShuffleReader -> BlockStoreShuffleFetcher.fetch() 。其目的是收集上游多个worker node产生的shuffled数据,所以必然有network I/O。

BlockStoreShuffleFetcher.fetch里,先获取shuffleId和reduceId对应的blocks上游节点信息,信息包括((address, size), index),其中address是BlockManageId类型。之后调用ShuffleBlockFetcherIterator从local block manager和remote BlockTransferService处获取blocks数据。

ShuffleBlockFetcherIterator的initialize方法调用splitLocalRemoteBlocks方法根据address.executeId生成出remoteRequests对象,针对同一address的多个block,若size < maxBytesInFlight/5 则合并为一个request。从而通过这种方式,确保最大并发度为5。也可以看出,如果有大的block,则request size可能大于maxBytesInFlight。随后会对remoteRequests列表随机化,以保证请求尽量均衡。

initialize方法随后立即调用sendRequest发送多个请求,并打印日志:logInfo(“Started ” + numFetches + ” remote fetches in” + Utils.getUsedTimeMs(startTime))。sendRequest调用shuffleClient.fetchBlocks()读取远端数据,并注册BlockFetchingListener,后者的onBlockFetchSuccess方法会接收buf数据并添加到results队列里。

ShuffleBlockFetcherIterator的next()方法先“释放”内存,并在内存充足的情况下再发出sendRequest请求。随后再读取results队列里的数据,并解压、反序列化,返回迭代器iterator。这里有两个小的优化点,“释放”内存并不是真的触发gc,而是从bytesInFlight里减去已接收到results队列的数据长度,因为后者随后将被读取,而bytesInFlight限制的是网络缓存。另一个是先sendRequest再处理results中的数据,原因是前者是异步调用,且耗时可能较长。

还需要注意的是,sendRequest调用的shuffleClient实际是BlockTransferService的一个实例,有netty和nio两种实现方式。

以上完成了shuffle数据的接收,next返回的iterator最终回到python代码进入step3的剩余阶段,这就很简单了,实际上此时才真正执行shuffled.mapPartitions(_mergeCombiners, True)。

通过以上3个步骤,python、java和scala多语言的交互,最终完成了pyspark的combineByKey。个人理解,由于scala的partitionBy和shuffle过程早已实现,所以pyspark主要解决的是 如何更优的进行多语言交互,以达到较优的性能、扩展性、复用度。

标签:combineByKey,Combiner,shuffle,spill,python,self,partition,详解,pyspark
来源: https://www.cnblogs.com/ExMan/p/14355506.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有