ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

spark(16)RDD的缓存机制、checkpoint机制

2020-08-24 03:31:18  阅读:297  来源: 互联网

标签:缓存 false val 16 checkpoint rdd 机制 true


RDD的缓存机制(★★★★★)

什么是rdd的缓存

spark可以把一个rdd的数据缓存起来,后续有其他的job需要用到该rdd的结果数据,可以直接从缓存中获取得到,避免了重复计算。缓存是加快后续对该数据的访问操作。

如何对rdd设置缓存

可以通过persist方法或cache方法将前面的RDD的数据缓存。但这两个方法被调用时不会立即执行缓存操作,而是触发后面的action时,才将RDD缓存在计算节点的内存中,并供后面重用。

persist方法和cache方法的源代码如下,可以看到cache方法内调用了persist方法,persist方法的参数的默认值是StorageLevel.MEMORY_ONLY。

/**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

  /**
   * Persist this RDD with the default storage level (`MEMORY_ONLY`).
   */
  def cache(): this.type = persist()

StorageLevel的部分源码带,StorageLevel是一个object,里面定义了不同的变量来表示不同的存储级别。

  1. NONE 不进行缓存
  2. DISK_ONLY 缓存到磁盘 DISK_ONLY_2 缓存到磁盘,2份
  3. MEMORY_ONLY 缓存到内存 MEMORY_ONLY_2 缓存到内存2份
  4. MEMORY_ONLY_SER 序列化后缓存到内存 MEMORY_ONLY_SER_2 序列化后缓存到内存2份
  5. MEMORY_AND_DISK 缓存到内存或磁盘 MEMORY_AND_DISK_2 缓存到内存或磁盘2份
  6. MEMORY_AND_DISK_SER 缓存到内存或磁盘且序列化 MEMORY_AND_DISK_SER_2 ...
  7. OFF_HEAP 缓存到堆外

注意:MEMORY_AND_DISK并不是把数据缓存一部分在内存中一部分在磁盘中,而是优先考虑内存,内存不够了才缓存到磁盘。

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

cache和persist的使用示例

打开spark shell

spark-shell --master spark://node01:7077 --executor-memory 1g --total-executor-cores 2

登录8080端口的spark页面,找到spark shell对应的Application,点击Spark shell

点击后,就进入了http://node01:4040/jobs/,然后切换到Storage

往spark shell一行行执行下列代码,注意刷新观察Storage界面的变化。

val rdd1=sc.textFile("/words.txt")
val rdd2=rdd1.flatMap(_.split(" "))
val rdd3=rdd2.cache
rdd3.collect

执行完rdd3.collect后,页面才发生了变化,如下图,图中显示存储在内存中的大小为440.0B,磁盘为0:

image-20200417094145627

继续执行下列代码:

val rdd4=rdd3.map((_,1))
val rdd5=rdd4.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
rdd5.collect

执行rdd5.collect后,页面再次发生变化,如下图:

image-20200417094636520

cache和persist的区别(面试题

简述下如何对RDD设置缓存,以及它们的区别是什么?

对RDD设置缓存成可以调用rdd的2个方法: 一个是cache,一个是persist,调用这2个方法都可以对rdd的数据设置缓存,但不是立即就触发缓存执行,后面需要有action,才会触发缓存的执行。

cache方法和persist方法区别:

  1. cache: 默认是把数据缓存在内存中,其本质就是调用persist方法;
  2. persist:可以把数据缓存在内存或者是磁盘,有丰富的缓存级别,这些缓存级别都被定义在StorageLevel这个object中。

什么时候需要设置缓存?

首先理解一个概念:transformation算子是延迟加载的,只有在触发action时才会被执行,job执行完之后,前面所有rdd的数据就都不存在了,如果没有action算子,各个rdd之间就只是一个转换

1、某个rdd的数据后期被使用了多次

1569037915592

如上图所示的计算逻辑:

当第一次使用rdd2做相应的算子操作得到rdd3的时候,就会从rdd1开始计算,先读取HDFS上的文件,然后对rdd1 做对应的算子操作得到rdd2,再由rdd2计算之后得到rdd3。同样为了计算得到rdd4,前面的逻辑会被重新计算。

默认情况下多次对同一个rdd执行算子操作, rdd都会对这个rdd及之前的父rdd全部重新计算一次。 这种情况在实际开发代码的时候会经常遇到,但是我们一定要避免一个rdd重复计算多次,否则会导致性能急剧降低。

因此,可以把多次使用到的rdd,也就是公共rdd进行持久化,避免后续需要,再次重新计算,提升效率。如下图,在设置了rdd2.cache或rdd2.persist后,得到rrd3时(假设rdd2-->rdd3是一个action),步骤还是HDFS-->rdd1-->rdd2-->rdd3,但是因为rdd3是rdd2经过action算子操作得到的,rrd2的数据得到缓存

那么生成rdd4的时候,步骤就简单了很多,直接从缓存中获取数据,计算得到rdd4。

image-20200417095850150

2、为了获取得到一个rdd的结果数据,经过了大量的算子操作或者是计算逻辑比较复杂,总之某个rdd的数据来之不易时,可以进行缓存:

val rdd2=rdd1.flatMap(函数).map(函数).reduceByKey(函数).xxx.xxx.xxx.xxx.xxx

清除缓存数据

自动清除

一个application应用程序结束之后,对应的缓存数据也就自动清除

手动清除

调用rdd的unpersist方法

RDD的checkpoint机制(★★★★★)

checkpoint概念

我们可以对rdd的数据进行缓存,保存在内存或者是磁盘中。后续就可以直接从内存或者磁盘中获取得到,但是它们不是特别安全。

cache

它是直接把数据保存在内存中,后续操作起来速度比较快,直接从内存中获取得到。但这种方式很不安全,由于服务器挂掉或者是进程终止,会导致数据的丢失

persist

它可以把数据保存在本地磁盘中,后续可以从磁盘中获取得到该数据,但它也不是特别安全,由于系统管理员一些误操作删除了,或者是磁盘损坏,也有可能导致数据的丢失

checkpoint(检查点)

它是提供了一种相对而言更加可靠的数据持久化方式。它是把数据保存在分布式文件系统,比如HDFS上。这里就是利用了HDFS高可用性,高容错性(多副本)来最大程度保证数据的安全性。

如何设置checkpoint

1、在hdfs上设置一个checkpoint目录

sc.setCheckpointDir("hdfs://node01:8020/checkpoint") 

2、对需要做checkpoint操作的rdd调用checkpoint方法

val rdd1=sc.textFile("/words.txt")
rdd1.checkpoint
val rdd2=rdd1.flatMap(_.split(" ")) 

3、最后需要有一个action操作去触发任务的运行

rdd2.collect

查看缓存中hdfs中的数据:

[hadoop@node01 ~]$ hdfs dfs -ls /checkpoint/e237e2bb-dc0e-47d9-851f-26687b0d7dbe/rdd-5
Found 2 items
-rw-r--r--   3 hadoop supergroup         53 2020-04-17 10:20 /checkpoint/e237e2bb-dc0e-47d9-851f-26687b0d7dbe/rdd-5/part-00000
-rw-r--r--   3 hadoop supergroup          4 2020-04-17 10:20 /checkpoint/e237e2bb-dc0e-47d9-851f-26687b0d7dbe/rdd-5/part-00001

cache、persist、checkpoint三者区别

cache和persist

  • cache默认数据缓存在内存中
  • persist可以把数据保存在内存或者磁盘中
  • 后续要触发 cache 和 persist 持久化操作,需要有一个action操作
  • 它不会开启其他新的任务,一个action操作就对应一个job
  • 它不会改变rdd的依赖关系,程序运行完成后对应的缓存数据就自动消失

checkpoint

  • 可以把数据持久化写入到hdfs上

  • 后续要触发checkpoint持久化操作,需要有一个action操作,后续会开启新的job执行checkpoint操作

  • 它会改变rdd的依赖关系,后续数据丢失了不能够在通过血统进行数据的恢复。

  • 程序运行完成后对应的checkpoint数据就不会消失

cache或persisit与checkpoint的结合使用:

   sc.setCheckpointDir("/checkpoint")
   val rdd1=sc.textFile("/words.txt")
   val rdd2=rdd1.cache
   rdd2.checkpoint
   val rdd3=rdd2.flatMap(_.split(" "))
   rdd3.collect
   
//对checkpoint在使用的时候进行优化,在调用checkpoint操作之前,可以先来做一个cache操作,缓存对应rdd的结果数据,后续就可以直接从cache中获取到rdd的数据写入到指定checkpoint目录中   

标签:缓存,false,val,16,checkpoint,rdd,机制,true
来源: https://www.cnblogs.com/jimmy888/p/13551718.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有