大数据——Flink原理

种地  金牌会员 | 2024-9-8 07:45:26 | 来自手机 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 954|帖子 954|积分 2862

择要

Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流举行有状态盘算。Flink被筹划在所有常见的集群环境中运行,以内存执行速度和恣意规模来执行盘算。

1. FLink特点

1.1. 变乱驱动型(Event-driven)

变乱驱动型应用是一类具有状态的应用,它从一个或多个变乱流提取数据,并根据到来的变乱触发盘算、状态更新或其他外部动作。比较典范的就是以kafka为代表的消息队列险些都是变乱驱动型应用。

与之不同的就是SparkStreaming微批次,如图:

1.2. 流批一体筹划


  • 批处理的特点是有界、持久、大量,非常适合必要访问全套记录才能完成的盘算工作,一般用于离线统计。
  • 流处理的特点是无界、实时, 无需针对整个数据集执行操纵,而是对通过体系传输的每个数据项执行操纵,一般用于实时统计
在spark的世界观中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无穷的小批次组成的。而在flink的世界观中,一切都是由流组成的,离线数据是有边界的流,实时数据是一个没有边界的流,这就是所谓的有界流和无界流。

  • 无界数据流:无界数据流有一个开始但是没有竣事,它们不会在天生时终止并提供数据,必须连续处理无界流,也就是说必须在获取后立即处理event。对于无界数据流我们无法期待所有数据都到达,由于输入是无界的,并且在任何时间点都不会完成。处理无界数据通常要求以特定次序(比方变乱发生的次序)获取event,以便能够推断效果完备性。
  • 有界数据流:有界数据流有明白界说的开始和竣事,可以在执行任何盘算之前通过获取所有数据来处理有界流,处理有界流不必要有序获取,由于可以始终对有界数据集举行排序,有界流的处理也称为批处理。

这种以流为世界观的架构,得到的最大好处就是具有极低的延迟。
1.3. API 分层


最底层级的抽象仅仅提供了有状态流,它将通过过程函数(Process Function)被嵌入到DataStream API中。底层过程函数(Process Function)与 DataStream API 相集成,使其可以对某些特定的操纵举行底层的抽象,它允许用户可以自由地处理来自一个或多个数据流的变乱,并利用一致的容错的状态。除此之外,用户可以注册变乱时间并处理时间回调,从而使程序可以处理复杂的盘算。
实际上,大多数应用并不必要上述的底层抽象,而是针对核心API(Core APIs)举行编程,比如DataStream API(有界或无界流数据)以及DataSet API(有界数据集)。这些API为数据处理提供了通用的构建模块,比如由用户界说的多种情势的转换(transformations),毗连(joins),聚合(aggregations),窗口操纵(windows)等等。DataSet API 为有界数据集提供了额外的支持,比方循环与迭代。这些API处理的数据类型以类(classes)的情势由各自的编程语言所表现。
Table API 是以表为中央的声明式编程,其中表可能会动态变化(在表达流数据时)。Table API遵照(扩展的)关系模型:表有二维数据结构(schema)(雷同于关系数据库中的表),同时API提供可比较的操纵,比方select、project、join、group-by、aggregate等。Table API程序声明式地界说了什么逻辑操纵应该执行,而不是正确地确定这些操纵代码的看上去怎样。
只管Table API可以通过多种类型的用户自界说函数(UDF)举行扩展,其仍不如核心API更具表达能力,但是利用起来却更加简洁(代码量更少)。除此之外,Table API程序在执行之前会颠末内置优化器举行优化。你可以在表与 DataStream/DataSet 之间无缝切换,以允许程序将 Table API 与 DataStream 以及 DataSet 混淆利用。
Flink提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 雷同,但是是以SQL查询表达式的情势体现程序。SQL抽象与Table API交互密切,同时SQL查询可以直接在Table API界说的表上执行。如今Flink作为批处理还不是主流,不如Spark成熟,所以DataSet利用的并不是许多。Flink Table API和Flink SQL也并不完善,大多都由各大厂商本身定制。所以我们重要学习DataStream API的利用。实际上Flink作为最靠近Google DataFlow模型的实现,是流批统一的观点,所以基本上利用DataStream就可以了。
Flink几大模块


  • Flink Table & SQL(还没开发完)
  • Flink Gelly(图盘算)
  • Flink CEP(复杂变乱处理)
2. Flink运行架构原理

2.1. Flink运行时组件

Flink运行时架构重要包括四个不同的组件,它们会在运行流处理应用程序时协同工作:作业管理器(JobManager)、资源管理器(ResourceManager)、任务管理器(TaskManager),以及分发器(Dispatcher)。由于Flink是用Java和Scala实现的,所以所有组件都会运行在Java虚拟机上。每个组件的职责如下:
2.1.1. 作业管理器(JobManager)

控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。JobManager会先吸收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的JAR包。JobManager会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包罗了所有可以并发执行的任务。JobManager会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了充足的资源,就会将执行图分发到真正运行它们的TaskManager上。而在运行过程中,JobManager会负责所有必要中央协调的操纵,比如说检查点(checkpoints)的协调。
2.1.2. 资源管理器(ResourceManager)

重要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中界说的处理资源单元。Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及standalone部署。当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有充足的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。另外,ResourceManager还负责终止空闲的TaskManager,释放盘算资源。
2.1.3. 任务管理器(TaskManager)

Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包罗了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。
2.1.4. 分发器(Dispatcher)

可以跨作业运行,它为应用提交提供了REST接口。当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。由于是REST接口,所以Dispatcher可以作为集群的一个HTTP接入点,这样就能够不受防火墙拦截。Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。
2.2. Flink任务提交流程

我们来看看当一个应用提交执行时,Flink的各个组件是怎样交互协作的:

上图是从一个较为高层级的视角,来看应用中各组件的交互协作。如果部署的集群环境不同(比方YARN,Mesos,Kubernetes,standalone等),其中一些步骤可以被省略,或是有些组件会运行在同一个JVM进程中。
详细地,如果我们将Flink集群部署到YARN上,那么就会有如下的提交流程:

Flink任务提交后,Client向HDFS上传Flink的Jar包和设置,之后向Yarn ResourceManager提交任务,ResourceManager分配Container资源并通知对应的NodeManager启动ApplicationMaster,ApplicationMaster启动后加载Flink的Jar包和设置构建环境,然后启动JobManager,之后ApplicationMaster向ResourceManager申请资源启动TaskManager,ResourceManager分配Container资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager,NodeManager加载Flink的Jar包和设置构建环境并启动TaskManager,TaskManager启动后向JobManager发送心跳包,并期待JobManager向其分配任务。
2.3. Flink任务调度原理


Flink调度流程

  • 客户端不是运行时和程序执行的一部门,但它用于准备并发送dataflow(JobGraph)给Master(JobManager),然后,客户端断开毗连或者维持毗连以期待吸收盘算效果。
  • 当Flink 集群启动后,首先会启动一个JobManger 和一个或多个的TaskManager。由 Client 提交任务给 JobManager,JobManager 再调度任务到各个 TaskManager 去执行,然后TaskManager 将心跳和统计信息汇报给JobManager。TaskManager之间以流的情势举行数据的传输。上述三者均为独立的JVM进程。
  • Client 为提交 Job 的客户端,可以是运行在任何机器上(与 JobManager 环境连通即可)。提交 Job 后,Client 可以竣事进程(Streaming的任务),也可以不竣事并期待效果返回。
  • JobManager 重要负责调度 Job 并协调 Task 做 checkpoint,职责上很像 Storm 的 Nimbus。从 Client 处吸收到 Job 和 JAR 包等资源后,会天生优化后的执行筹划,并以 Task 的单元调度到各个 TaskManager 去执行。
  • TaskManager 在启动的时间就设置好了槽位数(Slot),每个 slot 能启动一个 Task,Task 为线程。从 JobManager 处吸收必要部署的 Task,部署启动后,与本身的上游建立 Netty 毗连,吸收数据并处理。
2.3.1. TaskManger与Slots

Flink中每一个worker(TaskManager)都是一个JVM进程,它可能会在独立的线程上执行一个或多个subtask。为了控制一个worker能吸收多少个task,worker通过task slot来举行控制(一个worker至少有一个task slot)。
每个task slot表现TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot。资源slot化意味着一个subtask将不必要跟来自其他job的subtask竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备。必要注意的是,这里不会涉及到CPU的隔离,slot如今仅仅用来隔离task的受管理的内存。
通过调整task slot的数量,允许用户界说subtask之间怎样互相隔离。如果一个TaskManager一个slot,那将意味着每个task group运行在独立的JVM中(该JVM可能是通过一个特定的容器启动的),而一个TaskManager多个slot意味着更多的subtask可以共享同一个JVM。而在同一个JVM进程中的task将共享TCP毗连(基于多路复用)和心跳消息。它们也可能共享数据集和数据结构,因此这减少了每个task的负载。


默认情况下,Flink允许子任务共享slot,纵然它们是不同任务的子任务(条件是它们来自同一个job)。这样的效果是,一个slot可以保存作业的整个管道。
TaskSlot是静态的概念,是指TaskManager具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots举行设置;
并行度parallelism是动态概念,即TaskManager运行程序时实际利用的并发能力,可以通过参数parallelism.default举行设置。
也就是说,假设一共有3个TaskManager,每一个TaskManager中的分配3个TaskSlot,也就是每个TaskManager可以吸收3个task,一共9个TaskSlot,如果我们设置parallelism.default=1,即运行程序默认的并行度为1,9个TaskSlot只用了1个,有8个空闲,因此,设置合适的并行度才能提高效率。


2.3.2. 程序与数据流(DataFlow)

所有的Flink程序都是由三部门组成的: Source TransformationSink。Source负责读取数据源,Transformation利用各种算子举行处理加工,Sink负责输出。
在运行时,Flink上运行的程序会被映射成“逻辑数据流”(dataflows),它包罗了这三部门。每一个dataflow以一个或多个sources开始以一个或多个sinks竣事。dataflow雷同于恣意的有向无环图(DAG)。在大部门情况下,程序中的转换运算(transformations)跟dataflow中的算子(operator)是一一对应的关系,但有时间,一个transformation可能对应多个operator。

2.3.3. 执行图

由Flink程序直接映射成的数据流图是StreamGraph,也被称为逻辑流图,由于它们表现的是盘算逻辑的高级视图。为了执行一个流处理程序,Flink必要将逻辑流图转换为物理数据流图(也叫执行图),详细阐明程序的执行方式。

Flink 中的执行图可以分成四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。

  • StreamGraph:是根据用户通过 Stream API 编写的代码天生的最初的图。用来表现程序的拓扑结构。
  • JobGraph:StreamGraph颠末优化后天生了 JobGraph,提交给 JobManager 的数据结构。重要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间活动所必要的序列化/反序列化/传输消耗。
  • ExecutionGraph:JobManager 根据 JobGraph 天生ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
  • 物理执行图:JobManager 根据 ExecutionGraph 对 Job 举行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个详细的数据结构。
2.3.4. 并行度

Flink程序的执行具有并行、分布式的特性。在执行过程中,一个流(stream)包罗一个或多个分区(stream partition),而每一个算子(operator)可以包罗一个或多个子任务(operator subtask),这些子任务在不同的线程、不同的物理机或不同的容器中彼此互不依赖地执行。
一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度

Stream在算子之间传输数据的情势可以是one-to-one(forwarding)的模式也可以是redistributing的模式,详细是哪一种情势,取决于算子的种类。
One-to-one:stream(比如在source和map operator之间)维护着分区以及元素的次序。那意味着map 算子的子任务看到的元素的个数以及次序跟source 算子的子任务生产的元素的个数、次序雷同,map、fliter、flatMap等算子都是one-to-one的对应关系。
雷同于spark中的窄依赖
Redistributing:stream(map()跟keyBy/window之间或者keyBy/window跟sink之间)的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。比方,keyBy() 基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就雷同于Spark中的shuffle过程。
2.3.5. 任务链(Operator Chains)

雷同并行度的one to one操纵,Flink这样相连的算子链接在一起形成一个task,原来的算子成为里面的一部门。将算子链接成task是非常有用的优化:它能减少线程之间的切换和基于缓存区的数据交换,在减少时延的同时提拔吞吐量。链接的行为可以在编程API中举行指定。

