ToB Enterprise Services App Market: ToB reviews and business networking platform
Title:
Batch Writes to ClickHouse in Flink SQL Batch Mode
Author:
冬雨财经
Time:
2022-9-17 08:41
Internally, the sink relies on the JdbcBatchingOutputFormat class to write records in batches.
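To make the batching idea concrete, here is a minimal plain-Java sketch of size-triggered buffering: rows accumulate in memory and are handed to the sink once a threshold is reached, with a final flush for the remainder. `BatchBuffer` and its methods are hypothetical names invented for illustration; this is not the code of JdbcBatchingOutputFormat, and it leaves out anything else the real class may do (timers, retries, JDBC statements).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: buffers rows and passes them to a sink in batches. */
final class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers one row and flushes as soon as the batch is full. */
    void add(T row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Sends whatever is buffered to the sink; does nothing when empty. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer)); // hand over a copy, then reset
        buffer.clear();
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = new ArrayList<>();
        BatchBuffer<Integer> buffer = new BatchBuffer<>(3, batches::add);
        for (int i = 1; i <= 7; i++) {
            buffer.add(i);
        }
        buffer.flush(); // write the final partial batch
        System.out.println(batches); // prints [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

In a real JDBC sink the `sink` callback would roughly correspond to `PreparedStatement.addBatch()` calls followed by one `executeBatch()`, which is what makes large ClickHouse inserts cheaper than row-by-row writes.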
pom dependencies
<dependency>
    <groupId>ru.yandex.clickhouse</groupId>
    <artifactId>clickhouse-jdbc</artifactId>
    <version>0.3.1-patch</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>${hutool.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
</dependency>
Extension classes required for the ClickHouse data source:
Factory class
[code]import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.factories.DynamicTableSinkFactory;

public class ClickHouseDynamicTableFactory implements DynamicTableSinkFactory {

    // Value used for 'connector' in the table's WITH clause.
    public static final String IDENTIFIER = "clickhouse";

    private static final String DRIVER_NAME = "ru.yandex.clickhouse.ClickHouseDriver";

    public static final ConfigOption<String> URL = ConfigOptions
            .key("url")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc database url.");

    public static final ConfigOption<String> TABLE_NAME = ConfigOptions
            .key("table-name")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc table name.");

    public static final ConfigOption<String> USERNAME = ConfigOptions
            .key("username")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc user name.");

    public static final ConfigOption<String> PASSWORD = ConfigOptions
            .key("password")
            .stringType()
            .noDefaultValue()
            .withDescription("the jdbc password.");

    public static final ConfigOption<String> FORMAT = ConfigOptions
            .key("format")
            .stringType()
            .noDefaultValue()
            .withDescription("the format.");

    @Override
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    // Options that must be present in the WITH clause.
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(TABLE_NAME);
        requiredOptions.add(URL);
        return requiredOptions;
    }

    @Override
    public Set
[/code]
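The factory above declares `table-name` and `url` as required options, which means a table definition that omits either one should be rejected before any data is written. As a rough illustration of that rule only, the plain-Java sketch below checks a map of WITH-clause options against those two keys. `RequiredOptionsCheck`, `missingOptions`, and the sample URL are hypothetical and made up for this example; in Flink the actual validation is done by the framework, not by code like this.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical illustration of a required-option check; not Flink code. */
final class RequiredOptionsCheck {

    // The two keys the factory in this post returns from requiredOptions().
    static final Set<String> REQUIRED = Set.of("table-name", "url");

    /** Returns the required keys absent from the given options, sorted. */
    static Set<String> missingOptions(Map<String, String> options) {
        Set<String> missing = new TreeSet<>(REQUIRED);
        missing.removeAll(options.keySet());
        return missing;
    }

    public static void main(String[] args) {
        // Assumed sample options; the URL is a placeholder, not from the post.
        Map<String, String> options = Map.of(
                "connector", "clickhouse",
                "url", "jdbc:clickhouse://localhost:8123/default");
        System.out.println(missingOptions(options)); // prints [table-name]
    }
}
```

The remaining options (`username`, `password`, `format`) are not in the required set, so a definition without them would pass this particular check.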