马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。
您需要 登录 才可以下载或查看,没有账号?立即注册
x
kafkaToKafka
- {"user_id": "1", "page_id":"1", "status": "success"}
- {"user_id": "1", "page_id":"1", "status": "success"}
- {"user_id": "1", "page_id":"1", "status": "success"}
- {"user_id": "1", "page_id":"1", "status": "success"}
- {"user_id": "1", "page_id":"1", "status": "fail"}
复制代码- package com.bigdata.day07;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
- public class _08_kafka_to_kafka {
- public static void main(String[] args) {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
- tabEnv.executeSql("CREATE TABLE table1 (\n" +
- " `user_id` int,\n" +
- " `page_id` int,\n" +
- " `status` STRING\n" +
- ") WITH (\n" +
- " 'connector' = 'kafka',\n" +
- " 'topic' = 'topicA',\n" +
- " 'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
- " 'properties.group.id' = 'g1',\n" +
- " 'scan.startup.mode' = 'latest-offset',\n" +
- // 这个是需要flink-json的
- " 'format' = 'json'\n" +
- ")");
- tabEnv.executeSql("CREATE TABLE table2 (\n" +
- " `user_id` int,\n" +
- " `page_id` int,\n" +
- " `status` STRING\n" +
- ") WITH (\n" +
- " 'connector' = 'kafka',\n" +
- " 'topic' = 'topicB',\n" +
- " 'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
- " 'properties.group.id' = 'g1',\n" +
- " 'scan.startup.mode' = 'latest-offset',\n" +
- " 'format' = 'json'\n" +
- ")");
- tabEnv.executeSql("insert into table2 select * from table1 where status = 'success'");
- }
- }
- // 非常简单的代码
- 使用executeSql后,就可以不使用execute了
- 但是若有一个print ,那么还需要execute
复制代码 kafkaToMysql
- 需要先在mysql中建表
- create table t_success
- (
- id int auto_increment,
- user_id int null,
- page_id int null,
- status varchar(20) null,
- constraint t_success_pk
- primary key (id)
- );
复制代码- package com.bigdata.day07;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
- public class _09_kafka_to_mysql {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
- tabEnv.executeSql("CREATE TABLE table1 (\n" +
- " `user_id` int,\n" +
- " `page_id` int,\n" +
- " `status` STRING\n" +
- ") WITH (\n" +
- " 'connector' = 'kafka',\n" +
- " 'topic' = 'topicA',\n" +
- " 'properties.bootstrap.servers' = 'bigdata01:9092',\n" +
- " 'properties.group.id' = 'g1',\n" +
- " 'scan.startup.mode' = 'latest-offset',\n" +
- " 'format' = 'json'\n" +
- ")");
- tabEnv.executeSql("CREATE TABLE table2 (\n" +
- " `user_id` int,\n" +
- " `page_id` int,\n" +
- " `status` STRING\n" +
- ") WITH (\n" +
- " 'connector' = 'jdbc',\n" +
- " 'url' = 'jdbc:mysql://localhost:3306/edu?useUnicode=true&characterEncoding=utf8',\n" +
- " 'table-name' = 't_success', \n" +
- " 'username' = 'root',\n" +
- " 'password' = 'root'\n" +
- ")");
- tabEnv.executeSql("insert into table2 select * from table1 where status = 'success'");
- }
- }
复制代码 readMysql
- package com.bigdata.day07;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.table.api.Table;
- import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
- import org.apache.flink.types.Row;
- public class _10_read_mysql {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
- tabEnv.executeSql("CREATE TABLE table1 (\n" +
- " `user_id` int,\n" +
- " `page_id` int,\n" +
- " `status` STRING\n" +
- ") WITH (\n" +
- " 'connector' = 'jdbc',\n" +
- " 'url' = 'jdbc:mysql://localhost:3306/edu?useUnicode=true&characterEncoding=utf8',\n" +
- " 'table-name' = 't_success', \n" +
- " 'username' = 'root',\n" +
- " 'password' = 'root'\n" +
- ")");
- Table table = tabEnv.sqlQuery("select * from table1");
- DataStream<Row> appendStream = tabEnv.toAppendStream(table, Row.class);
- appendStream.print();
- env.execute();
- }
- }
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |