ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

大数据之Spark Core外部数据源引入

2021-04-14 23:32:42  阅读:125  来源: 互联网

标签:Core val 数据源 RDD org apache import Spark HBase


外部数据源

 

Spark可以从外部存储系统读取数据,比如RDBMs表中或者HBase表中读写数据,这也是企业中常常使用,如:

 1)、要分析的数据存储在HBase表中,需要从其中读取数据数据分析

日志数据:电商网站的商家操作日志

订单数据:保险行业订单数据

 2)、使用Spark进行离线分析以后,往往将报表结果保存到MySQL表中

网站基本分析(pv、uv。。。。。)

注意:实际开发中会封装为工具类直接使用

https://github.com/teeyog/blog/issues/22

https://blog.csdn.net/u011817217/article/details/81667115

 

 

 

MySQL 数据源

     实际开发中常常将分析结果RDD保存至MySQL表中,使用foreachPartition函数;此外Spark中提供JdbcRDD用于从MySQL表中读取数据。

调用RDD#foreachPartition函数将每个分区数据保存至MySQL表中,保存时考虑降低RDD分区数目和批量插入,提升程序性能。

演示代码

package cn.itcast.core

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{JdbcRDD, RDD}

/**
  * Author itcast
  * Desc 演示使用Spark将数据写入到MySQL,再从MySQL读取出来
  */
object SparkJdbcDataSource {
  def main(args: Array[String]): Unit = {
    //1.创建SparkContext
    val sparkConf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")
    //2.准备数据
    val data: RDD[(String, Int)] = sc.parallelize(List(("jack", 18), ("tom", 19), ("rose", 20)))
    //3.将RDD中的数据保存到MySQL中去
    //将每一个分区中的数据保存到MySQL中去,有几个分区,就会开启关闭连接几次
    //data.foreachPartition(itar=>dataToMySQL(itar))
    data.foreachPartition(dataToMySQL) //方法即函数,函数即对象


    //4.从MySQL读取数据
    /*
    class JdbcRDD[T: ClassTag](
    sc: SparkContext,
    getConnection: () => Connection,
    sql: String,
    lowerBound: Long,
    upperBound: Long,
    numPartitions: Int,
    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
     */
    val getConnection = ()=> DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","root")
    val sql:String = "select id,name,age from t_student where id >= ? and id <= ?"
    val mapRow = (rs:ResultSet) => {
      val id: Int = rs.getInt(1)
      val name: String = rs.getString(2)
      val age: Int = rs.getInt("age")
      (id,name,age)
    }
    val studentRDD: JdbcRDD[(Int, String, Int)] = new JdbcRDD(sc,getConnection,sql,4,5,2,mapRow)
    println(studentRDD.collect().toBuffer)
  }

  /**
    * 将分区中的数据保存到MySQL
    * @param itar 传过来的每个分区有多条数据
    */
  def dataToMySQL(itar: Iterator[(String, Int)]): Unit = {
    //0.加载驱动
    //Class.forName("") //源码中已经加载了
    //1.获取连接
    val connection: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","root","root")
    //2.编写sql
    val sql:String = "INSERT INTO `t_student` (`name`, `age`) VALUES (?, ?);"
    //3.获取ps
    val ps: PreparedStatement = connection.prepareStatement(sql)
    itar.foreach(data=>{
      //4.设置参数
      ps.setString(1,data._1)
      ps.setInt(2,data._2)
      //5.执行sql
      ps.addBatch()
    })
    ps.executeBatch()
    ps.close()
    connection.close()
  }
}

 

​​​​​​​HBase 数据源

Spark可以从HBase表中读写(Read/Write)数据,底层采用TableInputFormatTableOutputFormat方式,与MapReduce与HBase集成完全一样,使用输入格式InputFormat和输出格式OutputFoamt。

 

 

​​​​​​​HBase Sink

回顾MapReduce向HBase表中写入数据,使用TableReducer,其中OutputFormat为TableOutputFormat,读取数据Key:ImmutableBytesWritable(Rowkey),Value:Put(Put对象)

写入数据时,需要将RDD转换为RDD[(ImmutableBytesWritable, Put)]类型,调用saveAsNewAPIHadoopFile方法数据保存至HBase表中。

HBase Client连接时,需要设置依赖Zookeeper地址相关信息及表的名称,通过Configuration设置属性值进行传递。

 

范例演示:将词频统计结果保存HBase表,表的设计

 

代码如下:

package cn.itcast.core

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 将RDD数据保存至HBase表中
 */
object SparkWriteHBase {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    // 构建RDD
    val list = List(("hadoop", 234), ("spark", 3454), ("hive", 343434), ("ml", 8765))
    val outputRDD: RDD[(String, Int)] = sc.parallelize(list, numSlices = 2)

    // 将数据写入到HBase表中, 使用saveAsNewAPIHadoopFile函数,要求RDD是(key, Value)
    //  组装RDD[(ImmutableBytesWritable, Put)]
    /**
     * HBase表的设计:
     * 表的名称:htb_wordcount
     * Rowkey:  word
     * 列簇:    info
     * 字段名称: count
     */
    val putsRDD: RDD[(ImmutableBytesWritable, Put)] = outputRDD.mapPartitions { iter =>
      iter.map { case (word, count) =>
        // 创建Put实例对象
        val put = new Put(Bytes.toBytes(word))
        // 添加列
        put.addColumn(
          // 实际项目中使用HBase时,插入数据,先将所有字段的值转为String,再使用Bytes转换为字节数组
          Bytes.toBytes("info"), Bytes.toBytes("cout"), Bytes.toBytes(count.toString)
        )
        // 返回二元组
        (new ImmutableBytesWritable(put.getRow), put)
      }
    }

    // 构建HBase Client配置信息
    val conf: Configuration = HBaseConfiguration.create()
    // 设置连接Zookeeper属性
    conf.set("hbase.zookeeper.quorum", "node1")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase")
    // 设置将数据保存的HBase表的名称
    conf.set(TableOutputFormat.OUTPUT_TABLE, "htb_wordcount")
    /*
         def saveAsNewAPIHadoopFile(
             path: String,// 保存的路径
             keyClass: Class[_], // Key类型
             valueClass: Class[_], // Value类型
             outputFormatClass: Class[_ <: NewOutputFormat[_, _]], // 输出格式OutputFormat实现
             conf: Configuration = self.context.hadoopConfiguration // 配置信息
         ): Unit
     */
    putsRDD.saveAsNewAPIHadoopFile(
      "datas/spark/htb-output-" + System.nanoTime(), //
      classOf[ImmutableBytesWritable], //
      classOf[Put], //
      classOf[TableOutputFormat[ImmutableBytesWritable]], //
      conf
    )

    // 应用程序运行结束,关闭资源
    sc.stop()
  }
}

运行完成以后,使用hbase shell查看数据:

 

 

​​​​​​​HBase Source

回顾MapReduce从读HBase表中的数据,使用TableMapper,其中InputFormat为TableInputFormat,读取数据Key:ImmutableBytesWritable,Value:Result

   从HBase表读取数据时,同样需要设置依赖Zookeeper地址信息和表的名称,使用Configuration设置属性,形式如下:

 

     此外,读取的数据封装到RDD中,Key和Value类型分别为:ImmutableBytesWritable和Result,不支持Java Serializable导致处理数据时报序列化异常。设置Spark Application使用Kryo序列化,性能要比Java 序列化要好,创建SparkConf对象设置相关属性,如下所示:

 

范例演示:从HBase表读取词频统计结果,代码如下

package cn.itcast.core

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 从HBase 表中读取数据,封装到RDD数据集
 */
object SparkReadHBase {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    // 读取HBase Client 配置信息
    val conf: Configuration = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "node1")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("zookeeper.znode.parent", "/hbase")

    // 设置读取的表的名称
    conf.set(TableInputFormat.INPUT_TABLE, "htb_wordcount")
    /*
         def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
             conf: Configuration = hadoopConfiguration,
             fClass: Class[F],
             kClass: Class[K],
             vClass: Class[V]
         ): RDD[(K, V)]
     */
    val resultRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    println(s"Count = ${resultRDD.count()}")
    resultRDD
      .take(5)
      .foreach { case (rowKey, result) =>
        println(s"RowKey = ${Bytes.toString(rowKey.get())}")
        // HBase表中的每条数据封装在result对象中,解析获取每列的值
        result.rawCells().foreach { cell =>
          val cf = Bytes.toString(CellUtil.cloneFamily(cell))
          val column = Bytes.toString(CellUtil.cloneQualifier(cell))
          val value = Bytes.toString(CellUtil.cloneValue(cell))
          val version = cell.getTimestamp
          println(s"\t $cf:$column = $value, version = $version")
        }
      }

    // 应用程序运行结束,关闭资源
    sc.stop()
  }
}

运行结果:

 

 

标签:Core,val,数据源,RDD,org,apache,import,Spark,HBase
来源: https://blog.csdn.net/xiaoweite1/article/details/115712022

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有