大数据-Hadoop-MapReduce

打印 上一主题 下一主题

主题 1845|帖子 1845|积分 5535

一、MapReduce的概念

Hadoop的三大组件:HDFS、Yarn、MapReduce。
HDFS:解决的是分布式存储的问题。
MapReduce: 解决的是盘算问题。
Yarn: 盘算的时候,使用的资源如何协调(Windows操作系统)
  1.     2004年,谷歌发表了一篇名为《MapReduce》的论文,主要介绍了如何在分布式的存储系统上对数据进行高效率的计算。2005年,Nutch团队使用Java语言实现了这个技术,并命名为MapReduce。时至今日,MapReduce是Apache Hadoop的核心模块之一,是运行在HDFS上的分布式运算程序的编程框架,用于大规模数据集(大于1TB)的并行运算。其中的概念,"Map(映射)"和"Reduce(归约)"
  2. reduce 其实就是合并的意思。
复制代码
数据量假如太大,盘算是代码随着数据走


mapReduce的优缺点:
  1. 1、易于编程
  2.       代码写起来有固定的格式,编写难度非常的小,号称是八股文【固定写法】。
  3. 2、良好的扩展性
  4.       代码的计算资源不够了,可以直接拓展几台即可解决
  5. 3、高容出错
  6.       如果负责计算的电脑挂掉了,直接可以将任务转移到其他电脑上,任务不会执行失败的。
  7. 4、非常适合大数据集的计算(PB级以上)  1P=1024T
复制代码
  1. 1、不适合做实时计算
  2.     mapreduce一个任务就要跑很长时间,不利于实时。不能做到秒级或者毫秒级的计算。
  3.     mapreduce 属于离线的技术。
  4. 2、不适合做流式计算
  5.     数据因为都是静态的,不是边产生数据,边计算。
  6.     固定计算:数据量是固定的,给了1T 就计算。
  7. 3、不适合做有向图(DAG)计算
  8.     多个应用程序之间有依赖关系,后一个程序需要依赖前面的程序的结果。这种场景就称之为有向图,mapreduce是不适合的。
复制代码
总结:MapReduce只能做离线的数据分析,并且盘算速度比力慢
我们之前打仗过:运行过一个WordCount 案例。
二、MapReduce案例--WordCount

1、新建maven项目,并且导入包








  1. <!--指定代码编译的版本-->
  2. <properties>
  3.   <maven.compiler.target>1.8</maven.compiler.target>
  4.   <maven.compiler.source>1.8</maven.compiler.source>
  5. </properties>
  6. <dependencies>
  7.   <dependency>
  8.     <groupId>org.apache.hadoop</groupId>
  9.     <artifactId>hadoop-common</artifactId>
  10.     <version>3.3.1</version>
  11.   </dependency>
  12.   <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
  13.   <dependency>
  14.     <groupId>org.apache.hadoop</groupId>
  15.     <artifactId>hadoop-client</artifactId>
  16.     <version>3.3.1</version>
  17.   </dependency>
  18.   <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
  19.   <dependency>
  20.     <groupId>org.apache.hadoop</groupId>
  21.     <artifactId>hadoop-hdfs</artifactId>
  22.     <version>3.3.1</version>
  23.   </dependency>
  24. </dependencies>
复制代码
补充Maven的支持:


2、创建一些数据

在项目标根路径下,创建一个文件夹,mr01 ,在mr01文件夹下,再创建数据的来源文件input文件夹,
在input文件夹下面,新建file,a.txt, b.txt, c.txt
  1. a.txt
  2. hello bigdata hello 1999 hello beijing hello
  3. world hello hello java good
  4. b.txt
  5. hello gaoxinqu hello bingbing
  6. hello chenchen hello
  7. ACMilan hello china
  8. c.txt
  9. hello hadoop hello java hello storm hello spark hello redis hello zookeeper
  10. hello hive hello hbase hello flume
复制代码


3、编写代码

1)编写Map代码




  1. package com.bigdata;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. import java.io.IOException;
  7. /**
  8. *
  9. * Mapper中的四个泛型跟什么照应:
  10. * 1、LongWritable  行偏移量,一般都是LongWritable ,这一行的数据是从第几个字符开始计算的,因为数据量很多这个值也会很大,所以使用Long
  11. * 2、Text     指的是这一行数据
  12. * 3、Text     Map任务输出的Key值的类型           单词
  13. * 4、IntWritable     Map任务输出的Key值的类型     1
  14. *
  15. */
  16. public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
  17.    
  18.     // Ctrl + o  可以展示哪些方法可以重写
  19.     @Override
  20.     protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  21.         // key 值 指的是行偏移量
  22.         // value 指的是 这一行数据
  23.         //hello bigdata hello 1999 hello beijing hello
  24.         String line = value.toString();
  25.         // [hello,bigdata,hello,1999,hello,beijing,hello]
  26.         String[] arr = line.split("\\s+");
  27.         //  hello-> 1,bigdata->1,hello->1,1999->1,hello->1,beijing->1,hello->1
  28.         for (String word: arr) {
  29.             context.write(new Text(word),new IntWritable(1));
  30.         }
  31.         // fori 循环
  32.         /*for (int i = 0; i < arr.length ; i++) {
  33.             context.write(new Text(arr[i]),new IntWritable(1));
  34.         }*/
  35.         
  36.     }
  37. }
复制代码
2)编写Reduce代码
  1. package com.bigdata;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import java.io.IOException;
  6. /**
  7. *   reduce 是用来合并的
  8. *   reduce四个泛型:
  9. *   前两个,跟map的输出类型一样
  10. *   后面两个泛型:reduce端的输出类型
  11. *     hello 5
  12. *     world 2
  13. *     ...
  14. */
  15. public class WordCountReducer extends Reducer<Text, IntWritable,Text, IntWritable> {
  16.     // reduce 这个方法,有多少个key值,就会调用多少次
  17.     @Override
  18.     protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  19.         // reduce 拿到的数据是什么样的呢   hello [1,1,1,1,1]   world [1,1]
  20.         // 数据不是你想像的这个样子 hello 1   hello 1  hello 1
  21.         int count = 0;
  22.         // 第一种写法
  23.         for (IntWritable num : values) {
  24.             int i = num.get();
  25.             count = count + i;
  26.         }
  27.         // 第二种写法
  28.         /*Iterator<IntWritable> iterator = values.iterator();
  29.         while(iterator.hasNext()){
  30.             int i = iterator.next().get();
  31.             count = count + i;
  32.         }*/
  33.         // hello 5
  34.         context.write(key,new IntWritable(count));
  35.     }
  36. }
复制代码
3)编写测试代码
  1. package com.bigdata.mr1;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. import java.io.IOException;
  10. public class WordCountDriver {
  11.     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  12.         Configuration configuration = new Configuration();
  13.         // 使用本地的文件系统,而不是hdfs
  14.         configuration.set("fs.defaultFS","file:///");
  15.         // 使用本地的资源(CPU,内存等), 也可以使用yarn平台跑任务
  16.         configuration.set("mapreduce.framework.name","local");
  17.         Job job = Job.getInstance(configuration, "wordCount单词统计");
  18.         // 指定 map
  19.         job.setMapperClass(WordCountMapper.class);
  20.         // hello 1
  21.         job.setMapOutputKeyClass(Text.class);
  22.         job.setMapOutputValueClass(IntWritable.class);
  23.         // 设置reduceTask的数量
  24.         // reduce的数量决定了reduceTask的任务数量,每一个任务,结束后都会产生一个文件 part-r-xxxxx
  25.         // 结论:reduceTask的数量可以和分区数量不一致,但是没有意义,一般两者保持一致。
  26.         job.setNumReduceTasks(1);
  27.         // 指定 reduce
  28.         job.setReducerClass(WordCountReducer.class);
  29.         // hello 5
  30.         job.setOutputKeyClass(Text.class);
  31.         job.setOutputValueClass(IntWritable.class);
  32.         // 此处也可以使用绝对路径
  33.         FileInputFormat.setInputPaths(job,"../MapReduceDemo/mr01/input/");
  34.         FileOutputFormat.setOutputPath(job,new Path("../MapReduceDemo/mr01/output"));
  35.         boolean result = job.waitForCompletion(true);
  36.         // 返回结果如果为true表示任务成功了,正常退出,否则非正常退出
  37.         System.exit(result?0:-1);
  38.     }
  39. }
复制代码




4、遇到的错误:

1、输出路径已经存在



2、假如出现如下问题,如何解决?
  1. Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
  2.         at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
  3.         at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
  4.         at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1215)
  5.         at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1420)
  6.         at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
  7.         at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
  8.         at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
  9.         at org.apache.hadoop.fs.FileSystem$4.<init>(FileSystem.java:2180)
  10.         at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:2179)
  11.         at org.apache.hadoop.fs.ChecksumFileSystem.listLocatedStatus(ChecksumFileSystem.java:783)
  12.         at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:320)
  13.         at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:279)
  14.         at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:404)
  15.         at org.apache.hadoop.mapreduce.JobSubmitter.writeNewSplits(JobSubmitter.java:310)
  16.         at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:327)
  17.         at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:200)
  18.         at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1571)
  19.         at org.apache.hadoop.mapreduce.Job$11.run(Job.java:1568)
  20.         at java.security.AccessController.doPrivileged(Native Method)
  21.         at javax.security.auth.Subject.doAs(Subject.java:422)
  22.         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
  23.         at org.apache.hadoop.mapreduce.Job.submit(Job.java:1568)
  24.         at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1589)
  25.         at mr02.WordCountDriver.main(WordCountDriver.java:58)
复制代码
假如之前在windows中的hadoop中拷贝过hadoop.dll等文件,是不会出现这个问题的。必要进行安装包的补丁配置。
解决方案:
  1. 将之前的 hadoop.dll 文件拷贝在  C:/Windows/System32文件夹下一份,可以不重启试试,假如不管用,重启一下。
复制代码
三、MR 中的自界说分区

需求:不仅单词统计,还必要将a-p 的单词存放在一起,q-z的单词存放在一起,其他单词存放在另一个文件中。


假如要完成以上的需求:就必要引入新的组件Partitioner。
1、编写代码
  1. package com.bigdata;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Partitioner;
  5. /**
  6. *
  7. * Map任务 --> Partitioner  --> Reducer
  8. *   Partitioner 其实就是Map端的输出
  9. */
  10. public class WordCountPartitioner extends Partitioner<Text, IntWritable> {
  11.     // 分区的区号,一定是从0开始的,中间不能断   0 1 2 3 4..
  12.     @Override
  13.     public int getPartition(Text text, IntWritable intWritable, int i) {
  14.         // text就是一个单词
  15.         String letter = text.toString();
  16.         char c = letter.charAt(0);
  17.         if(c>='a' && c <='p'){
  18.             return 0;
  19.         }else if(c>='q' && c <='z'){
  20.             return 1;
  21.         }else{
  22.             return 2;
  23.         }
  24.     }
  25. }
复制代码
2、使用分区代码


  1. package com.bigdata;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. /**
  10. * @Author laoyan
  11. * @Description TODO
  12. * @Date 2022/8/1 10:02
  13. * @Version 1.0
  14. */
  15. public class WordCountDriver2 {
  16.     public static void main(String[] args) throws Exception {
  17.         
  18.         Configuration configuration = new Configuration();
  19.         // 使用本地的文件系统,而不是hdfs
  20.         configuration.set("fs.defaultFS","file:///");
  21.         // 使用本地的资源(CPU,内存等), 也可以使用yarn平台跑任务
  22.         configuration.set("mapreduce.framework.name","local");
  23.         Job job = Job.getInstance(configuration, "单词统计WordCount");
  24.         // map任务的设置
  25.         job.setMapperClass(WordCountMapper.class);
  26.         job.setMapOutputKeyClass(Text.class);
  27.         job.setMapOutputValueClass(IntWritable.class);
  28.         
  29.         // 指定分区的类是哪一个
  30.         job.setPartitionerClass(WordCountPartitioner.class);
  31.         // 还要执行 reduce的数量  因为一个reduce 就会产生一个结果文件
  32.         job.setNumReduceTasks(3);
  33.         
  34.         // reduce任务的设置
  35.         job.setReducerClass(WordCountReducer.class);
  36.         job.setOutputKeyClass(Text.class);
  37.         job.setOutputValueClass(IntWritable.class);
  38.         
  39.         // 设置要统计的数据的路径,结果输出路径
  40.         FileInputFormat.setInputPaths(job,new Path("mr01/input"));
  41.         // ouput文件夹一定不要出现,否则会报错
  42.         FileOutputFormat.setOutputPath(job,new Path("mr01/output2"));
  43.         // 等待任务执行结束
  44.         boolean b = job.waitForCompletion(true);
  45.         // 此处是一定要退出虚拟机的
  46.         System.exit(b ? 0:-1);
  47.     }
  48. }
复制代码
假如分区文件,指定的是3个分区,reduce的使命设置的是1个,结果就是产生1个输出文件,三个分区的结果在一个文件中。
假如是3个分区,reduce的使命设置的是5个,输出结果就是5个文件,其中三个是有值的,背面两个是没有值的空文件。


综上所述:分区的数量,肯定要跟reduce的使命数量一致,服从是最高的

补充:如何自动删除输出文件夹
  1. // 设置输入路径
  2. FileInputFormat.setInputPaths(job,"./data/");
  3. // 设置输出路径
  4. // 如何自动删除文件夹
  5. // 判断该文件夹是否存在,如何存在,删除。
  6. String outPath = "./data/output03";
  7. Path path = new Path(outPath);
  8. FileSystem fileSystem = FileSystem.get(conf);
  9. if(fileSystem.exists(path)){
  10.     System.out.println("文件夹已存在");
  11.     fileSystem.delete(path,true);
  12. }
复制代码
再将从前的代码简化一下写法:
  1. package com.bigdata;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.IntWritable;
  6. import org.apache.hadoop.io.LongWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. import java.io.IOException;
  14. //  java 1
  15. /**
  16. *   四个泛型分别是什么意思?
  17. *   第1和第2 个泛型指的是输入类型,固定的LongWritable,Text
  18. *
  19. *   第一个泛型指的是  文件的开始位置,就是偏移量  我们一般不用
  20. *   第二个泛型就是: 一行数据
  21. *
  22. *   也就意味着:一行数据执行一个map 方法
  23. *   第3和4 的泛型
  24. *   是输出类型
  25. *   第三个泛型:输出key的类型  String --> Text
  26. *   第四个泛型:输出value的类型  int  --> IntWritable
  27. *
  28. */
  29. class WordCountMapper2 extends Mapper<LongWritable,Text, Text, IntWritable> {
  30.     // ctrl + o  提示哪些方法可以重写
  31.     // hello bigdata hello 1999 hello beijing hello
  32.     // hello 1
  33.     // bigdata 1
  34.     // hello 1  ....
  35.     @Override
  36.     protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  37.         System.out.println("本次的偏移量是:"+key.get());
  38.         // 如何获取一行数据
  39.         String line = value.toString();
  40.         String[] arr = line.split("\\s+");
  41.         for (String word : arr) {
  42.             // 循环一个个的单词,并且将单词往外写出   word 1  bigdata 1.....
  43.             context.write(new Text(word),new IntWritable(1));
  44.         }
  45.     }
  46. }
  47. class WordCountReducer2 extends Reducer<Text, IntWritable,Text, IntWritable> {
  48.     // reduce 是聚合的意思
  49.     // key 指的是一个单词
  50.     // values [1,1,1,1,1,1]
  51.     // hello  [1,1,1,1,1,1,1,1,1]
  52.     @Override
  53.     protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  54.         int sum = 0;
  55.         for (IntWritable value : values) {
  56.             sum += value.get();
  57.         }
  58.         context.write(key,new IntWritable(sum));
  59.     }
  60. }
  61. public class WcDriver {
  62.     public static void main(String[] args) throws Exception{
  63.         // 以下都是固定写法
  64.         Configuration conf = new Configuration();
  65.         // 使用本地的文件系统,而不是hdfs
  66.         conf.set("fs.defaultFS","file:///");
  67.         // 使用本地的资源(CPU,内存等), 也可以使用yarn平台跑任务
  68.         conf.set("mapreduce.framework.name","local");
  69.         Job job = Job.getInstance(conf, "单词统计");
  70.         job.setMapperClass(WordCountMapper2.class);
  71.         job.setReducerClass(WordCountReducer2.class);
  72.         // bigdata 1  map类的输出类型
  73.         job.setMapOutputKeyClass(Text.class);
  74.         job.setMapOutputValueClass(IntWritable.class);
  75.         // reduce类的输出类型  bigdata 10
  76.         job.setOutputKeyClass(Text.class);
  77.         job.setOutputValueClass(IntWritable.class);
  78.         FileInputFormat.setInputPaths(job,new Path("./mr01/input"));
  79.         // 每次运行的时候,假如输出文件夹存在,就会报错,严重影响体验
  80.         // 是否可以判断一下,假如存在,直接删除
  81.         Path path = new Path("./mr01/output");
  82.         FileSystem fileSystem = FileSystem.get(conf);
  83.         if(fileSystem.exists(path)){
  84.             fileSystem.delete(path,true);
  85.         }
  86.         FileOutputFormat.setOutputPath(job,path);
  87.         boolean b = job.waitForCompletion(true);
  88.         System.exit(b?0:-1);
  89.     }
  90. }
复制代码
四、序列化

啥是序列化?
jvm中的一个对象,不是类,假如你想把一个对象,生存到磁盘上,必须序列化,你把文件中的对象进行恢复,是不是的反序列化。
假如你想把对象发送给另一个服务器,必要通过网络传输,也必须序列化,到另一侧要反序列化。
大数据中,必要用到序列化,因为数据在传输过程中,必要用到。
  1. 说到序列化,我们想到了Java的序列化。一个类实现了Serializable 接口即可。
  2. Java对象什么时候需要序列化?
  3. 1)需要保存到本地的时候
  4. 2)需要在网络之间传输的时候
  5. package com.bigdata;
  6. import java.io.Serializable;
  7. /**
  8. * @Author laoyan
  9. * @Description TODO
  10. * @Date 2022/8/1 11:43
  11. * @Version 1.0
  12. */
  13. public class User implements Serializable {
  14.     private String name;
  15.     private int age;
  16.     public String getName() {
  17.         return name;
  18.     }
  19.     public void setName(String name) {
  20.         this.name = name;
  21.     }
  22.     public int getAge() {
  23.         return age;
  24.     }
  25.     public void setAge(int age) {
  26.         this.age = age;
  27.     }
  28. }
复制代码
大数据技能Hadoop并没有采用java的序列化机制,而是自己又整了一套自己的序列化机制。为什么?
  1. Java的序列化携带的信息太多了,文件太大了,不便于在网络之间传输。
  2. User  使用Java  --> 100KB
  3. User  使用大数据的序列化 --> 5KB
  4. 大数据采用的序列化机制是  Writable 接口。
  5. 为什么非得序列化呢?因为需要在网路之间传输。
复制代码
Java数据范例
Hadoop序列化的数据范例
释义
byte
ByteWritable
字节范例
short
ShortWritable
短整型
int
IntWritable
整型
long
LongWritable
长整型
float
FloatWritable
单精度浮点型
double
DoubleWritable
双精度浮点型
boolean
BooleanWritable
布尔型
String
Text
字符串
array
ArrayWritable
数组
Map
MapWritable
Map
null
NullWritable

java的八大根本数据范例: byte short int long float double char boolean
只必要记住:String --> Text即可。null --> NullWritable,仅仅是为了在某个地方占位,符合语法而已。


1、测试java序列化 VS Hadoop序列化大小比力

  1. package com.bigdata;
  2. import java.io.Serializable;
  3. /**
  4. * @Author laoyan
  5. * @Description TODO
  6. * @Date 2022/8/1 11:43
  7. * @Version 1.0
  8. */
  9. public class User implements Serializable {
  10.     private String name;
  11.     private int age;
  12.     public User(String name, int age) {
  13.         this.name = name;
  14.         this.age = age;
  15.     }
  16.     public String getName() {
  17.         return name;
  18.     }
  19.     public void setName(String name) {
  20.         this.name = name;
  21.     }
  22.     public int getAge() {
  23.         return age;
  24.     }
  25.     public void setAge(int age) {
  26.         this.age = age;
  27.     }
  28. }
复制代码
  1. package com.bigdata;
  2. import org.apache.hadoop.io.Writable;
  3. import java.io.DataInput;
  4. import java.io.DataOutput;
  5. import java.io.IOException;
  6. /**
  7. * @Author laoyan
  8. * @Description TODO
  9. * @Date 2022/8/1 14:06
  10. * @Version 1.0
  11. */
  12. public class UserWritable implements Writable {
  13.     private String name;
  14.     private int age;
  15.     public UserWritable(String name, int age) {
  16.         this.name = name;
  17.         this.age = age;
  18.     }
  19.     // 序列化
  20.     @Override
  21.     public void write(DataOutput out) throws IOException {
  22.         out.writeUTF(name);
  23.         out.writeInt(age);
  24.     }
  25.     // 反序列化
  26.     @Override
  27.     public void readFields(DataInput in) throws IOException {
  28.         // 进行反序列化的时候,读取的顺序一定要跟序列化的时候的顺序一致,否则报错
  29.         name = in.readUTF();
  30.         age = in.readInt();
  31.     }
  32.     public String getName() {
  33.         return name;
  34.     }
  35.     public void setName(String name) {
  36.         this.name = name;
  37.     }
  38.     public int getAge() {
  39.         return age;
  40.     }
  41.     public void setAge(int age) {
  42.         this.age = age;
  43.     }
  44. }
复制代码
  1. package com.bigdata;
  2. import java.io.FileNotFoundException;
  3. import java.io.FileOutputStream;
  4. import java.io.ObjectOutputStream;
  5. /**
  6. * @Author laoyan
  7. * @Description TODO
  8. * @Date 2022/8/1 14:11
  9. * @Version 1.0
  10. */
  11. public class TestXLH {
  12.     public static void main(String[] args) throws Exception {
  13.         User user = new User("zhangsan",20);
  14.         ObjectOutputStream objectOutputStream = new ObjectOutputStream(new FileOutputStream("D:/user1.txt"));
  15.         objectOutputStream.writeObject(user);
  16.         objectOutputStream.close();
  17.         UserWritable user2 = new UserWritable("zhangsan",20);
  18.         ObjectOutputStream objectOutputStream2 = new ObjectOutputStream(new FileOutputStream("D:/user2.txt"));
  19.         // 此时是序列化对象去write 对象流,此处需要注意
  20.         user2.write(objectOutputStream2);
  21.         objectOutputStream2.close();
  22.     }
  23. }
复制代码
java序列化的结果:


hadoop序列化的结果:


2、自界说序列化案例--手机流量统计

需求:假定拿到了一些关于手机流量的日志文件,统计每个手机号码的上行流量,下行流量,以及总流量。


读懂日志文件:
  1. 1363157995033         15920133257        5C-0E-8B-C7-BA-20:CMCC        120.197.40.4        sug.so.360.cn        信息安全        20        20        3156        2936        200
  2.    
  3. 第二列是手机号码       15920133257
  4. 倒数第三列是上行流量   3156
  5. 倒数第二列是下行流量   2936
复制代码
思绪:
  1. map任务:
  2.    15989002119   1232 3456
  3.    15989002119   2343 34343
  4.     ......
  5.    13726238888   3243  23432
  6.    13726238888  4343 4343
  7. reduce 任务:
  8. 15989002119   对相同的手机号码进行合并
  9.   Key:  15989002119    values: [{15989002119,34343,34343},{15989002119,8989,34343}....]
  10.   
  11. 此处:手机号码,上行流量,下行流量,总流量
  12. 此时就需要自定义数据类型了。
复制代码
统计的结果:


代码演示:
  1. package com.bigdata.phoneflow;
  2. import org.apache.hadoop.io.Writable;
  3. import java.io.DataInput;
  4. import java.io.DataOutput;
  5. import java.io.IOException;
  6. /**
  7. * @Author laoyan
  8. * @Description TODO
  9. * @Date 2022/8/1 14:29
  10. * @Version 1.0
  11. */
  12. // 自定义一个数据类型
  13. public class PhoneFlowWritable implements Writable {
  14.     private String phone;
  15.     private int upFlow;
  16.     private int downFlow;
  17.     // 此处需要指定一个空参数的构造方法,否则报错:
  18.     // java.lang.NoSuchMethodException: com.bigdata.phoneflow.PhoneFlowWritable.<init>()
  19.     public PhoneFlowWritable() {
  20.     }
  21.     public PhoneFlowWritable(String phone, int upFlow, int downFlow) {
  22.         this.phone = phone;
  23.         this.upFlow = upFlow;
  24.         this.downFlow = downFlow;
  25.     }
  26.     public String getPhone() {
  27.         return phone;
  28.     }
  29.     public void setPhone(String phone) {
  30.         this.phone = phone;
  31.     }
  32.     public int getUpFlow() {
  33.         return upFlow;
  34.     }
  35.     public void setUpFlow(int upFlow) {
  36.         this.upFlow = upFlow;
  37.     }
  38.     public int getDownFlow() {
  39.         return downFlow;
  40.     }
  41.     public void setDownFlow(int downFlow) {
  42.         this.downFlow = downFlow;
  43.     }
  44.     @Override
  45.     public void write(DataOutput out) throws IOException {
  46.         out.writeUTF(phone);
  47.         out.writeInt(upFlow);
  48.         out.writeInt(downFlow);
  49.     }
  50.     @Override
  51.     public void readFields(DataInput in) throws IOException {
  52.         phone =  in.readUTF();
  53.         upFlow = in.readInt();
  54.         downFlow = in.readInt();
  55.     }
  56. }
复制代码
  1. package com.bigdata.phoneflow;
  2. import com.bigdata.WordCountMapper;
  3. import com.bigdata.WordCountPartitioner;
  4. import com.bigdata.WordCountReducer;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.IntWritable;
  8. import org.apache.hadoop.io.LongWritable;
  9. import org.apache.hadoop.io.Text;
  10. import org.apache.hadoop.mapreduce.Job;
  11. import org.apache.hadoop.mapreduce.Mapper;
  12. import org.apache.hadoop.mapreduce.Reducer;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. import java.io.IOException;
  16. import java.util.Iterator;
  17. /**
  18. * @Author laoyan
  19. * @Description TODO
  20. * @Date 2022/8/1 14:33
  21. * @Version 1.0
  22. */
  23. class PhoneFlowMapper extends Mapper<LongWritable, Text,Text,PhoneFlowWritable> {
  24.     // 将每一句话,都变为   手机号码 -->  PhoneFlowWritable对象
  25.     @Override
  26.     protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, PhoneFlowWritable>.Context context) throws IOException, InterruptedException {
  27.         String line = value.toString();
  28.         String[] arr = line.split("\\s+");
  29.         if(arr.length == 11){
  30.             String phone = arr[1];
  31.             // 正则表达式,判断手机号
  32.             if (phone.matches("^1[3456789][0-9]{9}$")){
  33.                 int upFlow = Integer.parseInt(arr[arr.length-3]);
  34.             int downFlow = Integer.parseInt(arr[arr.length-2]);
  35.             System.out.println(phone+","+upFlow+","+downFlow);
  36.             context.write(new Text(phone),new PhoneFlowWritable(phone,upFlow,downFlow));
  37.             }
  38.         }
  39.     }
  40. }
  41. //   手机号 --> 流量数据PhoneFlowWritable      手机号码 --> 统计的结果
  42. class PhoneFlowReducer extends Reducer<Text,PhoneFlowWritable,Text,Text> {
  43.     @Override
  44.     protected void reduce(Text key, Iterable<PhoneFlowWritable> values, Reducer<Text, PhoneFlowWritable, Text, Text>.Context context) throws IOException, InterruptedException {
  45.         int upFlowNum = 0;
  46.         int downFlowNum = 0;
  47.         Iterator<PhoneFlowWritable> iterator = values.iterator();
  48.         while(iterator.hasNext()){
  49.             PhoneFlowWritable phoneFlowWritable = iterator.next();
  50.             upFlowNum += phoneFlowWritable.getUpFlow();
  51.             downFlowNum += phoneFlowWritable.getDownFlow();
  52.         }
  53.         StringBuffer sb = new StringBuffer();
  54.         sb.append("手机号"+key+"流量统计:");
  55.         sb.append("上行流量是:"+upFlowNum);
  56.         sb.append("下行流量是:"+downFlowNum);
  57.         sb.append("总的流量是:"+(upFlowNum + downFlowNum));
  58.         context.write(key,new Text(sb.toString()));
  59.     }
  60. }
  61. public class PhoneFlowDriver {
  62.     public static void main(String[] args) throws Exception{
  63.         Configuration configuration = new Configuration();
  64.         // 使用本地的文件系统,而不是hdfs
  65.         configuration.set("fs.defaultFS","file:///");
  66.         // 使用本地的资源(CPU,内存等), 也可以使用yarn平台跑任务
  67.         configuration.set("mapreduce.framework.name","local");
  68.         Job job = Job.getInstance(configuration, "手机流量统计");
  69.         // map任务的设置
  70.         job.setMapperClass(PhoneFlowMapper.class);
  71.         job.setMapOutputKeyClass(Text.class);
  72.         job.setMapOutputValueClass(PhoneFlowWritable.class);
  73.         // reduce任务的设置
  74.         job.setReducerClass(PhoneFlowReducer.class);
  75.         job.setOutputKeyClass(Text.class);
  76.         job.setOutputValueClass(Text.class);
  77.         // 设置要统计的数据的路径,结果输出路径
  78.         FileInputFormat.setInputPaths(job,new Path("mr01/phoneFlow/input"));
  79.         // ouput文件夹一定不要出现,否则会报错
  80.         FileOutputFormat.setOutputPath(job,new Path("mr01/phoneFlow/output"));
  81.         // 等待任务执行结束
  82.         boolean b = job.waitForCompletion(true);
  83.         // 此处是一定要退出虚拟机的
  84.         System.exit(b ? 0:-1);
  85.     }
  86. }
复制代码
注意这个案例中有一个关于迭代器的用法,迭代器实在也可以通过foreach循环进行遍历:
  1. for (PhoneFlowWritable phoneFlow : values) {
  2.         sumUpFlow += phoneFlow.getUpFlow();
  3.         sumDownFlow += phoneFlow.getDownFlow();
  4. }
复制代码
随堂代码如下:
  1. package com.bigdata.phoneflow;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.IntWritable;
  6. import org.apache.hadoop.io.LongWritable;
  7. import org.apache.hadoop.io.NullWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  14. import java.io.IOException;
  15. /**
  16. * @基本功能:
  17. * @program:MapReduce
  18. * @author: 闫哥
  19. * @create:2024-01-22 15:02:25
  20. **/
  21. // 一个java 文件中可以有多个class ,但是只能有一个public class ,而且这个类名必须和java 的文件名一样
  22.     //  手机号  一个自定义的类
  23. class PhoneFlowMapper extends Mapper<LongWritable, Text,Text,PhoneFlowWritable> {
  24.     // ctrl + o
  25.     @Override
  26.     protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, PhoneFlowWritable>.Context context) throws IOException, InterruptedException {
  27.         String line = value.toString();
  28.         String[] arr = line.split("\\s+");
  29.         if(arr.length == 11){
  30.             String phone = arr[1];
  31.             // 正则表达式,判断手机号
  32.             if (phone.matches("^1[3456789][0-9]{9}$")){
  33.                 int upFlow = Integer.parseInt(arr[arr.length-3]);
  34.             int downFlow = Integer.parseInt(arr[arr.length-2]);
  35.                 context.write(new Text(phone),new PhoneFlowWritable(phone,upFlow,downFlow));
  36.             }
  37.         }
  38.     }
  39. }
  40. class PhoneFlowReducer extends Reducer<Text,PhoneFlowWritable, NullWritable,Text>{
  41.     @Override
  42.     protected void reduce(Text phone, Iterable<PhoneFlowWritable> values, Reducer<Text, PhoneFlowWritable, NullWritable, Text>.Context context) throws IOException, InterruptedException {
  43.         long sumUpFlow = 0;
  44.         long sumDownFlow = 0;
  45.         for (PhoneFlowWritable phoneFlow : values) {
  46.             sumUpFlow += phoneFlow.getUpFlow();
  47.             sumDownFlow += phoneFlow.getDownFlow();
  48.         }
  49.         String str = "手机号为:"+phone+",上行总流量:"+sumUpFlow+",下行总流量:"+sumDownFlow+",总量:"+(sumUpFlow+sumDownFlow);
  50.         context.write(NullWritable.get(),new Text(str));
  51.     }
  52. }
  53. public class PhoneFlowDriver {
  54.     public static void main(String[] args) throws Exception{
  55.         // 先获取一个Job对象
  56.         Configuration conf = new Configuration();
  57.         // 第一个:使用本地文件系统
  58.         conf.set("fs.defaultFS","file:///");
  59.         // 第二个:使用本地资源
  60.         conf.set("mapreduce.framework.name","local");
  61.         Job job = Job.getInstance(conf, "流量统计");
  62.         // 设置 map
  63.         job.setMapperClass(com.bigdata.phoneflow.PhoneFlowMapper.class);
  64.         job.setMapOutputKeyClass(Text.class);
  65.         job.setMapOutputValueClass(PhoneFlowWritable.class);
  66.         // 设置reduce
  67.         job.setReducerClass(com.bigdata.phoneflow.PhoneFlowReducer.class);
  68.         job.setOutputKeyClass(NullWritable.class);
  69.         job.setOutputValueClass(Text.class);
  70.         // 设置输入路径
  71.         FileInputFormat.setInputPaths(job,"./data/phoneflow/");
  72.         // 设置输出路径
  73.         // 如何自动删除文件夹
  74.         // 判断该文件夹是否存在,如何存在,删除。
  75.         String outPath = "./data/phoneflowresult/";
  76.         Path path = new Path(outPath);
  77.         FileSystem fileSystem = FileSystem.get(conf);
  78.         if(fileSystem.exists(path)){
  79.             System.out.println("文件夹已存在");
  80.             fileSystem.delete(path,true);
  81.         }
  82.         FileOutputFormat.setOutputPath(job,path);
  83.         boolean result = job.waitForCompletion(true);
  84.         System.exit(result?0:-1);
  85.     }
  86. }
复制代码
3、假如我们在编写代码的时候不出结果,也不报错,怎么办?

  1. # Global logging configuration
  2. #  Debug   info   warn  error
  3. log4j.rootLogger=WARN, stdout
  4. # MyBatis logging configuration...
  5. log4j.logger.org.mybatis.example.BlogMapper=TRACE
  6. # Console output...
  7. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  8. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  9. log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n
复制代码


五、关于片和块

  1. 假如我现在500M这样的数据,如何存储?
  2. 500M = 128M + 128M + 128M + 116M  分为四个块进行存储。
  3. 计算的时候,是按照片儿计算的,而不是块儿。
  4. 块是物理概念,一个块就是128M ,妥妥的,毋庸置疑。
  5. 片是逻辑概念,一个片大约等于一个块。
  6. 假如我现在需要计算一个300M的文件,这个时候启动多少个MapTask任务?答案是有多少个片儿,就启动多少个任务。
  7. 一个片儿约等于 一个块,但是最大可以 128M*1.1倍= 140.8
  8. 300M    128+128+44
  9. 128M  启动一个Map任务进行读取
  10. 172M  172M  和   128M * 1.1 =140.8M 进行比较,如果大于 ,继续进行切割
  11. 128M  启动一个任务Map任务
  12. 剩余44M   剩余的44M 和  128M*1.1倍比较,小于这个值,剩余的44M 就单独起一个Map任务
  13. 300m的数据,分给了3个MapTask任务进行处理。
  14. 如果是260M的数据,由多少个Map任务处理?
  15. 128M   第一个任务
  16. 132M   跟 128M * 1.1 进行比较,发现小于这个值,直接一个Map任务搞定,不在启动第三个任务了。
复制代码
比如班里的同砚一起搬砖,每人规定搬3块,假定砖还剩4块,到某个同砚了,他就直接搬完即可,没须要让另一个同砚因为一块砖,而专门跑一趟。
  1. 1、什么是片,什么是块?
  2. 块是物理概念,片是逻辑概念。一般片 = 块的,但是到最后一次的时候,有可能片> 块,但是绝对不能超过块的1.1倍。
  3. 2、mapreduce 启动多少个MapTask任务?
  4. 跟片有关系,有多少个片,就启动多少个map任务。跟块儿无关。
复制代码
六、Shuffle 过程【必问】

  1. MapReduce的Shuffle过程指的是MapTask的后半程,以及ReduceTask的前半程,共同组成的。
  2. 从MapTask中的map方法结束,到ReduceTask中的reduce方法开始,这个中间的部分就是Shuffle。是MapReduce的核心,心脏。
复制代码


map端:

  1. 1、map中的context.write方法,对外写出的时候,其实是写入到了一个环形缓冲区内(内存形式的),这个环形缓冲区大小是100M,可以通过参数设置。如果里面的数据大于80M,就开始溢写(从内存中将数据写入到磁盘上)。溢写的文件存放地址可以设置。
  2. 2、在溢写过程中,环形缓冲区不会停止工作,是会利用剩余的20%继续存入环形缓冲区的。除非是环形缓冲区的内存满了,map任务就被阻塞了。
  3. 在溢写出来的文件中,是排过序的,排序规则:快速排序算法。在排序之前,会根据分区的算法,对数据进行分区。是在内存中,先分区,在每一个分区中再排序,接着溢写到磁盘上的。
  4. 3、溢写出来的小文件需要合并为一个大文件,因为每一个MapTask只能有一份数据。就将相同的分区文件合并,并且排序(此处是归并排序)。每次合并的时候是10个小文件合并为一个大文件,进行多次合并,最终每一个分区的文件只能有一份。
  5. 假如100个小文件,需要合并几次呢?
  6. 100  每10分合并一次,第一轮:100个文件合并为了10个文件,这10个文件又合并为一个大文件,总共合并了11次。
  7. 4、将内存中的数据,溢写到磁盘上,还可以指定是否需要压缩,以及压缩的算法是什么。
复制代码
reduce端:

  1. 1、reduce端根据不同的分区,拉取每个服务器上的相同的分区的数据。
  2. reduce任务有少量复制线程,因此能够并行取得map输出。默认值是5个线程,但这个默认值可以修改设置mapreduce.reduce.shuffle. parallelcopies 属性即可。
  3. 2、如果map上的数据非常的小,该数据会拉取到reduce端的内存中,如果数据量比较大,直接拉取到reduce端的硬盘上。
复制代码


七、Combiner 【可有可无】

这个Combiner是一个优化的代码,对于我们最终的结果没有任何的影响。
map端产生的数据,会被拉去到reduce端进行合并,有可能map端产生的数据非常的大,未便于在网络间传输,那么有没有办法可以缩小map端的数据呢?
之前: java 1 java 1 java 1 传递给reduce
如今: java 3 传递给reduce
Combiner实在就是运行在mapTask中的reducer。 Reducer实在就是合并代码的。Combiner是作用在Map端的。
这个结果不是最终的结果,而是一个暂时的小统计。 最终reduce是会将所有的map结果再次进行汇总才是我们最终想要的统计结果。


  1. Combiner 只能用于对统计结果没有影响的场景下。
  2. 一般只用于  统计之和,统计最大值最小值的场景下。统计平均值等情况是不能用的。
复制代码
在代码中如何使用?


Combiner起作用的地方:


Combiner 实在作用于两个地方,一个是环形缓冲区溢写磁盘的时候,除了分区,排序之外,还可以做合并操作,将内存中的 hello 1 hello 1 hello 1 会合并为 hello 3
第二个位置是小文件合并为MapTask的大文件的时候,会将多个 hello 的值相加 hello 19,但是这个不是最终的答案,最终答案是将多个MapTask使掷中的hello 进行合并才是最终的结果。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

没腿的鸟

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表