Spark Dataset Operations (Part 5): Multi-Table Operations with join

First, look at the definitions of the two source tables:

scala> val df1 = spark.createDataset(Seq(("aaa", 1, 2), ("bbb", 3, 4), ("ccc", 3, 5), ("bbb", 4, 6))).toDF("key1","key2","key3")
df1: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> val df2 = spark.createDataset(Seq(("aaa", 2, 2), ("bbb", 3, 5), ("ddd", 3, 5), ("bbb", 4, 6), ("eee", 1, 2), ("aaa", 1, 5), ("fff", 5, 6))).toDF("key1","key2","key4")
df2: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> df1.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)


scala> df2.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key4: integer (nullable = false)

scala> df1.show()
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| ccc|   3|   5|
| bbb|   4|   6|
+----+----+----+

scala> df2.show()
+----+----+----+
|key1|key2|key4|
+----+----+----+
| aaa|   2|   2|
| bbb|   3|   5|
| ddd|   3|   5|
| bbb|   4|   6|
| eee|   1|   2|
| aaa|   1|   5|
| fff|   5|   6|
+----+----+----+

Spark's support for join is quite rich: equi-joins, condition joins, and natural joins are all available. The supported join types include inner, outer, left outer, right outer, left semi, and Cartesian joins.
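
All of these can also be written as plain SQL. As a minimal sketch (the view names t1 and t2 are arbitrary choices, not part of the original session), the two DataFrames can be registered as temporary views and queried through spark.sql:

/*
Register the DataFrames defined above as temporary views; t1 and t2 are
hypothetical names chosen for this sketch.
*/
df1.createOrReplaceTempView("t1")
df2.createOrReplaceTempView("t2")

// An inner join on key1 in SQL; note that, unlike df1.join(df2, "key1")
// shown below, this keeps both key1 columns in the result.
spark.sql("SELECT * FROM t1 JOIN t2 ON t1.key1 = t2.key1").show()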

The examples below walk through each type, starting with the inner join.

/*
Inner join: select * from df1 join df2 on df1.key1 = df2.key1
*/
scala> val df3 = df1.join(df2,"key1")
df3: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 3 more fields]

scala> df3.printSchema
root
 |-- key1: string (nullable = true)
 |-- key2: integer (nullable = false)
 |-- key3: integer (nullable = false)
 |-- key2: integer (nullable = false)
 |-- key4: integer (nullable = false)

scala> df3.show
+----+----+----+----+----+
|key1|key2|key3|key2|key4|
+----+----+----+----+----+
| aaa|   1|   2|   1|   5|
| aaa|   1|   2|   2|   2|
| bbb|   3|   4|   4|   6|
| bbb|   3|   4|   3|   5|
| bbb|   4|   6|   4|   6|
| bbb|   4|   6|   3|   5|
+----+----+----+----+----+
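
Note that join(df2, "key1") merges the join column itself, but both key2 columns survive, so referring to a bare "key2" on df3 would be ambiguous. One way around this (a sketch, not from the original post) is to rename one side before joining:

// key2 appears twice in df3; renaming df2's key2 to the hypothetical
// name key2_b up front keeps every column name unique after the join.
val df2Renamed = df2.withColumnRenamed("key2", "key2_b")
df1.join(df2Renamed, "key1").show()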

/*
Still an inner join, but this time using joinWith. The difference from join is that the schema of the resulting Dataset is different; compare it with the one above.
*/
scala> val df4=df1.joinWith(df2,df1("key1")===df2("key1"))
df4: org.apache.spark.sql.Dataset[(org.apache.spark.sql.Row, org.apache.spark.sql.Row)] = [_1: struct<key1: string, key2: int ... 1 more field>, _2: struct<key1: string, key2: int ... 1 more field>]

scala> df4.printSchema
root
 |-- _1: struct (nullable = false)
 |    |-- key1: string (nullable = true)
 |    |-- key2: integer (nullable = false)
 |    |-- key3: integer (nullable = false)
 |-- _2: struct (nullable = false)
 |    |-- key1: string (nullable = true)
 |    |-- key2: integer (nullable = false)
 |    |-- key4: integer (nullable = false)

scala> df4.show
+---------+---------+
|       _1|       _2|
+---------+---------+
|[aaa,1,2]|[aaa,1,5]|
|[aaa,1,2]|[aaa,2,2]|
|[bbb,3,4]|[bbb,4,6]|
|[bbb,3,4]|[bbb,3,5]|
|[bbb,4,6]|[bbb,4,6]|
|[bbb,4,6]|[bbb,3,5]|
+---------+---------+
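
Because joinWith returns a Dataset of pairs rather than a flattened DataFrame, the two sides never collide by name and can be taken apart with a typed map. A small sketch, assuming the spark.implicits._ that spark-shell imports automatically:

// Pull key1 and key3 from the left row and key4 from the right row
// by position, then flatten the pair back into named columns.
df4.map { case (left, right) =>
  (left.getString(0), left.getInt(2), right.getInt(2))
}.toDF("key1", "key3", "key4").show()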

Next comes the full outer join:

/*
select * from df1 full outer join df2 on df1.key1 = df2.key1
*/
scala> val df5 = df1.join(df2,df1("key1")===df2("key1"), "outer")
df5: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df5.show
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
|null|null|null| ddd|   3|   5|
| ccc|   3|   5|null|null|null|
| aaa|   1|   2| aaa|   2|   2|
| aaa|   1|   2| aaa|   1|   5|
| bbb|   3|   4| bbb|   3|   5|
| bbb|   3|   4| bbb|   4|   6|
| bbb|   4|   6| bbb|   3|   5|
| bbb|   4|   6| bbb|   4|   6|
|null|null|null| fff|   5|   6|
|null|null|null| eee|   1|   2|
+----+----+----+----+----+----+
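
With the Column-based form, both key1 columns (and both key2 columns) survive, so a bare column name would be ambiguous. They can still be told apart by resolving each column against the DataFrame it came from; a short sketch:

// Disambiguate the duplicated names through the parent DataFrames.
df5.select(df1("key1"), df1("key3"), df2("key1"), df2("key4")).show()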

Next are the left outer join, right outer join, and left semi join:

/*
Left outer join
*/
scala> val df6 = df1.join(df2,df1("key1")===df2("key1"), "left_outer")
df6: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df6.show
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
| aaa|   1|   2| aaa|   1|   5|
| aaa|   1|   2| aaa|   2|   2|
| bbb|   3|   4| bbb|   4|   6|
| bbb|   3|   4| bbb|   3|   5|
| ccc|   3|   5|null|null|null|
| bbb|   4|   6| bbb|   4|   6|
| bbb|   4|   6| bbb|   3|   5|
+----+----+----+----+----+----+

/*
Right outer join
*/
scala> val df7 = df1.join(df2,df1("key1")===df2("key1"), "right_outer")
df7: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df7.show
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
| aaa|   1|   2| aaa|   2|   2|
| bbb|   4|   6| bbb|   3|   5|
| bbb|   3|   4| bbb|   3|   5|
|null|null|null| ddd|   3|   5|
| bbb|   4|   6| bbb|   4|   6|
| bbb|   3|   4| bbb|   4|   6|
|null|null|null| eee|   1|   2|
| aaa|   1|   2| aaa|   1|   5|
|null|null|null| fff|   5|   6|
+----+----+----+----+----+----+

/*
Left semi join
*/
scala> val df8 = df1.join(df2,df1("key1")===df2("key1"), "leftsemi")
df8: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 1 more field]

scala> df8.show
+----+----+----+
|key1|key2|key3|
+----+----+----+
| aaa|   1|   2|
| bbb|   3|   4|
| bbb|   4|   6|
+----+----+----+
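
A left semi join keeps the df1 rows that have a match in df2 and drops all of df2's columns. Its complement is the left anti join, which keeps the df1 rows with no match at all (only ccc in this data); a quick sketch:

// "leftanti" returns the df1 rows whose key1 has no counterpart in df2.
df1.join(df2, df1("key1") === df2("key1"), "leftanti").show()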

Cartesian joins are not used much in practice; the tables people process with Spark these days tend to be huge, and a full cross product is simply too expensive.

/*
Cartesian (cross) join
*/
scala> val df9 = df1.crossJoin(df2)
df9: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df9.count
res17: Long = 28

/* Show only the first 10 rows */
scala> df9.show(10)
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
| aaa|   1|   2| aaa|   2|   2|
| aaa|   1|   2| bbb|   3|   5|
| aaa|   1|   2| ddd|   3|   5|
| aaa|   1|   2| bbb|   4|   6|
| aaa|   1|   2| eee|   1|   2|
| aaa|   1|   2| aaa|   1|   5|
| aaa|   1|   2| fff|   5|   6|
| bbb|   3|   4| aaa|   2|   2|
| bbb|   3|   4| bbb|   3|   5|
| bbb|   3|   4| ddd|   3|   5|
+----+----+----+----+----+----+
only showing top 10 rows
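
As a side note (an assumption about Spark 2.x defaults, not something from the original session): a plain join() with no condition at all is rejected unless Cartesian products are explicitly enabled, a check that crossJoin sidesteps. A sketch:

// With this flag set, a condition-free join behaves like crossJoin.
spark.conf.set("spark.sql.crossJoin.enabled", "true")
df1.join(df2).count()   // 28 rows, the same as df9.count above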

The next example is again an equi-join. What distinguishes it from the earlier ones is that the duplicated join columns of the two tables are merged into one, just like in a natural join:

/*
Equi-join on the two common columns key1 and key2
*/
scala> val df10 = df1.join(df2, Seq("key1","key2"))
df10: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 2 more fields]

scala> df10.show
+----+----+----+----+
|key1|key2|key3|key4|
+----+----+----+----+
| aaa|   1|   2|   5|
| bbb|   3|   4|   5|
| bbb|   4|   6|   6|
+----+----+----+----+
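
The usingColumns form also accepts a join type as a third argument; a sketch of the left outer variant on the same two keys:

// The same multi-column equi-join, but unmatched df1 rows (ccc) are kept.
df1.join(df2, Seq("key1", "key2"), "left_outer").show()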

Condition joins apparently weren't supported in older versions of Spark; in any case, they work fine now:

/*
select df1.*,df2.* from df1 join df2 
on df1.key1=df2.key1 and df1.key2>df2.key2
*/
scala> val df11 = df1.join(df2, df1("key1")===df2("key1") && df1("key2")>df2("key2"))
df11: org.apache.spark.sql.DataFrame = [key1: string, key2: int ... 4 more fields]

scala> df11.show
+----+----+----+----+----+----+
|key1|key2|key3|key1|key2|key4|
+----+----+----+----+----+----+
| bbb|   4|   6| bbb|   3|   5|
+----+----+----+----+----+----+
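
The join condition is just an ordinary Column expression, so the equality part is not required either; a sketch with a pure inequality condition:

// A non-equi join: every (df1, df2) row pair where df1.key2 > df2.key2.
df1.join(df2, df1("key2") > df2("key2")).show()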

Source: https://blog.51cto.com/u_15278282/2931960
