Hadoop-MapReduce

打印 上一主题 下一主题

主题 504|帖子 504|积分 1512

MapReduce

1、优缺点

  1. 优点:易于编程、良好的扩展性、高容错性
  2. 缺点:不适合做实时计算、不适合做流式计算(要求数据是静态的)、不适合DAG(有向图)计算
复制代码
2、MapReduce的阶段分类【掌握】


MapReduce的步调在运行的过程中,一样平常分为两个阶段:Map阶段和Reduce阶段
2.1、第一阶段:Map

第一阶段,也称之为Map阶段。这个阶段会有若干个MapTask实例,完全并行运行,互不干系。每个MapTask会读取分析一个InputSplit(输入分片,简称分片)对应的原始数据。计算的结果数据会暂时生存到地点节点的本地磁盘里。
该阶段的编程模型中会有一个map函数需要开辟人员重写,map函数的输入是一个<key,value>对,map函数的输出也是一个<key,value>对,key和value的范例需要开辟人员指定。

2.2、第二阶段:Reduce

第二阶段,也称为Reduce阶段。这个阶段会有若干个ReduceTask实例并发运行,互不干系。但是他们的数据依靠于上一个阶段的所有MapTask并发实例的输出。一个ReudceTasK会从多个MapTask运行节点上fetch自己要处理的分区数据。颠末处理后,输出到HDFS上。
该阶段的编程模型中有一个reduce函数需要开辟人员重写,reduce函数的输入也是一个<key,value>对,reduce函数的输出也是一个<key,value>对。这里要夸大的是,reduce的输入其实就是map的输出,只不过map的输出颠末shuffle技术后变成了<key,List>而已。

注意:MapReduce编程模型只能包罗一个map阶段和一个reduce阶段,假如用户的业务逻辑非常复杂,那就只能多个MapReduce步调,串行执行。
3、IDE 编写MapReduce 实现wordCount

3.1、在IDE中需要依次创建WordCountMapper、WordCountReducer、WordCountDriver

示例中的包路径:com.ms.mshadoop.mapreduce
  1. package com.ms.mshadoop.mapreduce;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. import java.io.IOException;
  7. /**
  8. * Map阶段开始的时候,每一个MapTask都会逐行读取分片中的数据,并将读取到的数据进行扭转形成 keyIn, valueIn
  9. * 由于计算程序需要到不同节点之间进行移动,因此涉及到 keyIn/valueIn/keyOut/valueOut都必须支持序列化的类型
  10. * Hadoop提供了一套序列化的机制 writeable
  11. * byte    =>  ByteWritable
  12. * short   =>  ShortWritable
  13. * int     =>  IntWritable
  14. * long    =>  LongWritable
  15. * float   =>  FloatWritable
  16. * double  =>  DoubleWritable
  17. * boolean =>  BooleanWritable
  18. * String  =>  Text
  19. * <p>
  20. * keyIn:读取到的 行数据 中 首字母 的偏移量  需要设计为LongWritable
  21. * valueIn:读取到的 行数据                需要设计为Text
  22. * <p>
  23. * keyOut:经过逻辑处理后,需要写出的键值对中 键的类型 Text
  24. * valueOut:经过逻辑处理后,需要写出的键值对中 值的类型 IntWritable
  25. */
  26. public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  27.     /**
  28.      * 每当读取到一行数据时,将其扭转为keyIn和valueIn,调用该方法
  29.      * @param key  行偏移量
  30.      * @param value 行记录
  31.      * @param context 操作上下文
  32.      * @throws IOException
  33.      * @throws InterruptedException
  34.      */
  35.     @Override
  36.     protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  37.         //1、将读取的行数据,切割出每一个单词
  38.         // 正则表达式中\s匹配任何空白字符,包括空格、制表符、换页符等等, 等价于[\f\n\r\t\v]
  39.         // 而\s+则表示匹配任意多个上面的字符
  40.         String[] words = value.toString().split("\\s+");
  41.         //2、遍历每一个单词
  42.         for (String word : words) {
  43.             //3、为每一单词配上(value,1) 组成键值对 写出
  44.             context.write(new Text(word),new IntWritable(1));
  45.         }
  46.     }
  47. }
复制代码
  1. package com.ms.mshadoop.mapreduce;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import java.io.IOException;
  6. /**
  7. * keyIn: Map阶段输出的键类型
  8. * valueIn: Map阶段输出的值类型
  9. * <p>
  10. * keyOut: 最终输出的键值对中,键的类型
  11. * valueOut: 最终输出的键值对中,值的类型
  12. */
  13. public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  14.     /**
  15.      * Reduce阶段逻辑处理
  16.      *
  17.      * @param key     键,Map阶段输出的键
  18.      * @param values  输入进这个方法之前,MapReduce会按照键进行分组,将相同的键对应的所有值聚合到一起
  19.      * @param context 上下文
  20.      * @throws IOException
  21.      * @throws InterruptedException
  22.      */
  23.     @Override
  24.     protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  25.         //1、定义一个变量,来记录单子出现的总次数
  26.         int timeCount = 0;
  27.         for (IntWritable value : values) {
  28.             timeCount += value.get();
  29.         }
  30.         context.write(key, new IntWritable(timeCount));
  31.     }
  32. }
复制代码
  1. package com.ms.mshadoop.mapreduce;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. import java.io.IOException;
  10. /**
  11. * 驱动类:主要功能
  12. * 1、创建Job
  13. * 2、Job属性配置
  14. * 3、Job提交
  15. */
  16. public class WordCountDriver {
  17.     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  18.         //1、创建Job
  19.         Configuration conf = new Configuration();
  20.         conf.set("fs.defaultFS", "hdfs://hadoop01:9820");
  21.         Job job = Job.getInstance();
  22.         //2、设置Job属性
  23.         //在MapReduce程序中,用于处理Map任务的类
  24.         job.setMapperClass(WordCountMapper.class);
  25.         //在MapReduce程序中,用于处理Reduce任务的类
  26.         job.setReducerClass(WordCountReducer.class);
  27.         //在MapReduce程序中,用于处理驱动的类
  28.         job.setJarByClass(WordCountDriver.class);
  29.         //Map阶段输出的键值对的类型
  30.         job.setMapOutputKeyClass(Text.class);
  31.         job.setMapOutputValueClass(IntWritable.class);
  32.         //Reduce阶段输出的键值对的类型
  33.         job.setOutputKeyClass(Text.class);
  34.         job.setOutputValueClass(IntWritable.class);
  35.         //设置输入和输出的路径
  36.         FileInputFormat.setInputPaths(job, new Path(args[0]));
  37.         FileOutputFormat.setOutputPath(job, new Path(args[1]));
  38.         //3、提交任务
  39.         System.exit(job.waitForCompletion(true) ? 0 : -1);
  40.     }
  41. }
复制代码
注意:需要天生多个文件数量时,代码中增长
  1. //设置ReduceTask的数量,决定了最终生成的文件数量。这个数量最好和分区的数量保持一致。
  2.         /**
  3.          * 1、如果ReduceTask的数量多于分区的数量:会出现多余的ReduceTask空占资源,
  4.          *      不去处理任何的数据,浪费资源,且生成空的结果文件
  5.          * 2、如果ReduceTask的数量少于分区的数量:会出现某个分区的数据暂时无法处理,
  6.          *      需要等待某ReduceTask任务处理结束后再1处理这个分区的数据,无法高效并发
  7.          */
  8. job.setNumReduceTasks(2);
复制代码
3.2 执行步调

  1. 1、IDE中将项目打包成jar,上传到linux 的hadoop集群中
  2. 2、运行程序
  3. # hadoop jar 打包上传到服务器的jar  程序中WordCountDriver的包路径         /输入文件路径        /输出文件路径
  4. hadoop jar MapReduceApi-1.0-SNAPSHOT.jar com.ms.mshadoop.mapreduce.WordCountDriver /input /output
复制代码
4、分区器(Partitioner)

需求:将单词a-g开头的存一个文件,h-o存一个文件,其他存一个
4.1、在IDE中需要依次创建WordCountMapper、WordCountReducer、WordCountDriver、WordCountPartitioner,其中WordCountMapper、WordCountReducer与3.1相同

  1. package com.ms.mshadoop.partitioner;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Partitioner;
  5. /**
  6. * Partitioner : 分区器,可以将map阶段输出的键值对,按照规则进行分区
  7. * 泛型,对应的是map方法输出的键值对类型
  8. */
  9. public class WordCountPartitioner extends Partitioner<Text, IntWritable> {
  10.     /**
  11.      * 每一个map方法输出的键值对,都会调用该方法进行分区,来确定分区号
  12.      * @param text map 阶段输出的 键
  13.      * @param intWritable map 阶段输出的 值
  14.      * @param i ReduceTask 的数量
  15.      * @return 分区号,从零开始,且必须连续
  16.      */
  17.     @Override
  18.     public int getPartition(Text text, IntWritable intWritable, int i) {
  19.         char c = text.toString().charAt(0);
  20.         if (c >= 'a' && c <= 'g') {
  21.             return 0;
  22.         } else if (c >= 'h' && c <= 'o') {
  23.             return 1;
  24.         } else {
  25.             return 2;
  26.         }
  27.     }
  28. }
