229. Offline Data Warehouse: The Complete Development Process of a Hive Offline Warehouse from Kafka and MySQL to ClickHouse


I. Purpose

To write up the end-to-end workflow of offline data warehouse development, partly as a refresher.
The data sources of the offline warehouse are Kafka and MySQL: Kafka holds the business data, MySQL holds the dimension data.
The collection tools are Kettle and Flume: Flume collects the Kafka data, Kettle collects the MySQL data.
The offline warehouse itself is Hive.
The target database is ClickHouse.
The task scheduler is DolphinScheduler.
II. Data Collection

(1) Flume Collects the Kafka Data

1. Flume configuration file

## agent a1
a1.sources = s1
a1.channels = c1
a1.sinks = k1
## configure source s1
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.kafka.bootstrap.servers = 192.168.0.27:9092
a1.sources.s1.kafka.topics = topic_b_queue
a1.sources.s1.kafka.consumer.group.id = queue_group
a1.sources.s1.kafka.consumer.auto.offset.reset = latest
a1.sources.s1.batchSize = 1000
## configure channel c1
## a1.channels.c1.type = memory
## a1.channels.c1.capacity = 10000
## a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/data/flumeData/checkpoint/queue
a1.channels.c1.dataDirs = /home/data/flumeData/flumedata/queue
## configure sink k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hurys23:8020/user/hive/warehouse/hurys_dc_ods.db/ods_queue/day=%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = queue
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.rollSize = 1200000000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 60
a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.fileType = SequenceFile
a1.sinks.k1.hdfs.codeC = gzip
## Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

2. Scheduling the Flume task with DolphinScheduler

#!/bin/bash
source /etc/profile
/usr/local/hurys/dc_env/flume/flume190/bin/flume-ng agent -n a1 -f /usr/local/hurys/dc_env/flume/flume190/conf/queue.properties

3. Target path
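A quick way to confirm that Flume's output has landed under the expected day= directory is to register and query the ODS partition (the partition date below is only an example; the full ODS DDL appears in the ODS layer section):

use hurys_dc_ods;
-- register any newly written day= directories as partitions of ods_queue
msck repair table ods_queue;
show partitions ods_queue;
-- peek at a few raw JSON records for one example day
select queue_json from ods_queue where day = '2024-03-11' limit 10;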


(2) Kettle Collects the MySQL Dimension Data

1. Kettle task settings


2. Scheduling the Kettle task with DolphinScheduler

#!/bin/bash
source /etc/profile
/usr/local/hurys/dc_env/kettle/data-integration/pan.sh -rep=hurys_linux_kettle_repository -user=admin -pass=admin -dir=/mysql_to_hdfs/ -trans=23_MySQL_to_HDFS_tb_radar_lane -level=Basic >>/home/log/kettle/23_MySQL_to_HDFS_tb_radar_lane_`date +%Y%m%d`.log 

3. Target path


III. ODS Layer

(1) Business Data Table

use hurys_dc_ods;
create external table  if not exists  ods_queue(
    queue_json  string
)
comment 'static queue data table (static partitions)'
partitioned by (day string)
stored as SequenceFile
;
--refresh table partitions
msck repair table ods_queue;
--view table partitions
show partitions ods_queue;
--view table data
select * from ods_queue;

(2) Dimension Data Table

use hurys_dc_basic;
create  external  table  if not exists  tb_device_scene(
    id        int      comment 'primary key id',
    device_no string   comment 'device number',
    scene_id  string   comment 'scene id'
)
comment 'radar scene table'
row format delimited fields terminated by ','
stored as  textfile  location '/data/tb_device_scene'
tblproperties("skip.header.line.count"="1") ;
--view table data
select * from hurys_dc_basic.tb_device_scene;
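Because the Kettle-exported text file keeps its header row, tblproperties("skip.header.line.count"="1") tells Hive to drop that first line. A quick, illustrative sanity check that the dimension rows parse into the three columns (the counts simply reflect whatever the export contains):

use hurys_dc_basic;
-- the header line is skipped, so only real dimension rows are counted
select count(*) as row_cnt,
       count(distinct device_no) as device_cnt
from tb_device_scene;
-- spot-check a few parsed rows
select id, device_no, scene_id from tb_device_scene limit 5;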

IV. DWD Layer

(1) Business Data Cleansing

1. Business data with nested JSON (multi-level)

--1. Static queue data internal table (dynamic partitions): dwd_queue
create  table  if not exists  dwd_queue(
    device_no    string          comment 'device number',
    lane_num     int             comment 'number of lanes',
    create_time  timestamp       comment 'creation time',
    lane_no      int             comment 'lane number',
    lane_type    int             comment 'lane type 0: channelized 1: approach 2: exit 3: departure 4: left-turn waiting area 5: straight-through waiting area 6: dedicated right-turn lane 99: undefined lane',
    queue_count  int             comment 'number of queued vehicles',
    queue_len    decimal(10,2)   comment 'queue length (m)',
    queue_head   decimal(10,2)   comment 'distance from the first queued vehicle to the stop line (m)',
    queue_tail   decimal(10,2)   comment 'distance from the last queued vehicle to the stop line (m)'
)
comment 'static queue data table (dynamic partitions)'
partitioned by (day string)
stored as orc
;
--dynamically insert data
with t1 as(
select
       get_json_object(queue_json,'$.deviceNo')   device_no,
       get_json_object(queue_json,'$.createTime') create_time,
       get_json_object(queue_json,'$.laneNum')    lane_num,
       get_json_object(queue_json,'$.queueList')  queue_list
from hurys_dc_ods.ods_queue
    )
insert  overwrite  table  hurys_dc_dwd.dwd_queue partition(day)
select
        t1.device_no,
        t1.lane_num,
        substr(create_time,1,19)                                               create_time ,
        get_json_object(list_json,'$.laneNo')                                  lane_no,
        get_json_object(list_json,'$.laneType')                                lane_type,
        get_json_object(list_json,'$.queueCount')                              queue_count,
        cast(get_json_object(list_json,'$.queueLen')   as decimal(10,2))       queue_len,
        cast(get_json_object(list_json,'$.queueHead')  as decimal(10,2))       queue_head,
        cast(get_json_object(list_json,'$.queueTail')  as decimal(10,2))       queue_tail,
        date(t1.create_time) day
from t1
lateral view explode(split(regexp_replace(regexp_replace(queue_list,
                                                '\\[|\\]','') ,   --strip the square brackets around the JSON array
                            '\\}\\,\\{','\\}\\;\\{'),  --replace the commas between array elements with semicolons
                  '\\;') --split on the semicolons (split uses ";" as the delimiter)
          )list_queue as list_json
where  device_no is not null  and create_time is not null and  get_json_object(list_json,'$.queueLen') between 0 and 500
and  get_json_object(list_json,'$.queueHead')  between 0 and 500 and  get_json_object(list_json,'$.queueTail')  between 0 and 500 and  get_json_object(list_json,'$.queueCount') between 0 and 100
group by t1.device_no, t1.lane_num, substr(create_time,1,19), get_json_object(list_json,'$.laneNo'), get_json_object(list_json,'$.laneType'), get_json_object(list_json,'$.queueCount'), cast(get_json_object(list_json,'$.queueLen')   as decimal(10,2)), cast(get_json_object(list_json,'$.queueHead')  as decimal(10,2)), cast(get_json_object(list_json,'$.queueTail')  as decimal(10,2)), date(t1.create_time)
;
--view partitions
show partitions dwd_queue;
--view data
select * from dwd_queue
where day='2024-03-11';
--drop a table partition
alter table hurys_dc_dwd.dwd_queue drop partition (day='2024-03-11');
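The chain of regexp_replace, split and lateral view explode above is what flattens the nested queueList array into one row per lane. A self-contained sketch with a made-up two-element array shows the idea:

-- made-up input: a JSON array holding two lane objects
with demo as (
  select '[{"laneNo":1,"queueLen":12.5},{"laneNo":2,"queueLen":0}]' as queue_list
)
select get_json_object(list_json,'$.laneNo')                           as lane_no,
       cast(get_json_object(list_json,'$.queueLen') as decimal(10,2))  as queue_len
from demo
lateral view explode(split(regexp_replace(regexp_replace(queue_list,
                                '\\[|\\]',''),             --1) strip the outer square brackets
                                '\\}\\,\\{','\\}\\;\\{'),  --2) turn the "},{" separators into "};{"
                         '\\;')                            --3) split on ";" so each object becomes its own element
          ) list_queue as list_json;
-- expected result: two rows, (1, 12.50) and (2, 0.00)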
2. Business data with single-level JSON

--2. Turn-ratio data internal table (dynamic partitions): dwd_turnratio
create  table  if not exists  dwd_turnratio(
    device_no       string        comment 'device number',
    cycle           int           comment 'turn-ratio data cycle' ,
    create_time     timestamp     comment 'creation time',
    volume_sum      int           comment 'total number of vehicles passing the intersection in the given period',
    speed_avg       decimal(10,2) comment 'average speed of all vehicles passing the intersection in the given period',
    volume_left     int           comment 'total number of left-turning vehicles in the given period',
    speed_left      decimal(10,2) comment 'average speed of left-turning vehicles in the given period',
    volume_straight int           comment 'total number of straight-through vehicles in the given period',
    speed_straight  decimal(10,2) comment 'average speed of straight-through vehicles in the given period',
    volume_right    int           comment 'total number of right-turning vehicles in the given period',
    speed_right     decimal(10,2) comment 'average speed of right-turning vehicles in the given period',
    volume_turn     int           comment 'total number of U-turning vehicles in the given period',
    speed_turn      decimal(10,2) comment 'average speed of U-turning vehicles in the given period'
)
comment 'turn-ratio data table (dynamic partitions)'
partitioned by (day string)   --the partition column must not already exist in the table; think of it as a pseudo-column
stored as orc                 --the table stores its data in ORC format
;
--dynamically insert data
--parse the JSON fields, deduplicate, drop nulls, volumeSum>=0
--speed_avg, speed_left, speed_straight, speed_right, speed_turn are kept to two decimal places
--0<=volume_sum<=1000, 0<=speed_avg<=150, 0<=volume_left<=1000, 0<=speed_left<=100, 0<=volume_straight<=1000
--0<=speed_straight<=150, 0<=volume_right<=1000, 0<=speed_right<=100, 0<=volume_turn<=100, 0<=speed_turn<=100
with t1 as(
select
        get_json_object(turnratio_json,'$.deviceNo')        device_no,
        get_json_object(turnratio_json,'$.cycle')           cycle,
        get_json_object(turnratio_json,'$.createTime')      create_time,
        get_json_object(turnratio_json,'$.volumeSum')       volume_sum,
        cast(get_json_object(turnratio_json,'$.speedAvg')     as decimal(10,2))    speed_avg,
        get_json_object(turnratio_json,'$.volumeLeft')      volume_left,
        cast(get_json_object(turnratio_json,'$.speedLeft')    as decimal(10,2))    speed_left,
        get_json_object(turnratio_json,'$.volumeStraight')  volume_straight,
        cast(get_json_object(turnratio_json,'$.speedStraight')as decimal(10,2))    speed_straight,
        get_json_object(turnratio_json,'$.volumeRight')     volume_right,
        cast(get_json_object(turnratio_json,'$.speedRight')   as decimal(10,2))    speed_right ,
        case when  get_json_object(turnratio_json,'$.volumeTurn')  is null then 0 else get_json_object(turnratio_json,'$.volumeTurn')  end as   volume_turn ,
        case when  get_json_object(turnratio_json,'$.speedTurn')   is null then 0 else cast(get_json_object(turnratio_json,'$.speedTurn')as decimal(10,2))   end as   speed_turn
from hurys_dc_ods.ods_turnratio)
insert overwrite table hurys_dc_dwd.dwd_turnratio partition (day)
select
       t1.device_no,
       cycle,
       substr(create_time,1,19)              create_time ,
       volume_sum,
       speed_avg,
       volume_left,
       speed_left,
       volume_straight,
       speed_straight ,
       volume_right,
       speed_right ,
       volume_turn,
       speed_turn,
       date(create_time) day
from t1
where device_no is not null and volume_sum between 0 and 1000 and speed_avg between 0 and 150 and volume_left  between 0 and 1000
and speed_left between 0 and 100 and volume_straight between 0 and 1000 and speed_straight between 0 and 150
and volume_right between 0 and 1000 and speed_right between 0 and 100 and volume_turn between 0 and 100 and speed_turn between 0 and 100
group by t1.device_no, cycle, substr(create_time,1,19), volume_sum, speed_avg, volume_left, speed_left, volume_straight, speed_straight, volume_right, speed_right, volume_turn, speed_turn, date(create_time)
;
--view partitions
show partitions dwd_turnratio;
--view data
select * from hurys_dc_dwd.dwd_turnratio
where day='2024-03-11';
--drop a table partition
alter table hurys_dc_dwd.dwd_turnratio drop partition (day='2024-03-11');
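With single-level JSON there is no array to flatten, so get_json_object on the raw column is enough. A tiny, purely illustrative example with a made-up record (field names follow the table above):

-- made-up single-level JSON record
with demo as (
  select '{"deviceNo":"R0001","cycle":300,"volumeSum":42,"speedAvg":33.6}' as turnratio_json
)
select get_json_object(turnratio_json,'$.deviceNo')                        as device_no,
       get_json_object(turnratio_json,'$.cycle')                           as cycle,
       get_json_object(turnratio_json,'$.volumeSum')                       as volume_sum,
       cast(get_json_object(turnratio_json,'$.speedAvg') as decimal(10,2)) as speed_avg
from demo;
-- expected result: one row: R0001, 300, 42, 33.60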
(2) Dimension Data Cleansing

create table if not exists  dwd_radar_lane(
    device_no         string  comment 'radar number',
    lane_no           string  comment 'lane number',
    lane_id           string  comment 'lane id',
    lane_direction    string  comment 'direction of travel',
    lane_type         int     comment 'lane type 0 channelized, 1 approach segment, 2 exit, 3 departure segment, 4 intersection, 5 non-intersection segment, 6 other',
    lane_length       float   comment 'lane length',
    lane_type_name    string  comment 'lane type name'
)
comment 'radar lane information table'
stored as orc
;
--create table if not exists  dwd_radar_lane  stored as orc as
--load data
insert overwrite table  hurys_dc_dwd.dwd_radar_lane
select
device_no, lane_no, lane_id, lane_direction, lane_type,lane_length ,
       case when lane_type='0' then 'channelized'
            when lane_type='1' then 'approach segment'
            when lane_type='2' then 'exit'
            when lane_type='3' then 'departure segment'
       end as lane_type_name
from hurys_dc_basic.tb_radar_lane
where lane_length is not null
group by device_no, lane_no, lane_id, lane_direction, lane_type, lane_length
;
--view table data
select * from hurys_dc_dwd.dwd_radar_lane;
V. DWS Layer

create  table  if not exists  dws_statistics_volume_1hour(
    device_no        string         comment 'device number',
    scene_name       string         comment 'scene name',
    lane_no          int            comment 'lane number',
    lane_direction   string         comment 'lane flow direction',
    section_no       int            comment 'cross-section number',
    device_direction string         comment 'radar orientation',
    sum_volume_hour  int            comment 'total volume per hour',
    start_time       timestamp      comment 'start time'
)
comment 'statistics volume table (dynamic partitions, 1-hour period)'
partitioned by (day string)
stored as orc
;
--dynamically load data  --both loads together: 1m41s; with convert.join=false: 1m43s
--note the column order: the column order in the query must match the column order of the table definition
insert  overwrite  table  hurys_dc_dws.dws_statistics_volume_1hour  partition(day)
select
       dwd_st.device_no,
       dwd_sc.scene_name,
       dwd_st.lane_no,
       dwd_rl.lane_direction,
       dwd_st.section_no,
       dwd_rc.device_direction,
       sum(volume_sum) sum_volume_hour,
       concat(substr(create_time, 1, 14), '00:00') start_time,
       day
from hurys_dc_dwd.dwd_statistics as dwd_st
    right join hurys_dc_dwd.dwd_radar_lane as dwd_rl
              on dwd_rl.device_no=dwd_st.device_no and dwd_rl.lane_no=dwd_st.lane_no
    right join hurys_dc_dwd.dwd_device_scene as dwd_ds
              on dwd_ds.device_no=dwd_st.device_no
    right join hurys_dc_dwd.dwd_scene as dwd_sc
              on dwd_sc.scene_id = dwd_ds.scene_id
    right join hurys_dc_dwd.dwd_radar_config as dwd_rc
              on dwd_rc.device_no=dwd_st.device_no
where dwd_st.create_time is not null
group by dwd_st.device_no, dwd_sc.scene_name, dwd_st.lane_no, dwd_rl.lane_direction, dwd_st.section_no, dwd_rc.device_direction, concat(substr(create_time, 1, 14), '00:00'), day
;
--view partitions
show partitions dws_statistics_volume_1hour;
--view data
select * from hurys_dc_dws.dws_statistics_volume_1hour
where day='2024-02-29';
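The hourly bucketing is done with plain string manipulation: substr(create_time, 1, 14) keeps the 'yyyy-MM-dd HH:' prefix and concat appends '00:00'. A standalone illustration with a made-up timestamp:

-- made-up timestamp, truncated to the start of its hour
with demo as (
  select '2024-02-29 13:47:52' as create_time
)
select concat(substr(create_time, 1, 14), '00:00') as start_time
from demo;
-- expected result: 2024-02-29 13:00:00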
VI. ADS Layer

The ADS layer here is simply another Kettle task: it syncs the result data of Hive's DWS layer into ClickHouse.

With DolphinScheduler scheduling the task of each layer, the whole offline warehouse pipeline runs end to end. A rough sketch of a possible ClickHouse target table is shown below.
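The actual ClickHouse DDL lives in 1_ads.sql (executed by the create_database_table task below) and is not shown in the post. The following is only an illustrative guess at an ADS table mirroring dws_statistics_volume_1hour; the table name, engine and ordering key are assumptions, not the author's definition:

-- hypothetical ADS table mirroring the Hive DWS table; name, engine and keys are assumed
create table if not exists default.ads_statistics_volume_1hour
(
    device_no        String   comment 'device number',
    scene_name       String   comment 'scene name',
    lane_no          Int32    comment 'lane number',
    lane_direction   String   comment 'lane flow direction',
    section_no       Int32    comment 'cross-section number',
    device_direction String   comment 'radar orientation',
    sum_volume_hour  Int32    comment 'total volume per hour',
    start_time       DateTime comment 'start time',
    day              Date     comment 'partition day'
)
engine = MergeTree
partition by day
order by (device_no, lane_no, start_time);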
VII. DolphinScheduler Tasks (besides the two collection tasks)


(1) delete_stale_data (delete raw ODS-layer data according to the retention policy)

#! /bin/bash
source /etc/profile
nowdate=`date --date='0 days ago' "+%Y%m%d"`
day_30_ago_date=`date -d "30 day ago " +%Y-%m-%d`
#static queue data
hadoop fs -test -e /user/hive/warehouse/hurys_dc_ods.db/ods_queue/day=${day_30_ago_date}
if [ $? -ne 0 ]; then
    echo "file does not exist"
else 
    hdfs dfs -rm -r /user/hive/warehouse/hurys_dc_ods.db/ods_queue/day=${day_30_ago_date}
fi
#track data
hadoop fs -test -e /user/hive/warehouse/hurys_dc_ods.db/ods_track/day=${day_30_ago_date}
if [ $? -ne 0 ]; then
    echo "file does not exist"
else 
    hdfs dfs -rm -r /user/hive/warehouse/hurys_dc_ods.db/ods_track/day=${day_30_ago_date}
fi
#dynamic queue data
hadoop fs -test -e /user/hive/warehouse/hurys_dc_ods.db/ods_queue_dynamic/day=${day_30_ago_date}
if [ $? -ne 0 ]; then
    echo "file does not exist"
else 
    hdfs dfs -rm -r /user/hive/warehouse/hurys_dc_ods.db/ods_queue_dynamic/day=${day_30_ago_date}
fi
#area data
hadoop fs -test -e /user/hive/warehouse/hurys_dc_ods.db/ods_area/day=${day_30_ago_date}
if [ $? -ne 0 ]; then
    echo "file does not exist"
else 
    hdfs dfs -rm -r /user/hive/warehouse/hurys_dc_ods.db/ods_area/day=${day_30_ago_date}
fi
#event data
hadoop fs -test -e /user/hive/warehouse/hurys_dc_ods.db/ods_event/day=${day_30_ago_date}
if [ $? -ne 0 ]; then
    echo "file does not exist"
else 
    hdfs dfs -rm -r /user/hive/warehouse/hurys_dc_ods.db/ods_event/day=${day_30_ago_date}
fi
#drop the table partitions
hive -e "
use hurys_dc_ods;
alter table hurys_dc_ods.ods_area drop partition (day='$day_30_ago_date');
alter table hurys_dc_ods.ods_event drop partition (day='$day_30_ago_date');
alter table hurys_dc_ods.ods_queue drop partition (day='$day_30_ago_date');
alter table hurys_dc_ods.ods_queue_dynamic drop partition (day='$day_30_ago_date');
alter table hurys_dc_ods.ods_track drop partition (day='$day_30_ago_date')
"
(2) flume (Flume collects the Kafka business data)

(3) create_database_table (automatically create the Hive and ClickHouse databases and tables)

1. Create the Hive databases and tables

#! /bin/bash
source /etc/profile
hive -e "
source  1_dws.sql
"

2. Create the ClickHouse databases and tables

#! /bin/bash
source /etc/profile
clickhouse-client --user default --password hurys@123 -d default --multiquery <1_ads.sql

(4) hive_dws (DWS-layer task)

#! /bin/bash
source /etc/profile
nowdate=`date --date='0 days ago' "+%Y%m%d"`
yesdate=`date -d yesterday +%Y-%m-%d`
hive -e "
use hurys_dc_dws;
set hive.vectorized.execution.enabled=false;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;
set hive.exec.max.dynamic.partitions=2000;
    
            
insert  overwrite  table  hurys_dc_dws.dws_statistics_volume_1hour  partition(day='$yesdate')
select
       dwd_st.device_no,
       dwd_sc.scene_name,
       dwd_st.lane_no,
       dwd_rl.lane_direction,
       dwd_st.section_no,
       dwd_rc.device_direction,
       sum(volume_sum) sum_volume_hour,
       concat(substr(create_time, 1, 14), '00:00') start_time
from hurys_dc_dwd.dwd_statistics as dwd_st
    right join hurys_dc_dwd.dwd_radar_lane as dwd_rl
              on dwd_rl.device_no=dwd_st.device_no and dwd_rl.lane_no=dwd_st.lane_no
    right join hurys_dc_dwd.dwd_device_scene as dwd_ds
              on dwd_ds.device_no=dwd_st.device_no
    right join hurys_dc_dwd.dwd_scene as dwd_sc
              on dwd_sc.scene_id = dwd_ds.scene_id
    right join hurys_dc_dwd.dwd_radar_config as dwd_rc
              on dwd_rc.device_no=dwd_st.device_no
where dwd_st.create_time is not null  and  day= '$yesdate'
group by dwd_st.device_no, dwd_sc.scene_name, dwd_st.lane_no, dwd_rl.lane_direction, dwd_st.section_no, dwd_rc.device_direction, concat(substr(create_time, 1, 14), '00:00')    
"
(5) hive_basic (dimension table base database)

#! /bin/bash
source /etc/profile
hive -e "
set hive.vectorized.execution.enabled=false;
use hurys_dc_basic
"

(6) dolphinscheduler_log (delete DolphinScheduler log files)

#! /bin/bash
source /etc/profile
nowdate=`date --date='0 days ago' "+%Y%m%d"`
yesdate=`date -d yesterday +%Y-%m-%d`
cd  /usr/local/hurys/dc_env/dolphinscheduler/dolphin/logs/
rm -rf dolphinscheduler-api.$yesdate*.log
rm -rf dolphinscheduler-master.$yesdate*.log
rm -rf dolphinscheduler-worker.$yesdate*.log

(7) Kettle_Hive_to_ClickHouse (Kettle syncs Hive DWS-layer data into the ClickHouse ADS layer)

#!/bin/bash
source /etc/profile
/usr/local/hurys/dc_env/kettle/data-integration/pan.sh -rep=hurys_linux_kettle_repository -user=admin -pass=admin -dir=/hive_to_clickhouse/ -trans=17_Hive_to_ClickHouse_ads_avg_volume_15min -level=Basic >>/home/log/kettle/17_Hive_to_ClickHouse_ads_avg_volume_15min_`date +%Y%m%d`.log 

(8) Kettle_MySQL_to_HDFS (Kettle collects MySQL dimension table data into HDFS)

(9) hive_dwd (DWD-layer task)

1. Business data with nested JSON (multi-level)

#! /bin/bash
source /etc/profile
nowdate=`date --date='0 days ago' "+%Y%m%d"`
yesdate=`date -d yesterday +%Y-%m-%d`
hive -e "
use hurys_dc_dwd;
set hive.vectorized.execution.enabled=false;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;
set hive.exec.max.dynamic.partitions=1500;
with t1 as(
select
       get_json_object(queue_json,'$.deviceNo')   device_no,
       get_json_object(queue_json,'$.createTime') create_time,
       get_json_object(queue_json,'$.laneNum')    lane_num,
       get_json_object(queue_json,'$.queueList')  queue_list
from hurys_dc_ods.ods_queue
where date(get_json_object(queue_json,'$.createTime')) = '$yesdate'
    )
insert  overwrite  table  hurys_dc_dwd.dwd_queue partition(day='$yesdate')
select
        t1.device_no,
        t1.lane_num,
        substr(create_time,1,19)                                               create_time ,
        get_json_object(list_json,'$.laneNo')                                  lane_no,
        get_json_object(list_json,'$.laneType')                                lane_type,
        get_json_object(list_json,'$.queueCount')                              queue_count,
        cast(get_json_object(list_json,'$.queueLen')   as decimal(10,2))       queue_len,
        cast(get_json_object(list_json,'$.queueHead')  as decimal(10,2))       queue_head,
        cast(get_json_object(list_json,'$.queueTail')  as decimal(10,2))       queue_tail
from t1
lateral view explode(split(regexp_replace(regexp_replace(queue_list,
                                                '\\\\[|\\\\]','') ,      --strip the square brackets around the JSON array
                                 '\\\\}\\\\,\\\\{','\\\\}\\\\;\\\\{'),   --replace the commas between array elements with semicolons
                   '\\\\;')   --split on the semicolons (split uses ";" as the delimiter)
          )list_queue as list_json
where  device_no is not null  and  get_json_object(list_json,'$.queueLen') between 0 and 500 and  get_json_object(list_json,'$.queueHead')  between 0 and 500 and  get_json_object(list_json,'$.queueTail')  between 0 and 500 and  get_json_object(list_json,'$.queueCount') between 0 and 100
group by t1.device_no, t1.lane_num, substr(create_time,1,19), get_json_object(list_json,'$.laneNo'), get_json_object(list_json,'$.laneType'), get_json_object(list_json,'$.queueCount'), cast(get_json_object(list_json,'$.queueLen')   as decimal(10,2)), cast(get_json_object(list_json,'$.queueHead')  as decimal(10,2)), cast(get_json_object(list_json,'$.queueTail')  as decimal(10,2))
"
2. Business data with single-level JSON

#! /bin/bash
source /etc/profile
nowdate=`date --date='0 days ago' "+%Y%m%d"`
yesdate=`date -d yesterday +%Y-%m-%d`
hive -e "
use hurys_dc_dwd;
set hive.vectorized.execution.enabled=false;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;
set hive.exec.max.dynamic.partitions=1500;
with t1 as(
select
        get_json_object(turnratio_json,'$.deviceNo')        device_no,
        get_json_object(turnratio_json,'$.cycle')           cycle,
        get_json_object(turnratio_json,'$.createTime')      create_time,
        get_json_object(turnratio_json,'$.volumeSum')       volume_sum,
        cast(get_json_object(turnratio_json,'$.speedAvg')     as decimal(10,2))    speed_avg,
        get_json_object(turnratio_json,'$.volumeLeft')      volume_left,
        cast(get_json_object(turnratio_json,'$.speedLeft')    as decimal(10,2))    speed_left,
        get_json_object(turnratio_json,'$.volumeStraight')  volume_straight,
        cast(get_json_object(turnratio_json,'$.speedStraight')as decimal(10,2))    speed_straight,
        get_json_object(turnratio_json,'$.volumeRight')     volume_right,
        cast(get_json_object(turnratio_json,'$.speedRight')   as decimal(10,2))    speed_right ,
        case when  get_json_object(turnratio_json,'$.volumeTurn')  is null then 0 else get_json_object(turnratio_json,'$.volumeTurn')  end as   volume_turn ,
        case when  get_json_object(turnratio_json,'$.speedTurn')   is null then 0 else cast(get_json_object(turnratio_json,'$.speedTurn')as decimal(10,2))   end as   speed_turn
from hurys_dc_ods.ods_turnratio
where date(get_json_object(turnratio_json,'$.createTime')) = '$yesdate'
)
insert overwrite table hurys_dc_dwd.dwd_turnratio partition (day='$yesdate')
select
       t1.device_no,
       cycle,
       substr(create_time,1,19)              create_time ,
       volume_sum,
       speed_avg,
       volume_left,
       speed_left,
       volume_straight,
       speed_straight ,
       volume_right,
       speed_right ,
       volume_turn,
       speed_turn
from t1
where device_no is not null and volume_sum between 0 and 1000 and speed_avg between 0 and 150 and volume_left  between 0 and 1000 and speed_left between 0 and 100 and volume_straight between 0 and 1000 and speed_straight between 0 and 150 and volume_right between 0 and 1000 and speed_right between 0 and 100 and volume_turn between 0 and 100 and speed_turn between 0 and 100
group by t1.device_no, cycle, substr(create_time,1,19), volume_sum, speed_avg, volume_left, speed_left, volume_straight, speed_straight, volume_right, speed_right, volume_turn, speed_turn
"
3. Dimension data

#! /bin/bash
source /etc/profile
hive -e "
use hurys_dc_dwd;
set hive.vectorized.execution.enabled=false;
insert overwrite table hurys_dc_dwd.dwd_holiday
select
day, holiday,year
from hurys_dc_basic.tb_holiday
group by day, holiday, year
"
(10) hive_ods (ODS-layer task)

#! /bin/bash
source /etc/profile
hive -e "
use hurys_dc_ods;
msck repair table ods_queue;
msck repair table ods_turnratio;
msck repair table ods_queue_dynamic;
msck repair table ods_statistics;
msck repair table ods_area;
msck repair table ods_pass;
msck repair table ods_track;
msck repair table ods_evaluation;
msck repair table ods_event;
"


That is roughly the full flow of the offline data warehouse for now; any remaining issues will be refined later.
