ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

SparkCore之数据的读取与保存

2021-05-18 09:31:40  阅读:302  来源: 互联网

标签:SparkContext ps 读取 val SparkCore 保存 sc SparkConf String


  • Spark的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统。

  • 文件格式分为:Text文件、Json文件、Csv文件、Sequence文件以及Object文件

  • 文件系统分为:本地文件系统、HDFS以及数据库

一、文件类数据读取与保存

1.1 Text文件

  • 数据读取:textFile(String)
  • 数据保存:saveAsTextFile(String)
  • 代码实现
object Operate_Text {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.1 读取输入文件
        val inputRDD: RDD[String] = sc.textFile("input/1.txt")

        //3.2 保存数据
        inputRDD.saveAsTextFile("output")

        //4.关闭连接
        sc.stop()
    }
}
  • 注意:如果是集群路径:hdfs://master:9000/input/1.txt

1.2 Json文件

  • 如果JSON文件中每一行就是一个JSON记录,那么可以通过将JSON文件当做文本文件来读取,然后利用相关的JSON库对每一条数据进行JSON解析。
  • 数据准备:在input目录下创建1.txt文件,里面存储如下内容
{"username": "zhangsan","age": 20}
{"username": "lisi","age": 18}
{"username": "wangwu","age": 16}
  • 代码实现
object Operate_Json {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.1 读取Json输入文件
        val jsonRDD: RDD[String] = sc.textFile("input/user.json")

        //3.2 导入解析Json所需的包并解析Json
        import scala.util.parsing.json.JSON
        val resultRDD: RDD[Option[Any]] = jsonRDD.map(JSON.parseFull)

        //3.3 打印结果
        resultRDD.collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}
  • 注意:使用RDD读取JSON文件处理很复杂,同时SparkSQL集成了很好的处理JSON文件的方式,所以应用中多是采用SparkSQL处理JSON文件。

1.3 Sequence文件

  • SequenceFile文件是Hadoop用来存储二进制形式的key-value对而设计的一种平面文件(Flat File)。在SparkContext中,可以调用sequenceFile[keyClass, valueClass](path)。
  • 代码实现
object Operate_Sequence {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.1 创建rdd
        val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2),(3,4),(5,6)))

        //3.2 保存数据为SequenceFile
        dataRDD.saveAsSequenceFile("output")

        //3.3 读取SequenceFile文件
        sc.sequenceFile[Int,Int]("output").collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}
  • 注意:SequenceFile文件只针对PairRDD

1.4 Object对象文件

  • 对象文件是将对象序列化后保存的文件,采用Java的序列化机制。可以通过objectFile[k,v](path)函数接收一个路径,读取对象文件,返回对应的RDD,也可以通过调用saveAsObjectFile()实现对对象文件的输出。因为是序列化所以要指定类型。
  • 代码实现
object Operate_Object {

    def main(args: Array[String]): Unit = {

        //1.创建SparkConf并设置App名称
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")

        //2.创建SparkContext,该对象是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)

        //3.1 创建RDD
        val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4))

        //3.2 保存数据
        dataRDD.saveAsObjectFile("output")

        //3.3 读取数据
        sc.objectFile[(Int)]("output").collect().foreach(println)

        //4.关闭连接
        sc.stop()
    }
}

二、文件系统类数据读取与保存

2.1 HDFS

  • Spark的整个生态系统与Hadoop是完全兼容的,所以对于Hadoop所支持的文件类型或者数据库类型,Spark也同样支持。另外,由于Hadoop的API有新旧两个版本,所以Spark为了能够兼容Hadoop所有的版本,也提供了两套创建操作接口。对于外部存储创建操作而言,hadoopRDD和newHadoopRDD是最为抽象的两个函数接口

2.2 MySQL

  • 支持通过Java JDBC访问关系型数据库。需要通过JdbcRDD进行,示例如下:
  • 添加Maven依赖
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.27</version>
</dependency>
  • 从MySQL中读取数据
package com.spark.day06

import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.DriverManager


/*
* 从MySQL数据库中读取数据
*
*   sc: SparkContext,   Spark程序执行的入口,上下文对象
    getConnection: () => Connection,  获取数据库连接
    sql: String,  执行SQL语句
    lowerBound: Long, 查询的起始位置
    upperBound: Long, 查询的终止位置
    numPartitions: Int, 分区数
    mapRow: (ResultSet) => T  对结果集进行处理
    *
    * jdbc连接数据库
    *   注册驱动
    *   获取连接
    *   创建数据库操作对象PrepareStatement
    *   执行SQL
    *   处理结果集
    *   关闭连接
    *
* */

object Spark04_MySQL_read {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Test").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // 创建RDD
    // 数据库连接四要素:
    var driver = "com.mysql.jdbc.Driver"
    var url = "jdbc:mysql://172.23.4.221:3306/test"
    var username = "root"
    var password = "123456"
    var sql = "select * from wyk_csdn where id >= ? and id <= ?"
    val resRDD = new JdbcRDD(
      sc,
      () => {
        // 注册驱动
        Class.forName(driver)

        // 获取连接
        DriverManager.getConnection(url, username, password)
      },
      sql,
      1,
      20,
      2,
      rs => (rs.getInt(1), rs.getString(2), rs.getString(3))
    )

    resRDD.collect().foreach(println)

    sc.stop()
  }
}
  • 往MySQL中写入数据
package com.spark.day06

import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{Connection, DriverManager, PreparedStatement}


/*
* 从MySQL数据库中写入数据
*
    *
    * jdbc连接数据库
    *   注册驱动
    *   获取连接
    *   创建数据库操作对象PrepareStatement
    *   执行SQL
    *   处理结果集
    *   关闭连接
    *
* */

object Spark05_MySQL_write {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("Test").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // 创建RDD
    // 数据库连接四要素:
    var driver = "com.mysql.jdbc.Driver"
    var url = "jdbc:mysql://172.23.4.221:3306/test"
    var username = "root"
    var password = "123456"

    val rdd: RDD[(Int, String, String)] = sc.makeRDD(List((1, "banzhang", "2021-05-20 10:18:35")))

///
//  下面这段代码,需要让ps实现序列化,但是ps不是我们自己定义的类型,没有办法实现
//    //  注册驱动
//    Class.forName(driver)
//
//    // 获取连接
//    val conn: Connection = DriverManager.getConnection(url, username, password)
//
//    // 声明数据库操作的SQL语句
//    var sql:String = "insert into wyk_csdn(id, name, ins_ts) values(?, ?, ?)"
//
//    // 创建数据库操作对象
//    val ps: PreparedStatement = conn.prepareStatement(sql)
//
//
//    // 在循环体中创建连接对象,每次遍历出RDD中的一个元素,都要创建一个连接对象,效率低,不推荐使用
//    rdd.foreach{
//      case (id, name, ins_ts) => {
//
//
//        // 给参数赋值
//        ps.setInt(1, id)
//        ps.setString(2, name)
//        ps.setString(3,ins_ts)
//
//        // 执行SQL语句
//        ps.executeUpdate()
//      }
//    }
//    // 关闭连接
//    ps.close()
//    conn.close()
///

    rdd.foreachPartition{
      // datas是rdd的一个分区的数据
      datas => {
        //  注册驱动
        Class.forName(driver)

        // 获取连接
        val conn: Connection = DriverManager.getConnection(url, username, password)

        // 声明数据库操作的SQL语句
        var sql:String = "insert into wyk_csdn(id, name, ins_ts) values(?, ?, ?)"

        // 创建数据库操作对象
        val ps: PreparedStatement = conn.prepareStatement(sql)

        // 对当前分区内的数据,进行遍历
        // 这里的foreach不是算子了,是集合的方法
        datas.foreach{
          case (id, name, ins_ts) => {


            // 给参数赋值
            ps.setInt(1, id)
            ps.setString(2, name)
            ps.setString(3,ins_ts)

            // 执行SQL语句
            ps.executeUpdate()
          }
        }
        ps.close()
        conn.close()
      }
    }


    sc.stop()
  }
}

 

标签:SparkContext,ps,读取,val,SparkCore,保存,sc,SparkConf,String
来源: https://blog.csdn.net/qq_38689352/article/details/116951909

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有