MapReduce开发

打印 上一主题 下一主题

主题 801|帖子 801|积分 2403

前提

1.yarn-site.xml配置

  1. <property>
  2.     <name>yarn.nodemanager.aux-services</name>
  3.     <value>mapreduce_shuffle</value>
  4. </property>
复制代码
2.解决HDFS权限问题



  • API方式
  1. System.setProperty("HADOOP_USER_NAME","root")
复制代码


  • 配置方式
  1. <property>
  2.         <name>dfs.permissions.enabled</name>
  3.         <value>false</value>
  4. </property>
复制代码
MR执行环境



  • 以WordCount程序为例
1.本地测试环境(IDEA)



  • 本地测试以多线程方式模拟服务器运行,无需开启集群
  • IDEA内创建Java项目,并导入依赖
  1. <properties>
  2.     <java.version>1.8</java.version>
  3.     <hadoop.version>2.7.1</hadoop.version>
  4. </properties>
  5. <dependencies>
  6.    
  7.     <dependency>
  8.         <groupId>org.apache.hadoop</groupId>
  9.         <artifactId>hadoop-hdfs</artifactId>
  10.         <version>${hadoop.version}</version>
  11.     </dependency>
  12.     <dependency>
  13.         <groupId>org.apache.hadoop</groupId>
  14.         <artifactId>hadoop-common</artifactId>
  15.         <version>${hadoop.version}</version>
  16.     </dependency>
  17.     <dependency>
  18.         <groupId>org.apache.hadoop</groupId>
  19.         <artifactId>hadoop-client</artifactId>
  20.         <version>${hadoop.version}</version>
  21.     </dependency>
  22.     <dependency>
  23.         <groupId>org.apache.hadoop</groupId>
  24.         <artifactId>hadoop-mapreduce-client-core</artifactId>
  25.         <version>${hadoop.version}</version>
  26.     </dependency>
  27.     <dependency>
  28.         <groupId>org.apache.hadoop</groupId>
  29.         <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
  30.         <version>${hadoop.version}</version>
  31.     </dependency>
  32. </dependencies>
复制代码


  • 编写输入数据在本地,注意程序运行起来输出目录会被删一次,所以最好创建一个新的目录,输出目录不用手动创建,程序运行时会自动创建

  • 编写WorkCount程序代码
  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.Mapper;
  7. import org.apache.hadoop.mapreduce.Reducer;
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. import java.io.IOException;
  11. import java.util.StringTokenizer;
  12. public class WordCount {
  13.     /**
  14.      * map映射器:将输入<字节偏移量,一行文本>切分成<单词,次数>
  15.      * map之前有一步split,将文本切分成<字节偏移量,一行文本>
  16.      */
  17.     public static class TokenizerMapper
  18.             extends Mapper<Object, Text, Text, IntWritable> {//<前两位表示输入类型<偏移量,一行文本>,后两位表示输出类型<单词,数字>>
  19.         private final static IntWritable one = new IntWritable(1);
  20.         private Text word = new Text();
  21.         /**
  22.          * 前两个参是输入类型
  23.          *
  24.          * @param key     偏移量
  25.          * @param value   一行文本,Text类型可序列化,可比较(WritableComparable接口)
  26.          * @param context hadoop运行容器,可取出运行时的环境变量
  27.          * @throws IOException
  28.          * @throws InterruptedException
  29.          */
  30.         @Override
  31.         public void map(Object key, Text value, Context context
  32.         ) throws IOException, InterruptedException {
  33.             System.out.println("切分split后数据--偏移量:" + key + "\t值:" + value);
  34.             StringTokenizer itr = new StringTokenizer(value.toString());//根据自然分隔符分割
  35.             while (itr.hasMoreTokens()) {
  36.                 word.set(itr.nextToken());//写入文本对象
  37.                 context.write(word, one);//保存出去(单词,数字)
  38.             }
  39.         }
  40.     }
  41.     /**
  42.      * combiner(单节点合并)和reduce(多节点数据合并)都是对相同键的数据进行规约,使用一个即可
  43.      * <前两个泛型表示规约的输入数据来源于map的输出,后两个是规约后的单词与数字>
  44.      */
  45.     public static class IntSumReducer
  46.             extends Reducer<Text, IntWritable, Text, IntWritable> {
  47.         private IntWritable result = new IntWritable();
  48.         /**
  49.          * @param key     单词
  50.          * @param values  相同单词对应出现次数的集合
  51.          *                类中泛型约束是IntWritable,为什么方法上是Iterable<IntWritable>?
  52.          *                因为在统计之前会相同的键做成列表word [1,1,1],然后在规约word 3
  53.          * @param context
  54.          * @throws IOException
  55.          * @throws InterruptedException
  56.          */
  57.         @Override
  58.         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  59.             int sum = 0;
  60.             for (IntWritable val : values) {//对一个键
  61.                 sum += val.get();//求和规约
  62.             }
  63.             result.set(sum);
  64.             context.write(key, result);
  65.         }
  66.     }
  67.     public static void main(String[] args) throws Exception {
  68.         Configuration conf = new Configuration();//配置参数(map数,reduce数...对于配置配置文件中的API配置)
  69.         Job job = Job.getInstance(conf, "word count");//名字
  70.         job.setJarByClass(WordCount.class);//上线的jar
  71.         job.setMapperClass(TokenizerMapper.class);//mapper
  72.         job.setCombinerClass(IntSumReducer.class);//combine:合并一个节点
  73.         job.setReducerClass(IntSumReducer.class);//reduce:合并不同节点
  74.         job.setOutputKeyClass(Text.class);//输出键的类型,与上面一致
  75.         job.setOutputValueClass(IntWritable.class);//输出值的类型,与上面一致
  76.         FileInputFormat.addInputPath(job, new Path("E:\\HadoopMRData\\input"));//输入目录
  77.         FileOutputFormat.setOutputPath(job, new Path("E:\\HadoopMRData\\output"));//输出目录,这个目录要不存在运行时创建,特别注意要设置到空目录,应为执行之前会删一次****
  78.         System.exit(job.waitForCompletion(true) ? 0 : 1);//启动,0表示正常退出
  79.     }
  80. }
复制代码


  • 观察E:\HadoopMRData\output目录下的输出数据
2.直接在服务器使用命令的方式调用



  • 程序的执行过程也在服务器上
  • IDEA内创建Java项目,并导入依赖
  1. <properties>
  2.     <java.version>1.8</java.version>
  3.     <hadoop.version>2.7.1</hadoop.version>
  4. </properties>
  5. <dependencies>
  6.    
  7.     <dependency>
  8.          <groupId>org.apache.hadoop</groupId>
  9.          <artifactId>hadoop-core</artifactId>
  10.          <version>1.2.1</version>
  11.      </dependency>
  12. </dependencies>
  13. <build>
  14.     <pluginManagement>
  15.         <plugins>
  16.             <plugin>
  17.                 <groupId>org.apache.maven.plugins</groupId>
  18.                 <artifactId>maven-jar-plugin</artifactId>
  19.                 <version>2.4</version>
  20.             </plugin>
  21.         </plugins>
  22.     </pluginManagement>
  23. </build>
