Reading and Writing Hudi with the Flink DataStream API on FusionInsight MRS


Abstract: At present Hudi officially supports only Flink SQL for reading and writing data, but in real projects some customers need to read and write Hudi with the Flink DataStream API.
This article is shared from the Huawei Cloud community post "Reading and Writing Hudi with the Flink DataStream API on FusionInsight MRS", by yangxiao_mrs.
This practice consists of three parts:
1) HoodiePipeline.java, which wraps the Hudi core read and write interfaces and exposes them as a Hudi DataStream API.
2) WriteIntoHudi.java, which writes data into Hudi with the DataStream API.
3) ReadFromHudi.java, which reads data from Hudi with the DataStream API.
1. HoodiePipeline.java wraps the Hudi core read and write interfaces and exposes them as a Hudi DataStream API. Key implementation logic:
Step 1: Take the column names, primary key, and partition key set on the builder and concatenate them into a CREATE TABLE statement with a StringBuilder (an example of the generated DDL follows the class listing below).
Step 2: Register this Hudi streaming table in the catalog.
Step 3: Convert the dynamic table into a DataStream provider and use it to produce or consume data.
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTableFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * A tool class to construct hoodie flink pipeline.
 *
 * <p>How to use ?</p>
 * Method {@link #builder(String)} returns a pipeline builder. The builder
 * can then define the hudi table columns, primary keys and partitions.
 *
 * <p>An example:</p>
 * <pre>
 *   HoodiePipeline.Builder builder = HoodiePipeline.builder("myTable");
 *   DataStreamSink<?> sinkStream = builder
 *       .column("f0 int")
 *       .column("f1 varchar(10)")
 *       .column("f2 varchar(20)")
 *       .pk("f0,f1")
 *       .partition("f2")
 *       .sink(input, false);
 * </pre>
 */
public class HoodiePipeline {

  /**
   * Returns the builder for hoodie pipeline construction.
   */
  public static Builder builder(String tableName) {
    return new Builder(tableName);
  }

  /**
   * Builder for hudi source/sink pipeline construction.
   */
  public static class Builder {
    private final String tableName;
    private final List<String> columns;
    private final Map<String, String> options;
    private String pk;
    private List<String> partitions;

    private Builder(String tableName) {
      this.tableName = tableName;
      this.columns = new ArrayList<>();
      this.options = new HashMap<>();
      this.partitions = new ArrayList<>();
    }

    /**
     * Add a table column definition.
     *
     * @param column the column format should be in the form like 'f0 int'
     */
    public Builder column(String column) {
      this.columns.add(column);
      return this;
    }

    /**
     * Add primary keys.
     */
    public Builder pk(String... pks) {
      this.pk = String.join(",", pks);
      return this;
    }

    /**
     * Add partition fields.
     */
    public Builder partition(String... partitions) {
      this.partitions = new ArrayList<>(Arrays.asList(partitions));
      return this;
    }

    /**
     * Add a config option.
     */
    public Builder option(ConfigOption<?> option, Object val) {
      this.options.put(option.key(), val.toString());
      return this;
    }

    public Builder option(String key, Object val) {
      this.options.put(key, val.toString());
      return this;
    }

    public Builder options(Map<String, String> options) {
      this.options.putAll(options);
      return this;
    }

    public DataStreamSink<?> sink(DataStream<RowData> input, boolean bounded) {
      TableDescriptor tableDescriptor = getTableDescriptor();
      return HoodiePipeline.sink(input, tableDescriptor.getTableId(), tableDescriptor.getCatalogTable(), bounded);
    }

    public TableDescriptor getTableDescriptor() {
      EnvironmentSettings environmentSettings = EnvironmentSettings
          .newInstance()
          .build();
      TableEnvironmentImpl tableEnv = TableEnvironmentImpl.create(environmentSettings);
      String sql = getCreateHoodieTableDDL(this.tableName, this.columns, this.options, this.pk, this.partitions);
      tableEnv.executeSql(sql);
      String currentCatalog = tableEnv.getCurrentCatalog();
      CatalogTable catalogTable = null;
      String defaultDatabase = null;
      try {
        Catalog catalog = tableEnv.getCatalog(currentCatalog).get();
        defaultDatabase = catalog.getDefaultDatabase();
        catalogTable = (CatalogTable) catalog.getTable(new ObjectPath(defaultDatabase, this.tableName));
      } catch (TableNotExistException e) {
        throw new HoodieException("Create table " + this.tableName + " exception", e);
      }
      ObjectIdentifier tableId = ObjectIdentifier.of(currentCatalog, defaultDatabase, this.tableName);
      return new TableDescriptor(tableId, catalogTable);
    }

    public DataStream<RowData> source(StreamExecutionEnvironment execEnv) {
      TableDescriptor tableDescriptor = getTableDescriptor();
      return HoodiePipeline.source(execEnv, tableDescriptor.tableId, tableDescriptor.getCatalogTable());
    }
  }

  private static String getCreateHoodieTableDDL(
      String tableName,
      List<String> fields,
      Map<String, String> options,
      String pkField,
      List<String> partitionField) {
    StringBuilder builder = new StringBuilder();
    builder.append("create table ")
        .append(tableName)
        .append("(\n");
    for (String field : fields) {
      builder.append("  ")
          .append(field)
          .append(",\n");
    }
    builder.append("  PRIMARY KEY(")
        .append(pkField)
        .append(") NOT ENFORCED\n")
        .append(")\n");
    if (!partitionField.isEmpty()) {
      String partitons = partitionField
          .stream()
          .map(partitionName -> "`" + partitionName + "`")
          .collect(Collectors.joining(","));
      builder.append("PARTITIONED BY (")
          .append(partitons)
          .append(")\n");
    }
    builder.append("with ('connector' = 'hudi'");
    options.forEach((k, v) -> builder
        .append(",\n")
        .append("  '")
        .append(k)
        .append("' = '")
        .append(v)
        .append("'"));
    builder.append("\n)");
    System.out.println(builder.toString());
    return builder.toString();
  }

  /**
   * Returns the data stream sink with given catalog table.
   *
   * @param input        The input datastream
   * @param tablePath    The table path to the hoodie table in the catalog
   * @param catalogTable The hoodie catalog table
   * @param isBounded    A flag indicating whether the input data stream is bounded
   */
  private static DataStreamSink<?> sink(DataStream<RowData> input, ObjectIdentifier tablePath, CatalogTable catalogTable, boolean isBounded) {
    DefaultDynamicTableContext context = new DefaultDynamicTableContext(tablePath, catalogTable,
        Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
    return ((DataStreamSinkProvider) hoodieTableFactory.createDynamicTableSink(context)
        .getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded)))
        .consumeDataStream(input);
  }

  /**
   * Returns the data stream source with given catalog table.
   *
   * @param execEnv      The execution environment
   * @param tablePath    The table path to the hoodie table in the catalog
   * @param catalogTable The hoodie catalog table
   */
  private static DataStream<RowData> source(StreamExecutionEnvironment execEnv, ObjectIdentifier tablePath, CatalogTable catalogTable) {
    DefaultDynamicTableContext context = new DefaultDynamicTableContext(tablePath, catalogTable,
        Configuration.fromMap(catalogTable.getOptions()), Thread.currentThread().getContextClassLoader(), false);
    HoodieTableFactory hoodieTableFactory = new HoodieTableFactory();
    DataStreamScanProvider dataStreamScanProvider = (DataStreamScanProvider) ((ScanTableSource) hoodieTableFactory
        .createDynamicTableSource(context))
        .getScanRuntimeProvider(new ScanRuntimeProviderContext());
    return dataStreamScanProvider.produceDataStream(execEnv);
  }

  /**
   * A POJO that contains tableId and resolvedCatalogTable.
   */
  public static class TableDescriptor {
    private ObjectIdentifier tableId;
    private CatalogTable catalogTable;

    public TableDescriptor(ObjectIdentifier tableId, CatalogTable catalogTable) {
      this.tableId = tableId;
      this.catalogTable = catalogTable;
    }

    public ObjectIdentifier getTableId() {
      return tableId;
    }

    public CatalogTable getCatalogTable() {
      return catalogTable;
    }
  }

  private static class DefaultDynamicTableContext implements DynamicTableFactory.Context {
    private final ObjectIdentifier objectIdentifier;
    private final CatalogTable catalogTable;
    private final ReadableConfig configuration;
    private final ClassLoader classLoader;
    private final boolean isTemporary;

    DefaultDynamicTableContext(
        ObjectIdentifier objectIdentifier,
        CatalogTable catalogTable,
        ReadableConfig configuration,
        ClassLoader classLoader,
        boolean isTemporary) {
      this.objectIdentifier = objectIdentifier;
      this.catalogTable = catalogTable;
      this.configuration = configuration;
      this.classLoader = classLoader;
      this.isTemporary = isTemporary;
    }

    @Override
    public ObjectIdentifier getObjectIdentifier() {
      return objectIdentifier;
    }

    @Override
    public CatalogTable getCatalogTable() {
      return catalogTable;
    }

    @Override
    public ReadableConfig getConfiguration() {
      return configuration;
    }

    @Override
    public ClassLoader getClassLoader() {
      return classLoader;
    }

    @Override
    public boolean isTemporary() {
      return isTemporary;
    }
  }
}
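As a usage illustration (not part of the original class), the builder from the Javadoc above, extended with an illustrative 'path' option, assembles a DDL string roughly like the one in the comment below; getCreateHoodieTableDDL also prints it to stdout, which helps when checking the table definition:

HoodiePipeline.Builder builder = HoodiePipeline.builder("myTable")
    .column("f0 int")
    .column("f1 varchar(10)")
    .column("f2 varchar(20)")
    .pk("f0", "f1")
    .partition("f2")
    .option("path", "hdfs://hacluster/tmp/flinkHudi/myTable"); // illustrative path

// getCreateHoodieTableDDL builds and prints roughly:
//
// create table myTable(
//   f0 int,
//   f1 varchar(10),
//   f2 varchar(20),
//   PRIMARY KEY(f0,f1) NOT ENFORCED
// )
// PARTITIONED BY (`f2`)
// with ('connector' = 'hudi',
//   'path' = 'hdfs://hacluster/tmp/flinkHudi/myTable'
// )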
2. WriteIntoHudi.java writes data into Hudi with the DataStream API. Key implementation logic:
Step 1: The demo's source data comes from a datagen connector table.
Step 2: Convert the Table into a DataStream with toAppendStream.
Step 3: Build the Hudi sink stream and write the data into Hudi.
In real projects the Hudi sink can also be fed directly from a DataStream source; a sketch follows the listing below.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;

import java.util.HashMap;
import java.util.Map;

public class WriteIntoHudi {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.getCheckpointConfig().setCheckpointInterval(10000);
        tableEnv.executeSql("CREATE TABLE datagen (\n"
            + "  uuid varchar(20),\n"
            + "  name varchar(10),\n"
            + "  age int,\n"
            + "  ts timestamp(3),\n"
            + "  p varchar(20)\n"
            + ") WITH (\n"
            + "  'connector' = 'datagen',\n"
            + "  'rows-per-second' = '5'\n"
            + ")");
        Table table = tableEnv.sqlQuery("SELECT * FROM datagen");
        DataStream<RowData> dataStream = tableEnv.toAppendStream(table, RowData.class);
        String targetTable = "hudiSinkTable";
        String basePath = "hdfs://hacluster/tmp/flinkHudi/hudiTable";
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), basePath);
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "ts");
        options.put(FlinkOptions.INDEX_BOOTSTRAP_ENABLED.key(), "true");
        HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
            .column("uuid VARCHAR(20)")
            .column("name VARCHAR(10)")
            .column("age INT")
            .column("ts TIMESTAMP(3)")
            .column("p VARCHAR(20)")
            .pk("uuid")
            .partition("p")
            .options(options);
        builder.sink(dataStream, false); // the second parameter indicates whether the input data stream is bounded
        env.execute("Api_Sink");
    }
}
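For the direct DataStream-to-Hudi path mentioned above, a minimal sketch could look like the following. The socket source, the comma-separated record format, and the hard-coded path are illustrative assumptions, not part of the original article; the key points are producing a DataStream<RowData> whose type information (built here with InternalTypeInfo) matches the columns declared on the builder, and keeping checkpointing enabled so Hudi can commit.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.VarCharType;

public class WriteIntoHudiFromDataStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getCheckpointConfig().setCheckpointInterval(10000);

        // Row layout must match the columns declared on the HoodiePipeline builder below.
        RowType rowType = RowType.of(
            new LogicalType[] {new VarCharType(20), new VarCharType(10), new IntType(),
                new TimestampType(3), new VarCharType(20)},
            new String[] {"uuid", "name", "age", "ts", "p"});

        // Any DataStream source works here; a socket emitting lines like
        // "id1,Danny,23,1695000000000,par1" is used purely for illustration.
        DataStream<RowData> dataStream = env
            .socketTextStream("localhost", 9999)
            .map((MapFunction<String, RowData>) line -> {
                String[] f = line.split(",");
                GenericRowData row = new GenericRowData(5);
                row.setField(0, StringData.fromString(f[0]));
                row.setField(1, StringData.fromString(f[1]));
                row.setField(2, Integer.parseInt(f[2]));
                row.setField(3, TimestampData.fromEpochMillis(Long.parseLong(f[3])));
                row.setField(4, StringData.fromString(f[4]));
                return row;
            })
            .returns(InternalTypeInfo.of(rowType));

        HoodiePipeline.Builder builder = HoodiePipeline.builder("hudiSinkTable")
            .column("uuid VARCHAR(20)")
            .column("name VARCHAR(10)")
            .column("age INT")
            .column("ts TIMESTAMP(3)")
            .column("p VARCHAR(20)")
            .pk("uuid")
            .partition("p")
            .option("path", "hdfs://hacluster/tmp/flinkHudi/hudiTable");

        builder.sink(dataStream, false);
        env.execute("Api_Sink_DataStream");
    }
}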
3. ReadFromHudi.java reads data from Hudi with the DataStream API. Key implementation logic:
Step 1: Build the Hudi source stream and read the Hudi data.
Step 2: Convert the stream into a Table with fromDataStream.
Step 3: Print the Hudi table data with the print connector.
In real projects the Hudi data can also be consumed directly as a DataStream and written to any sink; a sketch follows the listing below.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.configuration.FlinkOptions;

import java.util.HashMap;
import java.util.Map;

public class ReadFromHudi {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String targetTable = "hudiSourceTable";
        String basePath = "hdfs://hacluster/tmp/flinkHudi/hudiTable";
        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), basePath);
        options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        options.put(FlinkOptions.READ_AS_STREAMING.key(), "true"); // this option enables streaming read
        options.put("read.streaming.start-commit", "20210316134557"); // specifies the start commit instant time
        HoodiePipeline.Builder builder = HoodiePipeline.builder(targetTable)
            .column("uuid VARCHAR(20)")
            .column("name VARCHAR(10)")
            .column("age INT")
            .column("ts TIMESTAMP(3)")
            .column("p VARCHAR(20)")
            .pk("uuid")
            .partition("p")
            .options(options);
        DataStream<RowData> rowDataDataStream = builder.source(env);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv.fromDataStream(rowDataDataStream, "uuid, name, age, ts, p");
        tableEnv.registerTable("hudiSourceTable", table);
        tableEnv.executeSql("CREATE TABLE print (\n"
            + "   uuid varchar(20),\n"
            + "   name varchar(10),\n"
            + "   age int,\n"
            + "   ts timestamp(3),\n"
            + "   p varchar(20)\n"
            + ") WITH (\n"
            + " 'connector' = 'print'\n"
            + ")");
        tableEnv.executeSql("insert into print select * from hudiSourceTable");
        env.execute("Api_Source");
    }
}
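For the direct DataStream consumption mentioned above, a minimal sketch (an assumption, not from the original article) that reuses builder and env from the ReadFromHudi listing and replaces the table registration and print connector could be:

// Process the Hudi source stream with DataStream operators instead of a print table.
DataStream<RowData> rowDataDataStream = builder.source(env);
rowDataDataStream
    .map(row -> row.getString(0) + "," + row.getString(1) + "," + row.getInt(2)) // uuid,name,age
    .returns(org.apache.flink.api.common.typeinfo.Types.STRING)
    .print(); // or .addSink(...) to write into any downstream system
env.execute("Api_Source_DataStream");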
4. If a project needs to parse complex JSON from Kafka, there are two options:
1) Use Flink SQL: https://bbs.huaweicloud.com/forum/thread-153494-1-1.html
2) Use a Flink DataStream MapFunction; a sketch follows below.
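A minimal sketch of option 2, assuming the Kafka records are flat JSON strings with the same fields as the examples above, that "ts" is encoded as epoch milliseconds, and that Jackson is available on the classpath (the class name and field names are illustrative):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;

/**
 * Parses a JSON string read from Kafka into the RowData layout used by the
 * Hudi examples above (uuid, name, age, ts, p).
 */
public class JsonToRowDataMapFunction implements MapFunction<String, RowData> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public RowData map(String json) throws Exception {
        JsonNode node = MAPPER.readTree(json);
        GenericRowData row = new GenericRowData(5);
        row.setField(0, StringData.fromString(node.get("uuid").asText()));
        row.setField(1, StringData.fromString(node.get("name").asText()));
        row.setField(2, node.get("age").asInt());
        // "ts" is assumed to be epoch milliseconds in the JSON payload
        row.setField(3, TimestampData.fromEpochMillis(node.get("ts").asLong()));
        row.setField(4, StringData.fromString(node.get("p").asText()));
        return row;
    }
}

The resulting DataStream<RowData> can then be given explicit RowData type information and handed to HoodiePipeline.Builder#sink, as in the sketch after the WriteIntoHudi listing.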
