ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

MapReduce原理深入理解(二)

2021-09-23 20:35:42  阅读:143  来源: 互联网

标签:String import MapReduce hadoop job 深入 org apache 原理


1.Mapreduce操作不需要reduce阶段

 

 1 import org.apache.hadoop.conf.Configuration;
 2 import org.apache.hadoop.fs.FileSystem;
 3 import org.apache.hadoop.fs.Path;
 4 import org.apache.hadoop.io.LongWritable;
 5 import org.apache.hadoop.io.NullWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Job;
 8 import org.apache.hadoop.mapreduce.Mapper;
 9 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
10 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
11 
12 import java.io.IOException;
13 
14 public class WordCount03 {
15     public static class MyMapper extends Mapper<LongWritable, Text,Text, NullWritable>{
16         @Override
17         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
18             String line = value.toString();
19             String s = line.split(",")[3];
20             if(s.equals("男")){
21                 context.write(new Text(s),NullWritable.get());
22             }
23         }
24     }
25     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
26         Job job= Job.getInstance();
27         job.setNumReduceTasks(0);
28         /**
29          * 有些情况下,不需要reduce(聚合程序),
30          * 在不需要聚合操作的时候,可以不需要reduce
31          * 而reduce默认为1,需要手动设置为0,
32          * 如果没有设置为0,会产生默认的reduce,只不过reduce不处理任何数据
33          */
34         job.setJobName("mr03程序");
35         job.setJarByClass(WordCount03.class);
36         job.setMapOutputKeyClass(Text.class);
37         job.setMapOutputValueClass(NullWritable.class);
38         Path in = new Path("/word");
39         FileInputFormat.addInputPath(job,in);
40         Path out = new Path("/output");
41         FileSystem fs = FileSystem.get(new Configuration());
42         if(fs.exists(out)){
43             fs.delete(out);
44         }
45         FileOutputFormat.setOutputPath(job,out);
46         job.waitForCompletion(true);
47     }
48 }

 

注意:

有些情况下,不需要reduce(聚合程序),
在不需要聚合操作的时候,可以不需要reduce
而reduce默认为1,需要手动设置为0,
如果没有设置为0,会产生默认的reduce,只不过reduce不处理任何数据

2.MapReduce中join操作(数据拼接)
  1 import org.apache.hadoop.conf.Configuration;
  2 import org.apache.hadoop.fs.FileSystem;
  3 import org.apache.hadoop.fs.Path;
  4 import org.apache.hadoop.io.LongWritable;
  5 import org.apache.hadoop.io.NullWritable;
  6 import org.apache.hadoop.io.Text;
  7 import org.apache.hadoop.mapreduce.InputSplit;
  8 import org.apache.hadoop.mapreduce.Job;
  9 import org.apache.hadoop.mapreduce.Mapper;
 10 import org.apache.hadoop.mapreduce.Reducer;
 11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 12 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 13 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 14 
 15 import java.io.IOException;
 16 import java.util.ArrayList;
 17 
 18 public class WordCount04 {
 19     public static class JoinMapper extends Mapper<LongWritable,Text,Text,Text>{
 20         @Override
 21         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 22             //1.获取数据的路径 InputSplit
 23             //context 上面是hdfs 下面如果有reduce就是reduce 没有就是hdfs
 24             InputSplit inputSplit = context.getInputSplit();
 25             FileSplit fs=(FileSplit)inputSplit;
 26             String url = fs.getPath().toString();
 27             //2.判断
 28             if(url.contains("students")){//true当前数据为students.txt
 29                 String id = value.toString().split(",")[0];
 30                 //为了方便reduce数据的操作 针对于不同的数据 打一个标签
 31                 String line = "*" + value.toString();
 32                 context.write(new Text(id),new Text(line));
 33             }else {//false 当前数据为score.txt
 34                 //以学号作为k 也是两张数据的关联条件
 35                 String id = value.toString().split(",")[0];
 36                 //为了方便reduce数据的操作 针对于不同的数据 打一个标签
 37                 String line = "#" + value.toString();
 38                 context.write(new Text(id),new Text(line));
 39             }
 40         }
 41     }
 42     public static class JoinReduce extends Reducer<Text,Text,Text,NullWritable>{
 43         @Override
 44         protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 45             //数据在循环之外保存
 46             String stuInfo="";
 47             ArrayList<String> scores = new ArrayList<String>();
 48             //提取数据
 49             for (Text value : values) {
 50                 //获取一行一行的数据(所有数据包含students.txt和score.txt)
 51                 String line = value.toString();
 52                 if(line.startsWith("*")){//true 为学生数据
 53                     stuInfo= line.substring(1);
 54                 }else {//false  为学生成绩数据
 55                     scores.add(line.substring(1));
 56                 }
 57             }
 58             /**
 59              * 求的是 两张表的拼接
 60              */
 61             //数据拼接
 62             for (String score : scores) {
 63                 String subject = score.split(",")[1];
 64                 String s = score.split(",")[2];
 65                 String end=stuInfo+","+subject+","+s;
 66                 context.write(new Text(end),NullWritable.get());
 67             }
 68             /**
 69              * 求的是 两张表的拼接 拼接过程中对成绩求和
 70              */
 71 //            long sum=0l;
 72 //            for (String s : scores) {
 73 //                Integer sc =Integer.valueOf( s.split(",")[2]);
 74 //                sum+=sc;
 75 //            }
 76 //            String end=stuInfo+","+sum;
 77 //            context.write(new Text(end),NullWritable.get());
 78         }
 79     }
 80     public static void main(String[] args) throws Exception {
 81         Job job = Job.getInstance();
 82         job.setJobName("Join MapReduce");
 83         job.setJarByClass(WordCount04.class);
 84 
 85         job.setMapperClass(JoinMapper.class);
 86         job.setMapOutputKeyClass(Text.class);
 87         job.setMapOutputValueClass(Text.class);
 88 
 89         job.setReducerClass(JoinReduce.class);
 90         job.setOutputKeyClass(Text.class);
 91         job.setOutputValueClass(NullWritable.class);
 92         //指定路径
 93         FileInputFormat.addInputPath(job,new Path("/word"));
 94         Path path = new Path("/output");
 95         FileSystem fs = FileSystem.get(new Configuration());
 96         if(fs.exists(path)){
 97             fs.delete(path);
 98         }
 99         FileOutputFormat.setOutputPath(job,new Path("/output"));
100         job.waitForCompletion(true);
101         System.out.println("join 正在执行");
102     }
103 }

 

 

 

标签:String,import,MapReduce,hadoop,job,深入,org,apache,原理
来源: https://www.cnblogs.com/lmandcc/p/15327259.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有