ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

极客时间Spark性能调优实战-学习笔记(1)

2021-11-12 10:58:57  阅读:871  来源: 互联网

标签:极客 join 分区 调优 参数 内存 Memory spark Spark


通用性能调优(一)

一、应用开发三原则

原则一:使用spark自身的调优机制
充分利用 Spark 为我们提供的“性能红利”,如钨丝计划、AQE、SQL functions 等等。

钨丝计划的优势?
1)数据结构:采用紧凑的自定义二进制格式,存储效率高,避免的序列化反序列化。
2)开辟堆外内存来管理对象,对内存的估算更加精确,且避免了反复执行垃圾回收。
3)全阶段代码生成取代火山迭代模型。

AQE的优势?
AQE 可以让 Spark 在运行时的不同阶段,结合实时的运行时状态,周期性地动态调整前面的逻辑计划,然后根据再优化的逻辑计划,重新选定最优的物理计划,从而调整运行时后续阶段的执行方式。

1)自动分区合并
以filter和coalesce为例,当filter后,难免有些分区数据很少,需要我们手动通过coalesce合并。AQE会自动检测过小的分区,自动完成合并。
2)自动处理数据倾斜
自动加盐,完成数据倾斜的处理。
3)join策略自动调整
spark 3.0后可支持动态切换join方式,有一种情况是,其中一个表在排序前需要对数据进行过滤,过滤后的表小到足可以由广播变量容纳。针对这种情况,AQE 会根据运行时的统计数据,去动态地调整 Join 策略,把之前敲定的 Sort Merge Join 改为 Broadcast Join,从而改善应用的执行性能。

如何使用
钨丝计划:很简单,采用 DataFrame 或是 Dataset API 进行开发就可了。
AQE: 设置参数spark.sql.adaptive.enabled设置为true

原则二:能拖则拖
这个好理解,将shuffle操作尽量放在后面处理,因为一般来讲,程序越往后数据量是越少的(filter过滤),shuffle越靠后,自然要shuffle的数据量越少,效率越高。

原则三:跳出单机思维模式
我们需要时刻提醒自己RDD是一个分布式的数据集,我们操作的不是一个本地文件那么简单,一行简单的代码,哪怕你写的位置不一样,可能就会多执行几千万次,对性能是一个不小的开销。

二、硬件参数调优

1、CPU参数

主要包括spark.cores.max、spark.executor.cores 和 spark.task.cpus 这三个参数。分别对应整个集群资源可用的CPU,executor可用的cpu,task可用的cpu数。
CPU数代表最大可用的cpu个数,我们必须设置与此匹配的任务并行度来匹配。一般使用 spark.default.parallelism参数来控制。

2、内存参数
spark在管理模式上分为堆内内存和堆外内存,对外内存分Execution Memory 和 Storage Memory两部分。先把spark.memory.offHeap.enabled 置为 true,然后通过spark.memory.offHeap.size指定对外内存的大小。
堆内内存分了四个区域,也就是 Reserved Memory(预留空间300M)、User Memory(存储用户在driver端自定义的数据结构)、Execution Memory(执行分布式任务的内存) 和 Storage Memory(用于缓存等)(Execution Memory+Storage Memory=spark可用内存)。
在这里插入图片描述堆外和堆内内存如何权衡?
对于需要处理的数据集,如果数据模式比较扁平,而且字段多是定长数据类型,就更多地使用堆外内存。相反地,如果数据模式很复杂,嵌套结构或变长字段很多,就更多采用 JVM 堆内内存会更加稳妥,这是由于堆外内存采用紧凑的二进制存储格式决定的,若处理的数据集格式复杂,则会有局限性。

User Memory 与 Spark 可用内存如何分配?
spark.memory.fraction这个参数指定了spark中可用的内存比例,那么1-spark.memory.fraction则对应user memory的比例。如果代码中自定义的数据较多,那么可以给user memory多分配点内存,否则则应该将更多的内存给到spark可支配内存。

Execution Memory 该如何与 Storage Memory 平衡?
spark可用内存=Execution Memory+Storage Memory。那么这两部分的内存空间应该如何分配?
通常来说,在统一内存管理模式下,spark.memory.storageFraction 的设置就显得没那么紧要,因为无论这个参数设置多大,执行任务还是有机会抢占缓存内存,而且一旦完成抢占,就必须要等到任务执行结束才会释放。除非你的任务是类似机器学习之类的缓存密集型任务。

3、磁盘参数
spark.local.dir该参数指定的路径用于存放rdd cache和shuffle中间文件。有条件的可以用固态或者内存文件系统来提升IO效率。

三、shuffle类配置项
在这里插入图片描述
通过以上两个参数可以调整map端和reduce端的缓冲区大小。

还有一个参数是spark.shuffle.sort.bypassMergeThreshold 。当 Reduce 端的分区数小于这个设置值的时候,我们就能避免 Shuffle 在计算过程引入排序

四、sparkSQL大类配置项
AQE的三个特性:分区自动合并、数据倾斜自动处理、join策略动态调整。首先我们要通过sqrk.sql,adaptive.enabled来开启AQE。

1、分区自动合并
在这里插入图片描述
第一个参数是默认开启的,第二个参数和第三个参数共同影响了分区合并的效果。怎么影响的呢?假设shuffle后数据的大小时20G。假设第三个参数我们设置的200,代表最小分区数,如果咱们的分区数太少也会影响我们的并发效率。那么20G/200=100MB。也就是每个分区的大小大概是100MB.然后我们将100MB和第二个参数设置的数值取一个最小值就是每个分区的目标尺寸。

2、数据倾斜自动处理
在这里插入图片描述同样第一个参数默认开启。第三个参数是阈值,也就是说分区大小必须大于这个参数才有可能被判定为倾斜分区。但这还不够,还需要结合第二个参数来看。第二个参数什么意思?他是一个比例系数。举个例子,将所有分区大小进行排序,排序后取中位数,分区大小还必须大于中位数乘以这个比例系数才能判断为倾斜分区。也就是必须满足两个条件。第三个参数代表拆分力度。检测到的倾斜分区按照每个分区该参数设置的大小进行拆分。

3、join策略调整。我们指导join策略常用的有广播join,hash join mergejoin等。不同的策略开销也不一样。我们肯定需要选择一种合适的策略来提升join的性能。以前我们可以通过spark.sql.autoBroadcastJoinThreshold参数来设置广播join的阈值,如果有一张表的大小小于该值,便会使用广播join策略。
这种模式是定死的,无法动态平衡。而AQE 会在数据表完成过滤操作之后动态计算剩余数据量,当数据量满足广播条件时,AQE 会重新调整逻辑执行计划,在新的逻辑计划中把 Shuffle Joins 降级为 Broadcast Join。
在这里插入图片描述不过非空分区比例要小于这个参数才会触发自动join策略调整

标签:极客,join,分区,调优,参数,内存,Memory,spark,Spark
来源: https://blog.csdn.net/weixin_43757562/article/details/121201036

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有