复制代码


  • 编写WorkCount程序代码
  1. import org.apache.hadoop.conf.Configuration;
  2. import org.apache.hadoop.fs.Path;
  3. import org.apache.hadoop.io.IntWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Job;
  6. import org.apache.hadoop.mapreduce.Mapper;
  7. import org.apache.hadoop.mapreduce.Reducer;
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  10. import java.io.IOException;
  11. import java.util.StringTokenizer;
  12. public class WordCount {
  13.     /**
  14.      * map映射器:将输入<字节偏移量,一行文本>切分成<单词,次数>
  15.      * map之前有一步split,将文本切分成<字节偏移量,一行文本>
  16.      */
  17.     public static class TokenizerMapper
  18.             extends Mapper<Object, Text, Text, IntWritable> {//<前两位表示输入类型<偏移量,一行文本>,后两位表示输出类型<单词,数字>>
  19.         private final static IntWritable one = new IntWritable(1);
  20.         private Text word = new Text();
  21.         /**
  22.          * 前两个参是输入类型
  23.          *
  24.          * @param key     偏移量
  25.          * @param value   一行文本,Text类型可序列化,可比较(WritableComparable接口)
  26.          * @param context hadoop运行容器,可取出运行时的环境变量
  27.          * @throws IOException
  28.          * @throws InterruptedException
  29.          */
  30.         @Override
  31.         public void map(Object key, Text value, Context context
  32.         ) throws IOException, InterruptedException {
  33.             System.out.println("切分split后数据--偏移量:" + key + "\t值:" + value);
  34.             StringTokenizer itr = new StringTokenizer(value.toString());//根据自然分隔符分割
  35.             while (itr.hasMoreTokens()) {
  36.                 word.set(itr.nextToken());//写入文本对象
  37.                 context.write(word, one);//保存出去(单词,数字)
  38.             }
  39.         }
  40.     }
  41.     /**
  42.      * combiner(单节点合并)和reduce(多节点数据合并)都是对相同键的数据进行规约
  43.      * <前两个泛型表示规约的输入数据来源于map的输出,后两个是规约后的单词与数字>
  44.      */
  45.     public static class IntSumReducer
  46.             extends Reducer<Text, IntWritable, Text, IntWritable> {
  47.         private IntWritable result = new IntWritable();
  48.         /**
  49.          * @param key     单词
  50.          * @param values  相同单词对应出现次数的集合
  51.          *                类中泛型约束是IntWritable,为什么方法上是Iterable<IntWritable>?
  52.          *                因为在统计之前会相同的键做成列表word [1,1,1],然后在规约word 3
  53.          * @param context
  54.          * @throws IOException
  55.          * @throws InterruptedException
  56.          */
  57.         @Override
  58.         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  59.             int sum = 0;
  60.             for (IntWritable val : values) {//对一个键
  61.                 sum += val.get();//求和规约
  62.             }
  63.             result.set(sum);
  64.             context.write(key, result);
  65.         }
  66.     }
  67.     public static void main(String[] args) throws Exception {
  68.         Configuration conf = new Configuration();//配置参数(map数,reduce数...对于配置配置文件中的API配置)
  69.         Job job = Job.getInstance(conf, "word count");//名字
  70.         job.setJarByClass(WordCount.class);//上线的jar
  71.         job.setMapperClass(TokenizerMapper.class);//mapper
  72.         job.setCombinerClass(IntSumReducer.class);//combine:合并一个节点
  73.         job.setReducerClass(IntSumReducer.class);//reduce:合并不同节点
  74.         job.setOutputKeyClass(Text.class);//输出键的类型,与上面一致
  75.         job.setOutputValueClass(IntWritable.class);//输出值的类型,与上面一致
  76.         FileInputFormat.addInputPath(job, new Path(args[0]));//命令行运行时传入
  77.         FileOutputFormat.setOutputPath(job, new Path(args[1]));
  78.         System.exit(job.waitForCompletion(true) ? 0 : 1);//启动,0表示正常退出
  79.     }
  80. }
复制代码


  • 打包

  • 将打好的包放到Linux中

  • hdfs中创建输入目录,不要创建输出目录否则失败



  • 编写输入数据在hdfs



  • jar在hadoop上运行,命令格式:hadoop jar xxx.jar 类名 输入路径 输出路径

  • web观察http://node3:8088/有任务在进行,运行完后观察输出数据
下篇讲讲MR小案例


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

飞不高

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表