3. Flink 流处理API


3.1. Environment

3.1.1. getExecutionEnvironment()

创建一个执行环境,表现当前执行程序的上下文。如果程序是独立调用的,则此方法返回本地执行环境;如果从下令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。
  1. val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
  2. val env = StreamExecutionEnvironment.getExecutionEnvironment
复制代码
如果没有设置并行度,会以flink-conf.yaml中的设置为准,默认是1。
3.1.2. createLocalEnvironment()

  1. val env = StreamExecutionEnvironment.createLocalEnvironment(1)
复制代码
3.1.3. createRemoteEnvironment()

  1. val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//wordcount.jar")
复制代码
返回集群执行环境,将Jar提交到长途服务器。必要在调用时指定JobManager的IP和端口号,并指定要在集群中运行的Jar包。
3.2. Source

3.2.1. 从集合读取数据

  1. case class SensorReading(id: String, timestamp: Long, temperature: Double)
  2. object Sensor {
  3.   def main(args: Array[String]): Unit = {
  4.     val env = StreamExecutionEnvironment.getExecutionEnvironment
  5.     val stream1 = env
  6.     .fromCollection(List(
  7.       SensorReading("sensor_1", 1547718199, 35.8),
  8.       SensorReading("sensor_6", 1547718201, 15.4),
  9.       SensorReading("sensor_7", 1547718202, 6.7),
  10.       SensorReading("sensor_10", 1547718205, 38.1)
  11.     ))
  12.     stream1.print("stream1:").setParallelism(1)
  13.     env.execute()
  14.   }
  15. }
复制代码
3.2.2. 从文件读取数据

  1. val stream2 = env.readTextFile("YOUR_FILE_PATH")
复制代码
3.2.3. 从kafka消息队列读区

  1. <dependency>
  2.     <groupId>org.apache.flink</groupId>
  3.     <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
  4.     <version>1.10.0</version>
  5. </dependency>
复制代码
  1. val properties = new Properties()
  2. properties.setProperty("bootstrap.servers", "localhost:9092")
  3. properties.setProperty("group.id", "consumer-group")
  4. properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  5. properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  6. properties.setProperty("auto.offset.reset", "latest")
  7. val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
复制代码
3.2.4. 自界说Source

除了以上的source数据来源,我们还可以自界说source。必要做的,只是传入一个SourceFunction就可以。详细调用如下:
  1. val stream4 = env.addSource( new MySensorSource() )
复制代码
我们盼望可以随机天生传感器数据,MySensorSource详细的代码实现如下:
  1. class MySensorSource extends SourceFunction[SensorReading]{
  2. // flag: 表示数据源是否还在正常运行
  3. var running: Boolean = true
  4. override def cancel(): Unit = {
  5.     running = false
  6. }
  7. override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
  8.   // 初始化一个随机数发生器
  9.   val rand = new Random()
  10.   
  11.   var curTemp = 1.to(10).map(
  12.   i => ( "sensor_" + i, 65 + rand.nextGaussian() * 20 )
  13.   )
  14.   
  15.   while(running){
  16.     // 更新温度值
  17.     curTemp = curTemp.map(
  18.         t => (t._1, t._2 + rand.nextGaussian())
  19.     )
  20.     // 获取当前时间戳
  21.     val curTime = System.currentTimeMillis()
  22.    
  23.     curTemp.foreach(
  24.       t => ctx.collect(SensorReading(t._1, curTime, t._2))
  25.     )
  26.       Thread.sleep(100)
  27.     }
  28.   }
  29. }
复制代码
3.3. Transform

3.3.1. map算子

  1. val streamMap = stream.map { x => x * 2 }
复制代码
3.3.2. flatMap

  1. val streamFlatMap = stream.flatMap{
  2.     x => x.split(" ")
  3. }
复制代码
3.3.3. Filter

  1. val streamFilter = stream.filter{
  2.     x => x == 1
  3. }
复制代码
3.3.4. KeyBy

DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包罗具有雷同key的元素,在内部以hash的情势实现的。
3.3.5. 滚动聚合算子Rolling Aggregation

这些算子可以针对KeyedStream的每一个支流做聚合。

  • sum()
  • min()
  • max()
  • minBy()
  • maxBy()
3.3.6. Reduce

KeyedStream → DataStream:一个分组数据流的聚合操纵,合并当前的元素和前次聚合的效果,产生一个新的值,返回的流中包罗每一次聚合的效果,而不是只返回末了一次聚合的终极效果。
  1. val stream2 = env.readTextFile("YOUR_PATH\\sensor.txt")
  2. .map( data => {
  3.     val dataArray = data.split(",")
  4.     SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
  5.   })
  6. .keyBy("id")
  7. .reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature) )
复制代码
3.3.7. Split 和Select


DataStream → SplitStream:根据某些特性把一个DataStream拆分成两个或者多个DataStream。

  1. val splitStream = stream2
  2.   .split( sensorData => {
  3.     if (sensorData.temperature > 30) Seq("high") else Seq("low")
  4.   } )
  5. val high = splitStream.select("high")
  6. val low = splitStream.select("low")
  7. val all = splitStream.select("high", "low")
复制代码
3.3.8. Connect和CoMap


DataStream,DataStream → ConnectedStreams:毗连两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和情势不发生任何变化,两个流相互独立。

ConnectedStreams → DataStream:作用于ConnectedStreams上,功能与map和flatMap一样,对ConnectedStreams中的每一个Stream分别举行map和flatMap处理。
  1. val warning = high.map( sensorData => (sensorData.id, sensorData.temperature) )
  2. val connected = warning.connect(low)
  3. val coMap = connected.map(
  4.     warningData => (warningData._1, warningData._2, "warning"),
  5.     lowData => (lowData.id, "healthy")
  6. )
复制代码
3.3.9. Union


DataStream → DataStream:对两个或者两个以上的DataStream举行union操纵,产生一个包罗所有DataStream元素的新DataStream。
  1. //合并以后打印
  2. val unionStream: DataStream[StartUpLog] = appStoreStream.union(otherStream)
  3. unionStream.print("union:::")
