王柳 发表于 2025-3-16 09:12:07

Flink 1.17.2 版本用 java 读取 starrocks

  在 Flink 1.17.2 中使用 Java 读取 StarRocks 数据,可以通过 JDBC 连接器 或 StarRocks 官方提供的 Flink Connector 实现。以下是两种方法的具体步骤:
方法一:使用 Flink JDBC 连接器(兼容 MySQL 协议)

  StarRocks 兼容 MySQL 协议,可通过 Flink 的 JDBC 连接器读取数据。在 pom.xml 中添加以下依靠:
<!-- Flink JDBC 连接器 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.1-1.17</version>
</dependency>
<!-- MySQL 驱动(兼容 StarRocks) -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>
  编写 Java 代码:
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.types.Row;

public class ReadStarRocksJDBC {
    public static void main(String[] args) throws Exception {
      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      TypeInformation[] fieldTypes = {
                Types.STRING,
                Types.STRING,
                Types.INT
      };

      RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

      JdbcInputFormat inputFormat = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("com.mysql.cj.jdbc.Driver")
                .setDBUrl("jdbc:mysql://<starrocks-fe-host>:<port>/<database>")
                .setUsername("<username>")
                .setPassword("<password>")
                .setQuery("SELECT teacher_id, student_id, student_num FROM dwd_student = limit 10")
                .setRowTypeInfo(rowTypeInfo)
                .finish();

      DataSet<Row> dataSet = env.createInput(inputFormat);
      dataSet.print();
    }
}
  输出:
+I
+I
方法二:使用 StarRocks Flink Connector(保举)

参考:使用 Flink Connector 读取数据
  StarRocks 提供官方 Connector,支持高效读写。在 pom.xml 中添加以下依靠:
<dependency>
    <groupId>com.starrocks.connector</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <version>1.2.9_flink-1.17</version>
</dependency>
2.1 通过 Flink SQL 直接注册 StarRocks 表:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReadStarRocksSQL {
    public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

      String createTableSQL = "CREATE TABLE starrocks_table (\n" +
                "id INT,\n" +
                "name STRING,\n" +
                "score INT\n" +
                ") WITH (\n" +
                "'connector' = 'starrocks',\n" +
                "'jdbc-url' = 'jdbc:mysql://192.168.101.xx:9030',\n" +
                "'scan-url' = '192.168.101.xx:8030',\n" +
                "'database-name' = 'mydatabase',\n" +
                "'table-name' = 'table1',\n" +
                "'username' = 'root',\n" +
                "'password' = ''\n" +
                ")";

      tableEnv.executeSql(createTableSQL);
      tableEnv.executeSql("SELECT * FROM starrocks_table").print();
    }
}
  输出:
+----+-------------+--------------------------------+-------------+
| op |          id |                           name |       score |
+----+-------------+--------------------------------+-------------+
| +I |         4 |                        Julia |          25 |
| +I |         2 |                           Rose |          23 |
| +I |         3 |                        Alice |          24 |
| +I |         1 |                           Lily |          23 |
+----+-------------+--------------------------------+-------------+
4 rows in set
  备注:StarRocks 表 table1 结构
CREATE TABLE `table1` (
`id` int(11) NOT NULL COMMENT "用户 ID",
`name` varchar(65533) NULL COMMENT "用户姓名",
`score` int(11) NOT NULL COMMENT "用户得分"
) ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "false",
"replicated_storage" = "true",
"replication_num" = "1"
);

INSERT INTO mydatabase.table1(id, name, score) VALUES(3, 'Alice', 24);
INSERT INTO mydatabase.table1(id, name, score) VALUES(2, 'Rose', 23);
INSERT INTO mydatabase.table1(id, name, score) VALUES(4, 'Julia', 25);
INSERT INTO mydatabase.table1(id, name, score) VALUES(1, 'Lily', 23);
注:但这遇到了一个问题就是这个映射关系必须和原表逐一对应,那假如原表有一百多个字段的话这里还得映射这么多字段感觉很不方便,在网上查了能不能只映射部门字段,但并没有乐成。
2.2 使用 Flink DataStream 读取数据

  在 pom.xml 文件中添加依靠,如下所示:
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>1.17.2</version>
      </dependency>
      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.2</version>
      </dependency>
      <dependency>
            <groupId>com.starrocks</groupId>
            <artifactId>flink-connector-starrocks</artifactId>
            <version>1.2.8_flink-1.17</version>
      </dependency>
  调用 Flink Connector,读取 StarRocks 中的数据,如下所示:
import com.starrocks.connector.flink.StarRocksSource;
import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
   
public class StarRocksSourceApp {
      public static void main(String[] args) throws Exception {
            StarRocksSourceOptions options = StarRocksSourceOptions.builder()
                   .withProperty("scan-url", "192.168.xxx.xxx:8030")
                   .withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030")
                   .withProperty("username", "root")
                   .withProperty("password", "")
                   .withProperty("table-name", "table1")
                   .withProperty("database-name", "mydatabase")
                   .build();
            TableSchema tableSchema = TableSchema.builder()
                   .field("id", DataTypes.INT())
                   .field("name", DataTypes.STRING())
                   .field("score", DataTypes.INT())
                   .build();
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.addSource(StarRocksSource.source(tableSchema, options)).setParallelism(5).print();
            env.execute("StarRocks flink source");
      }

    }
  输出:
11> +I(4,Julia,25)
8> +I(1,Lily,23)
16> +I(2,Rose,23)
9> +I(3,Alice,24)
https://i-blog.csdnimg.cn/direct/5095864e4d1d4acbaa8fce7f01990627.png
      StarRocksSourceOptions options = StarRocksSourceOptions.builder()
                .withProperty("scan-url", "192.168.37.11:8030")
                .withProperty("jdbc-url", "jdbc:mysql://192.168.37.11:9030")
                .withProperty("username", "root")
                .withProperty("password", "")
                .withProperty("table-name", "table1")
                .withProperty("database-name", "mydatabase")
                .withProperty("scan.columns", "id, name")
                .withProperty("scan.filter", "id = 4 or id = 2")
                .build();
      TableSchema tableSchema = TableSchema.builder()
                .field("id", DataTypes.INT())
                .field("name", DataTypes.STRING())
                .build();
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.addSource(StarRocksSource.source(tableSchema, options)).setParallelism(5).print();
      env.execute("StarRocks flink source");
  输出:
14> +I(2,Rose)
7> +I(4,Julia)

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: Flink 1.17.2 版本用 java 读取 starrocks