深入分析 Flink SQL 工作机制

打印 上一主题 下一主题

主题 680|帖子 680|积分 2040

摘要:本文整理自 Flink Forward 2020 全球在线会议中文英华版,由 Apache Flink PMC 伍翀(云邪)分享,社区志愿者陈婧敏(清樾)整理。旨在帮助各人更好地明白 Flink SQL 引擎的工作原理。文章主要分为以下四部分:
  

  • Flink SQL Architecture
  • How Flink SQL Works?
  • Flink SQL Optimizations
  • Summary and Futures
  Tips:点击下方链接可查看作者分享的原版视频~
https://ververica.cn/developers/flink-forward-virtual-conference/
  Apache Flink 社区在最近的两个版本(1.9 & 1.10 )中为面向未来的同一流批处理惩罚在架构层面做了许多优化,其中一个庞大改造是引入了 Blink Planner,开始支持 SQL & Table API 使用差异的 SQL Planner 进行编译(Planner 的插件化)。
本文起首会先容推动这些优化背后的思考,展示同一的架构怎样更好地处理惩罚流式和批式查询,其次将深入剖析 Flink SQL 的编译及优化过程,包括:

  • Flink SQL 使用 Apache Calcite 将 SQL 翻译为关系代数表达式,使用表达式折叠(Expression Reduce),下推优化(Predicate / Projection Pushdown )等优化技能天生物理执行筹划(Physical Plan),使用 Codegen 技能天生高效执行代码。
  • Flink SQL 使用高效的二进制数据存储结构 BinaryRow 加速计算性能;使用 Mini-batch 攒批进步吞吐,降低两层聚适时由 Retraction 引起的数据抖动;聚合场景下数据倾斜处理惩罚和 Top-N 排序的优化原理。
## Flink SQL 架构 & Blink Planner(1.9+ )
1.1 Old Planner 的限制

要想了解 Flink SQL 在1.9 版本引入新架构的动机,我们起首看下 1.9 版本之前的架构设计。


从图中可以看出,固然面向用户的 Table API & SQL 是同一的,但是流式和批式使命在翻译层分别对应了 DataStreamAPI 和 DataSetAPI,在 Runtime 层面也要根据差异的 API 获取执行筹划,两层的设计使得整个架构能够复用的模块有限,不易扩展。
1.2 同一的 Blink Planner

Flink 在设计之初就遵循“批是流的特例”的理念,在架构上做到流批同一是大势所趋。在社区和阿里巴巴的共同努力下,1.9 版本引入了新的 Blink Planner,将批 SQL 处理惩罚作为流 SQL 处理惩罚的特例,只管对通用的处理惩罚和优化逻辑进行抽象和复用,通过 Flink 内部的 Stream Transformation API 实现流 & 批的同一处理惩罚,替代原 Flink Planner 将流 & 批区分处理惩罚的方式。
别的,新架构通过灵活的插件化方式兼容老版本 Planner,用户可自行选择。不外在 1.11 版本 Blink Planner 会代替 Old Planner 成为默认的 Planner 来支持流 & 批进一步融合同一( Old Planner 将在之后逐步退出历史舞台)。


Flink SQL 工作流

Flink SQL 引擎的工作流总结如图所示。


从图中可以看出,一段查询 SQL / 使用TableAPI 编写的程序(以下简称 TableAPI 代码)从输入到编译为可执行的 JobGraph 主要经历如下几个阶段

  • 将 SQL文本 / TableAPI 代码转化为逻辑执行筹划(Logical Plan)
  • Logical Plan 通过优化器优化为物理执行筹划(Physical Plan)
  • 通过代码天生技能天生 Transformations 后进一步编译为可执行的 JobGraph 提交运行
本节将重点对 Flink SQL 优化器的常用优化方法和 CodeGen 天生 Transformations 进行先容。
2.1 Logical Planning

Flink SQL 引擎使用 Apache Calcite SQL Parser 将 SQL 文本解析为词法树,SQL Validator 获取 Catalog 中元数据的信息进行语法分析和验证,转化为关系代数表达式(RelNode),再由 Optimizer 将关系代数表达式转换为初始状态的逻辑执行筹划。
备注:TableAPI 代码使用 TableAPI Validator 对接 Catalog 后天生逻辑执行筹划。
E.g.1 思量如下表达 JOIN 操作的一段 SQL。
  1. SELECT
  2.   t1.id, 1 + 2 + t1.value AS v
  3. FROM t1, t2
  4. WHERE
  5.   t1.id = t2.id AND
  6.   t2.id < 1000
复制代码
经过上述操作后得到了一个树状结构的逻辑执行筹划,根节点对应最上层的 Select 语句,叶子节点对应输入表 t1 和 t2 的 TableScan 操作,Join 和 Where 条件过滤 分别对应了 Join 和 Filter 节点。
  1. LogicalProject(id=[$0], v=[+(+(1, 2), $1)])
  2. +- LogicalFilter(condition=[AND(=($0, $3), <($3, 1000))])
  3.    +- LogicalJoin(condition=[true], joinType=[inner])
  4.       :- LogicalTableScan(table=[[default_catalog, default, t1]])
  5.       +- LogicalTableScan(table=[[default_catalog, default, t2]])
复制代码
可视化后如图所示,这是优化器开始工作的初始状态。


下面开始先容 Flink SQL 优化器常见的几种优化方式。
■ 2.1.1 Expression Reduce

表达式(Expression) 是 SQL 中最常见的语法。好比 t1.id 是一个表达式, 1 + 2 + t1.value 也是一个表达式。优化器在优化过程中会递归遍历树上节点,尽大概预计算出每个表达式的值,这个过程就称为表达式折叠。这种转换在逻辑上等价,通过优化后,真正执行时不再必要为每一条记录都计算一遍 1 + 2。


■ 2.1.2 PushDown Optimization

下推优化是指在保持关系代数语义不变的前提下将 SQL 语句中的变动操作尽大概下推到靠近数据源的位置以获得更优的性能,常见的下推优化有谓词下推(Predicate Pushdown),投影下推(Projection Pushdown,有时也译作列裁剪)等。


  • Predicate Pushdown
回顾 E.g.1,我们发现 WHERE 条件表达式中 t2.id < 1000 这个过滤条件描述的是对表 t2 的束缚,跟表 t1 无关,完全可以下推到 JOIN 操作之前完成。假设表 t2 中有一百万行数据,但是满意 id < 1000 的数据只有 1,000 条,则通过谓词下推优化后到达 JOIN 节点的数据量降低了1,000 倍,极大地节省了 I / O 开销,提升了 JOIN 性能。
谓词下推(Predicate Pushdown)是优化 SQL 查询的一项根本技能,谓词一词泉源于数学,指能推导出一个布尔返回值(TRUE / FALSE)的函数或表达式,通过判断布尔值可以进行数据过滤。谓词下推是指保持关系代数语义不变的前提下将 Filter 尽大概移至靠近数据源的位置(好比读取数据的 SCAN 阶段)来降低查询和传递的数据量(记录数)。




  • Projection Pushdown
列裁剪是 Projection Pushdown 更直观的描述方式,指在优化过程中去掉没有使用的列来降低 I / O 开销,提升性能。但与谓词下推只移动节点位置差异,投影下推大概会增加节点个数。好比最后计算出的投影组合应该放在 TableScan 操作之上,而 TableScan 节点之上没有 Projection 节点,优化器就会显式地新增 Projection 节点来完成优化。另外如果输入表是基于列式存储的(如 Parquet 或 ORC 等),优化还会继承下推到 Scan 操作中进行。
回顾 E.g.1,我们发现整个查询中只用到了表 t1 的 id 和 value 字段,表 t2 的 id 字段,在 TableScan 节点之上分别增加 Projection 节点去掉多余字段,极大地节省了 I / O 开销。


简要总结一下,谓词下推和投影下推分别通过制止处理惩罚不必要的记录数和字段数来降低 I / O 开销提升性能。
2.2 Physical Planning on Batch

通过上述一系列操作后,我们得到了优化后的逻辑执行筹划。逻辑执行筹划描述了执行步骤和每一步必要完成的操作,但没有描述操作的具体实现方式。而物理执行筹划会思量物理实现的特性,天生每一个操作的具体实现方式。好比 Join 是使用 SortMergeJoin、HashJoin 或 BroadcastHashJoin 等。优化器在天生逻辑执行筹划时会计算整棵树上每一个节点的 Cost,对于有多种实现方式的节点(好比 Join 节点),优化器会展开全部大概的 Join 方式分别计算。最终整条路径上 Cost 最小的实现方式就被选中成为 Final Physical Plan。
回顾 E.g.1,当它以批模式执行,同时我们可以拿到输入表的 Statistics 信息。在经过前述优化后,表 t2 到达 Join 节点时只有 1,000 条数据,使用 BroadcastJoin 的开销相对最低,则最终的 Physical Plan 如下图所示。


2.3 Translation & Code Generation

代码天生(Code Generation) 在计算机领域是一种广泛使用的技能。在 Physical Plan 到天生 Transformation Tree 过程中就使用了 Code Generation。
回顾 E.g.1,以 表 t2 之上的 Calc 节点 t2.id < 1000 表达式为例,通过 Code Generation 后天生了描述 Transformation Operator 的一段 Java 代码,将接收到的 Row 中 id < 1000 的 Row 发送到下一个 Operator。


Flink SQL 引擎会将 Physical Plan 通过 Code Generation 翻译为 Transformations,再进一步编译为可执行的 JobGraph。
2.4 Physical Planning on Stream

以上先容了 Flink SQL 引擎的团体工作流,上述例子是假定以批模式编译的,下面我们来先容一下以流模式编译时,在天生 Physical Plan 过程中的一个重要机制:Retraction Mechanism (aka. Changelog Mechanism)。


■ 2.4.1 Retraction Mechanism

Retraction 是流式数据处理惩罚中撤回过早下发(Early Firing)数据的一种机制,雷同于传统数据库的 Update 操作。级联的聚合等复杂 SQL 中如果没有 Retraction 机制,就会导致最终的计算结果与批处理惩罚差异,这也是现在业界许多流计算引擎的缺陷。
E.g.2 思量如下统计词频分布的 SQL。
  1. SELECT cnt, COUNT(cnt) as freq
  2. FROM (
  3.   SELECT word, COUNT(*) as cnt
  4.   FROM words
  5.   GROUP BY word)
  6. GROUP BY cnt
复制代码
假设输入数据是:


则经过上面的计算后,预期的输出结果应该是:


但与批处理惩罚差异,流处理惩罚的数据是一条条到达的,理论上每一条数据都会触发一次计算,所以在处理惩罚了第一个 Hello 和第一个 World 之后,词频为 1 的单词数已经酿成了 2,此时再处理惩罚第二个 Hello 时,如果不能修正之前的结果,Hello 就会在词频即是 1 和词频即是 2 这两个窗口下被同时统计,显然这个结果是错误的,这就是没有 Retraction 机制带来的题目。


Flink SQL 在流计算领域中的一个庞大贡献就是初次提出了这个机制的具体实现方案。Retraction 机制又名 Changelog 机制,因为某种水平上 Flink 将输入的流数据看作是数据库的 Changelog,每条输入数据都可以看作是对数据库的一次变动操作,好比 Insert,Delete 或者 Update。以 MySQL 数据库为例,其Binlog 信息以二进制情势存储,其中 Update_rows_log_event 会对应 2 条标志 Before Image (BI) 和 After Image (AI),分别表示某一行在更新前后的信息。
在 Flink SQL 优化器天生流作业的 Physical Plan 时会判断当前节点是否是更新操作,如果是则会同时发出 2 条消息 update_before 和 update_after 到卑鄙节点,update_before 表示之前“错误”下发的数据,必要被撤回,update_after 表示当前下发的“精确”数据。卑鄙收到后,会在结果上先减去 update_before,再加上 update_after。
回顾 E.g.2,下面的动图演示了加入 Retraction 机制后精确结果的计算过程。


update_before 是一条非常关键的信息,相当于标志出了导致当前结果不精确的那个“元凶”。不外额外操作会带来额外的开销,有些情况下不必要发送 update_before 也可以获得精确的结果,好比卑鄙节点接的是 UpsertSink(MySQL 或者 HBase的情况下,数据库可以按主键用 update_after 消息覆盖结果)。是否发送 update_before 由优化器决定,用户不必要关心。
■ 2.4.2 Update_before Decision

前面先容了 Retraction 机制和 update_before,那优化器是怎样决定是否必要发送update_before 呢?本节将先容这一部分的工作。
Step1:确定每个节点对应的 Changelog 变动类型
数据库中最常见的三种操作类型分别是 Insert (记为 [I]),Delete(记为 [D]),Update(记为 [U])。优化器起首会自底向上查抄每个节点,判断它属于哪(几)种类型,分别打上对应标志。
回顾 E.g.2,第一个 Source 节点由于只产生新数据,所以属于 Insert,记为 [I];第二个节点计算内层的聚合,所以会发出更新的消息,记为 [I,U];第三个节点裁掉 word 字段,属于简单计算,传递了上游的变动类型,记为 [I,U];第四个节点是外层的聚合计算,由于它收到了来自上游的 Update 消息,所以额外必要 Delete 操作来保证更新成功,记为 [I,U,D]。


Step2:确定每个节点发送的消息类型
在先容 Step2 之前,我们先先容下 Flink 中 Update 消息类型的表示情势。在 Flink 中 Update 由两条 update_before(简称 UB)和 update_after (简称 UA)来表示,其中 UB 消息在某些情况下可以不发送,从而进步性能。
在 Step1 中优化器自底向上推导出了每个节点对应的 Changelog 变动操作,这一步里会先自顶向下推断当前节点必要父节点提供的消息类型,直到遇到第一个不必要父节点提供任何消息类型的节点,再往上回推每个节点最终的实现方式和必要的消息类型。
回顾 E.g.2,由于最上层节点是 UpsertSink 节点,只必要它的父节点提供 [UA] 即可。到了外层聚合的 Aggregate 节点,由于 Aggregate 节点的输入有 Update 操作,所以必要父节点必要提供 [UB,UA],这样才能精确更新本身的计算状态。
再往下到 Calc 节点,它必要传递 [UB,UA] 的需求给它的父节点,也就是内层的 Aggregate 节点。而到了内层 Aggregation 节点,它的父节点是 Source 节点,不会产生 Update 操作,所以它不必要 Source 节点额外发送任何 [UB / UA ]。当优化器遍历到 Source 节点,便开始进行回溯,如果当前节点能满意子节点的 requirement,则将对应的标签更新到节点上,否则便无法天生 plan。起首内层的 Aggregate 能产生 UB,所以能满意子节点的 requirement,所以优化器会给内层的 Aggregate 节点打上 [UB,UA] 的标签,然后向上传递到 Calc 节点,同样打上 [UB,UA] ,再到外层的 Aggregate 节点,由于它的卑鄙只必要担当更新后的消息,所以打上 [UA] 标签,表示它只必要向卑鄙发送 update_after 即可。
这些标签最终会影响算子的物理实现,好比外层的 Aggregate 节点,由于它会接收到来自上游的 [UB],所以物理实现会使用带 Retract 的 Count,同时它只会向 Sink 发送 update_after。而内层的 Aggregate 节点,由于上游发送过来的数据没有 [UB],所以可以采用不带 Retract 的 Count 实现,同时由于带有 [UB] 标签,所以必要往卑鄙发送 update_before。


Flink SQL Internal Optimization

前面先容了 Flink SQL 引擎的工作原理,接下来会简要概括一下 Flink SQL 内部的一些优化,更多资料可以在 Flink Forward Asia 2019 查看。
3.1 BinaryRow

在 Flink 1.9+ 前, Flink Runtime 层各算子间传递的数据结构是 Row,其内部实现是 Object[]。这种数据结构的题目在于不但必要额外开销存 Object Metadata,计算过程中还涉及到大量序列化 / 反序列 (特殊是只必要处理惩罚某几个字段时必要反序列化整个 Row),primitive 类型的拆 / 装箱等,都会带来大量额外的性能开销。


Flink 1.9 开始引入了 Blink Planner,使用二进制数据结构的 BinaryRow 来表示 Record。BinaryRow 作用于默认大小为 32K 的 Memory Segment,直接映射到内存。BinaryRow 内部分为 Header,定长区和变长区。Header 用于存储 Retraction 消息的标识,定长区使用 8 个 bytes 来记录字段的 Nullable 信息及全部 primitive 和可以在 8 个 bytes 内表示的类型。其它类型会按照基于起始位置的 offset 存放在变长区。
BinaryRow 作为 Blink Planner 的基础数据结构,带来的好处是显而易见的:起首存储上更为紧凑,去掉了额外开销;其次在序列化和反序列化上带来的显著性能提升,可根据 offset 只反序列化必要的字段,在开启 Object Reuse 后,序列化可以直接通过内存拷贝完成。


3.2 Mini-batch Processing

Flink 是纯流式处理惩罚框架,在理论上每一条新到的数据都会触发一次计算。然而在实现层面,这样做会导致聚合场景下每处理惩罚一条数据都必要读写 State 及序列化 / 反序列化。如果能够在内存中 buffer 一定量的数据,预先做一次聚合后再更新 State,则不但会降低操作 State 的开销,还会有效淘汰发送到卑鄙的数据量,提升 throughput,降低两层聚适时由 Retraction 引起的数据抖动, 这就是 Mini-batch 攒批优化的焦点思想。


3.3 Skew Processing

对于数据倾斜的优化,主要分为是否带 DISTINCT 去重语义的两种方式。对于普通聚合的数据倾斜,Flink 引入了 Local-Global 两阶段优化,雷同于 MapReduce 增加 Local Combiner 的处理惩罚模式。而对于带有去重的聚合,Flink 则会将用户的 SQL 按原有聚合的 key 组合再加上 DISTINCT key 做 Hash 取模后改写为两层聚合来进行打散。


3.4 Top-N Rewrite

全局排序在流式的场景是很难实现的,但如果只必要计算到现在的 Top-N 极值,题目就变得可解。不外传统数据库求排序的 SQL 语法是通过 ORDER BY 加 LIMIT 限制条数,背后实现的机制也是通过扫描全表排序后再返回 LIMIT 条数的记录。另外如果按照某些字段开窗排序,ORDER BY 也无法满意要求。Flink SQL 借鉴了批场景下开窗求 Top-N 的语法,使用 ROW_NUMBER 语法来做流场景下的 Top-N 排序。
E.g.3 下面这段 SQL 计算了每个类目下销量 Top3 的店铺
  1. SELECT*
  2. FROM(
  3.   SELECT *, -- you can get like shopId or other information from this
  4.     ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rowNum
  5.   FROM shop_sales )
  6. WHERE rowNum <= 3
复制代码
在天生 Plan 方面,ROW_NUMBER 语义对应 OverAggregate 窗口节点和一个过滤行数的 Calc 节点,而这个窗口节点在实现层面必要为每一个到达的数据重新将 State 中的历史数据拿出来排序,这显然不是最优解。
我们知道流式场景求解极大 / 小值的最优操作是通过维护一个 size 为 N 的 minHeap / maxHeap。由实现反推出我们必要在优化器上新增一条规则,在遇到 ROW_NUMBER 天生的逻辑节点后,将其优化为一个特殊的 Rank 节点,对应上述的最优实现方式(当然这只是特殊 Rank 对应的其中一种实现)。这便是 Top-N Rewrite 的焦点思想。


Summary & Futures

本文内容回顾


  • 简要先容 Flink 1.9 + 在 SQL & TableAPI 上引入新架构,同一技能栈,朝着流 & 批一体的方向迈进了一大步。
  • 深入先容 Flink SQL 引擎的内部运行机制,以及在对用户透明的同时,Flink SQL 在优化方面做的许多工作。


未来工作筹划


  • 在 Flink 1.11+ 后的版本,Blink Planner 将作为默认的 Planner 提供生产级别的支持。
  • FLIP-95:重构 TableSource & TableSink 的接口设计,面向流批一体化,在 Source 端支持 changelog 消息流,从而支持 FLIP-105 的 CDC 数据源。
  • FLIP-105:Flink TableAPI & SQL 对 CDC 的支持。
  • FLIP-115:扩展现在只支持 CSV 的 FileSystem Connector,使其成为流批同一的 Generalized FileSystem Connector。
  • FLIP-123:对 Hive DDL 和 DML 的兼容,支持用户在 Flink 中运行 Hive DDL。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

麻花痒

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表