标签:int class hadoop Hadoop 学堂 MapReduce org import public
案例描述
找出每个月气温最高的2天
数据集
1949-10-01 14:21:02	34c
1949-10-01 19:21:02	38c
1949-10-02 14:01:02	36c
1950-01-01 11:21:02	32c
1950-10-01 12:21:02	37c
1951-12-01 12:21:02	23c
1950-10-02 12:21:02	41c
1950-10-03 12:21:02	27c
1951-07-01 12:21:02	45c
1951-07-02 12:21:02	46c
1951-07-03 12:21:03	47c
代码
MyTQ.class
package com.hadoop.mr.tq; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 客户端 * @author Lindsey * */ public class MyTQ { public static void main(String args []) throws Exception{ //加载配置文件 Configuration conf = new Configuration(true); //创建客户端 Job job = Job.getInstance(conf); job.setJarByClass(MyTQ.class); //Map配置 job.setMapperClass(TMapper.class); job.setMapOutputKeyClass(Tq.class); job.setMapOutputValueClass(IntWritable.class); //分区类:处理大数据量均衡并发处理 job.setPartitionerClass(TPartitioner.class); //比较类:用buffer字节数组内的key排序 job.setSortComparatorClass(TSortComparator.class); //Reduce配置 job.setNumReduceTasks(2); job.setReducerClass(TReducer.class); //分组比较类:年月相同为一组 job.setGroupingComparatorClass(TGroupingComparator.class); //输入输出源 Path input = new Path("/user/hadoop/input/weather.txt"); FileInputFormat.addInputPath(job, input); Path output = new Path("/user/hadoop/output/weather"); if(output.getFileSystem(conf).exists(output)){ output.getFileSystem(conf).delete(output,true); } FileOutputFormat.setOutputPath(job, output); //提交 job.waitForCompletion(true); } }
TMapper.class
package com.hadoop.mr.tq; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.util.StringUtils; public class TMapper extends Mapper<LongWritable, Text, Tq,IntWritable>{ /* * k-v 映射 * K(Tq) V(IntWritable) * 1949-10-01 14:21:02 34c * */ Tq mkey = new Tq(); IntWritable mval =new IntWritable(); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { try { //字符串分割 String [] strs = StringUtils.split(value.toString(),'\t'); //设置时间格式 注意月份是大写! SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); //解析为Date格式 Date date = sdf.parse(strs[0]); //日历上设置时间 Calendar cal = Calendar.getInstance(); cal.setTime(date); //Key mkey.setYear(cal.get(Calendar.YEAR)); mkey.setMonth(cal.get(Calendar.MONTH)+1); mkey.setDay(cal.get(Calendar.DAY_OF_MONTH)); int temperture = Integer.parseInt(strs[1].substring(0,strs[1].length()-1)); mkey.setTemperature(temperture); //value mval.set(temperture); //输出 context.write(mkey, mval); } catch (ParseException e) { e.printStackTrace(); } } }
Tq.class
package com.hadoop.mr.tq; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class Tq implements WritableComparable<Tq>{ private int year; private int month; private int day; private int temperature; public int getYear() { return year; } public void setYear(int year) { this.year = year; } public int getMonth() { return month; } public void setMonth(int month) { this.month = month; } public int getDay() { return day; } public void setDay(int day) { this.day = day; } public int getTemperature() { return temperature; } public void setTemperature(int temperature) { this.temperature = temperature; } @Override public void readFields(DataInput in) throws IOException { this.year=in.readInt(); this.month=in.readInt(); this.day=in.readInt(); this.temperature=in.readInt(); } @Override public void write(DataOutput out) throws IOException { out.writeInt(year); out.writeInt(month); out.writeInt(day); out.writeInt(temperature); } @Override public int compareTo(Tq that) { //约定:日期正序 int y = Integer.compare(this.year,that.getYear()); if(y == 0){ //年份相同 int m = Integer.compare(this.month,that.getMonth()); if(m == 0){ //月份相同 return Integer.compare(this.day,that.getDay()); } return m; } return y; } }
TPartitioner.class
package com.hadoop.mr.tq; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Partitioner; /** * 分区规则设计 使数据分区均衡避免倾斜 * @author Lindsey * */ public class TPartitioner extends Partitioner<Tq,IntWritable>{ @Override public int getPartition(Tq key, IntWritable value, int numPartitions) { return key.getYear() % numPartitions; } }
TSortComparator.class
package com.hadoop.mr.tq; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class TSortComparator extends WritableComparator{ //对字节数据中map排序 需要先将Key反序列化为对象再比较 public TSortComparator(){ super(Tq.class,true); //true是将Tq实例化 } /* 时间正序 、温度倒序 */ @Override public int compare(WritableComparable a, WritableComparable b) { Tq t1 = (Tq) a; Tq t2 = (Tq) b; int y = Integer.compare(t1.getYear(),t2.getYear()); if(y == 0){ int m = Integer.compare(t1.getMonth(),t2.getMonth()); if(m == 0){ //加上负号实现倒序 return -Integer.compare(t1.getTemperature(),t2.getTemperature()); } return m; } return y; } }
TReducer.class
package com.hadoop.mr.tq; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.shaded.org.glassfish.grizzly.compression.lzma.impl.lz.InWindow; public class TReducer extends Reducer<Tq, IntWritable, Text,IntWritable>{ Text rkey = new Text(); IntWritable rval = new IntWritable(); /* * 相同的Key为一组:Tq */ @Override protected void reduce(Tq key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int flg = 0; //标志,表示是否已经取了当天的天气 int day = 0; for(IntWritable v:values){ if(flg == 0){ day = key.getDay(); //设置文本内容 yyyy-mm-dd:temperture rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay()); rval.set(key.getTemperature()); flg++; context.write(rkey, rval); } if(flg!=0 && day!=key.getDay()){ rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay()); rval.set(key.getTemperature()); context.write(rkey, rval); break; } } } }
TGroupingComparator.class
package com.hadoop.mr.tq; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class TGroupingComparator extends WritableComparator{ public TGroupingComparator() { super(Tq.class,true); } /* * 面向Reduce * 年月相同为一组 返回0表示为同一组 */ @Override public int compare(WritableComparable a, WritableComparable b) { Tq t1 = (Tq) a; Tq t2 = (Tq) b; int y = Integer.compare(t1.getYear(),t2.getYear()); if(y == 0){ return Integer.compare(t1.getMonth(),t2.getMonth()); } return y; } }
运行结果
part-r-00000
part-r-00001
标签:int,class,hadoop,Hadoop,学堂,MapReduce,org,import,public 来源: https://www.cnblogs.com/HiLindsey/p/10644475.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。