ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Spark快速上手(3)Spark核心编程-RDD转换算子

2022-07-01 19:36:12  阅读:168  来源: 互联网

标签:sparkContext val Int list List RDD 算子 Spark


RDD(2)

RDD转换算子

RDD根据数据处理方式的不同将算子整体上分为Value类型、双Value类型、Key-Value类型

value类型

map

函数签名
def map[U:ClassTag](f:T=>U):RDD[U]
函数说明
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换
e.g.1

 val source = sparkContext.parallelize(Seq(1, 2, 3, 4, 5, 6))
    val map = source.map(item => item*10)
    val result = map.collect()
    result.foreach(println)

e.g.2

   val data1: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4), 2)
//    val data2: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4), 1)
    val rdd1: RDD[Int] = data1.map(
      num => {
        println(">>>" + num)
        num
      }
    )
    val rdd2: RDD[Int] = rdd1.map(
      num => {
        println("<<<" + num)
        num
      }
    )
    rdd2.collect()

note:
RDD计算同一分区内数据有序,不同分区数据无序

(func)从服务器日志数据apache.log中获取用户请求URL资源路径(例):
apache.log

83.149.9.216 - - 17/05/2015:10:05:12 +0000 GET /presentations/logstash-monitorama-2013/plugin/zoom-js/zoom.js
83.149.9.216 - - 17/05/2015:10:05:07 +0000 GET /presentations/logstash-monitorama-2013/plugin/notes/notes.js
83.149.9.216 - - 17/05/2015:10:05:34 +0000 GET /presentations/logstash-monitorama-2013/images/sad-medic.png

code:

val data = sparkContext.textFile("input/apache.log")
    val clean = data.map{
      item => {
        item.split(" ")(6)
      }
    }
    clean.foreach(println(_))

mapPartitions

函数签名

def mapPartitions[U:ClassTag](
  f:Iterator[T] =>Iterator[U],
  preservesPartitioning:Boolean = false):RDD[U]

函数说明
将待处理的数据以分区为单位发送到计算节点进行任意的处理(过滤数据亦可)
note: 函数会将整个分区的数据加载到内存中进行引用。内存较小、数据量较大的情况下,容易出现内存溢出。

val dataRDD1: RDD[Int]= dataRDD.mapPartitions(
  datas =>{            //遍历每个分区进行操作
    datas.filter(_==2) //过滤每个分区中值为2的数据
  }
)

(func)获取每个数据分区的最大值

code:

object getMaxFromArea {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Max")
    val sparkContext: SparkContext = new SparkContext(sparkConf)
    val source: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4, 5, 6), 2)
    val mapPartition: RDD[Int] = source.mapPartitions(p => List(p.max).iterator)

    //多个分区获取最大值,使用迭代器
    val result: Array[Int] = mapPartition.collect()
    result.foreach(println)
    sparkContext.stop()

  }
}

comparison:

map和mapPartitions的区别

数据处理角度
Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而 mapPartitions 算子
是以分区为单位进行批处理操作。

功能的角度
Map 算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。
MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,
所以可以增加或减少数据

性能的角度
Map 算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算子类似于批处
理,所以性能较高。但是 mapPartitions 算子会长时间占用内存,那么这样会导致内存可能
不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作

mapPartitionsWithIndex

函数签名
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false): RDD[U]
函数说明
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引

val dataRDD1 = dataRDD.mapPartitionsWithIndex(
 (index, datas) => {
      datas.map(index, _)
 }
)

(func)获取第二个数据分区的数据

code:

object getSecondArea {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Sec")
    val sparkContext: SparkContext = new SparkContext(sparkConf)
    val source: RDD[Int] = sparkContext.parallelize(List(1, 2, 3, 4, 5, 6), 2)
    val mapPartitionsWithIndex: RDD[Int] = source.mapPartitionsWithIndex(
      (index, data) => {
        if (index == 1) {
          data
        } else {
          Nil.iterator
        }
      }
    ) 
    val result: Array[Int] = mapPartitionsWithIndex.collect()
    result.foreach(println(_))
    sparkContext.stop()
  }

}

(func)获取每个数据及其对应分区索引

code:

object getDataAndIndexOfArea {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Data")
    val sparkContext: SparkContext = new SparkContext(conf)

    val data: RDD[Int] = sparkContext.makeRDD(List(1, 2, 3, 4, 5, 6))
    val dataAndIndex: RDD[(Int, Int)] = data.mapPartitionsWithIndex(
      (index, iter) => {
        iter.map(data => (data, index))
      }
    )

    val result: Array[(Int, Int)] = dataAndIndex.collect()
    result.foreach(println)

  }

}


note:这里没有自定义分区数量,故默认最多分区数(与机器逻辑处理器数量相关),数据随机存储在这些分区中

flatMap

函数签名
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
函数说明
将处理的数据进行扁平化后再进行映射处理,所以算子也称作扁平映射

val dataRDD = sparkContext.makeRDD(
	List(List(1,2),List(3,4)),1)
val dataRDD1 = dataRDD.flatMap(list => list)
1
2
3
4

(func)将List("Hello World","Hello Spark")进行扁平化操作
1)字符扁平化

val data = sparkContext.makeRDD(List("Hello World","Hello Spark"),1)
    val rdd: RDD[Char] = data.flatMap(list => list)
    val result1: Array[Char] = rdd.collect()
    result1.foreach(item => print(item+" "))

2)字符串扁平化

val data1: RDD[String] = sparkContext.parallelize(List("Hello World", "Hello Spark"), 1)
    val rdd1: RDD[String] = data1.flatMap(list => {
      list.split(" ")
    })
    val result2: Array[String] = rdd1.collect()
    result2.foreach(item => println(item + " "))


(func)将List(List(1,2),3,List(4,5))进行扁平化操作

thinking:List(List(1,2),3,List(4,5)) => List(list,int,list) => RDD[Any]
当数据的格式不能够满足时我们可以使用match进行格式的匹配(类似java中的switch,case)

code:

val data2: RDD[Any] = sparkContext.parallelize(List(List(1, 2), 3, List(4, 5)))
    val rdd2: RDD[Any] = data2.flatMap {
   //  完整代码:
      //          dat =>{
      //            dat match {
      //              case list: List[_] => list
      //              case int => List(int)
      //            }
      //          }
      case list: List[_] => list
      case int => List(int)
    }
    val result3: Array[Any] = rdd2.collect()
    result3.foreach(item => print(item + " "))

标签:sparkContext,val,Int,list,List,RDD,算子,Spark
来源: https://www.cnblogs.com/unknownshangke/p/16434018.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有