ToB企服应用市场:ToB评测及商务社交产业平台

标题: DataX插件二次开发指南 [打印本页]

作者: 麻花痒    时间: 2023-2-7 11:53
标题: DataX插件二次开发指南
一、 DataX为什么要使用插件机制?

从设计之初,DataX就把异构数据源同步作为自身的使命,为了应对不同数据源的差异、同时提供一致的同步原语和扩展能力,DataX自然而然地采用了框架 + 插件 的模式:
作为插件开发人员,则需要关注两个问题:
二、插件视角看框架

逻辑执行模型

插件开发者基本只需要关注特定数据源系统的读和写,以及自己的代码在逻辑上是怎样被执行的,哪一个方法是在什么时候被调用的。开发之前需要明确以下概念:
简而言之, Job拆分成Task,在分别在框架提供的容器中执行,插件只需要实现Job和Task两部分逻辑
物理执行模型

框架为插件提供物理上的执行能力(线程)。DataX框架有三种运行模式:
当然,上述三种模式对插件的编写而言没有什么区别,你只需要避开一些小错误,插件就能够在单机/分布式之间无缝切换了。
当JobContainer和TaskGroupContainer运行在同一个进程内时,就是单机模式(Standalone和Local);当它们分布在不同的进程中执行时,就是分布式(Distributed)模式。
编程接口

Job和Task的逻辑是怎么对应到具体的代码中的?
首先,插件的入口类必须扩展Reader或Writer抽象类,并且实现分别实现Job和Task两个内部抽象类,Job和Task的实现必须是 内部类 的形式,原因见 加载原理 一节。以Reader为例:
  1. public class SomeReader extends Reader {
  2.     public static class Job extends Reader.Job {
  3.         @Override
  4.         public void init() {
  5.         }
  6.                
  7.                 @Override
  8.                 public void prepare() {
  9.         }
  10.         @Override
  11.         public List<Configuration> split(int adviceNumber) {
  12.             return null;
  13.         }
  14.         @Override
  15.         public void post() {
  16.         }
  17.         @Override
  18.         public void destroy() {
  19.         }
  20.     }
  21.     public static class Task extends Reader.Task {
  22.         @Override
  23.         public void init() {
  24.         }
  25.                
  26.                 @Override
  27.                 public void prepare() {
  28.         }
  29.         @Override
  30.         public void startRead(RecordSender recordSender) {
  31.         }
  32.         @Override
  33.         public void post() {
  34.         }
  35.         @Override
  36.         public void destroy() {
  37.         }
  38.     }
  39. }
复制代码
Job接口功能如下:
Task接口功能如下:
需要注意的是:
框架按照如下的顺序执行Job和Task的接口:

上图中,黄色表示Job部分的执行阶段,蓝色表示Task部分的执行阶段,绿色表示框架执行阶段。
相关类关系如下:

插件定义

代码写好了,有没有想过框架是怎么找到插件的入口类的?框架是如何加载插件的呢?
在每个插件的项目中,都有一个plugin.json文件,这个文件定义了插件的相关信息,包括入口类。例如:
  1. {
  2.     "name": "mysqlwriter",
  3.     "class": "com.alibaba.datax.plugin.writer.mysqlwriter.MysqlWriter",
  4.     "description": "Use Jdbc connect to database, execute insert sql.",
  5.     "developer": "alibaba"
  6. }
复制代码
打包发布

DataX使用assembly打包,assembly的使用方法请咨询谷哥或者度娘。打包命令如下:
  1. mvn clean package -DskipTests assembly:assembly
