背景
本文主要介绍 Calcite 的 RelNode 怎样转成自定义的 RelNode
示例
FlinkLogicalCalcConverter
检查是不是 Calcite 的 LogicalCalc 算子,是的话,重写为 RelTrait 为 FlinkConventions.LOGICAL
的 rel,类型为 FlinkLogicalCalc
- private class FlinkLogicalCalcConverter(config: Config) extends ConverterRule(config) {
- override def convert(rel: RelNode): RelNode = {
- val calc = rel.asInstanceOf[LogicalCalc]
- val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.LOGICAL)
- FlinkLogicalCalc.create(newInput, calc.getProgram)
- }
- }
BatchPhysicalCalcRule
检查是不是 FlinkLogicalCalc 的 relnode,并且只有表达式中不包含 Python 调用时才匹配
- class BatchPhysicalCalcRule(config: Config) extends ConverterRule(config) {
- override def matches(call: RelOptRuleCall): Boolean = {
- val calc: FlinkLogicalCalc = call.rel(0)
- val program = calc.getProgram
- !program.getExprList.asScala.exists(containsPythonCall(_))
- }
- def convert(rel: RelNode): RelNode = {
- val calc = rel.asInstanceOf[FlinkLogicalCalc]
- val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
- val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.BATCH_PHYSICAL)
- new BatchPhysicalCalc(rel.getCluster, newTrait, newInput, calc.getProgram, rel.getRowType)
- }
- }
StreamPhysicalCalcRule
检查是不是 FlinkLogicalCalc 的 relnode,并且只有表达式中不包含 Python 调用时才匹配
- class StreamPhysicalCalcRule(config: Config) extends ConverterRule(config) {
- override def matches(call: RelOptRuleCall): Boolean = {
- val calc: FlinkLogicalCalc = call.rel(0)
- val program = calc.getProgram
- !program.getExprList.asScala.exists(containsPythonCall(_))
- }
- def convert(rel: RelNode): RelNode = {
- val calc: FlinkLogicalCalc = rel.asInstanceOf[FlinkLogicalCalc]
- val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
- val newInput = RelOptRule.convert(calc.getInput, FlinkConventions.STREAM_PHYSICAL)
- new StreamPhysicalCalc(rel.getCluster, traitSet, newInput, calc.getProgram, rel.getRowType)
- }
- }
其它算子
介绍下算子的匹配条件
FlinkLogicalAggregate
对应的SQL语义是聚合函数
FlinkLogicalAggregateBatchConverter
不存在精确的 distinct 调用,并且所有聚合函数都受支持(支持 AVG,但不支持 AVG_AGG_FUNCTIONS 中的其它函数),则返回 true
- override def matches(call: RelOptRuleCall): Boolean = {
- val agg = call.rel(0).asInstanceOf[LogicalAggregate]
- // we do not support these functions natively
- // they have to be converted using the FlinkAggregateReduceFunctionsRule
- val supported = agg.getAggCallList.map(_.getAggregation.getKind).forall {
- // we support AVG
- case SqlKind.AVG => true
- // but none of the other AVG agg functions
- case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
- case _ => true
- }
- val hasAccurateDistinctCall = AggregateUtil.containsAccurateDistinctCall(agg.getAggCallList)
- !hasAccurateDistinctCall && supported
- }
FlinkLogicalAggregateStreamConverter
SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP
聚合函数不是以上这几种时,都支持转换
- override def matches(call: RelOptRuleCall): Boolean = {
- val agg = call.rel(0).asInstanceOf[LogicalAggregate]
- // we do not support these functions natively
- // they have to be converted using the FlinkAggregateReduceFunctionsRule
- agg.getAggCallList.map(_.getAggregation.getKind).forall {
- case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
- case _ => true
- }
- }
FlinkLogicalCorrelate
对应的SQL语义是,LogicalCorrelate 用于处理关联子查询和某些特殊的连接操作
检查 relnode 是不是 LogicalCorrelate,是的话重写 relnode
FlinkLogicalDataStreamTableScan
对应的SQL语义是,检查数据源是不是 DataStream 表
检查 TableScan 的表能否 unwrap 成 DataStreamTable,可以的话重写为 FlinkLogicalDataStreamTableScan
- override def matches(call: RelOptRuleCall): Boolean = {
- val scan: TableScan = call.rel(0)
- val dataStreamTable = scan.getTable.unwrap(classOf[DataStreamTable[_]])
- dataStreamTable != null
- }
- def convert(rel: RelNode): RelNode = {
- val scan = rel.asInstanceOf[TableScan]
- FlinkLogicalDataStreamTableScan.create(rel.getCluster, scan.getHints, scan.getTable)
- }
FlinkLogicalDistribution
描述数据是不是打散的
- override def convert(rel: RelNode): RelNode = {
- val distribution = rel.asInstanceOf[LogicalDistribution]
- val newInput = RelOptRule.convert(distribution.getInput, FlinkConventions.LOGICAL)
- FlinkLogicalDistribution.create(newInput, distribution.getCollation, distribution.getDistKeys)
- }
FlinkLogicalExpand
支持复杂聚合操作(如 ROLLUP 和 CUBE)的逻辑运算符
- override def convert(rel: RelNode): RelNode = {
- val expand = rel.asInstanceOf[LogicalExpand]
- val newInput = RelOptRule.convert(expand.getInput, FlinkConventions.LOGICAL)
- FlinkLogicalExpand.create(newInput, expand.projects, expand.expandIdIndex)
- }
FlinkLogicalIntermediateTableScan
FlinkLogicalIntermediateTableScan 用于表示对中间结果表进行扫描的逻辑操作
- override def matches(call: RelOptRuleCall): Boolean = {
- val scan: TableScan = call.rel(0)
- val intermediateTable = scan.getTable.unwrap(classOf[IntermediateRelTable])
- intermediateTable != null
- }
- def convert(rel: RelNode): RelNode = {
- val scan = rel.asInstanceOf[TableScan]
- FlinkLogicalIntermediateTableScan.create(rel.getCluster, scan.getTable)
- }
FlinkLogicalIntersect
用于表示 SQL 中 INTERSECT 操作的逻辑运算符
- override def convert(rel: RelNode): RelNode = {
- val intersect = rel.asInstanceOf[LogicalIntersect]
- val newInputs = intersect.getInputs.map {
- input => RelOptRule.convert(input, FlinkConventions.LOGICAL)
- }
- FlinkLogicalIntersect.create(newInputs, intersect.all)
- }
FlinkLogicalJoin
用于表示 SQL 中 JOIN 操作的逻辑运算符
- override def convert(rel: RelNode): RelNode = {
- val join = rel.asInstanceOf[LogicalJoin]
- val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)
- val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)
- FlinkLogicalJoin.create(newLeft, newRight, join.getCondition, join.getHints, join.getJoinType)
- }
FlinkLogicalLegacySink
把数据写到旧版(legacy)接口的 sink
- override def convert(rel: RelNode): RelNode = {
- val sink = rel.asInstanceOf[LogicalLegacySink]
- val newInput = RelOptRule.convert(sink.getInput, FlinkConventions.LOGICAL)
- FlinkLogicalLegacySink.create(
- newInput,
- sink.hints,
- sink.sink,
- sink.sinkName,
- sink.catalogTable,
- sink.staticPartitions)
- }
FlinkLogicalLegacyTableSourceScan
读旧版(legacy)接口的数据源
- override def matches(call: RelOptRuleCall): Boolean = {
- val scan: TableScan = call.rel(0)
- isTableSourceScan(scan)
- }
- def convert(rel: RelNode): RelNode = {
- val scan = rel.asInstanceOf[TableScan]
- val table = scan.getTable.asInstanceOf[FlinkPreparingTableBase]
- FlinkLogicalLegacyTableSourceScan.create(rel.getCluster, scan.getHints, table)
- }
FlinkLogicalMatch
MATCH_RECOGNIZE 语句的逻辑运算符。MATCH_RECOGNIZE 语句允许用户在流数据中进行复杂的事件模式匹配,这对于实时数据处理和复杂事件处理(CEP)非常有用。
- override def convert(rel: RelNode): RelNode = {
- val logicalMatch = rel.asInstanceOf[LogicalMatch]
- val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
- val newInput = RelOptRule.convert(logicalMatch.getInput, FlinkConventions.LOGICAL)
- new FlinkLogicalMatch(
- rel.getCluster,
- traitSet,
- newInput,
- logicalMatch.getRowType,
- logicalMatch.getPattern,
- logicalMatch.isStrictStart,
- logicalMatch.isStrictEnd,
- logicalMatch.getPatternDefinitions,
- logicalMatch.getMeasures,
- logicalMatch.getAfter,
- logicalMatch.getSubsets,
- logicalMatch.isAllRows,
- logicalMatch.getPartitionKeys,
- logicalMatch.getOrderKeys,
- logicalMatch.getInterval)
- }
FlinkLogicalMinus
用于表示 SQL 中 MINUS 操作的逻辑运算符
- override def convert(rel: RelNode): RelNode = {
- val minus = rel.asInstanceOf[LogicalMinus]
- val newInputs = minus.getInputs.map {
- input => RelOptRule.convert(input, FlinkConventions.LOGICAL)
- }
- FlinkLogicalMinus.create(newInputs, minus.all)
- }
FlinkLogicalOverAggregate
用于表示 SQL 中窗口函数(OVER)操作的逻辑运算符
FlinkLogicalRank
SQL 中 RANK 或 DENSE_RANK 函数的逻辑运算符。这些函数通常用于对数据进行排序和排名
- override def convert(rel: RelNode): RelNode = {
- val rank = rel.asInstanceOf[LogicalRank]
- val newInput = RelOptRule.convert(rank.getInput, FlinkConventions.LOGICAL)
- FlinkLogicalRank.create(
- newInput,
- rank.partitionKey,
- rank.orderKey,
- rank.rankType,
- rank.rankRange,
- rank.rankNumberType,
- rank.outputRankNumber
- )
- }
FlinkLogicalSink
表示 SQL 里的写入操作
FlinkLogicalSnapshot
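SQL 语句中的 FOR SYSTEM_TIME AS OF 子句的逻辑运算符。AS OF 子句用于对流数据进行快照操作,从而在处理数据时可以引用特定时间点的数据快照。转换时,如果 period 是 RexFieldAccess,则创建 FlinkLogicalSnapshot;如果 period 是 RexLiteral,则直接返回转换后的输入,不创建快照节点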
- def convert(rel: RelNode): RelNode = {
- val snapshot = rel.asInstanceOf[LogicalSnapshot]
- val newInput = RelOptRule.convert(snapshot.getInput, FlinkConventions.LOGICAL)
- snapshot.getPeriod match {
- case _: RexFieldAccess =>
- FlinkLogicalSnapshot.create(newInput, snapshot.getPeriod)
- case _: RexLiteral =>
- newInput
- }
- }
FlinkLogicalSort
表示 SQL 里的排序
FlinkLogicalUnion
表示 SQL 里的 union 操作,只有 UNION ALL(union.all 为 true)时才匹配
- override def matches(call: RelOptRuleCall): Boolean = {
- val union: LogicalUnion = call.rel(0)
- union.all
- }
- override def convert(rel: RelNode): RelNode = {
- val union = rel.asInstanceOf[LogicalUnion]
- val newInputs = union.getInputs.map {
- input => RelOptRule.convert(input, FlinkConventions.LOGICAL)
- }
- FlinkLogicalUnion.create(newInputs, union.all)
- }
FlinkLogicalValues
SQL 中 VALUES 表达式的逻辑运算符。VALUES 表达式允许在查询中直接定义一组值,这在需要构造临时数据或进行简单的数据输入时非常有用。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |