Flink之SQL client使用案例

农妇山泉一亩田  金牌会员 | 2024-9-12 04:49:02 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 880|帖子 880|积分 2640

Flink的实行模式有以下三种:
前提是我们已经开启了yarnsession的进程,在下图中可以看到启动的id也就是后续使命须要通过此id进行认证,以及使命分配的master主机。
这里启动时间会报错一个ERROR:org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - Authentication failed
查阅资料得知:
该错误是因为,kerberos认证失败,cdh6,并没有启动kerberos。以是该错误可以忽略。但是如果已经开启动了kerberos,这个问题就要解决了。
我们这里没有开启Kerberos,以是这个报错我么可以不管。

Session Mode:会话模式
会话模式须要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时全部资源就都已经确定,全部提交的作业会竞争集群中的资源。恰当使命规模小,实行时间短的大量作业。
Flink的作业实行情况会不停保留在集群上,直到会话被显式终止。如许,可以提交多个作业,它们可以共享相同的集群资源和状态,从而实现更高的效率和资源使用。
bin/flink run -yid application_1723708102500_0009  examples/batch/WordCount.jar
紧张的是要添加 -yid 这个参数,不添加这个参数会实行不成功,会报错找不到执使命的cluster。
脚本实行参数:
-n(--container):TaskManager的数量。(1.10 已经废弃)
-s(--slots):每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,有时可以多一些taskmanager,做冗余。
-jm:JobManager的内存(单位MB)。
-q:表现可用的YARN资源(内存,内核);
-tm:每个TaskManager容器的内存(默认值:MB)
-nm:yarn 的appName(如今yarn的ui上的名字)。  
-d:后台实行。
提交flink使命:
bin/flink run examples/batch/WordCount.jar

Per-Job Mode:单作业模式,我们也是更多的使用这种模式,这个模式会将我们的资源更公道的规划使用。
每个Flink应用步伐作为一个独立的作业被提交和实行。
每次提交的Flink应用步伐都会创建一个独立的作业实行情况,该作业实行情况仅用于实行该特定的作业。
作业完成后,作业实行情况会被开释,集群关闭,资源开释
bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
常用参数:
--p 步伐默认并行度
下面的参数仅可用于 -m yarn-cluster 模式
--yjm JobManager可用内存,单位兆
--ynm YARN步伐的名称
--yq 查询YARN可用的资源
--yqu 指定YARN队列是哪一个
--ys 每个TM会有多少个Slot
--ytm 每个TM所在的Container可申请多少内存,单位兆
--yD 动态指定Flink参数
-yd 分离模式(后台运行,不指定-yd, 终端会卡在提交的页面上)

Application Mode:应用模式
应用模式算是前2种模式的升级,前2种模式中,Flink步伐代码是在客户端实行,然后客户端提交给JobManager,客户端须要占用大量网络带宽。
应用模式须要为每一个提交的应用单独启动一个JobManager(应用步伐在JobManager实行),也就是创建一个集群。这个JobManager只为实行这一个应用而存在,实行结束之后JobManager关闭。
application 模式使用 bin/flink run-application 提交作业;通过 -t 指定摆设情况,目前 application 模式支持摆设在 yarn 上(-t yarn-application) 和 k8s 上(-t kubernetes-application);并支持通过 -D 参数指定通用的 运行配置,比如 jobmanager/taskmanager 内存、checkpoint 时间隔断等。
带有 JM 和 TM 内存设置的下令提交,这种方式提交之后会带对应服务器的HDFS的WebUI页面多出一个wordcount_01的文件,该文件记载了步伐运行的结果
./bin/flink run-application -t yarn-application \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dyarn.application.name="MyFlinkWordCount" \
./examples/batch/WordCount.jar --output hdfs://ddp54:8020/wordcount_01
在上面例子 的底子上自己设置 TaskManager slots 个数为3,以及指定并发数为3:
./bin/flink run-application -t yarn-application -p 3 \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dyarn.application.name="MyFlinkWordCount" \
-Dtaskmanager.numberOfTaskSlots=3 \
./examples/batch/WordCount.jar --output hdfs://node1:8020/wordcount/output_52
指定并发还可以使用 -Dparallelism.default=3,而且社区目前倾向使用 -D+通用配置取代客户端下令参数(比如 -p)。以是如许写更符合规范:
./bin/flink run-application -t yarn-application \
-Dparallelism.default=3 \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=1024m \
-Dyarn.application.name="MyFlinkWordCount" \
-Dtaskmanager.numberOfTaskSlots=3 \
./examples/batch/WordCount.jar --output hdfs://node1:8020/wordcount/output_53
以上三种模式就先简述这些,着实尚有很多参数没有用到,我们更多的只须要用到第二种pre-job的模式即可。
Yarn-session模式开启成功后,我们进入SQL-Client界面,在这个界面我们可以写SQL来实现体系之间的交互,我接下来以MySQL与Kafka的交互为例:
首先是要在MySQL数据库创建一些库和表当作source数据源:
  
  1. CREATE TABLE src_mysql_order(
  2. order_id BIGINT,
  3. store_id BIGINT,
  4. sales_amt double,
  5. PRIMARY KEY (`order_id`)
  6. );
  7. CREATE TABLE src_mysql_order_detail(
  8. order_id BIGINT,
  9. store_id BIGINT,
  10. goods_id BIGINT,
  11. sales_amt double,
  12. PRIMARY KEY (order_id,store_id,goods_id)
  13. );
  14. CREATE TABLE dim_store(
  15. store_id BIGINT,
  16. store_name varchar(100),
  17. PRIMARY KEY (`store_id`)
  18. );
  19. CREATE TABLE dim_goods(
  20. goods_id BIGINT,
  21. goods_name varchar(100),
  22. PRIMARY KEY (`goods_id`)
  23. );
  24. CREATE TABLE dwa_mysql_order_analysis (
  25.    store_id BIGINT,
  26.    store_name varchar(100),
  27.    sales_goods_distinct_nums bigint,
  28.    sales_amt double,
  29.    order_nums bigint,
  30.    PRIMARY KEY (store_id,store_name)
  31. );
复制代码
Source:在MySQL中创建完成之后我们要在SQL client界面进行映射在这里以src_mysql_order表为例,实行成功如以下界面:
   CREATE TABLE src_mysql_order(
   order_id BIGINT,
   store_id BIGINT,
   sales_amt double,
   PRIMARY KEY (`order_id`) NOT ENFORCED
  ) WITH (
   'connector' = 'mysql-cdc',
   'hostname' = 'xxx',
   'port' = '3306',
   'username' = 'xxx',
   'password' = 'xxx',
   'database-name' = 'xxx',
   'table-name' = 'xxx',
   'scan.incremental.snapshot.enabled' = 'false'
  );
  

Sink:对MySQL做完source映射之后,我们要将MySQL的数据导入到Kafka,因此我们也要做一些Kafka表的映射,实行成功界面如下:
   CREATE TABLE ods_kafka_order (
   order_id BIGINT,
   store_id BIGINT,
   sales_amt double,
   PRIMARY KEY (`order_id`) NOT ENFORCED
  ) WITH (
   'connector' = 'upsert-kafka',
   'topic' = 'Kafka主题',
   'properties.bootstrap.servers' = 'Kafka集群的IP+端标语',
    'key.format' = 'json',
   'value.format' = 'json'
  );
  

两张表都映射完成之后,我们先在MySQL添加一些测试用例:
insert into src_mysql_order values
(20221210001,10000,50),
(20221210002,10000,20),
(20221210003,10001,10);

接下来就将MySQL与Kafka实现交互,即将MySQL数据插入到Kafka作业中:
insert into ods_kafka_order_2 select * from src_mysql_order;

在这个过程中,有大概会报错:

这个报错是找不到表的元数据信息,我这里是将表名写错了,这个是比较光荣的,但是尚有一种原因就是:没有MySQLCDC或者Kafka的依靠,导致毗连的元数据信息无法保存到catalog中,因此我们就须要添加MySQLCDC和Kafka的毗连依靠:
进入到Flink安装路径的lib目录下:使用 rz 指令将依靠jar包上传,上传完毕之后使用 scp 指令远程复制给集群的别的呆板,我们的是ddp54、ddp55:
scp -r lib/flink-sql-connector-kafka-1.16.2.jar  root@ddp54PWD/lib
scp -r lib/flink-sql-connector-kafka-1.16.2.jar  root@ddp55PWD/lib

Jar包上传完之后,我们在底子平台将Flink集群重启

集群重启之后,我们重新开启一个yarnsession进程来实行后续提交的使命。
进入yarn的web页面来检察进程启动的状况。

接下来我们重走一遍MySQL的source和Kafka的sink流程,走完之后进入SQL client界面实行交互指令,即MySQL数据插入到Kafka,实行完成之后没有报错,但是检察flink的web页面发现并没有作业在实行或实行完成,于是检察日志得知:问题是MySQL的体系时间跟所在地区时间不匹配导致的,我们可以在下令行进行时区的设置,也可以在配置文件中进行时区的设置,我选择了在my.cnf配置文件中进行时区的更改:在[mysqld]下添加默认时区设置即可,与此同时,MySQL也要开启binlog日志,可以保障数据同等性,主要用于复制和数据规复。配置完成之后重启MySQL服务。
开启binlog日志
   # 服务ID
server-id=1
# binlog 配置 只要配置了log_bin地址 就会开启
log_bin = /var/lib/mysql/mysql_bin
# 日志存储天数 默认0 永久保存
# 如果数据库会定期归档,发起设置一个存储时间不须要不停存储binlog日志,理论上只须要存储归档之后的日志
expire_logs_days = 30
# binlog最大值
max_binlog_size = 1024M
# 规定binlog的格式,binlog有三种格式statement、row、mixad,默认使用statement,发起使用row格式
binlog_format = ROW
# 在提交n次事务后,进行binlog的落盘,0为不进行强行的刷新操纵,而是由文件体系控制刷新日志文件,如果是在线交易和账有>关的数据发起设置成1,如果是其他数据可以保持为0即可
sync_binlog = 1
   检察日志得知是MySQL的时区问题导致使命提交不成功

在 my.cnf 对时区和binlog日志进行修改

上边的MySQL配置完成之后,须要重启MySQL服务
   docker restart mysql
  接下来在SQL client界面再次实行指令:
insert into ods_kafka_order select * from src_mysql_order;
打开Flink的web界面,发现Flink的作业使命正在实行:

我们在SQL client界面查询MySQL的数据表信息:
SET sql-client.execution.result-mode=tableau; 
select * from src_mysql_order;
可以检察插入到MySQL的数据信息和数据的更新信息[Flink中 +I 代表插入数据 ; +U 代表更新数据 ; -U代表撤回数据]

与此同时,我们去Kafka检察数据是否到来,通过Kafka Tool检察到数据已经成功到Kafka。

至此我们实现了MySQL到Kafka的实时数据的接入以及在这个过程中碰到的一些问题以及解决办法。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

农妇山泉一亩田

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表