ToB企服应用市场:ToB评测及商务社交产业平台
标题:
使用 Hadoop MapReduce 实现历年最高温度统计
[打印本页]
作者:
汕尾海湾
时间:
2024-10-24 20:07
标题:
使用 Hadoop MapReduce 实现历年最高温度统计
需求概述
我们需要从给定的数据中,统计每一年出现的最高温度。该需求可以通过
MapReduce
实现,处理惩罚格式化的气象数据,提取年份和温度,计算出每一年的最高温度。
业务分析
我们有大量气象记录数据,数据格式如下:
0151234567890123456789012345678901234567890123456789012345678901234567890123456789012345
YYYYMMdd[TIME] [TEMPERATURE] ...
复制代码
测试数据:
0029029070999991901010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999999
0029029070999991901010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9+01231+99999102001ADDGF108991999999999999999999
0029029070999991910010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9+01501+99999102001ADDGF108991999999999999999999
0029029070999991910010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9-00231+99999102001ADDGF108991999999999999999999
0029029070999991920010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9+02451+99999102001ADDGF108991999999999999999999
复制代码
年份信息在第 15 到 18 位。[15,18]
温度数据在第 88 到 91 位 [88,91],如果温度为9999为无效温度。
第87位为温度的符号(正负)
第92位用作数据合法性查抄,标记为 0 1 4 5 9 时才是有效的温度记录。
通过编写
MapReduce
作业,Mapper 阶段负责提取年份和温度,Reducer 阶段对同一年的温度举行汇总,终极输出每一年的最高温度。
代码附解释
1. TempMap.java —— Mapper 类
package com.dxd.dxd;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class TempMap extends Mapper<LongWritable, Text, Text, IntWritable>{
// 定义输出的键值对类型
private Text k2;
private IntWritable v2;
// 初始化方法,自动在执行map方法时调用,非必须, 延迟内存分配的时间, 提高程序的启动速度
// 该方法可省略,直接定义+创建对象即可,如下
//private Text k2 = new Text();
//private IntWritable v2 = new IntWritable();
@Override
protected void setup(Context context) throws IOException, InterruptedException{
k2 = new Text();
v2 = new IntWritable();
}
// map 方法:提取数据中的年份和温度信息,并将其写入 context
@Override
protected void map(LongWritable key,Text value, Context context) throws IOException, InterruptedException{
// 为方便按index处理该行数据,转化为String类
String line = value.toString();
// 提取年份,数据从 15 到 19 位,不包含第19位,字符串索引第一位为0
String year = line.substring(15, 19);
//double temperature = Double.parseDouble(line.substring(87,92));
//↑ 若为浮点型
// 提取温度,数据从 87 到 92 位,转化为整数,不包含第92位
int temperature = Integer.parseInt(line.substring(87, 92));
// 提取质量标记,数据从 92 到 93 位,不包含第93位
String check = line.substring(92, 93);
//check.matches("[]")
//[01459] ---- check="0hello87hhh" ---- true
//只要有0 1 4 5 9 即可返回true
//[^01459] ---- ^ == 非 ---- check="0hello87hhh" ---- false
// 过滤无效数据,温度为 9999 或质量标记不符合要求时跳过
if (Math.abs(temperature) == 9999 || check.matches("[^01459]")) {
return;
}
// 设置输出的键和值
k2.set(year); // 键为年份
v2.set(temperature); // 值为温度
// 将年份和温度作为键值对输出
//需处理异常:Unhandled exceptions: java.io.IOException, java.lang.InterruptedException
context.write(k2, v2);
//context..write(new Text(year),new IntWritable(temperature));可替换上面三行
}
}
复制代码
代码思绪
:
该 Mapper 类吸收每一行数据,提取此中的年份和温度,并举行简单的过滤(剔除无效数据)。
我们在 setup 中初始化键值对对象,以进步服从(制止每次调用 map 都创建新对象)。
如果温度无效(如标记为 9999 或质量标记不符合要求),则跳过该数据。否则,提取有效的年份和温度输出给 Reducer。
2. TempReduce.java —— Reducer 类
package com.dxd.dxd;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class TempReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
// reduce 方法:接收每个年份对应的多个温度值,求出最高温度
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int max = Integer.MIN_VALUE;
// 遍历该年份的所有温度,找到最大值
for (IntWritable value : values) {
max = Math.max(max, value.get());
}
// 将年份和最高温度输出
//需处理异常:Unhandled exceptions: java.io.IOException, java.lang.InterruptedException
context.write(key, new IntWritable(max));
}
}
复制代码
代码思绪
:
Reducer 负责吸收每个年份的温度列表,使用 Math.max 函数逐一比较,找出该年份的最高温度。
终极将年份和最高温度写入输出。
3. TempDriver.java —— Driver 类
package com.dxd.dxd;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class TempDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 设置用户权限
System.setProperty("HADOOP_USER_NAME", "root");
// 配置作业
Configuration configuration = new Configuration();//需处理异常:Unhandled exception:java.io.IOException
Job job = Job.getInstance(configuration);
// 指定包含MapReduce作业代码的JAR文件的主类 即包含main()方法的类
job.setJarByClass(TempDriver.class);
// 设置 Mapper、Reducer 类
job.setMapperClass(TempMap.class);
job.setReducerClass(TempReduce.class);
// 设置 Combiner 类,减少数据传输量(可选)
job.setCombinerClass(TempReduce.class);
// 设置最终输出的键值类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交作业并等待完成,该行有Exception需处理
System.exit(job.waitForCompletion(true) ? 0 : 1);
//Unhandled exceptions: java.lang.InterruptedException, java.lang.ClassNotFoundException
}
}
复制代码
代码思绪
:
Driver 类负责配置和运行整个 MapReduce 作业。
通过 job.setMapperClass() 和 job.setReducerClass() 设置 Mapper 和 Reducer。
job.setCombinerClass() 选项用于本地聚合,减少数据传输量,进步服从。
最后,通过 FileInputFormat 和 FileOutputFormat 指定输入和输出路径。
增补:
System.exit(job.waitForCompletion(true) ? 0 : 1); 是什么意思?
System.exit(job.waitForCompletion(true) ? 0 : 1);
复制代码
是在 Hadoop MapReduce 程序的 Driver 类中用于提交作业并控制作业执行效果的代码。让我们渐渐分析这行代码的含义。
1.
job.waitForCompletion(true)
该方法会启动 MapReduce 作业并等候其完成。它返回一个布尔值,表示作业是否成功执行。
参数 true 表示在作业运行期间会在控制台输出具体的进度信息,也就是
作业的状态跟踪
。
具体过程
:
MapReduce 作业开始执行,waitForCompletion(true) 会阻塞程序,直到作业完成。
如果作业执行成功,waitForCompletion(true) 返回 true。
如果作业执行失败,则返回 false。
2.
? 0 : 1(三元运算符)
这是 Java 中的
三元条件运算符
。格式如下:
condition ? valueIfTrue : valueIfFalse;
复制代码
condition:布尔表达式,判断效果为 true 或 false。
valueIfTrue:如果 condition 为 true,则返回该值。
valueIfFalse:如果 condition 为 false,则返回该值。
在这里:
如果 job.waitForCompletion(true) 返回 true(即作业执行成功),则返回 0。
如果 job.waitForCompletion(true) 返回 false(即作业执行失败),则返回 1。
3.
System.exit(status)
System.exit(int status) 是一个用于停止 Java 应用程序的命令。它会关闭当前 Java 假造机 (JVM)。
参数 status 是一个整数,表示程序的退出状态码:
0
:通常表示程序
正常退出
(无错误)。
非 0 值
:表示程序
非常退出
或有错误。例如,1 通常表示作业失败。
这行代码的完整含义是:
程序调用 job.waitForCompletion(true) 提交并等候 MapReduce 作业的执行效果。
如果作业成功,job.waitForCompletion(true) 返回 true,然后 ? 0 : 1 选择返回 0,表示程序成功退出。
如果作业失败,job.waitForCompletion(true) 返回 false,? 0 : 1 返回 1,表示程序非常退出。
最后通过 System.exit(0) 或 System.exit(1) 来停止整个 Java 程序。
为什么使用 System.exit()?
在 Hadoop 作业中,通常使用 System.exit() 来确保整个应用程序在作业完成后停止,返回的状态码可以作为操作系统判断程序是否成功执行的依据:
0
表示成功,可以触发后续的处理惩罚任务。
1
表示失败,可能会触发错误处理惩罚机制或发出警报。
总结:
System.exit(job.waitForCompletion(true) ? 0 : 1);
复制代码
作用
:等候 MapReduce 作业完成,并根据作业的执行效果退出 Java 程序。
含义
:
job.waitForCompletion(true) 提交作业并等候作业完成。
根据作业是否成功,返回 0 或 1。
System.exit() 用于停止程序,并传递作业的成功或失败状态。
终极效果
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4