Flink 1.17.2 版本用 java 读取 starrocks

王柳  金牌会员 | 2025-3-16 09:12:07 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 991|帖子 991|积分 2973

  在 Flink 1.17.2 中使用 Java 读取 StarRocks 数据,可以通过 JDBC 连接器 或 StarRocks 官方提供的 Flink Connector 实现。以下是两种方法的具体步骤:
方法一:使用 Flink JDBC 连接器(兼容 MySQL 协议)

  StarRocks 兼容 MySQL 协议,可通过 Flink 的 JDBC 连接器读取数据。在 pom.xml 中添加以下依靠:
  1. <!-- Flink JDBC 连接器 -->
  2. <dependency>
  3.     <groupId>org.apache.flink</groupId>
  4.     <artifactId>flink-connector-jdbc</artifactId>
  5.     <version>3.1.1-1.17</version>
  6. </dependency>
  7. <!-- MySQL 驱动(兼容 StarRocks) -->
  8. <dependency>
  9.     <groupId>mysql</groupId>
  10.     <artifactId>mysql-connector-java</artifactId>
  11.     <version>8.0.28</version>
  12. </dependency>
复制代码
  编写 Java 代码:
  1. import org.apache.flink.api.common.typeinfo.TypeInformation;
  2. import org.apache.flink.api.common.typeinfo.Types;
  3. import org.apache.flink.api.java.DataSet;
  4. import org.apache.flink.api.java.ExecutionEnvironment;
  5. import org.apache.flink.api.java.typeutils.RowTypeInfo;
  6. import org.apache.flink.connector.jdbc.JdbcInputFormat;
  7. import org.apache.flink.types.Row;
  8. public class ReadStarRocksJDBC {
  9.     public static void main(String[] args) throws Exception {
  10.         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  11.         TypeInformation[] fieldTypes = {
  12.                 Types.STRING,
  13.                 Types.STRING,
  14.                 Types.INT
  15.         };
  16.         RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);
  17.         JdbcInputFormat inputFormat = JdbcInputFormat.buildJdbcInputFormat()
  18.                 .setDrivername("com.mysql.cj.jdbc.Driver")
  19.                 .setDBUrl("jdbc:mysql://<starrocks-fe-host>:<port>/<database>")
  20.                 .setUsername("<username>")
  21.                 .setPassword("<password>")
  22.                 .setQuery("SELECT teacher_id, student_id, student_num FROM dwd_student = limit 10")
  23.                 .setRowTypeInfo(rowTypeInfo)
  24.                 .finish();
  25.         DataSet<Row> dataSet = env.createInput(inputFormat);
  26.         dataSet.print();
  27.     }
  28. }
复制代码
  输出:
  1. +I[teacher03, abc01, 2]
  2. +I[teacher01, abc01, 3]
复制代码
方法二:使用 StarRocks Flink Connector(保举)

参考:使用 Flink Connector 读取数据
  StarRocks 提供官方 Connector,支持高效读写。在 pom.xml 中添加以下依靠:
  1. <dependency>
  2.     <groupId>com.starrocks.connector</groupId>
  3.     <artifactId>flink-connector-starrocks</artifactId>
  4.     <version>1.2.9_flink-1.17</version>
  5. </dependency>
复制代码
2.1 通过 Flink SQL 直接注册 StarRocks 表:

  1. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  2. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  3. public class ReadStarRocksSQL {
  4.     public static void main(String[] args) throws Exception {
  5.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  6.         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
  7.         String createTableSQL = "CREATE TABLE starrocks_table (\n" +
  8.                 "  id INT,\n" +
  9.                 "  name STRING,\n" +
  10.                 "  score INT\n" +
  11.                 ") WITH (\n" +
  12.                 "  'connector' = 'starrocks',\n" +
  13.                 "  'jdbc-url' = 'jdbc:mysql://192.168.101.xx:9030',\n" +
  14.                 "  'scan-url' = '192.168.101.xx:8030',\n" +
  15.                 "  'database-name' = 'mydatabase',\n" +
  16.                 "  'table-name' = 'table1',\n" +
  17.                 "  'username' = 'root',\n" +
  18.                 "  'password' = ''\n" +
  19.                 ")";
  20.         tableEnv.executeSql(createTableSQL);
  21.         tableEnv.executeSql("SELECT * FROM starrocks_table").print();
  22.     }
  23. }
复制代码
  输出:
  1. +----+-------------+--------------------------------+-------------+
  2. | op |          id |                           name |       score |
  3. +----+-------------+--------------------------------+-------------+
  4. | +I |           4 |                          Julia |          25 |
  5. | +I |           2 |                           Rose |          23 |
  6. | +I |           3 |                          Alice |          24 |
  7. | +I |           1 |                           Lily |          23 |
  8. +----+-------------+--------------------------------+-------------+
  9. 4 rows in set
复制代码
  备注:StarRocks 表 table1 结构
  1. CREATE TABLE `table1` (
  2.   `id` int(11) NOT NULL COMMENT "用户 ID",
  3.   `name` varchar(65533) NULL COMMENT "用户姓名",
  4.   `score` int(11) NOT NULL COMMENT "用户得分"
  5. ) ENGINE=OLAP
  6. PRIMARY KEY(`id`)
  7. DISTRIBUTED BY HASH(`id`)
  8. PROPERTIES (
  9. "compression" = "LZ4",
  10. "enable_persistent_index" = "true",
  11. "fast_schema_evolution" = "false",
  12. "replicated_storage" = "true",
  13. "replication_num" = "1"
  14. );
  15. INSERT INTO mydatabase.table1(id, name, score) VALUES(3, 'Alice', 24);
  16. INSERT INTO mydatabase.table1(id, name, score) VALUES(2, 'Rose', 23);
  17. INSERT INTO mydatabase.table1(id, name, score) VALUES(4, 'Julia', 25);
  18. INSERT INTO mydatabase.table1(id, name, score) VALUES(1, 'Lily', 23);
复制代码
注:但这遇到了一个问题就是这个映射关系必须和原表逐一对应,那假如原表有一百多个字段的话这里还得映射这么多字段感觉很不方便,在网上查了能不能只映射部门字段,但并没有乐成。
2.2 使用 Flink DataStream 读取数据

  在 pom.xml 文件中添加依靠,如下所示:
  1.         <dependency>
  2.             <groupId>org.apache.flink</groupId>
  3.             <artifactId>flink-table-common</artifactId>
  4.             <version>1.17.2</version>
  5.         </dependency>
  6.         <dependency>
  7.             <groupId>org.apache.flink</groupId>
  8.             <artifactId>flink-clients</artifactId>
  9.             <version>1.17.2</version>
  10.         </dependency>
  11.         <dependency>
  12.             <groupId>com.starrocks</groupId>
  13.             <artifactId>flink-connector-starrocks</artifactId>
  14.             <version>1.2.8_flink-1.17</version>
  15.         </dependency>
复制代码
  调用 Flink Connector,读取 StarRocks 中的数据,如下所示:
  1. import com.starrocks.connector.flink.StarRocksSource;
  2. import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.table.api.DataTypes;
  5. import org.apache.flink.table.api.TableSchema;
  6.    
  7. public class StarRocksSourceApp {
  8.         public static void main(String[] args) throws Exception {
  9.             StarRocksSourceOptions options = StarRocksSourceOptions.builder()
  10.                    .withProperty("scan-url", "192.168.xxx.xxx:8030")
  11.                    .withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030")
  12.                    .withProperty("username", "root")
  13.                    .withProperty("password", "")
  14.                    .withProperty("table-name", "table1")
  15.                    .withProperty("database-name", "mydatabase")
  16.                    .build();
  17.             TableSchema tableSchema = TableSchema.builder()
  18.                    .field("id", DataTypes.INT())
  19.                    .field("name", DataTypes.STRING())
  20.                    .field("score", DataTypes.INT())
  21.                    .build();
  22.             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  23.             env.addSource(StarRocksSource.source(tableSchema, options)).setParallelism(5).print();
  24.             env.execute("StarRocks flink source");
  25.         }
  26.     }
复制代码
  输出:
  1. 11> +I(4,Julia,25)
  2. 8> +I(1,Lily,23)
  3. 16> +I(2,Rose,23)
  4. 9> +I(3,Alice,24)
复制代码

  1.         StarRocksSourceOptions options = StarRocksSourceOptions.builder()
  2.                 .withProperty("scan-url", "192.168.37.11:8030")
  3.                 .withProperty("jdbc-url", "jdbc:mysql://192.168.37.11:9030")
  4.                 .withProperty("username", "root")
  5.                 .withProperty("password", "")
  6.                 .withProperty("table-name", "table1")
  7.                 .withProperty("database-name", "mydatabase")
  8.                 .withProperty("scan.columns", "id, name")
  9.                 .withProperty("scan.filter", "id = 4 or id = 2")
  10.                 .build();
  11.         TableSchema tableSchema = TableSchema.builder()
  12.                 .field("id", DataTypes.INT())
  13.                 .field("name", DataTypes.STRING())
  14.                 .build();
  15.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  16.         env.addSource(StarRocksSource.source(tableSchema, options)).setParallelism(5).print();
  17.         env.execute("StarRocks flink source");
复制代码
  输出:
  1. 14> +I(2,Rose)
  2. 7> +I(4,Julia)
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

王柳

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表