Hadoop分布式计算框架(MapReduce)——案例实践:气象大数据离线分析 ...

打印 上一主题 下一主题

主题 968|帖子 968|积分 2904

目次

(1)项目需求
(2)数据格式
(3)实现思路
(4)项目开发


1)项目需求

现在有一份来自美国国家海洋和大气管理局的数据集,内里包罗近30年每个气象站、每小时的气候预报数据,每个报告的文件大小大约15M。一共有263个气象站,每个报告文件的名字包罗气象站ID,每条记录包罗气温、风向、气候状态等多个字段信息。现在要求统计美国各气象站30年平均气温。

2)数据格式

气候预报每行数据的每个字段都是定长的,完整数据格式如下。


数据格式由Year(年)、Month(月)、Day(日)、Hour(时)、Temperature(气温)、Dew(湿度)、Pressure(气压)、Wind dir.(风向)、Wind speed(风速)、Sky Cond.(气候状态)、Rain 1h(每小时降雨量)、Rain 6h(每6小时降雨量)组成。

3实现思路

我们的目的是统计近30年每个气象站的平均气温,由此可以设计一个MapReduce如下所示:

  1. Map = {key = weather station id, value = temperature}
  2. Reduce = {key = weather station id, value = mean(temperature)}
复制代码
首先调用mapper的map()函数提取气象站id作为key,提取气温值作为value,然后调用reducer的reduce()函数对相同气象站的全部气温求平均值。

4项目开发

打开IDEA的bigdata项目,开发MapReduce分布式应用程序,统计美国各气象站近30年的平均气温。

(1)引入Hadoop依赖

由于开发MapReduce程序需要依赖Hadoop客户端,所以需要在项目的pom.xml文件中引入Hadoop的相关依赖,添加如下内容:

  1. <dependency>  
  2.     <groupId>org.apache.hadoop</groupId
  3.     <artifactId>hadoop-client</artifactId>
  4.     <version>2.9.2</version>
  5. </dependency>
复制代码
(2)实现Mapper

由于气候预报每行数据的每个字段都是固定的,所以可以使用substring(start,end)函数提取气温值。由于气象站每个报告文件的名字都包罗气象站ID,首先可以使用FileSplit类获取文件名称,再使用substring(start,end)函数截取气象站ID。

在Reducer中,重写reducer()函数,首先对全部气温值累加求和,末了计算出每个气象站的平均气温值。

完整代码如下:

  1. package com.itheima;
  2. import org.apache.hadoop.conf.Configuration;
  3. import org.apache.hadoop.conf.Configured;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.IntWritable;
  6. import org.apache.hadoop.io.LongWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  13. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  15. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  16. import java.io.IOException;
  17. public class WeatherAnalysis {
  18.    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  19.        @Override
  20.        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  21.            String line = value.toString();
  22.            int temperature = Integer.parseInt(line.substring(14, 19).trim());
  23.            if (temperature != -9999) {
  24.                FileSplit failsplit = (FileSplit) context.getInputSplit();
  25.                String id = failsplit.getPath().getName().substring(5, 10);
  26.                //输出气象站id
  27.                context.write(new Text(id), new IntWritable(temperature));
  28.            }
  29.        }
  30.    }
  31.    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  32.        private IntWritable sean = new IntWritable();
  33.        @Override
  34.        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  35.            int sum = 0;
  36.            int count = 0;
  37.            for (IntWritable val : values) {
  38.                sum += val.get();
  39.                count++;
  40.            }
  41.            //求平均值气温
  42.            sean.set(sum / count);
  43.            context.write(key, sean);
  44.        }
  45.    }
  46.    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  47.        org.apache.hadoop.conf.Configuration conf = new Configuration();
  48.        Job job = Job.getInstance(conf, "WeatherAnalysis");
  49.        job.setJarByClass(WeatherAnalysis.class);
  50.        //输入输出路径
  51.        FileInputFormat.addInputPath(job,new Path(args[0]));
  52.        FileOutputFormat.setOutputPath(job,new Path(args[1]));
  53.        //输入输出格式
  54.        job.setInputFormatClass(TextInputFormat.class);
  55.        job.setOutputFormatClass(TextOutputFormat.class);
  56.        //设置mapper及map输出的key value类型
  57.        job.setMapperClass(MyMapper.class);
  58.        job.setMapOutputKeyClass(Text.class);
  59.        job.setMapOutputValueClass(IntWritable.class);
  60.        //设置Reducer及reduce输出key value类型
  61.        job.setReducerClass(MyReducer.class);
  62.        job.setOutputKeyClass(Text.class);
  63.        job.setOutputValueClass(IntWritable.class);
  64.        job.waitForCompletion(true);
  65.    }
  66. }
复制代码
③项目编译打包

在IDEA工具的Terminal控制台中,输入mvn clean package命令对项目进行打包


打包乐成后,在项目的targer目次下找到编译好的bigdata-1.0-SNAPSHOT.jar包,然后将其上传至/home/hadoop/shell/lib目次下(没有相关目次可手动创建)


④准备数据源

由于气象站比较多,为了方便测试,这里只将10个气象站的气候报告文件上传至HDFS的/weather目次下。(没有需要手动创建该目次)

HDFS上创建/weather目次

 hdfs dfs -mkdir /weather


先将数据源上传至当地捏造机目次/home/hadoop/shell/data(该目次需要手动创建)


再将当地数据源上传至HDFS的/weather目次

hdfs dfs -put /home/hadoop/shell/data/* /weather


⑤编写shell脚本

为了便于提交MapReduce作业,在/home/hadoop/shell/bin目次下编写weatherMR.sh脚本,封装作业提交命令,具体脚本内容如下:

  1. #!/bin/bash
  2. echo "start weather mapreduce"
  3. HADOOP_HOME=/soft/hadoop
  4. if($HADOOP_HOME/bin/hdfs dfs -test -e /weather/out)
  5. then
  6.     $HADOOP_HOME/bin/hdfs dfs -rm -r /weather/out
  7. fi
  8. $HADOOP_HOME/bin/yarn jar /home/hadoop/shell/lib/bigdata-1.0-SNAPSHOT.jar com.itheima.WeatherAnalysis -Dmapreduce.job.queuename=root.offline /weather/* /weather/out >> /home/hadoop/shell/logs/weather.log 2>&1
复制代码

⑥为weatherMR.sh 脚本添加可执行权限:

  1. chmod u+x weatherMR.sh
复制代码

⑦提交MapReduce作业

到该脚本目次下,执行weatherMR.sh脚本提交MapReduce作业

  1. ./weatherMR.sh
复制代码

⑧查察运行效果

使用HDFS命令查察美国各气象站近30年的平均气温:

  1. hdfs dfs -cat /weather/out/part-r-00000
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

笑看天下无敌手

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表