Flink SQL 批模式下 ClickHouse 批量写入

打印 上一主题 下一主题

主题 842|帖子 842|积分 2526

Flink SQL 批模式下 ClickHouse 批量写入
内置使用JdbcBatchingOutputFormat 批量处理类
pom依赖
  1. <dependency>
  2.   <groupId>ru.yandex.clickhouse</groupId>
  3.   <artifactId>clickhouse-jdbc</artifactId>
  4.   <version>0.3.1-patch</version>
  5. </dependency>
  6. <dependency>
  7.   <groupId>org.apache.flink</groupId>
  8.   <artifactId>flink-connector-jdbc_2.11</artifactId>
  9.   <version>${flink.version}</version>
  10. </dependency>
  11. <dependency>
  12.   <groupId>cn.hutool</groupId>
  13.   <artifactId>hutool-all</artifactId>
  14.   <version>${hutool.version}</version>
  15. </dependency>
  16. <dependency>
  17.   <groupId>mysql</groupId>
  18.   <artifactId>mysql-connector-java</artifactId>
  19.   <version>${mysql.version}</version>
  20. </dependency>
复制代码
clickHouse数据源需要的扩展类:

工厂类

[code]public class ClickHouseDynamicTableFactory implements DynamicTableSinkFactory {    public static final String IDENTIFIER = "clickhouse";    private static final String DRIVER_NAME = "ru.yandex.clickhouse.ClickHouseDriver";    public static final ConfigOption URL = ConfigOptions            .key("url")            .stringType()            .noDefaultValue()            .withDescription("the jdbc database url.");    public static final ConfigOption TABLE_NAME = ConfigOptions            .key("table-name")            .stringType()            .noDefaultValue()            .withDescription("the jdbc table name.");    public static final ConfigOption USERNAME = ConfigOptions            .key("username")            .stringType()            .noDefaultValue()            .withDescription("the jdbc user name.");    public static final ConfigOption PASSWORD = ConfigOptions            .key("password")            .stringType()            .noDefaultValue()            .withDescription("the jdbc password.");    public static final ConfigOption FORMAT = ConfigOptions            .key("format")            .stringType()            .noDefaultValue()            .withDescription("the format.");    @Override    public String factoryIdentifier() {        return IDENTIFIER;    }    @Override    public Set> requiredOptions = new HashSet();        requiredOptions.add(TABLE_NAME);        requiredOptions.add(URL);        return requiredOptions;    }    @Override    public Set
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

冬雨财经

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表