ToB企服应用市场:ToB评测及商务社交产业平台

标题: Flink SQL 批模式下 ClickHouse 批量写入 [打印本页]

作者: 冬雨财经    时间: 2022-9-17 08:41
标题: Flink SQL 批模式下 ClickHouse 批量写入
Flink SQL 批模式下 ClickHouse 批量写入
内置使用JdbcBatchingOutputFormat 批量处理类
pom依赖
  1. <dependency>
  2.   <groupId>ru.yandex.clickhouse</groupId>
  3.   <artifactId>clickhouse-jdbc</artifactId>
  4.   <version>0.3.1-patch</version>
  5. </dependency>
  6. <dependency>
  7.   <groupId>org.apache.flink</groupId>
  8.   <artifactId>flink-connector-jdbc_2.11</artifactId>
  9.   <version>${flink.version}</version>
  10. </dependency>
  11. <dependency>
  12.   <groupId>cn.hutool</groupId>
  13.   <artifactId>hutool-all</artifactId>
  14.   <version>${hutool.version}</version>
  15. </dependency>
  16. <dependency>
  17.   <groupId>mysql</groupId>
  18.   <artifactId>mysql-connector-java</artifactId>
  19.   <version>${mysql.version}</version>
  20. </dependency>
复制代码
clickHouse数据源需要的扩展类:

工厂类

[code]public class ClickHouseDynamicTableFactory implements DynamicTableSinkFactory {    public static final String IDENTIFIER = "clickhouse";    private static final String DRIVER_NAME = "ru.yandex.clickhouse.ClickHouseDriver";    public static final ConfigOption URL = ConfigOptions            .key("url")            .stringType()            .noDefaultValue()            .withDescription("the jdbc database url.");    public static final ConfigOption TABLE_NAME = ConfigOptions            .key("table-name")            .stringType()            .noDefaultValue()            .withDescription("the jdbc table name.");    public static final ConfigOption USERNAME = ConfigOptions            .key("username")            .stringType()            .noDefaultValue()            .withDescription("the jdbc user name.");    public static final ConfigOption PASSWORD = ConfigOptions            .key("password")            .stringType()            .noDefaultValue()            .withDescription("the jdbc password.");    public static final ConfigOption FORMAT = ConfigOptions            .key("format")            .stringType()            .noDefaultValue()            .withDescription("the format.");    @Override    public String factoryIdentifier() {        return IDENTIFIER;    }    @Override    public Set> requiredOptions = new HashSet();        requiredOptions.add(TABLE_NAME);        requiredOptions.add(URL);        return requiredOptions;    }    @Override    public Set




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4