一.MapReduce筹划思想
- 分而治之:对付大数据并行处理,将大的数据切分成多个小数据,交给更多的节点参与运算。注意:不可拆分的计算使命或相互间有依赖关系的数据无法举行并行计算。
- 抽象模子:Input、Split、Map、Shuffle、Reduce、Output。
- Input:读取数据。
- Split:对数据举行粗粒度切分。
- Map:对数据举行细粒度切分。
- Shuffle:洗牌。将各个 MapTask 效果归并输出到 Reduce。
- Reduce:对 Shuffle 举行汇总并输出到指定存储。
- Output:HDFS、Hive、Spark、Flume……
- 统一架构:步伐员需要考虑数据存储、划分、分发、效果收集、错误规复等诸多细节。为此,MapReduce 筹划并提供了统一的计算框架,为步伐员隐藏了绝大多数体系层面的处理细节。步伐员只需要集中于应用问题和算法本身,而不需要关注其他体系层的处理细节,大大减轻了步伐员开发步伐的负担。
- 离线框架:可以实现上千台服务器集群并发工作,适合 PB 级以上海量数据的离线处理。
- 不擅长及时计算:MapReduce 无法像 MySQL 一样,在毫秒或者秒级内返回效果。如果数据量小,使用 MR 反而不合适。
- 不擅长流式计算:流式计算的输入数据是动态的,而 MapReduce 的输入数据集是静态的,不能动态变革。这是由于 MapReduce 自身的筹划特点决定了数据源必须是静态的。
- 不擅长 DAG(有向图)计算:多个应用步伐存在依赖关系,后一个应用步伐的输入为前一个的输出。在这种环境下,MapReduce 并不是不能做,而是使用后,每个 MapReduce 作业的输出效果都会写入到磁盘,会造成大量的磁盘 IO,导致性能非常的低下。
- 计算向数据靠拢:将计算放在数据节点上举行工作。
- 次序处理数据、制止随机访问数据:大规模数据处理的特点决定了大量的数据记录不可能存放在内存、而只可能放在外存中举行处理。磁盘的次序访问和随即访问在性能上有巨大的差异。例:100 亿个数据记录,每个记录 100B,共计1TB 的数据库。更新 1% 的记录(随机访问)需要 1 个月时间;而次序访问并重写全部数据记录仅需 1 天时间。
- 失效被以为是常态:MapReduce 集群中使用大量的低端服务器(Google 现在在全球共使用百万台以上的服务器节点),因此,节点硬件失效和软件堕落是常态。因而:一个精良筹划、具有容错性的并行计算体系不能由于节点失效而影响计算服务的质量,任何节点失效都不应当导致效果的不一致或不确定性;任何一个节点失效时,其它节点要能够无缝接受失效节点的计算使命;当失效节点规复后应能自动无缝到场集群,而不需要管理员人工举行体系设置。MapReduce 并行计算软件框架使用了多种有效的机制,如节点自动重启技能,使集群和计算框架具有对付节点失效的健壮性,能有效处理失效节点的检测和规复。
二.MapReduce计算流程
1. 原始数据File
1T数据被切分成块存放在HDFS上,每一个块有128M巨细
2. 数据块Block
hdfs上数据存储的一个单元,同一个文件中块的巨细都是相同的
由于数据存储到HDFS上不可变,所以有可能块的数量和集群的计算本领不匹配
我们需要一个动态调整本次参与计算节点数量的一个单位
我们可以动态的改变这个单位–》参与的节点
3. 切片Split
- 切片是一个逻辑概念
- 在不改变如今数据存储的环境下,可以控制参与计算的节点数量
- 通过切片巨细可以达到控制计算节点数量的目的
- 有多少个切片就会执行多少个Map使命
- 一般切片巨细为Block的整数倍(2 1/2)
- 防止多余创建和很多的数据连接
- 如果Split>Block ,计算节点少了
- 如果Split<Block ,计算节点多了
- 默认环境下,Split切片的巨细等于Block的巨细 ,默认128M
- 一个切片对应一个MapTask
4. MapTask
- map默认从所属切片读取数据,每次读取一行(默认读取器)到内存中
- 我们可以根据自己书写的分词逻辑(空格分隔).计算每个单词出现的次数
- 这是就会产生 (Map<String,Integer>)临时数据,存放在内存中
- 但是内存巨细是有限的,如果多个使命同时执行有可能内存溢出(OOM)
- 如果把数据都直接存放到硬盘,效率太低
- 我们需要在OOM和效率低之间提供一个有效方案
5.环形数据缓冲区
可以循环使用这块内存区域,淘汰数据溢写时map的制止时间
- 每一个Map可以独享的一个内存区域
- 在内存中构建一个环形数据缓冲区(kvBuffer),默认巨细为100M
- 设置缓冲区的阈值为80%,当缓冲区的数据达到80M开始向外溢写到硬盘
- 溢写的时候还有20M的空间可以被使用效率并不会被减缓
- 而且将数据循环写到硬盘,不用担心OOM问题
6.分区Partation
- 数据写入环形数据缓冲区前会对 Key 举行分区处理
- 根据Key直接计算出对应的Reduce
- 分区的数量和Reduce的数量是相等的
- hash(key) % partation = num
- 默认分区的算法是Hash然后取余
- Object的hashCode()—equals()
- 如果两个对象equals,那么两个对象的hashcode一定相等
- 如果两个对象的hashcode相等,但是对象不一定equlas
- 作用:
-
- 均衡Map使命:分区可以确保数据均匀分布到各个Map使命中,制止某些Map使命过载而其他使命空闲。这有助于提高整体作业的并行度和效率。
- 均衡Reduce使命:分区还可以确保数据均匀分布到各个Reduce使命中,制止某些Reduce使命处理过多的数据而其他使命处理较少的数据。
- 数据本地性:Hadoop尽量将Map使命分配到数据地点的节点上执行,以淘汰网络传输开销。分区可以资助实现这一目的,确保数据和计算尽可能地靠近。
- 淘汰网络传输:通过合理的分区,可以淘汰数据在网络中的传输量,提高作业的执行速度。
- 并行处理:分区可以将大数据集分成多个较小的部分,每个部分可以并行处理。这提高了作业的并行度,收缩了作业的执行时间。
- 排序和聚合:在Reduce阶段,分区可以确保相同键的数据被发送到同一个Reduce使命中,这对于排序和聚合操纵非常重要。例如,在WordCount作业中,相同的单词需要被发送到同一个Reduce使命中举行计数。
5.自定义分区
- 灵活的分区计谋:Hadoop允许用户自定义分区计谋,通过实现 Partitioner 接口来定义怎样将数据分配到不同的Reduce使命中。这使得用户可以根据具体需求优化分区逻辑。
7.排序Sort
- 对要溢写的数据举行排序(QuickSort)
- 按照先Partation后Key的次序排序–>相同分区在一起,相同Key的在一起
- 我们将来溢写出的小文件也都是有序的
8. 溢写Spill
- 将内存中的数据循环写到硬盘,不用担心OOM问题
- 每次会产生一个80M的文件
- 如果本次Map产生的数据较多,可能会溢写多个文件
9. 归并Merge
- 由于溢写会产生很多有序(分区 key)的小文件,而且小文件的数量不确定
- 后面向reduce传递数据带来很大的问题
- 所以将小文件归并成一个大文件,将来拉取的数据直接从大文件拉取即可
- 归并小文件的时候同样举行排序(归并排序),最终产生一个有序的大文件
- 默认每次最多归并 10 个文件,由 mapreduce.task.io.sort.factor 参数举行控制,Map 和 Reduce 公用该设置
- Merge 是为了让传输的文件数量变少,但是网络传输数据量并没有改变,只是淘汰了网络 IO 次数
10. 组合器combiner
- 集群的带脱期制了mapreduce作业的数量,因此应该尽量制止map和reduce使命之间的数据传输。hadoop允许用户对map的输出数据举行处理,用户可自定义combiner函数(如同map函数和reduce函数一般),其逻辑一般和reduce函数一样,combiner的输入是map的输出,combiner的输出作为reduce的输入,很多环境下可以直接将reduce函数作为conbiner函数来使用(job.setCombinerClass(WordCountReducer.class);)。
- combiner属于优化方案,所以无法确定combiner函数会调用多少次,可以在环形缓存区溢出文件时调用combiner函数,也可以在溢出的小文件归并成大文件时(默认达到 mapreduce.map.combine.minspills = 3 个时)调用combiner。但要包管不管调用几次combiner函数都不会影响最终的效果,所以不是全部处理逻辑都可以使用combiner组件,有些逻辑如果在使用了combiner函数后会改变末了rerduce的输出效果(如求几个数的平均值,就不能先用combiner求一次各个map输出效果的平均值,再求这些平均值的平均值,这将导致效果错误)。
- Combiner 的意义是对每一个 MapTask 的输出举行局部汇总,以减小网络传输量。
- 原先传给reduce的数据是 a1 a1 a1 a1 a1
- 第一次combiner组合之后变为a{1,1,1,1,…}
- 第二次combiner后传给reduce的数据变为a{4,2,3,5…}
11. 拉取Fetch
- 我们需要将Map的临时效果拉取到Reduce节点
- 原则:
- 相同的Key必须拉取到同一个Reduce节点
- 但是一个Reduce节点可以有多个Key
- 未排序前拉取数据的时候必须对Map产生的最终的归并文件做全序遍历
- 如果map产生的大文件是有序的,每一个reduce只需要从文件中读取自己所需的即可
12.归并Merge
- 由于reduce拉取的时候,会从多个map拉取数据
- 那么每个map都会产生一个小文件,这些小文件(文件与文件之间无序,文件内部有序)
- 为了方便计算(没必要读取N个小文件),需要归并文件
- 归并算法归并成2个(qishishilia)
- 相同的key都在一起
13. 分组归并Reduce
- 将文件中的数据读取到内存中
- 一次性将相同的key全部读取到内存中
- 直接将相同的key得到效果–>最终效果
14. 写出Output
- 每个reduce将自己计算的最终效果都会存放到HDFS上
15.数据倾斜
三.Hadoop-YARN架构
1.基本概念
- 2.x开始使用Yarn(Yet Another Resource Negotiator,另一种资源和谐者)统一管理资源
- 以后其他的计算框架可以直接访问yarn获取当前集群的空闲节点
- client
- ResourceManager
- 资源和谐框架的管理者
- 分为主节点和备用节点(防止单点故障)
- 时刻与NodeManager保持心跳,继承NodeManager的陈诉
- 当有外部框架要使用资源的时候直接访问ResourceManager即可
- 如果有MR使命,先去ResourceManager申请资源,ResourceManager根据陈诉相对灵活分配资源
- 资源在NodeManager1,NodeManager1要负责开发资源
- NodeManager
- 资源和谐框架的执行者
- 每一个DataNode上默认有一个NodeManager
- NodeManager陈诉自己的信息到ResourceManager
- Container
- ApplicationMaster
- 我们本次Job使命的主导者
- 负责调度本次被分配的资源Container
- 当全部的节点使命全部完成,application告诉ResourceManager请求杀死当前ApplicationMaster线程
- 本次使命全部的资源都会被释放
- Task(MapTask–ReduceTask)
- 开始按照MR的流程执行业务
- 当使命完成时,ApplicationMaster接收到当前节点的回馈
2.工作流程
起首确认执行 MapReduce 作业的运行时框架,根据 mapreduce.framework.name 变量举行设置:
- 如果等于 yarn :则创建 YARNRunner 对象;
- 如果等于 local :则创建 LocalJobRunner 对象。
- 如果是 YARN 平台,客户端将对 ResouceManager 发起提交作业申请,具体流程如下:
Client 对 ResouceManager 发起提交作业申请;
- ResouceManager 返回 JobID(即 Application ID)和保存数据资源(作业的 Jar 文件,设置文件,计算所得输入分片,资源信息等)的临时目次(使用 JobID 定名的目次,hdfs://xxx/staging/xxx);
- 接着 Client 计算分片,拷贝资源(作业的 Jar 文件,设置文件,计算所得输入分片,资源信息等)到 HDFS,末了用submitApplication 函数提交 Job 给 ResouceManager;
- ApplicationManager 接收 submitApplication 方法提交的 Job,并将其交给 ResourceScheduler(调度器)处理;
- ResourceScheduler 选择一台 NodeManager 分配一个 Container,在 Container 中开启 ApplicationMaster 进程;
- 起首 ApplicationMaster 向 ResourceManager 举行注册,如许用户可以直接通过 ResourceManager 检察应用步伐的运行状态;然后 ApplicationMaster 收集计算后的输入分片环境来向 ResouceManager 申请对应的资源以运行 Task;末了ApplicationMaster 初始化一定数量的记录对象(bookkeeping)来跟踪 Job 的运行进度,并收集每个 Task 的进度和完成环境,直到运行竣事;
- ApplicationMaster 接纳轮询的方式,通过 RPC 协议向 ResourceManager 申请和领取资源;
- ApplicationMaster 申请到资源后,会与对应的 NodeManager 举行通讯,要求它启动 Container;
- NodeManager 为使命设置好运行环境(包括环境变量、Jar 包、二进制步伐等)后,将 Task 启动下令写到一个脚
本中,并通过运行该脚本启动对应的使命;
- 各个使命通过 RPC 协议向 ApplicationMaster 陈诉自己的状态和进度,方便 ApplicationMaster 随时掌握各个使命的
运行状态,从而可以在使命失败的时候重新启动使命;
- 此期间,客户端会每秒轮询检测 ApplicationMaster,如许就会随时收到更新信息,这些信息可以通过 Web UI 来进
行检察。除此之外,客户端还会每 5 秒轮询查抄 Job 是否完成,需要调用 Job 类下的 waitForCompletion() 方法,
Job 竣事后该方法返回。轮询时间隔断可以通过 mapreduce.client.completion.pollinterval 举行设置。
应用步伐运行完成后,ApplicationMaster 向 ResourceManager 注销并关闭自己。
- 应用步伐运行完成后,ApplicationMaster 向 ResourceManager 注销并关闭自己。

在Hadoop YARN(Yet Another Resource Negotiator)架构下,staging 目次用于临时存储与作业相关的文件和元数据。这些文件在作业提交时由客户端创建,并在作业运行期间由YARN的ResourceManager和NodeManager使用。staging 目次的重要作用是确保作业的设置文件、依赖的JAR包和其他资源文件能够安全地传输和存储,以便在集群中准确执行作业。
Staging 目次的作用
- 作业设置文件:
- 存储作业的设置文件,如 job.xml 和 job_*.conf.xml,这些文件包罗了作业的设置参数和环境设置。
- JAR包和依赖文件:
- 存储作业所需的JAR包和其他依赖文件。这些文件在作业提交时上传到 staging 目次,以便在作业运行时能够被Map和Reduce使命访问。
- 作业日记和历史记录:
- 存储作业的日记文件和历史记录文件,如 job_*.jhist,这些文件记录了作业的执行过程和效果。
- 作业状态和元数据:
- 存储作业的状态信息和元数据,这些信息用于跟踪作业的执行进度和状态。
Staging 目次的生命周期
- 作业提交:
- 当用户提交一个MapReduce作业时,Hadoop客户端会将作业的设置文件、JAR包和其他依赖文件上传到 staging 目次。
- 作业执行:
- ResourceManager 和 NodeManager 在作业执行过程中会从 staging 目次中读取这些文件,确保作业能够准确运行。
- 作业完成:
- 作业完成后,staging 目次中的文件通常会被保存一段时间,以便用户可以检察作业的设置和日记。之后,这些文件可能会被自动删除,以释放HDFS空间。
四.MapReduce压缩
1.概述
在时下大数据环境中,固然呆板性能好,节点多,但是并不代表我们的数据不需要做任何的压缩就开始处理。所以在某些环境下,我们照旧需要对数据做压缩处理。压缩技能能够有效淘汰存储体系的读写字节数,提高网络带宽和磁盘空间的效率。
在 Hadoop 中,当数据规模很大,工作负载非常密集时,I/O 操纵和网络数据传输需要花费大量的时间,Shuffle 与Merge 过程同样也面临着巨大的 I/O 压力。在这种环境下,数据压缩的重要性不言而喻。而在 Hive 中则体如今数据文件最终存储的格式是否启用压缩。
鉴于磁盘 I/O 和网络带宽是 Hadoop 的宝贵资源,数据压缩对于节省资源、最小化磁盘 I/O 和网络传输非常有资助,但其性能的提升和资源的节省并非没有代价(增长了 CPU 的运算负担)。如果磁盘 I/O 和网络带宽影响了 MapReduce 作业性能,在恣意 MapReduce 阶段启用压缩都可以改善端到端处理时间并淘汰 I/O 和网络流量。
2.条件与优缺点
- 为什么使用压缩(优点):淘汰存储体系读写字节数、提高网络带宽和磁盘空间的效率。
- 压缩的缺点:使用数据时需要先对文件解压,加重 CPU 负载,压缩算法越复杂,解压时间越长。
- 压缩的条件:空间和 CPU 要充裕。如果呆板 CPU 比力告急,慎用压缩。
- 压缩的技能:
有损压缩(LOSSY COMPRESSION):压缩息争压的过程中数据有丢失,使用场景:视频。
无损压缩(LOSSLESS COMPRESSION):压缩息争压的过程中数据没有丢失,使用场景:日记数据。
- 对称和非对称:
对称:压缩息争压的时间一致。
非对称:压缩息争压的时间不一致。
3.基本原则
计算密集型(CPU-Intensive)作业,少用压缩。
- 特点:要举行大量的计算,斲丧 CPU 资源。比如计算圆周率、对视频举行高清解码等等,端赖 CPU 的运算本领。计算密集型使命固然也可以用多使命完成,但是使命越多,花在使命切换的时间就越多,CPU 执利用命的效率就越低,所以,要最高效地使用 CPU,计算密集型使命同时举行的数量应当等于 CPU 的核心数。
- 计算密集型使命由于重要斲丧 CPU 资源,因此,代码运行效率至关重要。Python 如许的脚本语言运行效率很低,完全不适合计算密集型使命。对于计算密集型使命,最好用 C 语言编写。
IO 密集型(IO-Intensive)作业,多用压缩。
- 特点:CPU 斲丧很少,使命的大部分时间都在等待 IO 操纵完成(由于 IO 的速度远远低于 CPU 和内存的速度)。涉及到网络、磁盘 IO 的使命都是 IO 密集型使命。对于 IO 密集型使命,使命越多,CPU 效率越高,但也有一个限度。
- 常见的大部分使命都是 IO 密集型使命,比如 Web 应用。
- IO 密集型使命执行期间,99% 的时间都花在 IO 上,花在 CPU 上的时间很少,因此,用运行速度极快的 C 语言替换Python 如许运行速度极低的脚本语言,完全无法提升运行效率。对于 IO 密集型使命,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C 语言最差。
4.压缩选择
这些压缩文件最终都要经过 MR 步伐处理,所以我们需要知道 MR 步伐在哪些地方可以压缩,分别选什么样的压缩算法比力合适。
如下图,MR 重要在三个地方会用到数据压缩:
Input :数据泉源
Transformation :中间计算
Output :末了的输出
下面我们就针对这三个部分,来做对应的压缩选型。
- Use Compressd Map Input :第一次传入压缩文件,应选用可以切片的压缩方式,否则整个文件将只有一个 Map执行。建议:从 HDFS 中读取文件举行 MapReuce 作业,如果数据很大,可以使用压缩并且选择支持分片的压缩方式,例如 BZip2、LZO,如许可以实现并行处理,提高效率,淘汰磁盘读取时间,同时选择合适的存储格式例如 Sequence Files、RC、ORC 等。
- Compress Intermediate Data :第二次压缩应选择压缩解压速度快的压缩方式。建议:Map 的输出作为Reducer的输入,需要经过 Shuffle 这一过程,需要把数据读取到环形数据缓冲区,然后再读取到本地磁盘,所以选择压缩可以淘汰了存储文件所占空间,提升了数据传输速率,建议使用压缩/解压速度快的压缩方式,例如 Snappy、LZO、LZ4、Zstd。
- Compress ReducerOutput :第三次压缩有两种场景分别是:
当输出的文件为下一个 Job 的输入时,建议:选择可切分的压缩方式例如:BZip2。
当输出的文件直接存到 HDFS 作为归档时,建议:选择压缩比高的压缩方式。Reduce 阶段数据落盘通常使用 Gzip 或BZip2 举行压缩(淘汰磁盘使用)。
总结:
- Gzip:Hadoop 内置支持,压缩比高,不支持 Split。
用途:通常用来放不常访问的冷数据,较高的压缩比可以极大的节省磁盘空间。对应的编码/解码器: org.apache.hadoop.io.compress.GzipCodec 。
- BZip2:Hadoop 内置支持,压缩比高,支持 Split,支持多文件,缺点就是慢。
用途:适用于对处理速度要求不高的场景。一般不常用。
对应的编码/解码器: org.apache.hadoop.io.compress.BZip2Codec 。
- LZO: 压缩比一般,支持 Split(需要建索引,文件修改后需要重新建索引),压缩/解压速度快,支持 Hadoop Native库,需要自己安装。
用途:适合于经常访问的热数据。
对应的编码/解码器: com.hadoop.compression.lzo.LzopCodec 。
- LZ4:压缩比一般,不支持 Split,压缩/解压速度快,支持 Hadoop Native 库,需要自己安装。
用途:和 LZO 性能雷同,但不支持 Split,可以用于 Map 中间效果的压缩。
对应的编码/解码器: org.apache.hadoop.io.compress.Lz4Codec 。
- Snappy:压缩比一般,不支持 Split,压缩/解压速度快,支持 Hadoop Native 库,需要自己安装。
用途:和 LZO 性能雷同,但不支持 Split,可以用于 Map 中间效果的压缩。
对应的编码/解码器: org.apache.hadoop.io.compress.SnappyCodec 。
- Zstd:压缩比高跟 Deflate(Gzip 算法)相当,不支持 Split,压缩/解压速度快,支持 Hadoop Native 库,需要自己安装。
用途:和 LZO 性能雷同,但不支持 Split,可以用于 Map 中间效果的压缩。
对应的编码/解码器: org.apache.hadoop.io.compress.ZStandardCodec 。
五.MapReduce优化
1.概述
优化前我们需要知道 Hadoop 适合干什么活,适合什么场景,在工作中,我们要知道业务是怎样的,才能联合平台资源达到最优优化。除了这些我们还要知道 MapReduce 的执行流程,比如从文件的读取,Map 处理,Shuffle 过程,Reduce处理,文件的输出或者存储压缩等等。
在工作中,往往平台的参数都是固定的,不可能为了某一个作业去修改整个平台的参数,所以在作业的执行过程中,需要对作业举行单独的设定,如许既不会对其他作业产生影响,也能很好的提高作业的性能,提高优化的灵活性。
接下来,回首一下 Hadoop 的上风(适用场景):
- 可构建在便宜呆板上,设备成本相对较低;
- 高容错性,HDFS 将数据自动保存为多个副本,副本丢失后,自动规复,防止数据丢失或损坏;
- 适合批处理,HDFS 适合一次写入、多次查询(读取)的环境,适合在已有数据的环境下举行多次分析,稳固性好;
- 适合存储大文件,此中的大可以表示存储单个大文件,由于是分块存储的。也可以表示存储大量的数据,但不适合小文件。
2. 小文件优化
从概述中我们知道,很明显 Hadoop 适合大文件的处理和存储,那为什么不适合小文件呢?
- 从存储方面来说:Hadoop 存储的每个文件都会在 NameNode 上记录元数据,如果同样巨细的文件,文件很小的话,就会产生很多元数据文件,造成 NameNode 的压力;
- 从读取方面来说:同样巨细的文件分为很多小文件的话,会增长磁盘寻址次数,低落性能;
- 从计算方面来说:我们知道一个 MapTask 默认处理一个分片或者一个文件,如果 MapTask 的启动时间比数据处理的时间还要长,那么就会造成低性能。而且在 Map 端溢写磁盘的时候每一个 MapTask 最终会产生 Reduce 数量个数的中间效果,如果 MapTask 数量特别多,就会造成临时文件很多,造成 Reduce 拉取数据的时候增长磁盘的 IO。
明白小文件造成的毛病之后,那我们应该怎么处理这些小文件呢?
- 从源头解决问题,也就是在 HDFS 上不要存储小文件,在数据上传至 HDFS 的时候提前归并小文件;
- 如果小文件归并后的文件过大,可以更换文件存储格式或压缩存储,当然压缩存储需要考虑是否能切片的问题;
- 如果小文件已经存储至 HDFS 了,那么在 FileInputFormat 读取数据的时候使用实现类 CombineFileInputFormat 读取数据,在读取数据的时候举行归并。
3. 数据倾斜
MapReduce 是一个并行处理框架(分布式),那么处理的时间肯定是作业中全部使命最慢的谁人了,可谓木桶效应。
为什么会如许呢?
- 数据倾斜,每个 Reduce 处理的数据量巨细不一致,导致有些已经跑完了,有些还在执行;
- 还有可能就是某些作业地点的 NodeManager 有问题或者 Container 有问题或者 JVM GC 等,导致作业执行缓慢。
那么为什么会产生数据倾斜呢?比如数据本身就不均衡,所以在默认的 HashPartition 时造成分区数据不一致问题,还有就是代码筹划不合理等。那怎样解决数据倾斜的问题呢?
- 不使用默认的 Hash 分区算法,接纳自定义分区,联合业务特点,使得每个分区数据基本均衡;
- 或者既然有默认的分区算法,那么我们可以修改分区的键,让其符合 Hash 分区,并且使得末了的分区均衡,比如在Key 前加随机数或盐 n-key;
- 既然 Reduce 处理慢,那么可以增长 Reduce 的 memory 和 vcore,提高性能解决问题,固然没从根本上解决问题,但是还有效果的;
- 如果是由于只有一个 Reduce 导致作业很慢,可以增长 Reduce 的数量来分摊压力,然后再来一个作业实现最终聚合。
4. 推测执行
如果不是数据倾斜带来的问题,而是节点服务有问题造成某些 Map 和 Reduce 执行缓慢呢?可以使用推测执行,你跑的慢,我们可以找个其他节点重启一样的使命举行竞争,谁快以谁为准。推测执行是空间换时间的一种优化思想,会带来集群资源的浪费,给集群增长压力,所以一般环境下集群的推测执行都是关闭的,可以根据现实环境选择是否开启。
推测执行相关参数如下:
- # 是否启用 MapTask 推测执行,默认为 true
- mapreduce.map.speculative=true
- # 是否启用 ReduceTask 推测执行,默认为 true
- mapreduce.reduce.speculative=true
- # 推测任务占当前正在运行的任务数的比例,默认为 0.1
- mapreduce.job.speculative.speculative-cap-running-tasks=0.1;
- # 推测任务占全部要处理任务数的比例,默认为 0.01
- mapreduce.job.speculative.speculative-cap-total-tasks=0.01
- # 最少允许同时运行的推测任务数量,默认为 10
- mapreduce.job.speculative.minimum-allowed-tasks=10;
- # 本次推测没有任务下发,执行下一次推测任务的等待时间,默认为 1000(ms)
- mapreduce.job.speculative.retry-after-no-speculate=1000;
- # 本次推测有任务下发,执行下一次推测任务的等待时间,默认为 15000(ms)
- mapreduce.job.speculative.retry-after-speculate=15000;
- # 标准差,任务的平均进展率必须低于所有正在运行任务的平均值才会被认为是太慢的任务,默认为 1.0
- mapreduce.job.speculative.slowtaskthreshold=1.0;
复制代码 5.MapReduce 执行流程优化
1.Map
1. 临时文件
上面我们从 Hadoop 的某些特定场景下聊了 MapReduce 的优化,接下来我们从 MapReduce 的执行流程举行优化。
前面我们已经聊过小文件在数据读取这里也可以做优化,所以选择一个合适的数据文件的读取类(FIleInputFormat 的实现类)也很重要。我们在作业提交的过程中,会把作业 Jar 文件,设置文件,计算所得输入分片,资源信息等提交到HDFS 的临时目次(Job ID 定名的目次下),默认 10 个副本,可以通过 mapreduce.client.submit.file.replication参数修改副本数量。后期作业执行时会下载这些文件到本地,中间会产生磁盘 IO。如果集群很大的时候,可以增长该参数的值,如许集群很多副本都可以供 NM 访问,从而提高下载的效率。
2.分片
回首一下源码中分片的计算公式:
- // getFormatMinSplitSize():一个切片最少应该拥有 1 个字节
- // getMinSplitSize(job):读取程序员设置的切片的最小值,如果没有设置默认读取 1
- long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
- // 读取程序员设置的切片的最大值,如果没有设置默认读取 Long.MAX_VALUE
- long maxSize = getMaxSplitSize(job);
- // 获取 Block 的大小(默认为 128M)
- long blockSize = file.getBlockSize();
- // 获取 Split 的大小,切片的默认大小为 Block 的大小
- // return Math.max(minSize, Math.min(maxSize, blockSize));
- // minSize 为 64M --> 最终返回 128M,minSize 为 256M --> 最终返回 256M
- // maxSize 为 64M --> 最终返回 64M,maxSize 为 256M --> 最终返回 128M
- // 如果需要调大切片,则调节 minSize;如果需要调小切片,则调节 maxSize
- long splitSize = computeSplitSize(blockSize, minSize, maxSize);
复制代码 由于 Map 数没有具体的参数指定(默认环境下一个切片一个 MapTask),所以可以通过如上的公式调整切片的巨细,如许就可以实现动态设置 Map 数了,那么问题来了,Map 数该怎样设置呢?
3.资源
这些东西一定要联合业务,Map 数太多,会产生很多中间效果,导致 Reduce 拉取数据变慢;Map 数太少,每个 Map处理的时间又很长。那如果数据量就是很大,并且还需要控制 Map 的数量,这个时候每个 Map 的执行时间就比力长了,这时候可以调整每个 Map 的资源来提升 Map 的处理本领,相关参数如下。
- # MapTask 的执行内存,默认为 1024MB
- mapreduce.map.memory.mb=2048
- # MapTask 的虚拟 CPU 核数,默认为 1
- mapreduce.map.cpu.vcores=1
复制代码 这里需要注意的是,单个 Map/Reduce 申请的资源巨细,其值应该在每个容器申请的最大/最小分配之间,具体如下.
- # NodeManager 节点最大可用虚拟核,默认值为 -1。如果设置为 -1 且 yarn.nodemanager.resource.detect-
- hardware-capabilities 为 true(默认为 false),则会自动计算(在Windows和Linux环境下)。在其他情况下,默认为
- 8。
- # 推荐将该值设置为与物理 CPU 核数相同。如果你的节点 CPU 核数不够 8 个,则需要调减小这个值,因为 YARN 不会智能
- 的探测节点的物理 CPU 总数。
- yarn.nodemanager.resource.cpu-vcores=-1
- # 单个容器可申请的最小虚拟 CPU 核数,默认是 1,如果一个容器申请的 CPU 个数少于该数,则修改对应的值为这个数。
- yarn.scheduler.minimum-allocation-vcores=1
- # 单个容器可申请的最大虚拟 CPU 核数,默认是 4。
- yarn.scheduler.maximum-allocation-vcores=4
- # NodeManager 节点最大可用物理内存,默认值为 -1。如果设置为 -1 且 yarn.nodemanager.resource.detect-
- hardware-capabilities 为 true(默认为 false),则会自动计算(在Windows和Linux环境下)。在其他情况下,默认为
- 8192MB。
- yarn.nodemanager.resource.memory-mb=-1
- # ResourceManager 上每个容器可以申请内存资源的最小值,默认值为 1024MB
- yarn.scheduler.minimum-allocation-mb=1024
- # ResourceManager 上每个容器可以申请内存资源的最大值,默认值为 8192MB
- yarn.scheduler.maximum-allocation-mb=8192
复制代码 4… 环形缓冲区 & 溢写
从源头上确定好 Map 之后,接下来看看 Map 的具体执行过程。起首写环形数据缓冲区,为啥要写环形数据缓冲区呢,为什么不直接写磁盘?如许的目的重要是为了淘汰磁盘 IO。
每个 Map 使命不断地将键值对输出到在内存中构造的一个环形数据结构中。使用环形数据结构是为了更有效地使用内存空间,在内存中放置尽可能多的数据。该缓冲默以为 100M( mapreduce.task.io.sort.mb 参数控制),当达到 80%( mapreduce.map.sort.spill.percent 参数控制)时就会溢写至磁盘,每达到 80% 都会重写溢写到一个新的文件。
可以根据呆板的设置和数据量来设置这两个参数,当内存足够时,增大 mapreduce.task.io.sort.mb=500 会提高溢写的过程,而且会淘汰中间效果的文件数量。
- mapreduce.task.io.sort.mb=500
- mapreduce.map.sort.spill.percent=0.8
复制代码 5.归并
当文件溢写完后,Map 会对这些文件举行 Merge 归并,默认每次最多归并 10 个溢写的文件,由参数
mapreduce.task.io.sort.factor 举行设置。调大可以淘汰归并的次数,提高归并的并行度,低落对磁盘操纵的次
数。
- mapreduce.task.io.sort.factor=50
复制代码 6.输出
1. 组合器
在 Reduce 拉取数据之前,我们可以使用 Combiner 实现 Map-Side 的预聚合(不影响最终效果的环境下),如果自定义了 Combiner,此时会根据 Combiner 定义的函数对 map 方法的效果举行归并,如许可以淘汰数据的传输,低落磁盘和网络 IO,提升性能。
2. 压缩
终于走到了 Map 到 Reduce 的数据传输过程了,这中间重要的影响无非就是磁盘 IO,网络 IO,数据量的巨细了(是否压缩),其实淘汰数据量的巨细,就可以做到优化了,所以我们可以选择性压缩数据,压缩后数据量会进一步淘汰,低落磁盘和网络 IO,提升性能。
开启压缩后,数据会被压缩写入磁盘,Reduce 读的是压缩数据所以需要解压,在现实经验中 Hive 在 Hadoop 的运行的瓶颈一般都是 IO 而不是 CPU,压缩一般可以 10 倍的淘汰 IO 操纵。具体可以通过以下参数举行设置。
- # Map 的输出在通过网络发送之前是否被压缩,默认为 false 不压缩
- mapreduce.map.output.compress=false
- # 如果 Map 的输出被压缩,那么应该如何压缩它们,默认为 org.apache.hadoop.io.compress.DefaultCodec
- mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
复制代码 3.响应线程
Map 流程完成之后,会通过运行一个 HTTP Server 暴露自身,供 Reduce 端获取数据。这里用来响应 Reduce 数据请求的线程数量是可以设置的,通过 mapreduce.shuffle.max.threads 属性举行设置,默以为 0,表示当前呆板内核数量的两倍。注意该设置是针对 NodeManager 设置的,而不是每个作业设置。具体如下。
- mapreduce.shuffle.max.threads=0
复制代码 7.容错
Reduce 的每一个下载线程在下载某个 Map 数据的时候,有可能由于谁人 Map 中间效果地点的呆板发生错误,或者中间效果的文件丢失,或者网络中断等等环境,如许 Reduce 的下载就有可能失败,所以 Reduce 的下载线程并不会无休止的等待下去,当一定时间后下载仍然失败,那么下载线程就会放弃这次下载,并在随后实验从其他的地方下载(由于这段时间 Map 可能会重跑)。
为什么会从其他地方下载呢?由于 Map/Reduce Task 有容错机制,当使命执行失败后会实验重启使命,相关参数如下。
- # MapTask 最大重试次数,一旦重试次数超过该值,则认为 MapTask 运行失败,其对应的输入数据将不会产生任何结果,默
- 认为 4
- mapreduce.map.maxattempts=4
- # ReduceTask最大重试次数,一旦重试次数超过该值,则认为ReduceTask运行失败,其对应的输入数据将不会产生任何结
- 果,默认为4
- mapreduce.reduce.maxattempts=4
- # 当一个 NodeManager 上有超过 3 个任务失败时,ApplicationMaster 会将该节点上的任务调度到其他节点上执行
- # 该值必须小于 Map/Reduce Task 最大重试次数,否则失败的任务将永远不会在不同的节点上尝试
- mapreduce.job.maxtaskfailures.per.tracker=3
- # 当 NodeManager 发生故障,停止向 ResourceManager 节点发送心跳信息时,ResourceManager 节点并不会立即移除
- NodeManager,而是要等待一段时间,该参数如下,默认为 600000ms
- yarn.nm.liveness-monitor.expiry-interval-ms=600000
- # 如果一个 Task 在一定时间内没有任务进度的更新(ApplicationMaster 一段时间没有收到任务进度的更新),即不会读
- 取新的数据,也没有输出数据,则认为该 Task 处于 Block 状态,可能是临时卡住,也可能会永远卡住。为了防止 Task 永
- 远 Block 不退出,则设置了一个超时时间(单位毫秒),默认为 600000ms,为 0 表示禁用超时
- mapreduce.task.timeout=600000
- # YARN 中的应用程序失败之后,最多尝试的次数,默认为 2,即当 ApplicationMaster 失败 2 次以后,运行的任务将会
- 失败
- mapreduce.am.max-attempts=2
- # YARN 对 ApplicationMaster 的最大尝试次数做了限制,每个在 YARN 中运行的应用程序不能超过这个数量限制
- yarn.resourcemanager.am.max-attempts=2
- # Hadoop 对 ResourceManager 节点提供了检查点机制,当所有的 ResourceManager 节点失败后,重启 ResouceManager
- 节点,可以从上一个失败的 ResourceManager 节点保存的检查点进行状态恢复
- # 检查点的存储由 yarn-site.xml 配置文件中的 yarn-resourcemanager.store.class 属性进行设置,默认是保存到文
- 件中
- yarn.resourcemanager.store.class=org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMSt
- ateStore
复制代码 2.Reduce
1. 资源
接下来就是 Reduce 了,起首可以通过参数设置合理的 Reduce 数量( mapreduce.job.reduces 参数控制),以及通过参数设置每个 Reduce 的资源。具体如下。
- # 默认为 1
- mapreduce.job.reduces=1
- # ReduceTask 的执行内存,默认为 1024MB
- mapreduce.reduce.memory.mb=4096
- # ReduceTask 的虚拟 CPU 核数,默认为 1
- mapreduce.reduce.cpu.vcores=1
- # Map 和 Reduce 共享,当 MapTask 完成的比例达到该值后会为 ReduceTask 申请资源,默认是 0.05
- # 只要有溢写合并完成的 MapTask,申请到资源的 ReduceTask 就可以开始拉取
- mapreduce.job.reduce.slowstart.completedmaps=0.05
复制代码 这里需要注意的是,单个 Map/Reduce 申请的资源巨细,其值应该在每个容器申请的最大/最小分配之间,具体如下。
- # NodeManager 节点最大可用虚拟核,默认值为 -1。如果设置为 -1 且 yarn.nodemanager.resource.detect-
- hardware-capabilities 为 true(默认为 false),则会自动计算(在Windows和Linux环境下)。在其他情况下,默认为
- 8。
- # 推荐将该值设置为与物理 CPU 核数相同。如果你的节点 CPU 核数不够 8 个,则需要调减小这个值,因为 YARN 不会智能
- 的探测节点的物理 CPU 总数。
- yarn.nodemanager.resource.cpu-vcores=-1
- # 单个容器可申请的最小虚拟 CPU 核数,默认是 1,如果一个容器申请的 CPU 个数少于该数,则修改对应的值为这个数。
- yarn.scheduler.minimum-allocation-vcores=1
- # 单个容器可申请的最大虚拟 CPU 核数,默认是 4。
- yarn.scheduler.maximum-allocation-vcores=4
- # NodeManager 节点最大可用物理内存,默认值为 -1。如果设置为 -1 且 yarn.nodemanager.resource.detect-
- hardware-capabilities 为 true(默认为 false),则会自动计算(在Windows和Linux环境下)。在其他情况下,默认为
- 8192MB。
- yarn.nodemanager.resource.memory-mb=-1
- # ResourceManager 上每个容器可以申请内存资源的最小值,默认值为 1024MB
- yarn.scheduler.minimum-allocation-mb=1024
- # ResourceManager 上每个容器可以申请内存资源的最大值,默认值为 8192MB
- yarn.scheduler.maximum-allocation-mb=8192
复制代码 2.拉取
Reduce 在 Copy 的过程中默认使用 5 个( mapreduce.reduce.shuffle.parallelcopies 参数控制)并行度举行数据复制,可以将其调大例如 100。
Reduce 的每一个下载线程在下载某个 Map 数据的时候,有可能由于谁人 Map 中间效果地点的呆板发生错误,或者中间效果的文件丢失,或者网络中断等等环境,如许 Reduce 的下载就有可能失败,所以 Reduce 的下载线程并不会无休止的等待下去,当一定时间后下载仍然失败,那么下载线程就会放弃这次下载,并在随后实验从其他的地方下载(由于这段时间 Map 可能会重跑)。Reduce 下载线程的最大下载时间段可以通过mapreduce.reduce.shuffle.read.timeout (默以为 180000 秒)举行调整。
3. 缓冲区 & 溢写
Copy 过来的数据会先放入内存缓冲区中,然后当使用内存达到一定量的时候才 Spill 磁盘。这里的缓冲区巨细要比Map 端的更为灵活,它基于 JVM 的 Heap Size 举行设置。该内存巨细不像 Map 一样可以通过 mapreduce.task.io.sort.mb 来设置,而是通过另外一个参数 mapreduce.reduce.shuffle.input.buffer.percent (默以为 0.7)举行设置。意思是说,Shuffile 在 Reduce 内存中的数据最多使用内存量为:0.7 * maxHeap of reduce task,内存到磁盘 Merge 的启动门限可以通过 mapreduce.reduce.shuffle.merge.percent (默以为 0.66)举行设置。
假设 mapreduce.reduce.shuffle.input.buffer.percent 为 0.7,ReduceTask 的 max heapsize 为 1G,那么用来做拉取数据缓存的内存大概为 700MB 左右。这 700MB 的内存跟 Map 端一样,也不是要比及全部写满才会往磁盘溢写,而是达到指定的阈值就会开始往磁盘溢写(溢写前会先做 sortMerge)。这个限度阈值可以通过参数mapreduce.reduce.shuffle.merge.percent 来设定(默以为 0.66)。整个过程同 Map 雷同,如果用户设置了 Combiner,也会被启用,然后磁盘中会天生众多的溢写文件。这种 Merge 方式一直在运行,直到没有 Map 端的数据时才会竣事,然后启动磁盘到磁盘的 Merge 方式天生最终的文件。
4. 归并
同 Map 一样,当文件溢写完后,Reduce 会对这些文件举行 Merge 归并。最大归并因子默以为 10,由参数
mapreduce.task.io.sort.factor 举行设置。如果 Map 输出很多,则需要归并很多趟,所以可以淘汰归并的次数,提高归并的并行度,低落对磁盘操纵的次数。
5. 读缓存
默认环境下,数据达到一个阈值的时候,缓冲区中的数据就会写入磁盘,然后 Reduce 会从磁盘中得到全部的数据。
也就是说,缓冲区和 Reduce 是没有直接关联的,中间会有多次写磁盘 -> 读磁盘的过程,既然有这个毛病,那么可以通过修改参数,使得缓冲区中的一部分数据可以直接运送到 Reduce(缓冲区 -> 读缓存 -> Reduce),从而淘汰 IO 开销。
修改参数 mapreduce.reduce.input.buffer.percent ,默以为 0.0,表示不开启缓存,直接从磁盘读。当该值大于 0 的时候,会保存指定比例的内存用于缓存(缓冲区 -> 读缓存 -> Reduce),从而提升计算的速度。如许一来,设置缓冲区需要内存,读取数据需要内存,Reduce 计算也需要内存,所以要根据作业的用运行环境举行调整。当 Reduce 计算逻辑斲丧内存很小时,可以分一部分内存用来缓存数据,可以提升计算的速度。默认环境下都是从磁盘读取数据,如果内存足够大的话,务必设置该参数让 Reduce 直接从缓存读数据。
这些文件举行 Merge 归并。最大归并因子默以为 10,由参数
mapreduce.task.io.sort.factor 举行设置。如果 Map 输出很多,则需要归并很多趟,所以可以淘汰归并的次数,提高归并的并行度,低落对磁盘操纵的次数。
5. 读缓存
默认环境下,数据达到一个阈值的时候,缓冲区中的数据就会写入磁盘,然后 Reduce 会从磁盘中得到全部的数据。
也就是说,缓冲区和 Reduce 是没有直接关联的,中间会有多次写磁盘 -> 读磁盘的过程,既然有这个毛病,那么可以通过修改参数,使得缓冲区中的一部分数据可以直接运送到 Reduce(缓冲区 -> 读缓存 -> Reduce),从而淘汰 IO 开销。
修改参数 mapreduce.reduce.input.buffer.percent ,默以为 0.0,表示不开启缓存,直接从磁盘读。当该值大于 0 的时候,会保存指定比例的内存用于缓存(缓冲区 -> 读缓存 -> Reduce),从而提升计算的速度。如许一来,设置缓冲区需要内存,读取数据需要内存,Reduce 计算也需要内存,所以要根据作业的用运行环境举行调整。当 Reduce 计算逻辑斲丧内存很小时,可以分一部分内存用来缓存数据,可以提升计算的速度。默认环境下都是从磁盘读取数据,如果内存足够大的话,务必设置该参数让 Reduce 直接从缓存读数据。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |