OpenEuler部署Flink 1.19.2完全分布式集群
前期准备
环境准备:
- 三台OpenEuler-24.03-LTS-SP1系统的主机
- JDK 11
- Hadoop集群
Flink版本:
flink-1.19.2-bin-scala_2.12
集群规划:
节点服务器node1node2node3IP地址10.90.100.10110.90.100.10210.90.100.103Flink服务历程JobManager、TaskManagerTaskManagerTaskManager 一、 Flink集群安装部署
1.1 Flink 安装
- 下载Flink安装包并上传该Flink软件包到node1主机。
上传方式有很多,任选一种方式即可,在此不再赘述。
- 上传成功后,通过 ls 命令查看是否上传成功。
- [hadoop@node1 ~]$ ls
- flink-1.19.2-bin-scala_2.12.tgz
- [hadoop@node1 ~]$
复制代码
- 解压flink-1.19.2-bin-scala_2.12.tgz到/opt/software/目录下。
- [hadoop@node1 ~]$ tar -zxvf flink-1.19.2-bin-scala_2.12.tgz -C /opt/software/
- [hadoop@node1 ~]$ ls /opt/software/
- flink-1.19.2
- [hadoop@node1 ~]$
复制代码 1.2 配置Flink
- 进入flink的conf目录,修改flink-conf.yaml文件,指定node1节点服务器为JobManager 。
- [hadoop@node1 ~]$ cd /opt/software/flink-1.19.2
- [hadoop@node1 flink-1.19.2]$ ls
- bin conf examples lib LICENSE licenses log NOTICE opt plugins README.txt
- [hadoop@node1 flink-1.19.2]$ cd conf/
- [hadoop@node1 conf]$ ls
- config.yaml log4j-console.properties log4j-session.properties logback-session.xml masters zoo.cfg
- log4j-cli.properties log4j.properties logback-console.xml logback.xml workers
- [hadoop@node1 conf]$ vim config.yaml
复制代码 config.yaml修改如下内容:
提示:本环境的jdk版本是jdk11,要把该文件21-24行兼容jdk17的配置删掉或解释掉
- # JobManager节点地址
- jobmanager:
- bind-host: 0.0.0.0
- rpc:
- address: node1
- port: 6123
- # TaskManager节点地址.需要配置为当前机器名
- taskmanager:
- bind-host: 0.0.0.0
- host: node1
-
- #Rest
- rest:
- address: node1
- bind-address: 0.0.0.0
复制代码
- 修改workers文件,指定node1、node2和node3为TaskManager。
- [hadoop@node1 conf]$ vim workers
- #修改后的内容如下
- [hadoop@node1 conf]$ cat workers
- node1
- node2
- node3
复制代码- [hadoop@node1 conf]$ vim masters
- #修改后的内容如下
- [hadoop@node1 conf]$ cat masters
- node1:8081
复制代码 1.3 分发Flink配置
- 将Flink安装目录发给集群的节点node2和node3。
- [hadoop@node1 conf]$ cd /opt/software/
- [hadoop@node1 software]$ ls
- flink-1.19.2
- #分发到node2
- [hadoop@node1 software]$ rsync -avz /opt/software/flink-1.19.2 node2:/opt/software/
- #分发到node3
- [hadoop@node1 software]$ rsync -avz /opt/software/flink-1.19.2 node3:/opt/software/
- #此时可以看到node2和node3主机的/opt/software/目录下载存在flink-1.19.2
复制代码
- 登录到node2,修改node2的taskmanager.host配置,
- [hadoop@node2 ~]$ cd /opt/software/
- [hadoop@node2 software]$ ls
- flink-1.19.2
- [hadoop@node2 software]$ cd flink-1.19.2/conf/
- [hadoop@node2 conf]$ vim config.yaml
复制代码 修改内容如下:
- # TaskManager节点地址.需要配置为当前机器名
- taskmanager:
- host: node2
复制代码
- 登录到node3,修改node3的taskmanager.host配置。
- [hadoop@node3 ~]$ cd /opt/software/
- [hadoop@node3 software]$ ls
- flink-1.19.2
- [hadoop@node3 software]$ cd flink-1.19.2/conf/
- [hadoop@node3 conf]$ vim config.yaml
复制代码 修改内容如下:
- # TaskManager节点地址.需要配置为当前机器名
- taskmanager:
- host: node3
复制代码 1.4 配置环境变量
配置flink的环境变量
- [hadoop@node1 ~]$ sudo vim /etc/profile
复制代码 在profile末尾添加如下内容
- #FLINK_HOME
- export FLINK_HOME=/opt/software/flink-1.19.2
- export PATH=$PATH:$FLINK_HOME/bin
复制代码 让环境变量生效
- [hadoop@node1 ~]$ source /etc/profile
-
复制代码 验证版本号
- [hadoop@node1 software]$ flink -v
- Version: 1.19.2, Commit ID: 7d9f0e0
- [hadoop@node1 software]$
复制代码 看到版本信息说明配置环境设置完成。
在node2、node3主机执行类似的操纵,完成Flink环境变量的配置。
二、Flink集群Standalone运行模式
环境准备
① 为了提供无界流的测试,在任意节点(这里以node2为例)安装网络工具Netcat,并开启当地监听端口(这里以9999端口为例)。
- [hadoop@node2 ~]$ sudo yum install nc
- [hadoop@node2 ~]$ nc -lk 9999
复制代码 ② 为了提交Flink步伐任务到集群中执行验证。本例中提供无界流词频统计的步伐FlinkWC-1.0-SNAPSHOT.jar进行测试。其中,步伐入口为com.flink.wc.demo.WordCountUnbounded
下载FlinkWC-1.0-SNAPSHOT.jar
2.1 会话模式部署
2.1.1 启动集群
在node1主机上执行start-cluster.sh启动Flink集群
- [hadoop@node1 ~]$ start-cluster.sh
复制代码 查看历程情况:
node1:
- [hadoop@node1 ~]$ jps
- 3376 StandaloneSessionClusterEntrypoint
- 4177 Jps
- 4091 TaskManagerRunner
复制代码 node2:
- [hadoop@node2 ~]$ jps
- 5898 Jps
- 5740 TaskManagerRunner
复制代码 node3:
- [hadoop@node3 ~]$ jps
- 5896 TaskManagerRunner
- 6059 Jps
复制代码 2.1.2 访问Web UI
启动成功后,同样可以访问http://10.90.100.101:8081对flink集群和任务进行监控管理
提示:在该例中,10.90.100.101是该集群node1节点的IP地址,请根据现实IP访问。
从图中可以看到,当前集群的TaskManager数量为3;由于默认每个TaskManager的Slot数量为1,所以总Slot数和可用Slot数都为3。
2.1.3 提交作业验证
环境准备
① 为了提供无界流的测试,在任意节点(这里以node2为例)安装网络工具Netcat,并开启当地监听端口(这里以9999端口为例)。
- [hadoop@node2 ~]$ sudo yum install nc
- [hadoop@node2 ~]$ nc -lk 9999
复制代码 ② 为了提交Flink步伐任务到集群中执行验证。本例中提供无界流词频统计的步伐FlinkWC-1.0-SNAPSHOT.jar进行测试。其中,步伐入口为com.flink.wc.demo.WordCountUnbounded
下载FlinkWC-1.0-SNAPSHOT.jar
2.1.3.1 方式1: Web UI上提交作业
- 任务打包完成后,打开Flink的WEB UI页面,在右侧导航栏点击Submit New Job,然后点击按钮Add New,选择要上传运行的FlinkWC-1.0-SNAPSHOT.jar包,如下图所示。
jar包上传完成,如下图所示:
- 鼠标点击该jar包,出现任务配置页面,进行相应配置。
主要配置步伐入口主类的全类名,任务运行的并行度,任务运行所需的配置参数和保存点路径等,配置完成后,即可点击按钮Submit,将任务提交到集群运行。
在本文提供的无界流词频统计的步伐配置如下:
- 步伐类的路径即步伐入口(Entry Class ): com.flink.wc.demo.WordCountUnbounded
- 并行度(Parallelism): 1
- 步伐传参(Program Arguments): 10.90.100.102 9999
其中,10.90.100.102为netcat运行地点的主机地址,9999是netcat当地监听端口。根据现实情况填写对应的IP地址和监听端口,(注意:填写传参时,IP地址和端口中间要有空格)
- 任务提交成功之后,可点击左侧导航栏的Running Jobs查看步伐运行列表情况
① 在netcat的socket中输入内容,如hello world
- [hadoop@node2 ~]$ nc -lk 9999
- hello world
复制代码 ② 点击Web UI界面左侧的导航栏Task Manager,然后点击右侧服务器节点node1(注:任务可能运行任意工作节点,逐个观察)
③ 点击Stdout,就可以看到netcat输出单词的统计
④ 如果要竣事任务。点击该任务,可以查看任务运行的具体情况,也可以通过点击Cancel Job竣事任务运行。
2.1.3.2 方式2: 命令行提交作业
除了通过WEB UI界面提交任务之外,也可以直接通过命令行来提交任务。
- 将词频统计步伐FlinkWC-1.0-SNAPSHOT.jar包上传到node1
- [hadoop@node1 ~]$ ls
- FlinkWC-1.0-SNAPSHOT.jar
复制代码
- 在node1的命令行使用flink run命令提交作业。
要执行的命令内容:
- flink run -m node1:8081 -c com.flink.wc.demo.WordCountUnbounded ./FlinkWC-1.0-SNAPSHOT.jar 10.90.100.102 9999
复制代码 (-m指定了提交到的JobManager,-c指定了步伐入口。其中,10.90.100.102 9999是netcat的地址和监听端口,要根据现实情况填入)
执行操纵:
- [hadoop@node1 ~]$ flink run -m node1:8081 -c com.flink.wc.demo.WordCountUnbounded ./FlinkWC-1.0-SNAPSHOT.jar 10.90.100.102 9999
- Job has been submitted with JobID f3cde1cf550e4ead76413b7afff1d40c
复制代码- [hadoop@node2 ~]$ nc -lk 9999
- hello flink
复制代码 可以在欣赏器中打开Web UI查看应用执行情况
在命令行上查看,在/opt/software/flink-1.19.2/log路径中,可以查看TaskManager节点
- [hadoop@node1 log]$ cd /opt/software/flink-1.19.2/log
- [hadoop@node1 log]$ cat flink-hadoop-taskexecutor-0-node1.out
- (hello,1)
- (flink,1)
复制代码 2.1.4 关闭集群
在node1主机上执行stop-cluster.sh关闭Flink集群
- [hadoop@node1 ~]$ stop-cluster.sh
复制代码 至此,Standalone资源模式运行的会话模式部署方式已经完成。
2.2 应用模式部署
应用模式下不会提前创建集群,所以不能调用start-cluster.sh脚本。而是使用在bin目录下的standalone-job.sh来创建一个JobManager。
- 将应用步伐的jar包放到Flink的安装路径的/opt/software/flink-1.19.2/lib/目录下。
- [hadoop@node1 ~]$ ls
- FlinkWC-1.0-SNAPSHOT.jar
- [hadoop@node1 ~]$ cp FlinkWC-1.0-SNAPSHOT.jar /opt/software/flink-1.19.2/lib/
- hadoop@node1 ~]$ ls /opt/software/flink-1.19.2/lib/
- FlinkWC-1.0-SNAPSHOT.jar
复制代码 需要执行的命令:
- standalone-job.sh start --job-classname com.flink.wc.demo.WordCountUnbounded 10.90.100.102 9999
复制代码 这里直接指定作业入口类,脚本会到lib目录扫描所有的jar包,其中,10.90.100.102 9999是netcat的地址和监听端口,要根据现实情况填入。
执行如下:
- [hadoop@node1 ~]$ standalone-job.sh start --job-classname com.flink.wc.demo.WordCountUnbounded 10.90.100.102 9999
- Starting standalonejob daemon on host node1.[hadoop@node1 ~]$ jps6016 Jps5959 StandaloneApplicationClusterEntryPoint
复制代码 提示:执行该命令之前肯定要先启动该步伐所连接的监听服务即先启动Netcat
- [hadoop@node1 ~]$ taskmanager.sh start
- Starting taskexecutor daemon on host node1.
- [hadoop@node1 ~]$ jps
- 6627 Jps
- 5959 StandaloneApplicationClusterEntryPoint
- 6569 TaskManagerRunner
复制代码 注:这里只在node1上启动一个taskmanager,如果需要多个taskmanager需要在别的节点上单独执行该命令启动。
- 在netcat的socket中输入内容,如hello world
- [hadoop@node2 ~]$ nc -lk 9999
- hello world
-
复制代码
- 欣赏器访问Flink Web监控界面的地址10.90.100.101:8081,在Web界面中观察输出数据
从输出的结果可以看到提交的任务已经实现词的统计。
- #关闭TaskManager
- [hadoop@node1 ~]$ taskmanager.sh stop
- #关闭JobManager
- [hadoop@node1 ~]$ standalone-job.sh stop
复制代码 至此,Standalone资源模式运行的应用模式部署方式已经完成。
三、 Flink集群YARN运行模式
YARN上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源。
3.1 环境准备
- 为了提供无界流的测试,在任意节点(这里以node2为例)安装网络工具Netcat,并开启当地监听端口(这里以9999端口为例)。
- [hadoop@node2 ~]$ sudo yum install nc
- [hadoop@node2 ~]$ nc -lk 9999
复制代码
- 为了提交Flink步伐任务到集群中执行验证。本例中提供无界流词频统计的步伐FlinkWC-1.0-SNAPSHOT.jar进行测试。其中,步伐入口为com.flink.wc.demo.WordCountUnbounded
下载FlinkWC-1.0-SNAPSHOT.jar
- 在将Flink任务部署至YARN集群之前,肯定要有Hadoop集群,并完成如下配置:
① 配置环境变量
- [hadoop@node1 ~]$ sudo vim /etc/profile
-
复制代码 在profile文件末尾添加如下内容:
- #FLINK_HADOOP_CONF
- export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
- export HADOOP_CLASSPATH=`hadoop classpath`
复制代码 刷新环境变量
- [hadoop@node1 ~]$ source /etc/profile
复制代码 ② 启动Hadoop集群,包括HDFS和YARN
- [hadoop@node1 ~]$ hdp.sh start
- [hadoop@node1 ~]$ hdp.sh jps
- ==================== node1 ====================
- 8210 NameNode
- 8885 NodeManager
- 8518 DataNode
- 9129 Jps
- ==================== node2 ====================
- 3875 ResourceManager
- 3606 DataNode
- 4041 NodeManager
- 4511 Jps
- ==================== node3 ====================
- 4066 NodeManager
- 3670 DataNode
- 4310 Jps
- 3882 SecondaryNameNode
复制代码 注:hdp.sh脚本已经封装好hdfs和yarn的启动命令。
3.2 会话模式部署
YARN的会话模式与独立集群略有差异,需要首先申请一个YARN会话(YARN Session)来启动Flink集群。具体步调如下
3.2.1 启动Flink集群
执行脚本命令向YARN集群申请资源,开启一个YARN会话,启动Flink集群。操纵如下:
- [hadoop@node1 ~]$ yarn-session.sh -nm yarn-session-cluster
复制代码 常用参数:
- -d:分离模式,如果你不想让Flink YARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行。
- -jm(–jobManagerMemory):配置JobManager所需内存,默认单位MB。
- -nm(–name):配置在YARN UI界面上显示的任务名。如以上命令的yarn-session-cluster
- -qu(–queue):指定YARN队列名
- -tm(–taskManager):配置每个TaskManager所使用内存。
YARN Session启动之后会给出一个Web UI地址以及一个YARN application ID,如下所示,用户可以通过Web UI或者命令行两种方式提交作业,如:
- pplication 'application_1741879375303_0001'.
- JobManager Web Interface: http://node1:43167
复制代码 可以通过以上显示的地址http://node1:43167访问flink的web界面。
3.2.2 提交作业
方式一:通过Web UI方式提交作业。
这种方式比力简单,与上文所述Standalone部署模式根本类似。
操纵过程在此不在赘述,请自己完成。
方式二 :通过命令行提交作业
- 执行以下命令将该任务提交到已经开启的Yarn-Session中运行 (因为node1在启动集群是在前台服务,这里可以重新打开一个node1的连接,在新的连接中执行提交作业命令)
命令格式:
- flink run -c <程序入口> <程序jar包路径> [参数]
复制代码 执行命令如下:
- [hadoop@node1 ~]$ ls
- FlinkWC-1.0-SNAPSHOT.jar
- [hadoop@node1 ~]$ flink run -c com.flink.wc.demo.WordCountUnbounded ./FlinkWC-1.0-SNAPSHOT.jar 10.90.100.102 9999
复制代码 注意:这种模式下,客户端可以自行确定JobManager的地址,也可以通过-m或者-jobmanager参数指定JobManager的地址,JobManager的地址在YARN Session的启动页面中可以找到。
- 任务提交成功后,可在YARN的Web UI界面查看运行情况。
创建的Yarn-Session现实上是一个Yarn的Application,并且有唯一的Application ID
- 在netcat的socket中输入内容,如hello flink
- [hadoop@node2 ~]$ nc -lk 9999
- hello flink
-
-
复制代码
- 在yarn的Tracking UI找到flink的Web入口。
进入taskmanager,看到词的统计
3.2.3 关闭flink集群
如果要关闭集群,在终端按Ctrl +C 即可。
至此,YARN资源模式运行的会话模式部署方式已经完成。
3.3 单作业模式部署
在YARN环境中,由于有了外部平台做资源调度,所以也可以直接向YARN提交一个单独的作业,从而启动一个Flink集群。
命令格式:
- flink run -d -t yarn-per-job -c <程序入口> <程序jar包路径> [参数]
复制代码 执行命令如下:
- [hadoop@node1 ~]$ ls
- FlinkWC-1.0-SNAPSHOT.jar
- [hadoop@node1 ~]$ flink run -d -t yarn-per-job -c com.flink.wc.demo.WordCountUnbounded ./FlinkWC-1.0-SNAPSHOT.jar 10.90.100.102 9999
复制代码 注意:
如果启动过程中报如下非常。
- Exception in thread "Thread-4" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
- at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:189)
- ......
复制代码 可以不用理会,并不会影响结果,因为任务已经提交到yarn中。如果肯定要办理可以在flink的/opt/software/flink-1.19.2/conf/config.yaml
- [hadoop@node1 conf]$ vim config.yaml
复制代码 配置文件中添加以下配置:
- classloader.check-leaked-classloader: false
复制代码
- 在YARN的ResourceManager界面查看执行情况
点击可以打开Flink Web UI页面进行监控,如下图所示
Flink集群已经启动,词频统计与之前的操纵步调都一样,在此就不在赘述。请自己完成相关操纵。
- 如果需使用命令行查看或取消作业,可以使用如下命令:
- #查看
- [hadoop@node1 ~]$ flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
- #取消
- [hadoop@node1 ~]$ flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
复制代码 这里的application_XXXX_YY是当前应用的ID,<jobId>是作业的ID。在提交作业的时间,终端是能看到应用ID和jobID的,如:
执行取消作业命令:
- [hadoop@node1 ~]$ flink cancel -t yarn-per-job -Dyarn.application.id=application_1741879375303_0003 69d156ad4b1750e44d807611f8ab8d81
复制代码 注意:如果取消作业,整个Flink集群也会停掉。
至此,YARN资源模式运行的单机作业模式部署方式已经完成。
3.4 应用模式部署
应用模式同样非常简单,与单作业模式类似,直接执行flink run-application命令即可。
3.4.1 命令行提交
命令格式:
- flink run-application -t yarn-application -c <程序入口> <程序jar包路径> [参数]
复制代码 执行以下命令:
- [hadoop@node1 ~]$ ls
- FlinkWC-1.0-SNAPSHOT.jar
- [hadoop@node1 ~]$ flink run-application -t yarn-application -c com.flink.wc.demo.WordCountUnbounded ./FlinkWC-1.0-SNAPSHOT.jar 10.90.100.102 9999
复制代码 Flink集群已经启动,词频统计与之前的操纵步调都一样,在此就不在赘述。请自己完成相关操纵。
- #通过application ID可以查看到jobID
- [hadoop@node1 ~]$ flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
- #取消
- [hadoop@node1 ~]$ flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
复制代码 这里的application_XXXX_YY是当前应用的ID,<jobId>是作业的ID。
3.4.2 上传HDFS提交
根据YARN的工作机制,以上这些操纵中,每次执行操纵,flink依赖jar包和步伐jar包都会单独上传到集群hdfs,如果做要提交的作业非常多,就显得很繁重了,此时可将flink本身的依赖和用户jar可以预先上传到HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了,执行更加快。
可以通过yarn.provided.lib.dirs配置选项指定位置,将flink的依赖上传到远程。
- 上传flink的lib和plugins到HDFS上。
- #在hdfs上创建存放flink依赖的目录
- [hadoop@node1 ~]$ hdfs dfs -mkdir /flink-libs
- #上传依赖到hdfs到flink-libs
- [hadoop@node1 ~]$ hdfs dfs -put /opt/software/flink-1.19.2/lib /flink-libs
- [hadoop@node1 ~]$ hdfs dfs -put /opt/software/flink-1.19.2/plugins /flink-libs
复制代码- #在hdfs上创建存放用户的步伐jar的目录[hadoop@node1 ~]$ hdfs dfs -mkdir /flink-jars#上传用户步伐jar到hdfs到flink-jars[hadoop@node1 ~]$ ls
- FlinkWC-1.0-SNAPSHOT.jar
- [hadoop@node1 ~]$ hdfs dfs -put FlinkWC-1.0-SNAPSHOT.jar /flink-jars
复制代码- [hadoop@node1 ~]$ flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://node1:9820/flink-libs" -c com.flink.wc.demo.WordCountUnbounded hdfs://node1:9820/flink-jars/FlinkWC-1.0-SNAPSHOT.jar 10.90.100.102 9999
复制代码 Flink集群已经启动,词频统计与之前的操纵步调都一样,在此就不在赘述。请自己完成相关操纵。
- #通过application ID可以查看到jobID
- [hadoop@node1 ~]$ flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
- #取消
- [hadoop@node1 ~]$ flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
复制代码 这里的application_XXXX_YY是当前应用的ID,<jobId>是作业的ID。
至此,YARN资源模式运行的应用模式部署方式已经完成。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |