Flink简介、快速入门、部署、集群

打印 上一主题 下一主题

主题 885|帖子 885|积分 2659

一、Flink先容

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。
以内存实行速度和恣意规模来实行计算。
中文文档:https://nightlies.apache.org/flink/flink-docs-stable/zh/
Flink有四个基石:Checkpoint机制(保障数据同等性)、State(提供API)、Time(数据容错)、Window(操作窗口)
1.1 Flink简介

1.1.1 Checkpoint

Checkpoint机制 为Flink实现了一个分布式的同等性的快照,从而提供了同等性的语义;
1.1.2 State

虽然有了同等性的语义之后,Flink为了让用户在编程时更加轻松、更轻易地去管理状态,提供了一套非常简单明白的StateApi,包括里面的有ValueState、ListState、MapState,近期还添加了BroadcastState,利用State API能够自动先用这种同等性的语义。
1.1.3 Time

Flink还实现了Watemark的机制,能够支持基于事件的时间的处理,大概说基于系统时间的处理,能够容忍数据的延时、容忍数据的迟到、容忍乱序的数据。
1.1.4 Window

Flink提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常机动的自定义窗口。
1.2 Flink流处理特性



  • 支持高吞吐、低延迟、高性能的流处理
  • 支持带有事件时间的窗口(Window)操作
  • 支持有状态计算的 Exactly-once 语义
  • 支持高度机动的窗口(Window)操作,支持基于 time、count、session,以及 data-driven 的窗口操作
  • 支持具有 Backpressure 功能的连续流模型
  • 支持基于轻量级分布式快照(Snapshot)实现的容错
  • 一个运行时同时支持 Batch on Streaming 处理和 Streaming 处理
  • Flink 在 JVM 内部实现了自己的内存管理
  • 支持迭代计算
  • 支持步伐自动优化:制止特定情况下 Shuffle、排序等昂贵操作,中心结果有必要进行缓存
1.3 Flink的批处理和流处理

批处理:有界、长期、大量,批处理适合需要访问全套纪录才能完成的计算工作,一般用于离线统计;
流处理:无界、实时,流处理无需针对整个数据集实行操作,而是对通过系统传输的每个数据项进行操作,一般用于实时统计;
Flink可以同时处理批处理和流处理。Flink通过将批处理(即处理有限的静态数据)视作一种特别的流处理;
1.4 Flink Runtime实行引擎

Flink的核心计算架构是下图中的Flink Runtime实行引擎,他是一个分布式系统,能够接受数据流步伐 并在一台或多台机器上以容错方式实行。

上图为Flink技术栈的核心组成部门,值得一提的是,Flink分别提供:
①面向流处理的接口(DataStream API)
②面向批处理的接口(DataSet API)
因此,Flink既可以完成流处理,也可以完成批处理。
Flink还支持的拓展库涉及:
①机器学习(FlinkML)
②复杂事件处理(CEP)
③图计算(Gelly)
④分别针对流处理和批处理的Table API
一一一一一
DataStream API可以流程地分析无限数据流,并且可以用Java大概Scala等来实现。开发者需要基于一个叫DataStream的数据结构来开发,这个数据结构用于表示永不停止的分布式数据流。
Flink的分布式特点表如今他能够在成百上千台机器上运行,它将大型的计算使命分成很多小的部门,每个机器实行一部门。
Flink能够自动地确保发生气器故障大概其他错误时计算能够连续进行,大概在修复bug或进行版本升级后有计划地再次实行一次。这种能力使得开发人员不需要担心运行失败。
Flink本质上利用容错性数据流,这使得开发人员可以分析连续天生切用不结束的数据(即流处理);
二、Flink运行架构

2.1 Flink步伐结构

Flink步伐的基本构建块是 :流和转换;(留意ataSet API中利用的DataSet也是内部流)
从概念上讲,
流:可能永无止境的数据纪录流;
转换:是将一个或多个流作为一个或多个流的操作。(即,输入并产生一个或多个输出流)
2.1.1 Source数据源

Flink在流处理和批处理上的source大概有四类:
① 基于本地集合的source;
② 基于文件的source;
③ 基于网络套接字的source;
④ 自定义的source;
自定义的source常见的有Apache kafka、RabbitMQ等,当然你也可以自定义自己的source;
2.1.2 Transformation

数据转换的各种操作,有Map/FlatMap/Filter/KetBy/Reduce/Fold/Aggregations/Window/WindowAll./Window join/Split/Select等,操作很多,可以将数据转换计算成你想要的数据。
2.1.3Sink

Sink吸收器,Flink将转换计算后的数据发送的地点,可能需要存储下来。
Flink常见的Sink大概有如下几类:写入文件、打印出来、写入socket、自定义的sink。
自定义的sink常见的有Apache Kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem等,同理也可以定义自己的sink。
2.2 Flink并行数据流

Flink步伐本质上是并行的和分布式的;
Flink步伐在启动、实行、结束的各个流程:
① 实行的时候,会被映射成一个Streaming Dataflow,一个Streaming Dataflow 是由一组Stream和Transformation Operator组成的。
② 启动时从一个或多个Source Operator开始
③ 结束于一个或多个Sink Operator。
Flink步伐实行流程:实行的时候,会被映射成一个Streaming Dataflow,一个Streaming Dataflow 是由一组Stream和Transformation Operator组成的;
一个流Stream 包罗一个或多个流分区,而每一个operator包罗一个或多个operator子使命,也就是 operator subtask【操作子使命】。
operator subtask【操作子使命】间相互独立,在不同的线程中实行,乃至是在不同的机器或不同的容器上。
operator subtask【操作子使命】的数量是这一特定operator的并行度。
相同步伐中operator有不同级别的并行度。
一个Stream可以被分成多个Stream的分区,也就是Stream Partition【分区】。
一个Operator也可以被分为多个Operator Subtask【子使命】,如下图;

分析图片:


  • Source被分为Source1 和 Source2,它们是Source的Operator Subtask
  • 【也就是说Source1和Source2都是Source的 Operator Subtask子使命】。
  • 每一个Operator Subtask都是在不同的线程当中独立实行的;
  • 一个Operator的并行度,就等于Operator Subtask的个数;
  • 下图Source的并行度为2,而一个Stream的并行度就等于它天生的Operator的并行度;
  • 数据在两个 operator 之间通报的时候有两种模式:
    ①One to One 模式:两个operator用此模式通报的时候,会保持数据的分区数和数据的排序;
    如下图中的Source1到IMap1,它就保存的Source的分区特性,以及分区元素处理的有序性。
    ②Redistributing重新分配模式:这种模式会改变数据的分区数;
    每一个Operator subtask【如下图的Source1 和 Source2】会根据选择Transformation把数据
    发送到不同目标subtasks, 比如keyBy()会通过hashcode重新分区,broadcase()和rebalance()方法会随机重新分区;
2.3 Task和Operator chain

Flink的全部操作都被称之为Operator,客户端在提交使命的时候会对Operator进行优化操作,能进行归并的Operator会被归并为一个Operator,归并后的Operator称为Operator chain,实际上就是一个实行链,每个实行链会在TaskManager上一个独立的线程中实行。

2.4 使命调度与实行




  • 当Flink实行executor会自动根据步伐代码天生DAG数据流图;
  • ActorSystem创建Actor将数据流图发送给JobManager的Actor;
  • Jobmanager会不断接受TaskManager的心跳消息,从而可以获取到有效的TaskManager;
  • JobManager通过调度器在TaskManager中调度实行Task(在Flink中,最小的调度单元就是task,对应就是一个线程);
  • 在步伐运行过程中,task和task之间是可以进行数据传输的;
Job Client【就是上图的Flink步伐】:


  • 重要职责是提交使命,提交后可以结束历程,也可以等待结果返回;
  • Job Client 不是Flink步伐实行的内部部门,但它是使命实行的起点;
  • Job Client负责吸收用户的步伐代码,然后创建数据流,将数据流提交给Job Manager以便进一步实行。实行完成后,Job Client将结果返回给用户;
JobManager:


  • 重要职责是调度工作并协调使命做查抄点;
  • 集群中至少要有一个master,master负责调度task,协调checkpoints和容错;
  • 高可用设置的话可以有多个master,但是要包管一个是leader,其他是standby;
  • JobManager包罗Actor System、Scheduler调度器、CheckPoint协调器 三个紧张的组件;
  • JobManager从客户端【上图的Flink步伐】吸收到使命后,起首天生优化过的实行计划,再调度到TaskManager中实行;
TaskManager


  • 重要职责是从JobManager处吸收使命,并部署和启动使命,吸收上游的数据并处理;
  • TaskManager是在JVM中的一个或多个线程中实行使命的工作节点;
  • TaskManager在创建之初就设置好了Slot【槽】,每个Slot可以实行一个使命;
2.4.1使命槽和槽共享


每个TaskManager是一个JVM的历程,可以在不同的线程中实行一个或多个子使命。为了控制一个worker能吸收多少个task。worker通过task slot【使命槽】来进行控制(一个worker至少有一个task slot)
使命槽:


  • 每个task slot表示TaskManager拥有资源的一个固定巨细的子集;
  • flink将历程的内存进行了分别到多个slot中;
  • 上图中有两个taskManager,每个TaskManager有三个slot,每个slot占1/3的内存;
  • 内存被分别到不同的slot之后可以得到如下好处:
    ①TaskManager最多能同时并发实行的使命是可以控制的,那就是3个,由于不能超过slot的数量;
    ②slot有独占的内存空间,这样在一个TaskManager中可以运行多个不同的作业,作业之间不受影响;
槽共享:


  • 只需计算Job中最高并行度(parallelism)的task slot,只要这个满足,其他的job也都能满足;
  • 资源分配更加公平,如果有比力空闲的slot可以将更多的使命分配给它。图中若没有使命槽共享,负载不高的Source/Map 等subtask将会占据很多资源,而负载较高的窗口subtask则会缺乏资源;
  • 有了使命槽使命,可以将基本并行度(base parallelism)从2提升到6,提高了分槽资源的利用率。同时还可以保障TaskManager给subtask的分配的slot方案更加公平;

三、Flink快速上手

3.1 预备

需求:统计一段文字中,每个单词出现的频次;
版本:基于1.17.0版本;
数据预备:在工程根目录下创建一个input文件夹,并且在下面创建文本文件words.txt,恣意如下数据
批处理思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次;
3.2 导入依靠

  1. <!-- fink 相关依赖 -->
  2.         <dependency>
  3.             <groupId>org.apache.flink</groupId>
  4.             <artifactId>flink-clients</artifactId>
  5.             <version>1.17.0</version>
  6.         </dependency>
  7.         <dependency>
  8.             <groupId>org.apache.flink</groupId>
  9.             <artifactId>flink-streaming-java</artifactId>
  10.             <version>1.17.0</version>
  11.         </dependency>
复制代码
3.3 数据预备

查找到数据源:

思路:流处理,所以是一行一行的读取,然后按照空格切分,然后再分组统计;
3.4、批处理代码 :实现wordcount 案例

DataSet API 批处理 (过时了)

  1. package com.flink17.demo;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.java.ExecutionEnvironment;
  4. import org.apache.flink.api.java.operators.AggregateOperator;
  5. import org.apache.flink.api.java.operators.DataSource;
  6. import org.apache.flink.api.java.operators.FlatMapOperator;
  7. import org.apache.flink.api.java.operators.UnsortedGrouping;
  8. import org.apache.flink.api.java.tuple.Tuple2;
  9. import org.apache.flink.util.Collector;
  10. /**
  11. * DataSet API 实现wordcount
  12. *
  13. * @author lc
  14. * @version 1.0
  15. * @date 2024/10/8 0008 16:27
  16. */
  17. public class WordCountStreamDemoMain{
  18.     public static void main(String[] args) throws Exception {
  19.         // 1.创建执行环境
  20.         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  21.         // 2.读取数据(这里是文本数据)
  22.         DataSource<String> lineDS = env.readTextFile("input/words.txt");
  23.         // 3.切分、转换(word,1)的格式(第一个参数 word表示的是单词,第二个参数 1是指出现的次数)
  24.         // flatMap的方法中的FlatMapFunction 的第一个参数是输入,第二个参数是输出
  25.         FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne
  26.                 = lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
  27.             // 重写 flatMap ,第一个参数是需要操作的数据,第二个参数Collector是采集器
  28.             @Override
  29.             public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
  30.                 //3.1 按照空格切割
  31.                 String[] words = s.split(" ");
  32.                 //3.2 将单词转换成二元组(word,1)这样的格式
  33.                 for (String word : words) {
  34.                     Tuple2<String, Integer> wordTuple2 = Tuple2.of(word, 1);
  35.                     // 3.3 使用Collector向下游发送数据
  36.                     collector.collect(wordTuple2);
  37.                 }
  38.             }
  39.         });
  40.         // 4.按照word分组
  41.         UnsortedGrouping<Tuple2<String, Integer>> wordAndOneGroupBy = wordAndOne.groupBy(0);
  42.         // 5.各分组内聚合
  43.         AggregateOperator<Tuple2<String, Integer>> sum =
  44.                 wordAndOneGroupBy.sum(1); // 这里的1 是位置,表示第二个元素
  45.         // 6.输出
  46.         sum.print();
  47.     }
  48. }
复制代码
实行结果

DataSet写法实现批处理是过时的,保举利用DataStream来写;

3.5、流处理代码(有界流-有开始有结束)::实现wordcount 案例


  1. package com.flink17.demo;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.java.functions.KeySelector;
  4. import org.apache.flink.api.java.tuple.Tuple2;
  5. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  6. import org.apache.flink.streaming.api.datastream.KeyedStream;
  7. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  9. import org.apache.flink.util.Collector;
  10. /**
  11. * DataStream流处理 实现wordcount:(读文件,有界流)
  12. *
  13. * @author lc
  14. * @version 1.0
  15. * @date 2024/10/8 0008 16:53
  16. */
  17. public class WordCountStreamDemoMain {
  18.     public static void main(String[] args) throws Exception {
  19.         // 1.创建执行环境
  20.         StreamExecutionEnvironment evn = StreamExecutionEnvironment.getExecutionEnvironment();
  21.         // 2.读取数据
  22.         DataStreamSource<String> lineDs = evn.readTextFile("input/words.txt");
  23.         // 3.处理数据:切分、转换、分组、聚合
  24.         SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lineDs.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
  25.             @Override
  26.             public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
  27.                 String[] words = value.split(" "); // 空格切分
  28.                 for (String word : words) {
  29.                     // 转换成二元组
  30.                     Tuple2<String, Integer> wordsAndOne = Tuple2.of(word, 1);
  31.                     // 使用Collector向下游发送数据
  32.                     out.collect(wordsAndOne);
  33.                 }
  34.             }
  35.         });
  36.         // 分组
  37.         KeyedStream<Tuple2<String, Integer>, String> ks = wordAndOne.keyBy(
  38.                 // 第一个参数书数据的类型,第二个参数是key的类型
  39.                 new KeySelector<Tuple2<String, Integer>, String>() {
  40.                     @Override
  41.                     public String getKey(Tuple2<String, Integer> value) throws Exception {
  42.                         return value.f0;
  43.                     }
  44.                 }
  45.         );
  46.         SingleOutputStreamOperator<Tuple2<String, Integer>> sum = ks.sum(1);// 这里的1 是位置,表示第二个元素
  47.         // 4.输出数据
  48.         sum.print();
  49.         // 5.执行
  50.         evn.execute();
  51.     }
  52. }
复制代码
实行结果

流处理:拿一条处理一条;所以是一行一行的读取;且有状态,比如hello末了为(hello,3)3是有状态的计算;
前面的编号就是并行度编号,也就是线程数编号;
批处理:一口吻处理一批(这里就是整个文本)
3.6、流处理代码(无界流-有开始没有结束,更加常用)::实现wordcount 案例

在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要连续地处理捕获的数据;
模拟场景:监听socket端口,然后向该端口不断的发送数据

将StreamWorldCount代码中读取文件数据的readTextFile方法更换成socket文本流的方法socketTextStram;
  1. package com.flink17.demo;
  2. import org.apache.flink.api.common.typeinfo.Types;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  5. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  6. import org.apache.flink.util.Collector;
  7. /**
  8. * @author lc
  9. * @version 1.0
  10. * @date 2024/10/8 0008 17:17
  11. */
  12. public class WordCountStreamUnboundDemoMain {
  13.     public static void main(String[] args) throws Exception {
  14.         //1.创建执行环境
  15.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  16.         //2.读取数据:socket,第一个参数书服务器名称,第二个参数是端口号
  17.         DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);
  18.         //3.处理数据:切分,转换,分组,聚合,输出
  19.         socketDS
  20.                 .flatMap(
  21.                         (String value, Collector<Tuple2<String, Integer>> out) -> {
  22.                             String[] words = value.split(" "); // 空格切分
  23.                             for (String word : words) {
  24.                                 // 转换成二元组
  25.                                 Tuple2<String, Integer> wordsAndOne = Tuple2.of(word, 1);
  26.                                 // 使用Collector向下游发送数据
  27.                                 out.collect(wordsAndOne);
  28.                             }
  29.                         }
  30.                 )
  31.                 .returns(Types.TUPLE(Types.STRING,Types.INT)) //需要执行返回类型,(word,1)
  32.                 .keyBy(value -> value.f0)
  33.                 .sum(1)
  34.                 .print(); // 输出
  35.         //4.执行
  36.         env.execute();
  37.     }
  38. }
复制代码
不指定返回范例的化,实行会报错;由于Flink具有一个范例提取系统,可以分析函数的输入和返回范例,自动获取范例信息,从而得到对应序列化器和反序列化器,但是,由于java中泛型查出的存在所以报错了;

实行,就会不停监听那个服务器,有输入才会有响应的输出;

有界流是有结束的,无界留是没有结束的
四、Flink部署及启动

4.1、Flink的实行逻辑

Flink提交作业和实行使命,需要几个关键组件:
1.客户端(client):代码由客户端获取并做转换,之后提交给JobManager
2.JobManager 就是 Flink集群里面的“管事人”,对作业进行中央调度管理;而他获取到要实行的作业后,会进一步处理转换,然后分布使命给浩繁的TaskManager;
3.TaskManager,就是真正“干活的人”,数据的处理操作 都是他们来做的;

4.2、Flink的安装模式

Flink支持多种安装模式:
1.local-本地安装:单机模式,一般倒霉用;
2.standalone-独立模式:Flink自带集群,开发测试环境利用;
3.yarn:计算资源同一由Hadoop YARN管理,生产环境利用;
4.2.1、预备工作:

  1. Flink 是一个分布式的流处理框架,所以实际应用一般都需要搭建集群环境。我们在进行
  2. Flink 安装部署的学习时,需要准备 3 台 Linux 机器。
  3. 具体要求如下:
  4.         系统环境为 CentOS 7.5 版本(也可以是Ubuntu)
  5.         安装 Java 8。
复制代码
4.2.1.1 Flink和Java下载安装

安装Flink:
官网:https://flink.apache.org/downloads/
清华镜像:https://mirrors.tuna.tsinghua.edu.cn/apache/flink/
1.可以从官网下载Flink的安装包,如下1.20.0版本的;
2.下载完后可以移动到自己想存储的位置

3.解压安装包


4.启动flink
进入到解压目录下,实行以下脚本:

