Flink SQL Kafka Connector


Version compatibility

Flink and Kafka version numbers must be compatible with each other. The versions this walkthrough succeeded with:

  • Flink 1.17.1
  • kafka_2.12-3.3.1

Add the Kafka connector dependency

Upload flink-connector-kafka-1.17.1.jar to Flink's lib directory.

Download the flink-connector-kafka jar:

  https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka/1.17.1

Copy it to Flink's lib directory:

  [hadoop@node2 ~]$ cp flink-connector-kafka-1.17.1.jar $FLINK_HOME/lib

Distribute flink-connector-kafka-1.17.1.jar to the other nodes (xsync is the author's cluster-sync script):

  xsync $FLINK_HOME/lib/flink-connector-kafka-1.17.1.jar
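To confirm the jar reached every node, you can list the remote lib directories; a quick check, assuming passwordless ssh between the nodes (which xsync already requires) and the install path shown in the logs later in this post:

  ssh node3 'ls /home/hadoop/soft/flink-1.17.1/lib | grep kafka'
  ssh node4 'ls /home/hadoop/soft/flink-1.17.1/lib | grep kafka'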
Start the yarn-session

  [hadoop@node2 ~]$ myhadoop.sh start
  [hadoop@node2 ~]$ yarn-session.sh -d
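Optionally confirm the session is up with the standard YARN CLI; a Flink session application should show as RUNNING:

  [hadoop@node2 ~]$ yarn application -list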
Start the Kafka cluster

  [hadoop@node2 ~]$ zk.sh start
  [hadoop@node2 ~]$ kf.sh start
Create the Kafka topic

  # List existing topics
  [hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --list
  # If ws1 is not listed, create it
  [hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092,node3:9092,node4:9092 --create --replication-factor 1 --partitions 1 --topic ws1
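To double-check the topic's partition count and replication factor, the standard describe command works:

  [hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092 --describe --topic ws1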
Regular Kafka table

'connector' = 'kafka'

Enter the Flink SQL client

  [hadoop@node2 ~]$ sql-client.sh embedded -s yarn-session
  ...
  (log output omitted)
  ...
  Flink SQL>

Create the Kafka-mapped table

  CREATE TABLE t1(
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
    -- if the column name matches the metadata key, FROM 'xxxx' can be omitted; VIRTUAL marks the column read-only
    `partition` BIGINT METADATA VIRTUAL,
    `offset` BIGINT METADATA VIRTUAL,
    id int,
    ts bigint,
    vc int
  ) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',
    'properties.group.id' = 'test',
    -- options: 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
    'scan.startup.mode' = 'earliest-offset',
    -- 'fixed' is Flink's built-in partitioner: each parallel subtask writes to a single Kafka partition
    'sink.partitioner' = 'fixed',
    'topic' = 'ws1',
    'format' = 'json'
  );
This table can be used both to read data from Kafka and to write data to Kafka.
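Once data is flowing, the metadata columns can be queried like ordinary columns; a minimal check (the backticks are required because partition and offset are reserved words):

  select `event_time`, `partition`, `offset`, id, ts, vc from t1;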

Insert data into the Kafka table

If the source table does not exist yet, create it first; if it already exists, there is no need to create it again.
  CREATE TABLE source (
      id INT,
      ts BIGINT,
      vc INT
  ) WITH (
      'connector' = 'datagen',
      'rows-per-second'='1',
      'fields.id.kind'='random',
      'fields.id.min'='1',
      'fields.id.max'='10',
      'fields.ts.kind'='sequence',
      'fields.ts.start'='1',
      'fields.ts.end'='1000000',
      'fields.vc.kind'='random',
      'fields.vc.min'='1',
      'fields.vc.max'='100'
  );
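Note that ts is a bounded sequence (1 to 1000000), so the generator stops once the sequence is exhausted; this matters again in the upsert-kafka section below. To preview the generated rows first (optional):

  select * from source;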
Insert the source table into the t1 table:

  insert into t1(id,ts,vc) select * from source;
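If the insert succeeds, you can verify that JSON records are landing in ws1 by watching the topic with a standard console consumer on any Kafka node:

  [hadoop@node2 ~]$ kafka-console-consumer.sh --bootstrap-server node2:9092 --topic ws1 --from-beginning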
If you get this error:

  [ERROR] Could not execute SQL statement. Reason:
  java.lang.ClassNotFoundException: org.apache.kafka.common.serialization.ByteArraySerializer
it means the Kafka client classes are missing from Flink's classpath (flink-connector-kafka does not bundle kafka-clients). Copy kafka-clients-3.3.1.jar from Kafka's libs directory into Flink's lib directory, and note that both the sql-client and the yarn-session must then be restarted (important):

  cp $KAFKA_HOME/libs/kafka-clients-3.3.1.jar $FLINK_HOME/lib

Check that the copy succeeded:

  $ ls $FLINK_HOME/lib

Restart the sql-client and repeat the steps; on success it looks like this:

  Flink SQL> CREATE TABLE t1(
  >   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  >   -- if the column name matches the metadata key, FROM 'xxxx' can be omitted; VIRTUAL marks the column read-only
  >   `partition` BIGINT METADATA VIRTUAL,
  >   `offset` BIGINT METADATA VIRTUAL,
  > id int,
  > ts bigint ,
  > vc int )
  > WITH (
  >   'connector' = 'kafka',
  >   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',
  >   'properties.group.id' = 'test',
  > -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
  >   'scan.startup.mode' = 'earliest-offset',
  >   -- 'fixed' is Flink's built-in partitioner: each parallel subtask writes to a single Kafka partition
  > 'sink.partitioner' = 'fixed',
  >   'topic' = 'ws1',
  >   'format' = 'json'
  > );
  [INFO] Execute statement succeed.
  Flink SQL> CREATE TABLE source (
  >     id INT,
  >     ts BIGINT,
  >     vc INT
  > ) WITH (
  >     'connector' = 'datagen',
  >     'rows-per-second'='1',
  >     'fields.id.kind'='random',
  >     'fields.id.min'='1',
  >     'fields.id.max'='10',
  >     'fields.ts.kind'='sequence',
  >     'fields.ts.start'='1',
  >     'fields.ts.end'='1000000',
  >     'fields.vc.kind'='random',
  >     'fields.vc.min'='1',
  >     'fields.vc.max'='100'
  > );
  [INFO] Execute statement succeed.
  Flink SQL> insert into t1(id,ts,vc) select * from source;
  2024-06-14 10:45:30,125 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
  2024-06-14 10:45:30,673 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
  2024-06-14 10:45:31,027 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
  2024-06-14 10:45:31,227 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node3:41749 of application 'application_1718331886020_0001'.
  insert into t1(id,ts,vc) select * from source;
  [INFO] Submitting SQL update statement to the cluster...
  [INFO] SQL update statement has been successfully submitted to the cluster:
  Job ID: b1765f969c3ae637bd4c8100efbb0c4e

Query the Kafka table

  select * from t1;

This may fail with:

  [ERROR] Could not execute SQL statement. Reason:
  java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord
Restart the yarn-session (the TaskManagers already running were launched before kafka-clients-3.3.1.jar was added to lib, so they cannot see it) and repeat the steps; on success it looks like this:

  Flink SQL> CREATE TABLE t1(
  >   `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  >   -- if the column name matches the metadata key, FROM 'xxxx' can be omitted; VIRTUAL marks the column read-only
  >   `partition` BIGINT METADATA VIRTUAL,
  >   `offset` BIGINT METADATA VIRTUAL,
  > id int,
  > ts bigint ,
  > vc int )
  > WITH (
  >   'connector' = 'kafka',
  >   'properties.bootstrap.servers' = 'node2:9092,node3:9092,node4:9092',
  >   'properties.group.id' = 'test',
  > -- 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'
  >   'scan.startup.mode' = 'earliest-offset',
  >   -- 'fixed' is Flink's built-in partitioner: each parallel subtask writes to a single Kafka partition
  > 'sink.partitioner' = 'fixed',
  >   'topic' = 'ws1',
  >   'format' = 'json'
  > );
  [INFO] Execute statement succeed.
  Flink SQL> CREATE TABLE source (
  >     id INT,
  >     ts BIGINT,
  >     vc INT
  > ) WITH (
  >     'connector' = 'datagen',
  >     'rows-per-second'='1',
  >     'fields.id.kind'='random',
  >     'fields.id.min'='1',
  >     'fields.id.max'='10',
  >     'fields.ts.kind'='sequence',
  >     'fields.ts.start'='1',
  >     'fields.ts.end'='1000000',
  >     'fields.vc.kind'='random',
  >     'fields.vc.min'='1',
  >     'fields.vc.max'='100'
  > );
  [INFO] Execute statement succeed.
  Flink SQL> insert into t1(id,ts,vc) select * from source;
  2024-06-14 11:22:17,971 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
  2024-06-14 11:22:18,422 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
  2024-06-14 11:22:18,895 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
  2024-06-14 11:22:19,052 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
  insert into t1(id,ts,vc) select * from source;
  [INFO] Submitting SQL update statement to the cluster...
  [INFO] SQL update statement has been successfully submitted to the cluster:
  Job ID: 84292f84d1fce4756ccd8ae294b6163a
  Flink SQL> select * from t1;
  2024-06-14 11:23:38,338 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/home/hadoop/soft/flink-1.17.1/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
  2024-06-14 11:23:38,606 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at node3/192.168.193.143:8032
  2024-06-14 11:23:38,617 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
  2024-06-14 11:23:38,649 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface node4:38788 of application 'application_1718331886020_0002'.
  select * from t1;
  [INFO] Result retrieval cancelled.
  Flink SQL>


 
upsert-kafka table

'connector' = 'upsert-kafka'

If the table involves update operations, the regular Kafka connector cannot handle them; in that case, use the Upsert Kafka connector.

Create the upsert-kafka mapped table (a primary key must be defined)

  CREATE TABLE t2(
      id int,
      sumVC int,
      primary key (id) NOT ENFORCED
  )
  WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'node2:9092',
    'topic' = 'ws2',
    'key.format' = 'json',
    'value.format' = 'json'
  );
If no topic named ws2 exists in Kafka, it will be created automatically.
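Once the insert job below has started writing, the auto-created topic can be confirmed with:

  [hadoop@node2 ~]$ kafka-topics.sh --bootstrap-server node2:9092 --list | grep ws2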

Insert into the upsert-kafka table

  insert into t2 select id, sum(vc) sumVC from source group by id;
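Since upsert-kafka uses the primary key as the Kafka record key, it is instructive to watch ws2 with a consumer that also prints keys (standard console-consumer options):

  [hadoop@node2 ~]$ kafka-console-consumer.sh --bootstrap-server node2:9092 --topic ws2 --from-beginning --property print.key=true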
Query the upsert-kafka table

upsert-kafka cannot read from a specified offset; it always reads from the start of the topic. Only that way can the full update history be reconstructed, with each change displayed via changelog flags such as -U, +U, and +I.

Set the display mode:

  SET sql-client.execution.result-mode=tableau;
Query the t2 table's data:

  select * from t2;
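In tableau mode every result row carries an op column with its changelog flag; for this aggregation the stream looks roughly like the following (values illustrative, not from an actual run):

  +----+-------------+-------------+
  | op |          id |       sumVC |
  +----+-------------+-------------+
  | +I |           3 |          42 |
  | -U |           3 |          42 |
  | +U |           3 |          97 |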
If no data shows up, the reason is that the earlier source table already generated values up to its end (1000000) and has stopped producing. Go to the Flink Web UI, cancel all running jobs, then repeat the steps; on success:
Drop the tables

  Flink SQL> show tables;
  +------------+
  | table name |
  +------------+
  |     source |
  |         t1 |
  |         t2 |
  +------------+
  3 rows in set
  Flink SQL> drop table source;
  Flink SQL> drop table t1;
  Flink SQL> drop table t2;
Recreate the tables

  CREATE TABLE source (
     id INT,
     ts BIGINT,
     vc INT
  ) WITH (
     'connector' = 'datagen',
     'rows-per-second'='1',
     'fields.id.kind'='random',
     'fields.id.min'='1',
     'fields.id.max'='10',
     'fields.ts.kind'='sequence',
     'fields.ts.start'='1',
     'fields.ts.end'='1000000',
     'fields.vc.kind'='random',
     'fields.vc.min'='1',
     'fields.vc.max'='100'
  );
  CREATE TABLE t2(
     id int,
     sumVC int,
     primary key (id) NOT ENFORCED
  )
  WITH (
   'connector' = 'upsert-kafka',
   'properties.bootstrap.servers' = 'node2:9092',
   'topic' = 'ws2',
   'key.format' = 'json',
   'value.format' = 'json'
  );

Set the display mode:

  SET sql-client.execution.result-mode=tableau;
Query the table:

  select * from t2;

 
Done! Enjoy it!