复制代码
DataX插件需要遵循统一的目录结构:
  1. ${DATAX_HOME}
  2. |-- bin      
  3. |   `-- datax.py
  4. |-- conf
  5. |   |-- core.json
  6. |   `-- logback.xml
  7. |-- lib
  8. |   `-- datax-core-dependencies.jar
  9. `-- plugin
  10.     |-- reader
  11.     |   `-- mysqlreader
  12.     |       |-- libs
  13.     |       |   `-- mysql-reader-plugin-dependencies.jar
  14.     |       |-- mysqlreader-0.0.1-SNAPSHOT.jar
  15.     |       `-- plugin.json
  16.     `-- writer
  17.         |-- mysqlwriter
  18.         |   |-- libs
  19.         |   |   `-- mysql-writer-plugin-dependencies.jar
  20.         |   |-- mysqlwriter-0.0.1-SNAPSHOT.jar
  21.         |   `-- plugin.json
  22.         |-- oceanbasewriter
  23.         `-- odpswriter
复制代码
插件目录分为reader和writer子目录,读写插件分别存放。插件目录规范如下:
尽管框架加载插件时,会把${PLUGIN_HOME}下所有的jar放到classpath,但还是推荐依赖库的jar和插件本身的jar分开存放。
注意:
插件的目录名字必须和plugin.json中定义的插件名称一致。
配置文件

DataX使用json作为配置文件的格式。一个典型的DataX任务配置如下:
  1. {
  2.   "job": {
  3.     "content": [
  4.       {
  5.         "reader": {
  6.           "name": "odpsreader",
  7.           "parameter": {
  8.             "accessKey": "",
  9.             "accessId": "",
  10.             "column": [""],
  11.             "isCompress": "",
  12.             "odpsServer": "",
  13.             "partition": [
  14.               ""
  15.             ],
  16.             "project": "",
  17.             "table": "",
  18.             "tunnelServer": ""
  19.           }
  20.         },
  21.         "writer": {
  22.           "name": "oraclewriter",
  23.           "parameter": {
  24.             "username": "",
  25.             "password": "",
  26.             "column": ["*"],
  27.             "connection": [
  28.               {
  29.                 "jdbcUrl": "",
  30.                 "table": [
  31.                   ""
  32.                 ]
  33.               }
  34.             ]
  35.           }
  36.         }
  37.       }
  38.     ]
  39.   }
  40. }
复制代码
DataX框架有core.json配置文件,指定了框架的默认行为。任务的配置里头可以指定框架中已经存在的配置项,而且具有更高的优先级,会覆盖core.json中的默认值。
配置中job.content.reader.parameter的value部分会传给Reader.Job;job.content.writer.parameter的value部分会传给Writer.Job ,Reader.Job和Writer.Job可以通过super.getPluginJobConf()来获取。
DataX框架支持对特定的配置项进行RSA加密,例子中以*开头的项目便是加密后的值。 配置项加密解密过程对插件是透明,插件仍然以不带*的key来查询配置和操作配置项
如何设计配置参数

配置文件的设计是插件开发的第一步!
任务配置中reader和writer下parameter部分是插件的配置参数,插件的配置参数应当遵循以下原则:
如何使用Configuration类

为了简化对json的操作,DataX提供了简单的DSL配合Configuration类使用。
Configuration提供了常见的get, 带类型get,带默认值get,set等读写配置项的操作,以及clone, toJSON等方法。配置项读写操作都需要传入一个path做为参数,这个path就是DataX定义的DSL。语法有两条:
比如操作如下json:
  1. {
  2.   "a": {
  3.     "b": {
  4.       "c": 2
  5.     },
  6.     "f": [
  7.       1,
  8.       2,
  9.       {
  10.         "g": true,
  11.         "h": false
  12.       },
  13.       4
  14.     ]
  15.   },
  16.   "x": 4
  17. }
复制代码
比如调用configuration.get(path)方法,当path为如下值的时候得到的结果为:
注意,因为插件看到的配置只是整个配置的一部分。使用Configuration对象时,需要注意当前的根路径是什么。
更多Configuration的操作请参考ConfigurationTest.java。
插件数据传输

跟一般的生产者-消费者模式一样,Reader插件和Writer插件之间也是通过channel来实现数据的传输的。channel可以是内存的,也可能是持久化的,插件不必关心。插件通过RecordSender往channel写入数据,通过RecordReceiver从channel读取数据。
channel中的一条数据为一个Record的对象,Record中可以放多个Column对象,这可以简单理解为数据库中的记录和列。
Record有如下方法:
  1. public interface Record {
  2.     // 加入一个列,放在最后的位置
  3.     void addColumn(Column column);
  4.     // 在指定下标处放置一个列
  5.     void setColumn(int i, final Column column);
  6.     // 获取一个列
  7.     Column getColumn(int i);
  8.     // 转换为json String
  9.     String toString();
  10.     // 获取总列数
  11.     int getColumnNumber();
  12.     // 计算整条记录在内存中占用的字节数
  13.     int getByteSize();
  14. }
复制代码
因为Record是一个接口,Reader插件首先调用RecordSender.createRecord()创建一个Record实例,然后把Column一个个添加到Record中。
Writer插件调用RecordReceiver.getFromReader()方法获取Record,然后把Column遍历出来,写入目标存储中。当Reader尚未退出,传输还在进行时,如果暂时没有数据RecordReceiver.getFromReader()方法会阻塞直到有数据。如果传输已经结束,会返回null,Writer插件可以据此判断是否结束startWrite方法。
Column的构造和操作,我们在《类型转换》一节介绍。
类型转换

为了规范源端和目的端类型转换操作,保证数据不失真,DataX支持六种内部数据类型:
对应地,有DateColumn、LongColumn、DoubleColumn、BytesColumn、StringColumn和BoolColumn六种Column的实现。
Column除了提供数据相关的方法外,还提供一系列以as开头的数据类型转换转换方法。

DataX的内部类型在实现上会选用不同的java类型:
内部类型实现类型备注Datejava.util.DateLongjava.math.BigInteger使用无限精度的大整数,保证不失真Doublejava.lang.String用String表示,保证不失真Bytesbyte[]Stringjava.lang.StringBooljava.lang.Boolean类型之间相互转换的关系如下:
from\toDateLongDoubleBytesStringBoolDate-使用毫秒时间戳不支持不支持使用系统配置的date/time/datetime格式转换不支持Long作为毫秒时间戳构造Date-BigInteger转为BigDecimal,然后BigDecimal.doubleValue()不支持BigInteger.toString()0为false,否则trueDouble不支持内部String构造BigDecimal,然后BigDecimal.longValue()-不支持直接返回内部StringBytes不支持不支持不支持-按照common.column.encoding配置的编码转换为String,默认utf-8不支持String按照配置的date/time/datetime/extra格式解析用String构造BigDecimal,然后取longValue()用String构造BigDecimal,然后取doubleValue(),会正确处理NaN/Infinity/-Infinity按照common.column.encoding配置的编码转换为byte[],默认utf-8-"true"为true, "false"为false,大小写不敏感。其他字符串不支持Bool不支持true为1L,否则0Ltrue为1.0,否则0.0不支持-脏数据处理

什么是脏数据?

目前主要有三类脏数据:
如何处理脏数据

在Reader.Task和Writer.Task中,通过AbstractTaskPlugin.getTaskPluginCollector()可以拿到一个TaskPluginCollector,它提供了一系列collectDirtyRecord的方法。当脏数据出现时,只需要调用合适的collectDirtyRecord方法,把被认为是脏数据的Record传入即可。
用户可以在任务的配置中指定脏数据限制条数或者百分比限制,当脏数据超出限制时,框架会结束同步任务,退出。插件需要保证脏数据都被收集到,其他工作交给框架就好。
加载原理

三、插件介绍文档

每个插件都必须在DataX官方wiki中有一篇文档,文档需要包括但不限于以下内容:

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4