本文重要初步学习hadoop的过程,简单验证迩来几天的掌握情况,也是分析一下景象数据;分析数据之前,先学习了一些基础知识,可参考以下的文章,有条件可以按照文章的内容搭个集群,单点也可以玩,我搭的单点;
hadoop学习
分析景象数据
正文开始(下面是具体解析,最下面会整合,通过zakaban一键走完所有流程):
1、数据预备
1.1、数据下载
重要分析的2023年的数据,也附带上2024年的数据,网站上还有其它年份的,各人自由发挥
2024年数据
2023年数据
1.2、数据上传
hadoop的存储使用的hdfs,通过hdfs命令新建/data路径后上传,这里留意hdfs的部署路径(包括文章所有涉及有路径的命令),各人根据自己情况修改
- /home/chao/soft/hadoop/hadoop/bin/hdfs dfs -mkdir -p /data
- /home/chao/soft/hadoop/hadoop/bin/hdfs dfs -put 2023.tar.gz /data
复制代码 1.3、加载初始数据
如今数据存到了hdfs,必要将数据加载到hive中,各人先在hive上创建库,我这已经创建好了my_hive,接下来就是在库中创建表,建表hql尽量不要创建外部表,否则drop表的时候必要到hdfs上删除数据才可
row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde' --- 加载csv会以字符串情势读取,自带双引号,加上可去掉双引号
tblproperties ('skip.header.line.count'='1') --- 读取csv可以忽略第一行(表头)(这里有个小坑,因为我load的压缩包,发现只省去了第一个文件的表头,后续处理的时候必要将这些数据特殊处理下)
- use myhive;
- create table if not exists data_2023 (station string, `date` string, latitude decimal(18,3), longitude decimal(18,3), elevation int, `name` string, temp decimal(18,3), temp_attributes int, dewp decimal(18,3),dewp_attributes int, slp decimal(18,3), slp_attributes int, stp decimal(18,3), stp_attributes int, visib decimal(18,3), visib_attributes int, wdsp decimal(18,3), wdsp_attributes int, mxspd decimal(18,3), gust int, `max` decimal(18,3), max_attributes string, `min` decimal(18,3), min_attributes string, prcp decimal(18,3), prcp_attributes int, sndp decimal(18,3), frshtt int )
- row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde' tblproperties ('skip.header.line.count'='1');
复制代码 加载数据
- -- 初始化数据
- load data inpath '/data/2023.tar.gz' overwrite into table data_2023;
复制代码 数据查看 共400多w数据
2、数据洗濯
2.1 函数预备
原始数据中都是华氏度,我这就写了一个摄氏度转化函数在hql中使用,各人也可以直接在hql中盘算,但是学都学了,搞得稍稍复杂点。写好之后必要加载到hive中使用。
函数编写:
- import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
- import org.apache.hadoop.hive.ql.metadata.HiveException;
- import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
- import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
- import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
- import java.text.DecimalFormat;
- public class Transform extends GenericUDF {
- @Override
- public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
- return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
- }
- @Override
- public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
- String input = deferredObjects[0].get().toString();
- if (input == null) {
- return -10000;
- }
- Double value = (Double.parseDouble(input.trim()) - 32) * 5 / 9;
- DecimalFormat df = new DecimalFormat("#.00");
- return df.format(value);
- }
- @Override
- public String getDisplayString(String[] strings) {
- return "";
- }
- }
复制代码 写好之后直接打jar包(可修改下包名),上传
- cp trans.jar /home/chao/soft/hive/packages/
复制代码 加载到hive中(hive中实行)
- -- 1.加jar包 转化 为摄氏度
- add jar /home/chao/soft/hive/packages/trans.jar;
- -- 2.创建临时函数
- create temporary function c2f as 'com.god.chao.hive.udf.Transform';
复制代码 2.2 洗濯数据
创建洗濯之后的新表,根据自己必要改一下字段名,原字段欠好明白
row format delimited fields terminated by '\t' --- 尽量加上字段分隔符,要不然后续可能会出现洗濯的字段到了新表的同一个字段中的标题
- -- 3.创建新表
- create table if not exists clear_data_2023 (station string comment '站点', `date` string, area string comment '国家或地区', `year` int, `month` int, `day` int, temp decimal(18,3) comment '每日平均温度', temp_min decimal(18,3), temp_max decimal(18,3), wind_speed decimal(18,3) comment '风速', rainfall decimal(18,3) comment '降雨量')
- row format delimited fields terminated by '\t';
复制代码 上面说的小坑,这里加上条件过滤掉。可以看到这里用到了自己写的函数c2f;
- -- 4.处理数据 data_2023已经设置csv导入省去第一行,或许导入的是压缩包,只会省去第一个文件的第一行,这里加上条件
- insert overwrite table clear_data_2023 select station as station, `date` as `date`, split(`name`, ',')[1] as area, split(`date`, '-')[0] as `year`, split(`date`, '-')[1] as `month`, split(`date`, '-')[2] as `day`, c2f(temp) as temp, c2f(`max`) as temp_max, c2f(`min`) as temp_min, if(abs(999.9-wdsp) > 0.1, wdsp, null) as wind_speed, if(abs(99.9-prcp) > 0.1, prcp, null) as rainfall from data_2023
- where `name` != 'NAME' and `date` is not null;
复制代码 可以继续查看下数据洗濯效果
3、盘算数据
基础数据已经存在了,如今就是按照需求写hql或者mr,简单的可以写hql,复杂的可以搞mr,我自己随便想了几个玩玩
1. 每个国家每天的平均气温,最高,最低温度
2. 每个站点每月的平均气温,最高,最低温度,平均风速,最大,最小风速
3. 每个国家每月的平均降水量
4. 每个国家最大降水量的月份及降水量
以下的是hql
- use myhive;
- -- 每个国家每天的平均气温,最高,最低温度
- create table day_country_temp comment '国家每天统计表' row format delimited fields terminated by '\t' as
- select area, `date`, avg(temp) as temp, max(temp_max) as temp_max, min(temp_min) as temp_min from clear_data_2023 group by area, `date`;
- -- 每个站点每月的平均气温,最高,最低温度,平均风速,最大,最小风速
- create table day_station_monthly_data comment '每个站点每个月的数据' row format delimited fields terminated by '\t' as
- select station, area , `month`, avg(temp) as temp, max(temp_max) as temp_max, min(temp_min) as temp_min, avg(wind_speed) as avg_wind_speed, min(wind_speed) as min_wind_speed, max(wind_speed) as max_wind_speed, avg(rainfall) as avg_rainfall, max(rainfall) as max_rainfall, min(rainfall) as min_rainfall from clear_data_2023 group by station, area, `month`;
- -- 每个国家每月的平均降水量
- create table monthly_area_rainfall row format delimited fields terminated by '\t' as
- select area, `month`, avg(avg_rainfall) as avg_rainfall from day_station_monthly_data group by area, `month`;
- -- 每个国家最大降水量的月份及降水量
- create table area_max_rainfall_data row format delimited fields terminated by '\t' as
- select t.area, a.month, t.max_rainfall from monthly_area_rainfall a join
- (select area, max(avg_rainfall) as max_rainfall from monthly_area_rainfall group by area) t on a.area = t.area and a.avg_rainfall = t.max_rainfall;
复制代码 4、数据传输
我这是将数据从hive导入到mysql
4.1 mysql初始化
先创建好mysql的库 my_test ;
- -- 先创建好数据库my_test
- use my_test;
- create table if not exists `day_country_temp` ( area varchar(20), `date` varchar(20), temp decimal(10,2), temp_max decimal(10,2), temp_min decimal(10,2)) comment '国家每天统计表';
- create table if not exists `day_station_monthly_data` ( station varchar(20), area varchar(20), `month` int(4), temp decimal(10,2), temp_max decimal(10,2), temp_min decimal(10,2), avg_wind_speed decimal(10,2), min_wind_speed decimal(10,2), max_wind_speed decimal(10,2), avg_rainfall decimal(10,2), max_rainfall decimal(10,2), min_rainfall decimal(10,2)) comment '每个站点每个月的数据';
- create table if not exists `monthly_area_rainfall` ( area varchar(20), `month` int(4), avg_rainfall decimal(10,2)) comment '每个国家每月的平均降水量';
- create table if not exists `area_max_rainfall_data` ( area varchar(20), `month` int(4), max_rainfall decimal(10,2)) comment '每个国家最大降水量的月份及降水量';
复制代码 4.2 通过sqoop传输数据
这里各人留意修改路径,myql的毗连地址,我这没设置暗码,如有必要可以在 username后加上 --userpassword xxx
--input-null-string '\\N' --input-null-non-string '\\N' ---- 传输过程中可能会出现 \N 标题,这里加上可以将 \N 转为空存储
- /home/chao/soft/sqoop/sqoop/bin/sqoop export --connect jdbc:mysql://myhadoop1:3306/my_test --username root --table day_country_temp --export-dir /user/hive/warehouse/myhive.db/day_country_temp --input-fields-terminated-by '\t' --input-null-string '\\N' --input-null-non-string '\\N'
- /home/chao/soft/sqoop/sqoop/bin/sqoop export --connect jdbc:mysql://myhadoop1:3306/my_test --username root --table day_station_monthly_data --export-dir /user/hive/warehouse/myhive.db/day_station_monthly_data --input-fields-terminated-by '\t' --input-null-string '\\N' --input-null-non-string '\\N'
- /home/chao/soft/sqoop/sqoop/bin/sqoop export --connect jdbc:mysql://myhadoop1:3306/my_test --username root --table monthly_area_rainfall --export-dir /user/hive/warehouse/myhive.db/monthly_area_rainfall --input-fields-terminated-by '\t' --input-null-string '\\N' --input-null-non-string '\\N'
- /home/chao/soft/sqoop/sqoop/bin/sqoop export --connect jdbc:mysql://myhadoop1:3306/my_test --username root --table area_max_rainfall_data --export-dir /user/hive/warehouse/myhive.db/area_max_rainfall_data --input-fields-terminated-by '\t' --input-null-string '\\N' --input-null-non-string '\\N'
复制代码 5、通过azkaban一键走完流程
前四章节是为第五步打的基础,各人可以参考玩玩,上面所有的步骤假如都实行完,就可以体会到,每次都手动上传,去敲命令有点麻烦,这次的洗濯是一次性的,假如后续有增量需求,必要每天定时跑,就会力不从心,所以就必要通过azkaban这样的平台来实行。
5.1 编写job
我分为了几个步骤,供各人参考
1. 上传jar包,资源数据;
2. 初始化hive库表,洗濯数据
3. 数据盘算
4. 数据传输
-- mysql初始化只要在第四步之前即可
一共5个job,之间有相互的依赖关系,实行的sql,脚本基本全是上面步骤的
uploadjar.job
- type=command
- command=sh upload.sh
复制代码 init.job
- type=command
- command=/home/chao/soft/hive/bin/hive -f init.sql
- dependencies=uploadjar
复制代码 calculate.job
- type=command
- command=/home/chao/soft/hive/bin/hive -f calculate.sql
- dependencies=init
复制代码 mysqlinit.job
- type=command
- command=sh mysqlinit.sh
复制代码 sqoop.job
- type=command
- command=sh sqoop.sh
- dependencies=mysqlinit,calculate
复制代码 5.2 创建项目实行
在azkaban上创建项目,上传压缩包,并实行
上传上去可看到job依赖关系
实行之后便是以下效果,mysql中也已经有数据,后续可以做统计大屏等
6. 结束语及相关脚本
初次玩hadoop,有标题欢迎各人探讨,多多包涵。放上我实行的azkaban压缩包,各人改下路径,数据库链接相关内容,可以直接运行。
通过网盘分享的文件:mr.zip
链接: https://pan.baidu.com/s/1DvnQVnUL6B8S1bXC-C8eHw 提取码: f7dz
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |