阿里DataX极简教程

打印 上一主题 下一主题

主题 921|帖子 921|积分 2763

目次

简介

DataX是一个数据同步工具,可以将数据从一个地方读取出来并以极快的速度写入另外一个地方。常见的如将mysql中的数据同步到另外一个mysql中,大概另外一个mongodb中。
工作流程


  • read:设置一个源,DataX从源读取数据
  • write:设置一个目标地,DataX将读取到的数据写入目标地
  • setting:同步设置,如设置并发通道、控制作业速度等
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术标题
  • 多线程:充分利用多线程来处理同步任务
核心架构


核心模块介绍

1:DataX完成单个数据同步的作业,我们称之为Job,DataX担当到一个Job之后,将启动一个历程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业盘算转化为多个子Task)、TaskGroup管理等功能。
2:DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
3:切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的全部Task,默认单个任务组的并发数量为5
4:每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作
5:DataX作业运行起来之后, Job监控并等候多个TaskGroup模块任务完成,等候全部TaskGroup任务完成后Job成功退出。否则,异常退出,历程退出值非0
DataX调度流程

举例来说,用户提交了一个DataX作业,而且配置了20个并发,目标是将一个100张分表的mysql数据同步到odps内里。 DataX的调度决议思路是:

  • DaXJob根据分库分表切分成了100个Task。
  • 根据20个并发,DataX盘算共须要分配4个TaskGroup。
  • 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
支持的数据

类型数据源Reader(读)Writer(写)文档RDBMS 关系型数据库MySQL√√Oracle√√OceanBase√√SQLServer√√PostgreSQL√√DRDS√√达梦√√通用RDBMS(支持全部关系型数据库)√√阿里云数仓数据存储ODPS√√ADS√OSS√√OCS√√NoSQL数据存储OTS√√Hbase0.94√√Hbase1.1√√MongoDB√√Hive√√无结构化数据存储TxtFile√√FTP√√HDFS√√Elasticsearch√实践

作为极简教程,本文将从mysql中读取一张表的数据,然后同步到clickhouse中。
下载

打开该项目标Github 首页进行下载:https://github.com/alibaba/DataX
下载链接:https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz
下载下来是一个tar.gz的包,windows下解压下令:
  1. tar  -zxvf  xxx.tar.gz
复制代码
程序目次:

  • bin:利用内里的 datax.py 来启动程序
  • job:内里放了一个job.json,用来检查运行情况,一般的建议下载完毕之后执行一次。
  • log:存放执行日志
  • plugin:插件集,插件分为read和write,分别对应datax可支持的数据库
  • 其他目次:......
情况

DataX是基于python和java的,须要呆板拥有python和java 的运行情况。
在下载完毕后,通过执行自检脚本,可确认情况是否精确
  1.  python {YOUR_DATAX_HOME}/bin/datax.py {YOUR_DATAX_HOME}/job/job.json
复制代码
执行流程

编写同步任务配置文件,在job目次中创建 mysql-to-clickhouse.json 文件,并填入如下内容
  1. {
  2.     "job": {
  3.         "setting": {
  4.             "speed": {
  5.                 "channel": 3
  6.             },
  7.             "errorLimit": {
  8.                 "record": 0,
  9.                 "percentage": 0.02
  10.             }
  11.         },
  12.         "content": [
  13.             {
  14.                 "reader": {
  15.                     "name": "mysqlreader",
  16.                     "parameter": {
  17.                         "username": "xxx",
  18.                         "password": "xxx",
  19.                         "column": [
  20.                             "id",
  21.                             "name"
  22.                         ],
  23.                         "splitPk": "id",
  24.                         "connection": [
  25.                             {
  26.                                 "table": [
  27.                                     "table_name"
  28.                                 ],
  29.                                 "jdbcUrl": [
  30.                                     "jdbc:mysql://192.168.1.xxx:xxx/db_name"
  31.                                 ]
  32.                             }
  33.                         ]
  34.                     }
  35.                 },
  36.                 "writer": {
  37.                     "name": "clickhousewriter",
  38.                     "parameter": {
  39.                         "username": "xxx",
  40.                         "password": "xxx",
  41.                         "column": [
  42.                             "id",
  43.                             "ame"
  44.                         ],
  45.                         "connection": [
  46.                             {
  47.                                 "jdbcUrl": "jdbc:clickhouse://192.168.1.xxx:xxx/table_name",
  48.                                 "table": [
  49.                                     "table_name"
  50.                                 ]
  51.                             }
  52.                         ],
  53.                         "preSql": [],
  54.                         "postSql": [],
  55.                         "batchSize": 65536,
  56.                         "batchByteSize": 134217728,
  57.                         "dryRun": false,
  58.                         "writeMode": "insert"
  59.                     }
  60.                 }
  61.             }
  62.         ]
  63.     }
  64. }
复制代码

  • job:一个job包含两个部分,setting中设置任务的执行速度,错误限制等,content中是任务具体的形貌。
  • reader:任务的数据输入源
  • writer:任务的数据输出源
根据任务配置文件启动datax,先cd到datax的根目次
  1. python bin/datax.py    job/mysql-to-clickhouse.json
复制代码
运行上述下令后,任务就开启了。本例从mysql数据库中的一张表中读取了两个字段(id,name),然后同步到clickhouse中,clickhouse中须要先创建同样的库,表和列。
任务执行非常快,140W数据仅用了 18s 就完成了同步。
  1. 2024-05-16 16:24:57.312 [job-0] INFO  JobContainer -
  2. 任务启动时刻                    : 2024-05-16 16:24:38
  3. 任务结束时刻                    : 2024-05-16 16:24:57
  4. 任务总计耗时                    :                 18s
  5. 任务平均流量                    :            2.21MB/s
  6. 记录写入速度                    :         142425rec/s
  7. 读出记录总数                    :             1424252
  8. 读写失败总数                    :                   0
复制代码
引用

技术交流QQ群:1158377441 欢迎关注我的微信公众号【TechnologyRamble】,后续博文将在公众号首发:

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

莫张周刘王

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表