Flink系统知识解说之:Flink内存管理详解

打印 上一主题 下一主题

主题 988|帖子 988|积分 2964

Flink系统知识解说之:Flink内存管理详解

在现阶段,大部分开源的大数据计算引擎都是用Java大概是基于JVM的编程语言实现的,如Apache Hadoop、Apache Spark、Apache Drill、Apache Flink等。Java语言的好处是不消考虑底层,低沉了程序员的门槛,JVM可以对代码举行深度优化,对内存资源举行管理,自动回收内存。但是自动内存管理的问题在于不可控,基于JVM的大数据引擎常常会面临一个问题,即在处理海量数据时,如何在内存中存储大量的数据。
自主内存管理

Flink从一开始就选择了自主的内存管理,避开了JVM内存管理在大数据场景下的问题,提升了计算服从。
1.JVM内存管理的不足

当须要将海量数据存储到内存中时,就不得不面对JVM存在的几个问题:
(1)有效数据密度低
Java的对象在内存中的存储包罗3个重要部分:对象头、实例数据、对齐填充部分。32位和64位的假造机中对象头分别须要占32bit和64bit。实例数据时现实的数据存储。为了提高服从,内存中数据存储不是连续的,而是按照8 byte的整数倍举行存储。例如,只有一个boolean字段的类实例占16 byte:头信息8 byte,boolean 1 byte,为了对齐到达8的倍数会额外占用7 byte。这就导致在JVM中有效信息的存储密度很低。
(2)垃圾回收
JVM的内存回收机制的优点和缺点同样显着,优点是开发者无需关注资源回收问题,可以提高开发服从,减少内存走漏的大概。但是内存回收是不可控的,在大数据计算的场景中,这个缺点被放大,TB、PB级的数据计算须要斲丧大量的内存,在内存中产生海量的Java对象。一旦出现Full GC,GC会到达秒级乃至分钟级,直接影响实行服从。
GC带来的停止会使集群中的心跳超时,导致节点被踢出集群,整个集群进入不稳定状态。虽然通过JVM参数调优可以提升回收服从,尽量减少Full GC的发生,但是仍然不能避免这个问题,准确的调优也确实非常困难。
(3)OOM问题影响稳定性
OutOfMemoryError是分布式计算框架经常会碰到的问题,当JVM中全部对象巨细超过分配给JVM的内存巨细时,就会发生OutOfMemoryError错误,导致JVM崩溃,分布式框架的结实性和性能都会受到影响。
(4)缓存未掷中问题
CPU举行计算的时候,是从CPU缓存中获取数据,而不是直接从内存中获取数据。 CPU有分L1、L2、L3级缓存。L1小,一样平常为32KB,L3大,能到达32MB。缓存的理论底子是程序局部性原理,包括时间局部性和空间局部性:最近被CPU访问的数据,短期内CPU还要访问(时间);被CPU访问的数据附近的数据,CPU短期内还要访问(空间)。Java对象在堆上存储的时候并不是连续的,所以从内存中读取Java对象时,缓存的邻近的内存区域的数据往往不是CPU下一步计算所须要的,这就是缓冲未掷中。此时CPU须要空转等待从内存中重新读取数据。CPU的速率和内存的速率之间差好几个数量级,导致CPU没有充分使用起来。假如数据没有在内存中,而是须要从磁盘上加载,那么实行服从就会变得惨不忍睹。
2.自主内存管理

因为直接使用JVM做内存管理在大数据场景下大概碰到的诸多问题,所以越来越多的大数据计算引擎选择自行管理JVM内存,如Spark、Flink、HBase,尽量到达C/C++一样的性能,同时避免OOM的发生。本章重要介绍Flink是如何解决上面的问题的,重要内容包括内存管理、定制的序列化工具、缓存友好的数据布局和算法、堆外内存等。
在Flink中,Java对象的有效信息被序列化为二进制数据流,在内存中连续存储,生存在预分配的内存块上,内存块叫做MemorySegment。MemorySegment是内存分配的最小单元,是一段固定长度的内存(默认巨细为32KB)。同时,Flink为其提供了非常高效的读写方法,许多运算可以直接操作二进制数据,而不须要反序列化即可实行。
MemorySegment可以生存在堆上,其内部存储为一个Java byte数组,也可以生存在堆外的ByteBuffer中。每条记录都会以序列化的情势存储在一个或多个MemorySegment中。
但使用堆上内存,仍然不是完全自主的内存管理,还存在以下问题:
1)超大内存(上百GB)JVM的启动须要很长时间,Full GC可以到达分钟级。使用堆外内存,可以将大量的数据生存在堆外,极大地减小堆内存,避免GC和内存溢出的问题。
2)高效的IO操作。堆外内存在写磁盘或网络传输时接纳的是零拷贝,而堆上内存则至少须要1次内存复制。
3.堆外内存的不足之处

堆外内存提供了更好的性能和更可控的内存管理,但是也存在几个问题:
1)堆上内存的使用、监控、调试简朴,堆外内存出现问题后的诊断则较为复杂。
2)Flink偶尔须要分配短生命周期的MemorySegment,在堆外内存上分配比在堆上内存开销更高。
3)在Flink的测试中,部分操作在堆外内存上会比堆上内存慢。
同时为了提供服从,Flink在计算中接纳了DBMS的Sort和Join算法,直接操作二进制数据,避免数据反复序列化带来的开销。Flink的内部实现更像C/C++而非Java。
内存模子

内存布局

TaskManager是Flink中实行计算的焦点组件,是用来运行用户代码的Java进程。其中,大量使用了堆外内存。
Flink TaskManager的简化和详细内存布局如下图所示:
简化内存模子:

详细内存模子:

基于文初提及的使用JVM堆上内存的一些不足之处,Flink设计了使用堆外内存的自主内存管理。因此,Flink任务进程的总内存(Total Process Memory, TPM)= Flink自身使用的内存(Total Flink Memory, TFM) + JVM运行额外的内存(如Metaspace、overhead)。其中,Flink自身使用的内存(TFM)包括了JVM堆内存和自主管理的堆外内存,堆外内存又包罗了托管内存和直接内存。下面分别对这些分类举行介绍:
JVM Heap

Framework Heap

这部分内存重要由Flink框架自身使用,用于存储系统级别的数据布局,包括Flink框架在运行期间须要的一些数据布局,例如任务的线程栈内存和其他Flink框架的底子设施。例如用于JobManager和TaskManager的RPC消息、管理查抄点的元数据等。它是作业实行所必需的根本内存,独立于用户程序和运行期间的数据存储。
Task Heap

这部分内存重要用于存储由用户函数创建的Java对象和用户函数操作的数据。例如,当实行一个map操作,您的函数大概会创建一些新的Java对象,这些对象都是在JVM堆内存中创建和管理的。假如Flink的托管内存配置为堆内,那么Flink的排序、哈希和状态后端操作也会使用到Task Heap内存。
Off-Heap

托管内存(Managed Memory)

托管内存是由 Flink 负责分配和管理的本地(堆外)内存。 以了局景须要使用托管内存:


  • 流处理作业中用于 RocksDB State Backend。
  • 流处理和批处理作业中用于排序、哈希表及缓存中央结果。
  • 流处理和批处理作业中用于在 Python 进程中实行用户自定义函数。
    更具体的,对于当作业中使用排序、哈希表及缓存中央结果时,Flink是如何使用托管内存的:
  • 排序:例如,当你须要对一个非常大的数据集举行排序时,假如数据无法完全装入内存,Flink 就会使用其托管内存来实行外排序。在外排序过程中,数据会被分割成可以装入内存的小块,每个小块内部举行排序,然后将排序后的小块写入磁盘。当全部小块都举行了排序和写入后,Flink 会从磁盘读取这些小块,实行归并排序,直到全部数据都被排序。
  • 哈希表:Flink 在处理连接(Join)操作时,经常须要使用哈希表来维护到目前为止已经看到的数据记录。假如不能将全部数据装入内存,Flink 就会使用托管内存来存储这个哈希表。这样就可以保证即使在处理大规模数据时也能保持良好的性能。
  • 缓存中央结果:在一些须要多遍扫描数据的算法(比如迭代算法)中,Flink 会缓存数据的中央结果,以便下一轮迭代可以重复使用,这样可以减少数据重复读取的开销。Flink 托管内存就是用来存储这些中央结果的。
    另外,当使用 RocksDB 作为状态后端时,Flink 托管内存重要被用作 RocksDB 的写缓冲区(Write Buffer)和读缓存(Block Cache),从而提高状态访问的速率。
    简言之,Flink 的托管内存重要用于存储在处理过程中须要存储的中央计算数据和结果,以求在充分使用有限内存资源的同时提供尽大概高的处理速率。
直接内存()

直接内存通常指的是被Flink进程直接从操作系统中申请的、不受Java堆内存垃圾回收器管理的内存。 以了局景须要使用直接内存:


  • 于网络通信和文件I/O,
  • 通过网络缓冲池举行数据交换(如shuffle)
  • 数据缓冲以及序列化/反序列化中举行应用。
    Flink通过直接内存技能举行数据交换可以有效避免频繁的Java堆内存和本地I/O缓存之间的数据复制(使用零拷贝技能),从而提高性能。
    在一些情况下,直接内存也可以使用在某些须要大量内存并希望避免频繁触发垃圾回收的处理中,例如当使用RocksDB作为状态后端时,RocksDB的本地内存通常是由直接内存提供的,这样可以避免状态数据引起的Java堆内存的显著增加,从而低沉了垃圾收集的开销和提高了性能。
直接内存包括了以下几部分:


  • Framework Off-Heap:这部分内存被Flink框架用于框架自身的一些运行需求。比如,Flink的一些本地数据布局和算法大概会使用这部分内存举行操作。这部分内存一样平常不大。
  • Task Off-Heap:这部分内存重要用于存储由用户任务产生的、并由Flink以某种情势管理的内存。比如,假如你配置了本地状态后端(如RocksDB)使用堆外内存,那么这部分内存将存储状态数据。这部分内存的使用可以避免引起频繁的Java GC操作,提高性能。
  • Network:此部分内存重要用于网络通信中的缓冲区。Flink通过此缓冲区在TaskManager之间发送和接收数据。这部分内存通常是直接内存,不受GC的影响,可以有效地举行数据交换和缓冲以提高网络通信的性能。
另外,除了Flink使用的总内存(Total Flink Memory,TFM)外,总进程内存(Total Process Memory,TPM)还包括了JVM元空间(Metaspace)和其他开销内存(overhead)。在JVM内存模子中,将元空间从堆内存独立出来了,所以在上面的内存模子中也元空间也是单独一部分,外加一些JVM运行时的额外开销内存,例如线程栈、代码缓存、GC回收空间等等。
Flink内存模子分类配置参数

1.Flink使用的内存

(1)JVM堆上内存

  • 框架堆上内存Framework Heap Memory。Flink框架本身所使用的内存,即TaskManager本身所占用的堆上内存,不计入slot的资源中。
    配置参数:taskmanager.memory.framework.heap.size = 128MB,默认128MB。
  • Task堆上内存Task Heap Memory。Task实行用户代码时所使用的堆上内存。
    配置参数:taskmanager.memory.task.heap.size
    (2)JVM堆外内存
  • 框架堆外内存Framework Off-Heap Memory。Flink框架本身所使用的内存,即TaskManager本身所占用的堆外内存,不计入slot的资源。
    配置参数:taskmanager.memory.framework.off-heap.size = 128MB,默认128MB。
  • Task堆外内存Task Off-Heap Memory。Task实行用户代码时所使用的堆外内存。
    配置参数:taskmanager.memory.task.off-heap.size = 0,默以为0.
  • 网络缓冲内存Network Memory。网络数据交换所使用的堆外内存巨细,如网络数据交换缓冲区(Network Buffer,后面回介绍)。
    配置参数:taskmanager.memory.network.(min/max/fraction),默认min=64MB,max=1GB,fraction=0.1。
  • 堆外托管内存 Managed Memory。Flink管理的堆外内存。
    配置参数:taskmanager.memory.managed.[size|fraction],默认fraction = 0.4。
2.JVM本身使用的内存

JVM本身直接使用了操作系统的内存。

  • JVM元空间
    JVM元空间所使用的内存
    配置参数:taskmanager.memory.jvm-metaspace=96M,默认96MB。
  • JVM实行开销
    JVM在实行时自身所须要的内容,包括线程栈、IO、编译缓存等所使用的内存。
    配置参数:taskmanager.memory.jvm-overhead.[min|max|fraction]。默认min=192MB,max=1GB,fraction=0.1。
  • 总体内存
    (1)Flink使用内存
    综上而言,Flink使用的内存包括Flink使用的堆上、堆外内存。使用参数taskmanager.memory.flink.size参数举行控制。
    (2)进程使用内存
    整个进程所使用的内存,包括Flink使用的内存和JVM使用的内存。使用参数taskmanager.memory.process.size举行控制。
    JVM内存控制参数如下所示:
    1)JVM堆上内存,使用-Xmx和-Xms参数举行控制。
    2)JVM直接内存,使用参数-XX:MaxDirectMemorySize举行控制。对于托管内存,使用Unsafe.allocateMemory()申请,不受该参数控制。
    3)JVM Metaspace使用-XX:MaxMetaspaceSize举行控制。
内存计算

目前的实现中,在JVM启动之前就须要确定各个内存区块的巨细。一旦JVM启动了,在TaskManager进程内部就不再重新计算。Flink中有两个地方举行内存巨细计算:


  • 在Standalone部署模式下,内存的计算在启动脚本中实现。
  • 在容器情况下(Yarn、K8s、Mesos),计算在ResourceManager中举行。
    在启动脚本与容器情况下的内存巨细计算都调用了Flink的Java代码时间,保证了全部部署模式下的统一,计算好的参数使用-D参数提交给Java进程。
计算时,须要配置如下3个参数组合中的至少1个:
(1)Task的堆上内存和托管内存
假如手动配置了网络缓冲区内存巨细,则使用该参数。假如没有明确配置,则使用分配系数fraction ✖️总体Flink使用内存计算网络缓冲区内存巨细。
(2)总体Flink使用内存
假如配置了该选项,而没有配置(1),则从整体Flink内存中分别网络缓冲区内存和托管内存,剩余的内存作为Task堆上内存。
假如手动设置了网络缓冲内存,则使用其值,否则使用默认的分配系统fraction✖️总体Flink内存。
(3)总体进程使用内存
假如只配置了总体进程使用内存,则从整体进程中扣除JVM元空间和JVM实行开销内存,剩余的内存作为总体Flink使用内存。
内存数据布局

Flink的内存管理像操作系统管理内存一样,将内存分别为内存段、内存页等布局。
内存段

内存段在Flink内存叫做MemorySegment,是Flink的内存抽象的最小分配单元。 默认情况下,一个MemorySegment对应着一个32KB巨细的内存块。这块内存既可以是堆上内存(Java的byte数组),也可以是堆外内存(基于Netty的DirectByteBuffer)。
MemorySegment同时也提供了堆二进制数据举行读取和写入的方法。对于Java根本数据范例,如short、int、long等,MemorySegment内置了方法,可以直接返回大概写入数据,对于其他范例,读取二进制数组byte[]后举行反序列化,序列化为二进制数据byte[]后写入。
MemorySegment布局

为了更清楚地理解MemorySegment,下面一起看一下MemorySegment的关键属性。
1)BYTE_ARRAY_BASE_OFFSET:二进制字节数组的起始索引,相对于字节数组对象而言。
2)LITTLE_ENDIAN:判断是否为Little Endian模式的字节存储顺序,若不是就是Big Endian模式。
3)HeapMemory:假如MemorySegment使用堆上内存,则表示一个堆上的字节数组(byte[]),假如MemorySegment使用堆外内存,则为null。
4)address:字节数组对应的相对地址(若HeapMemory为null,即大概为堆外内存的绝对地址)。
5)addressLimit:标识地址竣事位置(address+size)。
6)size:内存段的字节数。
布局类图如下所示:


可以看到,MemorySegment类定义了一系列的方法来对字节和根本数据范例,如int、long、float举行读写的方法(例如get和put方法)。同时,还支持批量操作,例如复制和比力操作,它提供了copy和compare方法用来对大量数据举行操作。
Flink的MemorySegment重要用于Flink框架对内存的管理和数据的处理,重要用在它的网络缓冲、排序算法和内存状态后端等地方,以提供高效的内存操作。在设计上,MemorySegment重要实现了以下几点:


  • 高效的数据访问。 MemorySegment类包罗了一个连续的字节数组(heapMemory),用于存储现实的二进制数据。全部的get和put操作都是在这个字节数组上实行的。使用连续的字节数组的好处正是我们上文提到的,可以充分使用程序的局部性原理,因此,Flink使用MemorySegment作为其最小的内存分配单元,保证了读写数据时,相邻的数据能够一起被加载到CPU缓存中,提升处理性能。
  • 高效的内存管理。 Flink通过MemorySegment对内存举行管理,保证了Flink程序运行时的内存服从。例如,对于Flink的网络缓冲、排序算法和内存状态后端等地方,都会使用MemorySegment举行内存的分配和回收。这有助于Flink高效地使用内存,而且避免了一些Java内存管理中常见的问题,如垃圾收集(Garbage Collection)过频繁等。
  • **支持堆外内存(off-heap memory)操作。**这意味着,除了在JVM堆内存上操作,MemorySegment还能直接在系统的物理内存上举行操作。使用离堆内存可以避免频繁的垃圾回收,提高数据处理的性能。
另外,须要注意的是,MemorySegment抽象类中的heapMemory仅适用于“堆”内存段,即哪些将数据存储在Java堆上的内存段。对于“非堆”内存段,即哪些将数据存储在Java堆之外的内存段,Flink使用java.nio.DirectByteBuffer的字节缓冲区(定义在HybridMemorySegment类中)来存储和操作数据。
最后再扼要谈一下MemorySegment,假如觉得理解起来比力抽象的话,可以跟其他的一些数据布局类如ArrayList、LinkedList一起来对比理解,这些类都是定义的用来表示数据如何在内存中存储和管理的。ArrayList是分别一块连续的内存地址,LinkedList是用链表的布局来存储,而MemorySegment就是分别一块指定巨细的连续内存地址来存储字节数据。上层的模块可以直接对MemorySegment举行操作,就相当于对平常对ArrayList、LinkedList这些布局的操作(比如插入、排序、比力等)。因此,MemorySegment就是Flink定义的一种数据布局,用来方便地存储、管理和操作内存数据。
字节顺序Big Endian和Little Endian

字节顺序是指字节范例的数据在内存中的存放顺序。不同的CPU架构体系使用不同的存储顺序。PowerPC系统接纳Big Endian方式存储数据,低地址存放最高有效字节(MSB),而x86系列则接纳Little Endian方式存储数据,低地址存放最低有效字节(LSB),如下图所示:
MemorySegment实现

Flink的MemorySegment有堆上和堆外两种实现,其类体系布局如图所示:

HeapMemorySegment用来分配堆上内存,HybridMemorySegment用来分配堆外内存和堆上内存。现实上在2017年之后的Flink中,并没有使用HeapMemorySegment,而是使用HybridMemorySegment这个类来同时实现堆上和堆外内存的分配。
之所以在后续的版本中只使用HybridMemorySegment,涉及了JIT编译优化的问题。假如同时使用了两个类,那么在运行的时候,每一次调用都须要去查询函数表,确定调用哪个子类中的方法,无法提前优化。但是假如只使用一个类,那么JIT编译时,自动识别方法的调用都可以被去假造化(de-virtualized)和内联(inlined),可以极大地提高性能。调用越频繁,优化效果就越好。
内存页

MemorySegment是Flink内存分配的最小单元,对于跨MemorySegment生存的数据,假如须要上层的使用者,须要考虑全部的细节,非常繁琐。所以Flink又抽象了一层,叫做内存页。内存页是MemorySegment之上的数据访问视图,数据读取抽象为DataInputView,数据写入抽象为DataOutputView。 有了这一层,上层使用者无需关心MemorySegment的细节,该层会自动处理跨MemorySegment的读取和写入。
DataInputView

DataInputView是从MemorySegment数据读取的抽象视图,该视图可用于顺序读取内存的内容。继承自java.io.DataInput,提供了从二进制流中读取不同数据范例的方法。如下图所示:

InputView中持有多个MemorySegment的饮用(MemorySegment[]),这一组MemorySegment被视为一个内存页(Page),可以顺序读取MemorySegment中的数据。DataInputView重要提供了一系列接口用于从数据输入流中读取数据,而MemorySegment则重要用于在连续的内存块上举行数据的低层次操作。
在Flink的网络缓冲,排序,哈希表等操作中,MemorySegment用作持有真实数据的内存块。而DataInputView则提供了读取这些数据的接口,以方便地从MemorySegment读取所需的数据。
在现实的数据编解码过程中,常常须要将DataInputView与MemorySegment一起使用。例如,一个典范的使用场景是在网络数据传输中,Flink会起首将数据生存在MemorySegment中,然后通过实现DataInputView的方式,来举行数据的读取和解码。
根本上全部的InputView实现类都继承了AbstractPageInputView抽象类,也就是所全部的InputView实现类都支持Page。
DataOutputView

DataOutputView是数据写入MemorySegment的抽象视图,继承自java.io.DataOutput,提供了将不同范例的数据写入二进制流的一系列方法。同样,DataOutputView中持有一个大概多个MemorySegment的引用(MemorySegment[]),这一组MemorySegment被视为一个内存页(Page),可以顺序地向MemorySegment中写入数据。
DataOutputView的接口继承关系如图所示:

在现实的数据编码和写入过程中,Flink通常会使用一个DataOutputView的实现将数据写入一个或多个MemorySegment。例如,在网络数据发送时,Flink会通过实现DataOutputView的方式,将数据写入MemorySegment,然后将这些MemorySegment添加到网络缓冲区以预备发送。
根本上全部的OutputView实现类都继承了AbstractPageOutputView抽象类,也就是说全部的OutputView实现类都支持跨MemorySegment写入。
内存页的使用

对内存的读取写入操作是非常底层的行为,对于上层应用(DataStream作业)而言,涉及向MemorySegment写入,读取二进制的地方都使用到了DataOutputView和DataInputView,而不是直接使用MemorySegment。
例如,在flink-table-runtime-blink中,BinaryRowSerializer中使用AbstractPagedInputView从MemorySegment中读取二进制数据并转换成BinaryRow,使用AbstractPagedOutputView将BinaryRow写入MemorySegment中。
Buffer

Task算子处理数据完毕,将结果交给卑鄙的时候,使用的抽象大概说内存对象是Buffer。Buffer接口是网络层面上传输数据和事故的统一抽象,其实现类是NetworkBuffer。 Flink在各个TaskManager之间通报数据时,使用的是这一层的抽象。1个NetworkBuffer中包装了1个MemorySegment,现实的数据就存储在这个MemorySegment中,并引入了一些额外的元数据,例如数据巨细(currentSize属性)以及Buffer中包罗的数据范例(dataType属性)等。
此外,Buffer还提供了内存的引用计数和递增/递减的方法,用于在资源回收时管理内存。
简朴来说,Buffer是基于MemorySegment的,它在MemorySegment上增加了一些用于网络传输和内存管理的额外功能。
Buffer接口的类体系如图所示:

Buffer的底层是MemorySegment,Buffer申请和开释由Flink自行管理,Flink引入了“引用数”的概念。当有新的Buffer斲丧者时,引用数加1,当斲丧者斲丧完Buffer时,引用数减1,终极当引用数变为0时,就可以将Buffer开释重用了。
具体来说,在Apache Flink中,Buffer对象具有一个“引用数(Reference Count)”的属性,它是用来跟踪Buffer实例在系统中被多少组件引用的指标。每当一个组件获取对Buffer的引用时,引用数就会增加。当组件完成对Buffer的使用而且不再须要它时,就会减少引用数。
这种设计的目的是为了更好地管理和复用内存资源。当Buffer的引用数降为0,就表示没有任何组件再使用该Buffer,它的内存可以归还给MemorySegment池,以便其他组件复用。引用数在内存管理中是一种常见的机制,能够避免不须要的对象复制和频繁的内存分配和开释,在Flink的Buffer管理中起到了重要的作用。例如,在数据移交过程中,大概有多个线程或模块同时处理同一个Buffer,此时通过引用数可以准确判断什么时候可以安全地开释该Buffer。
NetworkBuffer同时继承了AbstractReferenceCountedByteBuf。
AbstractReferenceCountedByteBuf是Netty中的抽象类,通过继承该类,Flink中Buffer具备了引用计数的本领,而且实现了对MemorySegment的读写。感爱好的读者可以去了解一下Netty。
Buffer资源池

Buffer资源池在Flink中叫做BufferPool。BufferPool用来管理Buffer,包罗Buffer的申请、开释、销毁、可用Buffer关照等,其实现类是LocalBufferPool。
BufferPool的类体系如图所示:

为了方便对BufferPool的管理,Flink设计了BufferPoolFactory,提供BufferPod的创建和销毁,其唯一的实现类是NetworkBufferPool。
每个TaskManager只有一个NetworkBufferPool,同一个TaskManager上的Task共享NetworkBufferPool,在TaskManager启动的时候,就会创建NetworkBufferPool,为其分配内存。
NetworkBufferPool持有该TaskManager在举行数据通报时所能够使用的全部内存,所以除了作为BufferPool的工厂外,还作为Task所需内存段(MemorySegment)的提供者,每个Task的LocalBufferPool所须要的内存都是从NetworkBufferPool申请而来的。
内存管理器

**MemoryManager是Flink中管理托管内存的组件,其管理的托管内存只使用堆外内存。**在批处理中用在排序、Hash表和中央结果的缓冲中,在流计算中作为RocksDBStateBackend的内存。
在Flink 1.10之前的版本中,MemoryManager负责TaskManager的全部内存。1.10版本中,MemoryManager的管理范围缩小为Slot级别,即为Task管理内容,TaskManager为每个Slot分配相同的内容,Task不能使用超过其Slot分配的资源。
MemoryManager重要通过内部接口MemoryPool来管理全部的MemorySegment。托管内存的管理相比于Network Buffers的管理更为简朴,因为不须要Buffer的那一层封装。
内存申请

批处理计算任务中,MemorySegment负责为算子申请堆外内存。终极现实申请的是堆外的ByteBuffer,代码如下所示,
  1. # MemorySegmentFactory类
  2. /**
  3.      * Allocates an off-heap unsafe memory and creates a new memory segment to represent that
  4.      * memory.
  5.      *
  6.      * <p>Creation of this segment schedules its memory freeing operation when its java wrapping
  7.      * object is about to be garbage collected, similar to {@link
  8.      * java.nio.DirectByteBuffer#DirectByteBuffer(int)}. The difference is that this memory
  9.      * allocation is out of option -XX:MaxDirectMemorySize limitation.
  10.      *
  11.      * @param size The size of the off-heap unsafe memory segment to allocate.
  12.      * @param owner The owner to associate with the off-heap unsafe memory segment.
  13.      * @param gcCleanupAction A custom action to run upon calling GC cleaner.
  14.      * @return A new memory segment, backed by off-heap unsafe memory.
  15.      */
  16.     public static MemorySegment allocateOffHeapUnsafeMemory(
  17.             int size, Object owner, Runnable gcCleanupAction) {
  18.         long address = MemoryUtils.allocateUnsafe(size);
  19.         ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
  20.         Runnable cleaner =
  21.                 MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address, gcCleanupAction);
  22.                 // 在申请内存的时候,同时为该内存片段准备好内存清理
  23.         return new HybridMemorySegment(offHeapBuffer, owner, false, cleaner);
  24.     }
复制代码
使用Unsafe申请堆外内存,包装为ByteBuffer后再包装为MemorySegment。
流计算任务中,MemoryManager更多的作用是管理,控制RocksDB的内存使用量,通过RocksDB的Block Cache和WriterBufferManager参数来限定,参数的具体值从TaskManager的内存配置参数中计算而来。RocksDB自己来负责运行过程中的内存申请和内存开释,如下述代码所示:
  1. /**
  2.      * Acquires a shared resource, identified by a type string. If the resource already exists, this
  3.      * returns a descriptor to the resource. If the resource does not yet exist, the method
  4.      * initializes a new resource using the initializer function and given size.
  5.      *
  6.      * <p>The resource opaque, meaning the memory manager does not understand its structure.
  7.      *
  8.      * <p>The OpaqueMemoryResource object returned from this method must be closed once not used any
  9.      * further. Once all acquisitions have closed the object, the resource itself is closed.
  10.      */
  11.     public <T extends AutoCloseable> OpaqueMemoryResource<T> getExternalSharedMemoryResource(
  12.             String type, LongFunctionWithException<T, Exception> initializer, long numBytes)
  13.             throws Exception {
  14.         // This object identifies the lease in this request. It is used only to identify the release
  15.         // operation.
  16.         // Using the object to represent the lease is a bit nicer safer than just using a reference
  17.         // counter.
  18.         final Object leaseHolder = new Object();
  19.         final SharedResources.ResourceAndSize<T> resource =
  20.                 sharedResources.getOrAllocateSharedResource(
  21.                         type, leaseHolder, initializer, numBytes);
  22.                 // 创建资源释放函数
  23.         final ThrowingRunnable<Exception> disposer =
  24.                 () -> sharedResources.release(type, leaseHolder);
  25.         return new OpaqueMemoryResource<>(resource.resourceHandle(), resource.size(), disposer);
  26.     }
复制代码
内存开释

Flink自行管理内存,也就意味着内存的申请和开释都由Flink来负责。触发Java堆外内存开释的行为一样平常有如下两种:


  • 内存使用完毕
  • Task停止(正常或异常)实行。
    在Flink中实现了一个JavaGcCleanerWrapper来举行堆外内存的开释,提供了两个Java Cleaner。
LegacyCleanerProvider

该CleanerProvider提供1.8及以下版本JDK的Flink管理的内存的垃圾回收,使用sum.misc.Cleaner来开释内存。
Java9CleanerProvider

该CleanerProvider提供1.9及以上版本的JDK的Flink管理的内存的垃圾回收,使用java.lang.ref.Cleaner来开释内存。
JavaGcCleanerWrapper会为每个Owner创建一个包罗Cleaner的Runnable对象,在每个MemorySegment开释内存的时候,调用此Cleaner举行内存的开释。
当MemoryManager关闭的时候会对全部申请的MemorySegment举行开释,交还给操作系统。
网络缓冲器

网络缓冲器(Network Buffer)是网络交换数据的封装,其对应于MemorySegment内存段,当结果分区(ResultPartition)开始写出数据的时候,须要向LocalBufferPool申请Buffer资源,使用BufferBuilder将数据写入MemorySegment。当MemorySegment都分配完后,则会持续等待Buffer的开释。
BufferBuilder在上游Task中,用来向申请到的MemorySegment写入数据。与BufferBuilder相对的是BufferConsumer。BufferConsumer位于卑鄙Task中,负责从MemorySegment中读取数据。1个BufferBuilder对应1个BufferConsumer。
内存申请

LocalBufferPool的巨细是动态的,在最小内存段数量与最大内存段数据之间浮动。使用NetworkBufferPool创建LocalBufferPool时,假如该TaskManager的内存无法满足全部Task所需的最小MemorySegment的数量总和,则会发生错误。
Buffer申请

结果分区(ResultPartition)申请Buffer举行数据写入时,如下代码所示:
LocalBufferPool起首从自身持有的MemorySegment中分配可用的,假如没有可用的,则从TaskManager的NetworkBuffer中申请,假如没有,则阻塞等待可用的MemorySegment,如下代码所示:
MemorySegment申请

申请Buffer本质上来说就是申请MemorySegment,假如在LocalBufferPool中,则申请新的堆外内存MemorySegment,如下代码若是:
内存回收

  1. Buffer使用了引用计数机制来判断什么时候可以释放Buffer到可用资源池。每创建一个BufferConsumer,就会对Buffer的引用计数+1,每个Buffer被消费完了,就会对Buffer的引用计数-1,当Buffer引用计数为0的时候就可以回收了。
复制代码
Buffer回收

前面介绍过Buffer的重要实现类是NetworkBuffer,同时继承了AbstractReferenceCountedByteBuf.。当Buffer被斲丧一次后,就会对Buffer的引用计数-1,如下代码所示:
Buffer回收之后,并不会开释MemorySegment,此时MemorySegment仍然在LocalBufferPool的资源池中,除非TaskManager的级别内存不足,才会开释回TaskManager持有的全局资源池。
开释MemorySegment的时候,同样要根据MemorySegment的范例来举行,而且要在不低于生存内存的情况下,将内存开释回内存段中,变为可用内存,后续申请MemorySegment的时候,可以重复使用该内存片断。
MemorySegment开释

当NetworkBufferPool关闭的时候举行内存的开释,交还给操作系统。
总结

大数据场景下,使用Java的内存管理会带来一系列的问题,所以Flink从一开始就选择自主内存管理。为了实现内存管理,Flink对内存举行了一系列的抽象,内存段MemorySegment是最小的内存分配单元,对于跨段的内存访问,Flink抽象了DataInputView和DataOutputView,可以看作是内存页。
Flink在1.10版本重构了其TaskManager的内存管理模子,重要分为堆上内存和堆外内存,并简化了内存参数。在计算层面上,Flink的内存管理器提供了对内存的申请和开释,在数据传输层面上,Flink抽象了网络内存缓存Buffer(1个Buffer对应一个MemorySegment)的申请和开释。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

立聪堂德州十三局店

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表