标签:org hadoop 单词 计数 job MR apache import class
----------------------------------主程序入口---------------------------------- package com.demo01.wordcount; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class JobMain extends Configured implements Tool { /** *主程序入口 * @param args */ public static void main(String[] args) throws Exception { //这里执行完成,返回一个程序退出状态码 0成功 //这里设置configguration相当于给父类赋值了 int run = ToolRunner.run(new Configuration(),new JobMain(),args); System.exit(run); } /** * * run方法很重要,用来组装8个类,用Job组装在一起 * @param strings * @return * @throws Exception */ @Override public int run(String[] strings) throws Exception { //1.读取文件解析成value对 //第一个是configuration配置文件,第二个定义job的名字 Job job = Job.getInstance(super.getConf(),"XXX"); //设置程序入口类 job.setJarByClass(JobMain.class); //设置job接收的的数据类型 job.setInputFormatClass(TextInputFormat.class); //设置需要处理的文件 //hdfs集群下执行 // FileInputFormat.addInputPath(job,new Path("hdfs://node01:8020/wordcount")); //本地测试 FileInputFormat.addInputPath(job,new Path("file:///D:\\dsj\\baishi课件\\hadoop\\3、大数据离线第三天\\3、大数据离线第三天\\wordcount\\input")); //2.自定义mapper类 job.setMapperClass(WordCountMapper.class); //设置key2和value2的类 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); /** * 第三到六步: * 分区 相同key的value,放松到一个reduce,key合并,value形成一个集合 * 排序 * 规约 * 分组 */ //7.自定义reduce逻辑 job.setReducerClass(WordCountReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); //8.输出文件 //路径一定要不存在,存在就报错 // TextOutputFormat.setOutputPath(job,new Path("hdfs://node01/wordcountoutput")); //本地测试 TextOutputFormat.setOutputPath(job,new Path("file:///D:\\dsj\\baishi课件\\hadoop\\3、大数据离线第三天\\3、大数据离线第三天\\wordcount\\output")); //提交任务到集群上 boolean b = job.waitForCompletion(true); return b?0:1; } }
----------------------------------mapper程序----------------------------------
package com.demo01.wordcount; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; //此处泛型hadoop对java基础类型进行了包装,加快网络传输, 4个参数代表 public class WordCountMapper extends Mapper<LongWritable, Text,Text, LongWritable> { //重写map方法:自定义k1 v1转换到k2 v2的方法 /** * * @param key k1 * @param value v1 * @param context 上下文对象,对接我们上面的组件与下面的组件 * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //hive,sqoop,flume,hello String[] split = value.toString().split(","); //遍历k2和v2往下发送 for (String word : split) { Text k2 = new Text(word); LongWritable v2 = new LongWritable(1); context.write(k2,v2); } } }
----------------------------REDUCE程序--------------------------------------
package com.demo01.wordcount; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; //k2,v2,k3,v3 public class WordCountReduce extends Reducer<Text, LongWritable, Text,LongWritable> { /** * * @param key k2 * @param values 一个集合,集合类型是v2的类型 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { int num = 0; for (LongWritable value : values) { //IntWritable这个类没有加方法,通过get()编程编程java类型 num += value.get(); } context.write(key,new LongWritable(num)); } }
标签:org,hadoop,单词,计数,job,MR,apache,import,class 来源: https://blog.csdn.net/RZH_long/article/details/121364527
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。