ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

SparkSQL

2021-11-08 23:00:51  阅读:339  来源: 互联网

标签:.. DataFrame DataSet rdd SparkSQL spark def


1、SparkSql概述

1、什么是SparkSql?

SparkSql用于处理结构化数据,底层还是RDD

2、SparkSql的两个数据抽象: DataFrame、DataSet

1、什么是DataFrame

DataFrame可以当做一个二维表格,有schema信息<有列名、列类型>
DataFrame只关注列不关注行的类型,不管每个元素<每行>是什么类型,表现出来都是Row类型

2、什么是DataSet

DataSet可以当做一个二维表格,有schema信息<有列名、列类型>
DataSet即关注列也关注行的类型,每个的数据类型是啥,表现出来就是啥

3、DataFrame与DataSet的区别:

1、DataFrame是弱类型,DataSet是强类型
2、DataFrame是运行期安全,编译器不安全。DataSet是编译器安全,运行期也安全

4、DataFrame与DataSet的使用时机:

1、如果是将rdd转成sparksql编程,
此时如果rdd里面的元素类型是样例类,转成DataSet或者DataFrame都可以
此时如果rdd里面的元素类型元组,推荐转成DataFrame,可以通过toDF指定列名
2、如果想要使用map、flatMap这种写函数的强类型算子,推荐使用DataSet

5、RDD、DataFrame、DataSet的联系

1、RDD、DataFrame、DataSet都是弹性分布式数据集
2、RDD、DataFrame、DataSet都是惰性执行的,都需要调用action算子之后才会真正执行
3、RDD、DataFrame、DataSet都有分区
4、RDD、DataFrame、DataSet有很多共同的函数: map、flatMap、filter..
5、RDD、DataFrame、DataSet都是数据在内存与磁盘中动态存储

2、SparkSql编程

1、创建SparkSession: SparkSession.builder().master("").appName(..).getOrCreate()

2、DataFrame创建:

1、通过toDF方法

要想使用toDF方法必须导入隐式转换: import sparksession对象名.implicits._
1、集合.toDF()
2、rdd.toDF()
toDF有两个重载的方式,如果调用的是无参的toDF,此时会生成默认的列名<如果集合/rdd中的元素是样例类,会将属性名作为列名,如果是元组,列名就是_1,_2形式>
所以如果元素是元组,可以有参的toDF方法指定列名<指定的列名的个数必须与列的个数要相同>

2、通过读取文件: spark.read.csv/json/jdbc..

3、通过其他DataFrame衍生

3、DataSet创建

1、通过toDS方法

要想使用toDS方法必须导入隐式转换: import sparksession对象名.implicits._
1、集合.toDS()
2、rdd.toDS()
toDS方法生成的DataSet此时会生成默认的列名<如果集合/rdd中的元素是样例类,会将属性名作为列名,如果是元组,列名就是_1,_2形式>

2、通过读取文件: spark.read.textFile()

3、通过其他DataFrame衍生

4、SparkSql编程的两种方式:

1、SQL风格

1、将df/ds注册成表:
createTempView:: 注册成临时表
createOrReplaceTempView: 注册成临时表[如果表已经存在会替换],只能在当前SparkSession中使用,后续只在使用表的时候直接用 表名 既可以
createGlobalTempView:注册成全局表
createOrReplaceGlobalTempView: 注册成全局表,可以在多个sparkSession中使用,后续在使用的时候,必须通过 global_temp.表名 的方式使用
2、sql编写: spark.sql("sql语句")

2、DSL风格: 使用select、filter、where、groupBy等api变成

常用的DSL api:

1、过滤:

1、filter("过滤条件") // filter("age>20")
2、where("过滤条件") // where("age>20")

2、去重:

1、distinct: 只有所有列都相同才会去重
2、dropDuplicates: 当指定列相同的时候就会去重

3、列裁剪: selectExpr("字段名","函数(字段名)","字段名 as 别名")

5、RDD、DataFrame、DataSet转换

1、RDD转DataFrame: rdd.toDF/rdd.toDF(列名,列名,..)
2、DataFrame转rdd: df.rdd
3、Rdd转DataSet: rdd.toDS
4、DataSet转rdd: val rdd:RDD[DataSet元素类型] = ds.rdd
5、DataFrame转DataSet: val ds:DataSet[类型] = df.as[类型]
DataFrame转DataSet的时候,
如果as后面的类型是样例类,需要样例类的属性名要与列名一致。
如果as后面的类型是元组,需要元组的个数 = 列的个数,类型也要一致
6、DataSet转DataFrame: ds.toDF/ds.toDF(列名,列名,..)

6、Row类型的取值: row.getAs[ 列的类型 ] ( "列名" )

7、自定义函数:

1、自定义UDF函数:

1、定义普通函数

val func = (id:Int) => id+"-001"

2、注册udf函数: spark.udf.register("函数名",函数)

spark.udf.register("myfunc",func)

3、通过sql使用函数:

spark.sql("select myfunc(id) from 表名")

2、自定义udaf函数

1、弱类型udaf:

1、定义class继承UserDefinedAggregateFunction
2、重写抽象方法

def inputSchema: StructType <定义udaf参数类型>
def bufferSchema: StructType <定义中间变量的参数类型>
def dataType: DataType <定义最终结果类型>
def deterministic: Boolean <一致性>
def initialize(buffer: MutableAggregationBuffer): Unit <初始化中间变量>
def update(buffer: MutableAggregationBuffer, input: Row): Unit <每次传入组中一个值,更新中间变量>
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit <合并所有task的统计结果>
def evaluate(buffer: Row): Any <获取最终结果>

3、注册udaf:

1、创建自定义udaf对象: val obj = new xxx
2、注册: spark.udf.register("函数名",obj)

2、强类型的udaf

1、定义class继承Aggregator[IN,BUF,OUT]

IN: udaf参数类型
BUF: 中间变量类型
OUT: 最终结果类型

2、重写方法

def zero: Buff <中间变量赋初始值>
def reduce(buff: Buff, age: Int): Buff <在每个分区中先预聚合,每个传入一个元素,更新中间结果>
def merge(b1: Buff, b2: Buff): Buff <对所有分区的结果再次聚合>
def finish(reduction: Buff): Double <获取最终结果>
def bufferEncoder: Encoder[Buff] <对中间结果类型编码>
def outputEncoder: Encoder[Double] <对最终结果类型编码>

3、注册

1、创建udaf对象: val obj = new XXX
2、导入隐式转换,使用udaf函数:
import org.apache.spark.sql.functions._
val uobj = udaf(obj)
3、注册: spark.udf.register("函数名",uobj)

3、数据读取与保存

1、读取

1、文件读取:

1、spark.read

.format() --指定文件读取格式[csv/json/text/parquet/orc]
.option().option().. --指定读取的参数
.load(path) --指定加载路径的数据
在读取文件的时候,一般只有csv文件才需要配置option,csv文件常用的option:
sep: 指定字段之间的分隔符
header: 指定是否以文件的第一行作为列名
inferSchema: 指定是否自动推断字段的类型

2、spark.read[.option()].csv/json/csv/parquet/orc

2、mysql数据读取

1、spark.read

.format() --指定文件读取格式[jdbc]
.option().option().. --指定读取的参数<账号、密码、driver、表、url>
.load() --指定加载路径的数据

2、spark.read.jdbc(url,表名,参数设置): 此种方式读取jdbc的时候分区数 = 1,只能用于数据量小的场景

spark.read.jdbc(url,表名,分区条件参数,参数设置): 此种方式读取jdbc的时候分区数=分区条件参数数组的元素个数。<不常用>
val arr = Array("age<20","age>=20 and age<40","age>=40")
spark.read.jdbc("jdbc:mysql://xx:3306/test","person",arr,参数设置)
spark.read.jdbc(url,表名,mysql字段名,lowerBound,uperBound,分区数,参数设置): <工作常用>
此种方式读取的时候,分区数 = (uperBound-lowerBound) > 分区数 ? 分区数 : uperBound-lowerBound

2、数据保存:

1、df/ds.write

.mode() --指定写入模式
.format() --指定数据写入的格式[csv/json/parquet/orc/jdbc]
.option() --指定数据写入的时候需要的参数
csv文件写入的时候指定的option:
header: 写入的时候是否将列名也写入
sep: 写入的时候指定字段之间的分隔符
.save() --数据保存

2、df/ds.write.mode(..).csv/json/parquet/orc/jdbc

常用的写入模式:
SaveMode.Overwrite: 如果指定的路径/表已经存在,则覆盖历史数据<数据写入HDFS的时候使用>
SaveMode.Append: 如果指定的路径/表已经存在,则追加数据<数据写入mysql的时候使用>
如果写入mysql的时候,主键数据已经存在,此时不能使用append,需要通过foreachPartitions对数据进行更新写入

3、hive的数据读取和保存

1、读取数据:

1、在创建SparkSession的时候通过enableHiveSupport需要开启hive的支持:

SparkSession.builder().master(..).appName(..).enableHiveSupport().getOrCreate

2、直接在代码中通过spark.sql("查询hive表数据")

2、保存数据到hive表:

df/ds.write.mode(..).saveAsTable("hive表") <不常用,一般都是将数据写入HDFS>

4、多维聚合:<对多个维度按照不同的组合进行聚合>

grouping sets:
语法: select 维度1,维度2,..,聚合函数 from 表 group by 维度1,维度2,.. grouping sets( (维度1),(维度1,维度2),(..) )
grouping sets后面的字段名必须是group by后面的字段
案例:
select A,B,C,count(1) from person group by A,B,C grouping sets( (A),(A,B),(B,C),(A,B,C) )
等价于:
select A,null B,null C,count(1) from person group by A
union
select A,B,null C,count(1) from person group by A,B
union
select null A,B,C,count(1) from person group by B,C
union
select A,B,C,count(1) from person group by A,B,C

标签:..,DataFrame,DataSet,rdd,SparkSQL,spark,def
来源: https://www.cnblogs.com/1463490Ya/p/15526711.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有