复制代码
WordCountDriver中增长如下代码
  1.                 //应用分区器
  2.         job.setPartitionerClass(WordCountPartitioner.class);
  3.         //设置ReduceTask的数量,决定了最终生成的文件数量。这个数量最好和分区的数量保持一致。
  4.         /**
  5.          * 1、如果ReduceTask的数量多于分区的数量:会出现多余的ReduceTask空占资源,
  6.          *      不去处理任何的数据,浪费资源,且生成空的结果文件
  7.          * 2、如果ReduceTask的数量少于分区的数量:会出现某个分区的数据暂时无法处理,
  8.          *      需要等待某ReduceTask任务处理结束后再1处理这个分区的数据,无法高效并发
  9.          */
  10.         job.setNumReduceTasks(3);
复制代码
5、IDE运行MapReduce的模式

  1. 在IDEA中运行MapReduce的程序,可以选择计算资源,也可以选择文件系统
  2.     计算资源:mapreduce.framework.name
  3.                 local:使用本地计算资源(CPU、内存)
  4.                 yarn:使用集群的计算资源
  5.     文件系统:fs.defaultFS
  6.                 hdfs://hadoop01:9820    使用分布式文件系统
  7.                 file:///                使用本地文件系统
  8. 本地调试时可使用local模式进行,正常推荐使用yarn模式进行
复制代码
5.1、local 模式测本地文件

原理:
1、将MapReduce的任务资源调度设置为local,倒霉用YARN举行资源调度
2、将文件系统设置为本地文件系统,倒霉用HDFS
  1. Configuration conf = new Configuration();
  2. conf.set("mapreduce.framework.name","local");//设置为本地运行模式,任务不会在YARN上运行
  3. conf.set("fs.defaultFS","file:///");//设置为本地文件系统,不使用HDFS。
  4. Job job = Job.getInstance(conf );
  5. //设置输入和输出的路径
  6. FileInputFormat.setInputPaths(job, new Path("/输入:本地文件路径"));
  7. FileOutputFormat.setOutputPath(job, new Path("/输出:本地文件路径"));
复制代码
5.2、local 模式测集群文件

原理:
1、将MapReduce的任务资源调度设置为local,倒霉用YARN举行资源调度。
2、将文件系统设置为HDFS
  1. System.setProperty("HADOOP_USER_NAME", "root");        //设置hadoop的操作用户
  2. Configuration conf = new Configuration();
  3. conf.set("mapreduce.framework.name","local");//设置为本地运行模式,任务不会在YARN上运行
  4. conf.set("fs.defaultFS","hdfs://hadoop01:9820");//设置为分布式文件系统
  5. Job job = Job.getInstance(conf );
  6. //设置输入和输出的路径
  7. FileInputFormat.setInputPaths(job, new Path("/输入:hdfs文件路径"));
  8. FileOutputFormat.setOutputPath(job, new Path("/输出:dhfs文件路径"));
复制代码
5.3、YARN模式测集群

原理:
1、将MapReduce的任务资源调度设置为YARN
2、将文件系统设置为HDFS
  1.         System.setProperty("HADOOP_USER_NAME", "root");        //设置hadoop的操作用户
  2.         //1、创建Job
  3.         Configuration conf = new Configuration();
  4.         //local 模式测本地文件
  5.         /*conf.set("mapreduce.framework.name","local");//设置为本地运行模式,任务不会在YARN上运行
  6.         conf.set("fs.defaultFS","file:///");//设置为本地文件系统,不使用HDFS。*/
  7.         //local 模式测集群文件
  8.         /*conf.set("mapreduce.framework.name","local");//设置为本地运行模式,任务不会在YARN上运行
  9.         conf.set("fs.defaultFS","hdfs://hadoop01:9820");//设置为分布式文件系统*/
  10.         //yarn 模式测集群文件
  11.         conf.set("mapreduce.framework.name", "yarn");//设置为本地运行模式,任务不会在YARN上运行
  12.         conf.set("yarn.resourcemanager.hostname", "hadoop01");//设置ResourceManager
  13.         conf.set("fs.defaultFS", "hdfs://hadoop01:9820");//设置为分布式文件系统
  14.         conf.set("yarn.app.mapreduce.am.env", "HADOOP_MAPRED_HOME=/usr/local/hadoop-3.3.6");//用于指定MapReduce应用程序的ApplicationMaster(AM)启动时的环境变量。
  15.         conf.set("mapreduce.map.env", "HADOOP_MAPRED_HOME=/usr/local/hadoop-3.3.6");//用于指定MapReduce应用程序的Map启动时的环境变量。
  16.         conf.set("mapreduce.reduce.env", "HADOOP_MAPRED_HOME=/usr/local/hadoop-3.3.6");//用于指定MapReduce应用程序的Reduce启动时的环境变量。
  17.         conf.set("hadoop.security.authentication","simple");
  18.         conf.set("mapreduce.app-submission.cross-platform","true");//跨平台提交任务
复制代码
注意事项:
在上述的配置都完成后,将步调打jar包,然后将jar包添加到classpath,才可以运行步调
  1. 在jar包上右键-> Add As Library
复制代码
6 Hadoop 序列化

  1. package com.ms.mshadoop.writable;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.io.Writable;
  4. import java.io.DataInput;
  5. import java.io.DataOutput;
  6. import java.io.IOException;
  7. import java.io.Serializable;
  8. public class Student implements Writable, Serializable {
  9.     private String name;
  10.     private int age;
  11. //    private Text remarks;
  12.     public Student() {
  13.     }
  14.     public Student(String name, int age, Text remarks) {
  15.         this.name = name;
  16.         this.age = age;
  17. //        this.remarks = remarks;
  18.     }
  19.     /**
  20.      * 序列化:将内存对象,序列化为一个字节序列
  21.      * @param dataOutput
  22.      */
  23.     @Override
  24.     public void write(DataOutput dataOutput) throws IOException {
  25.         //该方法中依次将所有属性序列化
  26.         //当属性非hadoop中的序列类型时
  27.         //将String 序列化
  28.         dataOutput.writeUTF(name);
  29.         //将int 序列化
  30.         dataOutput.writeInt(age);
  31.         //当属性为hadoop中的序列化类型时
  32. //        remarks.write(dataOutput);
  33.     }
  34.     /**
  35.      * 反序列化:将字节序列转 内存对象
  36.      * 注意:反序列化时,读取属性顺序 必须 和序列化时顺序一致
  37.      * @param dataInput
  38.      */
  39.     @Override
  40.     public void readFields(DataInput dataInput) throws IOException {
  41.         //当属性非hadoop中的序列类型时
  42.         //将String 反序列化
  43.         name = dataInput.readUTF();
  44.         //将int 反序列化
  45.         age = dataInput.readInt();
  46.         //当属性为hadoop中的序列化类型时
  47. //        remarks.readFields(dataInput);
  48.     }
  49. }
  50. package com.ms.mshadoop.writable;
  51. import org.apache.hadoop.io.Text;
  52. import java.io.*;
  53. public class Test {
  54.     public static void main(String[] args) throws IOException {
  55.         Student student = new Student("张三丰",100,new Text("11111111"));
  56.         File file = new File("C:\\Users\\xazyh\\Desktop\\1111.txt");
  57.         ObjectOutputStream outputStream = new ObjectOutputStream(new FileOutputStream(file));
  58.         outputStream.writeObject(student);
  59.         outputStream.close();
  60.         File file1= new File("C:\\Users\\xazyh\\Desktop\\2222.txt");
  61.         DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(file1));
  62.         student.write(dataOutputStream);
  63.         dataOutputStream.close();
  64.     }
  65. }
复制代码

7 Mapreduce 实现统计流量

  1. package com.ms.mshadoop.phoneFlow;
  2. import org.apache.hadoop.io.Writable;
  3. import java.io.DataInput;
  4. import java.io.DataOutput;
  5. import java.io.IOException;
  6. public class PhoneFlowBean implements Writable {
  7.     private String phone;
  8.     private int flowDown;
  9.     private int flowUp;
  10.     public PhoneFlowBean() {
  11.     }
  12.     public PhoneFlowBean(String phone, int flowDown, int flowUp) {
  13.         this.phone = phone;
  14.         this.flowDown = flowDown;
  15.         this.flowUp = flowUp;
  16.     }
  17.     @Override
  18.     public void write(DataOutput dataOutput) throws IOException {
  19.         dataOutput.writeUTF(phone);
  20.         dataOutput.writeInt(flowDown);
  21.         dataOutput.writeInt(flowUp);
  22.     }
  23.     @Override
  24.     public void readFields(DataInput dataInput) throws IOException {
  25.         phone = dataInput.readUTF();
  26.         flowDown = dataInput.readInt();
  27.         flowUp = dataInput.readInt();
  28.     }
  29.     public int getSumFlow() {
  30.         return flowDown + flowUp;
  31.     }
  32.     public String getPhone() {
  33.         return phone;
  34.     }
  35.     public void setPhone(String phone) {
  36.         this.phone = phone;
  37.     }
  38.     public int getFlowDown() {
  39.         return flowDown;
  40.     }
  41.     public void setFlowDown(int flowDown) {
  42.         this.flowDown = flowDown;
  43.     }
  44.     public int getFlowUp() {
  45.         return flowUp;
  46.     }
  47.     public void setFlowUp(int flowUp) {
  48.         this.flowUp = flowUp;
  49.     }
  50. }
复制代码
  1. package com.ms.mshadoop.phoneFlow;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Mapper;
  5. import java.io.IOException;
  6. public class PhoneFlowMapper extends Mapper<LongWritable, Text, Text, PhoneFlowBean> {
  7.     @Override
  8.     protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  9.         String line = value.toString();
  10.         //根据数据格式进行数据分割
  11.         String[] split = line.split("\t");
  12.         //手机号
  13.         String phone = split[0];
  14.         //下行流量
  15.         String flowDown = split[1];
  16.         //上行流量
  17.         String flowUp = split[2];
  18.         PhoneFlowBean phoneFlowBean = new PhoneFlowBean(phone,Integer.parseInt(flowDown),Integer.parseInt(flowUp));
  19.         //写入 K2 V2
  20.         context.write(new Text(phone),phoneFlowBean);
  21.     }
  22. }
复制代码
  1. package com.ms.mshadoop.phoneFlow;
  2. import org.apache.hadoop.io.Text;
  3. import org.apache.hadoop.mapreduce.Reducer;
  4. import java.io.IOException;
  5. public class PhoneFlowReudcer extends Reducer<Text, PhoneFlowBean, Text, Text> {
  6.     @Override
  7.     protected void reduce(Text key, Iterable<PhoneFlowBean> values, Reducer<Text, PhoneFlowBean, Text, Text>.Context context) throws IOException, InterruptedException {
  8.         //总下行流量
  9.         int sumFlowDown = 0;
  10.         //总上行流量
  11.         int sumFlowUp = 0;
  12.         for (PhoneFlowBean value : values) {
  13.             sumFlowDown += value.getFlowDown();
  14.             sumFlowUp += value.getFlowUp();
  15.         }
  16.         String flowInfo = String.format("总上行流量:%d,总下行流量:%d,总流量:%d。", sumFlowUp, sumFlowDown, (sumFlowUp + sumFlowDown));
  17.         //写入 K3 V3
  18.         context.write(key, new Text(flowInfo));
  19.     }
  20. }
复制代码
  1. package com.ms.mshadoop.phoneFlow;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  7. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  8. import java.io.IOException;
  9. public class PhoneFlowDirver {
  10.     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  11.         Configuration configuration = new Configuration();
  12.         configuration.set("fs.defaultFS","file:///");
  13.         configuration.set("mapreduce.framework.name","local");
  14.         Job job = Job.getInstance(configuration);
  15.         job.setMapperClass(PhoneFlowMapper.class);
  16.         job.setReducerClass(PhoneFlowReudcer.class);
  17.         job.setJarByClass(PhoneFlowDirver.class);
  18.         job.setMapOutputKeyClass(Text.class);
  19.         job.setMapOutputValueClass(PhoneFlowBean.class);
  20.         job.setOutputKeyClass(Text.class);
  21.         job.setOutputValueClass(Text.class);
  22.         //设置输入和输出的路径
  23.         FileInputFormat.setInputPaths(job, new Path("输入文件路径"));
  24.         //为防止文件已存在,可先进行判断,存在则删除文件
  25.         FileOutputFormat.setOutputPath(job, new Path("输出文件路径"));
  26.        System.exit( job.waitForCompletion(true)?0:1);
  27.     }
  28. }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

小小小幸运

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表