通过flinkSql将kafka和mysql连接

农民  论坛元老 | 2024-12-8 10:07:25 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1025|帖子 1025|积分 3075

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
kafkaToKafka

  1. {"user_id": "1", "page_id":"1", "status": "success"}
  2. {"user_id": "1", "page_id":"1", "status": "success"}
  3. {"user_id": "1", "page_id":"1", "status": "success"}
  4. {"user_id": "1", "page_id":"1", "status": "success"}
  5. {"user_id": "1", "page_id":"1", "status": "fail"}
复制代码
  1. package com.bigdata.day07;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  4. public class _08_kafka_to_kafka {
  5.     public static void main(String[] args) {
  6.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  7.         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
  8.         tabEnv.executeSql("CREATE TABLE table1 (\n" +
  9.                 "  `user_id` int,\n" +
  10.                 "  `page_id` int,\n" +
  11.                 "  `status` STRING\n" +
  12.                 ") WITH (\n" +
  13.                 "  'connector' = 'kafka',\n" +
  14.                 "  'topic' = 'topicA',\n" +
  15.                 "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
  16.                 "  'properties.group.id' = 'g1',\n" +
  17.                 "  'scan.startup.mode' = 'latest-offset',\n" +
  18.                 // 这个是需要flink-json的
  19.                 "  'format' = 'json'\n" +
  20.                 ")");
  21.         tabEnv.executeSql("CREATE TABLE table2 (\n" +
  22.                 "  `user_id` int,\n" +
  23.                 "  `page_id` int,\n" +
  24.                 "  `status` STRING\n" +
  25.                 ") WITH (\n" +
  26.                 "  'connector' = 'kafka',\n" +
  27.                 "  'topic' = 'topicB',\n" +
  28.                 "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
  29.                 "  'properties.group.id' = 'g1',\n" +
  30.                 "  'scan.startup.mode' = 'latest-offset',\n" +
  31.                 "  'format' = 'json'\n" +
  32.                 ")");
  33.         tabEnv.executeSql("insert into table2 select * from table1 where status = 'success'");
  34.     }
  35. }
  36. // 非常简单的代码
  37. 使用executeSql后,就可以不使用execute了
  38. 但是若有一个print ,那么还需要execute
复制代码
kafkaToMysql

  1. 需要先在mysql中建表
  2. create table t_success
  3. (
  4.     id      int auto_increment,
  5.     user_id int         null,
  6.     page_id int         null,
  7.     status  varchar(20) null,
  8.     constraint t_success_pk
  9.         primary key (id)
  10. );
复制代码
  1. package com.bigdata.day07;
  2. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  3. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  4. public class _09_kafka_to_mysql {
  5.     public static void main(String[] args) throws Exception {
  6.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  7.         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
  8.         tabEnv.executeSql("CREATE TABLE table1 (\n" +
  9.                 "  `user_id` int,\n" +
  10.                 "  `page_id` int,\n" +
  11.                 "  `status` STRING\n" +
  12.                 ") WITH (\n" +
  13.                 "  'connector' = 'kafka',\n" +
  14.                 "  'topic' = 'topicA',\n" +
  15.                 "  'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
  16.                 "  'properties.group.id' = 'g1',\n" +
  17.                 "  'scan.startup.mode' = 'latest-offset',\n" +
  18.                 "  'format' = 'json'\n" +
  19.                 ")");
  20.         tabEnv.executeSql("CREATE TABLE table2 (\n" +
  21.                 "  `user_id` int,\n" +
  22.                 "  `page_id` int,\n" +
  23.                 "  `status` STRING\n" +
  24.                 ") WITH (\n" +
  25.                 "    'connector' = 'jdbc',\n" +
  26.                 "    'url' = 'jdbc:mysql://localhost:3306/edu?useUnicode=true&characterEncoding=utf8',\n" +
  27.                 "    'table-name' = 't_success', \n" +
  28.                 "    'username' = 'root',\n" +
  29.                 "    'password' = 'root'\n" +
  30.                 ")");
  31.         tabEnv.executeSql("insert into table2 select * from table1 where status = 'success'");
  32.     }
  33. }
复制代码
readMysql

  1. package com.bigdata.day07;
  2. import org.apache.flink.streaming.api.datastream.DataStream;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.table.api.Table;
  5. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
  6. import org.apache.flink.types.Row;
  7. public class _10_read_mysql {
  8.     public static void main(String[] args) throws Exception {
  9.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  10.         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
  11.         tabEnv.executeSql("CREATE TABLE table1 (\n" +
  12.                 "  `user_id` int,\n" +
  13.                 "  `page_id` int,\n" +
  14.                 "  `status` STRING\n" +
  15.                 ") WITH (\n" +
  16.                 "    'connector' = 'jdbc',\n" +
  17.                 "    'url' = 'jdbc:mysql://localhost:3306/edu?useUnicode=true&characterEncoding=utf8',\n" +
  18.                 "    'table-name' = 't_success', \n" +
  19.                 "    'username' = 'root',\n" +
  20.                 "    'password' = 'root'\n" +
  21.                 ")");
  22.         Table table = tabEnv.sqlQuery("select * from table1");
  23.         DataStream<Row> appendStream = tabEnv.toAppendStream(table, Row.class);
  24.         appendStream.print();
  25.         env.execute();
  26.     }
  27. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

农民

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表