详解 Flink 的常见部署方式

打印 上一主题 下一主题

主题 455|帖子 455|积分 1365

一、常见部署模式分类

1. 按是否依赖外部资源调理

1.1 Standalone 模式

   独立模式 (Standalone) 是独立运行的,不依赖任何外部的资源管理平台,只必要运行所有 Flink 组件服务
  1.2 Yarn 模式

   Yarn 模式是指客户端把 Flink 应用提交给 Yarn 的 ResourceManager, Yarn 的 ResourceManager 会在 Yarn 的 NodeManager 上创建容器。在这些容器上,Flink 会部署 JobManager 和 TaskManager 的实例,从而启动集群。Flink 会根据运行在 JobManger 上的作业所必要的 Slot 数量动态分配 TaskManager 资源
  2. 按集群的生命周期和执行位置

2.1 会话模式



  • 会话模式 (Session Mode) 是指先启动一个集群,保持一个会话而且确定所有的资源,然后向集群提交作业,所有提交的作业会竞争集群中的资源,从而会出现资源不足作业执行失败的情况
  • 会话模式比较适合于单个规模小、执行时间短的大量作业
2.2 单作业模式



  • 单作业模式 (Per-Job Mode) 是指为每一个提交的作业启动一个集群,由客户端运行应用步调,然后启动集群,作业被提交给 JobManager,进而分发给 TaskManager 执行。作业作业完成后,集群就会关闭,所有资源也会释放。
  • 单作业模式在生产环境运行更加稳固,所以是实际应用的首选模式
  • 单作业模式一般必要借助一些资源管理框架来启动集群,好比 YARN、Kubernetes
2.3 应用模式



  • 应用模式 (Application Mode) 是指为每一个提交的应用单独启动一个 JobManager,也就是创建一个集群。这个 JobManager 只为执行这一个应用而存在,执行结束之后 JobManager 也就关闭了。这一模式下没有客户端的存在
  • 应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交作业的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由 JobManager 执行应用步调的,而且纵然应用包含了多个作业,也只创建一个集群
二、常见部署模式组合



  • Standalone + 会话模式
  • Standalone + 应用模式
  • Yarn + 会话模式
  • Yarn + 单作业模式
  • Yarn + 应用模式
三、独立模式安装

1. 单节点安装



  • flink 下载地址:https://flink.apache.org/downloads/
  • 下载 flink 安装包:flink-1.10.1-bin-scala_2.12.tgz
  • 将安装包上传到虚拟机节点并解压缩
    1. tar -zxvf flink-1.10.1-bin-scala_2.12.tgz -C /opt/module
    2. cd /opt/module
    3. mv flink-1.10.1 flink
    复制代码
  • 进入 flink 安装目次,执行启动命令,并查看历程
    1. cd /opt/module/flink
    2. bin/start-cluster.sh
    3. jps
    复制代码
  • 访问 http://hadoop102:8081 进入 flink 集群和任务监控管理 Web 页面
  • 关闭 flink:bin/stop-cluster.sh
2. 集群安装

2.1 集群规划

节点服务器hadoop102hadoop103hadoop104脚色JobManagerTaskManagerTaskManager 2.2 步骤



  • 下载 flink 安装包:flink-1.10.1-bin-scala_2.12.tgz
  • 将安装包上传到 hadoop102 并解压缩
    1. tar -zxvf flink-1.10.1-bin-scala_2.12.tgz -C /opt/module
    2. cd /opt/module
    3. mv flink-1.10.1 flink
    复制代码
  • 进入 flink 安装目次下的 conf 目次,修改配置文件 flink-conf.yaml
    1. cd /opt/module/flink/conf
    2. vim flink-conf.yaml
    复制代码
    1. #修改 jobmanager 内部通信主机名
    2. jobmanager.rpc.address: hadoop102
    3. jobmanager.rpc.port: 6123
    复制代码
  • 修改 conf 目次下的 slaves 文件,配置 taskmanager 节点
    1. vim slaves #1.13版本为 workers 文件
    2. #添加内容
    3. hadoop103
    4. hadoop104
    复制代码
  • flink-conf.yaml 文件中常用配置项:
    1. #对 JobManager 进程可使用到的全部内存进行配置,包括 JVM 元空间和其他开销,默认为 1600M,可以根据集群规模进行适当调整
    2. jobmanager.memory.process.size: 1600
    3. #对 TaskManager 进程可使用到的全部内存进行配置,包括 JVM 元空间和其他开销,默认为 1600M,可以根据集群规模进行适当调整
    4. taskmanager.memory.process.size: 1600
    5. #对每个 TaskManager 能够分配的 TaskSlot 数量进行配置,默认为 1,可根据 TaskManager 所在的机器能够提供给 Flink 的 CPU 数量决定。Slot 就是 TaskManager 中具体运行一个任务所分配的计算资源
    6. taskmanager.numberOfTaskSlots: 1
    7. #Flink 任务执行的默认并行度,优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量
    8. parallelism.default: 1
    复制代码
  • 分发 flink 安装目次到 hadoop103 和 hadoop104
  • 在 hadoop102 上启动 flink 集群
    1. cd /opt/module/flink
    2. bin/start-cluster.sh
    3. jps
    复制代码
  • 访问 http://hadoop102:8081 查看 flink 监控页面
3. 提交任务

3.1 会话模式提交



  • 启动 Flink 集群服务
  • 在 hadoop102 中启动 netcat 网络端口
    1. nc -lk 7777
    复制代码
  • Web 页面提交

    • 将编码好的 Flink maven 工程打成 jar 包
    • 访问 http://hadoop102:8081 进入 flink 监控页面,选择左侧的 Submit New Job 选项菜单
    • 点击 +Add New 按钮,然后选择 jar 包进行上传
    • 点击页面上上传好的 jar 包项,配置填写主步调类全类名、启动参数项、并行度等;点击 submit 提交任务
    • 在页面左侧的 overview 和 jobs 等菜单选项下查看任务运行情况
    • 一个 job 所占据的 TaskSlots 数即是该 job 中最大的并行度

  • 命令行提交
    1. #提交任务:bin/flink run -m [jobmanager主机和端口] -c [主程序类全类名] -p [并行度] [jar包的绝对路径] [--param1 value1 --param2 value2 ...]
    2. cd flink
    3. bin/flink run -m hadoop102:8081 -c com.app.wc.StreamWordCount2 -p 3 /project/FlinkTutorial/target/FlinkTutorial-1.0-SNAPSHOT.jar --host localhost --port 7777
    4. #查看job:-a 可以查看已经取消的job
    5. bin/flink list [-a]
    6. #取消job
    7. bin/flink cancel [jobId]
    复制代码
3.2 应用模式提交



  • 不能利用 start-cluster.sh 命令启动集群
  • 将编码好的 Flink maven 工程打成 jar 包,并将 jar 包上传到 flink 安装目次下的 lib 目次
  • 启动 JobManager
    1. cd /opt/module/flink
    2. bin/standalone-job.sh start --job-classname com.app.wc.StreamWordCount2
    复制代码
  • 启动 TaskManager
    1. cd /opt/module/flink
    2. bin/taskmanager.sh start
    复制代码
  • 访问 http://hadoop102:8081 查看 flink 监控页面的作业执行
  • 关闭
    1. cd /opt/module/flink
    2. bin/standalone-job.sh stop
    3. bin/taskmanager.sh stop
    复制代码
4. 高可用集群安装

4.1 集群规划

节点服务器hadoop102hadoop103hadoop104脚色JobManagerJobManager / TaskManagerTaskManager 4.2 步骤



  • 在 hadoop102 上进入 flink 目次下的 conf 目次,修改 flink-conf.yaml 文件
    1. cd /opt/module/flink/conf
    2. vim flink-conf.yaml
    复制代码
    1. #添加内容
    2. high-availability: zookeeper
    3. high-availability.storageDir: hdfs://hadoop102:9820/flink/standalone/ha
    4. high-availability.zookeeper.quorum: hadoop102:2181,hadoop103:2181,hadoop104:2181
    5. high-availability.zookeeper.path.root: /flink-standalone
    6. high-availability.cluster-id: /cluster_atguigu
    复制代码
  • 修改 masters 文件
    1. vim masters
    2. #添加
    3. hadoop102:8081
    4. hadoop103:8081
    复制代码
  • 分发配置到其他节点
  • 启动
    1. #保证hadoop环境变量配置生效
    2. #启动 hadoop 集群和 Zookeeper 集群
    3. #启动 flink 集群
    4. cd /opt/module/flink
    5. bin/start-cluster.sh
    复制代码
  • 访问:http://hadoop102:8081 和 http://hadoop103:8081
四、Yarn 模式安装

