Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除)Doris 中存储的数据。本文档先容 Flink 如何通过 Datastream 和 SQL 操作 Doris。
注意:
- 修改和删除只支持在 Unique Key 模型上
- 目前的删除是支持 Flink CDC 的方式接入数据实现自动删除,假如是其他数据接入的方式删除必要本身实现。Flink CDC 的数据删除利用方式参照本文档最后一节
版本兼容
Connector VersionFlink VersionDoris VersionJava VersionScala Version1.0.31.11,1.12,1.13,1.140.15+82.11,2.121.1.11.141.0+82.11,2.121.2.11.151.0+8-1.3.01.161.0+8-1.4.01.15,1.16,1.171.0+8-1.5.21.15,1.16,1.17,1.181.0+8-1.6.21.15,1.16,1.17,1.18,1.191.0+8-24.0.11.15,1.16,1.17,1.18,1.19,1.201.0+8- 利用
Maven
添加 flink-doris-connector
- <!-- flink-doris-connector -->
- <dependency>
- <groupId>org.apache.doris</groupId>
- <artifactId>flink-doris-connector-1.16</artifactId>
- <version>24.0.1</version>
- </dependency>
复制代码 备注
1.请根据差别的 Flink 版本替换对应的 Connector 和 Flink 依赖版本。
2.也可从这里下载相关版本 jar 包。
编译
编译时,可直接运行sh build.sh,具体可参考这里。
编译成功后,会在 dist 目录生成目的 jar 包,如:flink-doris-connector-24.0.0-SNAPSHOT.jar。 将此文件复制到 Flink 的 classpath 中即可利用 Flink-Doris-Connector 。比方, Local 模式运行的 Flink ,将此文件放入 lib/ 文件夹下。 Yarn 集群模式运行的 Flink ,则将此文件放入预摆设包中。
利用方法
读取
SQL
- -- doris source
- CREATE TABLE flink_doris_source (
- name STRING,
- age INT,
- price DECIMAL(5,2),
- sale DOUBLE
- )
- WITH (
- 'connector' = 'doris',
- 'fenodes' = 'FE_IP:HTTP_PORT',
- 'table.identifier' = 'database.table',
- 'username' = 'root',
- 'password' = 'password'
- );
复制代码 备注
Flink Connector 24.0.0 版本之后支持利用Arrow Flight SQL 读取数据
- CREATE TABLE doris_source (
- name STRING,
- age int
- )
- WITH (
- 'connector' = 'doris',
- 'fenodes' = 'FE_IP:HTTP_PORT',
- 'table.identifier' = 'database.table',
- 'source.use-flight-sql' = 'true',
- 'source.flight-sql-port' = '{fe.conf:arrow_flight_sql_port}',
- 'username' = 'root',
- 'password' = ''
- )
复制代码 DataStream
- DorisOptions.Builder builder = DorisOptions.builder()
- .setFenodes("FE_IP:HTTP_PORT")
- .setTableIdentifier("db.table")
- .setUsername("root")
- .setPassword("password");
- DorisSource<List<?>> dorisSource = DorisSource.<List<?>>builder()
- .setDorisOptions(builder.build())
- .setDorisReadOptions(DorisReadOptions.builder().build())
- .setDeserializer(new SimpleListDeserializationSchema())
- .build();
- env.fromSource(dorisSource, WatermarkStrategy.noWatermarks(), "doris source").print();
复制代码 写入
SQL
- -- enable checkpoint
- SET 'execution.checkpointing.interval' = '10s';
- -- doris sink
- CREATE TABLE flink_doris_sink (
- name STRING,
- age INT,
- price DECIMAL(5,2),
- sale DOUBLE
- )
- WITH (
- 'connector' = 'doris',
- 'fenodes' = 'FE_IP:HTTP_PORT',
- 'table.identifier' = 'db.table',
- 'username' = 'root',
- 'password' = 'password',
- 'sink.label-prefix' = 'doris_label'
- );
- -- submit insert job
- INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
复制代码 DataStream
DorisSink 是通过 StreamLoad 向 Doris 写入数据,DataStream 写入时,支持差别的序列化方法
String 数据流 (SimpleStringSerializer)
- // enable checkpoint
- env.enableCheckpointing(10000);
- // using batch mode for bounded data
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- DorisSink.Builder<String> builder = DorisSink.builder();
- DorisOptions.Builder dorisBuilder = DorisOptions.builder();
- dorisBuilder.setFenodes("FE_IP:HTTP_PORT")
- .setTableIdentifier("db.table")
- .setUsername("root")
- .setPassword("password");
- Properties properties = new Properties();
- // 上游是 json 写入时,需要开启配置
- //properties.setProperty("format", "json");
- //properties.setProperty("read_json_by_line", "true");
- DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
- executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
- .setDeletable(false)
- .setStreamLoadProp(properties);
- builder.setDorisReadOptions(DorisReadOptions.builder().build())
- .setDorisExecutionOptions(executionBuilder.build())
- .setSerializer(new SimpleStringSerializer()) //serialize according to string
- .setDorisOptions(dorisBuilder.build());
- //mock csv string source
- List<Tuple2<String, Integer>> data = new ArrayList<>();
- data.add(new Tuple2<>("doris",1));
- DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(data);
- source.map((MapFunction<Tuple2<String, Integer>, String>) t -> t.f0 + "\t" + t.f1)
- .sinkTo(builder.build());
- //mock json string source
- //env.fromElements("{"name":"zhangsan","age":1}").sinkTo(builder.build());
复制代码 RowData 数据流 (RowDataSerializer)
- // enable checkpoint
- env.enableCheckpointing(10000);
- // using batch mode for bounded data
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- //doris sink option
- DorisSink.Builder<RowData> builder = DorisSink.builder();
- DorisOptions.Builder dorisBuilder = DorisOptions.builder();
- dorisBuilder.setFenodes("FE_IP:HTTP_PORT")
- .setTableIdentifier("db.table")
- .setUsername("root")
- .setPassword("password");
- // json format to streamload
- Properties properties = new Properties();
- properties.setProperty("format", "json");
- properties.setProperty("read_json_by_line", "true");
- DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
- executionBuilder.setLabelPrefix("label-doris") //streamload label prefix
- .setDeletable(false)
- .setStreamLoadProp(properties); //streamload params
- //flink rowdata‘s schema
- String[] fields = {"city", "longitude", "latitude", "destroy_date"};
- DataType[] types = {DataTypes.VARCHAR(256), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DATE()};
- builder.setDorisReadOptions(DorisReadOptions.builder().build())
- .setDorisExecutionOptions(executionBuilder.build())
- .setSerializer(RowDataSerializer.builder() //serialize according to rowdata
- .setFieldNames(fields)
- .setType("json") //json format
- .setFieldType(types).build())
- .setDorisOptions(dorisBuilder.build());
- //mock rowdata source
- DataStream<RowData> source = env.fromElements("")
- .map(new MapFunction<String, RowData>() {
- @Override
- public RowData map(String value) throws Exception {
- GenericRowData genericRowData = new GenericRowData(4);
- genericRowData.setField(0, StringData.fromString("beijing"));
- genericRowData.setField(1, 116.405419);
- genericRowData.setField(2, 39.916927);
- genericRowData.setField(3, LocalDate.now().toEpochDay());
- return genericRowData;
- }
- });
- source.sinkTo(builder.build());
复制代码 CDC 数据流 (JsonDebeziumSchemaSerializer)
备注
上游数据必须符合Debezium数据格式。
- // enable checkpoint
- env.enableCheckpointing(10000);
- Properties props = new Properties();
- props.setProperty("format", "json");
- props.setProperty("read_json_by_line", "true");
- DorisOptions dorisOptions = DorisOptions.builder()
- .setFenodes("127.0.0.1:8030")
- .setTableIdentifier("test.t1")
- .setUsername("root")
- .setPassword("").build();
- DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
- executionBuilder.setLabelPrefix("label-prefix")
- .setStreamLoadProp(props).setDeletable(true);
- DorisSink.Builder<String> builder = DorisSink.builder();
- builder.setDorisReadOptions(DorisReadOptions.builder().build())
- .setDorisExecutionOptions(executionBuilder.build())
- .setDorisOptions(dorisOptions)
- .setSerializer(JsonDebeziumSchemaSerializer.builder().setDorisOptions(dorisOptions).build());
- env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
- .sinkTo(builder.build());
复制代码 完备代码参考: CDCSchemaChangeExample
Lookup Join
- CREATE TABLE fact_table (
- `id` BIGINT,
- `name` STRING,
- `city` STRING,
- `process_time` as proctime()
- ) WITH (
- 'connector' = 'kafka',
- ...
- );
- create table dim_city(
- `city` STRING,
- `level` INT ,
- `province` STRING,
- `country` STRING
- ) WITH (
- 'connector' = 'doris',
- 'fenodes' = '127.0.0.1:8030',
- 'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',
- 'table.identifier' = 'dim.dim_city',
- 'username' = 'root',
- 'password' = ''
- );
- SELECT a.id, a.name, a.city, c.province, c.country,c.level
- FROM fact_table a
- LEFT JOIN dim_city FOR SYSTEM_TIME AS OF a.process_time AS c
- ON a.city = c.city
复制代码 配置
通用配置项
KeyDefault ValueRequiredCommentfenodes–YDoris FE http 地址,支持多个地址,利用逗号分隔benodes–NDoris BE http 地址,支持多个地址,利用逗号分隔,参考#187jdbc-url–Njdbc 连接信息,如:jdbc:mysql://127.0.0.1:9030table.identifier–YDoris 表名,如:db.tblusername–Y访问 Doris 的用户名password–Y访问 Doris 的密码auto-redirecttrueN是否重定向 StreamLoad 请求。开启后 StreamLoad 将通过 FE 写入,不再显示获取 BE 信息doris.request.retries3N向 Doris 发送请求的重试次数doris.request.connect.timeout30sN向 Doris 发送请求的连接超时时间doris.request.read.timeout30sN向 Doris 发送请求的读取超时时间 Source 配置项
KeyDefault ValueRequiredCommentdoris.request.query.timeout21600sN查询 Doris 的超时时间,默认值为 6 小时doris.request.tablet.size1N一个 Partition 对应的 Doris Tablet 个数。此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对 Doris 造成更大的压力。doris.batch.size1024N一次从 BE 读取数据的最大行数。增大此数值可减少 Flink 与 Doris 之间创建连接的次数。从而减轻网络延伸所带来的额外时间开销。doris.exec.mem.limit8192mbN单个查询的内存限制。默认为 8GB,单元为字节doris.deserialize.arrow.asyncFALSEN是否支持异步转换 Arrow 格式到 flink-doris-connector 迭代所需的 RowBatchdoris.deserialize.queue.size64N异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效source.use-flight-sqlFALSEN是否利用 Arrow Flight SQL 读取source.flight-sql-port-N利用 Arrow Flight SQL 读取时,FE 的 arrow_flight_sql_port DataStream 专有配置项
KeyDefault ValueRequiredCommentdoris.read.field–N读取 Doris 表的列名列表,多列之间利用逗号分隔doris.filter.query–N过滤读取数据的表达式,此表达式透传给 Doris。Doris 利用此表达式完成源端数据过滤。比如 age=18。 Sink 配置项
KeyDefault ValueRequiredCommentsink.label-prefix–YStream load 导入利用的 label 前缀。2pc 场景下要求全局唯一,用来保证 Flink 的 EOS 语义。sink.properties.*–NStream Load 的导入参数。 比方: ‘sink.properties.column_separator’ = ', ’ 定义列分隔符, ‘sink.properties.escape_delimiters’ = ‘true’ 特别字符作为分隔符,\x01会被转换为二进制的 0x01 。 JSON 格式导入 ‘sink.properties.format’ = ‘json’ ‘sink.properties.read_json_by_line’ = ‘true’ 详细参数参考这里。 Group Commit 模式 比方:‘sink.properties.group_commit’ = ‘sync_mode’ 设置 group commit 为同步模式。flink connector 从 1.6.2 开始支持导入配置 group commit ,详细利用和限制参考 group commit 。sink.enable-deleteTRUEN是否启用删除。此选项必要 Doris 表开启批量删除功能 (Doris0.15+ 版本默认开启),只支持 Unique 模型。sink.enable-2pcTRUEN是否开启两阶段提交 (2pc),默认为 true,保证 Exactly-Once 语义。关于两阶段提交可参考这里。sink.buffer-size1MBN写数据缓存 buffer 巨细,单元字节。不建议修改,默认配置即可sink.buffer-count3N写数据缓存 buffer 个数。不建议修改,默认配置即可sink.max-retries3NCommit 失败后的最大重试次数,默认 3 次sink.use-cachefalseN异常时,是否利用内存缓存举行恢复,开启后缓存中会保留 Checkpoint 期间的数据sink.enable.batch-modefalseN是否利用攒批模式写入 Doris,开启后写入时机不依赖 Checkpoint,通过 sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval 参数来控制写入时机。 同时开启后将不保证 Exactly-once 语义,可借助 Uniq 模型做到幂等sink.flush.queue-size2N攒批模式下,缓存的队列巨细。sink.buffer-flush.max-rows500000N攒批模式下,单个批次最多写入的数据行数。sink.buffer-flush.max-bytes100MBN攒批模式下,单个批次最多写入的字节数。sink.buffer-flush.interval10sN攒批模式下,异步刷新缓存的隔断sink.ignore.update-beforetrueN是否忽略 update-before 变乱,默认忽略。 Lookup Join 配置项
KeyDefault ValueRequiredCommentlookup.cache.max-rows-1Nlookup 缓存的最大行数,默认值 -1,不开启缓存lookup.cache.ttl10sNlookup 缓存的最大时间,默认 10slookup.max-retries1Nlookup 查询失败后的重试次数lookup.jdbc.asyncfalseN是否开启异步的 lookup,默认 falselookup.jdbc.read.batch.size128N异步 lookup 下,每次查询的最大批次巨细lookup.jdbc.read.batch.queue-size256N异步 lookup 时,中间缓冲队列的巨细lookup.jdbc.read.thread-size3N每个 task 中 lookup 的 jdbc 线程数 Doris 和 Flink 列类型映射关系
Doris TypeFlink TypeNULL_TYPENULLBOOLEANBOOLEANTINYINTTINYINTSMALLINTSMALLINTINTINTBIGINTBIGINTFLOATFLOATDOUBLEDOUBLEDATEDATEDATETIMETIMESTAMPDECIMALDECIMALCHARSTRINGLARGEINTSTRINGVARCHARSTRINGSTRINGSTRINGDECIMALV2DECIMALARRAYARRAYMAPMAPJSONSTRINGVARIANTSTRINGIPV4STRINGIPV6STRING 自 connector-1.6.1 开始支持读取 Variant,IPV6,IPV4 三种数据类型,其中读取 IPV6,Variant 需 Doris 版本 2.1.1 及以上。
Flink 写入指标
其中 Counter 类型的指标值为导入任务从开始到当前的累加值,可以在 Flink Webui metrics 中观察各表的各项指标。
NameMetric TypeDescriptiontotalFlushLoadBytesCounter已经刷新导入的总字节数flushTotalNumberRowsCounter已经导入处理的总行数totalFlushLoadedRowsCounter已经成功导入的总行数totalFlushTimeMsCounter已经成功导入完成的总时间totalFlushSucceededNumberCounter已经成功导入的次数totalFlushFailedNumberCounter失败导入 的次数totalFlushFilteredRowsCounter数据质量不及格的总行数totalFlushUnselectedRowsCounter被 where 条件过滤的总行数beginTxnTimeMsHistogram向 Fe 请求开始一个事务所耗费的时间,单元毫秒putDataTimeMsHistogram向 Fe 请求获取导入数据实行计划所耗费的时间readDataTimeMsHistogram读取数据所耗费的时间writeDataTimeMsHistogram实行写入数据操作所耗费的时间commitAndPublishTimeMsHistogram向 Fe 请求提交并且发布事务所耗费的时间loadTimeMsHistogram导入完成的时间 利用 FlinkSQL 通过 CDC 接入 Doris 示例
- -- enable checkpoint
- SET 'execution.checkpointing.interval' = '10s';
- CREATE TABLE cdc_mysql_source (
- id int
- ,name VARCHAR
- ,PRIMARY KEY (id) NOT ENFORCED
- ) WITH (
- 'connector' = 'mysql-cdc',
- 'hostname' = '127.0.0.1',
- 'port' = '3306',
- 'username' = 'root',
- 'password' = 'password',
- 'database-name' = 'database',
- 'table-name' = 'table'
- );
- -- 支持同步 insert/update/delete 事件
- CREATE TABLE doris_sink (
- id INT,
- name STRING
- )
- WITH (
- 'connector' = 'doris',
- 'fenodes' = '127.0.0.1:8030',
- 'table.identifier' = 'database.table',
- 'username' = 'root',
- 'password' = '',
- 'sink.properties.format' = 'json',
- 'sink.properties.read_json_by_line' = 'true',
- 'sink.enable-delete' = 'true', -- 同步删除事件
- 'sink.label-prefix' = 'doris_label'
- );
- insert into doris_sink select id,name from cdc_mysql_source;
复制代码 利用 FlinkSQL 通过 CDC 接入并实现部分列更新示例
- -- enable checkpoint
- SET 'execution.checkpointing.interval' = '10s';
- CREATE TABLE cdc_mysql_source (
- id int
- ,name STRING
- ,bank STRING
- ,age int
- ,PRIMARY KEY (id) NOT ENFORCED
- ) WITH (
- 'connector' = 'mysql-cdc',
- 'hostname' = '127.0.0.1',
- 'port' = '3306',
- 'username' = 'root',
- 'password' = 'password',
- 'database-name' = 'database',
- 'table-name' = 'table'
- );
- CREATE TABLE doris_sink (
- id INT,
- name STRING,
- bank STRING,
- age int
- )
- WITH (
- 'connector' = 'doris',
- 'fenodes' = '127.0.0.1:8030',
- 'table.identifier' = 'database.table',
- 'username' = 'root',
- 'password' = '',
- 'sink.properties.format' = 'json',
- 'sink.properties.read_json_by_line' = 'true',
- 'sink.properties.columns' = 'id,name,bank,age',
- 'sink.properties.partial_columns' = 'true' -- 开启部分列更新
- );
- insert into doris_sink select id,name,bank,age from cdc_mysql_source;
复制代码 利用 Flink CDC 接入多表或整库 (支持 MySQL,Oracle,PostgreSQL,SQLServer,MongoDB)
语法
- <FLINK_HOME>bin/flink run \
- -c org.apache.doris.flink.tools.cdc.CdcTools \
- lib/flink-doris-connector-1.16-1.6.1.jar \
- <mysql-sync-database|oracle-sync-database|postgres-sync-database|sqlserver-sync-database|mongodb-sync-database> \
- --database <doris-database-name> \
- [--job-name <flink-job-name>] \
- [--table-prefix <doris-table-prefix>] \
- [--table-suffix <doris-table-suffix>] \
- [--including-tables <mysql-table-name|name-regular-expr>] \
- [--excluding-tables <mysql-table-name|name-regular-expr>] \
- --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
- --oracle-conf <oracle-cdc-source-conf> [--oracle-conf <oracle-cdc-source-conf> ...] \
- --postgres-conf <postgres-cdc-source-conf> [--postgres-conf <postgres-cdc-source-conf> ...] \
- --sqlserver-conf <sqlserver-cdc-source-conf> [--sqlserver-conf <sqlserver-cdc-source-conf> ...] \
- --sink-conf <doris-sink-conf> [--table-conf <doris-sink-conf> ...] \
- [--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]]
复制代码 KeyComment–job-nameFlink 任务名称,非必需–database同步到 Doris 的数据库名–table-prefixDoris 表前缀名,比方 --table-prefix ods_。–table-suffix同上,Doris 表的后缀名。–including-tables必要同步的 MySQL 表,可以利用 `–excluding-tables不必要同步的表,用法同上。–mysql-confMySQL CDCSource 配置,比方–mysql-conf hostname=127.0.0.1,您可以在这里查看所有配置 MySQL-CDC,其中 hostname/username/password/database-name 是必需的。同步的库表中含有非主键表时,必须设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空类型的一个字段。 比方:scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...,差别的库表列之间用,隔开。–oracle-confOracle CDCSource 配置,比方–oracle-conf hostname=127.0.0.1,您可以在这里查看所有配置 Oracle-CDC,其中 hostname/username/password/database-name/schema-name 是必需的。–postgres-confPostgres CDCSource 配置,比方–postgres-conf hostname=127.0.0.1,您可以在这里查看所有配置 Postgres-CDC,其中 hostname/username/password/database-name/schema-name/slot.name 是必需的。–sqlserver-confSQLServer CDCSource 配置,比方–sqlserver-conf hostname=127.0.0.1,您可以在这里查看所有配置 SQLServer-CDC,其中 hostname/username/password/database-name/schema-name 是必需的。–db2-confSQLServer CDCSource 配置,比方–db2-conf hostname=127.0.0.1,您可以在这里查看所有配置 DB2-CDC,其中 hostname/username/password/database-name/schema-name 是必需的。|–sink-confDoris Sink 的所有配置,可以在这里查看完备的配置项。–mongodb-confMongoDB CDCSource 配置,比方 --mongodb-conf hosts=127.0.0.1:27017,您可以在这里查看所有配置 Mongo-CDC,其中 hosts/username/password/database 是必须的。其中 --mongodb-conf schema.sample-percent 为自动采样 mongodb 数据为 Doris 建表的配置,默认为 0.2–table-confDoris 表的配置项,即 properties 中包含的内容(其中 table-buckets 例外,非 properties 属性)。比方 --table-conf replication_num=1,而 --table-conf table-buckets="tbl1:10,tbl2:20,a.*:30,b.*:40,.*:50"表示按照正则表达式次序指定差别表的 buckets 数目,假如没有匹配到则采用 BUCKETS AUTO 建表。–ignore-default-value关闭同步 MySQL 表布局的默认值。实用于同步 MySQL 数据到 Doris 时,字段有默认值,但实际插入数据为 null 情况。参考#152–use-new-schema-change是否利用新的 schema change,支持同步 MySQL 多列变更、默认值,1.6.0 开始该参数默认为 true。参考#167–schema-change-mode剖析 schema change 的模式,支持 debezium_structure、sql_parser 两种剖析模式,默认采用 debezium_structure 模式。 debezium_structure 剖析上游 CDC 同步数据时所利用的数据布局,通过剖析该布局判断 DDL 变更操作。 sql_parser 通过剖析上游 CDC 同步数据时的 DDL 语句,从而判断 DDL 变更操作,因此该剖析模式更加精确。 利用例子:--schema-change-mode debezium_structure 本功能将在 1.6.2.1 后的版本中提供–single-sink是否利用单个 Sink 同步所有表,开启后也可自动辨认上游新创建的表,自动创建表。–multi-to-one-origin将上游多张表写入同一张表时,源表的配置,比如:`–multi-to-one-origin "a_.*–multi-to-one-target与 multi-to-one-origin 搭配利用,目的表的配置,比如:--multi-to-one-target "a\–create-table-only是否只仅仅同步表的布局 备注
- 同步时必要在 $FLINK_HOME/lib 目录下添加对应的 Flink CDC 依赖,比如 flink-sql-connector-mysql-cdc- v e r s i o n . j a r , f l i n k − s q l − c o n n e c t o r − o r a c l e − c d c − {version}.jar,flink-sql-connector-oracle-cdc- version.jar,flink−sql−connector−oracle−cdc−{version}.jar ,flink-sql-connector-mongodb-cdc-${version}.jar
- Connector 24.0.0 之后依赖的 Flink CDC 版本必要在 3.1 以上,假如需利用 Flink CDC 同步 MySQL 和 Oracle,还必要在 $FLINK_HOME/lib 下增加相关的 JDBC 驱动。
MySQL 多表同步示例
- <FLINK_HOME>bin/flink run \
- -Dexecution.checkpointing.interval=10s \
- -Dparallelism.default=1 \
- -c org.apache.doris.flink.tools.cdc.CdcTools \
- lib/flink-doris-connector-1.16-24.0.1.jar \
- mysql-sync-database \
- --database test_db \
- --mysql-conf hostname=127.0.0.1 \
- --mysql-conf port=3306 \
- --mysql-conf username=root \
- --mysql-conf password=123456 \
- --mysql-conf database-name=mysql_db \
- --including-tables "tbl1|test.*" \
- --sink-conf fenodes=127.0.0.1:8030 \
- --sink-conf username=root \
- --sink-conf password=123456 \
- --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
- --sink-conf sink.label-prefix=label \
- --table-conf replication_num=1
复制代码 Oracle 多表同步示例
- <FLINK_HOME>bin/flink run \
- -Dexecution.checkpointing.interval=10s \
- -Dparallelism.default=1 \
- -c org.apache.doris.flink.tools.cdc.CdcTools \
- ./lib/flink-doris-connector-1.16-24.0.1.jar \
- oracle-sync-database \
- --database test_db \
- --oracle-conf hostname=127.0.0.1 \
- --oracle-conf port=1521 \
- --oracle-conf username=admin \
- --oracle-conf password="password" \
- --oracle-conf database-name=XE \
- --oracle-conf schema-name=ADMIN \
- --including-tables "tbl1|tbl2" \
- --sink-conf fenodes=127.0.0.1:8030 \
- --sink-conf username=root \
- --sink-conf password=\
- --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
- --sink-conf sink.label-prefix=label \
- --table-conf replication_num=1
复制代码 PostgreSQL 多表同步示例
- <FLINK_HOME>/bin/flink run \
- -Dexecution.checkpointing.interval=10s \
- -Dparallelism.default=1\
- -c org.apache.doris.flink.tools.cdc.CdcTools \
- ./lib/flink-doris-connector-1.16-24.0.1.jar \
- postgres-sync-database \
- --database db1\
- --postgres-conf hostname=127.0.0.1 \
- --postgres-conf port=5432 \
- --postgres-conf username=postgres \
- --postgres-conf password="123456" \
- --postgres-conf database-name=postgres \
- --postgres-conf schema-name=public \
- --postgres-conf slot.name=test \
- --postgres-conf decoding.plugin.name=pgoutput \
- --including-tables "tbl1|tbl2" \
- --sink-conf fenodes=127.0.0.1:8030 \
- --sink-conf username=root \
- --sink-conf password=\
- --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
- --sink-conf sink.label-prefix=label \
- --table-conf replication_num=1
复制代码 SQLServer 多表同步示例
- <FLINK_HOME>/bin/flink run \
- -Dexecution.checkpointing.interval=10s \
- -Dparallelism.default=1 \
- -c org.apache.doris.flink.tools.cdc.CdcTools \
- ./lib/flink-doris-connector-1.16-24.0.1.jar \
- sqlserver-sync-database \
- --database db1\
- --sqlserver-conf hostname=127.0.0.1 \
- --sqlserver-conf port=1433 \
- --sqlserver-conf username=sa \
- --sqlserver-conf password="123456" \
- --sqlserver-conf database-name=CDC_DB \
- --sqlserver-conf schema-name=dbo \
- --including-tables "tbl1|tbl2" \
- --sink-conf fenodes=127.0.0.1:8030 \
- --sink-conf username=root \
- --sink-conf password=\
- --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
- --sink-conf sink.label-prefix=label \
- --table-conf replication_num=1
复制代码 DB2 多表同步示例
- <FLINK_HOME>bin/flink run \
- -Dexecution.checkpointing.interval=10s \
- -Dparallelism.default=1 \
- -c org.apache.doris.flink.tools.cdc.CdcTools \
- lib/flink-doris-connector-1.16-24.0.1.jar \
- db2-sync-database \
- --database db2_test \
- --db2-conf hostname=127.0.0.1 \
- --db2-conf port=50000 \
- --db2-conf username=db2inst1 \
- --db2-conf password=doris123456 \
- --db2-conf database-name=testdb \
- --db2-conf schema-name=DB2INST1 \
- --including-tables "FULL_TYPES|CUSTOMERS" \
- --single-sink true \
- --use-new-schema-change true \
- --sink-conf fenodes=127.0.0.1:8030 \
- --sink-conf username=root \
- --sink-conf password=123456 \
- --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
- --sink-conf sink.label-prefix=label \
- --table-conf replication_num=1
复制代码 MongoDB 多表同步示例
- <FLINK_HOME>/bin/flink run \
- -Dexecution.checkpointing.interval=10s \
- -Dparallelism.default=1 \
- -c org.apache.doris.flink.tools.cdc.CdcTools \
- ./lib/flink-doris-connector-1.18-24.0.1.jar \
- mongodb-sync-database \
- --database doris_db \
- --schema-change-mode debezium_structure \
- --mongodb-conf hosts=127.0.0.1:27017 \
- --mongodb-conf username=flinkuser \
- --mongodb-conf password=flinkpwd \
- --mongodb-conf database=test \
- --mongodb-conf scan.startup.mode=initial \
- --mongodb-conf schema.sample-percent=0.2 \
- --including-tables "tbl1|tbl2" \
- --sink-conf fenodes=127.0.0.1:8030 \
- --sink-conf username=root \
- --sink-conf password= \
- --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
- --sink-conf sink.label-prefix=label \
- --sink-conf sink.enable-2pc=false \
- --table-conf replication_num=1
复制代码 利用 Flink CDC 更新 Key 列
一样平常在业务数据库中,会利用编号来作为表的主键,比如 Student 表,会利用编号 (id) 来作为主键,但是随着业务的发展,数据对应的编号有可能是会发生变革的。 在这种场景下,利用 Flink CDC + Doris Connector 同步数据,便可以自动更新 Doris 主键列的数据。
原理
Flink CDC 底层的收罗工具是 Debezium,Debezium 内部利用 op 字段来标识对应的操作:op 字段的取值分别为 c、u、d、r,分别对应 create、update、delete 和 read。 而对于主键列的更新,Flink CDC 会向卑鄙发送 DELETE 和 INSERT 变乱,同时数据同步到 Doris 中后,就会自动更新主键列的数据。
利用
Flink 程序可参考上面 CDC 同步的示例,成功提交任务后,在 MySQL 侧实行 Update 主键列的语句 (update student set id = '1002' where id = '1001'),即可修改 Doris 中的数据。
利用 Flink 根据指定列删除数据
一样平常 Kafka 中的消息会利用特定字段来标志操作类型,比如{“op_type”:“delete”,data:{…}}。针对这类数据,希望将 op_type=delete 的数据删除掉。
DorisSink 默认会根据 RowKind 来区分变乱的类型,通常这种在 cdc 情况下可以直接获取到变乱类型,对隐藏列__DORIS_DELETE_SIGN__举行赋值达到删除的目的,而 Kafka 则必要根据业务逻辑判断,显示的传入隐藏列的值。
利用
- -- 比如上游数据:{"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
- CREATE TABLE KAFKA_SOURCE(
- data STRING,
- op_type STRING
- ) WITH (
- 'connector' = 'kafka',
- ...
- );
- CREATE TABLE DORIS_SINK(
- id INT,
- name STRING,
- __DORIS_DELETE_SIGN__ INT
- ) WITH (
- 'connector' = 'doris',
- 'fenodes' = '127.0.0.1:8030',
- 'table.identifier' = 'db.table',
- 'username' = 'root',
- 'password' = '',
- 'sink.enable-delete' = 'false', -- false 表示不从 RowKind 获取事件类型
- 'sink.properties.columns' = 'id, name, __DORIS_DELETE_SIGN__' -- 显示指定 streamload 的导入列
- );
- INSERT INTO DORIS_SINK
- SELECT json_value(data,'$.id') as id,
- json_value(data,'$.name') as name,
- if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__
- from KAFKA_SOURCE;
复制代码 Java 示例
samples/doris-demo/ 下提供了 Java 版本的示例,可供参考,查看点击这里
最佳实践
应用场景
利用 Flink Doris Connector 最适当的场景就是及时/批次同步源数据(MySQL,Oracle,PostgreSQL 等)到 Doris,利用 Flink 对 Doris 中的数据和其他数据源举行团结分析,也可以利用 Flink Doris Connector。
其他
- Flink Doris Connector 重要是依赖 Checkpoint 举行流式写入,以是 Checkpoint 的隔断即为数据的可见延伸时间。
- 为了保证 Flink 的 Exactly Once 语义,Flink Doris Connector 默认开启两阶段提交,Doris 在 1.1 版本后默认开启两阶段提交。1.0 可通过修改 BE 参数开启,可参考two_phase_commit。
常见问题
- Doris Source 在数据读取完成后,流为什么就竣事了?
目前 Doris Source 是有界流,不支持 CDC 方式读取。
- Flink 读取 Doris 可以举行条件下推吗?
通过配置 doris.filter.query 参数,详情参考配置末节。
- CREATE TABLE bitmap_sink (
- dt int,
- page string,
- user_id int
- )
- WITH (
- 'connector' = 'doris',
- 'fenodes' = '127.0.0.1:8030',
- 'table.identifier' = 'test.bitmap_test',
- 'username' = 'root',
- 'password' = '',
- 'sink.label-prefix' = 'doris_label',
- 'sink.properties.columns' = 'dt,page,user_id,user_id=to_bitmap(user_id)'
- )
复制代码
- errCode = 2, detailMessage = Label [label_0_1] has already been used, relate to txn [19650]
Exactly-Once 场景下,Flink Job 重启时必须从最新的 Checkpoint/Savepoint 启动,否则会报如上错误。 不要求 Exactly-Once 时,也可通过关闭 2PC 提交(sink.enable-2pc=false)或更换差别的 sink.label-prefix 办理。
- errCode = 2, detailMessage = transaction [19650] not found
发生在 Commit 阶段,checkpoint 内里记录的事务 ID,在 FE 侧已颠末期,此时再次 commit 就会出现上述错误。 此时无法从 checkpoint 启动,后续可通过修改 fe.conf 的 streaming_label_keep_max_second 配置来延伸过期时间,默认 12 小时。
- errCode = 2, detailMessage = current running txns on db 10006 is 100, larger than limit 100
这是由于同一个库并发导入超过了 100,可通过调解 fe.conf 的参数 max_running_txn_num_per_db 来办理,具体可参考 max_running_txn_num_per_db。
同时,一个任务频繁修改 label 重启,也可能会导致这个错误。2pc 场景下 (Duplicate/Aggregate 模型),每个任务的 label 必要唯一,并且从 checkpoint 重启时,flink 任务才会主动 abort 掉之前已经 precommit 成功,没有 commit 的 txn,频繁修改 label 重启,会导致大量 precommit 成功的 txn 无法被 abort,占用事务。在 Unique 模型下也可关闭 2pc,可以实现幂等写入。
- Flink 写入 Uniq 模型时,如何保证一批数据的有序性?
可以添加 sequence 列配置来保证,具体可参考 sequence
Connector1.1.0 版本以前,是攒批写入的,写入均是由数据驱动,必要判断上游是否有数据写入。1.1.0 之后,依赖 Checkpoint,必须开启 Checkpoint 才能写入。
- tablet writer write failed, tablet_id=190958, txn_id=3505530, err=-235
通常发生在 Connector1.1.0 之前,是由于写入频率过快,导致版本过多。可以通过设置 sink.batch.size 和 sink.batch.interval 参数来低落 Streamload 的频率。在Connector1.1.0之后,默认写入时机是由Checkpoint控制,可以通过增加Checkpoint隔断来低落写入频率。
Flink 在数据导入时,假如有脏数据,比如字段格式、长度等问题,会导致 StreamLoad 报错,此时 Flink 会不断的重试。假如必要跳过,可以通过禁用 StreamLoad 的严格模式 (strict_mode=false,max_filter_ratio=1) 或者在 Sink 算子之前对数据做过滤。
- 源表和 Doris 表应如何对应? 利用 Flink Connector 导入数据时,要注意两个方面,第一是源表的列和类型跟 flink sql 中的列和类型要对应上;第二个是 flink sql 中的列和类型要跟 Doris 表的列和类型对应上,具体可以参考上面的"Doris 和 Flink 列类型映射关系"
- TApplicationException: get_next failed: out of sequence response: expected 4 but got 3
这是由于 Thrift 框架存在并发 bug 导致的,建议你利用尽可能新的 connector 以及与之兼容的 flink 版本。
- DorisRuntimeException: Fail to abort transaction 26153 with url http://192.168.0.1:8040/api/table_name/_stream_load_2pc
你可以在 TaskManager 中搜索日志 abort transaction response,根据 http 返回码确定是 client 的问题照旧 server 的问题。
- 利用 doris.filter.query 出现 org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered “xx” at line x, column xx
出现这个问题重要是条件 varchar/string 类型,必要加引号导致的,精确写法是 xxx = ‘‘xxx’’,这样 Flink SQL 剖析器会将两个连续的单引号表明为一个单引号字符,而不是字符串的竣事,并将拼接后的字符串作为属性的值。比如说:t1 >= '2024-01-01',可以写成'doris.filter.query' = 't1 >=''2024-01-01'''。Connector1.6.0 之后 FlinkSQL 可以实现自动谓词和投影下推。
- 假如出现 Failed to connect to backend: http://host:webserver_port, 并且 Be 照旧活着的
可能是由于你配置的 be 的 ip,外部的 Flink 集群无法访问。这重要是由于当连接 fe 时,会通过 fe 剖析出 be 的地址。比方,当你添加的 be 地址为127.0.0.1,那么 Flink 通过 fe 获取的 be 地址就为127.0.0.1:webserver_port,此时 Flink 就会去访问这个地址。当出现这个问题时,可以通过在 with 属性中增加实际对应的 be 外部 ip 地'benodes' = "be_ip:webserver_port, be_ip:webserver_port...",整库同步则可增加--sink-conf benodes=be_ip:webserver,be_ip:webserver...。
- 假如利用整库同步 MySQL 数据到 Doris,出现 timestamp 类型与源数据相差多个小时
整库同步默认 timezone=“UTC+8”,假如你同步的数据不是该时区,可以实验如下设置相对应的时区,比方:--mysql-conf debezium.date.format.timestamp.zone="UTC+3" 来办理。
Connector1.5.0 之后支持攒批写入,攒批写入不依赖 Checkpoint,将数据缓存在内存中,根据 sink.buffer-flush.max-rows/sink.buffer-flush.max-bytes/sink.buffer-flush.interval 参数来控制写入时机。流式写入必须开启 Checkpoint,在整个 Checkpoint 期间连续的将上游数据写入到 Doris 中,不会一直将数据缓存在内存中。
links:
Flink Doris Connector - Apache Doris
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |