ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

大数据:Spark实战经验总结(python版)

2022-02-28 23:02:01  阅读:180  来源: 互联网

标签:缓存 持久 rdd python RDD 实战经验 内存 persist Spark


人工智能

大数据,Spark,Hadoop,python,pyspark

大数据:Spark实战经验总结

1. RDD持久化

说RDD持久化之前,先来了解一下惰性机制。

1)RDD的惰性机制:

RDD在设计时采用了惰性机制的特性,指的是转换RDD的过程先记录而不发生真正的计算,只有遇到行动操作时,才会触发“从头到尾”的真正的计算。举例说明:
假设/mnt/下又一个文件word.txt,内容如下:

Hadoop is good
Spark is fast
Spark is better

代码:

from pyspark import SparkConf, SparkContext


conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)

lines = sc.textFile("file:///mnt/word.txt")   # 记录,并不执行
lineLengths = lines.map(lambda s:len(s))     # 记录,并不执行
totalLength = lineLengths.reduce(lambda a, b: a + b)  # 开始执行!

为了看着更清晰,代码不妨写成:

# 记录,并不执行。
rdd1 = sc.textFile("file:///mnt/word.txt")   # 是一个RDD对象。
# 记录,并不执行。
rdd2 = rdd1.map(lambda s:len(s))     # 是一个RDD对象。
# 开始执行! 
totalLength = rdd2.reduce(lambda a, b: a+b)  # 是个数字。


"""
打印验证
"""
print(rdd1)  # file:///mnt/word.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0
print(rdd2)   # PythonRDD[5] at RDD at PythonRDD.scala:53
print(totalLength)   # 42

# 调用RDD自带的函数,来取出rdd1和rdd2对象中的值
rdd1.foreach(print)  # Spark is better  Hadoop is good  Spark is fast
rdd2.foreach(print)  # 14 13 15

上面代码中,
(i)第1行语句中的textFile()是一个转换操作(函数返回一个RDD对象),系统只会记录这次转换,并不会真正读取word.txt文件的数据到内存中;
(ii)第2行语句的map()也是一个转换操作(函数返回一个RDD对象),系统只是记录这次转换,不会真正执行map()方法;
(iii)而第3行语句的reduce()是一个“行动”类型的操作(函数返回一个整型数字),这时系统会生成一个作业,触发真正的计算。也就是说,这时才会加载word.txt的数据到内存,生成RDD。

2)RDD持久化 — (解决惰性机制的效率问题):

(1)效率低的背景:

在Spark中,RDD采用惰性求值的机制。导致每次遇到“行动”操作,都会从头开始执行计算(即每次调用行动操作,都会触发一次从头开始的计算),这对于迭代计算而言,代价是很大的,影响效率(因为迭代计算经常需要多次重复使用同一组数据)。下面是多次计算同一个RDD的例子:

li = ["Hadoop", "Spark", "Hive"]
rdd = sc.parallelize(li)  # 记录操作。生成一个RDD
print(rdd.count())   # 行动操作,触发一次真正的从头到尾的计算。运行结果:3
print(','.join(rdd.collect()))  # 行动操作,再触发一次真正的从头到尾的计算。运行结果:'Hadoop', 'Spark', 'Hive'
# 注:rrd.collect()是以数组形式返回数据集中的所有元素。结果:['Hadoop', 'Spark', 'Hive']

(2)增加持久化(缓存):

为了避免这种重复计算的开销,可以使用RDD的持久化(缓存),方法是使用persist()函数将一个RDD标记为持久化。注意:之所以“标记为持久化”,是因为出现persist()语句的地方并不会马上计算生成RDD并把它持久化,而是要对等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化。持久化后的RDD将会被保留在计算节点的内存中,被后面的行动操作重复时候。
persist()使用的时候有两种参数供选择:

  • persist(MEMORY_ONLY):仅内存,超出内存则覆盖(LRU原则)。表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容。 ---- 默认这种。
  • persist(MEMORY_AND_DISK):内存+磁盘,超出内存则存硬盘。表示将RDD作为反序列化的 对象存储于JVM中,如果内存不足,超出的分区将会被存储在硬盘上。
    这两种,默认参数是persist(MEMORY_ONLY):仅内存,超出内存则覆盖(LRU原则),因为效率第一,另一种超出就放在硬盘上不但会影响效率,还会造成资源浪费(尤其数据量巨大的时候)。

上面的例子,增加持久化缓存语句:

from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel


# 创建SparkConf对象,并给对象赋值
conf = SparkConf().setMaster("local").setAppName("My app")
# 创建SparkContext对象,不妨命名为sc
sc = SparkContext(conf=conf)
"""
spark创建的sc,其功能之一是调用自带的parallelize()函数来加载自定义的变量来创建RDD,如下面的 sc.parallelize:
(sc还有如加载文件textFile()等其他很多函数和功能)
"""

li = ["hadoop", "spark", "hive"]
rdd = sc.parallelize(li)

# 以仅内存方式标记RDD。将名为rdd的这个RDD对象标记持久化缓存
rdd.persist() # 默认MEMORY_ONLY。--仅内存,超出内存则覆盖(LRU原则)方式。
# 等价 rdd.persist(storageLevel=StorageLevel.MEMORY_ONLY)
# rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK) # --内存+磁盘,超出内存则存硬盘方式。

# 第一次行动操作,触发一次真正的从头到尾的计算,这时上面的rdd.persist()才会执行,把这个rdd放到缓存中。
print(rdd.count())

# 第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
print(','.join(rdd.collect()))

# 解除标记。释放名为rdd的RDD对象在内存中的缓存空间
rdd.unpersist()

:持久化RDD会占用内存空间,当不再需要一个RDD时,就可以使用unpersist()函数手动地把持久化的RDD从缓存中移除,释放内存空间。
注意,上面标记为仅内存执行rdd.persist() 或 rdd.persist(storageLevel=StorageLevel.MEMORY_ONLY) 后,要想重新标记为内存+磁盘执行 rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK) ,需要先执行rdd.unpersist()释放标记!!!否则报错!

(3)实际开发中,持久化(缓存)写法:

实际开发中,我们使用cache()方法就会自动调用persist(MEMORY_ONLY),我们一般用rdd.cache()或rdd.persist()即可,不用再导包from pyspark.storagelevel import StorageLevel来传参,通过查看cache()和persist()源码,可以看到这两个方法会自动导入包。
重点!!RDD持久化 实际开发代码,一般写法如下:

from pyspark import SparkConf, SparkContext


conf = SparkConf().setMaster("local").setAppName("My app")
sc = SparkContext(conf=conf)

li = ["hadoop", "spark", "hive"]
rdd = sc.parallelize(li)

# 以仅内存方式标记RDD。将名为rdd的这个RDD对象标记持久化缓存
rdd.cache()   # 会调用persist(MEMORY_ONLY)
# 或 rdd.persist() # 默认MEMORY_ONLY。--仅内存,超出内存则覆盖(LRU原则)方式。

# 第一次行动操作,触发一次真正的从头到尾的计算,这时上面的rdd.persist()才会执行,把这个rdd放到缓存中。
print(rdd.count())

# 第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
print(','.join(rdd.collect()))

# 解除标记。释放名为rdd的RDD对象在内存中的缓存空间
rdd.unpersist()

附:cache()和persist()函数的源码。在Anaconda的site-packages/pyspark/rdd.py文件:
在这里插入图片描述在这里插入图片描述

标签:缓存,持久,rdd,python,RDD,实战经验,内存,persist,Spark
来源: https://blog.csdn.net/Acegem/article/details/123086945

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有