ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

MR自定义OutputFormat

2021-02-16 17:58:11  阅读:194  来源: 互联网

标签:OutputFormat 自定义 org hadoop job IOException MR apache import


自定义OutputFormat使用场景:

控制文件最终的输出路径和格式。比如:一个MR程序要求根据不同的结果将数据输出到不同的目录中。

需求

过滤输入的log日志,包含百度的网站输出到 baidu.log,不包含baidu的网站输出到 other.log。

代码实现

1.自定义MyOutputFormat类,继承FileOutputFormat类。
其中的泛型是Reduce端数据输出的k-v类型。

package com.aura.hadoop.outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author panghu
 * @description
 * @create 2021-02-16-17:12
 */
public class MyOutputformat extends FileOutputFormat<LongWritable,Text> {
    @Override
    public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        return new MyRecordWriter(job);
    }
}

2.自定义MyRecordWriter继承RecordWriter。作用是把k-v键值对转换为数据。

package com.aura.hadoop.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author panghu
 * @description
 * @create 2021-02-16-17:14
 */
public class MyRecordWriter extends RecordWriter<LongWritable, Text> {

    FSDataOutputStream baidu;
    FSDataOutputStream other;

    /**
     * 开流
     *
     * @param job context上下文对象
     */
    public MyRecordWriter(TaskAttemptContext job) throws IOException {
        Configuration conf = job.getConfiguration();
        // 获取文件输出路径
        String outDir = conf.get(FileOutputFormat.OUTDIR);

        // 设置不同输出路径
        FileSystem fs = FileSystem.get(conf);

        baidu = fs.create(new Path(outDir + "/baidu.log"));
        other = fs.create(new Path(outDir + "/other.log"));

    }

    /**
     * 处理逻辑,不同的日志输出到不同的文件夹
     *
     * @param longWritable
     * @param text
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void write(LongWritable longWritable, Text text) throws IOException, InterruptedException {
        String line = text.toString() + "\n";
        if (line.contains("baidu")) {
            baidu.write(line.getBytes());
        } else {
            other.write(line.getBytes());
        }
    }

    /**
     * 关流
     *
     * @param job
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void close(TaskAttemptContext job) throws IOException, InterruptedException {
        IOUtils.closeStream(other);
        IOUtils.closeStream(baidu);
    }
}

3.在此需求中,Mapper端和Reducer端可以不做任何处理,直接写出即可。创建Driver类。

package com.aura.hadoop.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author panghu
 * @description
 * @create 2021-02-16-17:30
 */
public class OutputDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(OutputDriver.class);

        // 指定outformat类型
        job.setOutputFormatClass(MyOutputformat.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\data\\hadoopdata\\自定义输出流\\log.txt"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\data\\out\\MyOutputformat_out"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);

    }
}

标签:OutputFormat,自定义,org,hadoop,job,IOException,MR,apache,import
来源: https://blog.csdn.net/FlatTiger/article/details/113826683

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有