-- Flink SQL: expose the MySQL table `students` through the JDBC connector.
-- NOTE(review): the JDBC URL and credentials are hard-coded here; move them
-- out of the script before using this anywhere but a local demo.
DROP TABLE IF EXISTS students_mysql_source;

CREATE TABLE IF NOT EXISTS students_mysql_source (
    `id`     BIGINT,
    `name`   STRING,
    `age`    INT,
    `gender` STRING,
    `clazz`  STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://master:3306/bigdata30?useSSL=false',
    'table-name' = 'students',
    'username'   = 'root',
    'password'   = '123456'
);

-- Run a query against the source table.
SELECT
    id,
    name,
    age,
    gender,
    clazz
FROM students_mysql_source;

-- Switch the SQL client result mode to tableau so the whole sequence of
-- result changes is visible.
SET 'sql-client.execution.result-mode' = 'tableau';
-- By default the query is executed in streaming mode, so the result can be
-- seen changing continuously as rows are processed.
SELECT
    gender,
    COUNT(*) AS cnt
FROM students_mysql_source
GROUP BY gender;

-- Switch the runtime mode to batch processing.
SET 'execution.runtime-mode' = 'batch';

-- Run the same query again: only the single final result is shown, without
-- the intermediate changes (this is the difference from streaming mode).
SELECT
    gender,
    COUNT(*) AS cnt
FROM students_mysql_source
GROUP BY gender;
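-- ---------------------------------------------------------------------------
-- Sink
-- Receive an unbounded stream of student data from Kafka, count the number of
-- students per class, and write the final result to MySQL.
-- ---------------------------------------------------------------------------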
-- ---------------------------------------------------------------------------
-- Step 1: create the result table in MySQL.
-- NOTE(review): the two `clazz_cnt` definitions below use MySQL syntax
-- (ENGINE=InnoDB, DEFAULT CHARSET) -- presumably they are meant to be run in a
-- MySQL client, not in the Flink SQL client; confirm before scripting this.
--
-- To look up the DDL of a table that already exists in the database
-- (`xxx` is a placeholder for the table name):
--     SHOW CREATE TABLE xxx;
-- ---------------------------------------------------------------------------

-- Variant A: MySQL table without a primary key.
-- With this variant the written result turns out to contain the continuous
-- sequence of changes rather than only the final result.
DROP TABLE IF EXISTS `clazz_cnt`;

CREATE TABLE IF NOT EXISTS `clazz_cnt` (
    `clazz` varchar(255) DEFAULT NULL,
    `cnt`   bigint DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Variant B: `clazz` is the primary key.
-- The written result can be updated by primary key, so the table shows the
-- final result and is updated in real time.
-- Running both variants in sequence leaves this definition in place.
DROP TABLE IF EXISTS `clazz_cnt`;

CREATE TABLE IF NOT EXISTS `clazz_cnt` (
    `clazz` varchar(255) NOT NULL,
    `cnt`   bigint(20) DEFAULT NULL,
    PRIMARY KEY (`clazz`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ---------------------------------------------------------------------------
-- Step 2 (Flink SQL): create the sink table backed by MySQL `clazz_cnt`.
-- NOTE(review): the JDBC URL and credentials are hard-coded here; move them
-- out of the script before using this anywhere but a local demo.
-- ---------------------------------------------------------------------------
DROP TABLE IF EXISTS clazz_cnt_mysql_sink;

CREATE TABLE IF NOT EXISTS clazz_cnt_mysql_sink (
    `clazz` STRING,
    `cnt`   BIGINT,
    -- A primary key must be declared when the query result contains updates.
    PRIMARY KEY (clazz) NOT ENFORCED
) WITH (
    'connector'  = 'jdbc',
    'url'        = 'jdbc:mysql://master:3306/bigdata30?useSSL=false',
    'table-name' = 'clazz_cnt',
    'username'   = 'root',
    'password'   = '123456'
);

-- Remember to switch the runtime mode back to streaming, because the Kafka
-- source is an unbounded stream.
SET 'execution.runtime-mode' = 'streaming';
-- Count the number of students per class in real time and write the result
-- to MySQL through the JDBC sink table.
-- NOTE(review): `students_kafka_source` is not defined in this section --
-- presumably a Kafka-backed table created elsewhere; confirm it exists before
-- running this statement.
INSERT INTO clazz_cnt_mysql_sink
SELECT
    clazz,
    COUNT(*) AS cnt
FROM students_kafka_source
WHERE clazz IS NOT NULL
GROUP BY clazz;