一、MapReduce的概念
Hadoop的三大组件:HDFS、Yarn、MapReduce。
HDFS:解决的是分布式存储的问题。
MapReduce: 解决的是盘算问题。
Yarn: 盘算的时候,使用的资源如何协调(Windows操作系统)
- 2004年,谷歌发表了一篇名为《MapReduce》的论文,主要介绍了如何在分布式的存储系统上对数据进行高效率的计算。2005年,Nutch团队使用Java语言实现了这个技术,并命名为MapReduce。时至今日,MapReduce是Apache Hadoop的核心模块之一,是运行在HDFS上的分布式运算程序的编程框架,用于大规模数据集(大于1TB)的并行运算。其中的概念,"Map(映射)"和"Reduce(归约)"
- reduce 其实就是合并的意思。
复制代码 数据量假如太大,盘算是代码随着数据走。
mapReduce的优缺点:
- 1、易于编程
- 代码写起来有固定的格式,编写难度非常的小,号称是八股文【固定写法】。
- 2、良好的扩展性
- 代码的计算资源不够了,可以直接拓展几台即可解决
- 3、高容出错
- 如果负责计算的电脑挂掉了,直接可以将任务转移到其他电脑上,任务不会执行失败的。
- 4、非常适合大数据集的计算(PB级以上) 1P=1024T
复制代码- 1、不适合做实时计算
- mapreduce一个任务就要跑很长时间,不利于实时。不能做到秒级或者毫秒级的计算。
- mapreduce 属于离线的技术。
- 2、不适合做流式计算
- 数据因为都是静态的,不是边产生数据,边计算。
- 固定计算:数据量是固定的,给了1T 就计算。
- 3、不适合做有向图(DAG)计算
- 多个应用程序之间有依赖关系,后一个程序需要依赖前面的程序的结果。这种场景就称之为有向图,mapreduce是不适合的。
复制代码 总结:MapReduce只能做离线的数据分析,并且盘算速度比力慢。
我们之前打仗过:运行过一个WordCount 案例。
二、MapReduce案例--WordCount
1、新建maven项目,并且导入包
- <!--指定代码编译的版本-->
- <properties>
- <maven.compiler.target>1.8</maven.compiler.target>
- <maven.compiler.source>1.8</maven.compiler.source>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>3.3.1</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>3.3.1</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>3.3.1</version>
- </dependency>
- </dependencies>
复制代码 补充Maven的支持:
2、创建一些数据
在项目标根路径下,创建一个文件夹,mr01 ,在mr01文件夹下,再创建数据的来源文件input文件夹,
在input文件夹下面,新建file,a.txt, b.txt, c.txt
- a.txt
- hello bigdata hello 1999 hello beijing hello
- world hello hello java good
- b.txt
- hello gaoxinqu hello bingbing
- hello chenchen hello
- ACMilan hello china
- c.txt
- hello hadoop hello java hello storm hello spark hello redis hello zookeeper
- hello hive hello hbase hello flume
复制代码
3、编写代码
1)编写Map代码


- package com.bigdata;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- import java.io.IOException;
- /**
- *
- * Mapper中的四个泛型跟什么照应:
- * 1、LongWritable 行偏移量,一般都是LongWritable ,这一行的数据是从第几个字符开始计算的,因为数据量很多这个值也会很大,所以使用Long
- * 2、Text 指的是这一行数据
- * 3、Text Map任务输出的Key值的类型 单词
- * 4、IntWritable Map任务输出的Key值的类型 1
- *
- */
- public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
-
- // Ctrl + o 可以展示哪些方法可以重写
- @Override
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
- // key 值 指的是行偏移量
- // value 指的是 这一行数据
- //hello bigdata hello 1999 hello beijing hello
- String line = value.toString();
- // [hello,bigdata,hello,1999,hello,beijing,hello]
- String[] arr = line.split("\\s+");
- // hello-> 1,bigdata->1,hello->1,1999->1,hello->1,beijing->1,hello->1
- for (String word: arr) {
- context.write(new Text(word),new IntWritable(1));
- }
- // fori 循环
- /*for (int i = 0; i < arr.length ; i++) {
- context.write(new Text(arr[i]),new IntWritable(1));
- }*/
-
- }
- }
复制代码 2)编写Reduce代码
- package com.bigdata;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
- import java.io.IOException;
- /**
- * reduce 是用来合并的
- * reduce四个泛型:
- * 前两个,跟map的输出类型一样
- * 后面两个泛型:reduce端的输出类型
- * hello 5
- * world 2
- * ...
- */
- public class WordCountReducer extends Reducer<Text, IntWritable,Text, IntWritable> {
- // reduce 这个方法,有多少个key值,就会调用多少次
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
- // reduce 拿到的数据是什么样的呢 hello [1,1,1,1,1] world [1,1]
- // 数据不是你想像的这个样子 hello 1 hello 1 hello 1
- int count = 0;
- // 第一种写法
- for (IntWritable num : values) {
- int i = num.get();
- count = count + i;
- }
- // 第二种写法
- /*Iterator<IntWritable> iterator = values.iterator();
- while(iterator.hasNext()){
- int i = iterator.next().get();
- count = count + i;
- }*/
- // hello 5
- context.write(key,new IntWritable(count));
- }
- }
复制代码 3)编写测试代码
- package com.bigdata.mr1;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import java.io.IOException;
- public class WordCountDriver {
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- Configuration configuration = new Configuration();
- // 使用本地的文件系统,而不是hdfs
- configuration.set("fs.defaultFS","file:///");
- // 使用本地的资源(CPU,内存等), 也可以使用yarn平台跑任务
- configuration.set("mapreduce.framework.name","local");
- Job job = Job.getInstance(configuration, "wordCount单词统计");
- // 指定 map
- job.setMapperClass(WordCountMapper.class);
- // hello 1
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
- // 设置reduceTask的数量
- // reduce的数量决定了reduceTask的任务数量,每一个任务,结束后都会产生一个文件 part-r-xxxxx
- // 结论:reduceTask的数量可以和分区数量不一致,但是没有意义,一般两者保持一致。
- job.setNumReduceTasks(1);
- // 指定 reduce
- job.setReducerClass(WordCountReducer.class);
- // hello 5
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- // 此处也可以使用绝对路径
- FileInputFormat.setInputPaths(job,"../MapReduceDemo/mr01/input/");
- FileOutputFormat.setOutputPath(job,new Path("../MapReduceDemo/mr01/output"));
- boolean result = job.waitForCompletion(true);
- // 返回结果如果为true表示任务成功了,正常退出,否则非正常退出
- System.exit(result?0:-1);
- }
- }
复制代码


4、遇到的错误:
1、输出路径已经存在

2、假如出现如下问题,如何解决?
- Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
- at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
- at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
- at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1215)
- at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1420)
- at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
- at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
- at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
- at org.apache.hadoop.fs.FileSystem$4.<init>(FileSystem.java:2180)
- at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:2179)
- at org.apache.hadoop.fs.ChecksumFileSystem.listLocatedStatus(ChecksumFileSystem.java:783)
- at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:320)
- at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:279)
- at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:404)
- at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:310)
- at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:327)
- at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:200)
- at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1571)
- at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1568)
- at java.security.AccessController.doPrivileged(Native Method)
- at javax.security.auth.Subject.doAs(Subject.java:422)
- at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
- at org.apache.hadoop.mapreduce.Job.submit(Job.java:1568)
- at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1589)
- at mr02.WordCountDriver.main(WordCountDriver.java:58)
复制代码 假如之前在windows中的hadoop中拷贝过hadoop.dll等文件,是不会出现这个问题的。必要进行安装包的补丁配置。
解决方案:
- 将之前的 hadoop.dll 文件拷贝在 C:/Windows/System32文件夹下一份,可以不重启试试,假如不管用,重启一下。
复制代码 三、MR 中的自界说分区
需求:不仅单词统计,还必要将a-p 的单词存放在一起,q-z的单词存放在一起,其他单词存放在另一个文件中。
假如要完成以上的需求:就必要引入新的组件Partitioner。
1、编写代码
- package com.bigdata;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Partitioner;
- /**
- *
- * Map任务 --> Partitioner --> Reducer
- * Partitioner 其实就是Map端的输出
- */
- public class WordCountPartitioner extends Partitioner<Text, IntWritable> {
- // 分区的区号,一定是从0开始的,中间不能断 0 1 2 3 4..
- @Override
- public int getPartition(Text text, IntWritable intWritable, int i) {
- // text就是一个单词
- String letter = text.toString();
- char c = letter.charAt(0);
- if(c>='a' && c <='p'){
- return 0;
- }else if(c>='q' && c <='z'){
- return 1;
- }else{
- return 2;
- }
- }
- }
复制代码 2、使用分区代码

- package com.bigdata;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- /**
- * @Author laoyan
- * @Description TODO
- * @Date 2022/8/1 10:02
- * @Version 1.0
- */
- public class WordCountDriver2 {
- public static void main(String[] args) throws Exception {
-
- Configuration configuration = new Configuration();
- // 使用本地的文件系统,而不是hdfs
- configuration.set("fs.defaultFS","file:///");
- // 使用本地的资源(CPU,内存等), 也可以使用yarn平台跑任务
- configuration.set("mapreduce.framework.name","local");
- Job job = Job.getInstance(configuration, "单词统计WordCount");
- // map任务的设置
- job.setMapperClass(WordCountMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
-
- // 指定分区的类是哪一个
- job.setPartitionerClass(WordCountPartitioner.class);
- // 还要执行 reduce的数量 因为一个reduce 就会产生一个结果文件
- job.setNumReduceTasks(3);
-
- // reduce任务的设置
- job.setReducerClass(WordCountReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
-
- // 设置要统计的数据的路径,结果输出路径
- FileInputFormat.setInputPaths(job,new Path("mr01/input"));
- // ouput文件夹一定不要出现,否则会报错
- FileOutputFormat.setOutputPath(job,new Path("mr01/output2"));
- // 等待任务执行结束
- boolean b = job.waitForCompletion(true);
- // 此处是一定要退出虚拟机的
- System.exit(b ? 0:-1);
- }
- }
复制代码 假如分区文件,指定的是3个分区,reduce的使命设置的是1个,结果就是产生1个输出文件,三个分区的结果在一个文件中。
假如是3个分区,reduce的使命设置的是5个,输出结果就是5个文件,其中三个是有值的,背面两个是没有值的空文件。

综上所述:分区的数量,肯定要跟reduce的使命数量一致,服从是最高的。
补充:如何自动删除输出文件夹
- // 设置输入路径
- FileInputFormat.setInputPaths(job,"./data/");
- // 设置输出路径
- // 如何自动删除文件夹
- // 判断该文件夹是否存在,如何存在,删除。
- String outPath = "./data/output03";
- Path path = new Path(outPath);
- FileSystem fileSystem = FileSystem.get(conf);
- if(fileSystem.exists(path)){
- System.out.println("文件夹已存在");
- fileSystem.delete(path,true);
- }
复制代码 再将从前的代码简化一下写法:
- package com.bigdata;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import java.io.IOException;
- // java 1
- /**
- * 四个泛型分别是什么意思?
- * 第1和第2 个泛型指的是输入类型,固定的LongWritable,Text
- *
- * 第一个泛型指的是 文件的开始位置,就是偏移量 我们一般不用
- * 第二个泛型就是: 一行数据
- *
- * 也就意味着:一行数据执行一个map 方法
- * 第3和4 的泛型
- * 是输出类型
- * 第三个泛型:输出key的类型 String --> Text
- * 第四个泛型:输出value的类型 int --> IntWritable
- *
- */
- class WordCountMapper2 extends Mapper<LongWritable,Text, Text, IntWritable> {
- // ctrl + o 提示哪些方法可以重写
- // hello bigdata hello 1999 hello beijing hello
- // hello 1
- // bigdata 1
- // hello 1 ....
- @Override
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
- System.out.println("本次的偏移量是:"+key.get());
- // 如何获取一行数据
- String line = value.toString();
- String[] arr = line.split("\\s+");
- for (String word : arr) {
- // 循环一个个的单词,并且将单词往外写出 word 1 bigdata 1.....
- context.write(new Text(word),new IntWritable(1));
- }
- }
- }
- class WordCountReducer2 extends Reducer<Text, IntWritable,Text, IntWritable> {
- // reduce 是聚合的意思
- // key 指的是一个单词
- // values [1,1,1,1,1,1]
- // hello [1,1,1,1,1,1,1,1,1]
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
- int sum = 0;
- for (IntWritable value : values) {
- sum += value.get();
- }
- context.write(key,new IntWritable(sum));
- }
- }
- public class WcDriver {
- public static void main(String[] args) throws Exception{
- // 以下都是固定写法
- Configuration conf = new Configuration();
- // 使用本地的文件系统,而不是hdfs
- conf.set("fs.defaultFS","file:///");
- // 使用本地的资源(CPU,内存等), 也可以使用yarn平台跑任务
- conf.set("mapreduce.framework.name","local");
- Job job = Job.getInstance(conf, "单词统计");
- job.setMapperClass(WordCountMapper2.class);
- job.setReducerClass(WordCountReducer2.class);
- // bigdata 1 map类的输出类型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(IntWritable.class);
- // reduce类的输出类型 bigdata 10
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- FileInputFormat.setInputPaths(job,new Path("./mr01/input"));
- // 每次运行的时候,假如输出文件夹存在,就会报错,严重影响体验
- // 是否可以判断一下,假如存在,直接删除
- Path path = new Path("./mr01/output");
- FileSystem fileSystem = FileSystem.get(conf);
- if(fileSystem.exists(path)){
- fileSystem.delete(path,true);
- }
- FileOutputFormat.setOutputPath(job,path);
- boolean b = job.waitForCompletion(true);
- System.exit(b?0:-1);
- }
- }
复制代码 四、序列化
啥是序列化?
jvm中的一个对象,不是类,假如你想把一个对象,生存到磁盘上,必须序列化,你把文件中的对象进行恢复,是不是的反序列化。
假如你想把对象发送给另一个服务器,必要通过网络传输,也必须序列化,到另一侧要反序列化。
大数据中,必要用到序列化,因为数据在传输过程中,必要用到。
- 说到序列化,我们想到了Java的序列化。一个类实现了Serializable 接口即可。
- Java对象什么时候需要序列化?
- 1)需要保存到本地的时候
- 2)需要在网络之间传输的时候
- package com.bigdata;
- import java.io.Serializable;
- /**
- * @Author laoyan
- * @Description TODO
- * @Date 2022/8/1 11:43
- * @Version 1.0
- */
- public class User implements Serializable {
- private String name;
- private int age;
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- public int getAge() {
- return age;
- }
- public void setAge(int age) {
- this.age = age;
- }
- }
复制代码 大数据技能Hadoop并没有采用java的序列化机制,而是自己又整了一套自己的序列化机制。为什么?
- Java的序列化携带的信息太多了,文件太大了,不便于在网络之间传输。
- User 使用Java --> 100KB
- User 使用大数据的序列化 --> 5KB
- 大数据采用的序列化机制是 Writable 接口。
- 为什么非得序列化呢?因为需要在网路之间传输。
复制代码 Java数据范例
| Hadoop序列化的数据范例
| 释义
| byte
| ByteWritable
| 字节范例
| short
| ShortWritable
| 短整型
| int
| IntWritable
| 整型
| long
| LongWritable
| 长整型
| float
| FloatWritable
| 单精度浮点型
| double
| DoubleWritable
| 双精度浮点型
| boolean
| BooleanWritable
| 布尔型
| String
| Text
| 字符串
| array
| ArrayWritable
| 数组
| Map
| MapWritable
| Map
| null
| NullWritable
| 空
| java的八大根本数据范例: byte short int long float double char boolean
只必要记住:String --> Text即可。null --> NullWritable,仅仅是为了在某个地方占位,符合语法而已。
1、测试java序列化 VS Hadoop序列化大小比力
- package com.bigdata;
- import java.io.Serializable;
- /**
- * @Author laoyan
- * @Description TODO
- * @Date 2022/8/1 11:43
- * @Version 1.0
- */
- public class User implements Serializable {
- private String name;
- private int age;
- public User(String name, int age) {
- this.name = name;
- this.age = age;
- }
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- public int getAge() {
- return age;
- }
- public void setAge(int age) {
- this.age = age;
- }
- }
复制代码- package com.bigdata;
- import org.apache.hadoop.io.Writable;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- /**
- * @Author laoyan
- * @Description TODO
- * @Date 2022/8/1 14:06
- * @Version 1.0
- */
- public class UserWritable implements Writable {
- private String name;
- private int age;
- public UserWritable(String name, int age) {
- this.name = name;
- this.age = age;
- }
- // 序列化
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeUTF(name);
- out.writeInt(age);
- }
- // 反序列化
- @Override
- public void readFields(DataInput in) throws IOException {
- // 进行反序列化的时候,读取的顺序一定要跟序列化的时候的顺序一致,否则报错
- name = in.readUTF();
- age = in.readInt();
- }
- public String getName() {
- return name;
- }
- public void setName(String name) {
- this.name = name;
- }
- public int getAge() {
- return age;
- }
- public void setAge(int age) {
- this.age = age;
- }
- }
复制代码- package com.bigdata;
- import java.io.FileNotFoundException;
- import java.io.FileOutputStream;
- import java.io.ObjectOutputStream;
- /**
- * @Author laoyan
- * @Description TODO
- * @Date 2022/8/1 14:11
- * @Version 1.0
- */
- public class TestXLH {
- public static void main(String[] args) throws Exception {
- User user = new User("zhangsan",20);
- ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FileOutputStream("D:/user1.txt"));
- objectOutputStream.writeObject(user);
- objectOutputStream.close();
- UserWritable user2 = new UserWritable("zhangsan",20);
- ObjectOutputStream objectOutputStream2 = new ObjectOutputStream(new FileOutputStream("D:/user2.txt"));
- // 此时是序列化对象去write 对象流,此处需要注意
- user2.write(objectOutputStream2);
- objectOutputStream2.close();
- }
- }
复制代码 java序列化的结果:
hadoop序列化的结果:
2、自界说序列化案例--手机流量统计
需求:假定拿到了一些关于手机流量的日志文件,统计每个手机号码的上行流量,下行流量,以及总流量。
读懂日志文件:
- 1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
-
- 第二列是手机号码 15920133257
- 倒数第三列是上行流量 3156
- 倒数第二列是下行流量 2936
复制代码 思绪:
- map任务:
- 15989002119 1232 3456
- 15989002119 2343 34343
- ......
- 13726238888 3243 23432
- 13726238888 4343 4343
- reduce 任务:
- 15989002119 对相同的手机号码进行合并
- Key: 15989002119 values: [{15989002119,34343,34343},{15989002119,8989,34343}....]
-
- 此处:手机号码,上行流量,下行流量,总流量
- 此时就需要自定义数据类型了。
复制代码 统计的结果:

代码演示:
- package com.bigdata.phoneflow;
- import org.apache.hadoop.io.Writable;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.IOException;
- /**
- * @Author laoyan
- * @Description TODO
- * @Date 2022/8/1 14:29
- * @Version 1.0
- */
- // 自定义一个数据类型
- public class PhoneFlowWritable implements Writable {
- private String phone;
- private int upFlow;
- private int downFlow;
- // 此处需要指定一个空参数的构造方法,否则报错:
- // java.lang.NoSuchMethodException: com.bigdata.phoneflow.PhoneFlowWritable.<init>()
- public PhoneFlowWritable() {
- }
- public PhoneFlowWritable(String phone, int upFlow, int downFlow) {
- this.phone = phone;
- this.upFlow = upFlow;
- this.downFlow = downFlow;
- }
- public String getPhone() {
- return phone;
- }
- public void setPhone(String phone) {
- this.phone = phone;
- }
- public int getUpFlow() {
- return upFlow;
- }
- public void setUpFlow(int upFlow) {
- this.upFlow = upFlow;
- }
- public int getDownFlow() {
- return downFlow;
- }
- public void setDownFlow(int downFlow) {
- this.downFlow = downFlow;
- }
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeUTF(phone);
- out.writeInt(upFlow);
- out.writeInt(downFlow);
- }
- @Override
- public void readFields(DataInput in) throws IOException {
- phone = in.readUTF();
- upFlow = in.readInt();
- downFlow = in.readInt();
- }
- }
复制代码- package com.bigdata.phoneflow;
- import com.bigdata.WordCountMapper;
- import com.bigdata.WordCountPartitioner;
- import com.bigdata.WordCountReducer;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import java.io.IOException;
- import java.util.Iterator;
- /**
- * @Author laoyan
- * @Description TODO
- * @Date 2022/8/1 14:33
- * @Version 1.0
- */
- class PhoneFlowMapper extends Mapper<LongWritable, Text,Text,PhoneFlowWritable> {
- // 将每一句话,都变为 手机号码 --> PhoneFlowWritable对象
- @Override
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, PhoneFlowWritable>.Context context) throws IOException, InterruptedException {
- String line = value.toString();
- String[] arr = line.split("\\s+");
- if(arr.length == 11){
- String phone = arr[1];
- // 正则表达式,判断手机号
- if (phone.matches("^1[3456789][0-9]{9}$")){
- int upFlow = Integer.parseInt(arr[arr.length-3]);
- int downFlow = Integer.parseInt(arr[arr.length-2]);
- System.out.println(phone+","+upFlow+","+downFlow);
- context.write(new Text(phone),new PhoneFlowWritable(phone,upFlow,downFlow));
- }
- }
- }
- }
- // 手机号 --> 流量数据PhoneFlowWritable 手机号码 --> 统计的结果
- class PhoneFlowReducer extends Reducer<Text,PhoneFlowWritable,Text,Text> {
- @Override
- protected void reduce(Text key, Iterable<PhoneFlowWritable> values, Reducer<Text, PhoneFlowWritable, Text, Text>.Context context) throws IOException, InterruptedException {
- int upFlowNum = 0;
- int downFlowNum = 0;
- Iterator<PhoneFlowWritable> iterator = values.iterator();
- while(iterator.hasNext()){
- PhoneFlowWritable phoneFlowWritable = iterator.next();
- upFlowNum += phoneFlowWritable.getUpFlow();
- downFlowNum += phoneFlowWritable.getDownFlow();
- }
- StringBuffer sb = new StringBuffer();
- sb.append("手机号"+key+"流量统计:");
- sb.append("上行流量是:"+upFlowNum);
- sb.append("下行流量是:"+downFlowNum);
- sb.append("总的流量是:"+(upFlowNum + downFlowNum));
- context.write(key,new Text(sb.toString()));
- }
- }
- public class PhoneFlowDriver {
- public static void main(String[] args) throws Exception{
- Configuration configuration = new Configuration();
- // 使用本地的文件系统,而不是hdfs
- configuration.set("fs.defaultFS","file:///");
- // 使用本地的资源(CPU,内存等), 也可以使用yarn平台跑任务
- configuration.set("mapreduce.framework.name","local");
- Job job = Job.getInstance(configuration, "手机流量统计");
- // map任务的设置
- job.setMapperClass(PhoneFlowMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(PhoneFlowWritable.class);
- // reduce任务的设置
- job.setReducerClass(PhoneFlowReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- // 设置要统计的数据的路径,结果输出路径
- FileInputFormat.setInputPaths(job,new Path("mr01/phoneFlow/input"));
- // ouput文件夹一定不要出现,否则会报错
- FileOutputFormat.setOutputPath(job,new Path("mr01/phoneFlow/output"));
- // 等待任务执行结束
- boolean b = job.waitForCompletion(true);
- // 此处是一定要退出虚拟机的
- System.exit(b ? 0:-1);
- }
- }
复制代码 注意这个案例中有一个关于迭代器的用法,迭代器实在也可以通过foreach循环进行遍历:
- for (PhoneFlowWritable phoneFlow : values) {
- sumUpFlow += phoneFlow.getUpFlow();
- sumDownFlow += phoneFlow.getDownFlow();
- }
复制代码 随堂代码如下:
- package com.bigdata.phoneflow;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.NullWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.Mapper;
- import org.apache.hadoop.mapreduce.Reducer;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import java.io.IOException;
- /**
- * @基本功能:
- * @program:MapReduce
- * @author: 闫哥
- * @create:2024-01-22 15:02:25
- **/
- // 一个java 文件中可以有多个class ,但是只能有一个public class ,而且这个类名必须和java 的文件名一样
- // 手机号 一个自定义的类
- class PhoneFlowMapper extends Mapper<LongWritable, Text,Text,PhoneFlowWritable> {
- // ctrl + o
- @Override
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, PhoneFlowWritable>.Context context) throws IOException, InterruptedException {
- String line = value.toString();
- String[] arr = line.split("\\s+");
- if(arr.length == 11){
- String phone = arr[1];
- // 正则表达式,判断手机号
- if (phone.matches("^1[3456789][0-9]{9}$")){
- int upFlow = Integer.parseInt(arr[arr.length-3]);
- int downFlow = Integer.parseInt(arr[arr.length-2]);
- context.write(new Text(phone),new PhoneFlowWritable(phone,upFlow,downFlow));
- }
- }
- }
- }
- class PhoneFlowReducer extends Reducer<Text,PhoneFlowWritable, NullWritable,Text>{
- @Override
- protected void reduce(Text phone, Iterable<PhoneFlowWritable> values, Reducer<Text, PhoneFlowWritable, NullWritable, Text>.Context context) throws IOException, InterruptedException {
- long sumUpFlow = 0;
- long sumDownFlow = 0;
- for (PhoneFlowWritable phoneFlow : values) {
- sumUpFlow += phoneFlow.getUpFlow();
- sumDownFlow += phoneFlow.getDownFlow();
- }
- String str = "手机号为:"+phone+",上行总流量:"+sumUpFlow+",下行总流量:"+sumDownFlow+",总量:"+(sumUpFlow+sumDownFlow);
- context.write(NullWritable.get(),new Text(str));
- }
- }
- public class PhoneFlowDriver {
- public static void main(String[] args) throws Exception{
- // 先获取一个Job对象
- Configuration conf = new Configuration();
- // 第一个:使用本地文件系统
- conf.set("fs.defaultFS","file:///");
- // 第二个:使用本地资源
- conf.set("mapreduce.framework.name","local");
- Job job = Job.getInstance(conf, "流量统计");
- // 设置 map
- job.setMapperClass(com.bigdata.phoneflow.PhoneFlowMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(PhoneFlowWritable.class);
- // 设置reduce
- job.setReducerClass(com.bigdata.phoneflow.PhoneFlowReducer.class);
- job.setOutputKeyClass(NullWritable.class);
- job.setOutputValueClass(Text.class);
- // 设置输入路径
- FileInputFormat.setInputPaths(job,"./data/phoneflow/");
- // 设置输出路径
- // 如何自动删除文件夹
- // 判断该文件夹是否存在,如何存在,删除。
- String outPath = "./data/phoneflowresult/";
- Path path = new Path(outPath);
- FileSystem fileSystem = FileSystem.get(conf);
- if(fileSystem.exists(path)){
- System.out.println("文件夹已存在");
- fileSystem.delete(path,true);
- }
- FileOutputFormat.setOutputPath(job,path);
- boolean result = job.waitForCompletion(true);
- System.exit(result?0:-1);
- }
- }
复制代码 3、假如我们在编写代码的时候不出结果,也不报错,怎么办?
- # Global logging configuration
- # Debug info warn error
- log4j.rootLogger=WARN, stdout
- # MyBatis logging configuration...
- log4j.logger.org.mybatis.example.BlogMapper=TRACE
- # Console output...
- log4j.appender.stdout=org.apache.log4j.ConsoleAppender
- log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
- log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n
复制代码
五、关于片和块
- 假如我现在500M这样的数据,如何存储?
- 500M = 128M + 128M + 128M + 116M 分为四个块进行存储。
- 计算的时候,是按照片儿计算的,而不是块儿。
- 块是物理概念,一个块就是128M ,妥妥的,毋庸置疑。
- 片是逻辑概念,一个片大约等于一个块。
- 假如我现在需要计算一个300M的文件,这个时候启动多少个MapTask任务?答案是有多少个片儿,就启动多少个任务。
- 一个片儿约等于 一个块,但是最大可以 128M*1.1倍= 140.8
- 300M 128+128+44
- 128M 启动一个Map任务进行读取
- 172M 172M 和 128M * 1.1 =140.8M 进行比较,如果大于 ,继续进行切割
- 128M 启动一个任务Map任务
- 剩余44M 剩余的44M 和 128M*1.1倍比较,小于这个值,剩余的44M 就单独起一个Map任务
- 300m的数据,分给了3个MapTask任务进行处理。
- 如果是260M的数据,由多少个Map任务处理?
- 128M 第一个任务
- 132M 跟 128M * 1.1 进行比较,发现小于这个值,直接一个Map任务搞定,不在启动第三个任务了。
复制代码 比如班里的同砚一起搬砖,每人规定搬3块,假定砖还剩4块,到某个同砚了,他就直接搬完即可,没须要让另一个同砚因为一块砖,而专门跑一趟。
- 1、什么是片,什么是块?
- 块是物理概念,片是逻辑概念。一般片 = 块的,但是到最后一次的时候,有可能片> 块,但是绝对不能超过块的1.1倍。
- 2、mapreduce 启动多少个MapTask任务?
- 跟片有关系,有多少个片,就启动多少个map任务。跟块儿无关。
复制代码 六、Shuffle 过程【必问】
- MapReduce的Shuffle过程指的是MapTask的后半程,以及ReduceTask的前半程,共同组成的。
- 从MapTask中的map方法结束,到ReduceTask中的reduce方法开始,这个中间的部分就是Shuffle。是MapReduce的核心,心脏。
复制代码
map端:
- 1、map中的context.write方法,对外写出的时候,其实是写入到了一个环形缓冲区内(内存形式的),这个环形缓冲区大小是100M,可以通过参数设置。如果里面的数据大于80M,就开始溢写(从内存中将数据写入到磁盘上)。溢写的文件存放地址可以设置。
- 2、在溢写过程中,环形缓冲区不会停止工作,是会利用剩余的20%继续存入环形缓冲区的。除非是环形缓冲区的内存满了,map任务就被阻塞了。
- 在溢写出来的文件中,是排过序的,排序规则:快速排序算法。在排序之前,会根据分区的算法,对数据进行分区。是在内存中,先分区,在每一个分区中再排序,接着溢写到磁盘上的。
- 3、溢写出来的小文件需要合并为一个大文件,因为每一个MapTask只能有一份数据。就将相同的分区文件合并,并且排序(此处是归并排序)。每次合并的时候是10个小文件合并为一个大文件,进行多次合并,最终每一个分区的文件只能有一份。
- 假如100个小文件,需要合并几次呢?
- 100 每10分合并一次,第一轮:100个文件合并为了10个文件,这10个文件又合并为一个大文件,总共合并了11次。
- 4、将内存中的数据,溢写到磁盘上,还可以指定是否需要压缩,以及压缩的算法是什么。
复制代码 reduce端:
- 1、reduce端根据不同的分区,拉取每个服务器上的相同的分区的数据。
- reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,但这个默认值可以修改设置mapreduce.reduce.shuffle. parallelcopies 属性即可。
- 2、如果map上的数据非常的小,该数据会拉取到reduce端的内存中,如果数据量比较大,直接拉取到reduce端的硬盘上。
复制代码
七、Combiner 【可有可无】
这个Combiner是一个优化的代码,对于我们最终的结果没有任何的影响。
map端产生的数据,会被拉去到reduce端进行合并,有可能map端产生的数据非常的大,未便于在网络间传输,那么有没有办法可以缩小map端的数据呢?
之前: java 1 java 1 java 1 传递给reduce
如今: java 3 传递给reduce
Combiner实在就是运行在mapTask中的reducer。 Reducer实在就是合并代码的。Combiner是作用在Map端的。
这个结果不是最终的结果,而是一个暂时的小统计。 最终reduce是会将所有的map结果再次进行汇总才是我们最终想要的统计结果。
- Combiner 只能用于对统计结果没有影响的场景下。
- 一般只用于 统计之和,统计最大值最小值的场景下。统计平均值等情况是不能用的。
复制代码 在代码中如何使用?
Combiner起作用的地方:
Combiner 实在作用于两个地方,一个是环形缓冲区溢写磁盘的时候,除了分区,排序之外,还可以做合并操作,将内存中的 hello 1 hello 1 hello 1 会合并为 hello 3
第二个位置是小文件合并为MapTask的大文件的时候,会将多个 hello 的值相加 hello 19,但是这个不是最终的答案,最终答案是将多个MapTask使掷中的hello 进行合并才是最终的结果。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |