ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

python实现MapReduce操作(以数据按要求合并、重排为例)

2019-06-11 22:52:59  阅读:359  来源: 互联网

标签:mapper 为例 python py MapReduce sys key line data


现在已有的很多博客demo都是以wordcount为例,众所周知这是一个非常简单的功能,但凡遇到一些高阶一点的操作我都会大脑一片空白,今天正好有相关的需求,就来学习了一下。
http://www.zhangdongshengtech.com/article-detials/236
上面的链接是记录频次的demo,写的非常的好,相信各位看了它就会了解mapreduce核心的写法

目录

Intro:wordcount

说在前面:mapreduce程序的调试可以单独分别运行mapper和reducer,直接在命令行输入你指定好的输入格式,就会打印出输出

mapper.py

输入文件的形式就是

word1
word2
word1
word3

# coding=utf-8
import sys
 
for line in sys.stdin:
	words = line.strip().split('|')
	try:
	    his = data['uP_cat']
	    for vid, fre in his.items():
	        if vid[0] != 'V': continue
	        print(vid)
	 except:
	     continue
reducer.py

这里实现的就是一个简单的计数并把频次写到文件中的操作。
如果你只需要实现计数操作,那么只用修改mapper.py的print的值即可

# coding=utf-8
import sys

count = 0
key = ""
current_key = ""

for line in sys.stdin:
    line =  line.rstrip()
    if not line:
        sys.stderr.write("data is wrong")
        sys.exit(1)
    line = line.rstrip()
    items = line.split("\t")
    current_key = items[3]
    cur_timestamp = items[2]
    if current_key == key:
        if cur_timestamp < timestamp:
            print "%s\t%d" % (key, count)
        count = 0
        key = current_key
    count += 1

if key:
    print "%s\t%d" % (key, count)
run.sh

运行环境配置这一块我可能没有办法讲清楚,因为是别人写好的脚本,我只修改了上面2个代码
在这里修改你的输入路径和输出路径

#!/bin/bash
HADOOP_bin='/your/path/hadoop-2.7.3/bin/hadoop'
INPUT_PATH="input_data"
OUTPUT_PATH="test"

$HADOOP_bin fs -rmr $OUTPUT_PATH


$HADOOP_bin jar /your/path/hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar\
    -D mapred.job.priority="VERY_HIGH"\
    -D mapred.reduce.tasks=200\
    -D mapreduce.job.queuename=root.online.default\
    -D mapred.job.map.capacity=400\
    -D mapred.job.reduce.capacity=100\
    -D mapred.job.name="test"\
    -D mapred.textoutputformat.ignoreseparator="true"\
    -input ${INPUT_PATH} \
    -output ${OUTPUT_PATH} \
    -file ./mapper.py\
    -file ./reducer.py\
    -partitioner "org.apache.hadoop.mapred.lib.HashPartitioner"\
    -mapper "python mapper.py"\
    -reducer "python reducer.py"\
    -inputformat "org.apache.hadoop.mapred.TextInputFormat"\
    -outputformat "org.apache.hadoop.mapred.TextOutputFormat"\

Advance:有条件的合并内容

下面来实现把输入按某一个值进行合并

输入形式:
key1 value1 value2
key2 value1 value2
key1 value3 value4
输出形式:
key1 value1 value2 value3 value4…………
key2 value1 value2 …………

mapper.py
#coding=utf8
import json
import sys
#f = open('part-07198', 'r') #调试用,因为我的mapreduce任务配置在python2下,调试的时候sys.stdin接收不到输入,所以直接读文件
for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    data = line.split('\t', 2) #只区分key,后面的values不做区分
    if len(data) <= 1:
        continue
    print data

这里如果怕数据有问题可以写在try except里,如果还需要对每一行的数据做什么处理都放在mapper里处理,当把数据预处理成可以根据某一项进行合并时就print输出,丢给reducer

reducer.py
import json
import sys
from operator import itemgetter
from itertools import groupby

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 2)

def main(separator='\t'):
    #f = open('part-07198', 'r') #调试用
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    for name, group in groupby(data, itemgetter(0)):
        val = []
        for values in group:
            for v in values[1]:
                val.append(v)
        print "%s\t%s"% (values[0], json.dumps(v))

if __name__ == "__main__":
    main()

这里有两个函数非常重要,搞懂了它们你就能搞懂如何写reducer,再复杂的功能你都能变着花实现
groupby
https://blog.csdn.net/Together_CZ/article/details/73042997

key,  group= groupby(iterator, key=func())

将key函数作用于原循环器的各个元素。根据key函数结果,将拥有相同函数结果的元素分到一个新的循环器。每个新的循环器以函数返回结果为标签。
这就好像一群人的身高作为循环器。我们可以使 用这样一个key函数: 如果身高大于180,返回"tall";如果身高底于160,返回"short";中间的返回"middle"。最终,所有身高将分为三个循环器, 即"tall", “short”, “middle”。

这个函数的意思就是说把原来的迭代器中的值按照某一个key聚合
再来复习一下mapreduce的原理:
hadoop框架会自动的将相同的key分配到同一个reducer上,这个key,默认的就是上一个mapper输出数据的以\t,或者\001分割后的第一部分
看到这里大概应该就明白了,只要我们把所需要合并的key在mapper中变换到第一位输出,这样就能用groupby直接进行聚合
那么,groupby的输出是什么呢?
groupby的输出有两部分,一部分是key,另一部分就是同一key下的所有data,这里的data同样含有key这个字段
看一下https://blog.csdn.net/LY_ysys629/article/details/72553273这个实例应该就能对groupby的输出有直观感受了。
itemgetter
我觉得这篇博客的理解写的非常好https://blog.csdn.net/qq_22022063/article/details/79019294

作用:itemgetter 用于获取对象的哪些位置的数据,参数即为代表位置的序号值,

也就是说itemgetter的参数等于i,就取data[i]的数据,并且它是一个函数,因此可以直接用作groupby的key传参。

中文字符的处理

我已经被这个坑了两次了,记录一下
(1)文件开头一定要记得#coding=utf8
(2)如果保存的文件是’\u’开头的,解析的时候用json.loads能直接解析出中文
(3)如果保存的文件是’\x’开头的。。。可能在解析的时候要decode(‘utf-8’)吧。。。

标签:mapper,为例,python,py,MapReduce,sys,key,line,data
来源: https://blog.csdn.net/weixin_41864878/article/details/91473360

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有