ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

如何将动态命名的列连接到字典中?

2019-12-11 01:56:31  阅读:216  来源: 互联网

标签:pyspark-sql dataframe pyspark python


给定这些数据帧:

IncomingCount
-------------------------
Venue|Date    | 08 | 10 |
-------------------------
Hotel|20190101| 15 | 03 |
Beach|20190101| 93 | 45 |

OutgoingCount
-------------------------
Venue|Date    | 07 | 10 | 
-------------------------
Beach|20190101| 30 | 5  |
Hotel|20190103| 05 | 15 |

我如何可以合并(完全联接)两个表,从而得到如下结果而不必手动遍历两个表的每一行?

Dictionary:
[
 {"Venue":"Hotel", "Date":"20190101", "08":{ "IncomingCount":15 }, "10":{ "IncomingCount":03 } },
 {"Venue":"Beach", "Date":"20190101", "07":{ "OutgoingCount":30 }, "08":{ "IncomingCount":93 }, "10":{ "IncomingCount":45, "OutgoingCount":15 } },
 {"Venue":"Hotel", "Date":"20190103", "07":{ "OutgoingCount":05 }, "10":{ "OutgoingCount":15 } }
]

条件是:

>“地点”和“日期”列的作用类似于加入条件.
>其他以数字表示的列是动态创建的.
>如果动态列不存在,则将其排除(或以None作为值包含在内).

解决方法:

这很奇怪,但是可以通过使用spark中的create_map函数来完成.

基本上将列分为四组:键(地点,日期),通用键(10),仅传入键(08),仅传出键(07).

然后按组(键除外)创建映射器,仅映射每个组可用的映射器.应用映射,删除旧列,然后将映射的列重命名为旧名称.

最后将所有行转换为dict(来自df的rdd)并收集.

from pyspark.sql import SparkSession
from pyspark.sql.functions import create_map, col, lit

spark = SparkSession.builder.appName('hotels_and_beaches').getOrCreate()

incoming_counts = spark.createDataFrame([('Hotel', 20190101, 15, 3), ('Beach', 20190101, 93, 45)], ['Venue', 'Date', '08', '10']).alias('inc')
outgoing_counts = spark.createDataFrame([('Beach', 20190101, 30, 5), ('Hotel', 20190103, 5, 15)], ['Venue', 'Date', '07', '10']).alias('out')

df = incoming_counts.join(outgoing_counts, on=['Venue', 'Date'], how='full')

outgoing_cols = {c for c in outgoing_counts.columns if c not in {'Venue', 'Date'}}
incoming_cols = {c for c in incoming_counts.columns if c not in {'Venue', 'Date'}}

common_cols = outgoing_cols.intersection(incoming_cols)

outgoing_cols = outgoing_cols.difference(common_cols)
incoming_cols = incoming_cols.difference(common_cols)

for c in common_cols:
    df = df.withColumn(
        c + '_new', create_map(
            lit('IncomingCount'), col('inc.{}'.format(c)),
            lit('OutgoingCount'), col('out.{}'.format(c)),
        )
    ).drop(c).withColumnRenamed(c + '_new', c)

for c in incoming_cols:
    df = df.withColumn(
        c + '_new', create_map(
            lit('IncomingCount'), col('inc.{}'.format(c)),
        )
    ).drop(c).withColumnRenamed(c + '_new', c)

for c in outgoing_cols:
    df = df.withColumn(
        c + '_new', create_map(
            lit('OutgoingCount'), col('out.{}'.format(c)),
        )
    ).drop(c).withColumnRenamed(c + '_new', c)

result = df.coalesce(1).rdd.map(lambda r: r.asDict()).collect()
print(result)

结果:

[{'Venue': 'Hotel', 'Date': 20190101, '10': {'OutgoingCount': None, 'IncomingCount': 3}, '08': {'IncomingCount': 15}, '07': {'OutgoingCount': None}}, {'Venue': 'Hotel', 'Date': 20190103, '10': {'OutgoingCount': 15, 'IncomingCount': None}, '08': {'IncomingCount': None}, '07': {'OutgoingCount': 5}}, {'Venue': 'Beach', 'Date': 20190101, '10': {'OutgoingCount': 5, 'IncomingCount': 45}, '08': {'IncomingCount': 93}, '07': {'OutgoingCount': 30}}]

标签:pyspark-sql,dataframe,pyspark,python
来源: https://codeday.me/bug/20191211/2105850.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有