ToB企服应用市场:ToB评测及商务社交产业平台

标题: 初玩hadoop分析数据并通过azkaban一键实行 [打印本页]

作者: 我可以不吃啊    时间: 2025-1-6 23:11
标题: 初玩hadoop分析数据并通过azkaban一键实行
        本文重要初步学习hadoop的过程,简单验证迩来几天的掌握情况,也是分析一下景象数据;分析数据之前,先学习了一些基础知识,可参考以下的文章,有条件可以按照文章的内容搭个集群,单点也可以玩,我搭的单点;
        hadoop学习
        分析景象数据
正文开始(下面是具体解析,最下面会整合,通过zakaban一键走完所有流程):
1、数据预备

        1.1、数据下载

        重要分析的2023年的数据,也附带上2024年的数据,网站上还有其它年份的,各人自由发挥
        2024年数据
        2023年数据
        1.2、数据上传

        hadoop的存储使用的hdfs,通过hdfs命令新建/data路径后上传,这里留意hdfs的部署路径(包括文章所有涉及有路径的命令),各人根据自己情况修改
  1. /home/chao/soft/hadoop/hadoop/bin/hdfs dfs -mkdir -p /data
  2. /home/chao/soft/hadoop/hadoop/bin/hdfs dfs -put 2023.tar.gz /data
复制代码
        1.3、加载初始数据

        如今数据存到了hdfs,必要将数据加载到hive中,各人先在hive上创建库,我这已经创建好了my_hive,接下来就是在库中创建表,建表hql尽量不要创建外部表,否则drop表的时候必要到hdfs上删除数据才可
           row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'  ---   加载csv会以字符串情势读取,自带双引号,加上可去掉双引号
          tblproperties ('skip.header.line.count'='1')   ---   读取csv可以忽略第一行(表头)(这里有个小坑,因为我load的压缩包,发现只省去了第一个文件的表头,后续处理的时候必要将这些数据特殊处理下)
  1. use myhive;
  2. create table if not exists data_2023 (station string, `date` string, latitude decimal(18,3), longitude decimal(18,3), elevation int, `name` string, temp decimal(18,3), temp_attributes int, dewp decimal(18,3),dewp_attributes int, slp decimal(18,3), slp_attributes int, stp decimal(18,3), stp_attributes int, visib decimal(18,3), visib_attributes int, wdsp decimal(18,3), wdsp_attributes int, mxspd decimal(18,3), gust int, `max` decimal(18,3), max_attributes string, `min` decimal(18,3), min_attributes string, prcp decimal(18,3), prcp_attributes int, sndp decimal(18,3), frshtt int )
  3. row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde' tblproperties ('skip.header.line.count'='1');
复制代码
        加载数据
  1. -- 初始化数据
  2. load data inpath '/data/2023.tar.gz' overwrite into table data_2023;
复制代码
        数据查看      共400多w数据
        

        


2、数据洗濯

        2.1 函数预备

        原始数据中都是华氏度,我这就写了一个摄氏度转化函数在hql中使用,各人也可以直接在hql中盘算,但是学都学了,搞得稍稍复杂点。写好之后必要加载到hive中使用。
        函数编写:
  1. import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
  2. import org.apache.hadoop.hive.ql.metadata.HiveException;
  3. import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
  4. import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
  5. import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
  6. import java.text.DecimalFormat;
  7. public class Transform extends GenericUDF {
  8.     @Override
  9.     public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
  10.         return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
  11.     }
  12.     @Override
  13.     public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
  14.         String input = deferredObjects[0].get().toString();
  15.         if (input == null) {
  16.             return -10000;
  17.         }
  18.         Double value = (Double.parseDouble(input.trim()) - 32) * 5 / 9;
  19.         DecimalFormat df = new DecimalFormat("#.00");
  20.         return df.format(value);
  21.     }
  22.     @Override
  23.     public String getDisplayString(String[] strings) {
  24.         return "";
  25.     }
  26. }
复制代码
        写好之后直接打jar包(可修改下包名),上传
  1. cp trans.jar /home/chao/soft/hive/packages/
复制代码
        加载到hive中(hive中实行)
  1.     -- 1.加jar包 转化 为摄氏度
  2. add jar /home/chao/soft/hive/packages/trans.jar;
  3.     -- 2.创建临时函数
  4. create temporary function c2f as 'com.god.chao.hive.udf.Transform';
复制代码
          2.2 洗濯数据

            创建洗濯之后的新表,根据自己必要改一下字段名,原字段欠好明白
                ​​​​​​​        

            row format delimited fields terminated by '\t'  --- 尽量加上字段分隔符,要不然后续可能会出现洗濯的字段到了新表的同一个字段中的标题
  1.     -- 3.创建新表
  2. create table if not exists clear_data_2023 (station string comment '站点', `date` string, area string comment '国家或地区', `year` int, `month` int, `day` int, temp decimal(18,3) comment '每日平均温度', temp_min decimal(18,3), temp_max decimal(18,3), wind_speed decimal(18,3) comment '风速', rainfall decimal(18,3) comment '降雨量')
  3. row format delimited fields terminated by '\t';
复制代码
          上面说的小坑,这里加上条件过滤掉。可以看到这里用到了自己写的函数c2f;
  1. -- 4.处理数据  data_2023已经设置csv导入省去第一行,或许导入的是压缩包,只会省去第一个文件的第一行,这里加上条件
  2. insert overwrite table clear_data_2023 select station as station, `date` as `date`, split(`name`, ',')[1] as area, split(`date`, '-')[0] as `year`, split(`date`, '-')[1] as `month`, split(`date`, '-')[2] as `day`, c2f(temp) as temp, c2f(`max`) as temp_max, c2f(`min`) as temp_min, if(abs(999.9-wdsp) > 0.1, wdsp, null) as wind_speed, if(abs(99.9-prcp) > 0.1, prcp, null) as rainfall from data_2023
  3. where `name` != 'NAME' and `date` is not null;
复制代码
        可以继续查看下数据洗濯效果
    

    

3、盘算数据

        基础数据已经存在了,如今就是按照需求写hql或者mr,简单的可以写hql,复杂的可以搞mr,我自己随便想了几个玩玩
   1. 每个国家每天的平均气温,最高,最低温度
  2. 每个站点每月的平均气温,最高,最低温度,平均风速,最大,最小风速
  3. 每个国家每月的平均降水量
  4. 每个国家最大降水量的月份及降水量   
  以下的是hql
  1. use myhive;
  2. -- 每个国家每天的平均气温,最高,最低温度
  3. create table day_country_temp comment '国家每天统计表' row format delimited fields terminated by '\t' as
  4.     select area, `date`, avg(temp) as temp, max(temp_max) as temp_max, min(temp_min) as temp_min from clear_data_2023 group by area, `date`;
  5. -- 每个站点每月的平均气温,最高,最低温度,平均风速,最大,最小风速
  6. create table day_station_monthly_data comment '每个站点每个月的数据' row format delimited fields terminated by '\t' as
  7.     select station, area , `month`, avg(temp) as temp, max(temp_max) as temp_max, min(temp_min) as temp_min, avg(wind_speed) as avg_wind_speed, min(wind_speed) as min_wind_speed, max(wind_speed) as max_wind_speed, avg(rainfall) as avg_rainfall, max(rainfall) as max_rainfall, min(rainfall) as min_rainfall from clear_data_2023 group by station, area, `month`;
  8. -- 每个国家每月的平均降水量
  9. create table monthly_area_rainfall row format delimited fields terminated by '\t' as
  10.     select area, `month`, avg(avg_rainfall) as avg_rainfall from day_station_monthly_data group by area, `month`;
  11. -- 每个国家最大降水量的月份及降水量
  12. create table area_max_rainfall_data row format delimited fields terminated by '\t' as
  13.     select t.area, a.month, t.max_rainfall from monthly_area_rainfall a join
  14.     (select area, max(avg_rainfall) as max_rainfall from monthly_area_rainfall group by area) t on a.area = t.area and a.avg_rainfall = t.max_rainfall;
