Flink读写Doris操作介绍

打印 上一主题 下一主题

主题 844|帖子 844|积分 2532

Flink读写Doris操作介绍

  ​ Flink Doris Connector 可以支持通过 Flink 操作(读取、插入、修改、删除) Doris 中存储的数据。可以将 Doris 表映射为 DataStream 或者 Table。


  • Flink操作Doris修改和删除只支持在 Unique Key 模子上
1. 准备开发情况



  • pom.xml参加依赖
  1. <dependency>
  2.     <groupId>org.apache.doris</groupId>
  3.     <artifactId>flink-doris-connector-1.13_2.12</artifactId>
  4.     <version>1.0.3</version>
  5. </dependency>
复制代码


  • 创建测试库测试表
  1. -- 切测试库
  2. use test_db;
  3. -- 创建测试表flinktest
  4. CREATE TABLE flinktest
  5. (
  6.     siteid INT DEFAULT '10',
  7.     citycode SMALLINT,
  8.     username VARCHAR(32) DEFAULT '',
  9.     pv BIGINT SUM DEFAULT '0'
  10. )
  11. AGGREGATE KEY(siteid, citycode, username)
  12. DISTRIBUTED BY HASH(siteid) BUCKETS 10
  13. PROPERTIES("replication_num" = "1");
  14. -- 插入样例数据
  15. insert into flinktest values
  16. (1,1,'jim',2),
  17. (2,1,'grace',2),
  18. (3,2,'tom',2),
  19. (4,3,'bush',3),
  20. (5,3,'helen',3);
  21. -- 查看表数据情况
  22. select * from flinktest;
  23. +--------+----------+----------+------+
  24. | siteid | citycode | username | pv   |
  25. +--------+----------+----------+------+
  26. |      1 |        1 | jim      |    2 |
  27. |      5 |        3 | helen    |    3 |
  28. |      4 |        3 | bush     |    3 |
  29. |      3 |        2 | tom      |    2 |
  30. |      2 |        1 | grace    |    2 |
  31. +--------+----------+----------+------+
复制代码


  • Doris 和 Flink 列范例映射关系
Doris TypeFlink TypeNULL_TYPENULLBOOLEANBOOLEANTINYINTTINYINTSMALLINTSMALLINTINTINTBIGINTBIGINTFLOATFLOATDOUBLEDOUBLEDATEDATEDATETIMETIMESTAMPDECIMALDECIMALCHARSTRINGLARGEINTSTRINGVARCHARSTRINGDECIMALV2DECIMALTIMEDOUBLEHLLUnsupported datatype 2. Flink-DataStream读Doris

代码示例:
  1. package com.zenitera.bigdata.doris;
  2. import org.apache.doris.flink.cfg.DorisStreamOptions;
  3. import org.apache.doris.flink.datastream.DorisSourceFunction;
  4. import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
  5. import org.apache.flink.configuration.Configuration;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import java.util.Properties;
  8. public class Flink_stream_read_doris {
  9.     public static void main(String[] args) {
  10.         Configuration conf = new Configuration();
  11.         conf.setInteger("rest.port", 2000);
  12.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
  13.         env.setParallelism(1);
  14.         Properties props = new Properties();
  15.         props.setProperty("fenodes", "hdt-dmcp-ops01:8130");
  16.         props.setProperty("username", "root");
  17.         props.setProperty("password", "123456");
  18.         props.setProperty("table.identifier", "test_db.flinktest");
  19.         env
  20.                 .addSource(new DorisSourceFunction(new DorisStreamOptions(props), new SimpleListDeserializationSchema()))
  21.                 .print();
  22.         try {
  23.             env.execute();
  24.         } catch (Exception e) {
  25.             e.printStackTrace();
  26.         }
  27.     }
  28. }
  29. /*
  30.   代码控制台输出:
  31. [4, 3, bush, 3]
  32. [2, 1, grace, 2]
  33. [1, 1, jim, 2]
  34. [5, 3, helen, 3]
  35. [3, 2, tom, 2]
  36. */
复制代码
3. Flink写Doris

Flink 读写 Doris 数据重要有两种方式


  • DataStream
  • SQL
3.1 Flink-DataStream以 JSON 数据 写到Doris

