ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Hadoop系列——详解MapReduce

2021-06-20 13:57:38  阅读:186  来源: 互联网

标签:MapReduce ReduceTask Hadoop MapTask Mapreduce 详解 key 序列化 数据


本文主要介绍MapReduce的基本概念以及详细介绍该框架的流程

文章目录

Mapreduce 简介

Mapreduce 是什么

Mapreduce 是面向大数据并行处理的计算模型、框架和平台,其处理过程分为 Map 阶段和 Reduce 阶段。

Mapreduce 的由来

Mapreduce 最早是由 Google 公司设计一种为了解决搜索引擎中大规模网页数据的并行化处理。正是这思想,Doug Cutting 模仿 Google 的 Mapreduce,基于 Java 设计了 Hadoop 开源的 Mapreduce 并行计算框架。

Mapreduce 设计目标

Mapreduce 是一种可用于数据处理的编程框架,其采用 分而治之 的思想,把大规模数据集分发给一个主节点管理的各个分节点完成,然后通过整合各个节点的中间结果,得到最终结果。

Mapreduce 适用场景

Mapreduce 适用于数据集是可拆分并行处理的,且不影响最后结果。

Mapreduce 特点

  • 优点:
  1. 易于编程
  2. 良好的扩展性
  3. 高容错性
  4. 适合 PB 级上数据的离线处理
  • 缺点:
  1. 不擅长实时计算
  2. 不擅长流式计算
  3. 不擅长 DAG(有向无环图) 计算

Mapreduce 的基本概念

  1. JobClient
    用来提交作业,配置参数 Configuration,打包成 jar 包存储在 HDFS 上,将文件路径提交给 JobTracker 的 Master 服务,然后由 Master 创建每个 task 将他们分发到各个 TaskTracker 服务中去执行。

  2. JobTracker
    用来协调作业的运行,由其负责资源监控和作业调度。

  3. TaskTracker
    用来处理作业划分后的任务,其主动与 JobTrack 进行通信,负责执行每个任务。

Mapreduce执行框架


Task 分为 MapTask 和 ReduceTask,均由 TaskTracker 启动。Mapreduce 处理的最小单位为 split,split 可由用户自由设置,计算公式如下:

S p l i t S i z e = M a t h . m a x ( m i n S i z e , M a t h . m i n ( m a x S i z e , b l o c k S i z e ) ) \begin{aligned} SplitSize= Math.max(minSize,Math.min(maxSize,blockSize)) \end{aligned} SplitSize=Math.max(minSize,Math.min(maxSize,blockSize))​

mapreduce.input.fileinputformat.split.minsize 默认为 1
mapreduce.input.fileinputformat.split.maxsize 默认为 Long.MAXValue
blockSize 默认为 128M
maxsize :该参数如果比 blockSize 小,则导致切片变小,即会等于配置的整个参数。
sminsize :该参数如果修改的比 blockSize 大,则切片大小会比 blockSize 大。

split 和 block 的关系

InputFormat

Mapreduce 开启任务时,需要对文件的读取,Hadoop 定义了不同的类读取不同的数据。

接口实现类


TextInputFormat

  • 默认使用类,按行读取每条数据,Key是该行数据的 offset,Value = 行内容。

KeyValueTExtInputFormat

  • 每行都是一条记录,被指定分隔符分割为Key跟Value,默认是 \t 。

NLineInputFormat

  • 该模型下每个 map 处理 InputSplit 时不再按照 Block 块去划分,而是按照指定的行数N来划分文件。

自定义InputFormat

  • 基础接口,改写 RecordReader,实现一次读取一个完整文件封装为 KV,使用 SequenceFileOutPutFormat 输出合并文件。

CombineTextInputFormat

  • 用于小文件过多场景,逻辑上合并多个小文件个一个切片任务。

OutputFormat

对 reduce 拉取的结果需要指定的输出方式写到文件系统里,可根据需求选择实现类。

接口实现类


TextOutputFormat

  • 系统默认输出格式,把每条记录写为文本行,他的 K 和 V 是任意类型,系统在写入时候会统一转化为字符串。

SequenceFileOutputFormat

  • 此模式下的输出结果作为后续MapReduce任务的输入,该模式下数据格式紧凑,很容易被压缩。

自定义OutputFormat

  • 自定义类继承 FileOutputFormat。
  • 重写 RecordWriter,改写具体输出数据的方法 write()。

序列化

  • 序列化:将内存中对象转换为二进制的字节序列,可以通过 输出流持久化存储 或者 网络传输。
  • 反序列化:将收到字节序列或者是硬盘的持久化数据,转换成内存中的对象。

  因为 Hadoop 在集群之间进行通讯或者 RPC 调用时是需要序列化的,而且要求序列化要快、且体积要小、占用带宽要小。而 Java 自带的序列化是重量级框架,对象序列化后会附带额外信息,比如各种校验信息、header、继承体系等。所以 Hadoop 自研了序列化框架。

Hadoop 序列化相关接口:Writable 实现的序列化机制、Comparable 管理 Key 的排序。

常见的 Hadoop 序列化类型:

Java 类型Hadoop Writable类型
booleanBooleanWritable
byteByteWritable
intIntWritable
floatFloatWritable
longLongWritable
doubleDoubleWritable
StringText
mapMapWritable
arrayArrayWritable
nullNullWritable

Mapreduce 流程

整体流程

Mapreduce 整体流程

MapTask 工作机制

  1. Read阶段:MapTask 通过用户编写的 RecordReader,从输入 InputSplit 中解析出一个个 key/value 对。
  2. Map阶段:将解析出的 key/value 交给用户编写 map() 函数处理,并产生一系列新的key/value。
  3. Collect收集阶段:它会将生成的 key/value 分区(调用Partitioner),并写入一个环形内存缓冲区中。
  4. Spill阶段:先按照分区进行排序,然后区内按照字典对 key 进行快排,并在必要时对数据进行合并、压缩等操作。
  5. Combine阶段:选择性可进行 MapTask 内的优化提速。

ReduceTask 工作机制

  1. Copy阶段:从所有的 MapTask 中收集结果然后决定将数据放入缓存还是磁盘。
  2. Merge阶段:copy 数据时后天会对磁盘还有内存数据进行 Merge。
  3. Sort阶段:ReduceTask 需对所有数据进行一次归并排序,方便执行 reduce 函数。
  4. Reduce阶段:调用用户 reduce() 函数将计算结果写到 HDFS 上。

Shuffle

MapReduce 的核心就是 Shuffle 过程,Shuffle 过程是贯穿于 map 和 reduce 两个过程的。在 Map 端包括 spill 过程,在 Reduce 端包括 copy 和 sort 过程。 具体 Shuffle 过程如下。

Shuffle 机制
  1. MapTask 收集我们的 map() 方法输出的键值对,放到内存缓冲区中。
  2. 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件,溢出前会按照分区针对 key 进行区内快排。
  3. 多个溢出文件会被合并成大的溢出文件。
  4. 在溢出过程及合并的过程中,都要调用 Partitioner 进行分区和针对 key 进行排序。
  5. ReduceTask 根据自己的分区号,去各个 MapTask 机器上取相应的结果分区数据。
  6. ReduceTask 对收集后的数据进行合并和归并排序
  7. 进入 ReduceTask 的逻辑运算过程,调用用户自定义的 reduce() 方法。
  8. Shuffle 中的缓冲区大小会影响到 MapReduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快。

环形缓冲区

Map 的输出结果由 Collector 处理,每个 Map 任务不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。

环形数据结构其实就是个字节数组 byte[],叫 kvbuffer,默认为 100M。里面主要存储数据元数据。中间有个分界点,并且分界点是变化的。当环形缓冲区写入的 buffer 的大小达到 80% 满足溢写条件的时候,开始溢写 spill。系统有两个线程一个负责写入数据,一个负责 spill 数据。

分区

MapReduce 默认的分区方式是 hashPartition,在这种分区方式下,KV 对根据 key 的 hashcode 值与 reduceTask 个数进行取模,决定该键值对该要访问哪个 ReduceTask。

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public HashPartitioner() {
    }
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & 2147483647) % numReduceTasks;
		//numReduceTasks默认为1
    }
}

需要自定义分区的话可以重写 getPartition() 方法,设置 reduceTask 的数量。

排序

MapReduce 框架最重要的操作就是排序,MapTask 跟 ReduceTask 都会根据 key 进行按照字典顺序进行快排。

  • MapTask 将缓冲区数据快排后写入到磁盘,然后磁盘文件会进行归并排序
  • ReduceTask 统一对内存跟磁盘所有数据进行归并排序。

规约

  • Combiner 是 MR 程序中 Mapper 跟 Reducer 之外的组件。
  • Combiner 是在每一个 MapTask 所在节点运行,Reducer 是接受全部 Mapper 输出结果。
  • Combiner 属于局部汇总的意思,来减少网络传输。
  • Combiner 用的时候要注意不能影响最终结果就行。比如求平均值就不行,中间会改变结果逻辑。

分组

即相同的 key 的 value 会放到一个集合里。

压缩

我们可以把数据文件压缩后再存入 HDFS,以节省存储空间。但是,在使用 MapReduce 处理压缩文件时,必须考虑压缩文件的可分割性。目前,Hadoop 支持以下几种压缩格式。

压缩的基本原则:

  • 运算密集型任务 ,少压缩。
  • IO密集型任务,多压缩。
压缩格式自带算法扩展名是否可切分压缩后,代码修改
DEFLATEDEFLATE.deflate不需要修改
gzipDEFLATE.gz不需要修改
bzip2bzip2.bz2不需要修改
SnappySnappy.snappy不需要修改
LZOLZO.lzo需要建索引,还需要指定输入格式

标签:MapReduce,ReduceTask,Hadoop,MapTask,Mapreduce,详解,key,序列化,数据
来源: https://blog.csdn.net/lhrfighting/article/details/118069030

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有