ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Hadoop 性能优化

2022-06-04 09:03:32  阅读:132  来源: 互联网

标签:文件 conf Text 性能 Hadoop SequenceFile key new 优化


Hadoop 性能优化

小文件问题

HDFS和MapReduce是针对大文件设计的,在小文件处理上效率低下,且十分消耗内存资源。每个小文件都会占用一个block、产生一个InputSplit、产生一个Map任务,这样map任务的启动时间很长,执行任务的时间很短。解决方法是使用容器将小文件组织起来,HDFS提供了两种容器:SequenceFile MapFile

  1. SequenceFile
    SequeceFile是Hadoop提供的一种二进制文件,这种二进制 文件直接将<key, value>对序列化到文件中
    一般对小文件可以使用这种文件合并,即将文件名作为key, 文件内容作为value序列化到大文件中
    注意:SequeceFile需要一个合并文件的过程,文件较大,且合并后的文件将不方便查看,必须通过遍历查看每一个小文件
    SequenceFile
    package org.example.mapreduce;
    
    import org.apache.commons.io.FileUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    
    import java.io.File;
    import java.nio.charset.StandardCharsets;
    
    public class SmallFileSeq {
    
        public static void main(String[] args) throws Exception {
            write("D:\\samllFile", "/SequenceFile");
        }
    
        /**
         * 生成 SequenceFile 文件
         * @param inputDir 本地存放小文件的目录
         * @param outPutFile 输出压缩文件的 hdfs 目录
         * @throws Exception
         */
        private static void write(String inputDir, String outPutFile) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://ip:9000");
    
    
            // 如果输出文件存在则删除
            FileSystem fileSystem = FileSystem.get(conf);
            fileSystem.delete(new Path(outPutFile), true);
    
    
            // 构造 option 数组,有三个元素
            // 1. 输出路径
            // 2. key 类型
            // 3. value 类型
            SequenceFile.Writer.Option[] option = new SequenceFile.Writer.Option[]{
                    SequenceFile.Writer.file(new Path(outPutFile)),
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(Text.class)
            };
    
            // 创建一个 Write 实例
            SequenceFile.Writer writer = SequenceFile.createWriter(conf, option);
    
            // 指定要压缩的文件的目录
            File inputDirPath = new File(inputDir);
            if (inputDirPath.isDirectory()) {
                for (File file : inputDirPath.listFiles()) {
                    // 获取文件全部,对于小文件直接全部读取到内存
                    String content = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
                    // 获取文件名
                    String fileName = file.getName();
                    writer.append(new Text(fileName), new Text(content));
                }
            }
    
            writer.close();
        }
    
        /**
         * 读取 SequenceFile
         * @param inputFile
         * @throws Exception
         */
        private static void read(String inputFile) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://ip:9000");
    
            // 创建阅读器
            SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(inputFile)));
    
            // 循环读取数据
            Text key = new Text();
            Text value = new Text();
            while (reader.next(key, value)) {
                System.out.println("文件名 : " + key);
                System.out.println("文件内容 : " + value);
            }
    
            reader.close();
        }
    }

    在 mapreduce 任务中需要指定输入处理类,重新指定 map 函数的key

    操作SequenceFile的MapReduce
    public class WordCountJobSeq {
        public static void main(String[] args) {
            try {
                if (args.length != 2) {
                    System.exit(1);
                }
    
                // 创建一个配置类
                Configuration conf = new Configuration();
                // 创建一个任务
                Job job = Job.getInstance(conf);
                
                
    
                // ... 省略
                
    
                
                // 默认情况下使用Text处理类,当处理 SequenceFile 时需要指定处理类
                job.setInputFormatClass(SequenceFileInputFormat.class);
    
                
    
                // 提交 job
                job.waitForCompletion(true);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        // 泛型的第一个参数和map函数的第一个参数默认应该为LongWriter用于表示每行字节偏移量
        // 读取SequenceFile时应为Text类型,值为压缩包内文件名
        public static class  MyMapper extends Mapper<Text, Text, Text, LongWritable> {
            @Override
            protected void map(Text key, // 默认情况下key为偏移字节数,读取 SequenceFile 时key为Text类型、内容为文件名
                               Text value,
                               Mapper<Text, Text, Text, LongWritable>.Context context
            ) throws IOException, InterruptedException {
                // ...
            }
        }
    
        // reduce 相同
        public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
            @Override
            protected void reduce(
                    Text key,
                    Iterable<LongWritable> values,
                    Reducer<Text, LongWritable, Text, LongWritable>.Context context
            ) throws IOException, InterruptedException {
                 // ...
            }
        }
    }
  2. MapFile
    MapFile是排序后的SequenceFile , MapFile由两部分组成, 分别是index和data
    index作为文件的数据索引,主要记录了每个Record的key值, 以及该Record在文件中的偏移位置
    在MapFile被访问的时候,索引文件会被加载到内存,通过索引 映射关系可迅速定位到指定Record所在文件位置
    优点:检索效率高
    缺点:要消耗一部分内存用于存储index索引
    MapFile
    package org.example.mapreduce;
    
    import org.apache.commons.io.FileUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.MapFile;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    
    import java.io.File;
    import java.nio.charset.StandardCharsets;
    
    public class SmallFileMap {
    
        public static void main(String[] args) throws Exception {
            write("D:\\samllFile", "/MapFile");
            read("/MapFile");
        }
    
        /**
         * 生成 MapFile 文件
         * @param inputDir 本地存放小文件的目录
         * @param outPutDir 输出压缩文件的 hdfs 目录,其下有两个文件 一个index索引 一个数据文件
         * @throws Exception
         */
        private static void write(String inputDir, String outPutDir) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://ip:9000");
    
    
            // 如果输出文件存在则删除
            FileSystem fileSystem = FileSystem.get(conf);
            fileSystem.delete(new Path(outPutDir), true);
    
    
            // 构造 option 数组,有两个元素
            // 1. key 类型
            // 2. value 类型
            SequenceFile.Writer.Option[] options = new SequenceFile.Writer.Option[]{
                    MapFile.Writer.keyClass(Text.class),
                    MapFile.Writer.valueClass(Text.class)
            };
    
            // 创建一个 Write 实例
            MapFile.Writer writer = new MapFile.Writer(conf, new Path(outPutDir), options);
    
            // 指定要压缩的文件的目录
            File inputDirPath = new File(inputDir);
            if (inputDirPath.isDirectory()) {
                for (File file : inputDirPath.listFiles()) {
                    // 获取文件全部,对于小文件直接全部读取到内存
                    String content = FileUtils.readFileToString(file, StandardCharsets.UTF_8);
                    // 获取文件名
                    String fileName = file.getName();
                    writer.append(new Text(fileName), new Text(content));
                }
            }
    
            writer.close();
        }
    
        /**
         * 读取 MapFile
         * @param inputFile 读取 MapFile 文件路径
         * @throws Exception
         */
        private static void read(String inputFile) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://ip:9000");
    
            // 创建阅读器
            MapFile.Reader reader = new MapFile.Reader(new Path(inputFile), conf);
    
            // 循环读取数据
            Text key = new Text();
            Text value = new Text();
            while (reader.next(key, value)) {
                System.out.println("文件名 : " + key);
                System.out.println("文件内容 : " + value);
            }
    
            reader.close();
        }
    }

 

数据倾斜问题

为了提高效率启动多个Reduce进程并行,这时涉及到将数据分区不到不同进程。可以通过  job.getPartitionerClass() 和 job.setPartitionerClass() 查询和设置分区类。默认使用HashPartitioner.class

job.getPartitionerClass();

// getPartitionerClass 源码
  public Class<? extends Partitioner<?,?>> getPartitionerClass() 
     throws ClassNotFoundException {
    return (Class<? extends Partitioner<?,?>>) 
      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
  }

// 分区类默认使用这个
public class HashPartitioner<K, V> extends Partitioner<K, V> {
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

MapReduce程序执行时,Reduce节点大部分执行完毕,但是有一个或者几个Reduce节点运行很慢,导致整个程序处理 时间变得很长,具体表现为:Ruduce阶段一直卡着不动。

例子:一个 wordcount 任务,数据为0-9 的十个数字,其中数字"5"有910w个,其余9个数字有90w个。这时就叫数据发生了倾斜。为了加快速度,启动多个reduce任务。在reduce阶段,数字“5”会被分给同一个reduce任务执行,这个reduce任务的执行速度就会比其他的慢。

解决方法:

  1. 增加Reduce任务个数(针对数据倾斜不是太严重的情况有效)
  2. 把倾斜的数据打散
        public static class  MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
            @Override
            protected void map(LongWritable key,
                               Text value,
                               Mapper<LongWritable, Text, Text, LongWritable>.Context context
            ) throws IOException, InterruptedException {
                // 分割单词
                String[] words = value.toString().split(" ");
                // 迭代分出的单词
                for (String word : words) {
                    // 把迭代出的单词封装为 <k2, v2> 的形式
                    
                    /*****************************************************************************/
                    
                    if ("5".equals(word)) { // 数据向字符“5”倾斜,这里将数据打散
                        word = "5" + "_" + new Random().nextInt(10); // 将 5 变为 5_0 ~ 5_9
                    }
                    
                    // 在得到的结果中key也会变化,之后需要再使用mapreduce将数据聚合
    
                    /*****************************************************************************/
                    
                    Text keyOut = new Text(word);
                    LongWritable valueOut = new LongWritable(1L);
                    // 把 <k2, v2> 写出去
                    context.write(keyOut, valueOut);
                }
            }
        }

    结果类似这样,需要再来一个程序进行处理。

标签:文件,conf,Text,性能,Hadoop,SequenceFile,key,new,优化
来源: https://www.cnblogs.com/zhh567/p/16339777.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有