头歌大数据——MapReduce 底子实战 答案 无剖析

打印 上一主题 下一主题

主题 875|帖子 875|积分 2625

第1关:成绩统计

编程要求
使用MapReduce计算班级每个门生的最好成绩,输入文件路径为/user/test/input,请将计算后的结果输出到/user/test/output/目录下。
代码:
必要先在命令行启动HDFS
  1. #命令行
  2. start-dfs.sh
复制代码
再在代码文件中写入以下代码
  1. #代码文件
  2. import java.io.IOException;
  3. import java.util.StringTokenizer;
  4. import java.io.IOException;
  5. import java.util.StringTokenizer;
  6. import org.apache.hadoop.conf.Configuration;
  7. import org.apache.hadoop.fs.Path;
  8. import org.apache.hadoop.io.*;
  9. import org.apache.hadoop.io.Text;
  10. import org.apache.hadoop.mapreduce.Job;
  11. import org.apache.hadoop.mapreduce.Mapper;
  12. import org.apache.hadoop.mapreduce.Reducer;
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. import org.apache.hadoop.util.GenericOptionsParser;
  16. public class WordCount {
  17.     /********** Begin **********/
  18.         //Mapper函数
  19.     public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  20.         private final static IntWritable one = new IntWritable(1);
  21.         private Text word = new Text();
  22.         private int maxValue = 0;
  23.         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  24.             StringTokenizer itr = new StringTokenizer(value.toString(),"\n");
  25.             while (itr.hasMoreTokens()) {
  26.                 String[] str = itr.nextToken().split(" ");
  27.                 String name = str[0];
  28.                 one.set(Integer.parseInt(str[1]));
  29.                 word.set(name);
  30.                 context.write(word,one);
  31.             }
  32.             //context.write(word,one);
  33.         }
  34.     }
  35.     public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  36.         private IntWritable result = new IntWritable();
  37.         public void reduce(Text key, Iterable<IntWritable> values, Context context)
  38.                 throws IOException, InterruptedException {
  39.             int maxAge = 0;
  40.             int age = 0;
  41.             for (IntWritable intWritable : values) {
  42.                 maxAge = Math.max(maxAge, intWritable.get());
  43.             }
  44.             result.set(maxAge);
  45.             context.write(key, result);
  46.         }
  47.     }
  48.     public static void main(String[] args) throws Exception {
  49.         Configuration conf = new Configuration();
  50.         Job job = new Job(conf, "word count");
  51.         job.setJarByClass(WordCount.class);
  52.         job.setMapperClass(TokenizerMapper.class);
  53.         job.setCombinerClass(IntSumReducer.class);
  54.         job.setReducerClass(IntSumReducer.class);
  55.         job.setOutputKeyClass(Text.class);
  56.         job.setOutputValueClass(IntWritable.class);
  57.         String inputfile = "/user/test/input";
  58.         String outputFile = "/user/test/output/";
  59.         FileInputFormat.addInputPath(job, new Path(inputfile));
  60.         FileOutputFormat.setOutputPath(job, new Path(outputFile));
  61.         job.waitForCompletion(true);
  62.     /********** End **********/
  63.     }
  64. }
复制代码

第2关:文件内容归并去重

编程要求
接下来我们通过一个练习来巩固学习到的MapReduce知识吧。
对于两个输入文件,即文件file1和文件file2,请编写MapReduce步伐,对两个文件进行归并,并剔除此中重复的内容,得到一个新的输出文件file3。 为了完成文件归并去重的任务,你编写的步伐要能将含有重复内容的不同文件归并到一个没有重复的整合文件,规则如下:


  • 第一列按学号分列;
  • 学号相同,按x,y,z分列;
  • 输入文件路径为:/user/tmp/input/;
  • 输出路径为:/user/tmp/output/。
注意:输入文件后台已经帮你创建好了,不必要你再重复创建。
 代码:
必要先在命令行启动HDFS
  1. #命令行
  2. start-dfs.sh
