ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

spark的转换算子及一个案例

2021-10-10 18:59:18  阅读:129  来源: 互联网

标签:String val RDD Int 分区 rdd 案例 算子 spark


spark的转换算子


转换算子

map:同分区有序运行,不同分区无序运行

(在各个分区内取数据,功能强大,但是效率低)(:/1c37ca978ea74eae9f8c4258b0f9064f)

val result1: RDD[Int] = rdd.map(num => {
      println(num)
      num * 2
    })

mapPartitions:一次性取数一个分区,在分区内计算

效率高,但是只有全部计算完成的时候各分区的数据才会释放(因为对象的引用),内存小数据量大的场合下,可能会内存移除

val result2: RDD[Int] = rdd.mapPartitions(x => {
      x.map(_ * 2)})

golm:将一个分区的数据变成集合

val glomRdd: RDD[Array[Int]] = rdd.glom() 
    val maxRDD: RDD[Int] = glomRdd.map(
      data => data.max
    )
    println(maxRDD.collect().sum)

groupBy:讲数据源中的每一个数据进行key进行分

    val rdd: RDD[String] = sc.makeRDD(List("hello java", "spark", "scala"), 2)
    def func1(string: String):Char = {
      string(0)
    }
    rdd.groupBy(func1).collect().foreach(println)

filter:过滤,返回布尔类型

    val rdd: RDD[String] = sc.makeRDD(List("hello java", "spark", "scala"), 2)
    rdd.filter(_.startsWith("s")).collect().foreach(println)

sample

    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,7), 2)
    println(rdd.sample(
      false, // 抽取完是否放回,true为放回
      0.4, // 每一个数据被抽取的概率,就算大于1也不一定会被抽取
      1 //随机数种子,可以不设置
    ).collect().mkString(","))

distinct

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 1, 3, 3, 4, 5, 6, 7), 2)
    // case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
    rdd.distinct().collect().foreach(print)

coalesce: 缩减分区

(不选择shuffle可能后续会数据倾斜)

repartition:其实就是(coalesce(shuffle true))

可以增加分区,调用的coalesce,但是默认给shuffle=true

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    rdd .coalesce(2, true).saveAsTextFile("out")

sortBy: 会进行shuffle,默认不改变分区且升序

val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "2"), (3, "4"), (2, "6")), 2)
    // 是经过shuffle的
    rdd.sortBy(_._2,false)
      .collect().foreach(println)

双value的操作,(交集,并集,差集,拉链)

intersection、union、subtract、zip
交集,并集,差集要求RDD的数据类型要一致;拉链可以不一致
    // 交集
    val rddInt: RDD[Int] = rdd1.intersection(rdd2)
    // 并集
    val rddUnion: RDD[Int] = rdd1.union(rdd2)
    // 差集
    val rddSub: RDD[Int] = rdd1.subtract(rdd2)
    // 拉链
    val rddZip: RDD[(Int, Int)] = rdd1.zip(rdd2)

partitionBy:

    val rddNew: RDD[(Int, Int)] = rdd.map((_, 1))
    // 自带hash分区,可重写
    val result: RDD[(Int, Int)] = rddNew.partitionBy(new HashPartitioner(2))
    result.saveAsTextFile("out")

groupByKey:不给参数,自动获取第一个为key

与groupBy的区别在于groupBy会保留完整的K-V对,groupByKey会提取出value

    val GroupRDD: RDD[(Int, Iterable[(Int, String)])] = rdd.groupBy(_._1)
    // (1,CompactBuffer((1,spark), (1,hello)))(3,CompactBuffer((3,scala)))(2,CompactBuffer((2,java)))

    val GroupByRDD: RDD[(Int, Iterable[String])] = rdd.groupByKey()
    // (1,CompactBuffer(spark, hello))(3,CompactBuffer(scala))(2,CompactBuffer(java))

补充:groupByKey 和 reduceByKey 的区别

shuffle角度:groupByKey是将数据打乱,会进行shuffle重新分区,可以再map进行统计
shuffle必须要落盘操作,不能在内存中等待
reduceByKey会在分区中预聚合(combine:类似MR的map端预聚合)。shuffle的数据就会变少一点;性能更高一点
功能角度:groupByKey是进行分组,再可以实现别的操作,功能更灵活,groupByKey是对相同key的value聚合,如果只是需要分组的话就不能使用

    val rdd: RDD[(Int, Int)] = sc.makeRDD(List((1, 1), (2, 1), (1, 1), (3, 1)), 1)
    rdd.reduceByKey(_ + _).collect().foreach(println)
    rdd.groupByKey().map({ case (word, iter) => (word, iter.sum) }).collect().foreach(println)
  • aggregateByKey :两个参数列表。第一个参数列表为初始值;第二个参数列表两个参数:第一个参数为分区内操作,第二个参数为分区间操作
