马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
一、MapReduce框架原理
1.1InputFormat数据输入
MapTask并行度决定机制
1)数据块(HDFS存储数据单位),物理上把数据分成一块一块
2)数据切片(MapReduce步伐计算输入数据的单位):只是在逻辑上对输入举行分片,不在磁盘上将其切身分片举行存储。
1.2FileInputFormat
切片机制:
1)按照文件的内容长度举行切片
2)切片大小,默认即是Block大小
3)切片不思量数据集整体,而是逐个针对每一个
- //获取切片的文件名称
- String name = inputSplit.getPath () .getName () ;
- //根据文件类型获取切片信息
- FileSplit inputSplit = (FileSplit) context.getInputSplit ();
复制代码 文件举行单独切片
案例分析:
1)源码中计算切片大小的公式
- Math.max(minSize, Math.min(maxSize, blockSize));
- //设置切片的最大值和最小值
- mapreduce input.fileinputformat.split.minsize=1 默认值为1
- mapreduce input.fileinputformat.split maxsize= Long.MAXValue
复制代码 2)获取切片信息API
- //获取切片的文件名称
- String name = inputSplit.getPath () .getName () ;
- //根据文件类型获取切片信息
- FileSplit inputSplit = (FileSplit) context.getInputSplit ();
复制代码 1.3 TextInputFormat
介绍:是FileInputFormat的实现类,按行读取每条记载(key值存储该行在整个文件中的起始字节偏移量,为LongWritable范例;value值存储该行的内容,不包括任何行终止符,为Text范例
注:FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。
- //示例
- (0,Rich learning form)
- (20,Intelligent learning engine)
- (49,Learning more convenient)
- (74,From the real demand for more close to the enterprise)
复制代码
1.4CombineTextInputFormat切片机制
应用场景:实用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片(数据切片)中,多个小文件就可以交给一个MapTask处置惩罚。
假造存储切片最大值设置(最好根据实际的小文件大小环境来设置具体的值)
- CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);//4m
复制代码 切片机制:
1)假造存储过程:将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize值比较,假如不大于设置的最大值,逻辑上划分一个块。假如输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个假造存储块(防止出现太小切片)。
2)切片过程:判定假造存储的文件大小是否大于setMaxInputSplitSize值,大于即是则单独形成一个切片,若不大于则和下一个假造存储文件举行合并,共同形成一个切片。
二、MapReduce工作流程
三、Shuffle机制
1.Shuffle机制图解
Map方法之后,Reduce方法之前的数据处置惩罚过程称之为Shuffle。
2.Partition分区
- //默认的Partitioner分区
- public class HashPartitioner<K, V> extends Partitioner<K, V> {
- public int getPartition (K key, V value, int numReduceTasks) {
- return (key. hashCode () & Integer .MAX VALUE) & numReduceTasks;
- }
- }
复制代码 3.自定义Partitioner
1)自定义类继承Partitioner,重写getPartition()方法
- public class CustomPartitioner extends Partitioner<Text, FlowBean> {
- @Override
- public int getPartition(Text key, FlowBean value, int numPartitions) {
- // 控制分区代码逻辑
- return partition;
- }
- }
复制代码 2)在Job驱动中,设置自定义Partitioner
- job.setPartitionerClass(CustomPartitioner.class);
复制代码 3)自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask
- Tasks(5); educejob.setNumR
复制代码 案例分析:
3.WritableComparable排序
排序分类:
1)部分排序
MapReduce根据输入记载的键对数据集排序。保证输出的每个文件内部有序。
2)全排序
最终输出效果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在
处置惩罚大型文件时效率极低,因为一台机器处置惩罚所有文件,完全丧失了MapReduce所提供的并行架构。
3)辅助排序 GroupingComparator分组)
在Reduce端对key举行分组。应用于:在吸取的key为bean对象时,想让一个或几个字段相同(全部
字段比较不相同)的key进入到同一个reduce方法时,可以接纳分组排序。
4)二次排序
在自定义排序过程中,假如compareTo中的判定条件为两个即为二次排序。
自定义排序WritableComparable原理分析
bean对象做为key传输,必要实现WritableComparable接口重写compareTo方法,就可以实现排序
- @Override
- public int compareTo(FlowBean bean) {
- int result;
-
- // 按照总流量大小,倒序排列
- if (this.sumFlow > bean.getSumFlow()) {
- result = -1;
- }else if (this.sumFlow < bean.getSumFlow()) {
- result = 1;
- }else {
- result = 0;
- }
- return result;
- }
复制代码 4.Combiner合并
4.1Combiner详细介绍
1)Combiner是MR步伐中Mapper和Reducer之外的一种组件。
2)Combiner组件的父类就是Reducer。
3)Combiner和Reducer的区别在于运行的位置:
Combiner是在每一个MapTask地点的节点运行;
Reducer是吸取全局所有Mapper的输出效果;
4) Combiner的意义就是对每一个MapTask的输出举行局部汇总,以咸小网络传输量。
5)Combiner能够应用的前提是不能影响最终的业务逻辑,而且,Combiner的输出kv
应该跟Reducer的输入kv范例要对应起来。
4.2自定义Combiner实现
1)自定义一个Combiner继承Reducer,重写Reduce方法
- public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
- private IntWritable outV = new IntWritable();
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
- int sum = 0;
- for (IntWritable value : values) {
- sum += value.get();
- }
- outV.set(sum);
- context.write(key,outV);
- }
- }
复制代码 2)在Job驱动类设置
- job.setCombinerClass(WordCountCombiner.class);
复制代码 四、OutputFormat数据输出
1.OutputFormat接口实现类(默认输出格式是TextOutputFormat)
2.自定义OutputFormat步骤
1)自定义一个类继承FileOutputFormat
2)改写RecordWriter,具体改写write()方法。
五、Join应用
1.Reduce Join
Map端的重要工作:为来自差别表或文件的key/value对,打标签以区别差别来源的记载。然后用连接字段作为key,其余部分和新加的标记作为value,末了举行输出。
Reduce端的重要工作:在Reduce端以连接字段作为key的分组已经完成,我们只必要在每一个分组当中将那些来源于差别文件的记载(在Map阶段已经打标记)分开,末了举行合并就ok了。
2.Map Join
2.1应用场景:实用于一张表特殊小或特殊大的环境
2.2优点:在Map端缓存多张表,提前处置惩罚业务逻辑,如许增加Map端业务,减少Reduce端数据的压力,尽可能的减少数据倾斜。
2.3具体方法: 使用DistributedCache
1)在Mapper的setup阶段,将文件读取到缓存集合中。
2)在Driver驱动类中加载缓存。
- //缓存普通文件到Task运行节点。
- job.addCacheFile(new URI("file:///e:/cache/pd.txt"));
- //如果是集群运行,需要设置HDFS路径
- job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));
复制代码 3.数据清洗(ETL)
E代表Extracte(抽取),T代表Transform(转换),L代表Load(加载)
运行核心业务MapReduce步伐之前,每每要先对数据举行清洗,清理掉不符合用户要求的数据。(清理过程一般只必要运行Mapper步伐,不必要运行Reduce步伐.
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |