需求概述
我们需要从给定的数据中,统计每一年出现的最高温度。该需求可以通过 MapReduce 实现,处理惩罚格式化的气象数据,提取年份和温度,计算出每一年的最高温度。
业务分析
我们有大量气象记录数据,数据格式如下:
- 0151234567890123456789012345678901234567890123456789012345678901234567890123456789012345
- YYYYMMdd[TIME] [TEMPERATURE] ...
复制代码 测试数据:
- 0029029070999991901010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999999
- 0029029070999991901010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9+01231+99999102001ADDGF108991999999999999999999
- 0029029070999991910010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9+01501+99999102001ADDGF108991999999999999999999
- 0029029070999991910010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9-00231+99999102001ADDGF108991999999999999999999
- 0029029070999991920010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9+02451+99999102001ADDGF108991999999999999999999
复制代码
- 年份信息在第 15 到 18 位。[15,18]
- 温度数据在第 88 到 91 位 [88,91],如果温度为9999为无效温度。
- 第87位为温度的符号(正负)
- 第92位用作数据合法性查抄,标记为 0 1 4 5 9 时才是有效的温度记录。
通过编写 MapReduce 作业,Mapper 阶段负责提取年份和温度,Reducer 阶段对同一年的温度举行汇总,终极输出每一年的最高温度。
代码附解释
1. TempMap.java —— Mapper 类
- package com.dxd.dxd;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- import java.io.IOException;
- public class TempMap extends Mapper<LongWritable, Text, Text, IntWritable>{
- // 定义输出的键值对类型
- private Text k2;
- private IntWritable v2;
- // 初始化方法,自动在执行map方法时调用,非必须, 延迟内存分配的时间, 提高程序的启动速度
- // 该方法可省略,直接定义+创建对象即可,如下
- //private Text k2 = new Text();
- //private IntWritable v2 = new IntWritable();
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException{
- k2 = new Text();
- v2 = new IntWritable();
- }
- // map 方法:提取数据中的年份和温度信息,并将其写入 context
- @Override
- protected void map(LongWritable key,Text value, Context context) throws IOException, InterruptedException{
- // 为方便按index处理该行数据,转化为String类
- String line = value.toString();
-
- // 提取年份,数据从 15 到 19 位,不包含第19位,字符串索引第一位为0
- String year = line.substring(15, 19);
- //double temperature = Double.parseDouble(line.substring(87,92));
- //↑ 若为浮点型
-
- // 提取温度,数据从 87 到 92 位,转化为整数,不包含第92位
- int temperature = Integer.parseInt(line.substring(87, 92));
- // 提取质量标记,数据从 92 到 93 位,不包含第93位
- String check = line.substring(92, 93);
- //check.matches("[]")
- //[01459] ---- check="0hello87hhh" ---- true
- //只要有0 1 4 5 9 即可返回true
- //[^01459] ---- ^ == 非 ---- check="0hello87hhh" ---- false
- // 过滤无效数据,温度为 9999 或质量标记不符合要求时跳过
- if (Math.abs(temperature) == 9999 || check.matches("[^01459]")) {
- return;
- }
- // 设置输出的键和值
- k2.set(year); // 键为年份
- v2.set(temperature); // 值为温度
- // 将年份和温度作为键值对输出
- //需处理异常:Unhandled exceptions: java.io.IOException, java.lang.InterruptedException
- context.write(k2, v2);
- //context..write(new Text(year),new IntWritable(temperature));可替换上面三行
- }
- }
复制代码 代码思绪:
- 该 Mapper 类吸收每一行数据,提取此中的年份和温度,并举行简单的过滤(剔除无效数据)。
- 我们在 setup 中初始化键值对对象,以进步服从(制止每次调用 map 都创建新对象)。
- 如果温度无效(如标记为 9999 或质量标记不符合要求),则跳过该数据。否则,提取有效的年份和温度输出给 Reducer。
2. TempReduce.java —— Reducer 类
- package com.dxd.dxd;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Reducer;
- import java.io.IOException;
- public class TempReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
- // reduce 方法:接收每个年份对应的多个温度值,求出最高温度
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
- int max = Integer.MIN_VALUE;
- // 遍历该年份的所有温度,找到最大值
- for (IntWritable value : values) {
- max = Math.max(max, value.get());
- }
- // 将年份和最高温度输出
- //需处理异常:Unhandled exceptions: java.io.IOException, java.lang.InterruptedException
- context.write(key, new IntWritable(max));
- }
- }
复制代码 代码思绪:
- Reducer 负责吸收每个年份的温度列表,使用 Math.max 函数逐一比较,找出该年份的最高温度。
- 终极将年份和最高温度写入输出。
3. TempDriver.java —— Driver 类
- package com.dxd.dxd;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import java.io.IOException;
- public class TempDriver {
- public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
- // 设置用户权限
- System.setProperty("HADOOP_USER_NAME", "root");
- // 配置作业
- Configuration configuration = new Configuration();//需处理异常:Unhandled exception:java.io.IOException
- Job job = Job.getInstance(configuration);
-
- // 指定包含MapReduce作业代码的JAR文件的主类 即包含main()方法的类
- job.setJarByClass(TempDriver.class);
- // 设置 Mapper、Reducer 类
- job.setMapperClass(TempMap.class);
- job.setReducerClass(TempReduce.class);
- // 设置 Combiner 类,减少数据传输量(可选)
- job.setCombinerClass(TempReduce.class);
- // 设置最终输出的键值类型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- // 设置输入输出路径
- FileInputFormat.setInputPaths(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- // 提交作业并等待完成,该行有Exception需处理
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- //Unhandled exceptions: java.lang.InterruptedException, java.lang.ClassNotFoundException
-
- }
- }
复制代码 代码思绪:
- Driver 类负责配置和运行整个 MapReduce 作业。
- 通过 job.setMapperClass() 和 job.setReducerClass() 设置 Mapper 和 Reducer。
- job.setCombinerClass() 选项用于本地聚合,减少数据传输量,进步服从。
- 最后,通过 FileInputFormat 和 FileOutputFormat 指定输入和输出路径。
增补:
System.exit(job.waitForCompletion(true) ? 0 : 1); 是什么意思?
- System.exit(job.waitForCompletion(true) ? 0 : 1);
复制代码 是在 Hadoop MapReduce 程序的 Driver 类中用于提交作业并控制作业执行效果的代码。让我们渐渐分析这行代码的含义。
1. job.waitForCompletion(true)
- 该方法会启动 MapReduce 作业并等候其完成。它返回一个布尔值,表示作业是否成功执行。
- 参数 true 表示在作业运行期间会在控制台输出具体的进度信息,也就是作业的状态跟踪。
具体过程:
- MapReduce 作业开始执行,waitForCompletion(true) 会阻塞程序,直到作业完成。
- 如果作业执行成功,waitForCompletion(true) 返回 true。
- 如果作业执行失败,则返回 false。
2. ? 0 : 1(三元运算符)
- 这是 Java 中的三元条件运算符。格式如下:
- condition ? valueIfTrue : valueIfFalse;
复制代码
- condition:布尔表达式,判断效果为 true 或 false。
- valueIfTrue:如果 condition 为 true,则返回该值。
- valueIfFalse:如果 condition 为 false,则返回该值。
在这里:
- 如果 job.waitForCompletion(true) 返回 true(即作业执行成功),则返回 0。
- 如果 job.waitForCompletion(true) 返回 false(即作业执行失败),则返回 1。
3. System.exit(status)
- System.exit(int status) 是一个用于停止 Java 应用程序的命令。它会关闭当前 Java 假造机 (JVM)。
- 参数 status 是一个整数,表示程序的退出状态码:
- 0:通常表示程序正常退出(无错误)。
- 非 0 值:表示程序非常退出或有错误。例如,1 通常表示作业失败。
这行代码的完整含义是:
- 程序调用 job.waitForCompletion(true) 提交并等候 MapReduce 作业的执行效果。
- 如果作业成功,job.waitForCompletion(true) 返回 true,然后 ? 0 : 1 选择返回 0,表示程序成功退出。
- 如果作业失败,job.waitForCompletion(true) 返回 false,? 0 : 1 返回 1,表示程序非常退出。
- 最后通过 System.exit(0) 或 System.exit(1) 来停止整个 Java 程序。
为什么使用 System.exit()?
在 Hadoop 作业中,通常使用 System.exit() 来确保整个应用程序在作业完成后停止,返回的状态码可以作为操作系统判断程序是否成功执行的依据:
- 0 表示成功,可以触发后续的处理惩罚任务。
- 1 表示失败,可能会触发错误处理惩罚机制或发出警报。
总结:
- System.exit(job.waitForCompletion(true) ? 0 : 1);
复制代码
- 作用:等候 MapReduce 作业完成,并根据作业的执行效果退出 Java 程序。
- 含义:
- job.waitForCompletion(true) 提交作业并等候作业完成。
- 根据作业是否成功,返回 0 或 1。
- System.exit() 用于停止程序,并传递作业的成功或失败状态。
终极效果
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |