ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

【Hadoop】:手动实现WordCount案例

2021-01-15 13:33:46  阅读:172  来源: 互联网

标签:key 手动 WordCount Hadoop value context org apache import


一.实现案例

实现WorldCount的流程如下:

备注:其中输入的数据是一个txt文件,里面有各种单词,每一行中用空格进行空行

 

一.Mapper的编写

我们在IDEA是使用“ctrl+alt+鼠标左键点击”的方式来查看源码,我们首先查看mapper 类的源码,同时源码我已经使用了,如下所示:

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.hadoop.mapreduce;

import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;

@Public
@Stable
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Mapper() {
    }

//在任务开始之前,setup必然被调用一次 protected void setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { }
//在input split的时候,对每一个key/value的pair都call once.大多数程序都会overide这个方法 protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { context.write(key, value); } //在at the end of the task,这个方法被调用一次 protected void cleanup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { } //把整个程序,里面的所有方法串连起来 public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException { this.setup(context); try { while(context.nextKeyValue()) {//每次仅读取一行数据 this.map(context.getCurrentKey(), context.getCurrentValue(), context); } } finally { this.cleanup(context); } }
//上下文,封装了程序当中大量的分析方法 public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { public Context() { } } }

因此我们根据里面的源码,编写wordcount所需要的mapper的代码,如下所示:

//现在我们开始编写wordcount的示例
public class WordcountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
//mapper后面的参数:
    // 1.输入数据的key类型
    // 2.输入数据的value类型
    // 3.输出数据的key类型
    // 4.输出数据的value的类型

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1.首先获取一行
        String line=value.toString();
        //2.将获取后的单词进行分割,按照空格进行分割
        String[] words=line.split(" ");
        //3.循环输出(不是输出到控制台上面,是输出到reducer里进行处理)
       for(String word:words)
       {
           Text k=new Text();//定义我们输出的类型,肯定是Text,和整个类extends的顺序对应
           k.set(word);
           IntWritable v=new IntWritable();
           v.set(1);//将value设置为1
           context.write(k,v);
       }
    }
}

 

二.Reducer的编写

reducer的源码如下,和mapper的源码非常相似,其实也就是对reducer的方法进行了封装,并没有方法体:

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.mapreduce.ReduceContext.ValueIterator;
import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;

@Checkpointable
@Public
@Stable
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public Reducer() {
    }

    protected void setup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        Iterator i$ = values.iterator();

        while(i$.hasNext()) {
            VALUEIN value = i$.next();
            context.write(key, value);
        }

    }

    protected void cleanup(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
    }

    public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKey()) {
                this.reduce(context.getCurrentKey(), context.getValues(), context);
                Iterator<VALUEIN> iter = context.getValues().iterator();
                if (iter instanceof ValueIterator) {
                    ((ValueIterator)iter).resetBackupStore();
                }
            }
        } finally {
            this.cleanup(context);
        }

    }

    public abstract class Context implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        public Context() {
        }
    }
}

代码如下:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import javax.xml.soap.Text;
import java.io.IOException;

public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        super.reduce(key, values, context);
        //在reduce里拿到的是mapper已经map好的数据
        //现在数据的形式是这样的:
        //atguigu(key),1(value)
        //atguigu(key),1(value)

        int sum=0;
        //累计求和
        for(IntWritable value: values)
        {
            sum+=value.get();//将intwrite对象转化为int对象
        }
        IntWritable v=new IntWritable();
        v.set(sum);
        //2.写出 atguigu 2
        context.write(key,v);

        //总结,这个程序看起来并没有起到分开不同单词,并对同一单词的value进行相加的作用啊
        //唯一的功能则是统计仅有一个单词的字符之和,这有啥用......
    }
}

三.Driver程序编写,让mapreduce动起来!

代码如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class wordcoundDriver {
    //将mapper和reducer进行启动的类
    //driver是完全格式固定的
    public static void main(String[] args) throws Exception {
        Configuration conf=new Configuration();
        //1.获取Job对象
        Job job=Job.getInstance(conf);
        //2.设置jar储存位置
        job.setJarByClass(wordcoundDriver.class);
        //3.关联map和reduce类
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        //4.设置mapper阶段输出数据的key和value类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //5.设置最终数据输出的key和value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //6.设置输入路径和输出路径
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileInputFormat.setInputPaths(job,new Path(args[1]));
        //7.提交Job
        job.submit();
        job.waitForCompletion(true);
    }
}

这样就可以运行起来了!大家可以尝试在分布式集群上实现wordcount统计这个功能,只需要将这些代码进行打成jar包,这样就可以放到linux操作系统上去运行了!最后运行的时候,路径写的是HDFS上的路径哦!

标签:key,手动,WordCount,Hadoop,value,context,org,apache,import
来源: https://www.cnblogs.com/geeksongs/p/14261619.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有