ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

在结构数组上使用 PySpark UDF 进行数据转换:在结构数组中添加新字段

2022-08-28 18:02:15  阅读:179  来源: 互联网

标签:StringType PySpark -- StructField dept UDF 可为 数组 True


在结构数组上使用 PySpark UDF 进行数据转换:在结构数组中添加新字段

PySpark UDF on complex Data types

在处理系统日志或任何其他半结构化数据时,我们遇到了具有许多嵌套字段和嵌入式结构数组的数据。

我们要选择的第一个也是最简单的解决方案是展开字段,然后执行数据转换。如果您需要平面模式,这种方法并没有错,但为了保持模式完整,我们需要对嵌套字段应用转换。

一种方法是将 Dataframe 转换为 RDD 并使用低级 API 来转换 Dataframe。假设我们想使用 Spark SQL API 以方便使用

为了克服这个问题,我们可以使用 PySpark UDF,它可以将复杂字段作为参数并返回新字段。

让我们创建一个足够复杂的示例数据,以便为我们的用例处理。

 从 pyspark.sql 导入 SparkSession  
 从 pyspark.sql.types 导入 *  
 从 pyspark.sql 导入行  
 从 pyspark.sql.functions 导入 udf, col data = [(["James","","Smith","36636","M",3000, [{'dept':'HR','allocation':0.4},{'dept':'FIN ','分配':0.6}]],  
 ["Michael","Rose","","40288","M",4000,[{'dept':'HR','allocation':0.4},{'dept':'FIN','allocation ':0.6}]],  
 ["罗伯特","","威廉姆斯","42114","M",4000,[{'dept':'HR','allocation':0.9},{'dept':'FIN','allocation ':0.1}]],  
 ["Maria","Anne","Jones","39192","F",4000,[{'dept':'HR','allocation':0.75},{'dept':'FIN','分配':0.25}]],  
 ["Jen","Mary","Brown","","F",-1,[{'dept':'HR','allocation':0.30},{'dept':'FIN','分配':0.70}]])  
 ] 架构 =( ArrayType(StructType([  
 StructField("名字",StringType(),True),  
 StructField("中间名",StringType(),True),  
 StructField("姓氏",StringType(),True),  
 StructField("id", StringType(), True),  
 StructField("性别", StringType(), True),  
 StructField("salary", IntegerType(), True),  
 StructField("隶属关系", ArrayType(StructType([StructField('dept', StringType()),  
 StructField('allocation', FloatType())]  
 )  
 ), 真的)  
 ])  
 )) spark = SparkSession.builder.appName('test_udf').getOrCreate()  
 df = spark.createDataFrame(data= data , schema=schema)

这只是一行数据,其中包含存储在 Array of Struct 中的许多员工的详细信息。

问题陈述:我们要添加 经理 内部结构中的字段 隶属关系 并保持 Dataframe 的结构完整,因此不希望通过爆炸来改变粒度。

Dataframe 的当前架构:

 根  
 |-- 值:数组(可为空=真)  
 | |-- 元素:结构(containsNull = true)  
 | | |-- 名字:字符串(可为空 = true)  
 | | |-- 中间名:字符串(可为空=真)  
 | | |-- 姓氏:字符串(可为空 = true)  
 | | |-- id: 字符串(可为空=真)  
 | | |-- 性别:字符串(可为空=真)  
 | | |-- 工资:整数(可为空 = true)  
 | | |-- 隶属关系:数组(可为空 = true)  
 | | | |-- 元素:结构(containsNull = true)  
 | | | | |-- 部门:字符串(可为空=真)  
 | | | | |-- 分配:浮动(可为空=真)

目标模式:在结构数组中添加新字段

 根  
 |-- 值:数组(可为空=真)  
 | |-- 元素:结构(containsNull = true)  
 | | |-- 名字:字符串(可为空 = true)  
 | | |-- 中间名:字符串(可为空=真)  
 | | |-- 姓氏:字符串(可为空 = true)  
 | | |-- id: 字符串(可为空=真)  
 | | |-- 性别:字符串(可为空=真)  
 | | |-- 工资:整数(可为空 = true)  
 | | |-- 隶属关系:数组(可为空 = true)  
 | | | |-- 元素:结构(containsNull = true)  
 | | | | |-- 部门:字符串(可为空=真)  
 | | | | |-- 分配:浮动(可为空=真)  
 | | | | |-- 经理:字符串(可为空=真)

为了嵌入新字段,我们将编写一个 UDF,它将未分解的字段作为参数并返回一个新字段 经理 嵌入到我们想要的级别,即隶属关系数组。

定义 return_schema 在数组中有新字段 隶属关系:

 return_schema = (ArrayType(StructType([  
 StructField("名字", StringType(), True),  
 StructField("中间名", StringType(), True),  
 StructField("姓氏", StringType(), True),  
 StructField("id", StringType(), True),  
 StructField("性别", StringType(), True),  
 StructField("salary", IntegerType(), True),  
 StructField("隶属关系", ArrayType(StructType([StructField('dept', StringType()),  
 StructField('分配', FloatType()),  
 StructField('manager', StringType())]  
 )  
 ), 真的)  
 ])  
 ))

下一步是编写带有对象的 UDF 图式 并返回一个新对象 return_schema。

 @udf(returnType=return_schema)  
 def add_manager(p):  
 行 = []  
 对于 p 中的 ele:  
 inner_rows = []  
 对于 ele.affiliations 中的 aff_ele:  
 inner_rows.append(Row(dept=aff_ele.dept, clubs=aff_ele.allocation,manager='Mr. X')) # 例如:可以通过 API 调用或映射 DS 来拉取新字段  
 行.追加(  
 行(名字=ele.firstname,中间名=ele.middlename,姓氏=ele.lastname,id=ele.id,性别=ele.gender,  
 薪水=ele.salary,隶属关系=inner_rows))  
 返回行

最后一件事是在 select 语句中调用 udf:

 df.select(add_manager(col("value"))).show(truncate=False)

Dataframe 中的每条记录都作为 Row 对象传递到 UDF。我们要修改数组的模式 隶属关系不可变 Row[] 的行为不允许修改。因此,我们为每条记录创建一个新的 Row 对象,并返回一个嵌套的 Row 对象,该对象与 return_schema .

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明

本文链接:https://www.qanswer.top/1346/52362817

标签:StringType,PySpark,--,StructField,dept,UDF,可为,数组,True
来源: https://www.cnblogs.com/amboke/p/16633259.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有