使用 Hadoop MapReduce 实现历年最高温度统计

打印 上一主题 下一主题

主题 930|帖子 930|积分 2790

需求概述

我们需要从给定的数据中,统计每一年出现的最高温度。该需求可以通过 MapReduce 实现,处理惩罚格式化的气象数据,提取年份和温度,计算出每一年的最高温度。
业务分析

我们有大量气象记录数据,数据格式如下:
  1. 0151234567890123456789012345678901234567890123456789012345678901234567890123456789012345
  2. YYYYMMdd[TIME] [TEMPERATURE] ...
复制代码
测试数据:
  1. 0029029070999991901010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999999
  2. 0029029070999991901010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9+01231+99999102001ADDGF108991999999999999999999
  3. 0029029070999991910010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9+01501+99999102001ADDGF108991999999999999999999
  4. 0029029070999991910010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9-00231+99999102001ADDGF108991999999999999999999
  5. 0029029070999991920010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9+02451+99999102001ADDGF108991999999999999999999
复制代码


  • 年份信息在第 15 到 18 位。[15,18]
  • 温度数据在第 88 到 91 位 [88,91],如果温度为9999为无效温度。
  • 第87位为温度的符号(正负)
  • 第92位用作数据合法性查抄,标记为 0 1 4 5 9 时才是有效的温度记录。
通过编写 MapReduce 作业,Mapper 阶段负责提取年份和温度,Reducer 阶段对同一年的温度举行汇总,终极输出每一年的最高温度。
代码附解释

1. TempMap.java —— Mapper 类

  1. package com.dxd.dxd;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.LongWritable;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.mapreduce.Mapper;
  6. import java.io.IOException;
  7. public class TempMap extends Mapper<LongWritable, Text, Text, IntWritable>{
  8.     // 定义输出的键值对类型
  9.     private Text k2;
  10.     private IntWritable v2;
  11.     // 初始化方法,自动在执行map方法时调用,非必须, 延迟内存分配的时间, 提高程序的启动速度
  12.     // 该方法可省略,直接定义+创建对象即可,如下
  13.     //private Text k2 = new Text();
  14.     //private IntWritable v2 = new IntWritable();
  15.    
  16.     @Override
  17.     protected void setup(Context context) throws IOException, InterruptedException{
  18.         k2 = new Text();
  19.         v2 = new IntWritable();
  20.     }
  21.     // map 方法:提取数据中的年份和温度信息,并将其写入 context
  22.     @Override
  23.     protected void map(LongWritable key,Text value, Context context) throws IOException, InterruptedException{
  24.         // 为方便按index处理该行数据,转化为String类
  25.         String line = value.toString();
  26.         
  27.         // 提取年份,数据从 15 到 19 位,不包含第19位,字符串索引第一位为0
  28.         String year = line.substring(15, 19);
  29.         //double temperature = Double.parseDouble(line.substring(87,92));
  30.         //↑ 若为浮点型
  31.         
  32.         // 提取温度,数据从 87 到 92 位,转化为整数,不包含第92位
  33.         int temperature = Integer.parseInt(line.substring(87, 92));
  34.         // 提取质量标记,数据从 92 到 93 位,不包含第93位
  35.         String check = line.substring(92, 93);
  36.         //check.matches("[]")
  37.         //[01459] ---- check="0hello87hhh" ---- true
  38.         //只要有0 1 4 5 9 即可返回true
  39.         //[^01459] ---- ^ == 非 ---- check="0hello87hhh" ---- false
  40.         // 过滤无效数据,温度为 9999 或质量标记不符合要求时跳过
  41.         if (Math.abs(temperature) == 9999 || check.matches("[^01459]")) {
  42.             return;
  43.         }
  44.         // 设置输出的键和值
  45.         k2.set(year);              // 键为年份
  46.         v2.set(temperature);       // 值为温度
  47.         // 将年份和温度作为键值对输出
  48.         //需处理异常:Unhandled exceptions: java.io.IOException, java.lang.InterruptedException
  49.         context.write(k2, v2);
  50.         //context..write(new Text(year),new IntWritable(temperature));可替换上面三行
  51.     }
  52. }
复制代码
代码思绪



  • 该 Mapper 类吸收每一行数据,提取此中的年份和温度,并举行简单的过滤(剔除无效数据)。
  • 我们在 setup 中初始化键值对对象,以进步服从(制止每次调用 map 都创建新对象)。
  • 如果温度无效(如标记为 9999 或质量标记不符合要求),则跳过该数据。否则,提取有效的年份和温度输出给 Reducer。
2. TempReduce.java —— Reducer 类

  1. package com.dxd.dxd;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import java.io.IOException;
  6. public class TempReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
  7.     // reduce 方法:接收每个年份对应的多个温度值,求出最高温度
  8.     @Override
  9.     protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
  10.         int max = Integer.MIN_VALUE;
  11.         // 遍历该年份的所有温度,找到最大值
  12.         for (IntWritable value : values) {
  13.             max = Math.max(max, value.get());
  14.         }
  15.         // 将年份和最高温度输出
  16.         //需处理异常:Unhandled exceptions: java.io.IOException, java.lang.InterruptedException
  17.         context.write(key, new IntWritable(max));
  18.     }
  19. }
复制代码
代码思绪



  • Reducer 负责吸收每个年份的温度列表,使用 Math.max 函数逐一比较,找出该年份的最高温度。
  • 终极将年份和最高温度写入输出。
3. TempDriver.java —— Driver 类

  1. package com.dxd.dxd;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.fs.Path;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.mapreduce.Job;
  7. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  9. import java.io.IOException;
  10. public class TempDriver {
  11.     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
  12.         // 设置用户权限
  13.         System.setProperty("HADOOP_USER_NAME", "root");
  14.         // 配置作业
  15.         Configuration configuration = new Configuration();//需处理异常:Unhandled exception:java.io.IOException
  16.         Job job = Job.getInstance(configuration);
  17.         
  18.         // 指定包含MapReduce作业代码的JAR文件的主类 即包含main()方法的类
  19.         job.setJarByClass(TempDriver.class);
  20.         // 设置 Mapper、Reducer 类
  21.         job.setMapperClass(TempMap.class);
  22.         job.setReducerClass(TempReduce.class);
  23.         // 设置 Combiner 类,减少数据传输量(可选)
  24.         job.setCombinerClass(TempReduce.class);
  25.         // 设置最终输出的键值类型
  26.         job.setOutputKeyClass(Text.class);
  27.         job.setOutputValueClass(IntWritable.class);
  28.         // 设置输入输出路径
  29.         FileInputFormat.setInputPaths(job, new Path(args[0]));
  30.         FileOutputFormat.setOutputPath(job, new Path(args[1]));
  31.         // 提交作业并等待完成,该行有Exception需处理
  32.         System.exit(job.waitForCompletion(true) ? 0 : 1);
  33.         //Unhandled exceptions: java.lang.InterruptedException, java.lang.ClassNotFoundException
  34.         
  35.     }
  36. }
复制代码
代码思绪



  • Driver 类负责配置和运行整个 MapReduce 作业。
  • 通过 job.setMapperClass() 和 job.setReducerClass() 设置 Mapper 和 Reducer。
  • job.setCombinerClass() 选项用于本地聚合,减少数据传输量,进步服从。
  • 最后,通过 FileInputFormat 和 FileOutputFormat 指定输入和输出路径。
增补:

System.exit(job.waitForCompletion(true) ? 0 : 1); 是什么意思?

  1. System.exit(job.waitForCompletion(true) ? 0 : 1); 
复制代码
是在 Hadoop MapReduce 程序的 Driver 类中用于提交作业并控制作业执行效果的代码。让我们渐渐分析这行代码的含义。
1. job.waitForCompletion(true)



  • 该方法会启动 MapReduce 作业并等候其完成。它返回一个布尔值,表示作业是否成功执行。
  • 参数 true 表示在作业运行期间会在控制台输出具体的进度信息,也就是作业的状态跟踪
具体过程


  • MapReduce 作业开始执行,waitForCompletion(true) 会阻塞程序,直到作业完成。
  • 如果作业执行成功,waitForCompletion(true) 返回 true。
  • 如果作业执行失败,则返回 false。
2. ? 0 : 1(三元运算符)



  • 这是 Java 中的三元条件运算符。格式如下:
    1. condition ? valueIfTrue : valueIfFalse;
    复制代码

    • condition:布尔表达式,判断效果为 true 或 false。
    • valueIfTrue:如果 condition 为 true,则返回该值。
    • valueIfFalse:如果 condition 为 false,则返回该值。

在这里:


  • 如果 job.waitForCompletion(true) 返回 true(即作业执行成功),则返回 0。
  • 如果 job.waitForCompletion(true) 返回 false(即作业执行失败),则返回 1。
3. System.exit(status)



  • System.exit(int status) 是一个用于停止 Java 应用程序的命令。它会关闭当前 Java 假造机 (JVM)。
  • 参数 status 是一个整数,表示程序的退出状态码:

    • 0:通常表示程序正常退出(无错误)。
    • 非 0 值:表示程序非常退出或有错误。例如,1 通常表示作业失败。

这行代码的完整含义是:


  • 程序调用 job.waitForCompletion(true) 提交并等候 MapReduce 作业的执行效果。
  • 如果作业成功,job.waitForCompletion(true) 返回 true,然后 ? 0 : 1 选择返回 0,表示程序成功退出。
  • 如果作业失败,job.waitForCompletion(true) 返回 false,? 0 : 1 返回 1,表示程序非常退出。
  • 最后通过 System.exit(0) 或 System.exit(1) 来停止整个 Java 程序。
为什么使用 System.exit()?

在 Hadoop 作业中,通常使用 System.exit() 来确保整个应用程序在作业完成后停止,返回的状态码可以作为操作系统判断程序是否成功执行的依据:


  • 0 表示成功,可以触发后续的处理惩罚任务。
  • 1 表示失败,可能会触发错误处理惩罚机制或发出警报。
总结:

  1. System.exit(job.waitForCompletion(true) ? 0 : 1);
复制代码


  • 作用:等候 MapReduce 作业完成,并根据作业的执行效果退出 Java 程序。
  • 含义

    • job.waitForCompletion(true) 提交作业并等候作业完成。
    • 根据作业是否成功,返回 0 或 1。
    • System.exit() 用于停止程序,并传递作业的成功或失败状态。


终极效果



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

汕尾海湾

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表