发现报错,是由于没有安装jdk,这里我们安装jdk8;
5.安装jdk
网址:https://www.oracle.com/java/technologies/downloads/#java8
jdk安装:
ubuntu中自带了jdk,先将其卸载:sudo apt-get remove openjdk
sudo apt-get autoremove
上传安装包到自指定路径:/usr/local/jdk
解压安装包:tar -zxvf jdk-8u291-linux-x64.tar -C /usr/local/myapp/jdk
设置环境变量:sudo vim /etc/profile
ESC然后再 : + shfit 输入wq,保存退出
在文末增加设置(路径根据实际情况调解):
export JAVA_HOME=/usr/local/myapp/jdk/jdk1.8.0_291
export JRE_HOME=/usr/local/myapp/jdk/jdk1.8.0_291/jre
export PATH=                                   P                         A                         T                         H                         :                              PATH:                  PATH:JAVA_HOME/bin:                                   J                         R                                   E                            H                                  O                         M                         E                         /                         b                         i                         n                         e                         x                         p                         o                         r                         t                         C                         L                         A                         S                         S                         P                         A                         T                         H                         =                              JRE_HOME/bin export CLASSPATH=                  JREH​OME/binexportCLASSPATH=CLASSPATH:                                   J                         A                         V                                   A                            H                                  O                         M                         E                         /                         l                         i                         b                         :                              JAVA_HOME/lib:                  JAVAH​OME/lib:JRE_HOME/lib

测试jdk:java -version 大概 javac -version
root@vm1:/usr/local/myapp/jdk# java -version
java version “1.8.0_291”
Java™ SE Runtime Environment (build 1.8.0_291-b10)
Java HotSpot™ 64-Bit Server VM (build 25.291-b10, mixed mode)
root@vm1:/usr/local/myapp/jdk# javac -version
javac 1.8.0_291

6. Flink安装包然后解压到指定目录,留意修改所属用户和用户组
  1. #.改名
  2. mv flink-1.7.2 flink
  3. #.赋予权限
  4. chown -R root:root flink
复制代码

7.再次启动
关闭防火墙:sudo ufw disable

启动成功,通过jps查看服务信息:
StandaloneSessionClusterEntrypoint为Flink主历程,即JobManager;TaskManagerRunner为Flink从历程,即TaskManager
8.页面查看
单机启动,自己既是jobmanager,也是taskmanager
在欣赏器中访问服务器8081端口即可查看Flink的WebUI,
比如http://localhost:8081/,从WebUI中可以看出,当前本地模式的Task Slot数量和TaskManager数量。访问结果如下图所示:

9.停止flink
./bin/stop-cluster.sh
革新页面会报错

10.可以通过观察logs目录下的日记来检测系统是否正在运行了
tail log/flink–jobmanager-.log

4.2.2、单节点部署


4.2.3、standalone安装模式

我这边是给Flink003为Master机器(JobManager),其他的为Slave机器(TaskManager)
4.2.3.1 ssh设置



集群的服务器之间设置好ssh免密登录,制止后续搭建出现贫苦,这一步肯定要做。简单步骤如下:

  • 先辈入root用户权限 sudo -i,且修改root用户的密码 输入passwd

2. 查看服务器是否安装有ssh服务


  • 天生密钥和公钥;和在master机器实行ssh-keygen -t rsa


  • 把公钥复制到需要免密登录的从服务器上;在master机器实行下令,将密钥拷贝到其余服务器ssh-copy-id -i /root/.ssh/id_rsa.pub 目标服务器IP (报错的化看下图的原因解释)
我们实行ssh-copy-id 报错,说连接被拒,可以进行排错

确认目标主机的SSH服务是否运行:sudo systemctl status ssh,如果未运行实行:sudo systemctl start ssh;
![![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/07bd7889c044401593383cad3875c95b.png)
那就是ssh服务没有被安装,需要安装,ubuntu上查抄安装状态输入 sudo apt-listfiles | grep openssh-server,
在利用sudo apt-get install openssh-server安装;
查抄SSH设置,确保SSH服务的设置文件 /etc/ssh/sshd_config正确,特别是监听的端口设置是Port 22,如果利用了自定义端口,确保该端口正确设置并且未被防火墙拦截。


可以之后再试ssh-copy-id ;

  • 在继续ssh-copy-id ,输入密码;
  • 验证免密登录。运行以下下令连接到远程服务器:ssh username@remote_server_ip。如果成功连接而无需输入密码,则表示设置成功
要退出ssh连接输入exit就可以了;
4.2.3.2 修改Flink设置,实现基础的集群搭建


  • 修改flink设置文件:
    进入Flink的设置文件,conf目录下的flink-conf.yaml 大概cong.yaml: vim conf.yaml
    将文件中的jobmanager.rpc.address属性进行修改为JobManager机器也就是主机的ip地址,
jobmanager.rpc.address: 主机ip地址
jobmanager.bind-host: 0.0.0.0

2.修改workers文件
vim conf/workers
workers文件必须包罗全部需要启动的TaskManager节点的主机名,且每个主机名占一行。在JobManager服务器,实行以下操作


3. 复制Flink安装文件到其他服务器
其他从服务器不需要下载flink的安装包,必须从主scp到从,由于有路径要求,才能关联起来;
scp -r /usr/local/flink/ root@从服务器1的ip地址:/usr/local/flink/
scp -r /usr/local/flink/ root@从服务器2的ip地址:/usr/local/flink/

4.集群启动
这里只需要在主服务器上,也就是JobManager的服务器上启动Flink,从服务也会跟着启动;
在JobManager节点上进入Flink安装目录,实行以下下令启动Flink集群:
启动完毕后,在集群各服务器上通过jsp下令查看Java历程。若各节点存在以下历程,则说明集群启动成功:
JobManager节点:StandaloneSessionClusterEntrypoint
TaskManager1节点:TaskManagerRunner
TaskManager2节点:TaskManagerRunner
我们输入从机的密码,可以看到提醒我们启动了从机的flink(这里我只弄了一台机器从机),下图是主机的下令窗口,输入jps可以看到作为JobManager的StanaloneSessionClusxxxx启动了

从机下令窗口输入jps可以看到TaskManager启动成功;

尝试提交一个简单使命,如果使命正常实行完毕,则集群一切正常。提交Flink自带的简单使命如下:
./bin/flink run examples/streaming/WordCount.jar

5.查看webUI
通过JobManager节点访问WebUI(http://localhost:8081/),可以看到此时是1个JobManager,1个TaskManager,也能以上实行完毕的使命,如下图:



4.2.4 进阶版集群-Flink Standalone HA搭建

在Flink Standalone模式下,实现HA的方式可以利用ZooKeeper在全部正在运行的JobManager实例之间进行分布式协调,实现多个JobManager无缝切换。Flink Standalone模式的HA架构如图:
简单来说就是 利用 ZooKeeper 来实现JobManager无缝切换,从而到达分布式协调;
留意:Flink内置了Zookeeper服务和相关脚本文件,如果你的集群中没有安装Zookeeper,则可以通过修改zoo.cfg文件设置Flink内置的Zookeeper。生产环境建议利用独立的外部zookeeper;

HA的核心就是:可以在集群中启动多个JobManager,并使它们都向ZooKeeper进行注册,ZooKeeper利用自身的选举机制包管同一时间只有一个JobManager是活动状态(Active)的,其他的都是备用状态(Standby)。当活动状态的JobManager出现故障时,ZooKeeper会从其他备用状态的JobManager选出一个成为活动JobManager。流程见下图:

此外,活动状态的JobManager在工作时会将其元数据(JobGraph、应用步伐JAR文件等)写入一个远程长期化存储系统(比方HDFS)中,还会将元数据存储的位置和路径信息写入ZooKeeper存储,以便能够进行故障规复,如图下图所示:

操作起来!!!!!
起首,我们还是在上面的操作的三个服务器进行操作;
如今是一个jobmanager(Flink003机器),两个taskmanager(Flink001和Flink002机器);
如今我们需要变成两个jobmanager,一个taskmanager;

  • 修改masters文件
    Flink的masters文件用于设置全部需要启动的JobManager节点以及每个JobManager的WebUI绑定的端口。
进入Flink安装目录,修改conf/masters文件,修改内容如下:
  1. Flink003机器的ip地址:8081
  2. Flink002机器的ip地址:8082
复制代码


上述设置表示在集群 Flink003机器 和 Flink002机器节点上启动JobManager,并且每个JobManager的WebUI访问端口分别为8081和8082。

  • 修改flink-conf.yaml文件设置高可用模式
    进入Flink003机器 节点的Flink安装主目录,修改conf/flink-conf.yaml文件,添加以下内容:
  1. # 将高可用模式设置为ZooKeeper,默认集群不会开启高可用状态
  2. high-availability: zookeeper
  3. # ZooKeeper集群主机名(或IP)与端口列表,多个以逗号分隔
  4. high-availability.zookeeper.quorum: centos01:2181,centos02:2181,centos03:2181
  5. # 用于持久化JobManager元数据(JobGraph、应用程序JAR文件等)的HDFS地址,以便进行故障恢复,ZooKeeper上存储的只是元数据所在的位置路径信息
  6. high-availability.storageDir: /data/software/flink-15.4/ha
  7. # 获取storageDir也可用hdfs,如果使用hdfs的话,则需要单独安装hdfs,本文暂不使用
  8. #high-availability.storageDir: hdfs://centos01:9000/flink/recovery
复制代码


改动如图下


  • 修改zoo.cfg文件
    Flink内置了ZooKeeper服务和相关脚本文件,如果你的集群中没有安装ZooKeeper,则可以通过修改zoo.cfg文件设置Flink内置的ZooKeeper。生产环境建议利用独立的外部ZooKeeper。
进入centos01节点的Flink安装主目录,修改conf/zoo.cfg文件,添加以下内容,设置ZooKeeper启动节点与选举相关端口:
  1. server.1=centos01:2888:3888
  2. server.2=centos02:2888:3888
  3. server.3=centos03:2888:388
复制代码
上述设置表示在centos01、centos02和centos03节点上启动ZooKeeper服务,此中1、2、3表示每个ZooKeeper服务器的唯一ID。


  • 复制Flink安装文件到其他节点
    继续接纳scp下令,复制centos01的文件到其他节点,scp下令会把相同文件覆盖。
  1. scp -r /usr/local/flink/ root@从服务器1的ip地址:/usr/local/flink/
  2. scp -r /usr/local/flink/ root@从服务器2的ip地址:/usr/local/flink/
复制代码

  • 启动ZooKeeper集群
    如果利用Flink内置的ZooKeeper,在Flink003机器节点实行以下下令,即可启动整个ZooKeeper集群:
    …/bin/start-zookeeper-quorum.sh

启动成功后,在每个Flink节点上都会产生一个名为FlinkZooKeeperQuorumPeer的历程,该历程是ZooKeeper服务的守卫历程。利用jps可以查看到如下历程:



留意:这里本级需要利用localhost
6. Flink003机器节点上实行以下下令,启动Flink Standalone HA集群:
要先启动zookeeper再启动flink
bin/start-cluster.sh

Flink003主机 jps:

Flink002和Flink001机器 jps:



  • 访问webUI
    从前只有Flink003可以访问webui页面也就是Flink的Dashboard,如今是Flink003和Flink002都可以访问
假如方向访问不了,说明设置不对,重新查看自己的设置,要留意改完Flink003的后要从新scp复制到从服务器上;

8.测试
在提交一个测试,如果能正常实行,说明整个集群正常。
./bin/flink run examples/streaming/WordCount.jar

9.停止集群
若要停止Flink Standalone HA集群,在jobmanager节点上起首实行以下下令停止整个Flink集群,我这里是flink003:
./stop-zookeeper-quorum.sh
4.2.5、Yarn安装模式

完成上述的Flink和jdk的安装后;
进入Flink的conf目录,依据Flink的版本进行修改flink-conf.yaml大概conf.yaml文件
bind-host 就是0.0.0.0
rpc.address指的是从机的ip地址


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

刘俊凯

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表