ICode9

精准搜索请尝试: 精确搜索
首页 > 数据库> 文章详细

python – Pyspark SQL Pandas UDF:返回一个数组

2019-07-10 14:05:03  阅读:523  来源: 互联网

标签:python pandas pyspark databricks pyspark-sql


我正在尝试制作一个带有整数值的两列的pandas UDF,并根据这些值之间的差异返回一个小数组,其长度等于上述差异.

这是我到目前为止的尝试,我一直在尝试使用这种方法来实现这一点,但这里是一般的想法

import pandas as pd

@pandas_udf(ArrayType(DecimalType()), PandasUDFType.SCALAR)
def zero_pad(x, y):
  buffer = []

  for i in range(0, (x - y)):
    buffer.append(0.0)

  return buffer #correction provided by Ali Yessili

这是我如何使用它的一个例子

df = df.withColumn("zero_list", zero_pad(df.x, df.y))

最终结果是df,其中一个名为zero_list的新列是一个ArrayType(DecimalType())列,看起来像[0.0,0.0,0.0,…],其长度为(df.x – df.y)

错误消息是如此笼统,几乎不值得发布,只是“由于阶段失败而中止作业”,它只追溯到我的代码中我执行df.show()的部分,

Py4JJavaError                             Traceback (most recent call last)
<command-103561> in <module>()
---> 33 df.orderBy("z").show(n=1000)

/databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    350         """
    351         if isinstance(truncate, bool) and truncate:
--> 352             print(self._jdf.showString(n, 20, vertical))
    353         else:
    354             print(self._jdf.showString(n, int(truncate), vertical))

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

我希望有人可以指出我正确的方向来制作一个可以返回可变长度数组的pandas udf,或者只是告诉我为什么我的代码或方法是错误的.

我正在使用带有spark 2.3.1的数据驱动程序来完成所有这些工作.

解决方法:

我不明白你为什么从函数中返回一个pandas Series值.它为每个输入返回多行.

>>> import pandas as pd
>>> def zero_pad(x, y):
...     buffer = []
...     for i in range(0, (x - y)):
...             buffer.append(0.0)
...     return pd.Series(buffer)
... 
>>> zero_pad(5,1)
0    0.0
1    0.0
2    0.0
3    0.0
dtype: float64

因此,您无法添加具有多行结果的列.

而另一方面,你不能直接在withColumn语句中使用udf.请参阅下面的我的脚本我认为结果正是您正在寻找的

>>> from pyspark.sql.functions import udf
>>> 
>>> data = sc.parallelize([
...     (2,1),
...     (8,1),
...     (5,2),
...     (6,4)])
>>> columns = ['x','y']
>>> df = spark.createDataFrame(data, columns)
>>> df.show()
+---+---+
|  x|  y|
+---+---+
|  2|  1|
|  8|  1|
|  5|  2|
|  6|  4|
+---+---+

>>> def zero_pad(x, y):
...     buffer = []
...     for i in range(0, (x - y)):
...             buffer.append(0.0)
...     return buffer
... 
>>> my_udf = udf(zero_pad)
>>> df = df.withColumn("zero_list", my_udf(df.x, df.y))
>>> df.show()
+---+---+--------------------+
|  x|  y|           zero_list|
+---+---+--------------------+
|  2|  1|               [0.0]|
|  8|  1|[0.0, 0.0, 0.0, 0...|
|  5|  2|     [0.0, 0.0, 0.0]|
|  6|  4|          [0.0, 0.0]|
+---+---+--------------------+

标签:python,pandas,pyspark,databricks,pyspark-sql
来源: https://codeday.me/bug/20190710/1424957.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有