大数据实验 实验五:MapReduce 初级编程实践

火影  金牌会员 | 2024-7-25 11:11:33 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 545|帖子 545|积分 1635

大数据实验 实验五:MapReduce 初级编程实践

实验情况



  • 操作系统 centos7
  • Hadoop版本:3,3,0
实验内容与完成情况

(一)编程实现文件合并和去重操作

对于两个输入文件,即文件 A 和文件 B,请编写 MapReduce 程序,对两个文件举行合并,
并剔除其中重复的内容,得到一个新的输出文件 C。下面是输入文件和输出文件的一个样例 供参考。
输入文件 A 的样例如下:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 x
输入文件 B 的样例如下:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y
根据输入文件 A 和 B 合并得到的输出文件 C 的样例如下:
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 x

创建文件A.txt和B.txt

将两个文件上传到HDFS中

Java程序
  1. package Main;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.Mapper;
  7. import org.apache.hadoop.mapreduce.Reducer;
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. import java.io.IOException;
  11. public class main {
  12.     //重载map函数,直接将输入中的value复制到输出数据的key上
  13.     public static class Map extends Mapper<Object, Text, Text, Text> {
  14.         private static Text text = new Text();
  15.         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  16.             text = value;
  17.             context.write(text, new Text(""));
  18.         }
  19.     }
  20.     //重载reduce函数,直接将输入中的key复制到输出数据的key上
  21.     public static class Reduce extends Reducer<Text, Text, Text, Text> {
  22.         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  23.             context.write(key, new Text(""));
  24.         }
  25.     }
  26.     public static void main(String[] args) throws Exception {
  27.         Configuration conf = new Configuration();
  28.         conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
  29.         conf.set("fs.defaultFS", "hdfs://localhost:8020");
  30.         String[] otherArgs = new String[]{"/input/test", "/output/test"};
  31.         if (otherArgs.length != 2) {
  32.             System.err.println("Usage: wordcount <in><out>");
  33.             System.exit(2);
  34.         }
  35.         Job job = Job.getInstance(conf, "Merge and duplicate removal");
  36.         job.setJarByClass(main.class);
  37.         job.setMapperClass(Map.class);
  38.         job.setCombinerClass(Reduce.class);
  39.         job.setReducerClass(Reduce.class);
  40.         job.setOutputKeyClass(Text.class);
  41.         job.setOutputValueClass(Text.class);
  42.         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  43.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  44.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  45.     }
  46. }
复制代码
运行指令

检察运行结果

(二)编程实现对输入文件的排序

现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取全部文件中的整 数,举行升序排序后,输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数 字为第二个整数的排序位次,第二个整数为原待分列的整数。下面是输入文件和输出文件的
一个样例供参考。
输入文件 1 的样例如下:
33
37
12
40
输入文件 2 的样例如下:
4
16
39
5
输入文件 3 的样例如下:
1
45
25
根据输入文件 1、2 和 3 得到的输出文件如下:
1 1
2 4
3 5
4 12
5 16
6 25
7 33
54
8 37
9 39
10 40
11 45

新建三个文件

将文件上传到hdfs

编写java代码
  1. package Main;
  2. import java.io.IOException;
  3. import java.util.Objects;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Partitioner;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  14. import org.apache.hadoop.util.GenericOptionsParser;
  15. public class main {
  16.     public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
  17.         private static IntWritable data = new IntWritable();
  18.         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  19.             String text = value.toString();
  20.             if (!Objects.equals(text, "")) {
  21.                 data.set(Integer.parseInt(text));
  22.                 context.write(data, new IntWritable(1));
  23.             }
  24.         }
  25.     }
  26.     public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
  27.         private static IntWritable line_num = new IntWritable(1);
  28.         public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  29.             for (IntWritable val : values) {
  30.                 context.write(line_num, key);
  31.                 line_num = new IntWritable(line_num.get() + 1);
  32.             }
  33.         }
  34.     }
  35.     public static class Partition extends Partitioner<IntWritable, IntWritable> {
  36.         public int getPartition(IntWritable key, IntWritable value, int num_Partition) {
  37.             int Maxnumber = 65223;
  38.             int bound = Maxnumber / num_Partition + 1;
  39.             int keynumber = key.get();
  40.             for (int i = 0; i < num_Partition; i++) {
  41.                 if (keynumber < bound * (i + 1) && keynumber >= bound * i) {
  42.                     return i;
  43.                 }
  44.             }
  45.             return -1;
  46.         }
  47.     }
  48.     public static void main(String[] args) throws Exception {
  49.         Configuration conf = new Configuration();
  50.         //conf.set("fs.default.name","hdfs://localhost:9000");
  51.         conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
  52.         conf.set("fs.defaultFS", "hdfs://localhost:8020");
  53.         String[] otherArgs = new String[]{"/input/test", "/output/test"};
  54.         if (otherArgs.length != 2) {
  55.             System.err.println("Usage: wordcount <in><out>");
  56.             System.exit(2);
  57.         }
  58.         Job job = Job.getInstance(conf, "Merge and sort");
  59.         job.setJarByClass(main.class);
  60.         job.setMapperClass(Map.class);
  61.         job.setReducerClass(Reduce.class);
  62.         job.setPartitionerClass(Partition.class);
  63.         job.setOutputKeyClass(IntWritable.class);
  64.         job.setOutputValueClass(IntWritable.class);
  65.         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  66.         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  67.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  68.     }
  69. }
复制代码
运行程序

运行结果

(三)对给定的表格举行信息发掘

下面给出一个 child-parent 的表格,要求发掘其中的父子辈关系,给出祖孙辈关系的
表格。
输入文件内容如下:
child parent
Steven Lucy
Steven Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Frank
Jack Alice
Jack Jesse
David Alice
David Jesse
Philip David
Philip Alma
Mark David
Mark Alma
输出文件内容如下:
grandchild grandparent
Steven Alice
Steven Jesse
Jone Alice
Jone Jesse
Steven Mary
Steven Frank
Jone Mary
Jone Frank
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse

新建数据

编写程序
  1. package Main;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.FileSystem;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.IntWritable;
  6. import org.apache.hadoop.io.Text;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.Mapper;
  9. import org.apache.hadoop.mapreduce.Reducer;
  10. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  11. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. import java.io.BufferedReader;
  14. import java.io.IOException;
  15. import java.io.InputStreamReader;
  16. import java.util.ArrayList;
  17. public class main {
  18.     public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {
  19.         public static IntWritable data = new IntWritable();
  20.         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
  21.             String str = value.toString();
  22.             if (str != null && !str.equals("") && !str.equals("child parent")) {
  23.                 String[] fa = str.split("\t");
  24.                 String son = fa[0], parent = fa[1];
  25.                 context.write(new Text(parent), new Text("son" + son));
  26.                 context.write(new Text(son), new Text("fa" + parent));
  27.             }
  28.         }
  29.     }
  30.     public static class Reduce extends Reducer<Text, Text, Text, Text> {
  31.         public static boolean flag = false;
  32.         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
  33.             if (!flag) {
  34.                 context.write(new Text("grandson"), new Text("grandparent"));
  35.                 flag = true;
  36.             }
  37.             ArrayList<Text> grandChild = new ArrayList<Text>();
  38.             ArrayList<Text> grandParent = new ArrayList<Text>();
  39.             for (Text val : values) {
  40.                 String s = val.toString();
  41.                 if (s.startsWith("son")) {
  42.                     grandChild.add(new Text(s.substring(3)));
  43.                 } else {
  44.                     grandParent.add(new Text(s.substring(2)));
  45.                 }
  46.             }
  47.             for (Text gc : grandChild)
  48.                 for (Text gp : grandParent)
  49.                     context.write(gc, gp);
  50.         }
  51.     }
  52.     public static void main(String[] args) throws Exception {
  53.         String[] a = {"a", "b"};
  54.         Configuration conf = new Configuration();
  55.         conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
  56.         //conf.set("fs.defaultFS", "hdfs://localhost:8020");
  57.         FileSystem fs = FileSystem.get(conf);
  58.         Job job = Job.getInstance(conf, "merge and duplicate removal");
  59.         job.setJarByClass(main.class);
  60.         job.setMapperClass(TokenizerMapper.class);
  61.         job.setReducerClass(Reduce.class);
  62.         job.setMapOutputKeyClass(Text.class);
  63.         job.setMapOutputValueClass(Text.class);
  64.         job.setOutputKeyClass(Text.class);
  65.         job.setOutputValueClass(Text.class);
  66.         job.setInputFormatClass(TextInputFormat.class);
  67.         FileInputFormat.addInputPath(job, new Path("hdfs://localhost:8020/input/test"));
  68.         FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:8020/output/test"));
  69.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  70.     }
  71. }
复制代码
运行程序

运行结果

出现的问题

问题一

在将运行的文件达成jar包在centos上运行时

出现无法运行缺少运行设置

出现传入参数为空的报错
解决方案

在程序中添加设置
conf.set(“fs.hdfs.impl”,“org.apache.hadoop.hdfs.DistributedFileSystem”);
将 java -jar XXX.jar 改为 hadoop jar xxx.jar 命令实行。
由于我们知道实行Hadoop命令时是会自动加载Hadoop干系jar包及设置的,但确保情况变量已设置生效
这样,通过Hadoop命令去实行,纵然不设置fs.hdfs.impl参数也不会报No FileSystem for scheme异常
问题解决

此处的代码改为

问题解决

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

火影

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表