ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

spark保存到外部数据源

2021-05-31 17:57:09  阅读:189  来源: 互联网

标签:val 外部 数据源 new sc import spark SparkConf



文章目录


保存为sequenceFile

package write

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.{SparkConf, SparkContext}

object saveToSeq {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]")
      .setAppName("saveToSeq")
    val sc = new SparkContext(conf)

    val data = List(("name", "xiaoming"), ("age", "18"))
    val rddData = sc.parallelize(data, 1)
    rddData.saveAsSequenceFile("D:\\studyplace\\sparkBook\\chapter4\\result\\1",Some(classOf[GzipCodec]))
  }
}

其中saveAsSequenceFile的api第一个参数是保存文件路径,第二个参数是设置压缩方式

对于ClassOf[xxxCodec]对象必须封装在Option集合中再传入SequenceFile方法中,在scala中Option的两个实例为Some集合和None集合,后者代表没有任何元素

在压缩方式中,GzipCodec的压缩比率较高,磁盘不足可以使用这个方式,虽然Bzip压缩率更高,但对于频繁读写场景不适用

保存到HDFS

  • saveAsTextFile

    本质上调用了saveAsHadoopFile方法

  • saveAsHadoopFile

    对URI进行判断,以file:/// 将数据保存到本地文件系统中,如果schema是hdfs://将数据写到hdfs文件中

    saveAsHadoopFile方法中,默认调用的是TextOutputFormat实现类作为输出数据的格式化工具

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
    import org.apache.spark.{SparkConf, SparkContext}
    
    object saveTohadoop {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("saveTohadoop").setMaster("local[*]")
        val sc = new SparkContext(conf)
        val rddData = sc.parallelize(List(("cat",20),("dog",29),("pig",11)),1)
        rddData.saveAsNewAPIHadoopFile("路径",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
        sc.stop()
      }
    }

保存到mysql

package write

import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}

object saveToMySQL {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("saveToMySQL")
    val sc = new SparkContext(conf)

    Class.forName("com.mysql.jdbc.Driver")
    val rddData = sc.parallelize(List(("tom",11),("jettty",19)))
    rddData.foreachPartition((iter:Iterator[(String,Int)]) => {
      val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark?useUnicode=true&characterEncoding=utf-8","root","123456")
      conn.setAutoCommit(false)
      val statement = conn.prepareStatement("insert into spark.person (name,age) VALUES (?,?);")
      iter.foreach( t => {
        statement.setString(1,t._1)
        statement.setInt(2,t._2)
        statement.addBatch()
      })
      statement.executeBatch()
      conn.commit()
      conn.close()
    })
    sc.stop()
  }
}

保存数据的时候使用foreachPartition方法遍历RDD的每一个分区

注意:DriverManager.getConnection 需要移到foreaPartition内部

conn.setAutoCommit(false) 关闭自动提交,对于大数据量批量操作更合适

标签:val,外部,数据源,new,sc,import,spark,SparkConf
来源: https://blog.51cto.com/u_13985831/2836494

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有