1. 安装步骤



  • 下载安装包:

    • flink 1.8 版本之前可以直接下载基于 hadoop 版本编译的安装包
    • flink 1.8 及之后的版本除了下载根本的安装包之外,还必要下载 Hadoop 相关版本的组件,如 flink-shaded-hadoop-2-uber-2.7.5-10.0.jar,并将该组件上传至 Flink 的 lib 目次下
    • flink 1.11 版本之后,不再提供 flink-shaded-hadoop-* 的 jar 包,而是通过配置环境变量完成与 Yarn 集群的对接

  • 将安装包上传到虚拟机并解压缩
    1. tar -zxvf flink-1.10.1-bin-scala_2.12.tgz -C /opt/module
    2. cd /opt/module
    3. mv flink-1.10.1 flink
    复制代码
  • 进入 conf 目次,修改 flink-conf.yaml 文件
    1. cd /opt/module/flink/conf
    2. vim flink-conf.yaml
    复制代码
    1. #修改
    2. jobmanager.memory.process.size: 1600m
    3. taskmanager.memory.process.size: 1728m
    4. taskmanager.numberOfTaskSlots: 8
    5. parallelism.default: 1
    复制代码
  • 确保正确安装 Hadoop 集群和配置 Hadoop 环境变量
    1. sudo vim /etc/profile.d/my_env.sh
    2. export HADOOP_HOME=/opt/module/hadoop
    3. export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
    4. export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
    5. export HADOOP_CLASSPATH=`hadoop classpath`
    复制代码
2. 提交任务

2.1 会话模式提交



  • 启动 Hadoop 集群
  • 执行脚本命令向 Yarn 集群申请资源,开启一个 Yarn 会话,启动 Flink 集群
    1. cd /opt/module/flink
    2. bin/yarn-session.sh -nm test
    3. #-n 参数:指定 TaskManager 数量
    4. #-s 参数:指定 slot 数量
    5. #-d:分离模式,如果你不想让 Flink YARN 客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session 也可以后台运行。
    6. #-jm(--jobManagerMemory):配置 JobManager 所需内存,默认单位 MB。
    7. #-nm(--name):配置在 YARN UI 界面上显示的任务名。
    8. #-qu(--queue):指定 YARN 队列名。
    9. #-tm(--taskManager):配置每个 TaskManager 所使用内存
    复制代码
  • 提交作业

    • Web UI 提交,同独立模式
    • 命令行提交:
      1. #1.将打包好的任务运行 JAR 包上传至集群
      2. #2.执行命令
      3. cd /opt/module/flink
      4. bin/flink run -c com.app.wc.StreamWordCount2 FlinkTutorial-1.0-SNAPSHOT.jar
      复制代码

  • 访问 yarn Web UI 界面或 flink Web UI 界面查看作业执行情况
2.2 单作业模式提交



  • 启动 Hadoop 集群
  • 直接向 Yarn 提交一个单独的作业,从而启动一个 Flink 集群
    1. cd /opt/module/flink
    2. #命令一:
    3. bin/flink run -d -t yarn-per-job -c com.app.wc.StreamWordCount2 FlinkTutorial-1.0-SNAPSHOT.jar
    4. #命令二:
    5. bin/flink run -m yarn-cluster -c com.app.wc.StreamWordCount2 FlinkTutorial-1.0-SNAPSHOT.jar
    复制代码
  • 访问 yarn Web UI 界面或 flink Web UI 界面查看作业执行情况
  • 取消作业:
    1. cd /opt/module/flink
    2. bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
    3. bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
    复制代码
2.3 应用模式提交



  • 启动 Hadoop 集群
  • 执行命令提交作业
    1. #上传 jar 包到集群
    2. cd /opt/module/flink
    3. bin/flink run-application -t yarn-application -c com.app.wc.StreamWordCount2 FlinkTutorial-1.0-SNAPSHOT.jar
    4. #上传 jar 包到 hdfs
    5. bin/flink run-application -t yarn-application
    6. -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir"
    7. hdfs://myhdfs/jars/my-application.jar
    复制代码
  • 查看或取消作业
    1. /opt/module/flink
    2. bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
    3. bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
    复制代码
3. 高可用配置

   YARN 的高可用是只启动一个 Jobmanager, 当这个 Jobmanager 挂了之后, YARN 会再次启动一个, 本质是利用的 YARN 的重试次数来实现的高可用
  

  • 在 yarn-site.xml 中配置
    1. <property>
    2.     <name>yarn.resourcemanager.am.max-attempts</name>
    3.     <value>4</value>
    4.     <description>
    5.             The maximum number of application master execution attempts.
    6.     </description>
    7. </property>
    复制代码
  • 分发配置到其他节点
  • 在 flink-conf.yaml 中配置
    1. yarn.application-attempts: 3  #要小于 yarn 的重试次数
    2. high-availability: zookeeper
    3. high-availability.storageDir: hdfs://hadoop102:9820/flink/yarn/ha
    4. high-availability.zookeeper.quorum: hadoop102:2181,hadoop103:2181,hadoop104:2181
    5. high-availability.zookeeper.path.root: /flink-yarn
    复制代码
  • 启动 yarn-session,杀死 JobManager, 查看复活情况

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

大连密封材料

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表