Flink SQL 批模式下 ClickHouse 批量写入
内部使用 JdbcBatchingOutputFormat 批量处理类完成批量写入。
pom 依赖:
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.1-patch</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>${hutool.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
</dependency>
ClickHouse 数据源需要的扩展类:
工厂类
[code]public class ClickHouseDynamicTableFactory implements DynamicTableSinkFactory { public static final String IDENTIFIER = "clickhouse"; private static final String DRIVER_NAME = "ru.yandex.clickhouse.ClickHouseDriver"; public static final ConfigOption URL = ConfigOptions .key("url") .stringType() .noDefaultValue() .withDescription("the jdbc database url."); public static final ConfigOption TABLE_NAME = ConfigOptions .key("table-name") .stringType() .noDefaultValue() .withDescription("the jdbc table name."); public static final ConfigOption USERNAME = ConfigOptions .key("username") .stringType() .noDefaultValue() .withDescription("the jdbc user name."); public static final ConfigOption PASSWORD = ConfigOptions .key("password") .stringType() .noDefaultValue() .withDescription("the jdbc password."); public static final ConfigOption FORMAT = ConfigOptions .key("format") .stringType() .noDefaultValue() .withDescription("the format."); @Override public String factoryIdentifier() { return IDENTIFIER; } @Override public Set> requiredOptions = new HashSet(); requiredOptions.add(TABLE_NAME); requiredOptions.add(URL); return requiredOptions; } @Override public Set |