ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

spark_sql

2021-01-27 12:57:33  阅读:144  来源: 互联网

标签:count String val value RDD sql spark


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bBx0fy0y-1611723184476)(C:\Users\14112\AppData\Roaming\Typora\typora-user-images\image-20210126202837264.png)]

$“age”+1,'age+1,column(“age”)+1,col(“age”)

中$,`,col,和colum等价

DSL

3.4.1.1 第一步:创建文本文件

在linux的/export/servers/路径下创建文本文件

cd /export/servers/

vim person.txt

写入文件:

1 zhangsan 20

2 lisi 29

3 wangwu 25

4 zhaoliu 30

5 tianqi 35

6 kobe 40

3.4.1.2 第二步:定义RDD

使用spark-shell 进入spark客户端

cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/

bin/spark-shell --master local[2]

val lineRDD = sc.textFile(“file:///export/servers/person.txt”).map(x => x.split(" "))

3.4.1.3 第三步:定义case class样例类

case class Person(id:Int,name:String,age:Int)

3.4.1.4 第四步:关联RDD与case class

val personRDD = lineRDD.map(x => Person(x(0).toInt,x(1),x(2).toInt))

3.4.1.5 第五步:将RDD转换成DF

val personDF = personRDD.toDF

注意:DF也可以转换成为RDD,直接使用DF调用rdd方法即可

scala> personDF.rdd.collect

res38: Array[org.apache.spark.sql.Row] = Array([1,zhangsan,20], [2,lisi,29], [3,wangwu,25], [4,zhaoliu,30], [5,tianqi,35], [6,kobe,40])

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PUWmnwKv-1611723184478)(C:\Users\14112\AppData\Roaming\Typora\typora-user-images\image-20210127125124494.png)]

  • DataSet:DataFrame+泛型

  • 问题:如果现在需要单词切分,String类型的数据才可以切分,如果DataFrame不知道泛型,不知道是Person,String

  • 案例:使用两种API实现WordCount

  • 需求:

  • 步骤:

    • 1-准备SparkSession环境
    • 2-读取数据
    • 3-扁平化数据FlatMap
    • 4-执行统计
package cn.itcast.sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions._

/**
 * DESC:
 * 1-准备SparkSession环境
 * 2-读取数据
 * 3-扁平化数据FlatMap
 * 4-执行统计
 */
object _10DSLSQLWordCount {
  def main(args: Array[String]): Unit = {
    //1-准备SparkSession环境
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    //2-读取数据
    //val valueRDD: RDD[String] = sc.textFile("data/input/words.txt")
    val df: DataFrame = spark.read.text("data/input/words.txt")
    //spark.read.textFile("data/input/words.txt")
    //3-扁平化数据FlatMap
    val dataDS: Dataset[String] = df.as[String]
    val strDS: Dataset[String] = dataDS.flatMap(x => x.split("\\s+"))
    //4-执行统计'---------DSL
    println("======================DSL-=========================")
    strDS.printSchema()
    //root
    //|-- value: string (nullable = true)
    strDS.groupBy($"value")
      .count()
      .orderBy($"count".desc)
      .limit(5)
      .show()
    //
    strDS.groupBy($"value")
      .count()
      .sort($"count".desc)
      .limit(5)
      .show()
    //
    strDS.groupBy($"value")
      .agg(count("value").alias("count"))
      .sort($"count".desc)
      .limit(5)
      .show()
    //
    strDS.groupBy($"value")
      //Map(
      //*"age" -> "max",
      //*"expense" -> "sum"
      //*)
      .agg(Map("value" -> "count"))
      .sort($"count(value)".desc)
      .limit(5)
      .show()
    //SQL
    println("======================SQL-=========================")
    strDS.createOrReplaceTempView("table_view")
    spark.sql(
      """
        |select value,count(value) as count
        |from table_view
        |group by value
        |order by count desc
        |limit  5
        |""".stripMargin).show()

    spark.stop()
  }
}

RDD-DF-DS之间装换

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**

  • DESC:
    */
    case class People2(id: Int, name: String, age: Int)

object 09RddDFDS {
def main(args: Array[String]): Unit = {
//1-准备SparkSession
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster(“local[*]”)
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel(“WARN”)
//2-读取数据文件
val rdd1: RDD[String] = sc.textFile(“data/input/sql/people1.txt”)
//3-转化数据为DataFrame
val peopleRDD: RDD[People2] = rdd1.map(
.split("\s+")).map(filed => People2(filed(0).toInt, filed(1), filed(2).toInt))
//这里在RDD转换到DF过程中需要引入隐式转换
import spark.implicits._
//1-从RDD转化为DF----4种
val peopleDF: DataFrame = peopleRDD.toDF()
//2-从DF转RDD
peopleDF.rdd.collect().foreach(println())
//[1,zhangsan,20]
//[2,lisi,29]
//3-RDD转DS
val dataDS: Dataset[People2] = peopleRDD.toDS()
//±–±-------±–+
//| id| name|age|
//±–±-------±–+
//| 1|zhangsan| 20|
//| 2| lisi| 29|
//| 3| wangwu| 25|
//4-DS转RDD
dataDS.rdd.collect().foreach(println(
))
//People2(1,zhangsan,20)
//People2(2,lisi,29)
//People2(3,wangwu,25)
//5-DF-DS
val peopleDS: Dataset[People2] = peopleDF.as[People2]
peopleDS.show()
//6-DS-DF
peopleDS.toDF().show()

//4-关闭SparkSession
spark.close()

}
}

UDF编程

本质不难:也就是一个函数,实现一个功能:一对一的关系,类似于map作用

使用的步骤:

通过session创建udf.register(函数名称,函数中每个变量:变量的类型)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0WNH7j5v-1611723184480)(C:\Users\14112\AppData\Roaming\Typora\typora-user-images\image-20210127123539258.png)]

有多少个变量 就选择选择哪个udf多少,例如有两个变量就选择UDF2

spark.udf.register()这是固定格式

//通过new的方式写函数

spark.udf.register("smalltoUpper2", new UDF1[String,String] {
  override def call(t1: String): String =
    t1.toUpperCase()
},DataTypes.StringType)

//通过lambam表达式

//2创建lamba表达式函数
spark.udf.register("smalltoUpper1",(a1:String)=>{
  a1.toUpperCase()
})
spark.udf.register("smalltoUpper2", new UDF1[String,String] {
  override def call(t1: String): String =
    t1.toUpperCase()
},DataTypes.StringType)

//通过lambam表达式

//2创建lamba表达式函数
spark.udf.register("smalltoUpper1",(a1:String)=>{
  a1.toUpperCase()
})

标签:count,String,val,value,RDD,sql,spark
来源: https://blog.csdn.net/weixin_51473488/article/details/113246097

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有