Flink读写Doris操作介绍
Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。可以将 Doris 表映射为 DataStream 或者 Table。
- Flink操作Doris的修改和删除只支持在 Unique Key 模型上
1. 准备开发环境
- <dependency>
- <groupId>org.apache.doris</groupId>
- <artifactId>flink-doris-connector-1.13_2.12</artifactId>
- <version>1.0.3</version>
- </dependency>
复制代码
- -- 切测试库
- use test_db;
- -- 创建测试表flinktest
- CREATE TABLE flinktest
- (
- siteid INT DEFAULT '10',
- citycode SMALLINT,
- username VARCHAR(32) DEFAULT '',
- pv BIGINT SUM DEFAULT '0'
- )
- AGGREGATE KEY(siteid, citycode, username)
- DISTRIBUTED BY HASH(siteid) BUCKETS 10
- PROPERTIES("replication_num" = "1");
- -- 插入样例数据
- insert into flinktest values
- (1,1,'jim',2),
- (2,1,'grace',2),
- (3,2,'tom',2),
- (4,3,'bush',3),
- (5,3,'helen',3);
- -- 查看表数据情况
- select * from flinktest;
- +--------+----------+----------+------+
- | siteid | citycode | username | pv |
- +--------+----------+----------+------+
- | 1 | 1 | jim | 2 |
- | 5 | 3 | helen | 3 |
- | 4 | 3 | bush | 3 |
- | 3 | 2 | tom | 2 |
- | 2 | 1 | grace | 2 |
- +--------+----------+----------+------+
复制代码
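Doris 与 Flink 的列类型映射关系:
| Doris Type | Flink Type |
|------------|------------|
| NULL_TYPE | NULL |
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| DATETIME | TIMESTAMP |
| DECIMAL | DECIMAL |
| CHAR | STRING |
| LARGEINT | STRING |
| VARCHAR | STRING |
| DECIMALV2 | DECIMAL |
| TIME | DOUBLE |
| HLL | Unsupported datatype |
2. Flink-DataStream读Doris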
代码示例:
- package com.zenitera.bigdata.doris;
- import org.apache.doris.flink.cfg.DorisStreamOptions;
- import org.apache.doris.flink.datastream.DorisSourceFunction;
- import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import java.util.Properties;
- /**
-  * Reads the Doris table test_db.flinktest through a DataStream source and prints every record.
-  */
- public class Flink_stream_read_doris {
-     public static void main(String[] args) {
-         // Execution environment with rest.port set to 2000 and a single parallel task.
-         Configuration flinkConf = new Configuration();
-         flinkConf.setInteger("rest.port", 2000);
-         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);
-         env.setParallelism(1);
-         // Connection settings handed to the Doris source.
-         Properties dorisProps = new Properties();
-         dorisProps.setProperty("fenodes", "hdt-dmcp-ops01:8130");
-         dorisProps.setProperty("username", "root");
-         dorisProps.setProperty("password", "123456");
-         dorisProps.setProperty("table.identifier", "test_db.flinktest");
-         DorisStreamOptions streamOptions = new DorisStreamOptions(dorisProps);
-         // Each record is deserialized as a list of column values and printed to stdout.
-         env.addSource(new DorisSourceFunction(streamOptions, new SimpleListDeserializationSchema())).print();
-         try {
-             env.execute();
-         } catch (Exception e) {
-             e.printStackTrace();
-         }
-     }
- }
- /*
- 代码控制台输出:
- [4, 3, bush, 3]
- [2, 1, grace, 2]
- [1, 1, jim, 2]
- [5, 3, helen, 3]
- [3, 2, tom, 2]
- */
复制代码 3. Flink写Doris
Flink 读写 Doris 数据主要有两种方式:DataStream 和 SQL
3.1 Flink-DataStream以 JSON 数据 写到Doris
代码示例:
- package com.zenitera.bigdata.doris;
- import org.apache.doris.flink.cfg.DorisExecutionOptions;
- import org.apache.doris.flink.cfg.DorisOptions;
- import org.apache.doris.flink.cfg.DorisSink;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import java.util.Properties;
- /**
-  * Writes one JSON record into the Doris table test_db.flinktest with a Flink DataStream sink.
-  */
- public class Flink_stream_write_doris_json {
-     public static void main(String[] args) {
-         Configuration conf = new Configuration();
-         conf.setInteger("rest.port", 2000);
-         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
-         env.setParallelism(1);
-         // Stream Load properties passed to the sink: records are sent in JSON format.
-         Properties pro = new Properties();
-         pro.setProperty("format", "json");
-         pro.setProperty("strip_outer_array", "true");
-         env
-             // The inner double quotes are escaped so the JSON text is a valid Java string literal.
-             .fromElements("{\"siteid\":\"10\", \"citycode\": \"1001\",\"username\": \"ww\",\"pv\":\"100\"}")
-             .addSink(DorisSink.sink(
-                 new DorisExecutionOptions.Builder()
-                     .setBatchIntervalMs(2000L)
-                     .setEnableDelete(false)
-                     .setMaxRetries(3)
-                     .setStreamLoadProp(pro)
-                     .build(),
-                 new DorisOptions.Builder()
-                     .setFenodes("hdt-dmcp-ops01:8130")
-                     .setUsername("root")
-                     .setPassword("123456")
-                     .setTableIdentifier("test_db.flinktest")
-                     .build())
-             );
-         try {
-             env.execute();
-         } catch (Exception e) {
-             e.printStackTrace();
-         }
-     }
- }
- /*
- 代码执行前: 5 rows
- select * from flinktest;
- +--------+----------+----------+------+
- | siteid | citycode | username | pv |
- +--------+----------+----------+------+
- | 1 | 1 | jim | 2 |
- | 5 | 3 | helen | 3 |
- | 4 | 3 | bush | 3 |
- | 3 | 2 | tom | 2 |
- | 2 | 1 | grace | 2 |
- +--------+----------+----------+------+
- 代码执行后: 6 rows
- select * from flinktest;
- +--------+----------+----------+------+
- | siteid | citycode | username | pv |
- +--------+----------+----------+------+
- | 2 | 1 | grace | 2 |
- | 3 | 2 | tom | 2 |
- | 5 | 3 | helen | 3 |
- | 1 | 1 | jim | 2 |
- | 10 | 1001 | ww | 100 |
- | 4 | 3 | bush | 3 |
- +--------+----------+----------+------+
- */
复制代码 3.2 Flink-DataStream以 RowData 数据 写Doris
代码示例:
- package com.zenitera.bigdata.doris;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONObject;
- import org.apache.doris.flink.cfg.DorisExecutionOptions;
- import org.apache.doris.flink.cfg.DorisOptions;
- import org.apache.doris.flink.cfg.DorisSink;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.data.GenericRowData;
- import org.apache.flink.table.data.StringData;
- import org.apache.flink.table.types.logical.*;
- /**
-  * Converts one JSON record into RowData and writes it into the Doris table test_db.flinktest.
-  */
- public class Flink_stream_write_doris_rowdata {
-     public static void main(String[] args) {
-         Configuration conf = new Configuration();
-         conf.setInteger("rest.port", 2000);
-         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
-         env.setParallelism(1);
-         // Column types and names handed to the sink; both arrays follow the same column order.
-         LogicalType[] types = {new IntType(), new SmallIntType(), new VarCharType(), new BigIntType()};
-         String[] fields = {"siteid", "citycode", "username", "pv"};
-         env
-             // The inner double quotes are escaped so the JSON text is a valid Java string literal.
-             .fromElements("{\"siteid\":\"100\", \"citycode\": \"1002\",\"username\": \"wang\",\"pv\":\"100\"}")
-             .map(json -> {
-                 // Field positions 0..3 match the order of the fields/types arrays above.
-                 JSONObject obj = JSON.parseObject(json);
-                 GenericRowData rowData = new GenericRowData(4);
-                 rowData.setField(0, obj.getIntValue("siteid"));
-                 rowData.setField(1, obj.getShortValue("citycode"));
-                 rowData.setField(2, StringData.fromString(obj.getString("username")));
-                 rowData.setField(3, obj.getLongValue("pv"));
-                 return rowData;
-             })
-             .addSink(DorisSink.sink(
-                 fields,
-                 types,
-                 new DorisExecutionOptions.Builder()
-                     .setBatchIntervalMs(2000L)
-                     .setEnableDelete(false)
-                     .setMaxRetries(3)
-                     .build(),
-                 new DorisOptions.Builder()
-                     .setFenodes("hdt-dmcp-ops01:8130")
-                     .setUsername("root")
-                     .setPassword("123456")
-                     .setTableIdentifier("test_db.flinktest")
-                     .build())
-             );
-         try {
-             env.execute();
-         } catch (Exception e) {
-             e.printStackTrace();
-         }
-     }
- }
- /*
- 代码执行前: 6 rows
- select * from flinktest;
- +--------+----------+----------+------+
- | siteid | citycode | username | pv |
- +--------+----------+----------+------+
- | 2 | 1 | grace | 2 |
- | 3 | 2 | tom | 2 |
- | 5 | 3 | helen | 3 |
- | 1 | 1 | jim | 2 |
- | 10 | 1001 | ww | 100 |
- | 4 | 3 | bush | 3 |
- +--------+----------+----------+------+
- 代码执行后: 7 rows
- select * from flinktest;
- +--------+----------+----------+------+
- | siteid | citycode | username | pv |
- +--------+----------+----------+------+
- | 1 | 1 | jim | 2 |
- | 2 | 1 | grace | 2 |
- | 3 | 2 | tom | 2 |
- | 5 | 3 | helen | 3 |
- | 10 | 1001 | ww | 100 |
- | 100 | 1002 | wang | 100 |
- | 4 | 3 | bush | 3 |
- +--------+----------+----------+------+
- */
复制代码 3.3 Flink-SQL 方式写Doris
Doris测试表:
- use test_db;
- truncate table flinktest;
- insert into flinktest values
- (1,1,'aaa',1),
- (2,2,'bbb',2),
- (3,3,'ccc',3);
- select * from flinktest;
- +--------+----------+----------+------+
- | siteid | citycode | username | pv |
- +--------+----------+----------+------+
- | 2 | 2 | bbb | 2 |
- | 1 | 1 | aaa | 1 |
- | 3 | 3 | ccc | 3 |
- +--------+----------+----------+------+
- 3 rows in set (0.01 sec)
复制代码 Flink-SQL代码示例:
- package com.zenitera.bigdata.doris;
- import lombok.AllArgsConstructor;
- import lombok.Data;
- import lombok.NoArgsConstructor;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
- /**
-  * Registers a Flink SQL table backed by the Doris table test_db.flinktest and inserts one row into it.
-  */
- public class Flink_SQL_doris {
-     public static void main(String[] args) {
-         Configuration flinkConf = new Configuration();
-         flinkConf.setInteger("rest.port", 2000);
-         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);
-         env.setParallelism(1);
-         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-         // DDL that maps the Flink table flink_0518 onto the Doris table via the 'doris' connector.
-         final String createTableSql = "create table flink_0518(" +
-             " siteid int, " +
-             " citycode int, " +
-             " username string, " +
-             " pv bigint " +
-             ")with(" +
-             " 'connector' = 'doris', " +
-             " 'fenodes' = 'hdt-dmcp-ops01:8130', " +
-             " 'table.identifier' = 'test_db.flinktest', " +
-             " 'username' = 'root', " +
-             " 'password' = '123456' " +
-             ")";
-         // DML that writes a single row through the mapped table.
-         final String insertSql = "insert into flink_0518(siteid, citycode, username, pv) values(4, 4, 'wangting', 4) ";
-         tEnv.executeSql(createTableSql);
-         tEnv.executeSql(insertSql);
-     }
-     /** POJO with the same four columns as flink_0518; main does not reference it. */
-     @Data
-     @NoArgsConstructor
-     @AllArgsConstructor
-     public static class Flink_0518 {
-         private Integer siteid;
-         private Integer citycode;
-         private String username;
-         private Long pv;
-     }
- }
复制代码 执行代码,执行完成后查看Doris对应表数据进行验证:
- select * from flinktest;
- +--------+----------+----------+------+
- | siteid | citycode | username | pv |
- +--------+----------+----------+------+
- | 3 | 3 | ccc | 3 |
- | 2 | 2 | bbb | 2 |
- | 1 | 1 | aaa | 1 |
- | 4 | 4 | wangting | 4 |
- +--------+----------+----------+------+
- 4 rows in set (0.01 sec)
复制代码 3.4 Flink-SQL 方式读Doris
- package com.zenitera.bigdata.doris;
- import org.apache.flink.configuration.Configuration;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
- /**
-  * Registers a Flink SQL table backed by the Doris table test_db.flinktest and prints all of its rows.
-  */
- public class Flink_SQL_doris_read {
-     public static void main(String[] args) {
-         Configuration flinkConf = new Configuration();
-         flinkConf.setInteger("rest.port", 2000);
-         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);
-         env.setParallelism(1);
-         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-         // DDL that maps the Flink table flink_0520 onto the Doris table via the 'doris' connector.
-         final String createTableSql = "create table flink_0520(" +
-             " siteid int, " +
-             " citycode SMALLINT, " +
-             " username string, " +
-             " pv bigint " +
-             ")with(" +
-             " 'connector' = 'doris', " +
-             " 'fenodes' = 'hdt-dmcp-ops01:8130', " +
-             " 'table.identifier' = 'test_db.flinktest', " +
-             " 'username' = 'root', " +
-             " 'password' = '123456' " +
-             ")";
-         tEnv.executeSql(createTableSql);
-         // Run the query and print the result rows to the console.
-         final String selectSql = "select * from flink_0520";
-         tEnv.sqlQuery(selectSql).execute().print();
-     }
- }
- /*
- 控制台输出信息:
- +----+-------------+----------+---------------+---------+
- | op | siteid | citycode | username | pv |
- +----+-------------+----------+---------------+---------+
- | +I | 1 | 1 | aaa | 1 |
- | +I | 3 | 3 | ccc | 3 |
- | +I | 2 | 2 | bbb | 2 |
- | +I | 4 | 4 | wangting | 4 |
- +----+-------------+----------+---------------+---------+
- 4 rows in set
- */
复制代码 免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息请访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。