农民 发表于 2024-12-8 10:07:25

通过flinkSql将kafka和mysql连接

kafkaToKafka

{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "success"}
{"user_id": "1", "page_id":"1", "status": "fail"} package com.bigdata.day07;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class _08_kafka_to_kafka {
    public static void main(String[] args) {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
      tabEnv.executeSql("CREATE TABLE table1 (\n" +
                "`user_id` int,\n" +
                "`page_id` int,\n" +
                "`status` STRING\n" +
                ") WITH (\n" +
                "'connector' = 'kafka',\n" +
                "'topic' = 'topicA',\n" +
                "'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
                "'properties.group.id' = 'g1',\n" +
                "'scan.startup.mode' = 'latest-offset',\n" +
                // 这个是需要flink-json的
                "'format' = 'json'\n" +
                ")");
      tabEnv.executeSql("CREATE TABLE table2 (\n" +
                "`user_id` int,\n" +
                "`page_id` int,\n" +
                "`status` STRING\n" +
                ") WITH (\n" +
                "'connector' = 'kafka',\n" +
                "'topic' = 'topicB',\n" +
                "'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
                "'properties.group.id' = 'g1',\n" +
                "'scan.startup.mode' = 'latest-offset',\n" +
                "'format' = 'json'\n" +
                ")");

      tabEnv.executeSql("insert into table2 select * from table1 where status = 'success'");

    }
}
// 非常简单的代码

使用executeSql后,就可以不使用execute了
但是若有一个print ,那么还需要execute kafkaToMysql

需要先在mysql中建表
create table t_success
(
    id      int auto_increment,
    user_id int         null,
    page_id int         null,
    statusvarchar(20) null,
    constraint t_success_pk
      primary key (id)
); package com.bigdata.day07;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class _09_kafka_to_mysql {
    public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
      tabEnv.executeSql("CREATE TABLE table1 (\n" +
                "`user_id` int,\n" +
                "`page_id` int,\n" +
                "`status` STRING\n" +
                ") WITH (\n" +
                "'connector' = 'kafka',\n" +
                "'topic' = 'topicA',\n" +
                "'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
                "'properties.group.id' = 'g1',\n" +
                "'scan.startup.mode' = 'latest-offset',\n" +
                "'format' = 'json'\n" +
                ")");
      tabEnv.executeSql("CREATE TABLE table2 (\n" +
                "`user_id` int,\n" +
                "`page_id` int,\n" +
                "`status` STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'jdbc',\n" +
                "    'url' = 'jdbc:mysql://localhost:3306/edu?useUnicode=true&characterEncoding=utf8',\n" +
                "    'table-name' = 't_success', \n" +
                "    'username' = 'root',\n" +
                "    'password' = 'root'\n" +
                ")");

      tabEnv.executeSql("insert into table2 select * from table1 where status = 'success'");


    }
} readMysql

package com.bigdata.day07;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class _10_read_mysql {
    public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
      tabEnv.executeSql("CREATE TABLE table1 (\n" +
                "`user_id` int,\n" +
                "`page_id` int,\n" +
                "`status` STRING\n" +
                ") WITH (\n" +
                "    'connector' = 'jdbc',\n" +
                "    'url' = 'jdbc:mysql://localhost:3306/edu?useUnicode=true&characterEncoding=utf8',\n" +
                "    'table-name' = 't_success', \n" +
                "    'username' = 'root',\n" +
                "    'password' = 'root'\n" +
                ")");

      Table table = tabEnv.sqlQuery("select * from table1");

      DataStream<Row> appendStream = tabEnv.toAppendStream(table, Row.class);

      appendStream.print();
      env.execute();
    }
}

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页: [1]
查看完整版本: 通过flinkSql将kafka和mysql连接