ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

spark-调优(代码层面)

2022-07-21 21:34:54  阅读:159  来源: 互联网

标签:String val 代码 RDD 调优 split import spark


spark-调优(代码)

在编写代码时可以进行优化

  1. 避免创建重复的RDD
  2. 尽可能复用同一个RDD
  3. 对多次使用的RDD进行持久化
  4. 尽量避免使用shuffle类算子
  5. 使用map-side预聚合的shuffle操作
  6. 使用高性能的算子
  7. 广播大变量
  8. 使用Kryo优化序列化性能
  9. 优化数据结构
  10. 使用高性能的库fastutil

1.对多次使用的RDD进行持久化

默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大, 可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避 免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作 ,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传 送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种 策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化 级别,会导致JVM的OOM内存溢出异常。

如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用 MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个 partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别 比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算 子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上, 如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。

如何选择一种最合适的持久化策略
如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是 MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无 法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优 先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写 ,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将 所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性 能开销,除非是要求作业的高可用性,否则不建议使用。

package com.shujia.spark.opt

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object Demo1Cache {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val studentsRDD: RDD[String] = sc.textFile("data/students.txt")

    /**
     * 当对同一个rdd进行多次使用的时候可以将rdd缓存起来
     *
     */

    //缓存级别是MEMORY_ONLY
    //studentsRDD.cache()

    //内存放不下放磁盘,同时会对数据做序列化,将一个分区的数据序列化从一个字节数组
    studentsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

    /**
     * rdd: rdd.cache
     * df : df.cache
     * sql: cache table student,  uncache table studnet
     */

    /**
     * 统计班级的的人数
     *
     */
    studentsRDD
      .map(stu => (stu.split(",")(3), 1))
      .reduceByKey(_ + _)
      .map {
        case (clazz: String, num: Int) =>
          s"$clazz\t$num"
      }
      .saveAsTextFile("data/cache/clazz_num")

    /**
     * 统计性别的人数
     *
     */
    studentsRDD
      .map(stu => (stu.split(",")(3), 1))
      .reduceByKey(_ + _)
      .map {
        case (gender: String, num: Int) =>
          s"$gender\t$num"
      }
      .saveAsTextFile("data/cache/gender_num")

    /**
     * 清空缓存
     */
    studentsRDD.unpersist()
    while (true) {
    }
  }
}

2.使用高性能的算子

  1. 使用reduceByKey/aggregateByKey替代groupByKey
  2. 使用mapPartitions替代普通map Transformation算子
  3. 使用foreachPartitions替代foreach Action算子
  4. 使用filter之后进行coalesce操作
  5. 使用repartitionAndSortWithinPartitions替代repartition与sort类操 作 代码
  6. repartition:coalesce(numPartitions,true) 增多分区使用这个
  7. coalesce(numPartitions,false) 减少分区 没有shuffle只是合并 partition

2.1aggregateByKey案例:

package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object Demo2AggregateByKey {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("Demo2AggregateByKey")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val studentsRDD: RDD[String] = sc.textFile("data/students.txt")

    val clazzKvDS: RDD[(String, Int)] = studentsRDD.map(stu => (stu.split(",")(4), 1))

    /**
     * aggregateByKey: 需要两个函数,一个是map端预聚合的函数,一个reduce端汇总的函数
     * reduceByKey map端和reduce端聚合函数是一样,
     * 如果map端和reduce端要写不一样的聚合函数可以使用aggregateByKey
     *
     */
    val countRDD: RDD[(String, Int)] = clazzKvDS.aggregateByKey(0)(
      (u: Int, i: Int) => u + i,//在map端做聚合函数
      (u1: Int, u2: Int) => u1 + u2//在reduce端做聚合的函数
    )
    countRDD.foreach(println)
  }
}

2.2 mapPartitions案例

package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import java.text.SimpleDateFormat
import java.util.Date

object Demo3MapPartition {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val dataRDD: RDD[String] = sc.textFile("data/ant_user_low_carbon.txt")

    val kvRDD: RDD[(String, String, String)] = dataRDD.mapPartitions(iter => {
      iter.map(line => {
        //如果只是简单的拆分数据,使用map和mappartition没有区别
        val split: Array[String] = line.split("\t")
        (split(0), split(1), split(2))
      })
    })

    val resultRDD: RDD[(String, Long, String)] = kvRDD.mapPartitions(iter => {
      /**
       *
       * 可以将一些初始化的代码房子mapPartitions中,减少占用的内存空间
       */
      //将时间字段转换成时间戳
      //在这里创建的对象,是一个分区创建一个
      val format = new SimpleDateFormat("yyyy/MM/dd")

      iter.map {
        case (id: String, sdate: String, p: String) =>
          val dateObj: Date = format.parse(sdate)
          val ts: Long = dateObj.getTime
          (id, ts, p)
      }
    })
    resultRDD.foreach(println)
  }
}

2.3 foreachPartitions案例

package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import java.sql.{Connection, DriverManager, PreparedStatement}

object Demo4foreachPartitions {
  def main(args: Array[String]): Unit = {

    val startTIme: Long = System.currentTimeMillis()

    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val studentsRDD: RDD[String] = sc.textFile("data/students.txt")

    /**
     * 将rdd的数据保存到mysql中
     *
     */

    /**
     * foreach: 每一条数据都需要创建一个网络链接
     * 不能将网络链接放在算子外(网络链接不能在网络中传输)
     *
     */
    /* studentsRDD.foreach(stu => {
       val split: Array[String] = stu.split(",")
       //1、加载启动
       Class.forName("com.mysql.jdbc.Driver")
       val start: Long = System.currentTimeMillis()
       //2、创建链接
       val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8", "root", "123456")
       val end: Long = System.currentTimeMillis()
       println(s"创建数据库的链接用了:${end - start}")
       //3、编写插入数据的sql
       val stat: PreparedStatement = con.prepareStatement("insert into students(id,name,age,gender,clazz) values(?,?,?,?,?)")
       //4、设置列值
       stat.setLong(1, split(0).toLong)
       stat.setString(2, split(1))
       stat.setLong(3, split(2).toLong)
       stat.setString(4, split(3))
       stat.setString(5, split(4))

       //5、执行插入
       stat.execute()

       //6、关闭链接
       stat.close()
       con.close()
     })*/

    /**
     * foreachPartition: 一次遍历一个分区的数据
     * 如果需要将rdd的数据保存到外部数据库中,比如mysql,hbase,redis, 需要使用foreachPartition
     */
    studentsRDD.foreachPartition(iter => {

      //1、加载启动
      Class.forName("com.mysql.jdbc.Driver")
      val start: Long = System.currentTimeMillis()
      //2、创建链接
      val con: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8", "root", "123456")
      val end: Long = System.currentTimeMillis()
      println(s"创建数据库的链接用了:${end - start}")
      //3、编写插入数据的sql
      val stat: PreparedStatement = con.prepareStatement("insert into students(id,name,age,gender,clazz) values(?,?,?,?,?)")

      iter.foreach(stu => {
        val split: Array[String] = stu.split(",")
        //4、设置列值
        stat.setLong(1, split(0).toLong)
        stat.setString(2, split(1))
        stat.setLong(3, split(2).toLong)
        stat.setString(4, split(3))
        stat.setString(5, split(4))

        //5、执行插入
        stat.execute()
      })
      //6、关闭链接
      stat.close()
      con.close()
    })

    val endTIme: Long = System.currentTimeMillis()

    println(s"共用了:${endTIme - startTIme}")
  }
}

2.4 repartition(重分区)案例

package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object Demo5RePartition {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val studentsRDD: RDD[String] = sc.textFile("data/students.txt")

    println(s"studentsRDD分区数据:${studentsRDD.getNumPartitions}")

    /**
     * repartition: 对rdd重分区,返回一个新的rdd,  会产生shuffle
     * repartition可以用于增加分区和减少分区,
     * 增加分区可以增加并行度,在资源充足的情况下, 效率更高
     * 减少分区可以减少产生的小文件的数量
     *
     */
    val rePartRDD: RDD[String] = studentsRDD.repartition(10)

    println(s"rePartRDD分区数据:${rePartRDD.getNumPartitions}")

    /**
     * coalesceL 重分区,,可以设置是否产生shuffle
     * 如果指定shuffle为true,可以用于增加分区和减少分区
     * 如果指定shuffle为false,只能用于减少分区
     *
     */

    val coalesceRDD: RDD[String] = rePartRDD.coalesce(100, shuffle = true)
    println(s"coalesceRDD分区数据:${coalesceRDD.getNumPartitions}")

    /**
     * 当处理好的数据需要保存到磁盘的时候,如果产生了很多的小文件,可以使用coalesce合并小文件
     * 合并的标准:保证合并之后的每一个文件的大小在128M左右
     *
     * 比如数据保存的数据是10G, 最好的情况是合并为80个
     *
     * shuffle = false: 不产生shuffle,效率更好
     *
     */
    coalesceRDD
      .coalesce(1, shuffle = false) //合并小文件
      .saveAsTextFile("data/coalesce")
  }
}

