ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

MapReduce WordCount Combiner程序

2021-06-04 18:04:52  阅读:216  来源: 互联网

标签:Combiner -- WordCount MapReduce hadoop key import apache org


MapReduce WordCount Combiner程序

MapReduce WordCount Combiner程序

注意使用Combiner之后的累加情况是不同的;

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.stono</groupId>
    <artifactId>mr01</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>mr01</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.7</java.version>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <maven.build.timestamp.format>yyyy-MM-dd HH:mm:ss</maven.build.timestamp.format>

        <hadoop-mapreduce-client.version>2.7.2</hadoop-mapreduce-client.version>
        <hbase-client.version>1.1.2</hbase-client.version>
        <slf4j.version>1.7.25</slf4j.version>
        <kafka-client.version>0.10.2.1</kafka-client.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>jdk.tools</groupId>
            <artifactId>jdk.tools</artifactId>
            <version>1.8</version>
            <scope>system</scope>
            <systemPath>D:/Java/jdk1.8.0_161/lib/tools.jar</systemPath>
        </dependency>
        <!-- 日志记录 Slf4j -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <!-- mapreduce -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop-mapreduce-client.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop-mapreduce-client.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>false</addClasspath>
                            <mainClass>com.bsr.combiner.JobRunner</mainClass> <!-- 你的主类名 -->
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <!--<plugin>-->
            <!--<artifactId> maven-assembly-plugin </artifactId>-->
            <!--<configuration>-->
            <!--<descriptorRefs>-->
            <!--<descriptorRef>jar-with-dependencies</descriptorRef>-->
            <!--</descriptorRefs>-->
            <!--<archive>-->
            <!--<manifest>-->
            <!--<mainClass>com.bsr.basis.JobRunner</mainClass>-->
            <!--</manifest>-->
            <!--</archive>-->
            <!--</configuration>-->
            <!--<executions>-->
            <!--<execution>-->
            <!--<id>make-assembly</id>-->
            <!--<phase>package</phase>-->
            <!--<goals>-->
            <!--<goal>single</goal>-->
            <!--</goals>-->
            <!--</execution>-->
            <!--</executions>-->
            <!--</plugin>-->
        </plugins>
    </build>

</project>

Mapper:

package com.bsr.combiner;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
四个参数的含义
第一个参数:map中key-value的key的类型,默认值是输入行的偏移量
第二个参数:map中key-value的value的类型 在此需求中是某一行的内容(数据)
第三个参数:reduce中key-value中的key类型
第四个参数:redece的输出参数int
但是在mapreduce中涉及到了网络间的传输,所以需要序列化,而hadoop提供了相关的序列化类型
long-LongWritable
String-Text
int-IntWritable
 */


public class MapperWordCount extends Mapper<LongWritable, Text, Text, IntWritable>{
    
    /*重写mapper的map方法 编写自己的逻辑
     * key是偏移量不用管
     * value是一行的内容 例:hello zhangsan you you 
     * context是返回结果
     */
    @Override
    protected void map(LongWritable key, Text value,
            Context context)
            throws IOException, InterruptedException {
        
        String[] values=value.toString().split(" ");//对得到的一行数据进行切分 在此需求中是以空格进行切分
        
        for(String word:values){
            
            context.write(new Text(word), new IntWritable(1));//遍历数组 输出map的返回值 即<hello,1><zhangsan,1><you,1><you,1>
            
        }
        
    }
    

}

 

Combiner:

package com.bsr.combiner;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class Combiner extends Reducer<Text, IntWritable,Text, IntWritable>{
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,
                    Context context)
                    throws IOException, InterruptedException {
                int count=0;//初始一个计数器
                
                for(IntWritable value:values){
count
++;//对values进行遍历 每次加1 } context.write(key,new IntWritable(count));//最后写返回值<hello,5> } }

 

reduce:

package com.bsr.combiner;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * 此方法是WordCount的reduce
 * 参数:1.map阶段返回的key类型String-Text
 *         2.map阶段返回值中value的类型Int-IntWritable
 *         3.reduce阶段返回值中key的类型String-Text
 *         4.reduce阶段返回值中value的类型Int-IntWritable
 */

public class ReducerWordCount extends Reducer<Text, IntWritable,Text, IntWritable>{
    
    
    /*
     * 实现父类的reduce方法
     *key是一组key-value的相同的哪个key
     *values是一组key-value的所有value
     *key value 的情况,比如<hello,{1,1,1,1,1}>
     * 
     * context 返回值,<hello,5>
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context)throws IOException, InterruptedException {
        
            int count=0;//初始一个计数器
        
            for(IntWritable value:values){
                count = count + i.get();//对values进行遍历 需要累加
            }
            context.write(key,new IntWritable(count));//最后写返回值<hello,5>
            
            
        
    }
    
    
}

 

Job:

package com.bsr.combiner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;



public class JobRunner {
    
    /*
     * 提交写好的mapreduce程序 当做一个Job进行提交
     * 
     */
    
    public static void main(String[] args) throws Exception {
        //读取classpath下的所有xxx-site.xml配置文件,并进行解析
        Configuration conf=new Configuration();
        FileSystem fs = FileSystem.get(configuration);
        String s = "/wc/output2";
        Path path = new Path(s);
        fs.delete(path, true)

        Job wcjob=Job.getInstance(conf);//初始一个job
        
        //通过主类的类加载器机制获取到本job的所有代码所在的jar包
        wcjob.setJarByClass(JobRunner.class);
        
        //指定本job使用的mapper类
        wcjob.setMapperClass(MapperWordCount.class);
        
        //指定本job使用的reducer类
        wcjob.setReducerClass(ReducerWordCount.class);
        
        //设置本job使用的从combiner类
        wcjob.setCombinerClass(Combiner.class);
        
        //指定mapper输出的kv的数据类型
        wcjob.setMapOutputKeyClass(Text.class);
        wcjob.setMapOutputValueClass(IntWritable.class);
        
        //指定reducer输出的kv数据类型
        wcjob.setOutputKeyClass(Text.class);
        wcjob.setOutputValueClass(IntWritable.class);
        
        //指定本job要处理的文件所在的路径
        FileInputFormat.setInputPaths(wcjob, new Path("/wc/data/"));
        
        //指定本job输出的结果文件放在哪个路径
        FileOutputFormat.setOutputPath(wcjob, new Path("/wc/output2/"));
        
        //将本job向hadoop集群提交执行
        boolean res=wcjob.waitForCompletion(true);
        
        System.exit(res?0:1);//执行成功的话正常退出系统执行有误则终止执行
    }

}

 

注意:https://www.cnblogs.com/esingchan/p/3917094.html 的讲解

 

标签:Combiner,--,WordCount,MapReduce,hadoop,key,import,apache,org
来源: https://blog.51cto.com/u_15241951/2862988

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有