ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Spark优化

2021-09-01 12:31:46  阅读:169  来源: 互联网

标签:shuffle 堆外 cache 内存 spark 优化 Spark


1.从多个kafka topic中接收数据,可以用多个Reciver接收,然后合并在一起进行处理

 

 2.receiver’s block interval(接收器的块间隔),这由configuration parameter(配置参数) 的 spark.streaming.blockInterval 决定。对于大多数 receivers(接收器),接收到的数据 coalesced(合并)在一起存储在 Spark 内存之前的 blocks of data(数据块).

每个 receiver(接收器)每 batch(批次)的任务数量将是大约(batch interval(批间隔)/ block interval(块间隔))

例如,200 ms的 block interval(块间隔)每 2 秒 batches(批次)创建 10 个 tasks(任务)
如果你集群里有40个cpu核,那么10个任务只能用10个核,还有30个核空闲着,所以,要增大批次间隔或调小block interval
这时,我把block interval调整到100ms,再把批次间隔调整到4秒,这样正好等于40核,但是要有个富余量,可以把批次间隔调整为3.5秒

推荐的 block interval(块间隔)最小值约为 50ms,低于此任务启动开销可能是一个问题。

3.cpu conf

spark.default.parallelism  建议设置为集群中cpu总核数

4.使用kryo替换spark的默认序列化器

//spark需要序列化的地方
A、receiver接收器
B、persist持久化
C、reduceBy。。。。时 (有shuffle时)
D、广播变量时

conf
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //设置kryo为序列化器
.registerKryoClasses(Array(classOf[String],classOf[Array[(Long, Long, String)]],classOf[Student])) //设置要序列化的类

5.数据批次Job执行的时间不要大于batchsize的时间

6.调节堆内存 transmission和persist内存使用占比

spark中,堆内存又被划分成了两块儿,一块儿是专门用来给RDD的cache、persist操作进行RDD数据缓存用的;另外一块儿,就是我们刚才所说的,用来给spark算子函数的运行使用的,存放函数中自己创建的对象。默认情况下,给RDD cache操作的内存占比是0.6,即60%的内存都给了cache操作了。但是问题是,如果某些情况下cache占用的内存并不需要占用那么大,这个时候可以将其内存占比适当降低。怎么判断在什么时候调整RDD cache的内存占用比呢?其实通过Spark监控平台就可以看到Spark作业的运行情况了,如果发现task频繁的gc,就可以去调整cache的内存占用比了。通过SparkConf.set("spark.storage.memoryFraction","0.6")来设定。

//如果你程序中不用cache或者cache的数据很小,就可以减小这个占比
SparkConf.set("spark.storage.memoryFraction","0.6")

7.调节堆内存 shuffle和persist的内存使用占比

spark.shuffle.memoryFraction
参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

8.

spark内存分为

可用内存等于executor内存-System Reserved内存(300M)

Storeage -- 可用内存的 * 60% * 50% 用于cache
Execution --可用内存的 * 60% * 50% 用于task的运行
Ohter --可用内存 * 40% 用于存储spark内部元数据和用户定义的数据结构
System Reserved 300M 与other的作用相同

Unified Memory统一内存 = 可用内存 * 60%
Storeage和Execution内存默认平分Unified Memory统一内存
Storeage和Execution内存可以动态占用,但是Storeage内存是不可驱逐内存,一旦存储就不能被占用

Unified Memory的大小可以通过spark.memory.fraction参数来调节,默认0.6

SparkUI中显示的Storeage内存表示的是Unified Memory统一内存

9.调节堆外内存

重要:堆外内存只提供给Storeage和Execution内存区使用,没用other区和System Reserved区

为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。利用 JDK Unsafe API(从 Spark 2.0 开始,在管理堆外的存储内存时不再基于 Tachyon,而是与堆外的执行内存一样,基于 JDK Unsafe API 实现[3]),Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。

在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。

10.连接等待时长的调整

由于JVM内存过小,导致频繁的Minor gc,有时候更会触犯full gc,一旦出发full gc;此时所有程序暂停,导致无法建立网络连接;spark默认的网络连接的超时时长是60s;如果卡住60s都无法建立连接的话,那么就宣告失败了。

bin/spark-submit \
--conf spark.network.timeout=300s \

标签:shuffle,堆外,cache,内存,spark,优化,Spark
来源: https://www.cnblogs.com/canlovegolove/p/15214119.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有