Version notes
Flink and Kafka version numbers need to match; the versions verified to work here are:
- Flink1.17.1
- kafka_2.12-3.3.1
Add the Kafka connector dependency
Upload flink-connector-kafka-1.17.1.jar to Flink's lib directory.
Download the flink-connector-kafka connector jar:
- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1
Upload it to Flink's lib directory:
- [hadoop@node2 ~]$ cp flink-connector-kafka-1.17.1.jar $FLINK_HOME/lib
Distribute flink-connector-kafka-1.17.1.jar to the other nodes:
- xsync $FLINK_HOME/lib/flink-connector-kafka-1.17.1.jar
Start the yarn-session:
- [hadoop@node2 ~]$ myhadoop.sh start
- [hadoop@node2 ~]$ yarn-session.sh -d
Start the Kafka cluster:
- [hadoop@node2 ~]$ zk.sh start
- [hadoop@node2 ~]$ kf.sh start
Create the Kafka topic
- List the existing topics
- [hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
-
- If ws1 does not exist, create it
- [hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 1 --partitions 1 --topic ws1
-
Regular Kafka table
'connector' = 'kafka'
Enter the Flink SQL client
- [hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session
- ...
- (log output omitted)
- ...
- Flink SQL>
Create the Kafka mapping table
- CREATE TABLE t1(
- `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
- -- FROM 'xxxx' can be omitted when the column name matches the metadata key; VIRTUAL marks the column as read-only
- `partition` BIGINT METADATA VIRTUAL,
- `offset` BIGINT METADATA VIRTUAL,
- id int,
- ts bigint ,
- vc int )
- WITH (
- 'connector' = 'kafka',
- 'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
- 'properties.group.id' = 'test',
- -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
- 'scan.startup.mode' = 'earliest-offset',
- -- 'fixed' is Flink's partitioner: each parallel subtask writes to only one Kafka partition
- 'sink.partitioner' = 'fixed',
- 'topic' = 'ws1',
- 'format' = 'json'
- );
This table can be used both to read data from Kafka and to write data to Kafka.
Insert data into the Kafka table
If the source table does not exist yet, create it first; if it already exists, skip this step.
- CREATE TABLE source (
- id INT,
- ts BIGINT,
- vc INT
- ) WITH (
- 'connector' = 'datagen',
- 'rows-per-second'='1',
- 'fields.id.kind'='random',
- 'fields.id.min'='1',
- 'fields.id.max'='10',
- 'fields.ts.kind'='sequence',
- 'fields.ts.start'='1',
- 'fields.ts.end'='1000000',
- 'fields.vc.kind'='random',
- 'fields.vc.min'='1',
- 'fields.vc.max'='100'
- );
Insert the data from the source table into t1:
- insert into t1(id,ts,vc) select * from source;
If this fails with:
- [ERROR] Could not execute SQL statement. Reason:
- java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
If the same error persists, copy kafka-clients-3.3.1.jar from Kafka's libs directory into Flink's lib directory, and note that both the sql-client and the yarn-session must be restarted afterwards (important):
- cp $KAFKA_HOME/libs/kafka-clients-3.3.1.jar $FLINK_HOME/lib
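If Flink runs on several nodes, the kafka-clients jar likely needs to be distributed to them as well, and the copy can be double-checked before restarting. A minimal sketch, assuming the same xsync script used earlier is available on this node:
- xsync $FLINK_HOME/lib/kafka-clients-3.3.1.jar
- ls $FLINK_HOME/lib | grep kafka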
Check that the copy succeeded (for example with the listing above), restart the sql-client, and repeat the steps. On success:
- Flink SQL> CREATE TABLE t1(
- > `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
- > -- FROM 'xxxx' can be omitted when the column name matches the metadata key; VIRTUAL marks the column as read-only
- > `partition` BIGINT METADATA VIRTUAL,
- > `offset` BIGINT METADATA VIRTUAL,
- > id int,
- > ts bigint ,
- > vc int )
- > WITH (
- > 'connector' = 'kafka',
- > 'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
- > 'properties.group.id' = 'test',
- > -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
- > 'scan.startup.mode' = 'earliest-offset',
- > -- 'fixed' is Flink's partitioner: each parallel subtask writes to only one Kafka partition
- > 'sink.partitioner' = 'fixed',
- > 'topic' = 'ws1',
- > 'format' = 'json'
- > );
- [INFO] Execute statement succeed.
-
- Flink SQL> CREATE TABLE source (
- > id INT,
- > ts BIGINT,
- > vc INT
- > ) WITH (
- > 'connector' = 'datagen',
- > 'rows-per-second'='1',
- > 'fields.id.kind'='random',
- > 'fields.id.min'='1',
- > 'fields.id.max'='10',
- > 'fields.ts.kind'='sequence',
- > 'fields.ts.start'='1',
- > 'fields.ts.end'='1000000',
- > 'fields.vc.kind'='random',
- > 'fields.vc.min'='1',
- > 'fields.vc.max'='100'
- > );
- [INFO] Execute statement succeed.
-
- Flink SQL> insert into t1(id,ts,vc) select * from source;2024-06-14 10:45:30,125 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
- 2024-06-14 10:45:30,673 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032
- 2024-06-14 10:45:31,027 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
- 2024-06-14 10:45:31,227 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node3:41749 of application 'application_1718331886020_0001'.
- insert into t1(id,ts,vc) select * from source;
- [INFO] Submitting SQL update statement to the cluster...
- [INFO] SQL update statement has been successfully submitted to the cluster:
- Job ID: b1765f969c3ae637bd4c8100efbb0c4e
-
Query the Kafka table
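The statement is a plain select on the Kafka mapping table (the same one shown succeeding later in the transcript):
- select * from t1;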
This fails with:
- [ERROR] Could not execute SQL statement. Reason:
- java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord
-
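The missing ConsumerRecord class is also part of kafka-clients-3.3.1.jar, so the running yarn-session has to be restarted before it can be loaded. One possible way, as a sketch (the application id is taken from the earlier log lines; adjust it to your own cluster):
- yarn application -kill application_1718331886020_0001
- yarn-session.sh -d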
After restarting the yarn-session, repeat the steps. On success:
- Flink SQL> CREATE TABLE t1(
- > `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
- > -- FROM 'xxxx' can be omitted when the column name matches the metadata key; VIRTUAL marks the column as read-only
- > `partition` BIGINT METADATA VIRTUAL,
- > `offset` BIGINT METADATA VIRTUAL,
- > id int,
- > ts bigint ,
- > vc int )
- > WITH (
- > 'connector' = 'kafka',
- > 'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9094',
- > 'properties.group.id' = 'test',
- > -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
- > 'scan.startup.mode' = 'earliest-offset',
- > -- 'fixed' is Flink's partitioner: each parallel subtask writes to only one Kafka partition
- > 'sink.partitioner' = 'fixed',
- > 'topic' = 'ws1',
- > 'format' = 'json'
- > );
- [INFO] Execute statement succeed.
-
- Flink SQL> CREATE TABLE source (
- > id INT,
- > ts BIGINT,
- > vc INT
- > ) WITH (
- > 'connector' = 'datagen',
- > 'rows-per-second'='1',
- > 'fields.id.kind'='random',
- > 'fields.id.min'='1',
- > 'fields.id.max'='10',
- > 'fields.ts.kind'='sequence',
- > 'fields.ts.start'='1',
- > 'fields.ts.end'='1000000',
- > 'fields.vc.kind'='random',
- > 'fields.vc.min'='1',
- > 'fields.vc.max'='100'
- > );
- [INFO] Execute statement succeed.
-
- Flink SQL> insert into t1(id,ts,vc) select * from source;2024-06-14 11:22:17,971 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
- 2024-06-14 11:22:18,422 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032
- 2024-06-14 11:22:18,895 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
- 2024-06-14 11:22:19,052 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
- insert into t1(id,ts,vc) select * from source;
- [INFO] Submitting SQL update statement to the cluster...
- [INFO] SQL update statement has been successfully submitted to the cluster:
- Job ID: 84292f84d1fce4756ccd8ae294b6163a
-
-
- Flink SQL> select * from t1;2024-06-14 11:23:38,338 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
- 2024-06-14 11:23:38,606 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at node3/192.168.193.143:8032
- 2024-06-14 11:23:38,617 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
- 2024-06-14 11:23:38,649 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
- select * from t1;
- [INFO] Result retrieval cancelled.
-
- Flink SQL>
-
upsert-kafka table
'connector' = 'upsert-kafka'
If the table is subject to update operations, the regular Kafka connector cannot handle them; in that case use the Upsert Kafka connector.
Create the upsert-kafka mapping table (a primary key must be defined)
- CREATE TABLE t2(
- id int ,
- sumVC int ,
- primary key (id) NOT ENFORCED
- )
- WITH (
- 'connector' = 'upsert-kafka',
- 'properties.bootstrap.servers' = 'node2:9092',
- 'topic' = 'ws2',
- 'key.format' = 'json',
- 'value.format' = 'json'
- );
If no Kafka topic named ws2 exists, it will be created automatically (this can be verified after starting the insert job below).
Insert into the upsert-kafka table
- insert into t2 select id,sum(vc) sumVC from source group by id;
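Once the insert job is running, the automatically created ws2 topic can be confirmed by listing the topics again, with the same command as before:
- kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list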
Query the upsert-kafka table
upsert-kafka cannot start reading from an arbitrary offset; it always reads the topic from the beginning. Only then can the full update history of the data be reconstructed, and the changes are shown with changelog flags such as -U, +U and +I.
Set the display mode
- SET sql-client.execution.result-mode=tableau;
Query the data in t2
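A minimal query sketch (the original transcript does not show the statement itself, but a plain select is all that is needed):
- select * from t2;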
If no rows show up, it is because the earlier source table has already generated values up to its configured end (1000000) and stopped producing data.
Open the Flink Web UI, cancel all running jobs, and redo the steps below.
Drop the tables
- Flink SQL> show tables;
- +------------+
- | table name |
- +------------+
- | source |
- | t1 |
- | t2 |
- +------------+
- 3 rows in set
-
- Flink SQL> drop table source;
- Flink SQL> drop table t1;
- Flink SQL> drop table t2;
Create the tables
- CREATE TABLE source (
- id INT,
- ts BIGINT,
- vc INT
- ) WITH (
- 'connector' = 'datagen',
- 'rows-per-second'='1',
- 'fields.id.kind'='random',
- 'fields.id.min'='1',
- 'fields.id.max'='10',
- 'fields.ts.kind'='sequence',
- 'fields.ts.start'='1',
- 'fields.ts.end'='1000000',
- 'fields.vc.kind'='random',
- 'fields.vc.min'='1',
- 'fields.vc.max'='100'
- );
- CREATE TABLE t2(
- id int ,
- sumVC int ,
- primary key (id) NOT ENFORCED
- )
- WITH (
- 'connector' = 'upsert-kafka',
- 'properties.bootstrap.servers' = 'node2:9092',
- 'topic' = 'ws2',
- 'key.format' = 'json',
- 'value.format' = 'json'
- );
Set the display mode
- SET sql-client.execution.result-mode=tableau;
Query the table (see the sketch below)
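Since all previous jobs were cancelled and the tables recreated, the aggregation has to be resubmitted before the query returns data; both statements are the same as used earlier:
- insert into t2 select id,sum(vc) sumVC from source group by id;
- select * from t2;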
Done. Enjoy!