Spark Streaming: DStream/DataFrame/RDD conversions, spark-submit, and writing data to disk

Spark Streaming

Converting a DStream to a DataFrame

package com.shujia.spark.streaming
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Duration, Durations, StreamingContext}

object Demo44DStreamTORDDAndDF {
  def main(args: Array[String]): Unit = {
    /**
     * Create the SparkSession
     */
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("ds")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import spark.implicits._

    val sc: SparkContext = spark.sparkContext

    val ssc = new StreamingContext(sc, Durations.seconds(5))

    /**
     * Read from a socket to get a DStream
     */
    val lineDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    /**
     * Under the hood a DStream is a sequence of RDDs: the data received in
     * each batch interval is wrapped into one RDD, so a new RDD (holding
     * different data) arrives every 5 seconds.
     *
     * Once you drop down to the RDD level, the stateful DStream operators
     * are no longer available.
     */
    lineDS.foreachRDD((rdd: RDD[String]) => {
      println("processing a new batch")
      // regular RDD code can be written here; note that this chain is lazy,
      // so nothing runs until an action such as foreach is uncommented
      rdd
        .flatMap(_.split(","))
        .map((_, 1))
        .reduceByKey(_ + _)
        //.foreach(println)

      /**
       * The RDD can be converted to a DataFrame, after which SQL can be used
       */
      val linesDF: DataFrame = rdd.toDF("line")

      linesDF.createOrReplaceTempView("lines")

      val countDF: DataFrame = spark.sql(
        """
          |select word,count(1) as c from (
          |select explode(split(line,',')) as word
          |from lines
          |) as a
          |group by word
          |
          |""".stripMargin)

      countDF.show()

    })
      
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
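
Once the per-batch result is a DataFrame, it does not have to stop at show(). As a minimal sketch (the output path data/word_count_json is an assumed example, not part of the original code), the same countDF could instead be persisted from inside foreachRDD with the DataFrame writer:

// Sketch only: this would replace countDF.show() inside foreachRDD.
// mode("append") lets every 5-second batch add new files under the path.
countDF
  .write
  .mode("append")
  .json("data/word_count_json")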

Converting an RDD to a DStream

package com.shujia.spark.streaming
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo55RDDToDStream {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[2]")
      .appName("ds")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()
    import spark.implicits._

    val sc: SparkContext = spark.sparkContext

    val ssc = new StreamingContext(sc, Durations.seconds(5))

    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    /**
     * transform: every 5 seconds the current batch's RDD is passed in; inside,
     * the RDD API is used to process the data, and an RDD is returned when done.
     */
    val resultDS: DStream[(String, Int)] = linesDS.transform((rdd: RDD[String]) => {
      // this RDD computation aggregates within a single batch, not globally
      val countRDD: RDD[(String, Int)] = rdd
        .flatMap(_.split(","))
        .map((_, 1))
        .reduceByKey(_ + _)

      // return an RDD when done; the returned RDD is wrapped into a new DStream
      countRDD
    })
    
    resultDS.print()

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }
}
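
The comment above notes that transform only aggregates within a single batch. When a running total across batches is wanted, Spark Streaming's stateful operator updateStateByKey can be used instead of transform; here is a minimal sketch, assuming a checkpoint directory data/checkpoint (an example path, not from the original code):

// Stateful word count: keeps a running total across all batches.
// A checkpoint directory is required so the state can be stored and recovered.
ssc.checkpoint("data/checkpoint")

val totalDS: DStream[(String, Int)] = linesDS
  .flatMap(_.split(","))
  .map((_, 1))
  .updateStateByKey[Int]((values: Seq[Int], state: Option[Int]) => {
    // new total for a key = counts in the current batch + previous total
    Some(values.sum + state.getOrElse(0))
  })

totalDS.print()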

Packaging the local code and running it on the cluster

package com.shujia.spark.streaming
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo66Submit {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession
      .builder()
      .appName("ds")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val ssc = new StreamingContext(sc, Durations.seconds(5))

    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    linesDS
      .flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()


    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

    /**
     * Submit command (after packaging the jar):
     *
     * spark-submit --master yarn-client --class com.shujia.spark.streaming.Demo66Submit --num-executors 2 spark-1.0.jar
     *
     * Note: "yarn-client" is the legacy syntax; on newer Spark versions use
     * "--master yarn --deploy-mode client" instead.
     */
  }
}
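
Compared with the local demos, the only code change here is that the hard-coded .master("local[2]") has been dropped so the master can come from spark-submit. One possible way to keep a single code path for both local runs and cluster submits, sketched here as an assumption rather than something from the original post, is SparkConf.setIfMissing, which only applies the local master when none was supplied externally:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch: fall back to local[2] only when spark-submit did not set --master.
val conf = new SparkConf()
  .setAppName("ds")
  .setIfMissing("spark.master", "local[2]")

val spark: SparkSession = SparkSession
  .builder()
  .config(conf)
  .config("spark.sql.shuffle.partitions", 1)
  .getOrCreate()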

Writing the input data to disk

package com.shujia.spark.streaming
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object Demo77SaveFIle {
  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession
      .builder()
      .appName("ds")
      .master("local[2]")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sc: SparkContext = spark.sparkContext

    val ssc = new StreamingContext(sc, Durations.seconds(5))

    val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("master", 8888)

    val countDS: DStream[(String, Int)] = linesDS
      .flatMap(_.split(","))
      .map((_, 1))
      .reduceByKey(_ + _)

    // save the data to disk; a new set of output files is rolled out for every batch
    countDS.saveAsTextFiles("data/stream", "txt")

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
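
saveAsTextFiles writes one output directory per batch, named from the prefix and the batch time (roughly data/stream-<time in ms>.txt), including batches that received no data. A hedged sketch of an alternative that skips empty batches by saving from foreachRDD instead (it reuses the data/stream prefix from the example above):

// Write only non-empty batches, so an output directory is not
// created for every idle 5-second interval.
countDS.foreachRDD((rdd, time) => {
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(s"data/stream-${time.milliseconds}")
  }
})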
