ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

基于spark2的dataFrame和dataSet

2021-05-31 17:54:29  阅读:156  来源: 互联网

标签:val spark2 dataFrame dataSet session apache import spark id



文章目录


dataFrame

package sql2

import org.apache.avro.generic.GenericData.StringType
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession, types}

object Spark2DateFrame {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("SQLTest01")
      .master("local[*]")
      .getOrCreate()
//    创建RDD
    val lines = session.sparkContext.textFile("")
//    将数据进行整理
    val rowRDD = lines.map(line => {
      val fields = line.split(",")
      val id = fields(0).toLong
      val name = fields(1)
      Row(id, name)
    })
//    结果类型,其实就是表头,用于描述DataFrame,true表示是否为空
    val sch = StructType(List(
      StructField("id", LongType, true),
      StructField("name", StringType, true)
    ))

    //创建DataFrame
    val df = session.createDataFrame(rowRDD,sch)

    import session.implicits._
    val df2 = df.where($"id">8).orderBy($"id" desc,$"age" asc)

//    df.show()
    //写入成csv文件.json,parquet,jdbc等
    df2.write.parquet("")
    session.stop()

  }
}

dataFrame wordCount

package sql2

import org.apache.spark.sql.SparkSession

object Spark2WoldCount {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Spark2WoldCount")
      .master("local[*]")
      .getOrCreate()
    val lines = spark.read.textFile("")

    //整理数据,压平.导入隐式转换
    import spark.implicits._
    val words = lines.flatMap(_.split(" "))

    //注册视图
    words.createTempView("v_wc")

    //执行sql
    val result = spark.sql("select value,COUNT(*) counts from v_wc GROUP BY value ORDER BY counts DESC")
    result.show()
    spark.stop()
  }
}

基于dataSet的wordCount

package sql2

import org.apache.avro.generic.GenericData.StringType
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession, types}

object Spark2DateFrame {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("SQLTest01")
      .master("local[*]")
      .getOrCreate()
//    创建RDD
    val lines = session.sparkContext.textFile("")
//    将数据进行整理
    val rowRDD = lines.map(line => {
      val fields = line.split(",")
      val id = fields(0).toLong
      val name = fields(1)
      Row(id, name)
    })
//    结果类型,其实就是表头,用于描述DataFrame,true表示是否为空
    val sch = StructType(List(
      StructField("id", LongType, true),
      StructField("name", StringType, true)
    ))

    //创建DataFrame
    val df = session.createDataFrame(rowRDD,sch)

    import session.implicits._
    val df2 = df.where($"id">8).orderBy($"id" desc,$"age" asc)

//    df.show()
    //写入成csv文件.json,parquet,jdbc等
    df2.write.parquet("")
    session.stop()

  }
}

               

标签:val,spark2,dataFrame,dataSet,session,apache,import,spark,id
来源: https://blog.51cto.com/u_13985831/2836508

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有