ToB企服应用市场: ToB review and business networking industry platform

Title: Flink SQL in detail: converting Calcite logical nodes to Flink logical nodes

Author: 天空闲话    Time: 2024-7-23 10:42
Background

This article mainly describes how Calcite rel nodes are converted into Flink's custom RelNode implementations.

Example

FlinkLogicalCalcConverter

Checks whether the node is Calcite's LogicalCalc operator. If it is, the rule rewrites it as a rel of type FlinkLogicalCalc whose convention RelTrait is FlinkConventions.LOGICAL.

    private class FlinkLogicalCalcConverter(config: Config) extends ConverterRule(config) {
      override def convert(rel: RelNode): RelNode = {
        val calc = rel.asInstanceOf[LogicalCalc]
        // Request the input in the Flink logical convention as well
        val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.LOGICAL)
        FlinkLogicalCalc.create(newInput, calc.getProgram)
      }
    }
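To make the shape of such a converter rule easier to see, here is a minimal, self-contained sketch. It does not use Calcite or Flink at all: ToyConvention, ToyNode, ToyScan, ToyCalc and ToyConverter are hypothetical stand-ins invented for illustration, and the input is converted eagerly, whereas the real RelOptRule.convert asks the planner for an equivalent input with the requested trait.

```scala
// Toy model only; none of these types are Calcite or Flink classes.
sealed trait ToyConvention
case object ToyNone extends ToyConvention     // plays the role of Calcite's default convention
case object ToyLogical extends ToyConvention  // plays the role of FlinkConventions.LOGICAL

sealed trait ToyNode { def convention: ToyConvention }
case class ToyScan(table: String, convention: ToyConvention) extends ToyNode
case class ToyCalc(input: ToyNode, program: String, convention: ToyConvention) extends ToyNode

object ToyConverter {
  // Rebuilds the node in the target convention and converts its input too.
  // The payload (table name, program) is carried over unchanged.
  def convert(node: ToyNode, target: ToyConvention): ToyNode = node match {
    case ToyScan(table, _)       => ToyScan(table, target)
    case ToyCalc(in, program, _) => ToyCalc(convert(in, target), program, target)
  }
}
```

The point of the sketch is that a converter rule changes only the node type and its convention trait, while the operator's semantics (here the program string) stay the same.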
BatchPhysicalCalcRule

Checks whether the rel node is a FlinkLogicalCalc. The rule matches only when the Calc program contains no Python call, and converts the node to a BatchPhysicalCalc.

    class BatchPhysicalCalcRule(config: Config) extends ConverterRule(config) {
      override def matches(call: RelOptRuleCall): Boolean = {
        val calc: FlinkLogicalCalc = call.rel(0)
        val program = calc.getProgram
        // Reject programs that contain a Python function call
        !program.getExprList.asScala.exists(containsPythonCall(_))
      }

      def convert(rel: RelNode): RelNode = {
        val calc = rel.asInstanceOf[FlinkLogicalCalc]
        val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
        val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.BATCH_PHYSICAL)
        new BatchPhysicalCalc(rel.getCluster, newTrait, newInput, calc.getProgram, rel.getRowType)
      }
    }
StreamPhysicalCalcRule

Checks whether the rel node is a FlinkLogicalCalc. The rule matches only when the Calc program contains no Python call, and converts the node to a StreamPhysicalCalc.

    class StreamPhysicalCalcRule(config: Config) extends ConverterRule(config) {
      override def matches(call: RelOptRuleCall): Boolean = {
        val calc: FlinkLogicalCalc = call.rel(0)
        val program = calc.getProgram
        // Reject programs that contain a Python function call
        !program.getExprList.asScala.exists(containsPythonCall(_))
      }

      def convert(rel: RelNode): RelNode = {
        val calc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]
        val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
        val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.STREAM_PHYSICAL)
        new StreamPhysicalCalc(rel.getCluster, traitSet, newInput, calc.getProgram, rel.getRowType)
      }
    }
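The matches guard used by both physical Calc rules can be illustrated with a small self-contained sketch. The expression types below (ToyExpr, ToyRef, ToyCall) and the isPython flag are assumptions made for this example; the real containsPythonCall works on Calcite RexNode trees and is not reproduced here.

```scala
// Toy expression tree; not Calcite's RexNode hierarchy.
sealed trait ToyExpr
case class ToyRef(name: String) extends ToyExpr
case class ToyCall(fn: String, isPython: Boolean, args: List[ToyExpr]) extends ToyExpr

object ToyPythonGuard {
  // True if the expression, or any nested argument, is a Python call.
  def containsPythonCall(e: ToyExpr): Boolean = e match {
    case ToyRef(_)                  => false
    case ToyCall(_, isPython, args) => isPython || args.exists(containsPythonCall)
  }

  // Mirrors the shape of matches: accept only when no expression has a Python call.
  def matches(exprList: List[ToyExpr]): Boolean =
    !exprList.exists(containsPythonCall)
}
```

Under this model, a program with a Python call nested anywhere inside an otherwise ordinary expression is still rejected, so the rule leaves such Calc nodes for some other rule to handle.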
Other operators

The sections below describe the matching conditions of the remaining operators.
FlinkLogicalAggregate

Corresponds to aggregate functions in SQL.
FlinkLogicalAggregateBatchConverter
Returns true when the aggregate contains no accurate DISTINCT call and every aggregate function is supported.

    override def matches(call: RelOptRuleCall): Boolean = {
      val agg = call.rel(0).asInstanceOf[LogicalAggregate]
      // we do not support these functions natively
      // they have to be converted using the FlinkAggregateReduceFunctionsRule
      val supported = agg.getAggCallList.map(_.getAggregation.getKind).forall {
        // we support AVG
        case SqlKind.AVG => true
        // but none of the other AVG agg functions
        case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
        case _ => true
      }
      val hasAccurateDistinctCall = AggregateUtil.containsAccurateDistinctCall(agg.getAggCallList)
      !hasAccurateDistinctCall && supported
    }
FlinkLogicalAggregateStreamConverter
Every aggregate function can be converted except these four kinds:
SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP

    override def matches(call: RelOptRuleCall): Boolean = {
      val agg = call.rel(0).asInstanceOf[LogicalAggregate]
      // we do not support these functions natively
      // they have to be converted using the FlinkAggregateReduceFunctionsRule
      agg.getAggCallList.map(_.getAggregation.getKind).forall {
        case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
        case _ => true
      }
    }
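The difference between the batch and stream conditions is easier to compare side by side. The following sketch re-creates both predicates over a hypothetical ToyAggKind enumeration instead of Calcite's SqlKind. It assumes that the set of AVG-family kinds consists of AVG plus the four STDDEV/VAR kinds, and it takes the accurate-DISTINCT check as a plain Boolean parameter rather than calling AggregateUtil.

```scala
// Toy aggregate kinds; stand-ins for a handful of SqlKind values.
sealed trait ToyAggKind
case object ToySum extends ToyAggKind
case object ToyAvg extends ToyAggKind
case object ToyStddevPop extends ToyAggKind
case object ToyStddevSamp extends ToyAggKind
case object ToyVarPop extends ToyAggKind
case object ToyVarSamp extends ToyAggKind

object ToyAggSupport {
  // Assumed contents of the AVG family: AVG plus the four STDDEV/VAR kinds.
  val avgAggFunctions: Set[ToyAggKind] =
    Set(ToyAvg, ToyStddevPop, ToyStddevSamp, ToyVarPop, ToyVarSamp)

  // Batch: AVG itself is fine, the rest of the AVG family is not,
  // and an accurate DISTINCT call blocks the conversion.
  def batchMatches(kinds: Seq[ToyAggKind], hasAccurateDistinct: Boolean): Boolean = {
    val supported = kinds.forall {
      case ToyAvg                            => true
      case k if avgAggFunctions.contains(k) => false
      case _                                 => true
    }
    !hasAccurateDistinct && supported
  }

  // Stream: only the four STDDEV/VAR kinds block the conversion.
  def streamMatches(kinds: Seq[ToyAggKind]): Boolean = kinds.forall {
    case ToyStddevPop | ToyStddevSamp | ToyVarPop | ToyVarSamp => false
    case _                                                     => true
  }
}
```

In this model the two predicates reject the same function kinds; the only extra condition on the batch side is the accurate-DISTINCT check.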
FlinkLogicalCorrelate

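In SQL terms, LogicalCorrelate handles correlated subqueries and certain special join operations.
The rule checks whether the rel node is a LogicalCorrelate and rewrites it. It relies on the default onMatch function.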
FlinkLogicalDataStreamTableScan

In SQL terms, this rule checks whether the data source is a stream (a DataStreamTable).
It checks whether the rel node is a TableScan whose table unwraps to a DataStreamTable, and rewrites it.

    override def matches(call: RelOptRuleCall): Boolean = {
      val scan: TableScan = call.rel(0)
      // unwrap returns null when the table is not a DataStreamTable
      val dataStreamTable = scan.getTable.unwrap(classOf[DataStreamTable[_]])
      dataStreamTable != null
    }

    def convert(rel: RelNode): RelNode = {
      val scan = rel.asInstanceOf[TableScan]
      FlinkLogicalDataStreamTableScan.create(rel.getCluster, scan.getHints, scan.getTable)
    }
FlinkLogicalDistribution

Describes how the data is distributed (shuffled).

    override def convert(rel: RelNode): RelNode = {
      val distribution = rel.asInstanceOf[LogicalDistribution]
      val newInput = RelOptRule.convert(distribution.getInput, FlinkConventions.LOGICAL)
      FlinkLogicalDistribution.create(newInput, distribution.getCollation, distribution.getDistKeys)
    }
FlinkLogicalExpand

A logical operator that supports complex aggregation operations such as ROLLUP and CUBE.

    override def convert(rel: RelNode): RelNode = {
      val expand = rel.asInstanceOf[LogicalExpand]
      val newInput = RelOptRule.convert(expand.getInput, FlinkConventions.LOGICAL)
      FlinkLogicalExpand.create(newInput, expand.projects, expand.expandIdIndex)
    }
FlinkLogicalIntermediateTableScan

FlinkLogicalIntermediateTableScan represents the logical operation of scanning an intermediate result table (an IntermediateRelTable).

    override def matches(call: RelOptRuleCall): Boolean = {
      val scan: TableScan = call.rel(0)
      // unwrap returns null when the table is not an IntermediateRelTable
      val intermediateTable = scan.getTable.unwrap(classOf[IntermediateRelTable])
      intermediateTable != null
    }

    def convert(rel: RelNode): RelNode = {
      val scan = rel.asInstanceOf[TableScan]
      FlinkLogicalIntermediateTableScan.create(rel.getCluster, scan.getTable)
    }
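The DataStream and intermediate table scan rules both start from a generic TableScan and tell the cases apart by what the table unwraps to. A minimal sketch of that idea follows; ToyRelOptTable, ToyDataStreamTable, ToyIntermediateTable and ToyScanRules are hypothetical names, and the unwrap method only imitates the "return the instance or null" contract that the matches functions above depend on.

```scala
// Toy stand-ins; these are not Flink or Calcite classes.
class ToyDataStreamTable(val name: String)
class ToyIntermediateTable(val name: String)

// Imitates the unwrap contract: return the wrapped object when it is an
// instance of the requested class, otherwise null.
class ToyRelOptTable(underlying: AnyRef) {
  def unwrap[T](cls: Class[T]): T =
    if (cls.isInstance(underlying)) cls.cast(underlying)
    else null.asInstanceOf[T]
}

object ToyScanRules {
  def matchesDataStream(table: ToyRelOptTable): Boolean =
    table.unwrap(classOf[ToyDataStreamTable]) != null

  def matchesIntermediate(table: ToyRelOptTable): Boolean =
    table.unwrap(classOf[ToyIntermediateTable]) != null
}
```

Because each predicate tests for a different table class, at most one of these two toy rules accepts a given scan, which is how a single generic scan node can be routed to the matching Flink logical scan.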
FlinkLogicalIntersect

A logical operator representing the SQL INTERSECT operation.

    override def convert(rel: RelNode): RelNode = {
      val intersect = rel.asInstanceOf[LogicalIntersect]
      // Convert every input to the Flink logical convention
      val newInputs = intersect.getInputs.map {
        input => RelOptRule.convert(input, FlinkConventions.LOGICAL)
      }
      FlinkLogicalIntersect.create(newInputs, intersect.all)
    }
FlinkLogicalJoin

A logical operator representing the SQL JOIN operation.

    override def convert(rel: RelNode): RelNode = {
      val join = rel.asInstanceOf[LogicalJoin]
      val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)
      val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)
      FlinkLogicalJoin.create(newLeft, newRight, join.getCondition, join.getHints, join.getJoinType)
    }
FlinkLogicalLegacySink

Writes data to a legacy sink.

    override def convert(rel: RelNode): RelNode = {
      val sink = rel.asInstanceOf[LogicalLegacySink]
      val newInput = RelOptRule.convert(sink.getInput, FlinkConventions.LOGICAL)
      FlinkLogicalLegacySink.create(
        newInput,
        sink.hints,
        sink.sink,
        sink.sinkName,
        sink.catalogTable,
        sink.staticPartitions)
    }
FlinkLogicalLegacyTableSourceScan

Reads from a legacy table source.

    override def matches(call: RelOptRuleCall): Boolean = {
      val scan: TableScan = call.rel(0)
      isTableSourceScan(scan)
    }

    def convert(rel: RelNode): RelNode = {
      val scan = rel.asInstanceOf[TableScan]
      val table = scan.getTable.asInstanceOf[FlinkPreparingTableBase]
      FlinkLogicalLegacyTableSourceScan.create(rel.getCluster, scan.getHints, table)
    }
FlinkLogicalMatch

The logical operator for the MATCH_RECOGNIZE statement. MATCH_RECOGNIZE lets users perform complex event pattern matching on streaming data, which is very useful for real-time data processing and complex event processing (CEP).

    override def convert(rel: RelNode): RelNode = {
      val logicalMatch = rel.asInstanceOf[LogicalMatch]
      val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
      val newInput = RelOptRule.convert(logicalMatch.getInput, FlinkConventions.LOGICAL)
      new FlinkLogicalMatch(
        rel.getCluster,
        traitSet,
        newInput,
        logicalMatch.getRowType,
        logicalMatch.getPattern,
        logicalMatch.isStrictStart,
        logicalMatch.isStrictEnd,
        logicalMatch.getPatternDefinitions,
        logicalMatch.getMeasures,
        logicalMatch.getAfter,
        logicalMatch.getSubsets,
        logicalMatch.isAllRows,
        logicalMatch.getPartitionKeys,
        logicalMatch.getOrderKeys,
        logicalMatch.getInterval)
    }
FlinkLogicalMinus

A logical operator representing the SQL MINUS (EXCEPT) operation.

    override def convert(rel: RelNode): RelNode = {
      val minus = rel.asInstanceOf[LogicalMinus]
      // Convert every input to the Flink logical convention
      val newInputs = minus.getInputs.map {
        input => RelOptRule.convert(input, FlinkConventions.LOGICAL)
      }
      FlinkLogicalMinus.create(newInputs, minus.all)
    }
FlinkLogicalOverAggregate

A logical operator representing SQL window function (OVER) operations.
FlinkLogicalRank

The logical operator for the SQL RANK and DENSE_RANK functions. These functions are typically used to sort and rank data.

    override def convert(rel: RelNode): RelNode = {
      val rank = rel.asInstanceOf[LogicalRank]
      val newInput = RelOptRule.convert(rank.getInput, FlinkConventions.LOGICAL)
      FlinkLogicalRank.create(
        newInput,
        rank.partitionKey,
        rank.orderKey,
        rank.rankType,
        rank.rankRange,
        rank.rankNumberType,
        rank.outputRankNumber
      )
    }
FlinkLogicalSink

Represents a write (sink) in SQL.
FlinkLogicalSnapshot

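The logical operator for the AS OF clause in SQL (written FOR SYSTEM_TIME AS OF in Flink SQL). The clause takes a snapshot of streaming data, so that processing can refer to the data as of a specific point in time.

    def convert(rel: RelNode): RelNode = {
      val snapshot = rel.asInstanceOf[LogicalSnapshot]
      val newInput = RelOptRule.convert(snapshot.getInput, FlinkConventions.LOGICAL)
      snapshot.getPeriod match {
        // Period is a field access: keep the snapshot node
        case _: RexFieldAccess =>
          FlinkLogicalSnapshot.create(newInput, snapshot.getPeriod)
        // Period is a literal: drop the snapshot node and return the input
        case _: RexLiteral =>
          newInput
      }
    }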
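The branch on the snapshot period is the interesting part of this rule, so here is a small self-contained sketch of just that decision. All types below (ToyPeriod, ToyFieldAccess, ToyLiteral, ToyOtherExpr, ToyPlan, ToySnapshotRule) are hypothetical stand-ins for illustration and are not the Calcite or Flink classes.

```scala
// Toy period expressions; stand-ins for RexFieldAccess, RexLiteral and "anything else".
trait ToyPeriod
case class ToyFieldAccess(field: String) extends ToyPeriod
case class ToyLiteral(value: String) extends ToyPeriod
case class ToyOtherExpr(text: String) extends ToyPeriod

sealed trait ToyPlan
case class ToyInput(name: String) extends ToyPlan
case class ToySnapshot(input: ToyPlan, period: ToyPeriod) extends ToyPlan

object ToySnapshotRule {
  // Same two-case match as in the rule above: a field access keeps the
  // snapshot node, a literal drops it and passes the input through.
  def convert(input: ToyPlan, period: ToyPeriod): ToyPlan = period match {
    case _: ToyFieldAccess => ToySnapshot(input, period)
    case _: ToyLiteral     => input
  }
}
```

As in the original code, the match has only two cases, so in this toy model any other kind of period expression falls through and raises a scala.MatchError at runtime.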
FlinkLogicalSort

Represents sorting in SQL.
FlinkLogicalUnion

Represents the UNION operation in SQL. The rule matches only when all is true, that is, for UNION ALL.

    override def matches(call: RelOptRuleCall): Boolean = {
      val union: LogicalUnion = call.rel(0)
      // Only UNION ALL is converted by this rule
      union.all
    }

    override def convert(rel: RelNode): RelNode = {
      val union = rel.asInstanceOf[LogicalUnion]
      val newInputs = union.getInputs.map {
        input => RelOptRule.convert(input, FlinkConventions.LOGICAL)
      }
      FlinkLogicalUnion.create(newInputs, union.all)
    }
FlinkLogicalValues

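The logical operator for the SQL VALUES expression. VALUES lets a query define a set of rows inline, which is very useful for constructing temporary data or for simple data input.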
