标签:val rdd RDD conf println 序列化 优化
一、RDD重用和存储级别选择
缓存:cache persist
val conf= new SparkConf()
.setAppName(this.getClass.getName)
.setMaster("local[1]")
val sc = new SparkContext(conf)
//当遇到action算子,cache才会生效
//cache实际就是调用persist,默认使用StorageLevel.MEMORY_ONLY,使用persist可自行设置
val unit: RDD[Int] = sc.parallelize(1 to 10).cache().collect
val unit1: RDD[Int] = sc.parallelize(1 to 10).persist(StorageLevel.MEMORY_ONLY).collect
//取消缓存
unit1.unpersist()
检查点:checkpoint
val conf= new SparkConf()
.setAppName(this.getClass.getName)
.setMaster("local[1]")
val sc = new SparkContext(conf)
//设置一个检查点目录
sc.setCheckpointDir("file:///D:\\bigdata\\notes\\maven\\mavenscala\\test\\checkPoint")
//创建数据RDD
val result: RDD[String] = sc.textFile("file:///D:\\bigdata\\notes\\maven\\mavenscala\\test\\word.txt")
val rdd = result.flatMap(_.split("\\s+")).map((_,1)).reduceByKey(_+_)
//打印血缘关系
println(rdd.toDebugString)
//将RDD持久化到磁盘
rdd.checkpoint()
//是否持久化成功
println(rdd.isCheckpointed)
//执行action算子,使持久化生效
rdd.foreach(println)
println("持久化到检查点后")
//打印血缘关系
println(rdd.toDebugString)
//是否持久化成功
println(rdd.isCheckpointed)
println(rdd.getCheckpointFile)
检查点与缓存的区别
设置检查点后,需要执行action算子才会持久化成功,同时会改变该算子的血缘关系,数据也从此保存在目标检查点文件夹,application结束不会消失
而缓存不会改变血缘关系,application结束后自动删除缓存文件
二、广播变量
类似mapJoin把小表放到内存,广播变量是把小数据广播出去分发给(缓存到)各个节点
允许开发者将一个只读变量(Driver 端)缓存到每个节点(Executor)上, 而不是每个任务传递一个副本。注意,不能对 RDD 进行广播。
val conf= new SparkConf()
.setAppName(this.getClass.getName)
.setMaster("local[1]")
val sc = new SparkContext(conf)
val rdd: RDD[Int] = sc.parallelize(1 to 10)
val broadcast: Broadcast[Array[Int]] = sc.broadcast(Array(1,2,3,4,5))
rdd.map(x=>x*broadcast.value(1)).foreach(x=>print(x+"\t"))
println()
rdd.map(x=>x*broadcast.value(2)).foreach(x=>print(x+"\t"))
println()
rdd.map(x=>x*broadcast.value(3)).foreach(x=>print(x+"\t"))
println()
三、RDD分区设计
Spark 官方推荐,task 数量应该设置为 Spark 作业总CPU core 数量的 2~3 倍
分区太少,不利于并发,即不会使用集群中所有可用的 CPU 核心,更容易受数据倾斜影响,groupBy, reduceByKey, sortByKey 等内存压力增大
分区过多,Shuffle 开销越大,创建任务开销越大。
每个分区大约128MB
如果分区小于但接近2000,则设置为大于2000
并行度设置
val conf = new SparkConf() .set("spark.default.parallelism", "500")
四、优化序列化性能
Java序列化机制比较重,有很多内置信息, 效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大。
Kryo 序列化机制比 Java 序列化机制性能提高 10 倍左右,Spark 之所以没有默认使用 Kryo 作为序列化类库,是因为它不支持所有对象的序列化,同时 Kryo 需要用户在使用前注册所需要序列化的类型,不够方便,但从 Spark 2.0.0 版本开始,简单类型、简单类型数组、字符 串类型的 Shuffling RDDs 已经默认使用Kryo 序列化方式了。
//创建 SparkConf 对象
val conf = new SparkConf().setMaster(…).setAppName(…)
//使用 Kryo 序列化库,如果要使用 Java 序列化库,需要把该行屏蔽掉
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//在 Kryo 序列化库中注册自定义的类集合,如果要使用 Java 序列化库,需要把该行屏蔽掉
conf.set("spark.kryo.registrator", "atguigu.com.MyKryoRegistrator");
五、简化结构
不要用Java的结构,因为Java里有个对象头(Mark Word 指向类的指针,数组长度(数组对象才有),记录类信息,每个对象强关联类型,因此会很重。用scala原生的结构
六、数据倾斜
会触发 shuffle 操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、 join、cogroup、repartition 等。出现数据倾斜时,可能就是代码中使用了这些算子中的某一个所导致的。
在 Spark Web UI 上深入看一下当前这个 stage 各个 task 分配的数据量,从而 进一步确定是不是 task 分配的数据不均匀导致了数据倾斜。
七、代码上的优化
RDD重用:避免创建同一个RDD;尽可能的复用同一个RDD
RDD尽可能早的 filter 操作:获取到初始 RDD 后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用, 从而提升 Spark 作业的运行效率
调节本地化等待时长:默认3秒,3秒没有等到资源就会传到其他节点
val conf = new SparkConf() .set("spark.locality.wait", "6")
标签:val,rdd,RDD,conf,println,序列化,优化 来源: https://blog.csdn.net/qq_29310159/article/details/112434061
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。