复制代码
Connect与Union 区别:

  • Union之前两个流的类型必须是一样,Connect可以不一样,在之后的coMap中再去调整成为一样的
  • Connect只能操纵两个流,Union可以操纵多个。
3.4. Sink

Flink没有雷同于spark中foreach方法,让用户举行迭代的操纵。虽有对外的输出操纵都要利用Sink完成。
  1. stream.addSink(new MySink(xxxx))
复制代码
官方提供了一部门的框架的sink。除此以外,必要用户自界说实现sink。
4. Flink中的Window

streaming流式盘算是一种被筹划用于处理无穷数据集的数据处理引擎,而无穷数据集是指一种不停增长的本质上无穷的数据集,而window是一种切割无穷数据为有限块举行处理的手段。Window是无穷数据流处理的核心,Window将一个无穷的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做盘算操纵。
4.1. Window类型与API

Window可以分成两类:

  • CountWindow:按照指定的数据条数天生一个Window,与时间无关。
  • TimeWindow:按照时间天生Window。
对于TimeWindow,可以根据窗口实现原理的不同分成三类:

  • 滚动窗口(Tumbling Window)
将数据依据固定的窗口长度对数据举行切片。特点:时间对齐,窗口长度固定,没有重叠。滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。比方:如果你指定了一个5分钟大小的滚动窗口,窗口的创建如下图所示:

适用场景:适合做BI统计等(做每个时间段的聚合盘算)。
Flink默认的时间窗口根据Processing Time 举行窗口的分别,将Flink获取到的数据根据进入Flink的时间分别到不同的窗口中。
  1. val minTempPerWindow = dataStream
  2.   .map(r => (r.id, r.temperature))
  3.   .keyBy(_._1)
  4.   .timeWindow(Time.seconds(15))
  5.   .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
复制代码
时间间隔可以通过Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定。
默认的CountWindow是一个滚动窗口,只必要指定窗口大小即可,当元素数量达到窗口大小时,就会触发窗口的执行。
  1. val minTempPerWindow: DataStream[(String, Double)] = dataStream
  2.   .map(r => (r.id, r.temperature))
  3.   .keyBy(_._1)
  4.   .countWindow(5)
  5.   .reduce((r1, r2) => (r1._1, r1._2.max(r2._2)))
复制代码

  • 滑动窗口(Sliding Window)
滑动窗口是固定窗口的更广义的一种情势,滑动窗口由固定的窗口长度和滑动间隔组成。特点:时间对齐,窗口长度固定,可以有重叠。
滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口雷同,窗口的大小由窗口大小参数来设置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。
比方,你有10分钟的窗口和5分钟的滑动,那么每个窗口中5分钟的窗口里包罗着上个10分钟产生的数据,如下图所示:

适用场景:对最近一个时间段内的统计(求某接口最近5min的失败率来决定是否要报警)。
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时必要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了5s,也就是说,每5s就盘算输出效果一次,每一次盘算的window范围是15s内的所有元素。
  1. val minTempPerWindow: DataStream[(String, Double)] = dataStream
  2.   .map(r => (r.id, r.temperature))
  3.   .keyBy(_._1)
  4.   .timeWindow(Time.seconds(15), Time.seconds(5))
  5.   .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
  6. // .window(SlidingEventTimeWindows.of(Time.seconds(15),Time.seconds(5))
复制代码
时间间隔可以通过Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一个来指定。
滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时必要传入两个参数,一个是window_size,一个是sliding_size。下面代码中的sliding_size设置为了2,也就是说,每收到两个雷同key的数据就盘算一次,每一次盘算的window范围是10个元素。
  1. val keyedStream: KeyedStream[(String, Int), Tuple] = dataStream.map(r => (r.id, r.temperature)).keyBy(0)
  2. //每当某一个key的个数达到2的时候,触发计算,计算最近该key最近10个元素的内容
  3. val windowedStream: WindowedStream[(String, Int), Tuple, GlobalWindow] = keyedStream.countWindow(10,2)
  4. val sumDstream: DataStream[(String, Int)] = windowedStream.sum(1)
复制代码

  • 会话窗口(Session Window)
由一系列变乱组合一个指定时间长度的timeout间隙组成,雷同于web应用的session,也就是一段时间没有吸收到新数据就会天生新的窗口。特点:时间无对齐。
session窗口分配器通过session运动来对元素举行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和竣事时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非运动间隔产生,那个这个窗口就会关闭。一个session窗口通过一个session间隔来设置,这个session间隔界说了非活跃周期的长度,当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。

4.1.1. window function

window function 界说了要对窗口中收集的数据做的盘算操纵,重要可以分为两类:

  • 增量聚合函数(incremental aggregation functions):每条数据到来就举行盘算,保持一个简朴的状态。典范的增量聚合函数有ReduceFunction, AggregateFunction。
  • 全窗口函数(full window functions):先把窗口所有数据收集起来,等到盘算的时间会遍历所有数据。ProcessWindowFunction就是一个全窗口函数。
4.1.2. 其它可选API

.trigger() ——触发器(界说 window 什么时间关闭,触发盘算并输出效果)
.evitor() ——移除器(界说移除某些数据的逻辑)

  • .allowedLateness() ——允许处理迟到的数据
  • .sideOutputLateData() ——将迟到的数据放入侧输出流
  • .getSideOutput() ——获取侧输出流
5. 时间语义与WaterMark

5.1. Flink中的时间语义

在Flink的流式处理中,会涉及到时间的不同概念,如下图所示:


  • Event Time:是变乱创建的时间。它通常由变乱中的时间戳描述,比方采集的日志数据中,每一条日志都会记录本身的天生时间,Flink通过时间戳分配器访问变乱时间戳。
  • Ingestion Time:是数据进入Flink的时间。
  • Processing Time:是每一个执行基于时间操纵的算子的本地体系时间,与机器相关,默认的时间属性就是Processing Time。
5.2. EventTime的引入

在Flink的流式处理中,绝大部门的业务都会利用eventTime,一般只在eventTime无法利用时,才会被迫利用ProcessingTime或者IngestionTime。如果要利用EventTime,那么必要引入EventTime的时间属性,引入方式如下所示:
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. // 从调用时刻开始给env创建的每一个stream追加时间特征
  3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
复制代码
5.3. Watermark

流处理从变乱产生,到流经source,再到operator,中间是有一个过程和时间的,虽然大部门情况下,流到operator的数据都是按照变乱产生的时间次序来的,但是也不清除由于网络、分布式等缘故起因,导致乱序的产生,所谓乱序,就是指Flink吸收到的变乱的先后次序不是严酷按照变乱的Event Time次序排列的。


那么此时出现一个问题,一旦出现乱序,如果只根据eventTime决定window的运行,我们不能明白数据是否全部到位,但又不能无穷期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发window去举行盘算了,这个特别的机制,就是Watermark。
Watermark是一种衡量Event Time进展的机制。
Watermark是用于处理乱序变乱的,而正确的处理乱序变乱,通常用Watermark机制结合window来实现。
数据流中的Watermark用于表现timestamp小于Watermark的数据,都已经到达了,因此,window的执行也是由Watermark触发的。
Watermark可以明白成一个延迟触发机制,我们可以设置Watermark的延时时长t,每次体系会校验已经到达的数据中最大的maxEventTime,然后认定eventTime小于maxEventTime - t的所有数据都已经到达,如果有窗口的停止时间便是maxEventTime – t,那么这个窗口被触发执行。
有序流的Watermarker如下图所示:

乱序流的Watermarker如下图所示:(Watermark设置为2)

当Flink吸收到数据时,会按照一定的规则去天生Watermark,这条Watermark就便是当前所有到达数据中的maxEventTime - 延迟时长,也就是说,Watermark是基于数据携带的时间戳天生的,一旦Watermark比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于event time是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永世都不被触发。
上图中,我们设置的允许最大延迟到达时间为2s,所以时间戳为7s的变乱对应的Watermark是5s,时间戳为12s的变乱的Watermark是10s,如果我们的窗口1是1s~5s,窗口2是6s~10s,那么时间戳为7s的变乱到达时的Watermarker恰好触发窗口1,时间戳为12s的变乱到达时的Watermark恰好触发窗口2。
Watermark 就是触发前一窗口的“关窗时间”,一旦触发关门那么以当前时间为准在窗口范围内的所有所有数据都会收入窗中。只要没有达到水位那么不管现实中的时间推进了多久都不会触发关窗。
watermark的引入很简朴,对于乱序数据,最常见的引用方式如下:
  1. dataStream.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {
  2.   override def extractTimestamp(element: SensorReading): Long = {
  3.     element.timestamp * 1000
  4.   }
  5. } )
复制代码
Event Time的利用一定要指定数据源中的时间戳。否则程序无法知道变乱的变乱时间是什么(数据源里的数据没有时间戳的话,就只能利用Processing Time了)。
我们看到上面的例子中创建了一个看起来有点复杂的类,这个类实现的实在就是分配时间戳的接口。Flink袒露了TimestampAssigner接供词我们实现,使我们可以自界说怎样从变乱数据中抽取时间戳。
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. // 从调用时刻开始给env创建的每一个stream追加时间特征
  3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)val readings: DataStream[SensorReading] = env.addSource(new SensorSource).assignTimestampsAndWatermarks(new MyAssigner())
复制代码
MyAssigner有两种类型

  • AssignerWithPeriodicWatermarks
  • AssignerWithPunctuatedWatermarks
以上两个接口都继承自TimestampAssigner。
5.3.1. Assigner with periodic watermarks

周期性的天生watermark:体系会周期性的将watermark插入到流中(水位线也是一种特别的变乱!)。默认周期是200毫秒。可以利用ExecutionConfig.setAutoWatermarkInterval()方法举行设置。
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  3. // 每隔5秒产生一个watermark
  4. env.getConfig.setAutoWatermarkInterval(5000)
复制代码
产生watermark的逻辑:每隔5秒钟,Flink会调用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法。如果方法返回一个时间戳大于之前水位的时间戳,新的watermark会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于便是之前水位的时间戳,则不会产生新的watermark。
自界说一个周期性的时间戳抽取:
  1. class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {
  2. val bound: Long = 60 * 1000 // 延时为1分钟
  3. var maxTs: Long = Long.MinValue // 观察到的最大时间戳
  4. override def getCurrentWatermark: Watermark = {
  5.     new Watermark(maxTs - bound)
  6. }
  7. override def extractTimestamp(r: SensorReading, previousTS: Long) = {
  8.   maxTs = maxTs.max(r.timestamp)
  9.     r.timestamp
  10.   }
  11. }
复制代码
一种简朴的特别情况是,如果我们事先得知数据流的时间戳是单调递增的,也就是说没有乱序,那我们可以利用assignAscendingTimestamps,这个方法会直接利用数据的时间戳天生watermark。
  1. val stream: DataStream[SensorReading] = ...
  2. val withTimestampsAndWatermarks = stream
  3. .assignAscendingTimestamps(e => e.timestamp)
复制代码
而对于乱序数据流,如果我们能大抵估算出数据流中的变乱的最大延迟时间,就可以利用如下代码:
  1. val stream: DataStream[SensorReading] = ...
  2. val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(
  3. new SensorTimeAssigner
  4. )
  5. class SensorTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(5)) {
  6. // 抽取时间戳
  7. override def extractTimestamp(r: SensorReading): Long = r.timestamp
  8. }
复制代码
5.3.2. Assigner with punctuated watermarks

中断式地天生watermark。和周期性天生的方式不同,这种方式不是固定时间的,而是可以根据必要对每条数据举行筛选和处理。直接上代码来举个例子,我们只给sensor_1的传感器的数据流插入watermark:
  1. class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {
  2.   val bound: Long = 60 * 1000
  3.   
  4.   override def checkAndGetNextWatermark(r: SensorReading, extractedTS: Long): Watermark = {
  5.     if (r.id == "sensor_1") {
  6.       new Watermark(extractedTS - bound)
  7.     } else {
  8.       null
  9.     }
  10.   }
  11.   override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
  12.     r.timestamp
  13.   }
  14. }
复制代码
5.4. EvnetTime在window中的利用

滚动窗口(TumblingEventTimeWindows)
  1. def main(args: Array[String]): Unit = {
  2.     //  环境
  3.     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  4.     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  5.     env.setParallelism(1)
  6.     val dstream: DataStream[String] = env.socketTextStream("localhost",7777)
  7.     val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text =>
  8.       val arr: Array[String] = text.split(" ")
  9.       (arr(0), arr(1).toLong, 1)
  10.     }
  11.     val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) {
  12.       override def extractTimestamp(element: (String, Long, Int)): Long = {
  13.        return  element._2
  14.       }
  15.     })
  16.     val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDstream.keyBy(0)
  17.     textKeyStream.print("textkey:")
  18.     val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(TumblingEventTimeWindows.of(Time.seconds(2)))
  19.     val groupDstream: DataStream[mutable.HashSet[Long]] = windowStream.fold(new mutable.HashSet[Long]()) { case (set, (key, ts, count)) =>
  20.       set += ts
  21.     }
  22.     groupDstream.print("window::::").setParallelism(1)
  23.     env.execute()
  24.   }
  25. }
复制代码
滑动窗口(SlidingEventTimeWindows)
  1. def main(args: Array[String]): Unit = {
  2.   //  环境
  3.   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  4.   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  5.   env.setParallelism(1)
  6.   val dstream: DataStream[String] = env.socketTextStream("localhost",7777)
  7.   val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text =>
  8.     val arr: Array[String] = text.split(" ")
  9.     (arr(0), arr(1).toLong, 1)
  10.   }
  11.   val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) {
  12.     override def extractTimestamp(element: (String, Long, Int)): Long = {
  13.      return  element._2
  14.     }
  15.   })
  16.   val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDstream.keyBy(0)
  17.   textKeyStream.print("textkey:")
  18.   val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(SlidingEventTimeWindows.of(Time.seconds(2),Time.milliseconds(500)))
  19.   val groupDstream: DataStream[mutable.HashSet[Long]] = windowStream.fold(new mutable.HashSet[Long]()) { case (set, (key, ts, count)) =>
  20.     set += ts
  21.   }
  22.   groupDstream.print("window::::").setParallelism(1)
  23.   env.execute()
  24. }
复制代码
会话窗口(EventTimeSessionWindows)
相邻两次数据的EventTime的时间差超过指定的时间间隔就会触发执行。如果加入Watermark,会在符合窗口触发的情况下举行延迟。到达延迟水位再举行窗口触发。
  1. def main(args: Array[String]): Unit = {
  2.     //  环境
  3.     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  4.     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  5.     env.setParallelism(1)
  6.     val dstream: DataStream[String] = env.socketTextStream("localhost",7777)
  7.     val textWithTsDstream: DataStream[(String, Long, Int)] = dstream.map { text =>
  8.       val arr: Array[String] = text.split(" ")
  9.       (arr(0), arr(1).toLong, 1)
  10.     }
  11.     val textWithEventTimeDstream: DataStream[(String, Long, Int)] = textWithTsDstream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, Long, Int)](Time.milliseconds(1000)) {
  12.       override def extractTimestamp(element: (String, Long, Int)): Long = {
  13.        return  element._2
  14.       }
  15.     })
  16.     val textKeyStream: KeyedStream[(String, Long, Int), Tuple] = textWithEventTimeDstream.keyBy(0)
  17.     textKeyStream.print("textkey:")
  18.     val windowStream: WindowedStream[(String, Long, Int), Tuple, TimeWindow] = textKeyStream.window(EventTimeSessionWindows.withGap(Time.milliseconds(500)) )
  19.     windowStream.reduce((text1,text2)=>
  20.                         (  text1._1,0L,text1._3+text2._3)
  21.     )  .map(_._3).print("windows:::").setParallelism(1)
  22.     env.execute()
  23.   }
复制代码
6. ProcessFunction API(底层API)

我们之前学习的转换算子是无法访问变乱的时间戳信息和水位线信息的。而这在一些应用场景下,极为紧张。比方MapFunction这样的map转换算子就无法访问时间戳或者当前变乱的变乱时间。
基于此,DataStream API提供了一系列的Low-Level转换算子。可以访问时间戳、watermark以及注册定时势件。还可以输出特定的一些变乱,比方超时势件等。Process Function用来构建变乱驱动的应用以及实现自界说的业务逻辑(利用之前的window函数和转换算子无法实现)。比方,Flink SQL就是利用Process Function实现的。
Flink提供了8个Process Function:


  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • ProcessJoinFunction
  • BroadcastProcessFunction
  • KeyedBroadcastProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction
6.1. KeyedProcessFunction

KeyedProcessFunction用来操纵KeyedStream。KeyedProcessFunction会处理流的每一个元素,输出为0个、1个或者多个元素。所有的Process Function都继承自RichFunction接口,所以都有open()、close()和getRuntimeContext()等方法。而KeyedProcessFunction[KEY, IN, OUT]还额外提供了两个方法:

  • processElement(v: IN, ctx: Context, out: Collector[OUT]), 流中的每一个元素都会调用这个方法,调用效果将会放在Collector数据类型中输出。Context可以访问元素的时间戳,元素的key,以及TimerService时间服务。Context还可以将效果输出到别的流(side outputs)。
  • onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])是一个回调函数。当之前注册的定时器触发时调用。参数timestamp为定时器所设定的触发的时间戳。Collector为输出效果的集合。OnTimerContext和processElement的Context参数一样,提供了上下文的一些信息,比方定时器触发的时间信息(变乱时间或者处理时间)。
6.2. TimerService 和定时器(Timers)

Context和OnTimerContext所持有的TimerService对象拥有以下方法:

  • currentProcessingTime(): Long 返回当前处理时间
  • currentWatermark(): Long 返回当前watermark的时间戳
  • registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达定时时间时,触发timer。
  • registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于便是定时器注册的时间时,触发定时器执行回调函数。
  • deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
  • deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的变乱时间定时器,如果没有此时间戳的定时器,则不执行。
  • 当定时器timer触发时,会执行回调函数onTimer()。注意定时器timer只能在keyed streams上面利用。
6.3. 侧输出流(SideOutput)

