ToB企服应用市场:ToB评测及商务社交产业平台

标题: 5、DataX(DataX简介、DataX架构原理、DataX部署、利用、同步MySQL数据到HD [打印本页]

作者: 惊落一身雪    时间: 2024-6-15 01:59
标题: 5、DataX(DataX简介、DataX架构原理、DataX部署、利用、同步MySQL数据到HD
1、DataX简介

1.1 DataX概述

  1. DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
复制代码
源码地址:https://github.com/alibaba/DataX
1.2 DataX支持的数据源

DataX现在已经有了比较全面的插件体系,主流的RDBMS数据库、NOSQL、大数据盘算系统都已经接入,现在支持数据如下图。

2、DataX架构原理

2.1 DataX设计理念

为相识决异构数据源同步问题,DataX将复杂的网状的同步链路酿成了星型数据链路,DataX作为中间传输载体负责毗连各种数据源。当需要接入一个新的数据源的时间,只需要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

2.2 DataX框架设计

DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。
Reader:数据采集模块,负责采集数据源的数据,将数据发送给Framework。
Writer:数据写入模块,负责不断向Framework取数据,并将数据写入到目标端。
Framework:用于毗连Reader和Writer,作为两者的数据传输通道,并处理缓存,流控,并发,数据转换等核心技术问题。
2.3 DataX运行流程

下面用一个DataX作业生命周期的时序图说明DataX的运行流程、核心概念以及每个概念之间的关系。

2.4 DataX调理决策思路

举例来说,用户提交了一个DataX作业,并且配置了总的并发度为20,目标是对一个有100张分表的mysql数据源举行同步。DataX的调理决策思路是:
1)DataX Job根据分库分表切分策略,将同步工作分成100个Task。
2)根据配置的总的并发度20,以及每个Task Group的并发度5,DataX盘算共需要分配4个TaskGroup。
3)4个TaskGroup中分100个Task,每一个TaskGroup负责运行25个Task。
2.5 DataX和Sqoop对比


3、DataX部署

1、下载DataX安装包并上传到hadoop102的/opt/software
下载地址:http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
2、解压datax.tar.gz到/opt/module
  1. tar -zxvf datax.tar.gz -C /opt/module/
复制代码
3、自检,执行如下命令
  1. python /opt/module/datax/bin/datax.py /opt/module/datax/job/job.json
复制代码
4、出现如下内容,则表明安装乐成

4、DataX利用

4.1 DataX利用概述

4.1.1 DataX任务提交命令

Datax的利用非常简单,用户只需要根据本身同步数据的数据源和目标地选择相应的Reader和Writer,并将Reader和Writer的信息配置在一个json文件中,然后执行如下命令提交数据同步任务即可。
  1. python bin/datax.py path/to/your/job.json
复制代码
4.1.2 DataX配置文件格式

可以利用如下定名检察DataX配置文件模板。
  1. python bin/datax.py -r mysqlreader -w hdfswriter
复制代码
配置文件模板如下,json最外层是一个job,job包罗setting和content两部门,其中setting用于对整个job举行配置,content用户配置数据源和目标地。

4.2 同步MySQL数据到HDFS案例

案例要求:同步gmall数据库中base_province表数据到HDFS的/base_province目次
需求分析:要实现该功能,需选用MySQLReader和HDFSWriter,MySQLReader具有两种模式分别是TableMode和QuerySQLMode,前者利用table,column,where等属性声明需要同步的数据;后者利用一条SQL查询语句声明需要同步的数据。
下面分别利用两种模式举行演示。
4.2.1 MySQLReader之TableMode

1、编写配置文件
(1)创建配置文件base_province.json
  1. vim /opt/module/datax/job/base_province.json