代码示例:
  1. package com.zenitera.bigdata.doris;
  2. import org.apache.doris.flink.cfg.DorisExecutionOptions;
  3. import org.apache.doris.flink.cfg.DorisOptions;
  4. import org.apache.doris.flink.cfg.DorisSink;
  5. import org.apache.flink.configuration.Configuration;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import java.util.Properties;
  8. /**
  9. * 使用 Flink 将 JSON 数据 写到Doris数据库
  10. */
  11. public class Flink_stream_write_doris_json {
  12.     public static void main(String[] args) {
  13.         Configuration conf = new Configuration();
  14.         conf.setInteger("rest.port", 2000);
  15.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
  16.         env.setParallelism(1);
  17.         Properties pro = new Properties();
  18.         pro.setProperty("format", "json");
  19.         pro.setProperty("strip_outer_array", "true");
  20.         env
  21.                 .fromElements("{"siteid":"10", "citycode": "1001","username": "ww","pv":"100"}")
  22.                 .addSink(DorisSink.sink(
  23.                         new DorisExecutionOptions.Builder()
  24.                                 .setBatchIntervalMs(2000L)
  25.                                 .setEnableDelete(false)
  26.                                 .setMaxRetries(3)
  27.                                 .setStreamLoadProp(pro)
  28.                                 .build(),
  29.                         new DorisOptions.Builder()
  30.                                 .setFenodes("hdt-dmcp-ops01:8130")
  31.                                 .setUsername("root")
  32.                                 .setPassword("123456")
  33.                                 .setTableIdentifier("test_db.flinktest")
  34.                                 .build())
  35.                 );
  36.         try {
  37.             env.execute();
  38.         } catch (Exception e) {
  39.             e.printStackTrace();
  40.         }
  41.     }
  42. }
  43. /*
  44.     代码执行前: 5 rows
  45. select * from flinktest;
  46. +--------+----------+----------+------+
  47. | siteid | citycode | username | pv   |
  48. +--------+----------+----------+------+
  49. |      1 |        1 | jim      |    2 |
  50. |      5 |        3 | helen    |    3 |
  51. |      4 |        3 | bush     |    3 |
  52. |      3 |        2 | tom      |    2 |
  53. |      2 |        1 | grace    |    2 |
  54. +--------+----------+----------+------+
  55.     代码执行后: 6 rows
  56. select * from flinktest;
  57. +--------+----------+----------+------+
  58. | siteid | citycode | username | pv   |
  59. +--------+----------+----------+------+
  60. |      2 |        1 | grace    |    2 |
  61. |      3 |        2 | tom      |    2 |
  62. |      5 |        3 | helen    |    3 |
  63. |      1 |        1 | jim      |    2 |
  64. |     10 |     1001 | ww       |  100 |
  65. |      4 |        3 | bush     |    3 |
  66. +--------+----------+----------+------+
  67. */
复制代码
3.2 Flink-DataStream以 RowData 数据 写Doris

代码示例:
  1. package com.zenitera.bigdata.doris;
  2. import com.alibaba.fastjson.JSON;
  3. import com.alibaba.fastjson.JSONObject;
  4. import org.apache.doris.flink.cfg.DorisExecutionOptions;
  5. import org.apache.doris.flink.cfg.DorisOptions;
  6. import org.apache.doris.flink.cfg.DorisSink;
  7. import org.apache.flink.configuration.Configuration;
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9. import org.apache.flink.table.data.GenericRowData;
  10. import org.apache.flink.table.data.StringData;
  11. import org.apache.flink.table.types.logical.*;
  12. public class Flink_stream_write_doris_rowdata {
  13.     public static void main(String[] args) {
  14.         Configuration conf = new Configuration();
  15.         conf.setInteger("rest.port", 2000);
  16.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
  17.         env.setParallelism(1);
  18.         LogicalType[] types = {new IntType(), new SmallIntType(), new VarCharType(), new BigIntType()};
  19.         String[] fields = {"siteid", "citycode", "username", "pv"};
  20.         env
  21.                 .fromElements("{"siteid":"100", "citycode": "1002","username": "wang","pv":"100"}")
  22.                 .map(json -> {
  23.                     JSONObject obj = JSON.parseObject(json);
  24.                     GenericRowData rowData = new GenericRowData(4);
  25.                     rowData.setField(0, obj.getIntValue("siteid"));
  26.                     rowData.setField(1, obj.getShortValue("citycode"));
  27.                     rowData.setField(2, StringData.fromString(obj.getString("username")));
  28.                     rowData.setField(3, obj.getLongValue("pv"));
  29.                     return rowData;
  30.                 })
  31.                 .addSink(DorisSink.sink(
  32.                         fields,
  33.                         types,
  34.                         new DorisExecutionOptions.Builder()
  35.                                 .setBatchIntervalMs(2000L)
  36.                                 .setEnableDelete(false)
  37.                                 .setMaxRetries(3)
  38.                                 .build(),
  39.                         new DorisOptions.Builder()
  40.                                 .setFenodes("hdt-dmcp-ops01:8130")
  41.                                 .setUsername("root")
  42.                                 .setPassword("123456")
  43.                                 .setTableIdentifier("test_db.flinktest")
  44.                                 .build())
  45.                 );
  46.         try {
  47.             env.execute();
  48.         } catch (Exception e) {
  49.             e.printStackTrace();
  50.         }
  51.     }
  52. }
  53. /*
  54.     代码执行前: 6 rows
  55. select * from flinktest;
  56. +--------+----------+----------+------+
  57. | siteid | citycode | username | pv   |
  58. +--------+----------+----------+------+
  59. |      2 |        1 | grace    |    2 |
  60. |      3 |        2 | tom      |    2 |
  61. |      5 |        3 | helen    |    3 |
  62. |      1 |        1 | jim      |    2 |
  63. |     10 |     1001 | ww       |  100 |
  64. |      4 |        3 | bush     |    3 |
  65. +--------+----------+----------+------+
  66.     代码执行后: 7 rows
  67. select * from flinktest;
  68. +--------+----------+----------+------+
  69. | siteid | citycode | username | pv   |
  70. +--------+----------+----------+------+
  71. |      1 |        1 | jim      |    2 |
  72. |      2 |        1 | grace    |    2 |
  73. |      3 |        2 | tom      |    2 |
  74. |      5 |        3 | helen    |    3 |
  75. |     10 |     1001 | ww       |  100 |
  76. |    100 |     1002 | wang     |  100 |
  77. |      4 |        3 | bush     |    3 |
  78. +--------+----------+----------+------+
  79. */
复制代码
3.3 Flink-SQL 方式写Doris

Doris测试表:
  1. use test_db;
  2. truncate table flinktest;
  3. insert into flinktest values
  4. (1,1,'aaa',1),
  5. (2,2,'bbb',2),
  6. (3,3,'ccc',3);
  7. select * from flinktest;
  8. +--------+----------+----------+------+
  9. | siteid | citycode | username | pv   |
  10. +--------+----------+----------+------+
  11. |      2 |        2 | bbb      |    2 |
  12. |      1 |        1 | aaa      |    1 |
  13. |      3 |        3 | ccc      |    3 |
  14. +--------+----------+----------+------+
  15. 3 rows in set (0.01 sec)
复制代码
Flink-SQL代码示例:
  1. package com.zenitera.bigdata.doris;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Data;
  4. import lombok.NoArgsConstructor;
  5. import org.apache.flink.configuration.Configuration;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  8. public class Flink_SQL_doris {
  9.     public static void main(String[] args) {
  10.         Configuration conf = new Configuration();
  11.         conf.setInteger("rest.port", 2000);
  12.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
  13.         env.setParallelism(1);
  14.         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  15.         tEnv.executeSql("create table flink_0518(" +
  16.                 " siteid int, " +
  17.                 " citycode int, " +
  18.                 " username string, " +
  19.                 " pv bigint " +
  20.                 ")with(" +
  21.                 "  'connector' = 'doris', " +
  22.                 "  'fenodes' = 'hdt-dmcp-ops01:8130', " +
  23.                 "  'table.identifier' = 'test_db.flinktest', " +
  24.                 "  'username' = 'root', " +
  25.                 "  'password' = '123456' " +
  26.                 ")");
  27.         tEnv.executeSql("insert into flink_0518(siteid, citycode, username, pv) values(4, 4, 'wangting', 4) ");
  28.     }
  29.     @Data
  30.     @NoArgsConstructor
  31.     @AllArgsConstructor
  32.     public static class Flink_0518 {
  33.         private Integer siteid;
  34.         private Integer citycode;
  35.         private String username;
  36.         private Long pv;
  37.     }
  38. }
复制代码
实行代码,实行完成后查看Doris对应表数据进行验证:
  1. select * from flinktest;
  2. +--------+----------+----------+------+
  3. | siteid | citycode | username | pv   |
  4. +--------+----------+----------+------+
  5. |      3 |        3 | ccc      |    3 |
  6. |      2 |        2 | bbb      |    2 |
  7. |      1 |        1 | aaa      |    1 |
  8. |      4 |        4 | wangting |    4 |
  9. +--------+----------+----------+------+
  10. 4 rows in set (0.01 sec)
复制代码
3.4 Flink-SQL 方式读Doris

  1. package com.zenitera.bigdata.doris;
  2. import org.apache.flink.configuration.Configuration;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  5. public class Flink_SQL_doris_read {
  6.     public static void main(String[] args) {
  7.         Configuration conf = new Configuration();
  8.         conf.setInteger("rest.port", 2000);
  9.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
  10.         env.setParallelism(1);
  11.         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  12.         tEnv.executeSql("create table flink_0520(" +
  13.                 " siteid int, " +
  14.                 " citycode SMALLINT, " +
  15.                 " username string, " +
  16.                 " pv bigint " +
  17.                 ")with(" +
  18.                 "  'connector' = 'doris', " +
  19.                 "  'fenodes' = 'hdt-dmcp-ops01:8130', " +
  20.                 "  'table.identifier' = 'test_db.flinktest', " +
  21.                 "  'username' = 'root', " +
  22.                 "  'password' = '123456' " +
  23.                 ")");
  24.         tEnv.sqlQuery("select * from flink_0520").execute().print();
  25.     }
  26. }
  27. /*
  28.    控制台输出信息:
  29. +----+-------------+----------+---------------+---------+
  30. | op |      siteid | citycode |      username |      pv |
  31. +----+-------------+----------+---------------+---------+
  32. | +I |           1 |        1 |           aaa |       1 |
  33. | +I |           3 |        3 |           ccc |       3 |
  34. | +I |           2 |        2 |           bbb |       2 |
  35. | +I |           4 |        4 |      wangting |       4 |
  36. +----+-------------+----------+---------------+---------+
  37. 4 rows in set
  38. */
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

乌市泽哥

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表