复制代码
4、数据传输

        我这是将数据从hive导入到mysql
        4.1 mysql初始化

        先创建好mysql的库  my_test ;
  1. -- 先创建好数据库my_test
  2. use my_test;
  3. create table if not exists `day_country_temp` ( area varchar(20), `date` varchar(20), temp decimal(10,2), temp_max decimal(10,2), temp_min decimal(10,2)) comment '国家每天统计表';
  4. create table if not exists `day_station_monthly_data` ( station varchar(20), area varchar(20), `month` int(4), temp decimal(10,2), temp_max decimal(10,2), temp_min decimal(10,2), avg_wind_speed decimal(10,2), min_wind_speed decimal(10,2), max_wind_speed decimal(10,2), avg_rainfall decimal(10,2), max_rainfall decimal(10,2), min_rainfall decimal(10,2)) comment '每个站点每个月的数据';
  5. create table if not exists `monthly_area_rainfall` ( area varchar(20), `month` int(4), avg_rainfall decimal(10,2)) comment '每个国家每月的平均降水量';
  6. create table if not exists `area_max_rainfall_data` ( area varchar(20), `month` int(4), max_rainfall decimal(10,2)) comment '每个国家最大降水量的月份及降水量';
复制代码
        4.2 通过sqoop传输数据

        这里各人留意修改路径,myql的毗连地址,我这没设置暗码,如有必要可以在 username后加上        --userpassword xxx
   --input-null-string '\\N' --input-null-non-string '\\N'     ----   传输过程中可能会出现  \N 标题,这里加上可以将 \N 转为空存储
  1. /home/chao/soft/sqoop/sqoop/bin/sqoop export --connect jdbc:mysql://myhadoop1:3306/my_test --username root --table day_country_temp --export-dir /user/hive/warehouse/myhive.db/day_country_temp --input-fields-terminated-by '\t' --input-null-string '\\N' --input-null-non-string '\\N'
  2. /home/chao/soft/sqoop/sqoop/bin/sqoop export --connect jdbc:mysql://myhadoop1:3306/my_test --username root --table day_station_monthly_data --export-dir /user/hive/warehouse/myhive.db/day_station_monthly_data --input-fields-terminated-by '\t' --input-null-string '\\N' --input-null-non-string '\\N'
  3. /home/chao/soft/sqoop/sqoop/bin/sqoop export --connect jdbc:mysql://myhadoop1:3306/my_test --username root --table monthly_area_rainfall --export-dir /user/hive/warehouse/myhive.db/monthly_area_rainfall --input-fields-terminated-by '\t' --input-null-string '\\N' --input-null-non-string '\\N'
  4. /home/chao/soft/sqoop/sqoop/bin/sqoop export --connect jdbc:mysql://myhadoop1:3306/my_test --username root --table area_max_rainfall_data --export-dir /user/hive/warehouse/myhive.db/area_max_rainfall_data --input-fields-terminated-by '\t' --input-null-string '\\N' --input-null-non-string '\\N'
复制代码
5、通过azkaban一键走完流程

        前四章节是为第五步打的基础,各人可以参考玩玩,上面所有的步骤假如都实行完,就可以体会到,每次都手动上传,去敲命令有点麻烦,这次的洗濯是一次性的,假如后续有增量需求,必要每天定时跑,就会力不从心,所以就必要通过azkaban这样的平台来实行。
        5.1 编写job

        我分为了几个步骤,供各人参考
   1. 上传jar包,资源数据;
  2. 初始化hive库表,洗濯数据
  3. 数据盘算
  4. 数据传输
  -- mysql初始化只要在第四步之前即可
         一共5个job,之间有相互的依赖关系,实行的sql,脚本基本全是上面步骤的
        uploadjar.job       
  1. type=command
  2. command=sh upload.sh
复制代码
        init.job
  1. type=command
  2. command=/home/chao/soft/hive/bin/hive -f init.sql
  3. dependencies=uploadjar
复制代码
        calculate.job
  1. type=command
  2. command=/home/chao/soft/hive/bin/hive -f calculate.sql
  3. dependencies=init
复制代码
        mysqlinit.job
  1. type=command
  2. command=sh mysqlinit.sh
复制代码
        sqoop.job
  1. type=command
  2. command=sh sqoop.sh
  3. dependencies=mysqlinit,calculate
复制代码
        5.2 创建项目实行

        在azkaban上创建项目,上传压缩包,并实行
        上传上去可看到job依赖关系
                                ​​​​​​​        

          实行之后便是以下效果,mysql中也已经有数据,后续可以做统计大屏等
        

        

6. 结束语及相关脚本

        初次玩hadoop,有标题欢迎各人探讨,多多包涵。放上我实行的azkaban压缩包,各人改下路径,数据库链接相关内容,可以直接运行。
   通过网盘分享的文件:mr.zip
链接: https://pan.baidu.com/s/1DvnQVnUL6B8S1bXC-C8eHw 提取码: f7dz

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4