复制代码
(2)配置文件内容如下
  1. {
  2.     "job": {
  3.         "content": [
  4.             {
  5.                 "reader": {
  6.                     "name": "mysqlreader",
  7.                     "parameter": {
  8.                         "column": [
  9.                             "id",
  10.                             "name",
  11.                             "region_id",
  12.                             "area_code",
  13.                             "iso_code",
  14.                             "iso_3166_2"
  15.                         ],
  16.                         "where": "id>=3",
  17.                         "connection": [
  18.                             {
  19.                                 "jdbcUrl": [
  20.                                     "jdbc:mysql://hadoop102:3306/gmall"
  21.                                 ],
  22.                                 "table": [
  23.                                     "base_province"
  24.                                 ]
  25.                             }
  26.                         ],
  27.                         "password": "000000",
  28.                         "splitPk": "",
  29.                         "username": "root"
  30.                     }
  31.                 },
  32.                 "writer": {
  33.                     "name": "hdfswriter",
  34.                     "parameter": {
  35.                         "column": [
  36.                             {
  37.                                 "name": "id",
  38.                                 "type": "bigint"
  39.                             },
  40.                             {
  41.                                 "name": "name",
  42.                                 "type": "string"
  43.                             },
  44.                             {
  45.                                 "name": "region_id",
  46.                                 "type": "string"
  47.                             },
  48.                             {
  49.                                 "name": "area_code",
  50.                                 "type": "string"
  51.                             },
  52.                             {
  53.                                 "name": "iso_code",
  54.                                 "type": "string"
  55.                             },
  56.                             {
  57.                                 "name": "iso_3166_2",
  58.                                 "type": "string"
  59.                             }
  60.                         ],
  61.                         "compress": "gzip",
  62.                         "defaultFS": "hdfs://hadoop102:8020",
  63.                         "fieldDelimiter": "\t",
  64.                         "fileName": "base_province",
  65.                         "fileType": "text",
  66.                         "path": "/base_province",
  67.                         "writeMode": "append"
  68.                     }
  69.                 }
  70.             }
  71.         ],
  72.         "setting": {
  73.             "speed": {
  74.                 "channel": 1
  75.             }
  76.         }
  77.     }
  78. }
复制代码
2、配置文件说明
(1)Reader参数说明

(2)Writer参数说明

注意事项:
HFDS Writer并未提供nullFormat参数:也就是用户并不能自界说null值写到HFDS文件中的存储格式。默认情况下,HFDS Writer会将null值存储为空字符串(‘’),而Hive默认的null值存储格式为\N。所以后期将DataX同步的文件导入Hive表就会出现问题。
(3)Setting参数说明

3、提交任务
(1)在HDFS创建/base_province目次
利用DataX向HDFS同步数据时,需确保目标路径已存在
  1. hadoop fs -mkdir /base_province
复制代码
(2)进入DataX根目次
(3)执行如下命令
  1. python bin/datax.py job/base_province.json
复制代码
4、检察结果
(1)DataX打印日志

(2)检察HDFS文件
  1. hadoop fs -cat /base_province/* | zcat
复制代码
4.2.2 MySQLReader之QuerySQLMode

1、编写配置文件
(1)修改配置文件base_province.json
(2)配置文件内容如下
  1. {
  2.     "job": {
  3.         "content": [
  4.             {
  5.                 "reader": {
  6.                     "name": "mysqlreader",
  7.                     "parameter": {
  8.                         "connection": [
  9.                             {
  10.                                 "jdbcUrl": [
  11.                                     "jdbc:mysql://hadoop102:3306/gmall"
  12.                                 ],
  13.                                 "querySql": [
  14.                                     "select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
  15.                                 ]
  16.                             }
  17.                         ],
  18.                         "password": "000000",
  19.                         "username": "root"
  20.                     }
  21.                 },
  22.                 "writer": {
  23.                     "name": "hdfswriter",
  24.                     "parameter": {
  25.                         "column": [
  26.                             {
  27.                                 "name": "id",
  28.                                 "type": "bigint"
  29.                             },
  30.                             {
  31.                                 "name": "name",
  32.                                 "type": "string"
  33.                             },
  34.                             {
  35.                                 "name": "region_id",
  36.                                 "type": "string"
  37.                             },
  38.                             {
  39.                                 "name": "area_code",
  40.                                 "type": "string"
  41.                             },
  42.                             {
  43.                                 "name": "iso_code",
  44.                                 "type": "string"
  45.                             },
  46.                             {
  47.                                 "name": "iso_3166_2",
  48.                                 "type": "string"
  49.                             }
  50.                         ],
  51.                         "compress": "gzip",
  52.                         "defaultFS": "hdfs://hadoop102:8020",
  53.                         "fieldDelimiter": "\t",
  54.                         "fileName": "base_province",
  55.                         "fileType": "text",
  56.                         "path": "/base_province",
  57.                         "writeMode": "append"
  58.                     }
  59.                 }
  60.             }
  61.         ],
  62.         "setting": {
  63.             "speed": {
  64.                 "channel": 1
  65.             }
  66.         }
  67.     }
  68. }
复制代码
2、配置文件说明
(1)Reader参数说明

3、提交任务
(1)清空历史数据
  1. hadoop fs -rm -r -f /base_province/*
复制代码
(2)进入DataX根目次
(3)执行如下命令
  1. python bin/datax.py job/base_province.json
复制代码
4、检察结果
(1)DataX打印日志

(2)检察HDFS文件
  1. hadoop fs -cat /base_province/* | zcat
复制代码
4.2.3 DataX传参

通常情况下,离线数据同步任务需要逐日定时重复执行,故HDFS上的目标路径通常会包罗一层日期,以对逐日同步的数据加以区分,也就是说逐日同步数据的目标路径不是固定不变的,因此DataX配置文件中HDFS Writer的path参数的值应该是动态的。为实现这一结果,就需要利用DataX传参的功能。
DataX传参的用法如下,在JSON配置文件中利用${param}引用参数,在提交任务时利用-p"-Dparam=value"传入参数值,具体示比方下。
1、编写配置文件
(1)修改配置文件base_province.json
(2)配置文件内容如下
  1. {
  2.     "job": {
  3.         "content": [
  4.             {
  5.                 "reader": {
  6.                     "name": "mysqlreader",
  7.                     "parameter": {
  8.                         "connection": [
  9.                             {
  10.                                 "jdbcUrl": [
  11.                                     "jdbc:mysql://hadoop102:3306/gmall"
  12.                                 ],
  13.                                 "querySql": [
  14.                                     "select id,name,region_id,area_code,iso_code,iso_3166_2 from base_province where id>=3"
  15.                                 ]
  16.                             }
  17.                         ],
  18.                         "password": "000000",
  19.                         "username": "root"
  20.                     }
  21.                 },
  22.                 "writer": {
  23.                     "name": "hdfswriter",
  24.                     "parameter": {
  25.                         "column": [
  26.                             {
  27.                                 "name": "id",
  28.                                 "type": "bigint"
  29.                             },
  30.                             {
  31.                                 "name": "name",
  32.                                 "type": "string"
  33.                             },
  34.                             {
  35.                                 "name": "region_id",
  36.                                 "type": "string"
  37.                             },
  38.                             {
  39.                                 "name": "area_code",
  40.                                 "type": "string"
  41.                             },
  42.                             {
  43.                                 "name": "iso_code",
  44.                                 "type": "string"
  45.                             },
  46.                             {
  47.                                 "name": "iso_3166_2",
  48.                                 "type": "string"
  49.                             }
  50.                         ],
  51.                         "compress": "gzip",
  52.                         "defaultFS": "hdfs://hadoop102:8020",
  53.                         "fieldDelimiter": "\t",
  54.                         "fileName": "base_province",
  55.                         "fileType": "text",
  56.                         "path": "/base_province/${dt}",
  57.                         "writeMode": "append"
  58.                     }
  59.                 }
  60.             }
  61.         ],
  62.         "setting": {
  63.             "speed": {
  64.                 "channel": 1
  65.             }
  66.         }
  67.     }
  68. }
复制代码
2、提交任务
(1)创建目标路径
  1. hadoop fs -mkdir /base_province
  2. /2020-06-14
复制代码
(2)进入DataX根目次
(3)执行如下命令
  1. python bin/datax.py -p"-Ddt=2020-06-14" job/base_province.json
复制代码
3、检察结果
  1. hadoop fs -ls /base_province
复制代码
4.3 同步HDFS数据到MySQL案例

案例要求:同步HDFS上的/base_province目次下的数据到MySQL gmall 数据库下的test_province表。
需求分析:要实现该功能,需选用HDFSReader和MySQLWriter。
1、编写配置文件
(1)创建配置文件test_province.json
(2)配置文件内容如下
  1. {
  2.     "job": {
  3.         "content": [
  4.             {
  5.                 "reader": {
  6.                     "name": "hdfsreader",
  7.                     "parameter": {
  8.                         "defaultFS": "hdfs://hadoop102:8020",
  9.                         "path": "/base_province",
  10.                         "column": [
  11.                             "*"
  12.                         ],
  13.                         "fileType": "text",
  14.                         "compress": "gzip",
  15.                         "encoding": "UTF-8",
  16.                         "nullFormat": "\\N",
  17.                         "fieldDelimiter": "\t",
  18.                     }
  19.                 },
  20.                 "writer": {
  21.                     "name": "mysqlwriter",
  22.                     "parameter": {
  23.                         "username": "root",
  24.                         "password": "000000",
  25.                         "connection": [
  26.                             {
  27.                                 "table": [
  28.                                     "test_province"
  29.                                 ],
  30.                                 "jdbcUrl": "jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=utf-8"
  31.                             }
  32.                         ],
  33.                         "column": [
  34.                             "id",
  35.                             "name",
  36.                             "region_id",
  37.                             "area_code",
  38.                             "iso_code",
  39.                             "iso_3166_2"
  40.                         ],
  41.                         "writeMode": "replace"
  42.                     }
  43.                 }
  44.             }
  45.         ],
  46.         "setting": {
  47.             "speed": {
  48.                 "channel": 1
  49.             }
  50.         }
  51.     }
  52. }
复制代码
2、配置文件说明
(1)Reader参数说明

(2)Writer参数说明

3、提交任务
(1)在MySQL中创建gmall.test_province表
  1. DROP TABLE IF EXISTS `test_province`;
  2. CREATE TABLE `test_province`  (
  3.   `id` bigint(20) NOT NULL,
  4.   `name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  5.   `region_id` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  6.   `area_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  7.   `iso_code` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  8.   `iso_3166_2` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  9.   PRIMARY KEY (`id`)
  10. ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
复制代码
(2)进入DataX根目次
(3)执行如下命令
  1. python bin/datax.py job/test_province.json
复制代码
4、检察结果
(1)DataX打印日志
(2)检察MySQL目标表数据

5、DataX优化

5.1 速度控制

DataX3.0提供了包括通道(并发)、记录流、字节省三种流控模式,可以随意控制你的作业速度,让你的作业在数据库可以蒙受的范围内达到最佳的同步速度。

注意事项:
1.若配置了总record限速,则必须配置单个channel的record限速
2.若配置了总byte限速,则必须配置单个channe的byte限速
3.若配置了总record限速和总byte限速,channel并发数参数就会失效。因为配置了总record限速和总byte限速之后,现实channel并发数是通过盘算得到的:
盘算公式为:
min(总byte限速/单个channel的byte限速,总record限速/单个channel的record限速)
5.2 内存调解

当提升DataX Job内Channel并发数时,内存的占用会明显增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。比方Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部门Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,需调大JVM的堆内存。
发起将内存设置为4G或者8G,这个也可以根据现实情况来调解。
调解JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时间,加上对应的参数,如下:
python datax/bin/datax.py --jvm=“-Xms8G -Xmx8G” /path/to/your/job.json

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4