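Why do an incremental import? Take loading a MySQL table into Hive: if the first day extracted all rows of MySQL's t_user table, the second day only needs to extract the rows that have changed since then. An incremental import is implemented with a WHERE condition, usually on an auto-increment id or on a time column; a modification-time column also catches rows that were updated, not only newly inserted ones. The following demonstrates incremental extraction based on a time column.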
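The two kinds of condition can be sketched as a small bash helper that prints the query for one incremental run. This is only an illustration: the function name `incr_query` and the idea of passing in the largest id already imported are assumptions, not part of DataX. For the time column it uses a half-open range on `modify_time` (from midnight of the day up to, but not including, midnight of the next day). That selects the same rows as comparing `DATE_FORMAT(modify_time, '%Y-%m-%d')` with the date, but leaves the column unwrapped, so MySQL could use an index on `modify_time` if one were added (the table below has none).

```bash
#!/bin/bash
# incr_query MODE VALUE  (hypothetical helper, for illustration only)
#   MODE=id   : VALUE is the largest id already imported; select rows with a larger id
#   MODE=time : VALUE is a day in YYYY-MM-DD form; select rows modified on that day
incr_query() {
  local mode="$1" value="$2"
  local base="select id,amt,status,user_id,create_time,modify_time from t_order"
  case "$mode" in
    id)
      echo "$base where id > $value"
      ;;
    time)
      local next
      # the next calendar day (GNU date), used as the exclusive upper bound
      next=$(date -d "$value +1 day" +%Y-%m-%d) || return 1
      echo "$base where modify_time >= '$value 00:00:00' and modify_time < '$next 00:00:00'"
      ;;
    *)
      echo "unknown mode: $mode" >&2
      return 1
      ;;
  esac
}
```

For example, `incr_query time 2023-07-01` prints the query for the first day's partition. The id variant only picks up newly inserted rows: an UPDATE such as the one on order 2 later in this walkthrough does not change the id, which is why the time column is used here.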
1. Data preparation
# 1. Create the following table in MySQL:
create table t_order(
  id int primary key auto_increment,
  amt decimal(10,2),
  `status` int default 0,
  user_id int,
  create_time timestamp DEFAULT CURRENT_TIMESTAMP,
  modify_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
# 2. Insert data
insert into t_order values(null,100,0,1001,'2023-07-01 10:10:10','2023-07-01 10:10:10');
insert into t_order values(null,99,0,1002,'2023-07-01 10:10:10','2023-07-01 10:10:10');
select *
from t_order;
-- 3. Create the following table in Hive:
create table t_order(
  id int,
  amt decimal(10,2),
  `status` int,
  user_id int,
  create_time string,
  modify_time string
)partitioned by (dt string)
row format delimited fields terminated by '\t';
-- Add the partition manually
alter table t_order add partition (dt='2023-07-01');
show partitions t_order;
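Adding partitions by hand one day at a time gets tedious when several days need backfilling. A minimal sketch, assuming GNU `date` and dates in YYYY-MM-DD form, that prints one `ALTER TABLE ... ADD IF NOT EXISTS PARTITION` statement per day in an inclusive range; the function name `partition_ddl` is hypothetical, and its output is meant to be passed to `hive -e`.

```bash
#!/bin/bash
# partition_ddl START END  (hypothetical helper)
# Prints one Hive statement per day from START to END inclusive (YYYY-MM-DD).
# IF NOT EXISTS makes the statements safe to re-run for days that already have a partition.
partition_ddl() {
  local d="$1" end="$2"
  # YYYY-MM-DD strings sort lexicographically in date order
  while [[ "$d" < "$end" || "$d" == "$end" ]]; do
    echo "alter table t_order add if not exists partition (dt='$d');"
    d=$(date -d "$d +1 day" +%Y-%m-%d) || return 1
  done
}
```

Usage: `hive -e "$(partition_ddl 2023-07-01 2023-07-03)"` would create the partitions for three days in one Hive session.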
2. Write the DataX job file for the incremental import (the commands below use /opt/installs/datax/job/test_job.json)
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://hadoop11:3306/test1"],
                "querySql": [
                  "select id,amt,status,user_id,create_time,modify_time from t_order where DATE_FORMAT(modify_time, '%Y-%m-%d') = '$dt'"
                ]
              }
            ],
            "password": "123456",
            "username": "root"
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "column": [
              {"name": "id", "type": "int"},
              {"name": "amt", "type": "double"},
              {"name": "status", "type": "int"},
              {"name": "user_id", "type": "int"},
              {"name": "create_time", "type": "string"},
              {"name": "modify_time", "type": "string"}
            ],
            "defaultFS": "hdfs://hdfs-cluster",
            "hadoopConfig": {
              "dfs.nameservices": "hdfs-cluster",
              "dfs.ha.namenodes.hdfs-cluster": "nn1,nn2",
              "dfs.namenode.rpc-address.hdfs-cluster.nn1": "hadoop11:8020",
              "dfs.namenode.rpc-address.hdfs-cluster.nn2": "hadoop12:8020",
              "dfs.client.failover.proxy.provider.hdfs-cluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
            },
            "path": "/user/hive/warehouse/t_order/dt=$dt",
            "fieldDelimiter": "\t",
            "fileName": "t_order",
            "fileType": "text",
            "writeMode": "append"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}
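The same `$dt` placeholder appears in `querySql` and in the writer `path`, so one `-p "-Ddt=..."` value selects the MySQL rows and names the target partition directory at once. To see what a given date turns into before running DataX, a rough preview can be produced in plain bash. This is an approximation of DataX's variable substitution, not its actual code, and it only handles the `$dt` form used in this file; `preview_job` is a hypothetical name.

```bash
#!/bin/bash
# preview_job FILE DT  (hypothetical helper)
# Prints FILE with every literal "$dt" replaced by DT, approximating what
# DataX does with -p "-Ddt=DT". Only the $dt form is handled.
preview_job() {
  local file="$1" dt="$2" line
  # "|| [ -n "$line" ]" keeps a last line that has no trailing newline
  while IFS= read -r line || [ -n "$line" ]; do
    printf '%s\n' "${line//\$dt/$dt}"
  done < "$file"
}
```

For example, `preview_job /opt/installs/datax/job/test_job.json 2023-07-02 | grep 2023-07-02` shows the resolved WHERE clause and HDFS path side by side.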
3. Test the incremental import
Run: python /opt/installs/datax/bin/datax.py /opt/installs/datax/job/test_job.json -p "-Ddt='2023-07-01'" The two existing rows are imported, which shows that the JSON file is correct.
# Add another day of test data in MySQL
insert into t_order values(null,220,0,1001,'2023-07-02 10:10:10','2023-07-02 10:10:10');
update t_order set `status` = 2 , modify_time = '2023-07-02 11:00:00' where id = 2;
-- Manually create the Hive partition for 2023-07-02
alter table t_order add partition (dt='2023-07-02');
Run: python /opt/installs/datax/bin/datax.py /opt/installs/datax/job/test_job.json -p "-Ddt='2023-07-02'"
Querying the Hive table shows that the data has been imported.
4. Write a shell script that runs the command
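Which rows land in the dt=2023-07-02 partition follows directly from the `modify_time` filter: the new order 3 and order 2, whose UPDATE moved its `modify_time` to 2023-07-02. Order 2 also stays in the dt=2023-07-01 partition with its old status, so the same id can appear in several partitions. The sketch below replays that filter with awk over the table contents after the day-2 statements, written out by hand as tab-separated lines and assuming the auto-increment ids are 1, 2 and 3; `rows_for_day` and `t_order_rows` are hypothetical helpers, not part of DataX or Hive.

```bash
#!/bin/bash
# rows_for_day DAY  (hypothetical helper)
# Reads t_order rows as tab-separated lines on stdin
# (id, amt, status, user_id, create_time, modify_time) and prints the ids
# whose modify_time falls on DAY, mirroring
# DATE_FORMAT(modify_time, '%Y-%m-%d') = DAY in the job's querySql.
rows_for_day() {
  awk -F'\t' -v day="$1" 'substr($6, 1, 10) == day { print $1 }'
}

# t_order after the 2023-07-02 insert and update (ids assumed to be 1, 2, 3)
t_order_rows() {
  printf '%s\t%s\t%s\t%s\t%s\t%s\n' \
    1 100.00 0 1001 '2023-07-01 10:10:10' '2023-07-01 10:10:10' \
    2 99.00  2 1002 '2023-07-01 10:10:10' '2023-07-02 11:00:00' \
    3 220.00 0 1001 '2023-07-02 10:10:10' '2023-07-02 10:10:10'
}

t_order_rows | rows_for_day 2023-07-02   # prints 2 and 3
```

Re-running the 2023-07-01 extraction at this point would only return order 1, because order 2 no longer has a 2023-07-01 `modify_time`.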
#!/bin/bash
# 1. Take the date from the first argument; if none is given, use yesterday's date
dt=$1
if [ -z "$1" ]; then
  dt=$(date -d '-1 day' +%Y-%m-%d)
fi
# 2. Check whether the partition for $dt exists. Turn off the column header that Hive
#    prints by default, so that only the query result is assigned to x1
x1=$(hive -e "set hive.cli.print.header=false;show partitions t_order partition(dt='$dt')")
echo "$x1"
# 3. If x1 is empty, the partition does not exist, so create it
if [ -z "$x1" ]
then
  hive -e "alter table t_order add partition(dt='$dt')"
fi
# 4. Run the DataX job for that date
python /opt/installs/datax/bin/datax.py -p "-Ddt=$dt" /opt/installs/datax/job/test_job.json
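The script trusts whatever is passed as `$1`, so a typo such as `2023-7-3` would create an empty partition dt='2023-7-3' that the DATE_FORMAT comparison never matches. A minimal sketch of a stricter way to resolve the date, assuming GNU `date`; `resolve_dt` is a hypothetical function that could stand in for step 1 of the script.

```bash
#!/bin/bash
# resolve_dt [DATE]  (hypothetical helper)
# Prints DATE if it is a real calendar day written as YYYY-MM-DD;
# with no argument, prints yesterday's date. Returns 1 on bad input.
resolve_dt() {
  local dt="${1:-}"
  if [ -z "$dt" ]; then
    dt=$(date -d '-1 day' +%Y-%m-%d)
  fi
  # the shape must be exactly YYYY-MM-DD ...
  if ! [[ "$dt" =~ ^[0-9]{4}-[0-9]{2}-[0-9]{2}$ ]]; then
    echo "bad date format: $dt" >&2
    return 1
  fi
  # ... and GNU date must accept it as a real day (rejects e.g. 2023-02-30)
  if ! date -d "$dt" +%Y-%m-%d >/dev/null 2>&1; then
    echo "no such date: $dt" >&2
    return 1
  fi
  echo "$dt"
}
```

In the script, step 1 would then shrink to `dt=$(resolve_dt "$1") || exit 1`, stopping before any partition is created for a malformed date.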
Test the shell script by adding data for 2023-07-03 in MySQL and importing it incrementally into Hive:
insert into t_order values(null,330,0,1003,'2023-07-03 10:10:10','2023-07-03 10:10:10');
update t_order set `status` = 2 , modify_time = '2023-07-03 11:00:00' where id = 3;
The test succeeded.