冬雨财经 发表于 2022-9-17 08:41:52

Flink SQL 批模式下 ClickHouse 批量写入

Flink SQL 批模式下 ClickHouse 批量写入
内置使用JdbcBatchingOutputFormat 批量处理类
pom依赖

<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.1-patch</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>clickHouse数据源需要的扩展类:

工厂类

public class ClickHouseDynamicTableFactory implements DynamicTableSinkFactory {    public static final String IDENTIFIER = "clickhouse";    private static final String DRIVER_NAME = "ru.yandex.clickhouse.ClickHouseDriver";    public static final ConfigOption URL = ConfigOptions            .key("url")            .stringType()            .noDefaultValue()            .withDescription("the jdbc database url.");    public static final ConfigOption TABLE_NAME = ConfigOptions            .key("table-name")            .stringType()            .noDefaultValue()            .withDescription("the jdbc table name.");    public static final ConfigOption USERNAME = ConfigOptions            .key("username")            .stringType()            .noDefaultValue()            .withDescription("the jdbc user name.");    public static final ConfigOption PASSWORD = ConfigOptions            .key("password")            .stringType()            .noDefaultValue()            .withDescription("the jdbc password.");    public static final ConfigOption FORMAT = ConfigOptions            .key("format")            .stringType()            .noDefaultValue()            .withDescription("the format.");    @Override    public String factoryIdentifier() {      return IDENTIFIER;    }    @Override    public Set> requiredOptions = new HashSet();      requiredOptions.add(TABLE_NAME);      requiredOptions.add(URL);      return requiredOptions;    }    @Override    public Set
页: [1]
查看完整版本: Flink SQL 批模式下 ClickHouse 批量写入