ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

Spark SQL:基于Spark的结构化数据操作的API

2021-09-23 13:02:59  阅读:228  来源: 互联网

标签:df age DataFrame API Plan SQL Spark


Spark SQL介绍

Spark SQL 是Spark中技术最复杂的的组件之一,它提供了在Spark程序中对结构化数据进行操作的功能,即SQL查询。具体来说,Spark SQL 有如下3个重要特点:

1.Spark SQL 支持多种结构化数据格式的读取,比如JSON,Parquet或者Hive表。
2.Spark SQL 支持从多种外部数据源读取数据,除了本地数据,HDFS以及S3之外,还可以通过JDBC等标准数据库连接器连接外部的关系型数据库系统。
3.最后一点就是能够在Spark程序中自由的进行SQL操作,并与各种编程语言Python/Java/Scala实现高度融合。

为了实现这些重要功能,Spark SQL中引入了一种特殊的RDD,叫做DataFrame,一开始其也被称为SchemaRDD(Spark 1.3.0之前)。
下面我们就先来重点看一看DataFrame。

DataFrame理论

DataFrame 是一种以RDD为基础的分布式数据集,理论上非常接近于关系型数据库中的一张数据表(table),或者是Python pandas中的数据抽象Data Frame。

但是相比于Python或者R中的DataFrame, Spark SQL中的DataFrame,在执行时内部做了更多优化。首先,和普通RDD一样,DataFrame同样也是遵守惰性机制,即真正的计算只有当action(比如 展示结果或者保存输出)被触发时才会进行,也正是因为这种机制,才让Spark SQL基于DataFrame的操作可以被自动化地进行优化。

Spark SQL背后的这种核心优化器被称为Catalyst Optimizer。我以下面这张图为基础来具体介绍一下Catalyst的工作流。
在这里插入图片描述
首先,被选中的DataFrame/Dataset以及对应的SQL语句会作为输入被Catalyst接收并生成未处理的Query Plan。每个Plan都用一棵树作为对用户程序的一个抽象集合,其节点用于描述从输入数据集到输出查询结果的一系列操作过程,每个节点对应其中的某一步。下面是一个简单的例子。
在这里插入图片描述
然后树与树之间再通过一系列的转换操作(Transformations),从开始的Query Plan得到优化后的Optimized Query Plan,用于生成RDD DAG来进行后续的执行,转换操作的内部结构如下。
在这里插入图片描述
整个优化过程就汇聚在Transformations部分,那么具体是如何进行转换的呢?在总结完整的转换过程之前,首先我们先对Transformation过程中涉及到的2个概念做个简单介绍。
1.Logical Plan:其定义了用户的查询语句涉及的相关计算操作,但是并不包括具体的计算流程。
2.Physical Plan:其具体描述了对于输入数据集的运算流程,因此它是可执行的。

接下来,我们来看一看Transformation具体是如何转换的。在整个Transformation的过程中,共包含两种不同类型的转换,同类别树之间的相互转换不同类别树之间的相互转换

我们首先来看一下第一种转换,即同类别树之间的转换,比如从Logical Plan到Optimized Logical Plan的转换。在Spark中,每一个单一转换都由某个rule定义,而每个rule又对应某个函数,这种函数被称为transform。一般情况下,我们习惯于将多个single rule 组合使用形成一个完整的转换过程。下面是转换过程的某个部分的例子。
predicate pushdown
上述转换过程涉及一个非常典型的优化步骤,被称为Predicate Pushdown。即将filter predicate 下压至指定的data source,上图中的t2表。先对t2表进行筛选,然后将筛选之后的结果进行join操作,这样避免了不必要的数据的join,有效增加了join的效率。

而这种将多个rule组合使用又被称为Rule Executor.即一个Rule Executor通过使用多个rule将一棵树转换为另一棵同类别的树。

接下来,我们来看一下第二种转换–不同类别树之间的转换,从Logical Plan 到 Physical Plan的转换。其中利用的就是一系列的策略,每一种策略都对应Logical Plan的某个节点到Physical Plan对应节点的转换过程。每个策略的运行都会触发后续策略的启动。

OK,最后我们来完整地梳理一下整个Catalyst的优化过程。
在这里插入图片描述

1.Analysis:使用Rule Executor将Unresolved Logical Plan 转换成 Resolved Logical Plan。
2.Logical Optimization:使用另一个Rule Executor将Resolved Logical Plan转换成Optimized Logical Plan。
3.Physical Planning:这个部分分为两个阶段,阶段1:通过Strategies将Optimized Logical Plan转换成Physical Plan。阶段2:通过Rule Executor来调整它,用于最终的执行。

以上就是对Catalyst Optimizer的一个简单介绍,另外一个值得一提的点就是,对于Python使用者,即PySpark来说,Catalyst Optimizer的另一个好处就是可以显著提升PySpark的使用效率。换句话说,相比于在PySpark中使用RDD,使用DataFrame的查询效率会有极大的提升。原因很简单,因为对于Catalyst Optimizer而言,无论开始的代码使用的是何种语言编写的,最终optimizer都会统一生成JVM 二进制编码用于执行,因此使用DataFrame时,执行速度不受编程语言的影响。但是如果是RDD,Python的执行速度相比于同样进行RDD操作的Java或Scala,会慢很多。主要原因就是Python与JVM在沟通过程中的开销。我们可以来看一下下面这张图来直观地感受一下。
在这里插入图片描述

DataFrame 实践

说完了Spark SQL背后重要的优化器,现在我们正式进入DataFrame的使用介绍。这里以PySpark作为例子。

首先我们需要先创建一个SparkSession,在spark2,0之前,我们会使用SQLContext。同时Spark还有其他多种contexts,比如HiveContext, StreamingContext, and SparkContext等等,现在都统一合并成SparkSession

from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession

conf=SparkConf().setMaster('local').setAppName('sparksql')
sc=SparkContext(conf=conf)

spark=SparkSession.builder.appName('sparksql').master('local').getOrCreate()

我们有一个简单描述学生信息的JSON文件std_info.json,我们使用SparkSession将它读取进来。

[{"id": "123", "name": "Katie", "age": 19, "eyeColor": "brown"},
 {"id": "234", "name": "Michael", "age": 22, "eyeColor": "green"}, 
 {"id": "345", "name": "Simone", "age": 23, "eyeColor": "blue"}]
df=spark.read.json('std_info.json') # 生成DataFrame
df.show()
# 返回结果
+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+

除了使用.show()方法来查看DataFrame中包含的数据,我们还可以使用.collect()以及.take(n),n代表我们想要取的row数,返回的Row对象会被存储在列表中返回。

# 我们还可以查看列信息
df.printSchema()
# 返回结果
root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)

DataFrame API 进行数据查询

接下来我们先使用DataFrame的API来进行数据的query。

1.我们可以先使用.count()方法来查看DataFrame的Row个数。

df.count()

# 返回结果
3

2.我们可以组合使用.select().filter()来进行条件筛选。

# 查询年龄为22的学生
df.select('id','age').filter('age=22').show()
# or
df.select(df.id,df.age).filter(df.age==22).show()
# 返回结果
+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+
# 查询眼睛颜色以字母b开头的学生
df.select('name','eyeColor').filter('eyeColor like "b%"').show()
# 返回结果
+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+

使用SQL进行数据查询

我们使用SQL进行和上面相同的查询任务,使用spark.sql()。需要注意的一点是当我们需要对DataFrame进行SQL查询的时候,我们需要先把DataFrame注册成一个Table或者View

df.createOrReplaceTempView("df")

1.查询Row个数。

spark.sql(" select count(1) from df" ).show()
# 返回结果
+--------+
|count(1)|
+--------+
|       3|
+--------+

2.查询年龄为22的学生

spark.sql('select id,age from df where age=22').show()
# 返回结果
+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+

3.查询眼睛颜色以字母b开头的学生

spark.sql('select name,eyeColor from df where eyeColor like "b%"').show()
# 返回结果
+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+

如何将RDD转换为DataFrame

从RDD->DataFrame是一个数据结构化的过程,我们需要自定义schema。来看下面这个例子。

# import types 用于schema的定义
from pyspark.sql.types import *

# 创建一个RDD 和上述例子中包含的信息一致
rdd=sc.parallelize([
(123,'Katie',19,'brown'),
(234,'Michael',22,'green'),
(345,'Simone',23,'blue')
])

# 定义schema
schema = StructType([
   StructField("id", LongType(), True),
   StructField("name", StringType(), True),
   StructField("age", LongType(), True),
   StructField("eyeColor", StringType(), True)
   ])

#创建DataFrame
rdd2df = spark.createDataFrame(rdd, schema)
rdd2df.show()
# 返回结果
+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+

总结

OK,到这里为止,关于Spark SQL的基本理论背景以及基本的DataFrame操作已经介绍完毕了,希望对大家有帮助吧!

参考

1.Learning PySpark. Tomasz Drabas, Denny Lee
2.Introducing DataFrames in Apache Spark for Large Scale Data Science

标签:df,age,DataFrame,API,Plan,SQL,Spark
来源: https://blog.csdn.net/weixin_44607838/article/details/120375417

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有