val rdd: RDD[(Int, Int)] = sc.makeRDD(List((1, 1), (2, 1), (1, 1), (3, 1)), 2)
    rdd.aggregateByKey(0)(math.max(_,_),_+_).collect.foreach(println)

foldByKey:分区内和分区间的计算规则一样时使用

相当于第二个参数列表两个参数相同的aggregateByKey

val rdd: RDD[(Int, Int)] = sc.makeRDD(List((1, 1), (2, 1), (1, 1), (3, 1)), 1)
    rdd.foldByKey(0)(_ + _).collect.foreach(println)

使用aggregateByKey求key的平均值

    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 3), ("b", 1), ("b", 5)), 2)
    /**
     * 初始值的(0,0) 第一个0是表示“值”,第二个0表示出现分区键“值”的次数,实际每一次的运算都是将初始值更新。
     */
    val rdd1: RDD[(String, (Int, Int))] = rdd.aggregateByKey((0, 0))(
      // 第一次运算时:x代表的是(0,0),y是("a", 1)中的 1,运算的结果就是第一个0+y得到第一此运算的“和”,第二个0+1,得到运算的次数;结果为(1,1)
      (init_tuple, value) => {
        (init_tuple._1 + value, init_tuple._2 + 1)
      },
      // 各个分区内的数据进行运算,“值”和“值”相加,次数和次数相加
      (part1, part2) => {
        (part1._1 + part2._1, part1._2 + part2._2)
      }
    )
    val rdd2: RDD[(String, Int)] = rdd1.mapValues({ case (x, y) => {
      x / y
    }
    })

可以用combineByKey代替:

	val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 3), ("b", 1), ("b", 5)), 2)

    /**
     * 需要三个参数
     * 第一个参数:将相同key的第一个数据进行结构转换
     * 第二个参数:分区内的计算规则
     * 第三个参数:分区间的计算规则
     */
    val rdd2: RDD[(String, (Int, Int))] = rdd.combineByKey(x => (x, 1),
      (init_tuple: (Int, Int, ), value) => {
        (init_tuple._1 + value, init_tuple._2 + 1)
      }
      , (part1: (Int, Int), part2: (Int, Int)) => {
        (part1._1 + part2._1, part1._2 + part2._2)
      }
    )
  • join、leftOuterJoin、rightOuterJoin,cogroup(这个不发散,会将两个rdd中的数据变成两个迭代器iter)

案例、不同省份的广告点击量前三的城市

// 时间戳	   省份 城市 用户 广告 
// 1635304263 广东 深圳 yan2 N
    val rdd: RDD[String] = sc.textFile("C:\\Users\\93134\\Desktop\\a.txt")
    //1635304263 广东 深圳 yan2 N
    val mapRdd: RDD[((String, String), Int)] = rdd.map(line => {
      val datas: Array[String] = line.split(" ")
      ((datas(1), datas(2)), 1)
    })

    // ((江西,南昌),115)
    val reduceRdd: RDD[((String, String), Int)] = mapRdd.reduceByKey(_ + _)

    // (江西,(南昌,115))
    val mapRdd2: RDD[(String, (String, Int))] = reduceRdd.map({
      // 不建议用_的方式,用模式匹配可以直接使用
      case ((province, city), sum) => {
        (province, (city, sum))
      }
    })
    // (江西,CompactBuffer((江西,(吉安,104)), (江西,(赣州,121)), (江西,(南昌,115))))
    val groupRdd: RDD[(String, Iterable[(String, Int)])] = mapRdd2.groupByKey()

    // (江西,List((赣州,121), (南昌,115)))
    val resultRdd: RDD[(String, List[(String, Int)])] = groupRdd.mapValues(values => {
      values.toList.sortBy(_._2)(Ordering.Int.reverse).take(3)
    })
    resultRdd.collect().foreach(println)
    sc.stop()

标签:String,val,RDD,Int,分区,rdd,案例,算子,spark
来源: https://blog.csdn.net/weixin_44429965/article/details/120690131

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有