ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Spark-RDD,算子

2022-07-12 09:04:15  阅读:144  来源: 互联网

标签:String val 分区 RDD conf 算子 Spark spark


Spark内核

RDD

ResilientDistributedDataset (弹性分布式数据集 )

五大特性:
A list of partitions
A function for computing each split
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs
Optionally, a list of preferred locations to compute each split on

RDD五大特性

1、RDD由一组分区组成。读取文件默认一个block块

对应一个分区后面rdd的分区数和前面rdd-样

2、函数实际上是作用在每个分区上的,一个分区由一个task处理, 有多少个分区就有多少个task写代码是基于RDD写的代码,最终运行时,算

子是作用在分区上的

3、RDD之间有依赖关系,宽依赖和窄依赖,有shuffle是 宽依赖,没有shuffle窄依赖宽依赖前是 个Stage, 宽依赖后是个stage(map或者reduce端)

4、分区类的算子只能作用在kv格式的rdd上,groupByKey reduceByKey5、Spark为task的计算提供了最佳的计算位置,移动计算而不是移动数据

关于RDD的分区数

testFile 可以读取文件夹,但是文件夹不能包括子文件夹

RDD 的分区数,分区数越高,并行度越高,在资源重足的情况下效率越高
* 1. 读取文件,默认等于切片的数量
* 2. 读取文件可以设置最新的分区数量-minPartitions.
*
* 3. 控制RDD的分区数,只能爱切片的基础上,增加分区数,不能减少分区数
* 4. 宽依赖算子分区数默认等于前一个RDD,也可以设置分区数

package com.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo2Partition {
  def main(args: Array[String]): Unit = {
    //创建spark对象
    val conf = new SparkConf()
    //创建spark任务名称
    conf.setAppName("Demo2Partition")
    //设置spark的任务为本地执行
    conf.setMaster("local")
    //创建spark上下文的对象,作为程序的入口
    val sc = new SparkContext(conf)

    /**
     * testFile 可以读取文件夹,但是文件夹不能包括子文件夹
     */

    /**
     * RDD 的分区数,分区数越高,并行度越高,在资源重足的情况下效率越高
     * 1. 读取文件,默认等于切片的数量
     * 2. 读取文件可以设置最新的分区数量-minPartitions.
     *
     * 3. 控制RDD的分区数,只能爱切片的基础上,增加分区数,不能减少分区数
     * 4. 宽依赖算子分区数默认等于前一个RDD,也可以设置分区数
     *
     */

    //读取文件
    val linesRDD: RDD[String] = sc.textFile("data/students.txt",7)

    //查看分区数
    println(s"linesRDD的分区数为:${linesRDD.getNumPartitions}")

    val wordsRDD: RDD[String] = linesRDD.flatMap(lines => lines.split(","))

    //查看分区数
    println(s"wordsRDD的分区数为:${wordsRDD.getNumPartitions}")

    val kvRDD: RDD[(String, Iterable[String])] = wordsRDD.groupBy(w => w)

    //查看分区数
    println(s"kvRDD的分区数为:${kvRDD.getNumPartitions}")


    val wordCount: RDD[(String, Int)] = kvRDD.map(kv => (kv._1, kv._2.size))

    //查看分区数
    println(s"wordCount的分区数为:${wordCount.getNumPartitions}")

    wordCount.saveAsTextFile("data/wc")
  }

}

运行结果如下

接下来就是重点了-算子

map

map:将rdd数据一条一条传递给后面的函数,函数的返回值构建成一个新的rdd

map:算子不会改变总的数据长度

package com.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo3Map {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("Demo3Map")
    conf.setMaster("local")

    val sc = new SparkContext(conf)

    //读取学生表数据
    val lineRDD: RDD[String] = sc.textFile("data/students.txt")

    /**
     * map:将rdd数据一条一条传递给后面的函数,函数的返回值构建成一个新的rdd
     * map:算子不会改变总的数据长度
     */

    val nameRDD: RDD[String] = lineRDD.map(line => line.split(",")(1))

    nameRDD.foreach(println)

  }

}

filter-过滤数据

Filter: 将RDD的数据一条一条的传递给函数,如果返回true就保留数据,如果为false就过滤数据

Filter:会减少RDD的行数

package com.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo4Filter {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("Demo4Filter")
    conf.setMaster("local")

    val sc = new SparkContext(conf)

    val lineRDD: RDD[String] = sc.textFile("data/students.txt")

    /**
     * Filter: 将RDD的数据一条一条的传递给函数,如果返回true就保留数据,如果为false就过滤数据
     *
     * Filter: 会减少RDD的行数
     *
     */


    val filterRDD: RDD[String] = lineRDD.filter(line =>{
      val age: Int = line.split(",")(2).toInt
      age.equals(22)
    } )

    filterRDD.foreach(println)
  }

}

flatmap - 展开

package com.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo5FlatMap {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("flatmap")
    conf.setMaster("local")

    val sc = new SparkContext(conf)

    //读取文件
    val lineRDD: RDD[String] = sc.textFile("data/word.txt")

    /**
     *flatMap:一条一条的将RDD的数据返回给后面的函数,函数的返回值必须是一个集合,最后会将集合展开构建成一个新的RDD
     *
     */
    val wordsRDD: RDD[String] = lineRDD.flatMap(line => line.split(","))

    wordsRDD.foreach(println)
  }

}

sample-抽样

package com.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo6Sample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("map")
    conf.setMaster("local")

    val sc = new SparkContext(conf)


    //读取学生表的数据
    val studentsRDD: RDD[String] = sc.textFile("data/students.txt")

    /**
     * sample : 可以从数据中抽样一部分数据
     *
     */
    val sample: RDD[String] = studentsRDD.sample(withReplacement = true, 0.1)

    sample.foreach(println)
  }

}

标签:String,val,分区,RDD,conf,算子,Spark,spark
来源: https://www.cnblogs.com/atao-BigData/p/16468704.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有