大部门的DataStream API的算子的输出是单一输出,也就是某种数据类型的流。除了split算子,可以将一条流分成多条流,这些流的数据类型也都雷同。process function的side outputs功能可以产生多条流,并且这些流的数据类型可以不一样。一个side output可以界说为OutputTag[X]对象,X是输出流的数据类型。process function可以通过Context对象发射一个变乱到一个或者多个side outputs。
6.4. CoProcessFunction

对于两条输入流,DataStream API提供了CoProcessFunction这样的low-level操纵。CoProcessFunction提供了操纵每一个输入流的方法: processElement1()和processElement2()。
雷同于ProcessFunction,这两种方法都通过Context对象来调用。这个Context对象可以访问变乱数据,定时器时间戳,TimerService,以及side outputs。CoProcessFunction也提供了onTimer()回调函数。
7. 状态编程和容错机制

流式盘算分为无状态和有状态两种情况。无状态的盘算观察每个独立变乱,并根据末了一个变乱输出效果。
比方,流处理应用程序从传感器吸收温度读数,并在温度超过90度时发出警告。有状态的盘算则会基于多个变乱输出效果。以下是一些例子。

  • 所有类型的窗口。比方,盘算已往一小时的平均温度,就是有状态的盘算。
  • 所有用于复杂变乱处理的状态机。比方,若在一分钟内收到两个相差20度以上的温度读数,则发出警告,这是有状态的盘算。
  • 流与流之间的所有关联操纵,以及流与静态表或动态表之间的关联操纵,都是有状态的盘算。
下图展示了无状态流处理和有状态流处理的重要区别。无状态流处理分别吸收每条数据记录(图中的黑条),然后根据最新输入的数据天生输出数据(白条)。有状态流处理会维护状态(根据每条输入记录举行更新),并基于最新输入的记录和当前的状态值天生输出记录(灰条)。

上图中输入数据由黑条表现。无状态流处理每次只转换一条输入记录,并且仅根据最新的输入记录输出效果(白条)。有状态流处理维护所有已处理记录的状态值,并根据每条新输入的记录更新状态,因此输出记录(灰条)反映的是综合考虑多个变乱之后的效果。
只管无状态的盘算很紧张,但是流处理对有状态的盘算更感爱好。究竟上,正确地实现有状态的盘算比实现无状态的盘算难得多。旧的流处理体系并不支持有状态的盘算,而新一代的流处理体系则将状态及其正确性视为重中之重。
7.1. 有状态的算子和应用程序

Flink内置的许多算子,数据源source,数据存储sink都是有状态的,流中的数据都是buffer records,会保存一定的元素或者元数据。比方: ProcessWindowFunction会缓存输入流的数据,ProcessFunction会保存设置的定时器信息等等。
在Flink中,状态始终与特定算子相关联。总的来说,有两种类型的状态:


  • 算子状态(operator state)
  • 键控状态(keyed state)
7.1.1. 算子状态(operator state)

算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到雷同的状态,状态对于同一任务而言是共享的。算子状态不能由雷同或不同算子的另一个任务访问。

Flink为算子状态提供三种基本数据结构:

  • 列表状态(List state):将状态表现为一组数据的列表。
  • 联合列表状态(Union list state):也将状态表现为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时怎样恢复。
  • 广播状态(Broadcast state):如果一个算子有多项任务,而它的每项任务状态又都雷同,那么这种特别情况最适合应用广播状态。
7.1.2. 键控状态(keyed state)

键控状态是根据输入数据流中界说的键(key)来维护和访问的。Flink为每个键值维护一个状态实例,并将具有雷同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有雷同key的所有数据都会访问雷同的状态。Keyed State很雷同于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。

7.2. 状态一致性

当在分布式体系中引入状态时,自然也引入了一致性问题。一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的效果,与没有发生任何以障时得到的效果相比,前者到底有多正确?
举例来说,假设要对最近一小时登录的用户计数。在体系经历故障之后,计数效果是多少?如果有毛病,是有遗漏的计数还是重复计数?
7.2.1. 一致性级别

在流处理中,一致性可以分为3个级别:

  • at-most-once: 这实在是没有正确性保障的委婉说法——故障发生之后,计数效果可能丢失。同样的还有udp。
  • at-least-once: 这表现计数效果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。
  • exactly-once: 这指的是体系保证在发生故障后得到的计数效果与正确值一致。
曾经,at-least-once非常流行。第一代流处理器(如Storm和Samza)刚问世时只保证at-least-once,缘故起因有二。

  • 保证exactly-once的体系实现起来更复杂。这在底子架构层(决定什么代表正确,以及exactly-once的范围是什么)和实现层都很有挑战性。
  • 流处理体系的早期用户愿意担当框架的局限性,并在应用层想办法弥补(比方使应用程序具有幂等性,或者用批量盘算层再做一遍盘算)。
最先保证exactly-once的体系(Storm Trident和Spark Streaming)在性能和体现力这两个方面付出了很大的代价。为了保证exactly-once,这些体系无法单独地对每条记录运用应用逻辑,而是同时处理多条(一批)记录,保证对每一批的处理要么全部成功,要么全部失败。这就导致在得到效果前,必须期待一批记录处理竣事。因此,用户经常不得不利用两个流处理框架(一个用来保证exactly-once,另一个用来对每个元素做低延迟处理),效果使底子办法更加复杂。曾经,用户不得不在保证exactly-once与得到低延迟和效率之间权衡利弊。Flink避免了这种权衡。Flink的一个庞大代价在于,它既保证了exactly-once,也具有低延迟和高吞吐的处理能力。从根本上说,Flink通过使自身满足所有需求来避免权衡,它是业界的一次意义庞大的技术飞跃。只管这在生手看来很神奇,但是一旦相识,就会恍然大悟。
7.2.2. 端到端状态一致性

如今我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包罗了数据源(比方 Kafka)和输出到持久化体系。
端到端的一致性保证,意味着效果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它本身的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。详细可以分别如下:
内部保证——依赖checkpoint

  • source 端——必要外部源可重设数据的读取位置
  • sink 端——必要保证从故障恢复时,数据不会重复写入外部体系
而对于sink端,又有两种详细的实现方式:幂等(Idempotent)写入和变乱性(Transactional)写入。

  • 幂等写入:所谓幂等操纵,是说一个操纵,可以重复执行许多次,但只导致一次效果更改,也就是说,后面再重复执行就不起作用了。
  • 变乱写入:必要构建变乱来写入外部体系,构建的变乱对应着 checkpoint,等到 checkpoint 真正完成的时间,才把所有对应的效果写入 sink 体系中。
对于变乱性写入,详细又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)。DataStream API 提供了GenericWriteAheadSink模板类和TwoPhaseCommitSinkFunction 接口,可以方便地实现这两种方式的变乱性写入。

7.3. 检查点(checkpoint)

Flink详细怎样保证exactly-once呢? 它利用一种被称为"检查点"(checkpoint)的特性,在出现故障时将体系重置回正确状态。下面通过简朴的类比来解释检查点的作用。
Flink检查点的核心作用是确保状态正确,纵然遇到程序中断,也要正确。
检查点是Flink最有代价的创新之一,由于它使Flink可以保证exactly-once,并且不必要牺牲性能
7.3.1. Flink+Kafka怎样实现端到端的exactly-once语义

我们知道,端到端的状态一致性的实现,必要每一个组件都实现,对于Flink + Kafka的数据管道体系(Kafka进、Kafka出)而言,各组件怎样保证exactly-once语义呢?

  • 内部——利用checkpoint机制,把状态存盘,发生故障的时间可以恢复,保证内部的状态一致性。
  • source ——kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时间可以由毗连器重置偏移量,重新消费数据,保证一致性。
  • sink —— kafka producer作为sink,采用两阶段提交 sink,必要实现一个 TwoPhaseCommitSinkFunction
内部的checkpoint机制我们已经有了相识,那source和sink详细又是怎样运行的呢?接下来我们渐渐做一个分析。我们知道Flink由JobManager协调各个TaskManager举行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的举行持久化保存。

当 checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流;barrier会在算子间传递下去。

每个算子会对当前的状态做个快照,保存到状态后端。对于source任务而言,就会把当前的offset作为状态保存起来。下次从checkpoint恢复时,source任务可以重新提交偏移量,从前次保存的位置开始重新消费数据。

每个内部的 transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里。sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的变乱(还不能被消费);当遇到 barrier 时,把状态保存到状态后端,并开启新的预提交变乱。

当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成。
当sink 任务收到确认通知,就会正式提交之前的变乱,kafka 中未确认的数据就改为“已确认”,数据就真正可以被消费了。

所以我们看到,执行过程实际上是一个两段式提交,每个算子执行完成,会举行“预提交”,直到执行完sink操纵,会发起“确认提交”,如果执行失败,预提交会放弃掉。
详细的两阶段提交步骤总结如下:

  • 第一条数据来了之后,开启一个 kafka 的变乱(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”
  • jobmanager 触发 checkpoint 操纵,barrier 从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知 jobmanager
  • sink 毗连器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的变乱,用于提交下个检查点的数据
  • jobmanager 收到所有任务的通知,发出确认信息,表现 checkpoint 完成
  • sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据
  • 外部kafka关闭变乱,提交的数据可以正常消费了。
所以我们也可以看到,如果宕机必要通过StateBackend举行恢复,只能恢复所有确认提交的操纵。
7.4. 选择一个状态后端(state backend)


  • MemoryStateBackend:内存级的状态后端,会将键控状态作为内存中的对象举行管理,将它们存储在TaskManager的JVM堆上;而将checkpoint存储在JobManager的内存中。
  • FsStateBackend:将checkpoint存到长途的持久化文件体系(FileSystem)上。而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。
  • RocksDBStateBackend:将所有状态序列化后,存入本地的RocksDB中存储。
注意:RocksDB的支持并不直接包罗在flink中,必要引入依赖:
  1. val env = StreamExecutionEnvironment.getExecutionEnvironment
  2. val checkpointPath: String = ???
  3. val backend = new RocksDBStateBackend(checkpointPath)
  4. env.setStateBackend(backend)
  5. env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
  6. env.enableCheckpointing(1000)
  7. // 配置重启策略
  8. env.setRestartStrategy(RestartStrategies.fixedDelayRestart(60, Time.of(10, TimeUnit.SECONDS)))
复制代码
8. Table API 与SQL

Table API是流处理和批处理通用的关系型API,Table API可以基于流输入或者批输入来运行而不必要举行任何修改。Table API是SQL语言的超集并专门为Apache Flink筹划的,Table API是Scala 和Java语言集成式的API。与常规SQL语言中将查询指定为字符串不同,Table API查询是以Java或Scala中的语言嵌入样式来界说的,具有IDE支持如:自动完成和语法检测。
9. Flink CEP

一个或多个由简朴变乱构成的变乱流通过一定的规则匹配,然后输出用户想得到的数据,满足规则的复杂变乱。
特性:


  • 目标:从有序的简朴变乱流中发现一些高阶特性
  • 输入:一个或多个由简朴变乱构成的变乱流
  • 处理:识别简朴变乱之间的内在联系,多个符合一定规则的简朴变乱构成复杂变乱
  • 输出:满足规则的复杂变乱

CEP用于分析低延迟、频仍产生的不同来源的变乱流。CEP可以帮助在复杂的、不相关的变乱流中找出故意义的模式和复杂的关系,以靠近实时或准实时的得到通知并制止一些行为。
CEP支持在流上举行模式匹配,根据模式的条件不同,分为连续的条件或不连续的条件;模式的条件允许有时间的限制,当在条件范围内没有达到满足的条件时,会导致模式匹配超时。
看起来很简朴,但是它有许多不同的功能:

  • 输入的流数据,尽快产生效果
  • 在2个event流上,基于时间举行聚合类的盘算
  • 提供实时/准实时的警告和通知
  • 在多样的数据源中产生关联并分析模式
  • 高吞吐、低延迟的处理
市场上有多种CEP的解决方案,比方Spark、Samza、Beam等,但他们都没有提供专门的library支持。但是Flink提供了专门的CEP library。
Flink为CEP提供了专门的Flink CEP library,它包罗如下组件:

  • Event Stream
  • pattern界说
  • pattern检测
  • 天生Alert

参考博文

数据摄入和数据吞吐量 - 尚硅谷Flink教程
https://apachecn.github.io/flink-doc-zh/#/

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

种地

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表