1. Overall approach
(1) The two fields we extract from the CSV file, id and price, are both declared as String, because MapReduce reads and writes everything as (k, v) key-value pairs. For example, the reducer below simply passes each (key, value) pair straight through:
@Override
protected void reduce(Text key, Iterable<CsvBean> values, Context context) throws IOException, InterruptedException {
    for (CsvBean value : values) {
        context.write(key, value);
    }
}
(2) We therefore wrap each record in an object, with its fields set according to the columns of the CSV file, and use price as the map output key:
// 4. populate the output object
outV.setId(id);
outV.setAge(price); // note: the setter is named setAge but it stores the price field
outK.set(price);
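The mapper side of this can be illustrated in plain Java, with no Hadoop dependency: split one CSV line on commas and pick out column 0 (id) and column 2 (price). The sample record below is made up for illustration.

```java
// Minimal sketch of mapper steps 2-3: split a CSV line and keep the
// id (column 0) and price (column 2) fields.
public class CsvLineDemo {

    // returns {id, price} extracted from one CSV record
    static String[] extractIdAndPrice(String line) {
        String[] fields = line.split(",");
        return new String[] { fields[0], fields[2] };
    }

    public static void main(String[] args) {
        // hypothetical record: id, name, price
        String[] kv = extractIdAndPrice("730,Counter-Strike,9.99");
        System.out.println("id=" + kv[0] + " price=" + kv[1]);
    }
}
```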
(3) With the object defined, we need to make it usable as a MapReduce input/output type, which means implementing Hadoop's Writable interface by overriding its serialization and deserialization methods.
Serialization: override write(), using writeUTF for each String field:
@Override
public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeUTF(id);
    dataOutput.writeUTF(price);
}
Deserialization: override readFields(), using readUTF to read the fields back in the same order they were written:
@Override
public void readFields(DataInput dataInput) throws IOException {
    this.id = dataInput.readUTF();
    this.price = dataInput.readUTF();
}
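What these two methods do can be checked in plain Java, without Hadoop: writeUTF serializes each String with a length prefix, and readUTF must read the values back in exactly the order they were written. A minimal round-trip sketch (the id/price values are made up):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Round-trip sketch of write()/readFields(): serialize two Strings with
// writeUTF, then read them back with readUTF in the same order.
public class UtfRoundTrip {

    static byte[] serialize(String id, String price) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeUTF(id);    // same order as CsvBean.write()
            out.writeUTF(price);
            return buf.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    static String[] deserialize(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            // same order as CsvBean.readFields()
            return new String[] { in.readUTF(), in.readUTF() };
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        String[] back = deserialize(serialize("730", "9.99"));
        System.out.println(back[0] + " " + back[1]);
    }
}
```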
(4) To control how results are displayed in the output file, we override toString(), using "," as the separator:
@Override
public String toString() {
    return "," + id;
}
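TextOutputFormat writes each record as the key, a tab, then value.toString(). With the toString() above, which prepends a comma to the id, a plain-Java sketch of one finished output line looks like this (the price/id values are made up):

```java
// Sketch of how one output line is assembled: key + tab + value.toString().
public class OutputLineDemo {

    // mimics TextOutputFormat's default record layout
    static String formatLine(String price, String id) {
        String valueToString = "," + id; // what CsvBean.toString() returns
        return price + "\t" + valueToString;
    }

    public static void main(String[] args) {
        System.out.println(formatLine("9.99", "730")); // prints: 9.99	,730
    }
}
```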
(5) Finally, we set the input and output paths for the CSV job (change these to wherever you placed your CSV file; note that the output directory must not already exist, or the job will fail):
// 6. set input and output paths
FileInputFormat.setInputPaths(job, new Path("D:\\IDEA\\mapreduce\\steam\\input\\steam.csv"));
FileOutputFormat.setOutputPath(job, new Path("D:\\IDEA\\mapreduce\\steam\\output"));
2. Full code
(1) The wrapper class CsvBean
package com.gis507.test.CsvSplit;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class CsvBean implements Writable {

    private String id;
    private String price;

    // Writable requires a no-arg constructor for deserialization
    public CsvBean() {
    }

    public CsvBean(String id, String age) {
        this.id = id;
        this.price = age;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    // note: these accessors are named getAge/setAge but back the price field
    public String getAge() {
        return price;
    }

    public void setAge(String age) {
        this.price = age;
    }

    @Override
    public String toString() {
        return "," + id;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(price);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // fields must be read in the same order they were written
        this.id = dataInput.readUTF();
        this.price = dataInput.readUTF();
    }
}
(2) The Mapper class, which reads in id and price
package com.gis507.test.CsvSplit;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class CsvSplitMapper extends Mapper<LongWritable, Text, Text, CsvBean> {

    private Text outK = new Text();
    private CsvBean outV = new CsvBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. read one line as a String
        String line = value.toString();
        // 2. split on commas
        String[] csvComments = line.split(",");
        // 3. pick out the fields we need
        String id = csvComments[0];
        String price = csvComments[2];
        // 4. populate the output object
        outV.setId(id);
        outV.setAge(price);
        outK.set(price);
        // 5. emit (price, bean)
        context.write(outK, outV);
    }
}
(3) The Reducer class
package com.gis507.test.CsvSplit;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class CsvSplitReducer extends Reducer<Text, CsvBean, Text, CsvBean> {

    @Override
    protected void reduce(Text key, Iterable<CsvBean> values, Context context) throws IOException, InterruptedException {
        // all records sharing a price arrive here together; write each one back out
        for (CsvBean value : values) {
            context.write(key, value);
        }
    }
}
(4) The Driver class CsvSplitDriver, which configures the job and its input/output
package com.gis507.test.CsvSplit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class CsvSplitDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. get a Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 2. set the driver class
        job.setJarByClass(CsvSplitDriver.class);
        // 3. wire up the Mapper and Reducer
        job.setMapperClass(CsvSplitMapper.class);
        job.setReducerClass(CsvSplitReducer.class);
        // 4. set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CsvBean.class);
        // 5. set the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CsvBean.class);
        // 6. set input and output paths
        FileInputFormat.setInputPaths(job, new Path("D:\\IDEA\\mapreduce\\steam\\input\\steam.csv"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\IDEA\\mapreduce\\steam\\output"));
        // 7. submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
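The whole pipeline above can be sketched locally in plain Java, with no Hadoop cluster: map each CSV line to (price, id), let a sorted map stand in for the shuffle's group-by-key, and the "reduce" step simply emits each group. The sample records are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Local, Hadoop-free sketch of the job: extract (price, id) per line,
// group ids by price, with keys in sorted order as the shuffle would deliver them.
public class LocalPipelineDemo {

    static Map<String, List<String>> run(List<String> lines) {
        // TreeMap mimics the sorted-by-key order the shuffle hands to the reducer
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) { // "map" phase
            String[] f = line.split(",");
            grouped.computeIfAbsent(f[2], k -> new ArrayList<>()).add(f[0]);
        }
        // the "reduce" phase in this job just writes each (price, id) pair back out
        return grouped;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "730,Counter-Strike,9.99",
                "570,Dota 2,0.00",
                "440,Team Fortress 2,9.99");
        System.out.println(run(lines)); // ids grouped under each price, keys sorted
    }
}
```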
3. Viewing the output
4. Project source code
Source: https://blog.csdn.net/m0_56282664/article/details/122773109