ToB企服应用市场:ToB评测及商务社交产业平台

标题: 03初识MapReduce [打印本页]

作者: 雁过留声    时间: 2023-2-3 01:32
标题: 03初识MapReduce
初识MapReduce

一、什么是MapReduce

MapReduce是一种编程范式,它借助Map将一个大任务分解成多个小任务,再借助Reduce归并Map的结果。MapReduce虽然原理很简单,但是使用MapReduce设计出一个解决问题的应用却不是一件简单的事情。下面通过一个简单的小例子来介绍MapReduce。
二、使用MapReduce寻找销售人员业绩最大值

《Hadoop权威指南》的例子是寻找天气最大值,需要去下载数据。但是我们并不需要完全复刻他的场景,所以这里用了另外一个例子。假设有一批销售日志数据文件,它的一部分是这样的。
  1. 66$2021-01-01$5555
  2. 67$2021-01-01$5635
复制代码
每一行代表某一位销售人员某个日期的销售数量,具体格式为
  1. 销售用户id$统计日期$销售数量
复制代码
我们需要寻找每一个销售用户的销售最大值是多少。需要说明的是,这里仅仅是举一个很简单的示例,便于学习MapReduce。
1、数据解析器

我首先写了一个解析器来识别每一行的文本,它的作用是将每一行文本转换为数据实体,数据实体这里偷了个懒,字段全部设置成了public。代码片段如下:
  1. /**
  2. * 销售数据解释器
  3. * 销售数据格式为
  4. * userId$countDate(yyyy-MM-dd)$saleCount
  5. */
  6. public class SaleDataParse implements TextParse<SaleDataEntity> {
  7.     @Override
  8.     public SaleDataEntity parse(String text) {
  9.         if (text == null) {
  10.             return null;
  11.         }
  12.         text = text.trim();
  13.         if (text.isEmpty()) {
  14.             return null;
  15.         }
  16.         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
  17.         String[] split = text.split("\\$");
  18.         SaleDataEntity data = new SaleDataEntity();
  19.         data.userId = Long.valueOf(split[0]);
  20.         data.countDate = sdf.parse(split[1], new ParsePosition(0));
  21.         data.saleCount = Integer.valueOf(split[2]);
  22.         return data;
  23.     }
  24. }
  25. /**
  26. * 销售数据实体
  27. */
  28. public class SaleDataEntity {
  29.     /**
  30.      * 销售用户id
  31.      */
  32.     public Long userId;
  33.     /**
  34.      * 销售日期
  35.      */
  36.     public Date countDate;
  37.     /**
  38.      * 销售总数
  39.      */
  40.     public Integer saleCount;
  41. }
复制代码
2、Map函数

Mapper<输入键, 输入值, 输出键, 输出值>是一个泛型类,它需要4个泛型参数,从左到右分别是输入键、输入值、输出键和输出值。也就是这样
  1. Mapper<输入键, 输入值, 输出键, 输出值>
