笑看天下无敌手 发表于 2024-12-19 07:24:07

Hadoop(九)MapReduce 案例2

MapReduce 处理阶段

MapReduce 框架通常由三个阶段组成:


[*]Map:读取文件数据,按照规则对文本进行拆分,生成 KV 情势的数据。
[*]Shuffle:工作节点根据输出键(由 map 函数生成)重新分配数据,对数据排序、分组、拷贝,目标是属于一个键的所有数据都位于同一个工作节点上。
[*]Reduce:工作节点并行处理每个键的一组数据,对结果进行汇总。https://i-blog.csdnimg.cn/direct/bade58ee250a48b289bafcaa28cd9ef7.png

下图把 MapReduce 的过程分为两个部门,而实际上从两边的 Map 和 Reduce 到中央的那一大块都属于 Shuffle 过程,也就是说,Shuffle 过程有一部门是在 Map 端,有一部门是在 Reduce 端。
https://i-blog.csdnimg.cn/direct/a1cbc1ca1cad4801a0585d854b4a889e.png
案例统计单词出现的次数

数据
hello hbase
hello hadoop
hello hive
hello kubernetes
hello java 原理图

https://i-blog.csdnimg.cn/direct/20502b4bf59e4c10b5e0fd6295fbb911.png
https://i-blog.csdnimg.cn/direct/ae082e7a6e6347f0b8ec35339d8ff57b.png
wordcount 代码实现

用户编写的 MapReduce 程序分成三个部门:Mapper,Reducer,Driver:


[*]用户自界说 Mapper 类继承 Mapper 类,实现 map() 方法,输出和输出的数据都是 <K,V> 对情势,<K,V> 范例可以根据实际环境自界说。MapTask 进程对每一个 <K,V> 调用一次。
[*]用户自界说 Reduce 类继承 Reduce 类,实现 reduce() 方法,输出和输出的数据都是 <K,V> 对情势,<K,V> 范例可以根据实际环境自界说。Reducetask 进程对每一组相同 K 的 <K,V> 组调用一次 reduce() 方法。
[*]整个 MapReduce 程序须要一个 Drvier 类来进行提交,提交的是一个描述了各种须要信息的 Job 对象。
 在 HDFS 中创建目录并上传文件

在 HDFS 中创建一个目录

hadoop fs -mkdir /wcinput

#将本机 words.txt 文件上传到 HDFS 的 /wcinput 目录中

hadoop fs -put /words.txt /wcinput https://i-blog.csdnimg.cn/direct/aa39fe4e98f84edfa3a0166f7acd4cdd.png
引入pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>hadoopmapreduce</artifactId>
    <version>1.0-SNAPSHOT</version>
<dependencies>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>2.12.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.1.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.1.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>3.1.3</version>
    </dependency>

</dependencies>

</project> 编写 Mapper 类

编写一个 WordMapper 类继承 Mapper 类,并重写 map() 方法。Mapper 类是一个泛型类,4 个泛型范例分别代表(KeyIn,ValueIn,KeyOut,ValueOut )。泛型的范例可以根据自己实际的场景来指定。在 wordcount 这个例子中指定的范例如下:


[*]KeyIn(输入的键):LongWritable 范例,表现每行笔墨的起始位置(偏移量)
[*]ValueIn(输入的值):Text 范例,表现每行的文本。
[*]KeyOut(输出的键):Text 范例,表现每个单词。
[*]ValueOut(输出的值为):LongWritable 范例,表现单词出现的次数(1次)
Mapper 阶段依次读取每一行的数据,每行按照空格拆分出单词,得到 <单词,1> 的键值对,键是单词,值是 1,之后 Reduce 阶段累计单词出现的次数就累加 1 即可。
Mapper 阶段代码如下:
package words;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
* @description Map 阶段,分别计算每行每个单词出现的次数,key 是单词,value 为 1(表示 1 个单词)。
*/
public class WordMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      //1、切分单词
      String[] words = value.toString().split(" ");
      //2、单词转换 单词 -> <单词,1>
      for (String word : words) {
            //3、写入到上下文
            context.write(new Text(word),new LongWritable(1));
      }
    }
} 编写 Reduce 类

编写一个类 WordReducer 继承 Reducer 类,并重写 reduce() 方法。Reducer 类是也是一个泛型类,4 个泛型范例分别代表(KeyIn,ValueIn,KeyOut,ValueOut )泛型的范例可以根据自己实际的场景来指定。在 这个例子中我们指定的范例如下:


[*]KeyIn(输入的键):Text 范例,表现每个单词
[*]ValueIn(输入的值):LongWritable 范例,表现单词出现的次数(1次)
[*]KeyOut(输出的键):Text 范例,表现每个单词
[*]ValueOut(输出的值为):LongWritable 范例,表现单词出现的总数
Reduce 阶段接收到数据键是单词,值是一个可迭代的对象,是相同单词对应的次数(每个都是 1),只须要把这些 1 累加起来,就可以得到单词出现的总数了。
执行完后达到的结果
 <hello,> Reduce 阶段代码如下:

package words;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**

* @description Reduce 阶段,把 key 相同的数据进行累计,得到每个单词出现的次数
*/
public class WordReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
@Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
      //1、定义一个变量
      long count = 0;
      //2、迭代
      for (LongWritable value : values) {
            count += value.get();
      }
      //3、写入上下文
      context.write(key,new LongWritable(count));
    }
} 编写 Driver 类

创建提交给 YARN 集群运行的 Job 对象,封装了 MapReduce 程序运行所须要的相干参数,例如输入数据路径,输出数据路径,Mapper 参数,Reduce 参数
package words;



import count.CharCountDriver;
import count.CharCountMapper;
import count.CharCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class JobMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

      //一、初始化Job
      Configuration configuration = new Configuration();

      //获取运行命令的参数,参数一:输入文件路径,参数二:输出文件路径
      //如果输入路径是一个文件,那么只处理这个文件,如果指定的路径是目录,则处理这个目录下的所有文件
      //输出路径只能是不存在的目录名
      String [] otherArgs = new GenericOptionsParser(configuration,args).getRemainingArgs();
      if(otherArgs.length < 2){
            System.err.println("必须提供输入文件路径和输出文件路径");
            System.exit(2);
      }
      Job job = Job.getInstance(configuration, "mr");
      job.setJarByClass(JobMain.class);

      //二、设置Job的相关信息 8个小步骤
      //1、设置输入路径
      job.setInputFormatClass(TextInputFormat.class);
      //本地运行
      //TextInputFormat.addInputPath(job,new Path("/tmp/input/mr1.txt"));
      TextInputFormat.addInputPath(job,new Path(args));

      //2、设置Mapper类型,并设置输出键和输出值
      job.setMapperClass(WordMapper.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(LongWritable.class);

      //shuffle阶段,使用默认的
      //3、设置Reducer类型,并设置输出键和输出值
      job.setReducerClass(WordReducer.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(LongWritable.class);

      //4、设置输出路径
      job.setOutputFormatClass(TextOutputFormat.class);
      //本地运行
      //TextOutputFormat.setOutputPath(job,new Path("/tmp/output/mr"));
      TextOutputFormat.setOutputPath(job,new Path(args));
      //三、等待完成
      boolean isfinish = job.waitForCompletion(true);
      System.out.println(isfinish ==true?"MapReduce 任务执行成功!":"MapReduce 任务执行失败!");
      System.exit(isfinish? 0 : 1);
    }
}

 idea中打包成jar包

https://i-blog.csdnimg.cn/direct/83961eb7f3bf4d67906b71b27834a5f6.png
 上传JAR包到服务器上

注意:假如输入路径是一个文件,那么只处理这个文件,假如指定的路径是目录,则处理这个目录下的所有文件
执行 hadoop jar

hadoop jar /hadoopmapreduce-1.0-SNAPSHOT.jar danci.JobMain /wcinput /my_wcoutput https://i-blog.csdnimg.cn/direct/15ff8d3bbd54437d95ac20a5c8aa6408.png
 查看结果https://i-blog.csdnimg.cn/direct/6f810c4e2de44588ac39aab0038b52f6.png

 https://i-blog.csdnimg.cn/direct/1bb326c2149a45f3bc6437e14c7467c0.png

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Hadoop(九)MapReduce 案例2