ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RDD数据读取与保存

2022-06-11 10:35:03  阅读:264  来源: 互联网

标签:SparkContext 读取 val 保存 RDD sc Spark SparkConf String


1、文件读取与保存

1.1、Text 文件

1)数据读取:textFile(String) 2)数据保存:saveAsTextFile(String)
def main(args: Array[String]): Unit = {
 
        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
 
        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)
 
        //3.1 读取输入文件
        val inputRDD: RDD[String] = sc.textFile("input/1.txt")
 
        //3.2 保存数据
        inputRDD.saveAsTextFile("output")
 
        //4.关闭连接
        sc.stop()
    }

说明:如果是集群路径:hdfs://hadoop103:9000/input/1.txt

1.2、Json 文件

Json 文件准备

{"name": "linghc","age":29}
{"name": "yilin","age":18}
{"name": "renyy","age":25}

读取保存逻辑

import com.alibaba.fastjson.JSON
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Spark05_json {
  def main(args: Array[String]): Unit = {
    //获取 SparkConf 并设置应用名称*本地模式
    val conf: SparkConf = new SparkConf().setAppName("Spark").setMaster("local[*]")
    //获取 Spark 上下文对象
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("E:\\workspace_idea\\spark0520\\input\\user.json")
    val mapRDD: RDD[AnyRef] = rdd.map {
      str => {
        JSON.parse(str)
      }
    }
    mapRDD.collect().foreach(println)
    //释放 Spark 上下文对象
    sc.stop
  }
}

如果Json文件格式如下:

[{"name": "linghc","age":29}
{"name": "yilin","age":18}
{"name": "renyy","age":25}]
再次执行程序,发现解析失败。原因是Spark 读取Json 文件和读取Text文件是一样的,按行读取文件。 注意:使用RDD读取JSON文件处理很复杂,同时SparkSQL集成了很好的处理JSON文件的方式,所以应用中多是采用SparkSQL处理JSON文件。

1.3、Sequence文件

SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。在SparkContext中,可以调用sequenceFile[keyClass, valueClass](path)。
def main(args: Array[String]): Unit = {
 
        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
 
        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)
 
        //3.1 创建rdd
        val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2),(3,4),(5,6)))
 
        //3.2 保存数据为SequenceFile
        dataRDD.saveAsSequenceFile("output")
 
        //3.3 读取SequenceFile文件
        sc.sequenceFile[Int,Int]("output").collect().foreach(println)
 
        //4.关闭连接
        sc.stop()
    }
说明:SequenceFile文件只针对PairRDD

1.4、Object对象文件

对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[k,v](path)函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。

def main(args: Array[String]): Unit = {
 
        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
 
        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)
 
        //3.1 创建RDD
        val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4))
 
        //3.2 保存数据
        dataRDD.saveAsObjectFile("output")
 
        //3.3 读取数据
        sc.objectFile[(Int)]("output").collect().foreach(println)
 
        //4.关闭连接
        sc.stop()
    }

2、文件系统类数据读取与保存

2.1、HDFS

Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持。另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建操作接口。对于外部存储创建操作而言,hadoopRDD和newHadoopRDD是最为抽象的两个函数接口

2.2、Mysql

添加依赖

dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.6</version>
</dependency>

读数据

def main(args: Array[String]): Unit = {
  //获取 SparkConf 并设置应用名称*本地模式
  val conf: SparkConf = new SparkConf().setAppName("Spark").setMaster("local[*]")
  //获取 Spark 上下文对象
  val sc: SparkContext = new SparkContext(conf)

  // 通过 jdbRDD  交互Mysql
  /**
   * sc: SparkContext, 上下文
   * getConnection: () => Connection, 获取连接
   * sql: String, sql
   * lowerBound: Long,
   * upperBound: Long,
   * numPartitions: Int,分区数
   * mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _) 结果姐
   */
  //3.定义连接mysql的参数
  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://hadoop103:3306/db01"
  val userName = "user"
  val passWd = "pwd"
  var sql: String = "select * from user where  id >= ? and id <= ?"

  val jdbcRDD: JdbcRDD[(Int, String)] = new JdbcRDD(sc,
    () => {
      Class.forName(driver)
      DriverManager.getConnection(url, userName, passWd)
    },
    sql,
    0,
    1,
    2,
    rs => {
      (rs.getInt(1), rs.getString(2))
    }
  )

  jdbcRDD.collect().foreach(println)

  //释放 Spark 上下文对象
  sc.stop
}

写数据

def main(args: Array[String]): Unit = {
  //获取 SparkConf 并设置应用名称*本地模式
  val conf: SparkConf = new SparkConf().setAppName("Spark").setMaster("local[*]")
  //获取 Spark 上下文对象
  val sc: SparkContext = new SparkContext(conf)
  val rdd: RDD[(Int, String)] = sc.makeRDD(List((2, "linghc"), (3, "yilin")))
  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://hadoop103:3306/db01"
  val userName = "user"
  val passWd = "pwd"
  //以分区为单位处理
  rdd.foreachPartition(datas => {
    Class.forName(driver)
    //创建链接
    val connection: Connection = DriverManager.getConnection(url, userName, passWd)
    //创建操作对象
    val ps: PreparedStatement = connection.prepareStatement("insert into user (id,name) values (?,?)")
    datas.foreach {
      case (id, name) => {
        //注册驱动
        //占位符 赋值
        ps.setInt(1, id)
        ps.setString(2, name)
        //exe
        ps.executeUpdate()
      }
    } //释放资源
    ps.close()
    connection.close()
  })
  //释放 Spark 上下文对象
  sc.stop
}

 

标签:SparkContext,读取,val,保存,RDD,sc,Spark,SparkConf,String
来源: https://www.cnblogs.com/wdh01/p/16325553.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有