ToB企服应用市场:ToB评测及商务社交产业平台

标题: 云计算技能 实验七 MapReduce编程基础 [打印本页]

作者: 水军大提督    时间: 2024-6-9 15:08
标题: 云计算技能 实验七 MapReduce编程基础
参考资料为:
教材代码-林子雨编著《大数据基础编程、实验和案例教程(第2版)》教材所有章节代码_厦大数据库实验室博客
1.实验学时
4学时
2.实验目标

3.实验内容
(一)实现词频统计的根本的MapReduce编程。
首先创建两个txt文件。
让后向内里输入想要统计的句子。

然后启动ecplise完成步调编写:

首先编写map处理逻辑:(这里选择在windows上先编写,然后在linux上再复现一次)
下面为java代码:此为map处理逻辑
  1. public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
  2.         private static final IntWritable one = new IntWritable(1);
  3.         private Text word = new Text();
  4.         public TokenizerMapper() {
  5.         }
  6.         public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  7.             StringTokenizer itr = new StringTokenizer(value.toString());
  8.             while(itr.hasMoreTokens()) {
  9.                 this.word.set(itr.nextToken());
  10.                 context.write(this.word, one);
  11.             }
  12.         }
  13.     }
复制代码

在map阶段,文件wordfile1.txt和文件wordfile2.txt中的数据被读入,然后以键值对的情势被提交给map函数处理。键值对交给map函数之后,就可以运行自界说的map处理逻辑。
之后编写reduce处理逻辑。
Map阶段处理得到的中间结果,经过shuffle阶段,会分发给对应的reduce任务处理。
下面为java代码,此为reduce任务处理
  1. public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  2.         private IntWritable result = new IntWritable();
  3.         public IntSumReducer() {
  4.         }
  5.         public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  6.             int sum = 0;
  7.             IntWritable val;
  8.             for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
  9.                 val = (IntWritable)i$.next();
  10.             }
  11.             this.result.set(sum);
  12.             context.write(key, this.result);
  13.         }
  14.     }
复制代码

之后编写main函数
为了使TokenizerMapper类和IntSumReduce类可以或许正常协同工作,须要在主函数中通过job类设置hadoop步调的运行环境。
下面为java代码,此为main函数
  1. public static void main(String[] args) throws Exception {
  2.         Configuration conf = new Configuration();
  3.         String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
  4.         if(otherArgs.length < 2) {
  5.             System.err.println("Usage: wordcount <in> [<in>...] <out>");
  6.             System.exit(2);
  7.         }
  8.         Job job = Job.getInstance(conf, "word count");        //设置环境参数
  9.         job.setJarByClass(WordCount.class);                //设置整个程序的类名
  10.         job.setMapperClass(WordCount.TokenizerMapper.class); //添加Mapper类
  11.         job.setReducerClass(WordCount.IntSumReducer.class);  //添加Reducer类
  12.         job.setOutputKeyClass(Text.class);                  //设置输出类型
  13.         job.setOutputValueClass(IntWritable.class);             //设置输出类型
  14.         for(int i = 0; i < otherArgs.length - 1; ++i) {
  15.             FileInputFormat.addInputPath(job, new Path(otherArgs[i]));  //设置输入文件
  16.         }
  17.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//设置输出文件
  18.         System.exit(job.waitForCompletion(true)?0:1);
  19.     }
复制代码

然后是打包步调。
先打开hadoop对应文件夹:

将代码传入文件夹之后,使用之前下载的java jar进行编译

编译完成之后查看文件夹,多出三个.class文件,然后进行文件的打包。

然后启动hadoop

然后输入命令查看结果:

结果:


(二)设置eclipse环境,跑词频统计的步调。
先启动eclipse


创建新的java工程进行编写步调

然后导入jar包




然后开始编写java步调
先创建新的java类开始编写

然后将之前编写号的代码输入到java文件之中,然后进行运行查看结果。
完整步调:
  1. import java.io.IOException;
  2. import java.util.Iterator;
  3. import java.util.StringTokenizer;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. import org.apache.hadoop.util.GenericOptionsParser;
  14. public class WordCount {
  15.     public WordCount() {
  16.     }
  17.      public static void main(String[] args) throws Exception {
  18.         Configuration conf = new Configuration();
  19.         String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
  20.         if(otherArgs.length < 2) {
  21.             System.err.println("Usage: wordcount <in> [<in>...] <out>");
  22.             System.exit(2);
  23.         }
  24.         Job job = Job.getInstance(conf, "word count");
  25.         job.setJarByClass(WordCount.class);
  26.         job.setMapperClass(WordCount.TokenizerMapper.class);
  27.         job.setCombinerClass(WordCount.IntSumReducer.class);
  28.         job.setReducerClass(WordCount.IntSumReducer.class);
  29.         job.setOutputKeyClass(Text.class);
  30.         job.setOutputValueClass(IntWritable.class);
  31.         for(int i = 0; i < otherArgs.length - 1; ++i) {
  32.             FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  33.         }
  34.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
  35.         System.exit(job.waitForCompletion(true)?0:1);
  36.     }
  37.     public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
  38.         private static final IntWritable one = new IntWritable(1);
  39.         private Text word = new Text();
  40.         public TokenizerMapper() {
  41.         }
  42.         public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  43.             StringTokenizer itr = new StringTokenizer(value.toString());
  44.             while(itr.hasMoreTokens()) {
  45.                 this.word.set(itr.nextToken());
  46.                 context.write(this.word, one);
  47.             }
  48.         }
  49.     }
  50. public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  51.         private IntWritable result = new IntWritable();
  52.         public IntSumReducer() {
  53.         }
  54.         public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  55.             int sum = 0;
  56.             IntWritable val;
  57.             for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
  58.                 val = (IntWritable)i$.next();
  59.             }
  60.             this.result.set(sum);
  61.             context.write(key, this.result);
  62.         }
  63.     }
  64. }
复制代码

查看结果:


结果已经出现出来,然后把java步调打包生成jar包。放到hadoop平台之上运行。

查看对应文件夹:


然后为了运行,先启动hadoop

然后删除input和output防止堕落

然后新建文件

由于先前已经将对应文件传入linux,然后可以考虑现将文件上传到hdfs中的/user/Hadoop/input中
然后使用jar命令查看;

然后查看结果:

(三)编写MapReduce步调,实现计算平均结果的步调。
首先编写步调。
步调的主要点是输入三个txt文件,然后计算三个txt文件对应的科目之和的平均值。这里的导入方法与前面相似。
然后先创建新的项目,导入jar包


然后将对应的代码输入java文件中。
注意导入的包的个数题目。

然后导入对应的包:




然后导出jar文件。

编写对应的txt文件,设置


然后输入./bin/hdfs dfs -cat output*/
注意,这里的名字不能使用单个字符,不然会报错!!!,以是背面改成了多个字符。

4.思考题
(一)MapReduce的工作原理是什么?
MapRedece分为两部分,一个是Map函数,一个是Reduce函数。Map函数接受一个键值对(key-value pair),产生一组中间键值对。MapReduce框架会将map函数产生的中间键值对里键相同的值传递给一个reduce函数。 Reduce函数接受一个键,以及相关的一组值,将这组值进行归并产生一组规模更小的值(通常只有一个或零个值)。
下面是一个图介绍MapReduce的工作流程:

MapReduce库先把user program的输入文件分别为M份(M为用户界说),每一份通常有16MB到64MB,如图左方所示分成了split0~4;然后使用fork将用户进程拷贝到集群内别的呆板上。
user program的副本中有一个称为master,其余称为worker,master是负责调度的,为空闲worker分配作业(Map作业或者Reduce作业),worker的数量也是可以由用户指定的。
被分配了Map作业的worker,开始读取对应分片的输入数据,Map作业数量是由M决定的,和split逐一对应;Map作业从输入数据中抽取出键值对,每一个键值对都作为参数传递给map函数,map函数产生的中间键值对被缓存在内存中。
缓存的中间键值对会被定期写入本地磁盘,而且被分为R个区,R的大小是由用户界说的,未来每个区会对应一个Reduce作业;这些中间键值对的位置会被通报给master,master负责将信息转发给Reduce worker。
master通知分配了Reduce作业的worker它负责的分区在什么位置(肯定不止一个地方,每个Map作业产生的中间键值对都可能映射到所有R个不同分区),当Reduce worker把所有它负责的中间键值对都读过来后,先对它们进行排序,使得相同键的键值对聚集在一起。因为不同的键可能会映射到同一个分区也就是同一个Reduce作业(谁让分区少呢),以是排序是必须的。
reduce worker遍历排序后的中间键值对,对于每个唯一的键,都将键与关联的值传递给reduce函数,reduce函数产生的输出会添加到这个分区的输出文件中。
  当所有的Map和Reduce作业都完成了,master唤醒正版的user program,MapReduce函数调用返回user program的代码。

(二)Hadoop是如何运行MapReduce步调的?
有两个方法,这两个方法的前提是须要启动hadoop才可以运行。
方法一:
将本身的编译软件与hadoop相连(我用的是MyEclipse去链接hadoop),直接运行步调。运行完成之后在输出文件夹就可以查看输出的文件。
方法二:
方法二的话更加复杂,须要将mapreduce步调打包成jar文件。须要在linux上的eclipse编写好步调之后,将步调导出打包,之后实行这个jar文件,在输出文件中查看结果即可。
5.实验结论或体会
1.实验开始编写步调之前,须要将hadoop启动方才可以继续编写步调。
2.步调导出的时间,须要将jar文件导出到相应的hadoop步调的文件夹下,这样方便步调的运行。
3.编写步调的时间,须要将导入的包逐一对应,确保所有的包都导入到步调之中。
4.TXT文件须要提前写好,方便运行步调。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4