复制代码
再在代码文件中写入以下代码:
  1. #代码文件
  2. import java.io.IOException;
  3. import java.util.*;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.*;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.Mapper;
  9. import org.apache.hadoop.mapreduce.Reducer;
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  12. import org.apache.hadoop.util.GenericOptionsParser;
  13. public class Merge {
  14.         /**
  15.          * @param args
  16.          * 对A,B两个文件进行合并,并剔除其中重复的内容,得到一个新的输出文件C
  17.          */
  18.         //在这重载map函数,直接将输入中的value复制到输出数据的key上 注意在map方法中要抛出异常:throws IOException,InterruptedException
  19.         public static class Map  extends Mapper<Object, Text, Text, Text>{
  20.        
  21.     /********** Begin **********/
  22.         public void map(Object key, Text value, Context content)
  23.             throws IOException, InterruptedException {  
  24.             Text text1 = new Text();
  25.             Text text2 = new Text();
  26.             StringTokenizer itr = new StringTokenizer(value.toString());
  27.             while (itr.hasMoreTokens()) {
  28.                 text1.set(itr.nextToken());
  29.                 text2.set(itr.nextToken());
  30.                 content.write(text1, text2);
  31.             }
  32.         }  
  33.         /********** End **********/
  34.         }
  35.                
  36.         //在这重载reduce函数,直接将输入中的key复制到输出数据的key上  注意在reduce方法上要抛出异常:throws IOException,InterruptedException
  37.         public static class  Reduce extends Reducer<Text, Text, Text, Text> {
  38.     /********** Begin **********/
  39.         
  40.         public void reduce(Text key, Iterable<Text> values, Context context)
  41.             throws IOException, InterruptedException {
  42.             Set<String> set = new TreeSet<String>();
  43.             for(Text tex : values){
  44.                 set.add(tex.toString());
  45.             }
  46.             for(String tex : set){
  47.                 context.write(key, new Text(tex));
  48.             }
  49.         }  
  50.    
  51.         /********** End **********/
  52.         }
  53.        
  54.         public static void main(String[] args) throws Exception{
  55.                 // TODO Auto-generated method stub
  56.                 Configuration conf = new Configuration();
  57.                 conf.set("fs.default.name","hdfs://localhost:9000");
  58.                
  59.                 Job job = Job.getInstance(conf,"Merge and duplicate removal");
  60.                 job.setJarByClass(Merge.class);
  61.                 job.setMapperClass(Map.class);
  62.                 job.setCombinerClass(Reduce.class);
  63.                 job.setReducerClass(Reduce.class);
  64.                 job.setOutputKeyClass(Text.class);
  65.                 job.setOutputValueClass(Text.class);
  66.                 String inputPath = "/user/tmp/input/";  //在这里设置输入路径
  67.                 String outputPath = "/user/tmp/output/";  //在这里设置输出路径
  68.                 FileInputFormat.addInputPath(job, new Path(inputPath));
  69.                 FileOutputFormat.setOutputPath(job, new Path(outputPath));
  70.                 System.exit(job.waitForCompletion(true) ? 0 : 1);
  71.         }
  72. }
复制代码

第3关:信息发掘 - 发掘父子关系

编程要求
你编写的步伐要能发掘父子辈关系,给出祖孙辈关系的表格。规则如下:


  • 孙子在前,祖父在后;
  • 输入文件路径:/user/reduce/input;
  • 输出文件路径:/user/reduce/output。
  代码:
必要先在命令行启动HDFS
  1. #命令行
  2. start-dfs.sh
