ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RDD 持久化

2022-06-08 19:02:25  阅读:105  来源: 互联网

标签:缓存 false val RDD 检查点 持久 true


1、RDD缓存

RDD通过Cache或者Persist方法将前面的计算结果缓存,默认情况下会把数据以序列化的形式缓存在JVM的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

代码实现

/**
 * RDD 缓存
 */
object Spark03_Cache {
  def main(args: Array[String]): Unit = {
    //获取 SparkConf 并设置应用名称*本地模式
    val conf: SparkConf = new SparkConf().setAppName("Spark").setMaster("local[*]")
    //获取 Spark 上下文对象
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[String] = sc.makeRDD(List("hello scala", "hello spark"))
    val flatMapRDD: RDD[String] = rdd.flatMap(_.split(" "))
    val mapRDD: RDD[(String, Long)] = flatMapRDD.map {
      word => {
        println("----------------------")
        (word, 1)
      }
    }
    println(mapRDD.toDebugString)
    mapRDD.cache()
    mapRDD.persist()
    mapRDD.collect()
    println(")0000000000000000000000000000000")

    println(mapRDD.toDebugString)
    mapRDD.collect()
    Thread.sleep(999999999)
    //释放 Spark 上下文对象
    sc.stop
  }
}
cache()底层调用了 persist(),并设置缓存级别MEMORY_ONLY,也可以直接调用persist()方法进行缓存,视情况选择合适的缓存级别
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()

/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/**
 * Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
 * new storage levels.
 */
object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

注意:默认的存储级别都是仅在内存存储一份。在存储级别的末尾加上“_2”表示持久化的数据存为两份。

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。Spark会自动对一些Shuffle操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点Shuffle失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用persist或cache。

def main(args: Array[String]): Unit = {
        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3. 创建一个RDD,读取指定位置文件:
        val lineRdd: RDD[String] = sc.textFile("input1")

        //3.1.业务逻辑
        val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))

        val wordToOneRdd: RDD[(String, Int)] = wordRdd.map {
            word => {
                println("************")
                (word, 1)
            }
        }
       
        // 采用reduceByKey,自带缓存
        val wordByKeyRDD: RDD[(String, Int)] = wordToOneRdd.reduceByKey(_+_)

        //3.5 cache操作会增加血缘关系,不改变原有的血缘关系
        println(wordByKeyRDD.toDebugString)

        //3.4 数据缓存。
        //wordByKeyRDD.cache()

        //3.2 触发执行逻辑
        wordByKeyRDD.collect()

        println("-----------------")
        println(wordByKeyRDD.toDebugString)

        //3.3 再次触发执行逻辑
        wordByKeyRDD.collect()

        //4.关闭连接
        sc.stop()
    }
访问http://localhost:4040/jobs/页面,查看第一个和第二个job的DAG图。说明:增加缓存后血缘依赖关系仍然有,但是,第二个job取的数据是从缓存中取的

  

说明:RDD缓存是Spark调优的一个重要环节,合理的缓存可以节省计算资源提升Spark执行效率,当然需要视情况而定,在内存紧张的情况下要对缓存级别做适当的调账。

2、CheckPoint

检查点:是通过将RDD中间结果写入磁盘。 为什么要做检查点?由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。 检查点存储路径:Checkpoint的数据通常是存储在HDFS等容错、高可用的文件系统 检查点数据存储格式为:二进制的文件 检查点切断血缘:在Checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。 检查点触发时间:对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。但是检查点为了数据安全,会从血缘关系的最开始执行一遍

设置检查点步骤

(1)设置检查点数据存储路径:sc.setCheckpointDir("./checkpoint1") (2)调用检查点方法:wordToOneRdd.checkpoint()

def main(args: Array[String]): Unit = {
  //获取 SparkConf 并设置应用名称*本地模式
  val conf: SparkConf = new SparkConf().setAppName("Spark").setMaster("local[*]")
  //获取 Spark 上下文对象
  val sc: SparkContext = new SparkContext(conf)
  sc.setCheckpointDir("E:\\workspace_idea\\spark0520\\cp")
  val rdd: RDD[String] = sc.makeRDD(List("hello scala", "hello spark"))
  val flatMapRDD: RDD[String] = rdd.flatMap(_.split(" "))
  val mapRDD: RDD[(String, Long)] = flatMapRDD.map {
    word => {
      (word, System.currentTimeMillis())
    }
  }
  mapRDD.cache()
  //设置检查点
  mapRDD.checkpoint( )

  mapRDD.foreach(println)
  println(mapRDD.dependencies)
  //设置缓存

  mapRDD.collect()
  println(mapRDD.toDebugString)
  mapRDD.collect()
  mapRDD.foreach(println)


  Thread.sleep(999999999)
  //释放 Spark 上下文对象
  sc.stop
}
生产环境一般将检查点设置在hdfs 上,所以目录可以写hdfs 存储路径
sc.setCheckpointDir("hdfs://hadoop103:9000/checkpoint")
执行结果:访问http://localhost:4040/jobs 查看4个job的DAG图。其中第2个图是checkpoint的job运行DAG图。第3、4张图说明,检查点切断了血缘依赖关系。

 

3、缓存&检查点区别

  1. Cache缓存只是将数据保存起来,不切断血缘依赖。Checkpoint检查点切断血缘依赖。
  2. Cache缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint的数据通常存储在HDFS等容错、高可用的文件系统,可靠性高。
  3. 建议对checkpoint()的RDD使用Cache缓存,这样checkpoint的job只需从Cache缓存中读取数据即可,否则需要再从头计算一次RDD。
  4. 如果使用完了缓存,可以通过unpersist()方法释放缓存

标签:缓存,false,val,RDD,检查点,持久,true
来源: https://www.cnblogs.com/wdh01/p/16325547.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有