3.广播大变量

开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如 100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提 升性能

函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络 传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如 100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节 点的Executor中占用过多内存导致的频繁GC(垃圾回收),都会极大地影响性能

如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播 后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的 task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本 的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低 GC的频率

广播大变量发送方式:Executor一开始并没有广播变量,而是task运行需要用到广 播变量,会找executor的blockManager要,bloackManager找Driver里面的 blockManagerMaster要

package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo6MapJoin {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val studentDF: DataFrame = spark.read
      .format("csv")
      .option("sep", ",")
      .schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
      .load("data/students.txt")

    val scoreDF: DataFrame = spark.read
      .format("csv")
      .option("sep", ",")
      .schema("id STRING,cid STRING,score DOUBLE")
      .load("data/score.txt")

    /**
     * studentDF.hint("broadcast"): 将小表广播出去
     *
     * 当一个大表关联小表的时候,可以将小表广播出去,使用mapjoin
     * mapjoin 不会产生shuffle,可以提高关联的效率,小表一般要在1G以内
     *
     * mapjoin 会产生两个job
     * 1、第一个job是将小表的数据拉取到Driver端,从Driver端广播到Executor端
     * 2、关联的job
     *
     */
    val joinDF: DataFrame = scoreDF.join(studentDF.hint("broadcast"), "id")

    joinDF.show()
    while (true) {
    }
  }
}

4.使用Kryo优化序列化性能(一般重要)

在Spark中,主要有三个地方涉及到了序列化:

  1. 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输
  2. 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,SXT是自定义类型),所有自 定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现 Serializable接口。
  3. 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个 partition都序列化成一个大的字节数组。
package com.shujia.spark.opt
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object Demo7Kyyo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      //序列化方式
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      //指定注册序列化的类,自定义
      .config("spark.kryo.registrator", "com.shujia.spark.opt.Demo8KryoRegister")
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val studentsRDD: RDD[String] = sc.textFile("data/students.txt")

    /**
     * 将rdd中一行数据转换成Student的对象
     *
     */
    val stuRDD: RDD[Student] = studentsRDD.map(stu => {
      val split: Array[String] = stu.split(",")
      Student(split(0), split(1), split(2).toInt, split(3), split(4))
    })

    /**
     * 不做使用序列化,数据是280K
     * 使用默认的序列化的方式: 数据是55K
     * 使用kryo进行序列化: 数据大小:43k
     *
     *
     * spark sql 默认已经使用了kryo进行序列化, rdd没有使用,需要自己实现
     *
     */

    stuRDD.persist(StorageLevel.MEMORY_ONLY_SER)

    stuRDD
      .map(stu => (stu.clazz, 1))
      .reduceByKey(_ + _)
      .map {
        case (clazz: String, num: Int) =>
          s"$clazz\t$num"
      }
      .foreach(println)

    /**
     * 统计性别的人数
     *
     */
    stuRDD
      .map(stu => (stu.gender, 1))
      .reduceByKey(_ + _)
      .map {
        case (gender: String, num: Int) =>
          s"$gender\t$num"
      }
      .foreach(println)

    while (true) {
    }
  }
  case class Student(id: String, name: String, age: Int, gender: String, clazz: String)
}
上述的代码需要的工具类: KryoRegistrator
package com.shujia.spark.opt
import com.esotericsoftware.kryo.Kryo
import com.shujia.spark.opt.Demo7Kyyo.Student
import org.apache.spark.serializer.KryoRegistrator

class Demo8KryoRegister extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    /**
     * 在这个方法中将需要使用kryo进行序列化的类做一个注册
     *
     */
    kryo.register(classOf[Student])
    kryo.register(classOf[String])
    kryo.register(classOf[Int])
  }
}

标签:String,val,代码,RDD,调优,split,import,spark
来源: https://www.cnblogs.com/atao-BigData/p/16503631.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有