复制代码
再在代码文件中写入以下代码:
  1. #代码文件
  2. import java.io.IOException;
  3. import java.util.*;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. import org.apache.hadoop.util.GenericOptionsParser;
  14. public class simple_data_mining {
  15.         public static int time = 0;
  16.         /**
  17.          * @param args
  18.          * 输入一个child-parent的表格
  19.          * 输出一个体现grandchild-grandparent关系的表格
  20.          */
  21.         //Map将输入文件按照空格分割成child和parent,然后正序输出一次作为右表,反序输出一次作为左表,需要注意的是在输出的value中必须加上左右表区别标志
  22.         public static class Map extends Mapper<Object, Text, Text, Text>{
  23.                 public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
  24.                         /********** Begin **********/
  25.                 String line = value.toString();
  26.              String[] childAndParent = line.split(" ");
  27.              List<String> list = new ArrayList<>(2);
  28.               for (String childOrParent : childAndParent) {
  29.                  if (!"".equals(childOrParent)) {
  30.                      list.add(childOrParent);
  31.                   }
  32.               }
  33.               if (!"child".equals(list.get(0))) {
  34.                   String childName = list.get(0);
  35.                   String parentName = list.get(1);
  36.                   String relationType = "1";
  37.                   context.write(new Text(parentName), new Text(relationType + "+"
  38.                         + childName + "+" + parentName));
  39.                   relationType = "2";
  40.                   context.write(new Text(childName), new Text(relationType + "+"
  41.                         + childName + "+" + parentName));
  42.               }
  43.                         /********** End **********/
  44.                 }
  45.         }
  46.         public static class Reduce extends Reducer<Text, Text, Text, Text>{
  47.                 public void reduce(Text key, Iterable<Text> values,Context context) throws IOException,InterruptedException{
  48.                                 /********** Begin **********/
  49.                             //输出表头
  50.           if (time == 0) {
  51.                 context.write(new Text("grand_child"), new Text("grand_parent"));
  52.                 time++;
  53.             }
  54.                                 //获取value-list中value的child
  55. List<String> grandChild = new ArrayList<>();
  56.                                 //获取value-list中value的parent
  57. List<String> grandParent = new ArrayList<>();
  58.                                 //左表,取出child放入grand_child
  59. for (Text text : values) {
  60.                 String s = text.toString();
  61.                 String[] relation = s.split("\\+");
  62.                 String relationType = relation[0];
  63.                 String childName = relation[1];
  64.                 String parentName = relation[2];
  65.                 if ("1".equals(relationType)) {
  66.                     grandChild.add(childName);
  67.                 } else {
  68.                     grandParent.add(parentName);
  69.                 }
  70.             }
  71.                                 //右表,取出parent放入grand_parent
  72. int grandParentNum = grandParent.size();
  73.                int grandChildNum = grandChild.size();
  74.                if (grandParentNum != 0 && grandChildNum != 0) {
  75.                 for (int m = 0; m < grandChildNum; m++) {
  76.                     for (int n = 0; n < grandParentNum; n++) {
  77.                         //输出结果
  78.                     context.write(new Text(grandChild.get(m)), new Text(
  79.                                 grandParent.get(n)));
  80.                     }
  81.                 }
  82.             }
  83.                                 /********** End **********/
  84.                 }
  85.         }
  86.         public static void main(String[] args) throws Exception{
  87.                 // TODO Auto-generated method stub
  88.                 Configuration conf = new Configuration();
  89.                 Job job = Job.getInstance(conf,"Single table join");
  90.                 job.setJarByClass(simple_data_mining.class);
  91.                 job.setMapperClass(Map.class);
  92.                 job.setReducerClass(Reduce.class);
  93.                 job.setOutputKeyClass(Text.class);
  94.                 job.setOutputValueClass(Text.class);
  95.                 String inputPath = "/user/reduce/input";   //设置输入路径
  96.                 String outputPath = "/user/reduce/output";   //设置输出路径
  97.                 FileInputFormat.addInputPath(job, new Path(inputPath));
  98.                 FileOutputFormat.setOutputPath(job, new Path(outputPath));
  99.                 System.exit(job.waitForCompletion(true) ? 0 : 1);
  100.         }
  101. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

天津储鑫盛钢材现货供应商

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表