复制代码
其中输入键和输入值的格式是由InputFormatClass决定的,关于输入格式的讨论之后会展开讨论。MapReduce默认会把文件按行拆分,然后偏移量(输入键)->行文本(输入值)的映射传递给Mapper<输入键, 输入值, 输出键, 输出值>的map方法。输出键和输出值则由用户进行指定。
这里由于是找每一个用户的最大销售数量,Mapper<输入键, 输入值, 输出键, 输出值>的功能是接收并解析每行数据。所以输出键我设成了销售人员id->销售数量的映射。所以实际的Mapper<输入键, 输入值, 输出键, 输出值>实现看起来像这样:
  1. /** * 解析输入的文本数据 */public class MaxSaleMapper<输入键, 输入值, 输出键, 输出值> extends Mapper<输入键, 输入值, 输出键, 输出值> {    protected TextParse saleDataParse = new SaleDataParse();    @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        String s = value.toString();        SaleDataEntity data = saleDataParse.parse(s);        if (data != null) {            //写入输出给Reducer<输入键, 输入值, 输出键, 输出值>            context.write(new LongWritable(data.userId), new IntWritable(data.saleCount));        }    }}
复制代码
其中LongWritable相当于java里的long,Text相当于java里的String,IntWritable相当于java里的int。
这里你可能会想到,既然已经解析成了数据实体,为什么不直接把实体设置成输出值?因为map函数和reduce函数不一定运行在同一个进程里,所以会涉及到序列化和反序列化,这里先不展开。
3、Reduce函数

Reducer<输入键, 输入值, 输出键, 输出值>也是一个泛型类,它也需要4个参数,从左到右分别是输入键、输入值、输出键和输出值。也就是这样
  1. Reducer<输入键, 输入值, 输出键, 输出值>
复制代码
与Mapper<输入键, 输入值, 输出键, 输出值>不同的是,输入键和输入值来源于Mapper<输入键, 输入值, 输出键, 输出值>的输出,也就是Mapper<输入键, 输入值, 输出键, 输出值>实现中的context.write()。
输出键和输出值也是由用户指定,默认的输出会写到文件中,关于Reducer<输入键, 输入值, 输出键, 输出值>的输出以后会讨论。
Reducer<输入键, 输入值, 输出键, 输出值>的功能是寻找每个用户的最大值,所以Reducer<输入键, 输入值, 输出键, 输出值>的实现看起来像这样:
  1. /** * 查找每一个用户的最大销售值 */public class MaxSaleReducer<输入键, 输入值, 输出键, 输出值> extends Reducer<输入键, 输入值, 输出键, 输出值> {    @Override    protected void reduce(LongWritable key, Iterable values, Context context) throws IOException, InterruptedException {        int max = 0;        for (IntWritable value : values) {            if (value.get() > max) {                max = value.get();            }        }        context.write(key, new IntWritable(max));    }}
复制代码
你可能会奇怪,为什么reduce方法的第二个参数是一个迭代器。简单来说,Mapper<输入键, 输入值, 输出键, 输出值>会把映射的值进行归并,然后再传递给Reducer<输入键, 输入值, 输出键, 输出值>。
4、驱动程序

我们已经完成了map和reduce函数的实现,现在我们需要把它们组装起来。我们需要写一个Main类,它看起来像这样
  1. public class MaxSale {    public static void main(String[] args) throws Exception {        Job job = Job.getInstance();        job.setJarByClass(MaxSale.class);        job.setJobName("MaxSale");        FileInputFormat.addInputPath(job, new Path(args[0]));        FileOutputFormat.setOutputPath(job, new Path(args[1]));        job.setMapper<输入键, 输入值, 输出键, 输出值>Class(MaxSaleMapper<输入键, 输入值, 输出键, 输出值>.class);        job.setMapOutputKeyClass(LongWritable.class);        job.setMapOutputValueClass(IntWritable.class);        job.setReducer<输入键, 输入值, 输出键, 输出值>Class(MaxSaleReducer<输入键, 输入值, 输出键, 输出值>.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);        //设置Reduce任务数        job.setNumReduceTasks(1);        System.exit(job.waitForCompletion(true) ? 0 : 1);    }}
复制代码
这里解释一下
5、运行结果

使用maven package打包,会生成一个jar,我生成的名字是maxSaleMapReduce-1.0-SNAPSHOT.jar。如果打包的jar有除了Hadoop的其他依赖,需要设置一下HADOOP_CLASSPATH,然后把依赖放到HADOOP_CLASSPATH目录中。
最后输入启动命令,格式为:hadoop jar 生成的jar.jar  输入数据目录 输出数据目录。这里给出我使用的命令示例:
  1. Windows:
  2. set HADOOP_CLASSPATH=C:\xxxxxxxxx\lib\*
  3. hadoop jar maxSaleMapReduce-1.0-SNAPSHOT.jar input output
复制代码
然后你会看到程序有如下输出,这里截取的部分:
  1. 23/01/18 12:10:29 INFO mapred.MapTask: Starting flush of map output
  2. 23/01/18 12:10:29 INFO mapred.MapTask: Spilling map output
  3. 23/01/18 12:10:29 INFO mapred.MapTask: bufstart = 0; bufend = 17677320; bufvoid = 104857600
  4. 23/01/18 12:10:29 INFO mapred.MapTask: kvstart = 26214396(104857584); kvend = 20321960(81287840); length = 5892437/6553600
  5. 23/01/18 12:10:30 INFO mapred.MapTask: Finished spill 0
  6. 23/01/18 12:10:30 INFO mapred.Task: Task:attempt_local1909247000_0001_m_000000_0 is done. And is in the process of committing
  7. 23/01/18 12:10:30 INFO mapred.LocalJobRunner: map
  8. 23/01/18 12:10:30 INFO mapred.Task: Task 'attempt_local1909247000_0001_m_000000_0' done.
  9. 23/01/18 12:10:30 INFO mapred.Task: Final Counters for attempt_local1909247000_0001_m_000000_0: Counters: 17
  10.         File System Counters
  11.                 FILE: Number of bytes read=33569210
  12.                 FILE: Number of bytes written=21132276
  13.                 FILE: Number of read operations=0
  14.                 FILE: Number of large read operations=0
  15.                 FILE: Number of write operations=0
  16.         Map-Reduce Framework
  17.                 Map input records=1473110
  18.                 Map output records=1473110
  19.                 Map output bytes=17677320
  20.                 Map output materialized bytes=20623546
  21.                 Input split bytes=122
  22.                 Combine input records=0
  23.                 Spilled Records=1473110
  24.                 Failed Shuffles=0
  25.                 Merged Map outputs=0
  26.                 GC time elapsed (ms)=36
  27.                 Total committed heap usage (bytes)=268435456
  28.         File Input Format Counters
  29.                 Bytes Read=33558528
  30. 23/01/18 12:10:30 INFO mapred.LocalJobRunner: Finishing task: attempt_local1909247000_0001_m_000000_0
  31. 23/01/18 12:10:30 INFO mapred.LocalJobRunner: Starting task: attempt_local1909247000_0001_m_000001_0
  32. 23/01/18 12:10:30 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
  33. 23/01/18 12:10:30 INFO output.FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
复制代码
等待程序执行结束,output文件夹会有输出part-r-00000,文件里每一行是每一个用户的id和他销售最大值。
  1. 0        9994
  2. 1        9975
  3. 2        9987
  4. 3        9985
  5. 4        9978
  6. 5        9998
复制代码
三、MapReduce执行流程


简单分析一下这个示例程度的执行流程:
四、示例代码说明

本文所有的代码放在我的github上,地址是:https://github.com/xunpengliu/hello-hadoop
下面是项目目录说明:
最后需要说明的是,项目代码主要用于学习,代码风格并非代表本人实际风格,不完善之处请轻喷。
五、常见问题


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4