标签:天好 21 hadoop 第一期 job org apache import class
MapReduce中级编程实践
一、实验目的
通过实验掌握基本的MapReduce编程方法;
掌握用MapReduce解决一些常见的数据处理问题,包括数据去重计数、数据排序。
二、实验平台
操作系统:Linux
Hadoop版本:2.6.0
三、实验步骤
(一)对访问同一个网站的用户去重计数。
注:文件userurl_20150911中,数据以”\t”隔开,用户手机号为第三列,网站主域为第17列
在map中将文件按列分割,取需要的两列。在reduce中计数
package mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
public class Count{
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String str = value.toString();
if (str != null && !str.equals("")) {
String[] fa = str.split("\t");
String per = fa[2], web = fa[16];
context.write(new Text(per+'\t'+web),new IntWritable(1));
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public static HashMap<String, Integer> mp;
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum=0;
for(IntWritable t:values)
sum++;
context.write(key,new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(Count.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(job, new Path("hdfs://hadoop1:8020/dir1/userurl_20150911"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop1:8020/dir1/output_4"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
(二)对同一个用户不同记录产生的上下行流量求和后进行排序输出。
注:上行流量位于第25列,下行流量位于第26列
在map中将同一个人的第25、26列相加,在reduce中统计
```java
package mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.WritableComparable;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
public class Liuliang {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String str = value.toString();
if (str != null && !str.equals("")) {
String[] fa = str.split("\t");
String per = fa[2], web = fa[16], up = fa[24], down = fa[25];
int use = Integer.parseInt(up) + Integer.parseInt(down);
context.write(new Text(per + '\t' + web), new IntWritable(use));
}
}
}
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
public static HashMap<String, Integer> mp;
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable t : values) {
sum = t.get();
context.write(key, new IntWritable(sum));
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Job job = Job.getInstance(conf);
job.setJarByClass(Liuliang.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(TextInputFormat.class);
FileInputFormat.addInputPath(job, new Path("hdfs://hadoop1:8020/dir1/userurl_20150911"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop1:8020/dir1/output_5"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
四、实验总结及问题
1、学会使用什么做什么事情;
学会使用mapreduce对文件进行按列分割再统计
2、在实验过程中遇到了什么问题?是如何解决的?
3、还有什么问题尚未解决?可能是什么原因导致的。
标签:天好,21,hadoop,第一期,job,org,apache,import,class 来源: https://blog.csdn.net/qq_45799727/article/details/121067793
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。