1. Flink简介
- Usermanual:
- https://nightlies.apache.org/flink/flink-docs-release-1.13/
- Java API :
- https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/
- Scala API:
- https://nightlies.apache.org/flink/flink-docs-release-1.16/api/scala/org/apache/flink/api/scala/index.html
- Python:
- https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/python/datastream/intro_to_datastream_api/
复制代码 1.1 Flink的起源和计划理念
flink项目的核心目标:数据流上的有状态盘算(Stateful Computations over Data Streams)。
时间驱动:来一个事件处置惩罚一个事件;流处置惩罚的流水线;流&批数据分析
具体定位是:Apache Flink 是一个框架和分布式处置惩罚引擎,用于对无界和有界数据流进行有状态盘算。Flink 被计划在全部常见的集群情况中运行,以内存执行速度和恣意规模来执行盘算。
1.2 Flink的应用
Flink是一个大数据流处置惩罚引擎,它可以为不同的行业提供大数据实时处置惩罚的解决方案。Flink在国内热度尤其高,一方面由于阿里的贡献和带头效应,另一方面也跟中国的应用场景密切相关。中国的人口规模与互联网的遍及程度,决定了对大数据处置惩罚速度的要求越来越高,也迫使中国的互联网企业去追逐更高的数据处置惩罚效率。好比,一个网站在中国可能要面临数亿的日活用户、每秒数亿次的盘算峰值,而Flink恰恰给了高速的处置惩罚海量流式数据提供了可能。
1.2.1 Flink在企业中的应用
对于数据处置惩罚而言,任何行业、任何公司的需求其实都是一样的:数据规模大、实时性要求高、确保结果准确、方便扩展、故障后可快速恢复 —— 而这些要求,作为新一代大数据流式处置惩罚引擎的Flink统统可以满足,这也是Flink在全天下范围得到广泛应用的原因。
以下是Flink官网列出的着名企业用户,
当用户数据量非常巨大时,快速地分析响应、实时做出精准的推荐就显得困难,而Flink这样真正意义上的大数据流处置惩罚引擎,就能做到这些。
1.2.2 Flink重要应用场景
各种行业的众多公司都在使用Flink,那到底他们用Flink来处置惩罚什么需求呢?什么样的场景最适合Flink的使用呢?
回到Flink本身的定位,它是一个大数据流式处置惩罚引擎,处置惩罚的是流式数据,也就是“数据流(Data Flow)”。数据流的含义是,数据并不是收集好的,而是像流水一样,是一组有序的数据序列,逐个到来、逐个处置惩罚。由于数据来到之后就会被即刻处置惩罚,所以流处置惩罚的一大特点就是“快速”,也就是良好的实时性。Flink适合的场景,其实也就是需要实时处置惩罚数据流的场景。
1. 电商和市场营销
举例:实时数据报表、广告投放、实时推荐
在电商行业中,网站点击量是统计PV、UV的重要泉源,也是现在“流量经济”的最重要数据指标。许多公司的营销策略,好比广告的投放,也是基于点击量来决定的。另外,在网站上提供给用户的实时推荐,往往也是基于当前用户的点击行为做出的。
网站获得的点击数据可能是连续且不均匀的,还可能在同一时间大量产生,这是典型的数据流。假如我们盼望把它们全部收集起来,再去分析处置惩罚,就谋面临许多问题:起首,我们需要很大的空间来存储数据;其次,收集数据的过程耗费许多时间,统计分析结果的实时性就大大低落了;另外,分布式处置惩罚无法保证数据的顺序,假如我们只以数据进入体系的时间为准,可能导致最闭幕果盘算错误。我们需要的是直接处置惩罚数据流,而Flink就可以做到这一点。
2. 物联网(IOT)
举例:传感器实时数据收罗和表现、实时报警、交通运输等
物联网是流数据被普遍应用的范畴。各种传感器不绝获得测量数据,并将他们以流的情势传输到数据中央,而数据中央会将数据处置惩罚分析后,得到运行状态或者报警信号,实时地表现在监控屏幕上,所以在物联网中,低延迟的数据传输和处置惩罚,以及准确的数据分析通常很关键。
交通运输业也表现了流处置惩罚的重要性。好比高铁运行重要就是依赖传感器检测数据,测量数据包括列车的速度和位置,以及轨道周边的状态。这些数据会从轨道传给列车,再从列车传到沿途的其他传感器;与此同时,数据报告也被发送回控制中央。由于列车处于高速行驶状态,因此数据处置惩罚的实时性要求是极高的。假如数据没有被实时准确处置惩罚,调整意见和告警就能相应产生,结果可能会非常严峻。
3. 物流配送和服务业
举例:订单状态实时更新、通知信息推送
在许多服务型应用中,都会涉及订单状态的更新和通知的推送。这些信息基于事件触发,不均匀地连续不断生成,处置惩罚之后需要实时传递给用户,这也是非常典型的数据流处置惩罚。
4. 银行和金融业
举例:实时结算和通知推送,实时检测非常行为。
银行和金融业是另外一个典型的应用行业。用户的生意业务行为是连续大量发生的,银行面临的是海量的流式数据。由于要处置惩罚的生意业务数据量太大,从前的银行是按天结算,汇款一般都要隔天才气到账。这显然不能满足快速生意业务的需求,在举世化经济中,能够提供24小时服务变得越来越重要。现在生意业务和报表都会快速准确地生成,跨行转账也可以做到瞬间到账,还可以接到实时地推送通知,这需要我们能够实时处置惩罚数据流。
1.3 流式数据处置惩罚的发展和演变
1.3.1 流处置惩罚和批处置惩罚
数据处置惩罚有不同的方式,有些场景数据是一个一个来,是一组有序的数据序列,叫做“数据流”;而有些场景的数据本身就是一批同时到来,是一个有限的数据集,这就是批量数据(或数据集)。
企业中绝大多数的应用程序,都是在不绝地吸收用户请求、记载用户行为和体系日志,或则持续吸收收罗到的状态信息。所以数据会在不同的时间持续生成,形成一个有序的数据序列 -- 这就是典型的数据流。对于流式数据,用流处置惩罚是最好、最符合的方式。但是传统的数据处置惩罚架构并不是这样的,无论是关系型数据库、照旧数仓,都倾向于先“收集数据”,然后再进行处置惩罚。为什么不直接用流处置惩罚方式呢?这是由于,分布式处置惩罚在架构上更轻易实现。要想弄清楚流处置惩罚的发展演变,我们起首要了解传统的数据处置惩罚架构。
1.3.2 传统事务处置惩罚
IT互联网公司往往会用不同的应用程序来处置惩罚各种业务。好比内部使用的企业资源规划(ERP)体系、客户关系管理(CRM)体系,还有面向客户的Web应用程序。
- 企业资源规划(Enterprise Resource Planning,简称 ERP)系统是一种集成管理软件,
- 用于组织和管理企业不同部门的核心业务流程和资源。通过一个统一的数据库系统,
- ERP 可以整合公司的各个部门(如财务、人力资源、供应链管理、采购、制造和销售)在同一个信息系统中,
- 为企业提供全面的管理视图和数据支持。
- ERP 系统的目标是协调企业内部各个部门之间的工作流程,提高效率、降低成本、增强业务流程的透明度,
- 并帮助企业做出更明智的决策。这些系统通常包括一系列应用程序和工具,
- 可以用来实施和自动化各种业务流程,
- 如财务会计、库存管理、生产计划、客户关系管理(CRM)、人力资源管理等。
- 一些流行的 ERP 系统供应商包括 SAP、Oracle、Microsoft Dynamics、Infor 和 Salesforce,
- 它们提供各种功能丰富的 ERP 解决方案,可根据企业的需求和规模进行定制和部署。
- 总的来说,ERP 系统是帮助企业实现资源整合、业务流程优化和管理决策支持的关键工具,
- 有助于提高企业运营效率和竞争力。
复制代码 这些体系一般都会进行分层计划:盘算层就是应用程序本身,用于数据盘算和处置惩罚;而存储层往往是传统的关系型数据,用于存储数据。如下图所示:
这里的应用程序在处置惩罚数据的模式上有共同之处:吸收的数据是持续生成的事件,好比用户的点击行为,客户下的订单,或者操作人员发出的请求。处置惩罚事件时,应用程序需要先读取远程数据库的状态,然后按照处置惩罚逻辑得到结果,将响应返回给用户,并更新数据库状态。一般来说数据库体系可以服务于多个应用程序,它们偶然会访问相同的数据库或表。这就是传统的“事务处置惩罚”架构。体系所处置惩罚的连续不断地事件,其实就是一个数据流,而对于每一个事件,体系都在收到之后进行相应的处置惩罚,这也是符合流处置惩罚的原则的。所以可以说,传统的事务处置惩罚,就是最基本的流处置惩罚架构。
对于各种事件请求,事务处置惩罚的方式能够保证实时响应,但是我们知道,这样的架构对表和数据的计划要求很高;当数据规模越来越巨大、体系越来越复杂时,可能需要对表进行重构,而且一次联表查询也会花费大量的时间,甚至不能实时得到返回结果。于是作为程序员就只好更多的精力放在表的计划和重构,以及SQL的调优上,而无法专注于业务逻辑的实现 -- 这种工作费时费力,却没有办法直接表现在产品上。
1. 事务处置惩罚 -- OLTP
对于传统的事务处置惩罚,是比较快的是实时的,此架构受限的地方在于,数据要存储在传统的关系型数据库中。
关系型数据库:MySQL,Oracle,PostSQL等;
非关系型数据库:Redis,MongoDB,HBase,ES等
整体来讲,Redis响应速度比较快的,内存级别。
2. 分析处置惩罚 -- OLAP
OLTP(On-Line Transaction Processing):联机事务处置惩罚,典型代表是关系型数据库(mysql),它的数据存储在服务器本地的文件里。
OLAP(On-Line Analytical Processing):联机分析处置惩罚,OLAP型数据库的典型代表是分布式文件体系(hive),它的数据存储在HDFS集群里。
它们的重要区别入下图:
1.3.3 有状态的流处置惩罚 -- 第一代流处置惩罚架构
对于事件流的处置惩罚非常简单,例如收到一个请求就返回一个“收到”,那就可以省去数据库的查询和更新,但是这样的处置惩罚没有什么现实意义。在现实的应用中,往往还需要一些额外的数据,我们把额外的数据保存成一个“状态”,然后针对这条数据进行处置惩罚,而且更新状态,在传统的架构中,这个状态被保存到数据库中。这就是所谓的“有状态的流处置惩罚”。
为了加快访问速度,我们可以直接把状态保存到本地的内存中,如下图所示:
当应用收到一个新事件时,它可以从状态中读取数据,也可以更新状态。而当状态是从内存中读写的时候,这就和访问本地变量没有什么区别了,实时性得到极大的提拔。
另外数据规模增大时,也不需要重构,只需要构建分布式集群,各自在本地盘算就可以,可扩展性也变得更好。由于采用的是一个分布式体系,所以还需要保护本地状态,防止在故障时数据丢失。我们可以定期地将应用状态的一致性检查点(checkpoint)存盘,写入远程的持久化存储,碰到故障时再去读取进行恢复,这样就保证了更好的容错性。
有状态的流处置惩罚是一种通用而且机动的计划架构,可用于许多不同的场景。具体来说,有以下几种典型的应用。
1. 事件驱动型(Event-Driven)应用
事件驱动型应用时一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发盘算、状态更新或其他外部动作。比较典型的就是以Kafka为代表的消息队列几乎都是事件驱动型应用。
这跟传统事务处置惩罚本质上是一致的,区别在基于有状态流处置惩罚的事件驱动应用,不再需要查询远程数据库,而是在本地访问他们的数据,这样在吞吐量和延迟方面就可以有更好的性能。另外远程持久性存储的检查点保证了应用可以从故障中恢复。检查点可以异步和增量地完成,因此对正常盘算的影响非常小。
2. 数据分析型(Event-Analysis)应用
所谓数据分析,就是从原始数据中提取信息和发觉规律。传统上,数据分析一般是先将数据复制到数据仓库(DataWarehouse),然后进行批量查询。假如数据有了更新,必须将最新数据添加到要分析的数据会合,然后重新运行查询或应用程序。数仓的计划一般是将大量数据(如日志文件)写入到Hadoop的分布式文件体系(HDFS)、S3或HBase等批量存储数据库,以较低的资本进行大容量的存储。然后通过SQL-On-Hadoop类引擎查询和处置惩罚数据,好比Hive。这中处置惩罚方式是典型的批处置惩罚,特点是可以处置惩罚海量数据,但实时性较差,所以也叫离线分析。
假如有一个复杂的流处置惩罚引擎,数据分析可以实时执行。流式查询或应用程序不是读取有限的数据集,而是担当实时事件流,不断生成和更新结果。结果要么写入外部数据库,要么作为内部状态进行维护。Apache Flink同时支持流式处置惩罚和批处置惩罚的数据分析应用。如下图所示:
与批处置惩罚分析相比,流处置惩罚分析最大的上风就是低延迟,真正实现了实时。另外流处置惩罚不需要去单独思量新数据的导入和处置惩罚,实时更新本来就是流式处置惩罚的基本模式。当前企业对流 式数据处置惩罚的一个热点应用就是实时数仓,许多公司正是基于 Flink 来实现的。
3. 数据管道(Data-Pipline)应用
ETL就是将数据的提取、转换、加载时在存储体系之间转换和移动数据的常用方法。在数据分析的应用中,通常会定期触发ETL任务,将数据从事务数据库体系复制到分析数据库或数据仓库。所谓管道的作用与ETL雷同。他们可以转换和扩展数据,也可以在存储体系之间移动数据。不过我们用流式处置惩罚架构来搭建管道,这些工作就可以连续运行,不再需要周期触发。好比,数据管道可以用来监控文件体系目录中的新文件,将数据写入事件日志。连续数据管道的显着上风是减少了将数据移动到目的地的延迟,而且更加通用。如图 1-9 所示,展示了 ETL 与数据管道之间的区别。
有状态的流处置惩罚架构上其实并不复杂,许多用户基于这种头脑开辟出了本身的流处置惩罚体系,这就是第一代流处置惩罚器。Apache Storm 就是此中的代表。Storm 可以说是开源流处置惩罚的先锋,最早是由 Nathan Marz 和创业公司 BackType 的一个团队开辟的,后来才成为 Apache 软件基金会下属的项目。Storm 提供了低延迟的流处置惩罚,但是它也为实时性付出了代价:很难实现高吞吐,而且无法保证结果的准确性。用更专业的话说,它并不能保证“准确一次”(exactly-once);即便它能够保证的一致性级别,开销也相当大。关于状态一致性和exactly-once,会在后续讨论。
1.3.4 Lambda架构--第二代流处置惩罚架构
对于有状态的流处置惩罚,当数据越来越多时,我们必须用分布式的集群架构来获取更大的吞吐量。但是分布式架构会带来另一个问题:怎样保证数据处置惩罚的顺序是准确的呢?
对于批处置惩罚来说,这并不是一个问题。由于全部数据都已收集完毕,我们可以根据需要选择、分列数据,得到想要的结果。可假如我们采用“来一个处置惩罚一个”的流处置惩罚,就可能出现“乱序”的现象:本来先发生的事件,由于分布处置惩罚的原因滞后了。怎么解决这个问题呢?
以 Storm 为代表的第一代分布式开源流处置惩罚器,重要专注于具有毫秒延迟的事件处置惩罚,特点就是一个字“快”;而对于准确性和结果的一致性,是不提供内置支持的,由于结果有可能取决于到达事件的时间和顺序。另外,第一代流处置惩罚器通过检查点来保证容错性,但是故障恢复的时候,纵然事件不会丢失,也有可能被重复处置惩罚——所以无法保证 exactly-once。与批处置惩罚器相比,可以说第一代流处置惩罚器捐躯了结果的准确性,用来换取更低的延迟。而批处置惩罚器恰恰反过来,捐躯了实时性,换取了结果的准确。假如可以让二者做个联合,不就可以同时提供快速和准确的结果了吗?正是基于这样的头脑,Lambda 架构被计划出来,如图 1-10 所示。我们可以以为这是第二代流处置惩罚架构,但事实上,它只是第一代流处置惩罚器和批处置惩罚器的简单合并。
Lambda 架构主体是传统批处置惩罚架构的加强。它的“批处置惩罚层”(Batch Layer)就是由传统的批处置惩罚器和存储组成,而“实时层”(Speed Layer)则由低延迟的流处置惩罚器实现。数据到达之后,两层处置惩罚双管齐下,一方面由流处置惩罚器进行实时处置惩罚,另一方面写入批处置惩罚存储空间, 等待批处置惩罚器批量盘算。流处置惩罚器快速盘算出一个近似结果,并将它们写入“流处置惩罚表”中。而批处置惩罚器会定期处置惩罚存储中的数据,将准确的结果写入批处置惩罚表,并从快速从表中删除禁绝确的结果。最终,应用程序会集并快速表和批处置惩罚表中的结果,并展示出来。
Lambda 架构现在已经不再是最先进的,但仍在许多地方使用。它的长处非常显着,就是兼具了批处置惩罚器和第一代流处置惩罚器的特点,同时保证了低延迟和结果的准确性。而它的缺点同样非常显着。起首,Lambda 架构本身就很难建立和维护;而且,它需要我们对一个应用程序,做出两套语义上等效的逻辑实现,由于批处置惩罚和流处置惩罚是两套完全独立的体系,它们的 API也完全不同。为了实现一个应用,付出了双倍的工作量,这对程序员显然不敷友爱。
1.3.4 新一代流处置惩罚器 -- Flink架构
之前的分布式流处置惩罚架构,都有显着的缺陷,人们也不绝没有放弃对流处置惩罚器的改进和完善。终于,在原有流处置惩罚器的基础上,新一代分布式开源流处置惩罚器诞生了。为了与之前的体系区分,我们一般称之为第三代流处置惩罚器,代表当然就是 Flink。
第三代流处置惩罚器通过奇妙的计划,完善解决了乱序数据对结果准确性的影响。这一代体系还做到了准确一次(exactly-once)的一致性保障,是第一个具有一致性和准确结果的开源流处置惩罚器。另外,先前的流处置惩罚器仅能在高吞吐和低延迟中二选一,而新一代体系能够同时提供这两个特性。所以可以说,这一代流处置惩罚器仅凭一套体系就完成了 Lambda 架构两套体系的工作,它的出现使得 Lambda 架构黯然失色。除了低延迟、容错和结果准确性之外,新一代流处置惩罚器还在不断添加新的功能,例如高可用的设置,以及与资源管理器(如 YARN 或 Kubernetes)的精密集成等等。在下一节,我们会将 Flink 的特性做一个总结,从中可以了解到新一代流处置惩罚器的强盛。
1.4 Flink的特性总结
Flink是第三代分布式流处置惩罚器,它的功能丰富而强盛。
1.4.1 Flink的核心特性
Flink区别于传统数据处置惩罚框架的特点如下:
- 高吞吐,低延迟。每秒处置惩罚数百万个事件,毫秒级延迟。
- 结果的准确性。Flink提供了事件事件(Event Time)和处置惩罚事件(Processing Time)语义。对于乱序事件流,事件事件语义仍然能提供一致且准确的结果。
- 准确一次(Exactly Once)的状态一致性保证。
- 可以毗连到最常用的存储体系,如Apache Kafka、Apache Cassandra、Elasticsearch、JDBC、(分布式)文件体系,如HDFS和S3.
- 高可用。本身支持高可用设置,加上K8s,Yarn和Mesos的精密集成,再加上从故障中快速恢复和动态扩展任务的本事,Flink能做到以少少的停机时间7 x 24 全天运行。
- 能够更新应用程序代码并将作业(jobs)迁移到不同的Flink集群,而不会丢失应用程序的状态。
1.4.2 Flink的分层API
除了上述这些特性之外,Flink 照旧一个非常易于开辟的框架,由于它拥有易于使用的分层 API,整体 API 分层如图 1-11 所示。
越顶层越抽象,表达含义越简明,使用越方便;
越底层越具体,表达本事越丰富,使用越机动;
SQL:最高层语言
Table API:声明式范畴专用语言
DataStream/DataSet API:核心APIs
有状态流处置惩罚:底层APIs
最底层级的抽象仅仅提供了有状态流,它将处置惩罚函数(ProcessFunction)嵌入到了DataStream API中。底层处置惩罚函数(Process Function)与DataStream API相集成,可以对某些操作进行抽象,它答应用户使用自界说状态处置惩罚来自一个或多个数据流的事件,且状态具有一致性和容错性保证。除此之外,用户可以注册事件时间并处置惩罚时间回调,从而使程序可以处置惩罚复杂的盘算。现实上,大多数应用并不需要上述的底层抽象,而是直接针对核心 API(Core APIs) 进行编程,好比 DataStream API(用于处置惩罚有界或无界流数据)以及 DataSet API(用于处置惩罚有界数据集)。这些 API 为数据处置惩罚提供了通用的构建模块,好比由用户界说的多种情势的转换(transformations)、毗连(joins)、聚合(aggregations)、窗口(windows)操作等。DataSet API为有界数据集提供了额外的支持,例如循环与迭代。这些 API 处置惩罚的数据类型以类(classes)的情势由各自的编程语言所表示。
Table API 以表为中央的声明式编程,此中表在表达流数据时会动态变化。Table API遵循关系模型:表有二维数据布局(schema)(雷同于关系数据库中的表),同时API提供可比较的操作,如select、join、grup-by,aggregate等。
尽管Table API可以通过多种类型的用户自界说函数(UDF)进行扩展,仍不如核心API更具表达本事,但是使用起来代码量更少,更简便。除此之外,Table API程序在执行之前会使用内置优化器进行优化。我们可以在表与DataStream/DateSet 之间无缝切换,可以答应程序将Table API与DataStream混合使用。
SQL 是Flink提供的最高层级的抽象,这一层在语法与表达本事上与Table API雷同,但是是以SQL查询表达式的情势表现程序。SQL抽象层与Table API交互密切,同时SQL查询可以直接在Table API界说的表上执行。目前Flink SQL和Table API还在完善过程中,大公司都会二次开辟符合本身需要的工具包。而 DataSet 作为批处置惩罚 API 现实应用较少,2020 年 12 月 8 日发布的新版本 1.12.0, 已经完全实现了真正的流批一体,DataSet API 已处于软性弃用(soft deprecated)的状态。用 Data Stream API 写好的一套代码, 即可以处置惩罚流数据, 也可以处置惩罚批数据,只需要设置不同的执行模式。这与之前版本处置惩罚有界流的方式是不一样的,Flink 已专门对批处置惩罚数据做了优化处置惩罚。
1.5 flink vs spark
Spark是一个通用大规模数据分析引擎,它提出的内存盘算概念让各人耳目一新,得以从 Hadoop 繁重的 MapReduce 程序中解脱出来,可以说是划时代的大数据处置惩罚框架。除了盘算速度快、可扩展性强,Spark 还为批处置惩罚(SparkSQL)、流处置惩罚(Spark Streaming)、机器学习(Spark MLlib)、图盘算(Spark GraphX)提供了统一的分布式数据处置惩罚平台,整个生态经过多年的蓬勃发展已经非常完善。然而正在各人以为 Spark 已经方兴未艾、即将一统天下之际,Flink 如一颗新星异军突起,使得大数据处置惩罚的江湖再起风云。许多读者在最初接触都会有这样的疑问:想学习一个大数据处置惩罚框架,到底选择 Spark,照旧 Flink 呢?这就需要我们了解两者的重要区别,理解它们在不同范畴的上风。
1.5.1 数据处置惩罚架构
我们已经知道,数据处置惩罚的基本方式,可以分为批处置惩罚和流处置惩罚两种。批处置惩罚针对的是有界数据集,非常适合需要访问海量的全部数据才气完成的盘算工作,一般用于离线统计。流处置惩罚重要针对的是数据流,特点是无界、实时, 对体系传输的每个数据依次执行操作,一般用于实时统计。
从根本上说,Spark 和 Flink 采用了完全不同的数据处置惩罚方式。可以说,两者的天下观是截然相反的。Spark 以批处置惩罚为根本,并尝试在批处置惩罚之上支持流盘算;在 Spark 的天下观中,万物皆批次,离线数据是一个大批次,而实时数据则是由一个一个无限的小批次组成的。所以对于流处置惩罚框架 Spark Streaming 而言,其实并不是真正意义上的“流”处置惩罚,而是“微批次”(micro-batching)处置惩罚,如图 1-12 所示。。
而 Flink 则以为,流处置惩罚才是最基本的操作,批处置惩罚也可以统一为流处置惩罚。在 Flink 的天下观中,万物皆流,实时数据是标准的、没有界限的流,而离线数据则是有界限的流。如图1-13 所示,就是所谓的无界流和有界流。
1. 无界数据流(Unbounded Data Stream)
所谓无界数据流,就是有头没尾,数据的生成和传递会开始但永远不会结束,如图 1-13所示。我们无法等待全部数据都到达,由于输入是无界的,永无止境,数据没有“都到达”的时候。所以对于无界数据流,必须连续处置惩罚,也就是说必须在获取数据后立刻处置惩罚。在处置惩罚无界流时,为了保证结果的准确性,我们必须能够做到按照顺序处置惩罚数据。
2. 有界数据流(Bounded Data Stream)
对应的,有界数据流有明确界说的开始和结束,如图 1-13 所示,所以我们可以通过获取全部数据来处置惩罚有界流。处置惩罚有界流就不需要严格保证数据的顺序了,由于总可以对有界数据集进行排序。有界流的处置惩罚也就是批处置惩罚。
正由于这种架构上的不同,Spark 和 Flink 在不同的应用范畴上表现会有差别。一般来说,Spark 基于微批处置惩罚的方式做同步总有一个“攒批”的过程,所以会有额外开销,因此无法在流处置惩罚的低延迟上做到极致。在低延迟流处置惩罚场景,Flink 已经有显着的上风。而在海量数据的批处置惩罚范畴,Spark 能够处置惩罚的吞吐量更大,加上其完善的生态和成熟易用的 API,目前同样上风比较显着。
1.5.2 数据模型和运行架构
除了三观不合,Spark 和 Flink 在底层实现最重要的差别就在于数据模型不同。
Spark 底层数据模型是弹性分布式数据集(RDD),Spark Streaming 进行微批处置惩罚的底层接口 DStream,现实上处置惩罚的也是一组组小批数据 RDD 的聚集。可以看出,Spark 在计划上本身就是以批量的数据集作为基准的,更加适合批处置惩罚的场景。而 Flink 的基本数据模型是数据流(DataFlow),以及事件(Event)序列。Flink 基本上是完全按照 Google 的 DataFlow 模型实现的,所以从底层数据模型上看,Flink 是以处置惩罚流式数据作为计划目标的,更加适合流处置惩罚的场景。数据模型不同,对应在运行处置惩罚的流程上,自然也会有不同的架构。Spark 做批盘算,需要将任务对应的 DAG 划分阶段(Stage),一个完成后经过 shuffle 再进行下一阶段的盘算。而Flink 是标准的流式执行模式,一个事件在一个节点处置惩罚完后可以直接发往下一个节点进行处置惩罚。
1.5.3 Spark 照旧 Flink?
通过前文的分析,我们已经可以看出,Spark 和 Flink 可以说目前是各自有擅长的场景,批处置惩罚范畴 Spark 称王,而在流处置惩罚方面 Flink 当仁不让。具体到项目应用中,不仅要看是流处置惩罚照旧批处置惩罚,还需要在延迟、吞吐量、可靠性,以及开辟轻易度等多个方面进行权衡。假如在工作中需要从 Spark 和 Flink 这两个主流框架中选择一个来进行实时流处置惩罚,我们更加推荐使用 Flink,重要的原因有:
- (1)Flink 的延迟是毫秒级别,而 Spark Streaming 的延迟是秒级延迟。
- (2)Flink 提供了严格的准确一次性语义保证。
- (3)Flink 的窗口 API 更加机动、语义更丰富。
- (4)Flink 提供事件时间语义,可以准确处置惩罚延迟数据。
- (5)Flink 提供了更加机动的对状态编程的 API。
基于以上特点,使用 Flink 可以解放程序员, 加快编程效率, 把本来需要程序员花大力大举气手动完成的工作交给框架完成。当然,在海量数据的批处置惩罚方面,Spark 照旧具有显着的上风。而且 Spark 的生态更加成熟。也会使其在应用中更为方便。信赖随着 Flink 的快速发展和完善,这方面的差距会越来越小。另外,Spark 2.0 之后新增的 Structured Streaming 流处置惩罚引擎借鉴 DataFlow 进行了大量优化,同样做到了低延迟、时间准确性以及准确一次性语义保证;Spark 2.3 以后引入的连续处置惩罚(Continuous Processing)模式,更是可以在至少一次语义保证下做到 1 毫秒的延迟。而 Flink自 1.9 版本合并 Blink 以来,在 SQL 的表达和批处置惩罚的本事上同样有了长足的进步。那假如现在要学习一门框架的话,优先选 Spark 照旧 Flink 呢?其实我们可以看到,不同的框架各有利弊,同时它们也在相互借鉴、取长补短、不断发展,至于未来是 Spark 照旧 Flink、甚至是其他新崛起的处置惩罚引擎一统江湖,都是有可能的。作为技术人员,我们应该对不同的架构和头脑都有所了解,跳出某个框架的限制,才气看到更广阔的天下。
1.6 小结
本章节重要先容了 Flink 的源起和应用,引出了流处置惩罚相关的一些重要概念,并通过先容数据处置惩罚架构发展演变的过程,为读者展示了 Flink 作为新一代分布式流处置惩罚器的架构头脑。最后我们还将 Flink 与时下同样火热的处置惩罚引擎 Spark 进行了对比,详细论述了 Flink 在流处置惩罚方面的上风。
2. flink快速入门
2.1 情况准备
(1)Windows体系作为开辟情况
(2)安装Java8 、Scala2.12
(3)安装IDEA
(4)安装maven和git
2.2 创建项目
1. 创建工程
2. 添加项目依赖
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <flink.version>1.13.0</flink.version>
- <java.version>1.8</java.version>
- <scala.binary.version>2.12</scala.binary.version>
- <slf4j.version>1.7.30</slf4j.version>
- </properties>
- <dependencies>
- <!-- 引入 Flink 相关依赖-->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-java</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <!-- 引入日志管理相关依赖-->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-to-slf4j</artifactId>
- <version>2.14.0</version>
- </dependency>
- </dependencies>
复制代码 添加flink-clients可以做一些提交、管理的需求。假如只是开辟一个应用 clients可以不用引入。在属性中,界说了<scala.binary.version>,这指代的是所依赖的 Scala 版本。Flink 底层是 Java,为什么还会依赖 Scala 呢?这是由于 Flink的架构中使用了 Akka 来实现底层的分布式通信,而 Akka 是用 Scala 开辟的。
3. 设置日志管理
resources目录中添加: log4j.properties
- log4j.rootLogger=error, stdout
- log4j.appender.stdout=org.apache.log4j.ConsoleAppender
- log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
- log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
复制代码 2.3 编写代码
2.3.1 批处置惩罚
(1)在工程根目录下新建一个 input 文件夹,并在下面创建文本文件 words.txt
(2)在 words.txt 中输入一些文字,例如:
- hello world
- hello flink
- hello java
复制代码 (3)新建 Java 类 BatchWordCount,在静态 main 方法中编写测试代码。
我们进行单词频次统计的基本思路是:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。
- import org.apache.flink.api.common.typeinfo.Types;
- import org.apache.flink.api.java.ExecutionEnvironment;
- import org.apache.flink.api.java.operators.AggregateOperator;
- import org.apache.flink.api.java.operators.DataSource;
- import org.apache.flink.api.java.operators.FlatMapOperator;
- import org.apache.flink.api.java.operators.UnsortedGrouping;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.util.Collector;
- /**
- * BatchWordCount: flink 批处理方式实现WordCount
- *
- * @author: Seven
- * @version: 2024/07/01 21:42
- **/
- public class BatchWordCount {
- public static void main(String[] args) throws Exception {
- // 创建执行环境
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- // 从文件中读取数据
- DataSource<String> lineDS = env.readTextFile("./input/words.txt");
- // 转换数据结构,当使用Lambda表达式时,由于泛型擦除的存在,需要显示声明类型信息
- FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
- String[] words = line.split(" ");
- for (String word : words) {
- out.collect(Tuple2.of(word, 1L));
- }
- }).returns(Types.TUPLE(Types.STRING, Types.LONG));
- // 按照Word进行分组
- UnsortedGrouping<Tuple2<String, Long>> wordAndOneGp = wordAndOne.groupBy(0);
- // 聚合
- AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGp.sum(1);
- // 打印结果
- sum.print();
- }
- }
复制代码 代码阐明和注意事项:
① Flink 在执行应用程序前应该获取执行情况对象,也就是运行时上下文情况。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
② Flink 同时提供了 Java 和 Scala 两种语言的 API,有些类在两套 API 中名称是一样的。所以在引入包时,假如有 Java 和 Scala 两种选择,要注意选用 Java 的包。
③ 直接调用执行情况的 readTextFile 方法,可以从文件中读取数据。
④我们的目标是将每个单词对应的个数统计出来,所以调用 flatmap 方法可以对一行文字进行分词转换。将文件中每一行文字拆分成单词后,要转换成(word,count)情势的二元组,初始 count 都为 1。returns 方法指定的返回数据类型 Tuple2,注意是 Flink 自带的二元组数据类型。
⑤ 在分组时调用了 groupBy 方法,它不能使用分组选择器,只能采用位置索引或属性名称进行分组。
- // 使用索引定位
- dataStream.groupBy(0)
- // 使用类属性名称
- dataStream.groupBy("id")
复制代码 ⑤ 在分组之后调用 sum 方法进行聚合,同样只能指定聚合字段的位置索引或属性名称。
(4) 运行程序,控制台会打印出结果:
可以看到,我们将文档中的全部单词的频次,全部统计出来,以二元组的情势在控制台打印输出了。需要注意的是,这种代码的实现方式,是基于 DataSet API 的,也就是我们对数据的处置惩罚转换,是看作数据集来进行操作的。事实上 Flink 本身是流批统一的处置惩罚架构,批量的数据集本质上也是流,没有必要用两套不同的 API 来实现。所以从 Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为 BATCH 来进行批处置惩罚:
$ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
这样,DataSet API 就已经处于“软弃用”(soft deprecated)的状态,在现实应用中我们只要维护一套 DataStream API 就可以了。这里只是为了方便各人理解,我们依然用 DataSet API做了批处置惩罚的实现。
2.3.2 流处置惩罚
用 DataSet API 可以很轻易地实现批处置惩罚;与之对应,流处置惩罚当然可以用DataStream API 来实现。对于 Flink 而言,流才是整个处置惩罚逻辑的底层核心,所以流批统一之后的 DataStream API 更加强盛,可以直接处置惩罚批处置惩罚和流处置惩罚的全部场景
1. 读取文件(有界流)
(1) 新建 Java 类 BoundedStreamWordCount,在静态 main 方法中编写测试代码。具体代码实现如下:
- package com.hwcloud.flink.wordcount;
- import org.apache.flink.api.common.typeinfo.Types;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.util.Collector;
- import java.util.Arrays;
- /**
- * @Project: flink-demos
- * @Package: com.hwcloud.flink.wordcount
- * @Class: BoundedWordCount
- * @Author: Seven
- * @CreateTime: 2024-07-01 22:52
- * @Description: 有界流的处理
- * @Version: 1.0
- */
- public class BoundedWordCount {
- public static void main(String[] args) throws Exception {
- //1.创建执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- //2.从文件读取数据
- DataStreamSource<String> lineDS = env.readTextFile("./input/words.txt");
- //3.对数据进行转换,第一个参数是输入数据,第二个参数是对转换过后做输入用的,泛型擦除,需要指明输出的数据类型
- SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne =
- lineDS.flatMap((String line, Collector<String> out) -> {
- Arrays.stream(line.split(" ")).forEach(out::collect);
- }).returns(Types.STRING)
- .map(word -> Tuple2.of(word, 1))
- .returns(Types.TUPLE(Types.STRING, Types.INT));
- //分组求和
- SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOne.keyBy(t -> t.f0).sum(1);
- //4.打印
- sum.print();
- //5.执行
- env.execute();
- }
- }
复制代码 重要观察与批处置惩罚程序 BatchWordCount 的不同:
-> 创建执行情况的不同,流处置惩罚程序使用的是 StreamExecutionEnvironment。
-> 每一步处置惩罚转换之后,得到的数据对象类型不同。
-> 分组操作调用的是 keyBy 方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的 key 是什么。
-> 代码末端需要调用 env 的 execute 方法,开始执行任务。
(2) 运行程序,控制台输出结果如下:

可以看到,这与批处置惩罚的结果是完全不同的。批处置惩罚针对每个单词,只会输出一个最终的统计个数;而在流处置惩罚的打印结果中,“hello”这个单词每出现一次,都会有一个频次统计数据输出。这就是流处置惩罚的特点,数据逐个处置惩罚,每来一条数据就会处置惩罚输出一次。我们通过打印结果,可以清晰地看到单词“hello”数量增长的过程。看到这里各人可能又会有新的疑惑:我们读取文件,第一行应该是“hello flink”,怎么这里输出的第一个单词是“world”呢?每个输出的结果二元组,前面都有一个数字,这又是什么呢?
我们可以先做个简单的表明。Flink 是一个分布式处置惩罚引擎,所以我们的程序应该也是分布式运行的。在开辟情况里,会通过多线程来模拟 Flink 集群运行。所以这里结果前的数字,其实就指示了本地执行的不同线程,对应着 Flink 运行时不同的并行资源。这样第一个乱序的问题也就解决了:既然是并行执行,不同线程的输出结果,自然也就无法保持输入的顺序了。另外需要阐明,这里表现的编号,是由于运行电脑的 CPU 是 44 核88线程,所以默认模拟的并行线程有 88个。这段代码不同的运行情况,得到的结果会是不同的。关于 Flink 程序并行执行的数量,可以通过设定“并行度”(Parallelism)来进行设置,我们会在后续章节详细讲解这些内容。
2. 读取文本流(无界流)
在现实的生产情况中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要保持一个监听事件的状态,持续地处置惩罚捕获的数据。为了模拟这种场景,我们就不再通过读取文件来获取数据了,而是监听数据发送端主机的指定端口,统计发送来的文本数据中出现过的单词的个数。具体实现上,我们只要对BoundedStreamWordCount 代码中读取数据的步骤稍做修改,就可以实现对真正无界流的处置惩罚。 下载netcat-win32-1.12.zip并解压后进入netcat-win32-1.12目录。并执行:nc -lp 7777 备用。

Windows 输入nc.exe -lp 7777 备用

(1)新建一个 Java 类 StreamWordCount,将 BoundedStreamWordCount 代码中读取文件数据的 readTextFile 方法,替换成读取 socket 文本流的方法 socketTextStream。具体代码实现如下:
- package com.hwcloud.flink.wordcount;
- import org.apache.flink.api.common.typeinfo.Types;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.util.Collector;
- import java.util.Arrays;
- /**
- * @Name: StreamWordCount
- * @Author: Seven
- * @CreateTime: 2024-07-01 22:59
- * @Description: 无界流处理
- * @Version: 1.0
- */
- public class StreamWordCount {
- public static void main(String[] args) throws Exception {
- //1.创建执行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- //2.从文件读取数据
- DataStreamSource<String> lineDS = env.socketTextStream("localhost", 7777);
- //3.对数据进行转换,第一个参数是输入数据,第二个参数是对转换过后做输入用的,泛型擦除,需要指明输出的数据类型
- SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne =
- lineDS.flatMap((String line, Collector<String> out) -> {
- Arrays.stream(line.split(" ")).forEach(out::collect);
- }).returns(Types.STRING)
- .map(word -> Tuple2.of(word, 1))
- .returns(Types.TUPLE(Types.STRING, Types.INT));
- //分组求和
- SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordAndOne.keyBy(t -> t.f0).sum(1);
- //4.打印
- sum.print();
- //5.执行
- env.execute();
- }
- }
复制代码 代码阐明和注意事项:
-> socket 文本流的读取需要设置两个参数:发送端主机名和端标语。这里代码中指定了主机“localhost”的 7777 端口作为发送数据的 socket 端口,读者可以根据测试情况自行设置。
-> 在现实项目应用中,主机名和端标语这类信息往往可以通过设置文件,或者传入程序运行参数的方式来指定。
-> socket文本流数据的发送,可以通过Linux体系自带的netcat工具进行模拟。
(2)在主机上,执行下列下令,发送数据进行测试: nc -l -p 7777
(3)启动 StreamWordCount 程序
我们会发现程序启动之后没有任何输出、也不会退出。这是正常的——由于 Flink 的流处置惩罚是事件驱动的,当前程序会不绝处于监听状态,只有吸收到数据才会执行任务、输出统计结果。
(4)从主机发送数据:

- 现象:
- Caused by: java.io.IOException: Insufficient number of network buffers: required 65, but only 33 available. The total number of network buffers is currently set to 2048 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
- at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:372)
- at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:350)
- at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:280)
- at org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:150)
- at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.setup(BufferWritingResultPartition.java:95)
- at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:943)
- at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:652)
- at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
- at java.lang.Thread.run(Thread.java:750)
- 解决方案一:
- 环境情况(44核88线程的CPU,256GB内存),您可以根据以下建议来设置网络缓冲区的相关参数。
- 这些参数可以通过修改 Apache Flink 的配置文件来实现。
- 一般情况下,为了充分利用集群资源,您需要根据实际情况进行调整。
- 1. **taskmanager.memory.network.fraction**:这个参数定义了 TaskManager 可以使用的 JVM 堆内存的百分比来分配给网络缓冲区。可以考虑设置为 0.1 或更高的值。
- 2. **taskmanager.memory.network.min**:此参数定义了 TaskManager 至少应该分配给网络缓冲区的内存量。建议将其设置为一个较小的固定值。
- 3. **taskmanager.memory.network.max**:这个参数规定了 TaskManager 可以用于网络缓冲区的最大内存量。根据您的环境,可以适当提高这个值以增加网络缓冲区的容量。
- 在您的情况下,您可以尝试设置这些参数为以下值:
- - taskmanager.memory.network.fraction = 0.1
- - taskmanager.memory.network.min = 1GB (即 1024MB)
- - taskmanager.memory.network.max = 4GB (即 4096MB)
- 请注意,这些值是作为起点的建议值,根据工作负载和实际性能表现,可能需要进一步调整。在进行设置时,请确保您对系统资源的使用和性能有适当的监控,以便根据实际情况进行调整。
- 解决方案二:
- env.setParallelism(4);
复制代码 2.4 本章总结
本章重要实现一个 Flink 开辟的入门程序——词频统计 WordCount。通过批处置惩罚和流处置惩罚两种不同模式的实现,可以对 Flink 的 API 风格和编程方式有所认识,而且更加深刻地理解批处置惩罚和流处置惩罚的不同。另外,通过读取有界数据(文件)和无界数据(socket 文本流)进行流处置惩罚的比较,我们也可以更加直观地了解到 Flink 流处置惩罚的方式和特点。
3. flink摆设
在上一章中,我们在集成开辟情况里编写 Flink 代码,然后运行测试。仔细的读者应该会发现:对于读取文本流的流处置惩罚程序,运行之后其实并不会去直接执行代码中界说好的操作 -- 由于这时还没有数据;只有在输入数据之后,才会触发分词转换、分组统计的一系列处置惩罚操作。可明明我们的代码顺序执行,会调用到 flatMap、keyBy 和 sum 等一系列处置惩罚方法,这是怎么回事呢?
这涉及 Flink 作业提交运行的原理。我们编写的代码,对应着在 Flink 集群上执行的一个作业;所以我们在本地执行代码,其实是先模拟启动一个 Flink 集群,然后将作业提交到集群上,创建好要执行的任务等待数据输入。这里需要提到 Flink 中的几个关键组件:客户端(Client)、作业管理器(JobManager)和任务管理器(TaskManager)。我们的代码,现实上是由客户端获取并做转换,之后提交给JobManger 的。所以 JobManager 就是 Flink 集群里的“管理者”,对作业进行中央调治管理;而它获取到要执行的作业后,会进一步处置惩罚转换,然后分发任务给众多的 TaskManager。这里的 TaskManager,就是真正“干活的人”,数据的处置惩罚操作都是它们来做的,如图 3-1 所示。
关于 Flink 中各组件的作用和作业提交及运行时的架构,我们会在下一章详细展开讲解。在现实项目应用中,我们当然不能使用开辟情况的模拟集群,而是需要将 Flink 摆设在生产集群情况中,然后在将作业提交到集群上运行。所以本章我们就来先容 Flink 的摆设及作业提交的流程。Flink 是一个非常机动的处置惩罚框架,它支持多种不同的摆设场景,还可以和不同的资源管理平台方便地集成。所以接下来我们会先做一个简单的先容,让各人有一个初步的认识,之后再展开讲述不同情形下的 Flink 摆设。
3.1 快速启动一个 Flink 集群
3.1.1 情况设置
Flink 是一个分布式的流处置惩罚框架,所以现实应用一般都需要搭建集群情况。我们在进行Flink 安装摆设的学习时,需要准备 3 台 Linux 机器。
3.1.2 本地启动
最简单的启动方式,其实是不搭建集群,直接本地启动。本地摆设非常简单,直接解压安装包就可以使用,不用进行任何设置;一般用来做一些简单的测试。具体安装步骤如下:
1. 下载安装包
进入 Flink 官网,下载 1.13.0 版本安装包 flink-1.13.0-bin-scala_2.12.tgz,注意此处选用对应 scala 版本为 scala 2.12 的安装包。
2. 解压
在 hadoop102 节点服务器上创建安装目录/opt/module,将 flink 安装包放在该目录下,并执行解压下令,解压至当前目录。
3. 启动
进入解压后的目录,执行启动下令,并查看进程。
- $ cd flink-1.13.0/
- $ bin/start-cluster.sh
- Starting cluster.
- Starting standalonesession daemon on host hadoop102.
- Starting taskexecutor daemon on host hadoop102.
- $ jps
- 10369 StandaloneSessionClusterEntrypoint
- 10680 TaskManagerRunner
- 10717 Jps
复制代码 4. 访问 Web UI
启动乐成后,访问 http://hadoop102:8081,可以对 flink 集群和任务进行监控管理,如图 3-2所示。
5. 关闭集群
假如想要让 Flink 集群制止运行,可以执行以下下令:
- $ bin/stop-cluster.sh
- Stopping taskexecutor daemon (pid: 10680) on host hadoop102.
- Stopping standalonesession daemon (pid: 10369) on host hadoop102
复制代码 3.1.3 集群启动
可以看到,Flink 本地启动非常简单,直接执行 start-cluster.sh 就可以了。假如我们想要扩展成集群,其实启动下令是不变的,重要是需要指定节点之间的主从关系。Flink 是典型的 Master-Slave 架构的分布式数据处置惩罚框架,此中 Master 脚色对应着JobManager,Slave 脚色则对应 TaskManager。我们对三台节点服务器的脚色分配如表 3-1 所示。
具体安装摆设步骤如下:
1. 下载并解压安装包:具体操作与上节相同。
2. 修改集群设置
(1)进入 conf 目录下,修改 flink-conf.yaml 文件,修改 jobmanager.rpc.address 参数为hadoop102,如下所示:
- $ cd conf/
- $ vim flink-conf.yaml
- # JobManager 节点地址.
- jobmanager.rpc.address: hadoop102
- jobmanager.bind-host: hadoop102
- taskmanager.bind-host: 0.0.0.0
- taskmanager.host: 0.0.0.0
- rest.address: 0.0.0.0
- rest.bind-address: 0.0.0.0
- 修改masters文件:hadoop102:8081
复制代码 (2)修改 workers 文件,将另外两台节点服务器添加为本 Flink 集群的 TaskManager 节点,具体修改如下:
- $ vim workers
- hadoop103
- hadoop104
复制代码 这样就指定了 hadoop103 和 hadoop104 为 TaskManager 节点。
(3)另外,在 flink-conf.yaml 文件中还可以对集群中的 JobManager 和 TaskManager 组件进行优化设置,重要设置项如下:
-> jobmanager.memory.process.size:对 JobManager 进程可使用到的全部内存进行设置,包括 JVM 元空间和其他开销,默以为 1600M,可以根据集群规模进行适当调整。
-> taskmanager.memory.process.size:对 TaskManager 进程可使用到的全部内存进行设置,包括 JVM 元空间和其他开销,默以为 1600M,可以根据集群规模进行适当调整。
-> taskmanager.numberOfTaskSlots:对每个 TaskManager 能够分配的 Slot 数量进行设置,默以为 1,可根据 TaskManager 地点的机器能够提供给 Flink 的 CPU 数量决定。所谓Slot 就是 TaskManager 中具体运行一个任务所分配的盘算资源。
-> parallelism.default:Flink 任务执行的默认并行度,优先级低于代码中进行的并行度设置和任务提交时使用参数指定的并行度数量。
关于 Slot 和并行度的概念,我们会在下一章做详细讲解。
3. 分发安装目录
设置修改完毕后,将 Flink 安装目录发给另外两个节点服务器。
- $ scp -r ./flink-1.13.0 atguigu@hadoop103:/opt/module
- $ scp -r ./flink-1.13.0 atguigu@hadoop104:/opt/module
复制代码 4. 启动集群
(1)在 hadoop102 节点服务器上执行 start-cluster.sh 启动 Flink 集群:
- $ bin/start-cluster.sh
- Starting cluster.
- Starting standalonesession daemon on host hadoop102.
- Starting taskexecutor daemon on host hadoop103.
- Starting taskexecutor daemon on host hadoop104.
复制代码 (2)查看进程情况:
- [hadoop102 flink-1.13.0]$ jps
- 13859 Jps
- 13782 StandaloneSessionClusterEntrypoint
- [hadoop103 flink-1.13.0]$ jps
- 12215 Jps
- 12124 TaskManagerRunner
- [hadoop104 flink-1.13.0]$ jps
- 11602 TaskManagerRunner
- 11694 Jps
复制代码 5. 访问 Web UI
启动乐成后,同样可以访问 http://hadoop102:8081 对 flink 集群和任务进行监控管理,如图3-3 所示。
这里可以显着看到,当前集群的 TaskManager 数量为 2;由于默认每个 TaskManager 的 Slot数量为 1,所以总 Slot 数和可用 Slot 数都为 2。
3.1.4 向集群提交作业
在上一章中,我们已经编写了词频统计的批处置惩罚和流处置惩罚的示例程序,并在开辟情况的模拟集群上做了运行测试。现在既然已经有了真正的集群情况,那接下来我们就要把作业提交上去执行了。本节我们将以流处置惩罚的程序为例,演示怎样将任务提交到集群中进行执行。具体步骤如下。
1. 程序打包
(1)为方便自界说布局和定制依赖,我们可以引入插件 maven-assembly-plugin 进行打包。在 FlinkTutorial 项目的 pom.xml 文件中添加打包插件的设置,具体如下:
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>3.6.0</version>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
复制代码 ( 2 )插件设置完毕后,可以使用 IDEA 的 Maven 工具执行 package 下令,出现打包乐成的提示即可。打 包 完 成 后 , 在 target 目 录 下 即 可 找 到 所 需 JAR 包 , JAR 包 会 有 两 个 ,FlinkTutorial-1.0-SNAPSHOT.jar 和 FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar,由于集群中已经具备任务运行所需的全部依赖,所以建议使用 FlinkTutorial-1.0-SNAPSHOT.jar。 2. 在 Web UI 上提交作业 ( 1 )任务打包完成后,我们打开 Flink 的 WEB UI 页面,在右侧导航栏点击“ Submit New Job”,然后点击按钮“ + Add New ”,选择要上传运行的 JAR 包,如图 3-4 所示。 上传完成后,如图 3-5 所示: ( 2 )点击该 JAR 包,出现任务设置页面,进行相应设置。 重要设置程序入口主类的全类名,任务运行的并行度,任务运行所需的设置参数和保存点 路径等,如图 3-6 所示,设置完成后,即可点击按钮“Submit”,将任务提交到集群运行。 ( 3 )任务提交乐成之后,可点击左侧导航栏的“ Running Jobs ”查看程序运行列表情况, 如图 3-7 所示。 ( 4 )点击该任务,可以查看任务运行的具体情况,也可以通过点击“ Cancel Job ”结束任务运行,如图 3-8 所示。 Flink 的 WEB UI 页面计划非常简便明白,读者可以自行尝试别的操作。 3. 下令行提交作业 除了通过 WEB UI 界面提交任务之外,也可以直接通过下令行来提交任务。这里为方便起见,我们可以先把 jar 包直接上传到目录 flink-1.13.0 下 ( 1 )起首需要启动集群。 $ bin/start-cluster.sh ( 2 )在 hadoop102 中执行以下下令启动 netcat 。 $ nc -lk 7777 ( 3 )进入到 Flink 的安装路径下,在下令利用用 flink run 下令提交作业。 - $ bin/flink run -m hadoop102:8081 -c com.wc.StreamWordCount ./FlinkTutorial-1.0-SNAPSHOT.jar
复制代码 这里的参数 –m 指定了提交到的 JobManager,-c 指定了入口类。
( 4 )在欣赏器中打开 Web UI , http://hadoop102:8081 查看应用执行情况,如图 3-9 所示。 用 netcat 输入数据,可以在 TaskManager 的标准输出( Stdout )看到对应的统计结果。 ( 5 )在 log 日志中,也可以查看执行结果,需要找到执行该数据任务的 TaskManager 节点查看日志。 3.2 摆设模式
在一些应用场景中,对于集群资源分配和占用的方式,可能会有特定的需求。Flink 为各种场景提供了不同的摆设模式,重要有以下三种: ⚫ 会话模式( Session Mode ) ⚫ 单作业模式( Per-Job Mode ) ⚫ 应用模式( Application Mode ) 它们的区别重要在于:集群的生命周期以及资源的分配方式;以及应用的 main 方法到底在哪里执行——客户端(Client )照旧 JobManager 。接下来我们就做一个展开阐明。 3.2.1 会话模式(Session Mode)
会话模式其实最符合常规头脑。我们需要先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业,如图 3-10 所示。集群启动时全部资源就都已经确定,所以全部提交的作业会竞争集群中的资源。 这样的长处很显着,我们只需要一个集群,就像一个大箱子,全部的作业提交之后都塞进 去;集群的生命周期是超越于作业之上的,铁打的营盘流水的兵,作业结束了就释放资源,集群依然正常运行。当然缺点也是显而易见的:由于资源是共享的,所以资源不敷了,提交新的作业就会失败。另外,同一个 TaskManager 上可能运行了许多作业,假如此中一个发生故障导致 TaskManager 宕机,那么全部作业都会受到影响。 我们在 3.1 节中先启动集群再提交作业,这种方式其实就是会话模式。 会话模式比较适合于 单个规模小、执行时间短的大量作业 。 3.2.2 单作业模式(Per-Job Mode)
会话模式由于资源共享会导致许多问题,所以为了更好地隔离资源,我们可以思量为每个提交的作业启动一个集群,这就是所谓的单作业( Per-Job )模式,如图 3-11 所示。 单作业模式也很好理解,就是严格的一对一,集群只为这个作业而生。同样由客户端运行应用程序,然后启动集群,作业被提交给 JobManager ,进而分发给 TaskManager 执行。作业作业完成后,集群就会关闭,全部资源也会释放。这样一来,每个作业都有它本身的 JobManager管理,占用独享的资源,纵然发生故障,它的 TaskManager 宕机也不会影响其他作业。这些特性使得单作业模式在生产情况运行更加稳定,所以是现实应用的首选模式。需要注意的是, Flink 本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,好比 YARN 、 Kubernetes 。 3.2.3 应用模式(Application Mode)
前面提到的两种模式下,应用代码都是在客户端上执行,然后由客户端提交给 JobManager的。但是这种方式客户端需要占用大量网络带宽,去下载依赖和把二进制数据发送给JobManager ;加上许多情况下我们提交作业用的是同一个客户端,就会加重客户端地点节点的 资源消耗。 所以解决办法就是,我们不要客户端了,直接把应用提交到 JobManger 上运行。而这也就代表着,我们需要为每一个提交的应用单独启动一个 JobManager,也就是创建一个集群。这个 JobManager 只为执行这一个应用而存在,执行结束之后 JobManager 也就关闭了,这就是所 谓的应用模式,如图 3-12 所示。 应用模式与单作业模式,都是提交作业之后才创建集群;单作业模式是通过客户端来提交 的,客户端解析出的每一个作业对应一个集群;而应用模式下,是直接由 JobManager 执行应 用程序的,而且纵然应用包含了多个作业,也只创建一个集群。 总结一下,在会话模式下,集群的生命周期独立于集群上运行的任何作业的生命周期,并 且提交的全部作业共享资源。而单作业模式为每个提交的作业创建一个集群,带来了更好的资 源隔离,这时集群的生命周期与作业的生命周期绑定。最后,应用模式为每个应用程序创建一 个会话集群,在 JobManager 上直接调用应用程序的 main() 方法。 我们所讲到的摆设模式,相对是比较抽象的概念。现实应用时,一般需要和资源管理平台 联合起来,选择特定的模式来分配资源、摆设应用。接下来,我们就针对不同的资源提供者 ( Resource Provider )的场景,具体先容 Flink 的摆设方式。 3.3 独立模式(Standalone)
独立模式(Standalone )是摆设 Flink 最基本也是最简单的方式:所需要的全部 Flink 组件, 都只是操作体系上运行的一个 JVM 进程。 独立模式是独立运行的,不依赖任何外部的资源管理平台;当然独立也是有代价的:假如 资源不敷,或者出现故障,没有自动扩展或重分配资源的保证,必须手动处置惩罚。所以独立模式 一般只用在开辟测试或作业非常少的场景下。 另外,我们也可以将独立模式的集群放在容器中运行。Flink 提供了独立模式的容器化部 署方式,可以在 Docker 或者 Kubernetes 上进行摆设。 3.3.1 会话模式摆设
可以发现,独立模式的特点是不依赖外部资源管理平台,而会话模式的特点是先启动集群、 后提交作业。所以,我们在第 3.1 节用的就是独立模式( Standalone )的会话模式摆设。 3.3.2 单作业模式摆设
在 3.2.2 节中我们提到, Flink 本身无法直接以单作业方式启动集群,一般需要借助一些资 源管理平台。所以 Flink 的独立(Standalone)集群并不支持单作业模式摆设 。 3.3.3 应用模式摆设
应用模式下不会提前创建集群,所以不能调用 start-cluster.sh 脚本。我们可以使用同样在 bin 目录下的 standalone-job.sh 来创建一个 JobManager 。 具体步骤如下: ( 1 )进入到 Flink 的安装路径下,将应用程序的 jar 包放到 lib/ 目录下。 $ cp ./FlinkTutorial-1.0-SNAPSHOT.jar lib/ ( 2 )执行以下下令,启动 JobManager 。 $ ./bin/standalone-job.sh start --job-classname com.wc.StreamWordCount 这里我们直接指定作业入口类,脚本会到 lib 目录扫描全部的 jar 包。 ( 3 )同样是使用 bin 目录下的脚本,启动 TaskManager 。 $ ./bin/taskmanager.sh start ( 4 )假如盼望停掉集群,同样可以使用脚本,下令如下。 $ ./bin/standalone-job.sh stop $ ./bin/taskmanager.sh stop 3.3.4 高可用(High Availability )
分布式除了提供高吞吐,另一大长处就是有更好的容错性。对于 Flink 而言,由于一般会 有多个 TaskManager ,纵然运行时出现故障,也不需要将全部节点重启,只要尝试重启故障节 点就可以了。但是我们发现,针对一个作业而言,管理它的 JobManager 却只有一个,这同样 有可能出现单点故障。为了实现更好的可用性,我们需要 JobManager 做一些主备冗余,这就 是所谓的高可用( High Availability ,简称 HA )。 我们可以通过设置,让集群在任何时候都有一个主 JobManager 和多个备用 JobManagers , 如图 3-13 所示,这样主节点故障时就由备用节点来担当集群,担当后作业就可以继承正常运 行。主备 JobManager 实例之间没有显着的区别,每个 JobManager 都可以充当主节点或者备 节点。 具体设置如下: ( 1 )进入 Flink 的安装路径下的 conf 目录下,修改设置文件 : flink-conf.yaml,增加如下设置。 - high-availability: zookeeper
- high-availability.storageDir: hdfs://hadoop102:9820/flink/standalone/ha
- high-availability.zookeeper.quorum:
- hadoop102:2181,hadoop103:2181,hadoop104:2181
- high-availability.zookeeper.path.root: /flink-standalone
- high-availability.cluster-id: /cluster_test
复制代码 ( 2 )修改设置文件 : masters ,设置备用 JobManager 列表。 - hadoop102:8081
- hadoop103:8081
复制代码 ( 3 )分发修改后的设置文件到其他节点服务器。 ( 4 )在 /etc/profile.d/my_env.sh 中设置情况变量 - export HADOOP_CLASSPATH=`hadoop classpath`
复制代码 具体摆设方法如下: ( 1 )起首启动 HDFS 集群和 Zookeeper 集群。 ( 2 )执行以下下令,启动 standalone HA 集群。 $ bin/start-cluster.sh ( 3 )可以分别访问两个备用 JobManager 的 Web UI 页面。 http://hadoop102:8081 http://hadoop103:8081 ( 4 )在 zkCli.sh 中查看谁是 leader 。 - [zk: localhost:2181(CONNECTED) 1] get /flink-standalone/cluster_test/leader/rest_server_lock
复制代码 杀死 hadoop102 上的 Jobmanager, 再看 leader。
注意: 不管是不是 leader,从 WEB UI 上是看不到区别的, 都可以提交应用。
3.4 YARN 模式
独立(Standalone )模式由 Flink 自身提供资源,无需其他框架,这种方式低落了和其他 第三方资源框架的耦合性,独立性非常强。但我们知道, Flink 是大数据盘算框架,不是资源 调治框架,这并不是它的刚强;所以照旧应该让专业的框架做专业的事,和其他资源调治框架 集成更靠谱。而在目前大数据生态中,国内应用最为广泛的资源管理平台就是 YARN 了。所 以接下来我们就将学习,在强盛的 YARN 平台上 Flink 是怎样集成摆设的。 整体来说,YARN 上摆设的过程是:客户端把 Flink 应用提交给 Yarn 的 ResourceManager, Yarn 的 ResourceManager 会向 Yarn 的 NodeManager 申请容器。在这些容器上, Flink 会摆设 JobManager 和 TaskManager 的实例,从而启动集群。 Flink 会根据运行在 JobManger 上的作业 所需要的 Slot 数量动态分配 TaskManager 资源。 3.4.1 相关准备和设置
在 Flink1.8.0 之前的版本,想要以 YARN 模式摆设 Flink 任务时,需要 Flink 是有 Hadoop 支持的。从 Flink 1.8 版本开始,不再提供基于 Hadoop 编译的安装包,若需要 Hadoop 的情况 支持,需要自行在官网下载 Hadoop 相关版本的组件 flink-shaded-hadoop-2-uber-2.7.5-10.0.jar , 并将该组件上传至 Flink 的 lib 目录下。在 Flink 1.11.0 版本之后,增加了许多重要新特性,其 中就包括增加了对 Hadoop3.0.0 以及更高版本 Hadoop 的支持,不再提供“ flink-shaded-hadoop-* ” jar 包,而是通过设置情况变量完成与 YARN 集群的对接。 在将 Flink 任务摆设至 YARN 集群之前,需要确认集群是否安装有 Hadoop ,保证 Hadoop 版本至少在 2.2 以上,而且集群中安装有 HDFS 服务。 具体设置步骤如下: ( 1 )按照 3.1 节所述,下载并解压安装包,并将解压后的安装包重定名为 flink-1.13.0-yarn , 本节的相关操作都将默认在此安装路径下执行。 ( 2 )设置情况变量,增加情况变量设置如下: - $ sudo vim /etc/profile.d/my_env.sh
- HADOOP_HOME=/opt/module/hadoop-2.7.5
- export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
- export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
- export HADOOP_CLASSPATH=`hadoop classpath`
复制代码 这里必须保证设置了情况变量 HADOOP_CLASSPATH 。 ( 3 )启动 Hadoop 集群,包括 HDFS 和 YARN 。 ( 4 )进入 conf 目录,修改 flink-conf.yaml 文件,修改以下设置,这些设置项的含义在进行 Standalone 模式设置的时候进行过讲解,若在提交下令中不特定指明,这些设置将作为默认 设置。 - $ cd /opt/module/flink-1.13.0-yarn/conf/
- $ vim flink-conf.yaml
- jobmanager.memory.process.size: 1600m
- taskmanager.memory.process.size: 1728m
- taskmanager.numberOfTaskSlots: 8
- parallelism.default: 1
复制代码 3.4.2 会话模式摆设
YARN 的会话模式与独立集群略有不同,需要起首申请一个 YARN 会话( YARN session ) 来启动 Flink 集群。具体步骤如下: 1. 启动集群 ( 1 )启动 hadoop 集群 (HDFS, YARN) 。 ( 2 )执行脚本下令向 YARN 集群申请资源,开启一个 YARN 会话,启动 Flink 集群。 $ bin/yarn-session.sh -nm test 可用参数解读: ⚫ -d :分离模式,假如你不想让 Flink YARN 客户端不绝前台运行,可以使用这个参数, 纵然关掉当前对话窗口, YARN session 也可以背景运行。 ⚫ -jm(--jobManagerMemory) :设置 JobManager 所需内存,默认单位 MB 。 ⚫ -nm(--name) :设置在 YARN UI 界面上表现的任务名。 ⚫ -qu(--queue) :指定 YARN 队列名。 ⚫ -tm(--taskManager) :设置每个 TaskManager 所使用内存。 注意: Flink1.11.0 版本不再使用 -n 参数和 -s 参数分别指定 TaskManager 数量和 slot 数量, YARN 会按照需求动态分配 TaskManager 和 slot 。所以从这个意义上讲, YARN 的会话模式也 不会把集群资源固定,同样是动态分配的。 YARN Session 启动之后会给出一个 web UI 地点以及一个 YARN application ID ,如下所示, 用户可以通过 web UI 或者下令行两种方式提交作业。 - 2021-06-03 15:54:27,069 INFO org.apache.flink.yarn.YarnClusterDescriptor
- [] - YARN application has been deployed successfully.
- 2021-06-03 15:54:27,070 INFO org.apache.flink.yarn.YarnClusterDescriptor
- [] - Found Web Interface hadoop104:39735 of application
- 'application_1622535605178_0003'.
- JobManager Web Interface: http://hadoop104:39735
复制代码 2. 提交作业 ( 1 )通过 Web UI 提交作业 这种方式比较简单,与上文所述 Standalone 摆设模式基本相同。 ( 2 )通过下令行提交作业 ① 将 Standalone 模式讲解中打包好的任务运行 JAR 包上传至集群 ② 执行以下下令将该任务提交到已经开启的 Yarn-Session 中运行。 - $ bin/flink run -c com.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
复制代码 客户端可以自行确定 JobManager 的地点,也可以通过 -m 或者 -jobmanager 参数指定 JobManager 的地点, JobManager 的地点在 YARN Session 的启动页面中可以找到。 ③ 任务提交乐成后,可在 YARN 的 Web UI 界面查看运行情况。 如图 3-14 所示,从图中可以看到我们创建的 Yarn-Session 现实上是一个 Yarn 的 Application ,而且有唯一的 Application ID 。 ④也可以通过 Flink 的 Web UI 页面查看提交任务的运行情况,如图 3-15 所示。 3.4.3 单作业模式摆设
在 YARN 情况中,由于有了外部平台做资源调治,所以我们也可以直接向 YARN 提交一 个单独的作业,从而启动一个 Flink 集群。 ( 1 )执行下令提交作业。 - $ bin/flink run -d -t yarn-per-job -c com.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
复制代码 早期版本也有另一种写法:
- $ bin/flink run -m yarn-cluster -c com.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
复制代码 注意这里是通过参数-m yarn-cluster 指定向 YARN 集群提交任务。
(2)在 YARN 的 ResourceManager 界面查看执行情况,如图 3-16 所示。
点击可以打开 Flink Web UI 页面进行监控,如图 3-17 所示: ( 3 )可以使用下令行查看或取消作业,下令如下。 - $ ./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
- $ ./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
复制代码 这里的 application_XXXX_YY 是当前应用的 ID , <jobId> 是作业的 ID 。注意假如取消作业,整个 Flink 集群也会停掉。 3.4.4 应用模式摆设
应用模式同样非常简单,与单作业模式雷同,直接执行 flink run-application 下令即可。 ( 1 )执行下令提交作业。 - $ bin/flink run-application -t yarn-application -c com.wc.StreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar
复制代码 ( 2 )在下令行中查看或取消作业。 - $ ./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY
- $ ./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>
复制代码 ( 3 )也可以通过 yarn.provided.lib.dirs 设置选项指定位置,将 jar 上传到远程。 - $ ./bin/flink run-application -t yarn-application
- -Dyarn.provided.lib.dirs="hdfs://myhdfs/my-remote-flink-dist-dir"
- hdfs://myhdfs/jars/my-application.jar
复制代码 这种方式下 jar 可以预先上传到 HDFS,而不需要单独发送到集群,这就使得作业提交更加轻量了。 3.4.5 高可用
YARN 模式的高可用和独立模式( Standalone )的高可用原理不一样。 Standalone 模式中 , 同时启动多个 JobManager, 一个为“领导者”( leader ),其他为“后备” ( standby ) , 当 leader 挂了 , 其他的才会有一个成为 leader 。 而 YARN 的高可用是只启动一个 Jobmanager, 当这个 Jobmanager 挂了之后 , YARN 会再次 启动一个 , 所以其实是利用的 YARN 的重试次数来实现的高可用。 ( 1 )在 yarn-site.xml 中设置。 - <property>
- <name>yarn.resourcemanager.am.max-attempts</name>
- <value>4</value>
- <description>
- The maximum number of application master execution attempts.
- </description>
- </property>
复制代码 注意: 设置完不要忘记分发 , 和重启 YARN 。 ( 2 )在 flink-conf.yaml 中设置。 - yarn.application-attempts: 3
- high-availability: zookeeper
- high-availability.storageDir: hdfs://hadoop102:9820/flink/yarn/ha
- high-availability.zookeeper.quorum:
- hadoop102:2181,hadoop103:2181,hadoop104:2181
- high-availability.zookeeper.path.root: /flink-yarn
复制代码 ( 3 )启动 yarn-session 。 ( 4 )杀死 JobManager, 查看复活情况。 注意: yarn-site.xml 中设置的是 JobManager 重启次数的上限 , flink-conf.xml 中的次数应该 小于这个值。 3.5 K8S 模式
容器化摆设是现在业界盛行的一项技术,基于 Docker 镜像运行能够让用户更加方便地对 应用进行管理和运维。容器管理工具中最为盛行的就是 Kubernetes ( k8s ),而 Flink 也在迩来 的版本中支持了 k8s 摆设模式。基本原理与 YARN 是雷同的,具体设置可以参见官网阐明, 这里我们就不做过多讲解了。 3.6 本章总结
Flink 支持多种不同的摆设模式,还可以和不同的资源管理平台方便地集成。本章从快速 启动的示例入手,接着先容了 Flink 中几种摆设模式的区别,并进一步针对不同的资源提供者 展开讲解了具体的摆设操作。在这个过程中,我们不仅认识了 Flink 的使用方法,而且接触到 了许多内部运行原理的知识。 关于 Flink 运行时组件概念的作用,以及作业提交运行的流程架构,我们会在下一章进一 步详细展开。 4. flink运行时架构
接下来我们就将研讨 Flink 内部,探究它的运行时架构,详细分析在不同摆设情况中的作业提交换程,深入了解 Flink 计划架构中的重要概念和原理。
4.1 体系架构
对于数据处置惩罚体系的架构,最简单的实现方式当然就是单节点。当数据量增大、处置惩罚盘算更加复杂时,我们可以思量增加 CPU 数量、加大内存,也就是让这一台机器变得性能更强盛, 从而进步吞吐量——这就是所谓的 SMP ( Symmetrical Multi-Processing,对称多处置惩罚)架构。但是这样做问题非常显着:全部 CPU 是完全平等、共享内存和总线资源的,这就势必造成资源竞争;而且随着 CPU 核心数量的增加,机器的资本会指数增长,所以 SMP 的可扩展性是比较差的,无法应对海量数据的处置惩罚场景。 于是人们提出了“不共享任何东西”( share-nothing )的分布式架构。从以 Greenplum 为代 表的 MPP ( Massively Parallel Processing ,大规模并行处置惩罚)架构,到 Hadoop 、 Spark 为代表的批处置惩罚架构,再到 Storm 、 Flink 为代表的流处置惩罚架构,都是以分布式作为体系架构的基本形态的。 我们已经知道,Flink 就是一个分布式的并行流处置惩罚体系。简单来说,它会由多个进程构 成,这些进程一般会分布运行在不同的机器上。 正如一个团队,人多了就会难以管理;对于一个分布式体系来说,也需要面临许多棘手的问题。此中的核心问题有: 集群中资源的分配和管理、进程和谐调治、持久化和高可用的数据存储,以及故障恢复 。 对于这些分布式体系的经典问题,业内已有比较成熟的解决方案和服务。所以 Flink 并不会本身去处置惩罚全部的问题,而是利用了现有的集群架构和服务,这样它就可以把精力会合在核心工作——分布式数据流处置惩罚上了。 Flink 可以设置为独立( Standalone)集群运行,也可以方便地跟一些集群资源管理工具集成使用,好比 YARN 、 Kubernetes 和 Mesos 。 Flink 也不会本身去提供持久化的分布式存储,而是直接利用了已有的分布式文件体系(好比 HDFS)或者对象存储(好比 S3 )。而对于高可用的设置, Flink 是依赖 Apache ZooKeeper 来完成的。我们所要重点了解的,就是在 Flink 中有哪些组件、是怎样具体实现一个分布式流处置惩罚体系的。假如各人对 Spark 或者 Storm 比较认识,那么稍后就会发现, Flink 其实有雷同的概念和架构。 4.1.1 整体构成
Flink 的运行时架构中,最重要的就是两大组件:作业管理器( JobManger)和任务管理器 ( TaskManager )。对于一个提交执行的作业, JobManager 是真正意义上的“管理者”( Master) , 负责管理调治,所以在不思量高可用的情况下只能有一个;而 TaskManager 是“工作者” ( Worker 、 Slave ),负责执行任务处置惩罚数据,所以可以有一个或多个。 Flink 的作业提交和任务 处置惩罚时的体系如图 4-1 所示。 这里起首要阐明一下“客户端”。其实客户端并不是处置惩罚体系的一部分,它只负责作业的提交。具体来说,就是调用程序的 main 方法,将代码转换成“数据流图”( Dataflow Graph),并最终生成作业图( JobGraph ),一并发送给 JobManager。提交之后,任务的执行其实就跟客户端没有关系了;我们可以在客户端选择断开与 JobManager 的毗连 , 也可以继承保持毗连。之前我们在下令提交作业时,加上的 -d 参数,就是表示分离模式(detached mode),也就是断开毗连。当然,客户端可以随时毗连到 JobManager,获取当前作业的状态和执行结果,也可以发送请求取消作业。我们在上一章中岂论通过 Web UI 照旧下令行执行“flink run”的相关操作,都是通过客户端实现的。 JobManager 和 TaskManagers 可以以不同的方式启动: ⚫ 作为独立( Standalone )集群的进程,直接在机器上启动 ⚫ 在容器中启动 ⚫ 由资源管理平台调治启动,好比 YARN 、 K8S 这其实就对应着不同的摆设方式。TaskManager 启动之后,JobManager 会与它建立毗连,并将作业图(JobGraph)转换成可执行的“执行图”(ExecutionGraph)分发给可用的TaskManager,然后就由 TaskManager 具体执行任务。接下来,我们就具体先容一下 JobManger 和 TaskManager 在整个过程中扮演的脚色。 4.1.2 作业管理器(JobManager)
JobManager 是一个 Flink 集群中任务管理和调治的核心,是控制应用执行的主进程。也就是说,每个应用都应该被唯一的 JobManager 所控制执行。当然,在高可用( HA)的场景下,可能会出现多个 JobManager;这时只有一个是正在运行的领导节点(leader),其他都是备用节点(standby)。JobManger 又包含 3 个不同的组件,下面我们逐一讲解。 1. JobMaster JobMaster 是 JobManager 中最核心的组件,负责处置惩罚单独的作业( Job )。所以 JobMaster 和具体的 Job 是逐一对应的,多个 Job 可以同时运行在一个 Flink 集群中 , 每个 Job 都有一个本身的 JobMaster 。需要注意在早期版本的 Flink 中,没有 JobMaster 的概念;而 JobManager的概念范围较小,现实指的就是现在所说的 JobMaster。在作业提交时, JobMaster 会先吸收到要执行的应用。这里所说“应用”一般是客户端提交来的,包括: Jar 包,数据流图(dataflow graph),和作业图(JobGraph) 。 JobMaster 会把 JobGraph 转换成一个物理层面的数据流图,这个图被叫作“执行图”( ExecutionGraph ),它包含了全部可以并发执行的任务。 JobMaster 会向资源管理器 ( ResourceManager )发出请求,申请执行任务必要的资源。一旦它获取到了充足的资源,就会 将执行图分发到真正运行它们的 TaskManager 上。而在运行过程中, JobMaster 会负责全部需要中央和谐的操作,好比说检查点( checkpoints) 的和谐。 2. 资源管理器( ResourceManager ) ResourceManager 重要负责资源的分配和管理,在 Flink 集群中只有一个。所谓“资源”, 重要是指 TaskManager 的任务槽( task slots )。任务槽就是 Flink 集群中的资源调配单位,包含 了机器用来执行盘算的一组 CPU 和内存资源。每一个任务( Task )都需要分配到一个 slot 上 执行。这里注意要把 Flink 内置的 ResourceManager 和其他资源管理平台(好比 YARN )的 ResourceManager 区分开。Flink 的 ResourceManager ,针对不同的情况和资源管理平台(好比 Standalone 摆设,或者 YARN ),有不同的具体实现。在 Standalone 摆设时,由于 TaskManager 是单独启动的(没有 Per-Job 模式),所以 ResourceManager 只能分发可用 TaskManager 的任务槽,不能单独启动新 TaskManager。而在有资源管理平台时,就不受此限制。当新的作业申请资源时, ResourceManager 会将有空闲槽位的 TaskManager 分配给 JobMaster 。假如 ResourceManager 没有充足的任务槽,它还可以向资源提供平台发起会话,请求提供启动 TaskManager 进程的容器。另外,ResourceManager 还负责停掉空闲的 TaskManager ,释放盘算资源。 3. 分发器( Dispatcher ) Dispatcher 重要负责提供一个 REST 接口,用来提交应用,而且负责为每一个新提交的作业启动一个新的 JobMaster 组件。 Dispatcher 也会启动一个 Web UI,用来方便地展示和监控作业执行的信息。 Dispatcher 在架构中并不是必需的,在不同的摆设模式下可能会被忽略掉。 4.1.3 任务管理器(TaskManager)
TaskManager 是 Flink 中的工作进程,数据流的具体盘算就是它来做的,所以也被称为 “ Worker ”。 Flink 集群中必须至少有一个 TaskManager;当然由于分布式盘算的思量,通常会有多个 TaskManager 运行,每一个 TaskManager 都包含了一定命量的任务槽( task slots )。Slot是资源调治的最小单位, slot 的数量限制了 TaskManager 能够并行处置惩罚的任务数量。启动之后, TaskManager 会向资源管理器注册它的 slots;收到资源管理器的指令后,TaskManager 就会将一个或者多个槽位提供给 JobMaster 调用,JobMaster 就可以分配任务来执行了。在执行过程中, TaskManager 可以缓冲数据,还可以跟其他运行同一应用的 TaskManager互换数据。 4.2 作业提交换程
了解了 Flink 运行时的基本组件和体系架构,我们再来梳理一下作业提交的具体流程。 4.2.1 高层级抽象视角
Flink 的提交换程,随着摆设模式、资源管理平台的不同,会有不同的变化。起首我们从一个高层级的视角,来做一下抽象提炼,看一看作业提交时宏观上各组件是怎样交互协作的。 如图 4-2 所示,具体步骤如下: ( 1 ) 一般情况下,由客户端( App )通过分发器提供的 REST 接口,将作业提交给 JobManager 。 ( 2 )由分发器启动 JobMaster ,并将作业(包含 JobGraph )提交给 JobMaster 。 ( 3 ) JobMaster 将 JobGraph 解析为可执行的 ExecutionGraph,得到所需的资源数量,然后向资源管理器请求资源( slots )。 ( 4 )资源管理器判断当前是否由充足的可用资源;假如没有,启动新的 TaskManager 。 ( 5 ) TaskManager 启动之后,向 ResourceManager 注册本身的可用任务槽( slots )。 ( 6 )资源管理器通知 TaskManager 为新的作业提供 slots 。 ( 7 ) TaskManager 毗连到对应的 JobMaster ,提供 slots 。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |