ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

SparkCore-常用转换算子总结

2021-11-27 13:33:13  阅读:201  来源: 互联网

标签:总结 Int 分区 SparkCore 说人话 RDD 源码 算子 机翻


主要是分为三个类型:Value 类型、双 Value 类型和 Key-Value 类型

1.Value类型

1.1map

 传递一个对象,返回一个对象

源码中给的解释机翻如下:

通过对这个RDD的所有元素应用一个函数,返回一个新的RDD。

说人话就是:

将处理的数据逐条进行映射转换,可以是类型的转换,也可以是值的转换

值的转换,即里面每个数据*2

val mapRDD: RDD[Int] = rdd.map(
   _ * 2
)

类型转换,转为k-v结构

val wordToOne: RDD[(String, Int)] = words.map(
   word => (word, 1)
)

1.1.1mapPartitions

传递一个迭代器,返回一个迭代器

源码中给的解释机翻如下:

通过对这个RDD的每个分区应用一个函数,返回一个新的RDD。

说人话就是:

将待处理的数据以分区为单位发送到计算节点进行处理,可以是值的转换,可以是类型的转换,也可以过滤数据

 一次是处理一个分区的数据,所以说我们发现下面的代码中println只执行两次。

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    val mapRDD: RDD[Int] = rdd.mapPartitions(
      iter => {
        //一次把一个分区的数据拿过来,所以只执行两次
        println(">>>>>>>>>>>>>>")
        iter.map(_ * 2)
      }
    )

1.1.2map与mapPartitions的区别

数据处理上:map是分区内数据逐个执行,类似于串行操作;而mapPartitions是以分区为单位进行批处理操作。

功能上:map对于源数据进行值改变或者类型转换,但不能减少或者增加数据;mapPartitions传递一个迭代器返回一个迭代器,可以对里面的元素进行过滤,即减少和增加数据。

性能上:map类似于串行操作,所以性能较低;mapPartitions算子类似于批处理,性能较高,但是mapPartitions会长时间占用内存。

1.1.3mapPartitionsWithIndex

源码中给的解释机翻如下:

通过对这个RDD的每个分区应用一个函数,返回一个新的RDD,同时跟踪原始分区的索引。

说人话就是:

其实就在mapPartitions的基础上多了一个可以返回分区的索引的功能,不多赘述。

1.2flatMap

传递一个对象,返回一个“序列”

源码中给的解释机翻如下:

返回一个新的RDD,首先对这个RDD的所有元素应用一个函数,然后将结果扁平化。

说人话就是:

在map的基础上做了扁平化处理

下面这张网上找的图可以很形象的说明flatMap:

 就是先map再flatten

1.3glom

源码中的解释机翻如下:

返回一个RDD,将每个分区中的所有元素合并到一个数组中。

说人话就是:

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不会改变。

val rdd: RDD[Any] = sc.makeRDD(List(1, 2, 3, 4), 2)
    val glomRDD: RDD[Array[Any]] = rdd.glom()

    glomRDD.collect().foreach(data => {
      println(data.mkString(","))
    })

有两个分区,那么就被转换为数组1,2和数组3,4。

1.4groupBy

源码中的解释机翻如下:

返回一个分组项目的RDD。每个组由一个键和一个映射到该键的元素序列组成。每个组中元素的顺序无法保证,甚至可能在每次评估结果RDD时都有所不同。

说人话就是:

将数据根据指定的规则进行分组分区默认不变,但是数据会被打乱重组(shuffle)。

按奇偶分区

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)


val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(_ % 2)

结果:

很明显这里是传入列表中的元素,返回由运算结果为key,映射到该key的元素组成的序列为value的结果。 

1.5filter

传递的是一个对象,返回的是一个布尔类型

源码中的解释机翻如下:

返回一个新的RDD,其中只包含满足谓词的元素。

说人话就是:

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合的丢弃。

进行筛选后,分区不变,但各个分区的数据可能不均衡,就会出现数据倾斜

筛选掉所有偶数,保留奇数。

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)

val filterRDD: RDD[Int] = rdd.filter(_ % 2 != 0)

1.6sample

源码中的解释机翻如下:

返回这个RDD的抽样子集。

说人话就是:

根据指定的规则从数据集中抽取数据。

我个人觉得这个了解即可,没必要特意去记,这里就举例分析这个转换算子了。

1.7distinct

源码中的解释机翻如下:

返回一个新的RDD,其中包含这个RDD中不同的元素

说人话就是:

去重,比如对下面的列表进行去重

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))

val rdd1: RDD[Int] = rdd.distinct()

1.8coalesce

源码中的解释机翻如下:

返回一个新的RDD,它被简化为' numPartitions '分区。

说人话就是:

根据数据量缩小分区。比如当spark程序存在过多的小任务时,可以通过coalesce方法收缩合并分区,减少分区的个数,减少任务调度成本

有可能缩小分区的时候导致数据分配不均匀,可以通过启用shuffle使数据均匀。

1.9repartition

源码中的解释机翻如下:

返回一个具有numPartitions分区的新RDD。

说人话就是:

可以扩大也可以缩小分区,但是其实其底层就是:

 也就是说上面提到的 coalesce方法如果启用shuffle是可以扩大分区的。

1.10sortBy

源码中的解释机翻如下:

返回按给定键函数排序的RDD。

说人话就是:

用于排序数据,使用比较简单。其中ascending默认为true表示升序排列,指定为false可以改为降序。

2.双value类型

2.1intersection

求两个集合的交集

2.2union

求两个集合的并集

2.3subtract

求差集,也就是说以一个RDD的元素为主,去除两个RDD中重复的元素,其他保留下来。

比如(1,2,3,4)和(3,4,5,6),最后只保留1和2。

2.4zip

俗称拉链,将两个RDD中的元素,以键值对的形式进行合并。其中key和value分别为第一个、第二个RDD中的相同位置的元素。

3.key-value类型

3.1partitionBy

源码中的解释机翻如下: 

返回使用指定分区程序分区的RDD的副本

说人话:

将数据按照指定的分区器Partitioner重新进行分区。默认的分区器是HashPartitioner。

3.2reduceByKey

 源码中的解释机翻如下:

使用关联和交换reduce函数合并每个键的值。这也会在将结果发送给reducer之前,在每个mapper上执行本地合并,类似于MapReduce中的“combiner”。输出将使用现有分区并行级别进行散列分区。

说人话:

将数据按照相同的k对v进行聚合,至于翻译后半段在groupByKey后面再解释。典型应用就是wordCount。

3.3groupByKey 

 源码中的解释机翻如下:

将RDD中每个键的值分组到单个序列中。使用现有的分区并行级别对产生的RDD进行散列分区。每个组中元素的顺序无法保证,甚至可能在每次评估结果RDD时都有所不同。

说人话:

将数据根据k对v进行分组。我们发现和reduceByKey差不多,不过groupByKey只包含分组的功能,而reduceByKey包含分组和聚合的功能。

这里提到reduceByKey说到的机翻:reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合处理,即combine,这样可以减少落盘的数据量,所以性能较高,而groupByKey只是进行分组,不存在数据量减少的问题。

3.4aggregateByKey

源码中的解释机翻如下:

使用给定的组合函数和中性的“零值”聚合每个键的值。这个函数可以返回一个不同的结果类型U,而不是这个RDD中值的类型V。因此,我们需要一个操作将V合并为U,一个操作将两个U合并,如scala.TraversableOnce。前一个操作用于合并分区内的值,后一个操作用于合并分区之间的值。为了避免内存分配,这两个函数都允许修改并返回它们的第一个参数,而不是创建一个新的U。

说人话:

将数据根据不同的规则进行分区内计算和分区间计算

比如下面的先是分区内求最大,然后分区间求和。

//第一个参数需要传递一个参数表示初始值,主要用于当碰见第一个key的时候和value进行分区计算
    //第二个参数中的第一个参数是分区内计算规则(选最大),第二个参数是分区间计算规则(相加聚合)
    val abkRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)((x, y) => math.max(x, y), (x, y) => x + y)

3.5foldByKey

 源码中的解释机翻如下:

使用一个关联函数和一个中立的“零值”来合并每个键的值,它可以被添加到结果中任意次数,并且不能改变结果(例如,Nil用于列表拼接,0用于加法,或1用于乘法)。

说人话:

相当于分区内分区间计算规则相同版的aggregateByKey。

3.6combineByKey

 源码中的解释机翻如下:

简化版的combineByKeyWithClassTag,哈希分区的结果RDD使用现有的分区并行级别。这里的方法是为了向后兼容。它不向洗牌提供组合器类信息。

说人话:

是对k-v类型的rdd进行聚合操作的聚集函数,其实和aggregateByKey挺像,但是它允许用户返回值的类型与输入的不同

如下是求相同key的v的总和与数量,当然得到这个总和与数量,后面就可以求某个key对应值的平均值了。

val abkRDD: RDD[(String, (Int, Int))] = rdd.combineByKey(
      v => (v, 1),
      //t和默认值类型一样,v是指k相同的待聚合的value
      (t: (Int, Int), v) => {
        (t._1 + v, t._2 + 1)
      },
      //t1和t2都和默认值类型一样
      (t1: (Int, Int), t2: (Int, Int)) => {
        (t1._1 + t2._1, t1._2 + t2._2)
      }
    )

3.6.1reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别

第一个数据分区内与分区间计算规则
reduceByKey不进行任何计算相同
foldByKey与初始值进行分区内计算相同
aggregateByKey与初始值进行分区内计算不同
combineByKey当数据结构不满足要求时,可以让第一个数据转换结构不同

3.7sortByKey

 源码中的解释机翻如下:

按键对RDD排序,这样每个分区都包含一个排序过的元素范围。在生成的RDD上调用' collect '或' save '将返回或输出一个有序的记录列表(在' save '的情况下,它们将按照键的顺序写入文件系统中的多个' part-X '文件)。

说人话:

根据key进行排序。这个函数比较简单

3.8join

 源码中的解释机翻如下:

返回一个RDD,包含所有在' this '和' other '中具有匹配键的元素对。每对元素将以a (k, (v1, v2))元组的形式返回,其中(k, v1)在' this '中,(k, v2)在' other '中。执行跨集群的散列连接。

说人话:

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素连接在一起的 (K,(V,W))的 RDD。且key的类型必须相同

3.9leftOuterJoin

有左当然也有右,这个比较基础,和sql中的左外连接以及右外连接比较像。

3.10cogroup

源码中的解释机翻如下:

 对于' this '或' other '中的每个键k,返回一个结果RDD,该RDD包含一个元组,其中包含' this '和' other '中该键的值列表。

说人话:

在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD 。

 

如下对rdd1和rdd2使用

val rdd1: RDD[(String, Int)] = sc.makeRDD(List(
      ("a", 1), ("b", 2) //, ("c", 3)
    ))

    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(
      ("a", 4), ("b", 5), ("c", 6), ("c", 7)
    ))

    //cogroup : connect + group
    val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)

得到结果如下:

标签:总结,Int,分区,SparkCore,说人话,RDD,源码,算子,机翻
来源: https://blog.csdn.net/emttxdy/article/details/121512772

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有