1. Install DataX
Download datax.tar.gz from https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz and extract it into a directory of your choice, e.g. /opt/conf/datax-20230301. Extraction creates a datax directory in the current directory, containing subdirectories such as bin, conf, job, lib, and plugin.
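For reference, a minimal sketch of the download-and-extract steps, assuming wget is available (any download tool works):

```bash
# Create the target directory, download the release, and unpack it.
mkdir -p /opt/conf/datax-20230301
cd /opt/conf/datax-20230301
wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz
tar -zxvf datax.tar.gz   # produces the datax/ directory
```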
Next, in the DataX installation directory, run a command to grant execute permission to the startup script.
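The original post does not show the exact command; something along these lines should do it:

```bash
# Hypothetical permission fix; adjust the path to match your installation.
chmod +x /opt/conf/datax-20230301/datax/bin/datax.py
```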
2. Verify the DataX installation

Run the bundled sample job:

```bash
/opt/conf/datax-20230301/datax/bin/datax.py /opt/conf/datax-20230301/datax/job/job.json
```
If everything is installed correctly, the job starts, runs to completion, and prints a summary with read/write record counts and timing statistics.
If the command fails to run, the encoding of /opt/conf/datax-20230301/datax/bin/datax.py is likely not UTF-8 and the file needs to be re-encoded. Replacing it with the datax.py attached to the original post also fixes the problem.
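If you want to confirm the diagnosis before replacing the file, you can inspect the script's current encoding, for example:

```bash
# Report the file's MIME type and character set (e.g. charset=utf-8).
file -i /opt/conf/datax-20230301/datax/bin/datax.py
```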
3. Write the job configuration file
In the job folder under the DataX installation directory, create a new configuration file:

```bash
vim job_air_data_source_mysql_hdfs.json
```
Then paste the following JSON into the file you just opened and save it. Adjust username, password, jdbcUrl, and defaultFS to match your environment.

```json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": ["*"],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "air_data_source"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://master:3306/air_data"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://master:9820",
                        "fileType": "TEXT",
                        "path": "/user/hive/warehouse/air_data.db/air_data_source",
                        "fileName": "air_data_source_202302",
                        "column": [
                            {"name": "id", "type": "STRING"},
                            {"name": "airlinelogo", "type": "STRING"},
                            {"name": "airlineshortcompany", "type": "STRING"},
                            {"name": "arractcross", "type": "STRING"},
                            {"name": "arracttime", "type": "STRING"},
                            {"name": "arrairport", "type": "STRING"},
                            {"name": "arrcode", "type": "STRING"},
                            {"name": "arrontimerate", "type": "STRING"},
                            {"name": "arrplancross", "type": "STRING"},
                            {"name": "arrplantime", "type": "STRING"},
                            {"name": "arrterminal", "type": "STRING"},
                            {"name": "checkintable", "type": "STRING"},
                            {"name": "checkintablewidth", "type": "STRING"},
                            {"name": "depactcross", "type": "STRING"},
                            {"name": "depacttime", "type": "STRING"},
                            {"name": "depairport", "type": "STRING"},
                            {"name": "depcode", "type": "STRING"},
                            {"name": "depplancross", "type": "STRING"},
                            {"name": "depplantime", "type": "STRING"},
                            {"name": "depterminal", "type": "STRING"},
                            {"name": "flightno", "type": "STRING"},
                            {"name": "flightstate", "type": "STRING"},
                            {"name": "localdate", "type": "STRING"},
                            {"name": "mainflightno", "type": "STRING"},
                            {"name": "shareflag", "type": "STRING"},
                            {"name": "statecolor", "type": "STRING"}
                        ],
                        "writeMode": "truncate",
                        "fieldDelimiter": "\u0001",
                        "compress": "GZIP"
                    }
                }
            }
        ]
    }
}
```
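A few settings worth noting: fieldDelimiter "\u0001" is Hive's default field separator (Ctrl-A), so the table created in the next section can keep its default row format; writeMode "truncate" tells hdfswriter to remove previously written files with the same fileName prefix from the target path before writing; and since channel is set to 3, splitPk is provided so that mysqlreader can split the table by id and read in parallel.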
4. Create the Hive database and table
```sql
create database air_data;
use air_data;
CREATE TABLE `air_data_source`(
  `id` int COMMENT 'primary key',
  `airlinelogo` string COMMENT 'airline logo',
  `airlineshortcompany` string COMMENT 'airline short name',
  `arractcross` string,
  `arracttime` string COMMENT 'actual arrival time',
  `arrairport` string,
  `arrcode` string,
  `arrontimerate` string COMMENT 'arrival on-time rate',
  `arrplancross` string,
  `arrplantime` string COMMENT 'planned arrival time',
  `arrterminal` string,
  `checkintable` string,
  `checkintablewidth` string,
  `depactcross` string,
  `depacttime` string COMMENT 'actual departure time',
  `depairport` string COMMENT 'departure airport name',
  `depcode` string COMMENT 'departure airport code',
  `depplancross` string,
  `depplantime` string COMMENT 'planned departure time',
  `depterminal` string,
  `flightno` string COMMENT 'flight number',
  `flightstate` string COMMENT 'flight status',
  `localdate` string,
  `mainflightno` string,
  `shareflag` string,
  `statecolor` string)
COMMENT 'raw flight data table'
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;
```
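Because the DDL specifies no FIELDS TERMINATED BY clause, LazySimpleSerDe falls back to its default delimiter \001 (Ctrl-A), which matches the writer's fieldDelimiter above; Hive also decompresses the GZIP text files transparently on read. If you want to double-check what Hive stored, one option is:

```bash
# Print the stored table definition (assumes the Hive CLI is on PATH).
hive -e "USE air_data; SHOW CREATE TABLE air_data_source;"
```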
After running the statements above, you can move on to the data extraction.
5. Run the job
In the current directory, execute the following command:

```bash
/opt/conf/datax-20230301/datax/bin/datax.py /opt/conf/datax-20230301/datax/job/job_air_data_source_mysql_hdfs.json
```
This starts the data synchronization job; when it finishes, DataX prints the usual run summary.
Check whether the data files are now on HDFS by running the following command:

```bash
hadoop fs -ls hdfs://master:9820/user/hive/warehouse/air_data.db/air_data_source
```
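As a further sanity check (assuming the Hive CLI is available), you can count the rows Hive sees in the new table:

```bash
# Count the synchronized rows through Hive.
hive -e "SELECT COUNT(*) FROM air_data.air_data_source;"
```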
This completes the setup for synchronizing MySQL data to HDFS with DataX.