datax做增量导入数据到hive:mysql>hive

打印 上一主题 下一主题

主题 847|帖子 847|积分 2541

为什么要做增量导入? 例如mysql表中的数据导入hive,如果第一天抽取了mysql中t_user表中的全部数据,则第二天只必要抽取新增数据即可! 增长导入是利用where 条件查询实现的,查询条件一般是自增的id或者时间列 下面演示基于时间列的数据增量抽取。
1.数据准备

  1. # 1. 在mysql数据库创建如下表结构:
  2. create table t_order(
  3.     id               int   primary key auto_increment,
  4.     amt              decimal(10,2),
  5.     `status`         int  default 0,
  6.     user_id          int,
  7.     create_time      timestamp DEFAULT CURRENT_TIMESTAMP,
  8.     modify_time      timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
  9. );
  10. # 2.插入数据
  11. insert into t_order values(null,100,0,1001,'2023-07-01 10:10:10','2023-07-01 10:10:10');
  12. insert into t_order values(null,99,0,1002,'2023-07-01 10:10:10','2023-07-01 10:10:10');
  13. select *
  14. from t_order;
  15. -- 2.在hive创建如下表结构
  16. create table t_order(
  17.         id                    int,
  18.         amt                   decimal(10,2),
  19.         `status`              int,
  20.         user_id               int,
  21.         create_time           string,
  22.         modify_time           string
  23. )partitioned by (dt string)
  24. row format delimited  fields terminated by '\t';
  25. -- 手动添加分区
  26. alter table t_order add partition (dt='2023-07-01');
  27. show partitions t_order;
复制代码
2.编写增量数据导入datax配置文件

   {
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://hadoop11:3306/test1"],
                                "querySql": [
                                    "select id,amt,status,user_id,create_time,modify_time from t_order where DATE_FORMAT(modify_time, '%Y-%m-%d') = '$dt'"
                                ]
                            }
                        ],
                        "password": "123456",
                        "username": "root",
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                                {"name": "id","type": "int"},
                                {"name": "amt","type": "double"},
                                {"name": "status","type": "int"},
                                {"name": "user_id","type": "int"},
                                {"name": "create_time","type": "string"},
                                {"name": "modify_time","type": "string"}
                       ],
                        "defaultFS": "hdfs://hdfs-cluster", 
                        "hadoopConfig":{
                                "dfs.nameservices": "hdfs-cluster",
                                "dfs.ha.namenodes.hdfs-cluster": "nn1,nn2",
                                "dfs.namenode.rpc-address.hdfs-cluster.nn1": "hadoop11:8020",
                                "dfs.namenode.rpc-address.hdfs-cluster.nn2": "hadoop12:8020",
                                "dfs.client.failover.proxy.provider.hdfs-cluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
                        },
                        "path": "/user/hive/warehouse/t_order/dt=$dt",
                        "fieldDelimiter": "\t",
                        "fileName": "t_order",
                        "fileType": "text",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

  3.测试增量导入

 执行:python /opt/installs/datax/bin/datax.py /opt/installs/datax/job/test_job.json -p "-Ddt='2023-07-01'"  原有的两条数据已经导入证明json文件无误

  1. # 再添加到mysql一天的测试数据
  2. insert into t_order values(null,220,0,1001,'2023-07-02 10:10:10','2023-07-02 10:10:10');
  3. update t_order set `status` = 2 , modify_time = '2023-07-02 11:00:00' where id = 2;
复制代码

 
  1. -- 手动创建hive分区 2023-07-02
  2. alter table t_order add partition (dt='2023-07-02');
复制代码
 执行:python /opt/installs/datax/bin/datax.py /opt/installs/datax/job/test_job.json -p "-Ddt='2023-07-02'" 
查询结果:已经完成数据导入


4.编写对应的shell脚本执行下令

  1. #! /bin/bash
  2. # 1. 要求用户提供日期如果没有提供,则使用昨天日期
  3. dt=$1
  4. if [ 'x'$1 == 'x' ];then
  5.   dt=$(date -d'-1 day' +%Y-%m-%d)
  6. fi
  7. # 2. 查询dt对应日期的分区是否存在,默认返回结果表里面的列名需要去掉,只保留表中的数据赋值给x1变量
  8. x1=$(hive -e "set hive.cli.print.header=false;show partitions t_order partition(dt='$dt')")
  9. # 3. 如果x1变量等于空,说明分区不存在,则创建分区
  10. echo $x1
  11. if [ "$x1" == "" ]
  12. then
  13.   hive -e "alter table t_order add partition(dt='$dt')"
  14. fi
  15. # 4. 执行py文件
  16. python /opt/installs/datax/bin/datax.py -p "-Ddt=$dt" /opt/installs/datax/job/test_job.json
复制代码
 测试shell脚本,mysql增长2023-07-03的数据增量导入到hive:
  1. insert into t_order values(null,330,0,1003,'2023-07-03 10:10:10','2023-07-03 10:10:10');
  2. update t_order set `status` = 2 , modify_time = '2023-07-03 11:00:00' where id = 3;
复制代码


测试成功!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

傲渊山岳

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表