ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Hadoop学习之路(5)Mapreduce程序完成wordcount

2019-12-27 17:56:43  阅读:295  来源: 互联网

标签:Text wordcount Hadoop Mapreduce job hadoop Spark River class


程序使用的测试文本数据

Dear River
Dear River Bear Spark 
Car Dear Car Bear Car
Dear Car River Car 
Spark Spark Dear Spark 

1编写主要类

(1)Maper类

首先是自定义的Maper类代码

public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        //fields:代表着文本一行的的数据: dear bear river
        String[] words = value.toString().split("\t");
        for (String word : words) {
            // 每个单词出现1次,作为中间结果输出
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

     这个Map类是一个泛型类型,它有四个形参类型,分别指定map()函数的输入键、输入值、输出键和输出值的类型。LongWritable:输入键类型,Text:输入值类型,Text:输出键类型,IntWritable:输出值类型.
     String[] words = value.toString().split("\t");,words 的值为Dear River Bear River
     输入键key是一个长整数偏移量,用来寻找第一行的数据和下一行的数据,输入值是一行文本Dear River Bear River,输出键是单词Bear ,输出值是整数1
     Hadoop本身提供了一套可优化网络序列化传输的基本类型,而不直接使用Java内嵌的类型。这些类型都在org.apache.hadoop.io包中。这里使用LongWritable类型(相当于Java的Long类型)、Text类型(相当于Java中的String类型)和IntWritable类型(相当于Java的Integer类型)。
     map()方法的参数是输入键和输入值。以本程序为例,输入键LongWritable key是一个偏移量,输入值Text valueDear Car Bear Car ,我们首先将包含有一行输入的Text值转换成Java的String类型,之后使用substring()方法提取我们感兴趣的列。map()方法还提供了Context实例用于输出内容的写入。

(2)Reducer类

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    /*
        (River, 1)
        (River, 1)
        (River, 1)
        (Spark , 1)
        (Spark , 1)
        (Spark , 1)
        (Spark , 1)

        key: River
        value: List(1, 1, 1)
        key: Spark
        value: List(1, 1, 1,1)

    */
    public void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        context.write(key, new IntWritable(sum));// 输出最终结果
    };
}

Reduce任务最初按照分区号从Map端抓取数据为:
(River, 1)
(River, 1)
(River, 1)
(spark, 1)
(Spark , 1)
(Spark , 1)
(Spark , 1)
经过处理后得到的结果为:
key: hello value: List(1, 1, 1)
key: spark value: List(1, 1, 1,1)
所以reduce()函数的形参 Iterable&lt;IntWritable&gt; values 接收到的值为List(1, 1, 1)List(1, 1, 1,1)

(3)Main函数

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class WordCountMain {
    //若在IDEA中本地执行MR程序,需要将mapred-site.xml中的mapreduce.framework.name值修改成local
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        if (args.length != 2 || args == null) {
            System.out.println("please input Path!");
            System.exit(0);
        }
        //System.setProperty("HADOOP_USER_NAME","hadoop2.7");
        Configuration configuration = new Configuration();
        //configuration.set("mapreduce.job.jar","/home/bruce/project/kkbhdp01/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar");
        //调用getInstance方法,生成job实例
        Job job = Job.getInstance(configuration, WordCountMain.class.getSimpleName());
        // 打jar包
        job.setJarByClass(WordCountMain.class);

        // 通过job设置输入/输出格式
        // MR的默认输入格式是TextInputFormat,所以下两行可以注释掉
        // job.setInputFormatClass(TextInputFormat.class);
        // job.setOutputFormatClass(TextOutputFormat.class);
        // 设置输入/输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 设置处理Map/Reduce阶段的类
        job.setMapperClass(WordCountMap.class);
        //map combine减少网路传出量
        job.setCombinerClass(WordCountReduce.class);
        job.setReducerClass(WordCountReduce.class);

        //如果map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行;如果不一样,需要分别设置map, reduce的        输出的kv类型
        //job.setMapOutputKeyClass(.class)
        // job.setMapOutputKeyClass(Text.class);
        // job.setMapOutputValueClass(IntWritable.class);

        // 设置reduce task最终输出key/value的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 提交作业
        job.waitForCompletion(true);

    }
}

2本地运行

首先更改mapred-site.xml文件配置
将mapreduce.framework.name的值设置为local
在这里插入图片描述
然后本地运行:
在这里插入图片描述
查看结果:
在这里插入图片描述

3集群运行

方式一:

首先打包
在这里插入图片描述
更改配置文件,改成yarn模式
在这里插入图片描述
添加本地jar包位置:

 Configuration configuration = new Configuration();
 configuration.set("mapreduce.job.jar","C:\\Users\\tanglei1\\IdeaProjects\\Hadooptang\\target");

在这里插入图片描述
设置允许跨平台远程调用:

configuration.set("mapreduce.app-submission.cross-platform","true");

在这里插入图片描述
修改输入参数:
在这里插入图片描述
运行结果:
在这里插入图片描述

方式二:

将maven项目打包,在服务器端用命令运行mr程序

hadoop jar com.kaikeba.hadoop-1.0-SNAPSHOT.jar
com.kaikeba.hadoop.wordcount.WordCountMain /tttt.txt  /wordcount11

标签:Text,wordcount,Hadoop,Mapreduce,job,hadoop,Spark,River,class
来源: https://blog.51cto.com/10312890/2462281

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有