ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Spark Python:如何计算RDD中每行之间的Jaccard相似度?

2019-10-11 18:58:39  阅读:375  来源: 互联网

标签:pairwise python apache-spark pyspark for-loop


我有一个约50k的不同行和2列的表.您可以将每一行视为一部电影,而将各列视为该电影的属性-“ ID”:该电影的ID,“ Tags”:该电影的某些内容标签,以每部电影的字符串列表的形式出现.

数据看起来像这样:

movie_1,[“浪漫”,“喜剧”,“英语”];
电影_2,[‘动作’,’功夫’,’中文’]

我的目标是首先根据相应的标签计算每部电影之间的提花相似度,完成后,我将能够为每部电影(例如,我选择movie_1)知道其他5部最相似的电影与此(在这种情况下为movie_1)一起使用.我不仅要让movie_1本身获得前5名的结果,还要让所有电影都获得前5名的结果.

我尝试使用Python解决问题,但是运行时在这里是一个很大的挑战.即使使用在6个内核上运行的多处理程序,总运行时间仍超过20小时.

下面的Python代码:

import pandas as pd
from collections import Counter
import numpy as np
from multiprocessing import Pool
import time

col_names=['movie_id','tag_name']
df=pd.read_csv("movies.csv",names=col_names)
movie_ids=df['movie_id'].tolist()
tag_list=df['tag_name'].tolist()

def jaccard_similarity(string1, string2):
    intersection = set(string1).intersection(set(string2))
    union = set(string1).union(set(string2))
    return len(intersection)/float(len(union))

def jc_results(movie_id):
    result=Counter()
    this_index=movie_ids.index(movie_id)
    for another_id in movie_ids:
        that_index=movie_ids.index(another_id)
        if another_id==movie_id:
            continue
        else:
            tag_1=tag_list[this_index]
            tag_2=tag_list[that_index]
            jaccard = jaccard_similarity(tag_1,tag_2)
            result[(movie_id,another_id)]=jaccard
    return result.most_common(10)


from multiprocessing import Pool
pool=Pool(6)
results={}
for movie_id in movie_ids:
    results[movie_id]=pool.apply_async(jc_results,args=(movie_id,))
pool.close()
pool.join()
for movie_id, res in results.items():
    results[movie_id] = res.get()

然后我想切换到Pyspark,但是我仍然很不熟悉python,并在写了几行之后陷入了困境,实际上我唯一取得的进步就是使用sc.textFile将数据读取到RDD中…阅读了现有的帖子,但是他们都在使用Scala.如果有人可以通过Pyspark帮助或提供任何指导,那将是非常不错的.非常感谢!

解决方法:

您可以尝试类似于this stackoverflow answer的解决方案,尽管由于您的数据已被标记化(字符串列表),所以您无需执行该步骤或ngram步骤.

我还要提到,pyspark中的roximateSimilarityJoin会计算“杰卡德距离”而不是“杰卡德相似度”,但是如果您特别需要,您可以从1中减去以转换回相似度.

您的代码最终看起来类似于:

from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, MinHashLSH
import pyspark.sql.functions as f

db = spark.createDataFrame([
        ('movie_1', ['romantic','comedy','English']),
        ('movie_2', ['action','kongfu','Chinese']),
        ('movie_3', ['romantic', 'action'])
    ], ['movie_id', 'genres'])


model = Pipeline(stages=[
        HashingTF(inputCol="genres", outputCol="vectors"),
        MinHashLSH(inputCol="vectors", outputCol="lsh", numHashTables=10)
    ]).fit(db)

db_hashed = model.transform(db)

db_matches = model.stages[-1].approxSimilarityJoin(db_hashed, db_hashed, 0.9)

#show all matches (including duplicates)
db_matches.select(f.col('datasetA.movie_id').alias('movie_id_A'),
                 f.col('datasetB.movie_id').alias('movie_id_B'),
                 f.col('distCol')).show()

#show non-duplicate matches
db_matches.select(f.col('datasetA.movie_id').alias('movie_id_A'),
                 f.col('datasetB.movie_id').alias('movie_id_B'),
                 f.col('distCol')).filter('movie_id_A < movie_id_B').show()

带有相应的输出:

+----------+----------+-------+
|movie_id_A|movie_id_B|distCol|
+----------+----------+-------+
|   movie_3|   movie_3|    0.0|
|   movie_1|   movie_3|   0.75|
|   movie_2|   movie_3|   0.75|
|   movie_1|   movie_1|    0.0|
|   movie_2|   movie_2|    0.0|
|   movie_3|   movie_2|   0.75|
|   movie_3|   movie_1|   0.75|
+----------+----------+-------+

+----------+----------+-------+
|movie_id_A|movie_id_B|distCol|
+----------+----------+-------+
|   movie_1|   movie_3|   0.75|
|   movie_2|   movie_3|   0.75|
+----------+----------+-------+

标签:pairwise,python,apache-spark,pyspark,for-loop
来源: https://codeday.me/bug/20191011/1894743.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有