ToB企服应用市场:ToB评测及商务社交产业平台

标题: Flink 中 JDBC Connector 使用详解 [打印本页]

作者: 梦应逍遥    时间: 2024-12-4 18:40
标题: Flink 中 JDBC Connector 使用详解
1. 配景

在实时盘算或离线任务中,往往需要与关系型数据库交互,例如 MySQL、PostgreSQL 等。Apache Flink 提供了 JDBC Connector,可以方便地将流式数据写入或读取数据库。
本文将先容 Flink JDBC Connector 的底子用法、设置方法以及注意事项,帮助开发者更好地集成数据库操纵。

2. JDBC Connector 的底子概念

JDBC Connector 是 Flink 官方提供的一个用于连接关系型数据库的工具包,支持:

使用 JDBC Connector 可以实现对数据库的实时写入,也可以用作批量操纵的工具。

3. Maven 依赖

在项目中添加 Flink JDBC 依赖:
  1. <dependency>
  2.     <groupId>org.apache.flink</groupId>
  3.     <artifactId>flink-connector-jdbc_2.12</artifactId>
  4.     <version>1.17.0</version> <!-- 根据实际使用的 Flink 版本调整 -->
  5. </dependency>
复制代码
如果使用 MySQL 数据库,还需添加 MySQL 驱动:
  1. <dependency>
  2.     <groupId>mysql</groupId>
  3.     <artifactId>mysql-connector-java</artifactId>
  4.     <version>8.0.33</version> <!-- MySQL 驱动版本 -->
  5. </dependency>
复制代码

4. JDBC Connector 的使用

4.1 写入数据库(Sink)

以下是一个将流式数据写入 MySQL 的示例:
  1. import org.apache.flink.api.common.typeinfo.Types;
  2. import org.apache.flink.api.java.tuple.Tuple2;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.connector.jdbc.JdbcSink;
  5. public class JdbcSinkExample {
  6.     public static void main(String[] args) throws Exception {
  7.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  8.         // 模拟输入数据
  9.         env.fromElements(
  10.                 Tuple2.of(1, "Alice"),
  11.                 Tuple2.of(2, "Bob"),
  12.                 Tuple2.of(3, "Charlie")
  13.         )
  14.         .addSink(JdbcSink.sink(
  15.                 "INSERT INTO users (id, name) VALUES (?, ?)", // SQL 语句
  16.                 (ps, t) -> {
  17.                     ps.setInt(1, t.f0);  // 设置第一个参数为 ID
  18.                     ps.setString(2, t.f1);  // 设置第二个参数为 Name
  19.                 },
  20.                 JdbcSink.DefaultJdbcExecutionOptions.builder()
  21.                         .withBatchSize(100) // 批量写入大小
  22.                         .build(),
  23.                 () -> JdbcSink.defaultJdbcConnectionProvider(
  24.                         "jdbc:mysql://localhost:3306/testdb", // 数据库 URL
  25.                         "root",  // 用户名
  26.                         "password" // 密码
  27.                 )
  28.         ));
  29.         env.execute("Flink JDBC Sink Example");
  30.     }
  31. }
复制代码
关键点解析


4.2 从数据库读取数据(Source)

以下是一个从 MySQL 读取数据并打印的示例:
  1. import org.apache.flink.api.java.tuple.Tuple2;
  2. import org.apache.flink.connector.jdbc.JdbcInputFormat;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.streaming.api.datastream.DataStream;
  5. public class JdbcSourceExample {
  6.     public static void main(String[] args) throws Exception {
  7.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  8.         DataStream<Tuple2<Integer, String>> sourceStream = env.createInput(
  9.             JdbcInputFormat.buildJdbcInputFormat()
  10.                 .setDrivername("com.mysql.cj.jdbc.Driver") // JDBC 驱动
  11.                 .setDBUrl("jdbc:mysql://localhost:3306/testdb") // 数据库 URL
  12.                 .setUsername("root") // 用户名
  13.                 .setPassword("password") // 密码
  14.                 .setQuery("SELECT id, name FROM users") // SQL 查询
  15.                 .setRowTypeInfo(Types.TUPLE(Types.INT, Types.STRING)) // 结果类型
  16.                 .finish()
  17.         );
  18.         sourceStream.print();
  19.         env.execute("Flink JDBC Source Example");
  20.     }
  21. }
复制代码
关键点解析


5. JDBC Connector 的设置选项

5.1 批量写入设置

通过 JdbcExecutionOptions 可调整写入计谋:

5.2 数据库连接池

Flink JDBC Connector 默认使用单个连接执行操纵。对于高并发需求,可以结合 HikariCP 等连接池框架优化性能。

6. 注意事项


7. 总结

Flink JDBC Connector 是一个简单而高效的工具,适用于实时盘算场景下与关系型数据库的交互。无论是数据写入照旧读取,都可以通过简单设置快速实现。但对于高并发和大规模数据场景,需要根据业务需求调整计谋。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4