【PostgreSQL内核学习 —— (WindowAgg(一))】

打印 上一主题 下一主题

主题 1028|帖子 1028|积分 3084

声明:本文的部门内容参考了他人的文章。在编写过程中,我们恭敬他人的知识产权和学术成果,力求依照合理使用原则,并在适用的环境下注明引用泉源。
本文重要参考了 postgresql-15.0 的开源代码和《PostgresSQL数据库内核分析》一书
  窗口函数介绍

  首先,我将提供一个简单的 SQL 用例,并逐步解读窗口函数的使用过程。假设我们有一个名为 sales 的销售数据表,表结构如下:
  1. CREATE TABLE sales (
  2.     id SERIAL PRIMARY KEY,
  3.     salesperson_id INT,
  4.     sale_date DATE,
  5.     sale_amount NUMERIC
  6. );
复制代码
  假设 sales 表包含以下数据:
idsalesperson_idsale_datesale_amount112024-01-011000212024-01-021200322024-01-01800422024-01-021100532024-01-011500632024-01-021300 SQL 用例:使用窗口函数计算每个销售人员的累计销售金额
  我们盼望计算每个销售人员在每个销售记录的日期上的累计销售金额。为了实现这一目标,我们可以使用 SUM() 函数,它会对每个销售人员的数据进行累计。
SQL 查询如下:
  1. SELECT
  2.     id,
  3.     salesperson_id,
  4.     sale_date,
  5.     sale_amount,
  6.     SUM(sale_amount) OVER (PARTITION BY salesperson_id ORDER BY sale_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_sales
  7. FROM
  8.     sales
  9. ORDER BY
  10.     salesperson_id, sale_date;
复制代码
详细解读:

  • SUM(sale_amount)
     这是一个聚合函数,通常用于对某个列的值进行汇总。在这个查询中,SUM(sale_amount) 用于计算销售额的累计值。
  

  • OVER (PARTITION BY salesperson_id ORDER BY sale_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
     这是一个窗口函数的关键部门,指定了如何对结果进行分区、排序和聚合。详细来说:
  

  • PARTITION BY salesperson_id:这是窗口函数的分区利用,将数据按 salesperson_id(销售人员 ID)分区。也就是说,每个销售人员的数据将分别计算,差别销售人员的累计销售是独立的。
  • ORDER BY sale_date:对每个分区内的数据按销售日期 (sale_date) 进行排序,确保累计计算是按时间顺序进行的。
  • ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW:这是一个窗口帧的界说,意味着每个分区的累计值从该分区的第一行开始计算,一直到当前行。UNBOUNDED PRECEDING 表示从分区的第一行开始,CURRENT ROW 表示包罗当前行。
  

  • 结果分析:
     查询结果将会返回每个销售人员的每笔销售记录,并在 cumulative_sales 列表现该销售人员的累计销售金额。例如:
  idsalesperson_idsale_datesale_amountcumulative_sales112024-01-0110001000212024-01-0212002200322024-01-01800800422024-01-0211001900532024-01-0115001500632024-01-0213002800   

  • 对于销售人员 1,第一个销售记录的累计销售金额为 1000,第二个销售记录的累计销售金额为 1000 + 1200 = 2200。
  • 对于销售人员 2,第一个销售记录的累计销售金额为 800,第二个销售记录的累计销售金额为 800 + 1100 = 1900。
  • 对于销售人员 3,第一个销售记录的累计销售金额为 1500,第二个销售记录的累计销售金额为 1500 + 1300 = 2800。
  窗口函数的工作机制:


  • 分区:窗口函数首先会根据 PARTITION BY 子句将数据分成差别的分区。这里,数据按 salesperson_id 分区,每个销售人员的记录组成一个分区。
  • 排序:在每个分区内,数据会根据 ORDER BY 子句进行排序。在这个例子中,按 sale_date 对每个销售人员的销售记录按时间顺序进行排序。
  • 累计:ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW 确保了每个销售人员从分区的第一行开始,直到当前行的所有销售记录都会被累加,形成一个累积的结果。
更多详细的窗口函数使用教程可以参阅:GaussDB(DWS) SQL进阶之SQL利用之窗口函数
WindowAgg

理论层面

  下面我们来相识一下 WindowAgg 算子,先看看书中的描述:


  书中详细描述了 WindowAgg 节点在 PostgreSQL 中处置惩罚窗口函数时的执行过程,包罗如何管理分区、排序、聚合等。通过 WindowAggState 和相干的数据结构,窗口聚合可以高效地计算多个窗口函数,同时保持对数据的完备性。性能优化方面,窗口函数的排序和缓存机制也起到了关键作用,资助提升计算效率。
源码层面

WindowObjectData 结构体

  WindowObjectData 结构体用于在窗口函数调用过程中保存与窗口聚合利用相干的状态信息。在 PostgreSQL 中,窗口函数用于基于窗口进行计算,而每个窗口函数可能必要差别的上下文状态来处置惩罚其数据。源码如下所示(路径:src\backend\executor\nodeWindowAgg.c)
  1. /*
  2. * 所有窗口函数的API都通过这个对象进行调用,该对象会作为fcinfo->context传递给窗口函数。
  3. */
  4. typedef struct WindowObjectData
  5. {
  6.         NodeTag                type;                                /* 类型标识符,用于区分不同的节点类型 */
  7.         WindowAggState *winstate;                /* 指向父级窗口聚合状态的指针,用于获取窗口聚合的上下文状态 */
  8.         List           *argstates;                        /* 窗口函数参数的表达式状态树 */
  9.         void           *localmem;                        /* 当前窗口函数在执行过程中使用的局部内存,由WinGetPartitionLocalMemory分配 */
  10.         int                        markptr;                        /* 用于标记当前窗口函数状态的tuplestore标记指针 */
  11.         int                        readptr;                        /* 读取指针,指向当前正在处理的行位置 */
  12.         int64                markpos;                        /* 标记指针所指向的行号 */
  13.         int64                seekpos;                        /* 读取指针所指向的行号 */
  14. } WindowObjectData;
复制代码
WindowStatePerFuncData 结构体

  WindowStatePerFuncData 结构体用于存储与窗口函数和窗口聚合利用相干的工作状态和数据。它包含了窗口函数执行时必要的各种信息,如参数数量排序规则结果类型是否为聚合函数等。这些信息对于在窗口函数计算过程中正确管理和执行窗口函数非常重要。在 PostgreSQL 中,窗口函数的执行涉及多次状态保存和计算,而这个结构体便用于管理这些窗口函数的详细执行细节。源码如下所示(路径:src\backend\executor\nodeWindowAgg.c)
  1. /*
  2. * 为每个由该节点处理的窗口函数和窗口聚合创建一个 WindowStatePerFunc 结构体。
  3. */
  4. typedef struct WindowStatePerFuncData
  5. {
  6.         /* 链接到与此工作状态相关的 WindowFunc 表达式和状态节点 */
  7.         WindowFuncExprState *wfuncstate;   /* 当前窗口函数的表达式状态 */
  8.         WindowFunc *wfunc;                 /* 当前窗口函数的定义(结构体) */
  9.         int                        numArguments;                /* 窗口函数的参数数量 */
  10.         FmgrInfo        flinfo;                                /* 用于窗口函数的 fmgr 查找数据,存储有关函数的信息 */
  11.         Oid                        winCollation;                /* 窗口函数的排序规则,由当前函数派生 */
  12.         /*
  13.          * 我们需要窗口函数结果的长度和 byval 信息,以便知道如何复制/删除值。
  14.          */
  15.         int16                resulttypeLen;                /* 窗口函数返回值类型的长度 */
  16.         bool                resulttypeByVal;        /* 窗口函数返回值类型是否为按值传递 */
  17.         bool                plain_agg;                        /* 是否仅为普通的聚合函数? */
  18.         int                        aggno;                                /* 如果是,指明其对应的 WindowStatePerAggData 的索引 */
  19.         WindowObject winobj;                         /* 用于窗口函数 API 的对象 */
  20. } WindowStatePerFuncData;
复制代码
WindowStatePerAggData 结构体

  WindowStatePerAggData 结构体重要用于保存窗口聚合过程中普通聚合函数的工作状态。它包含了有关过渡函数最终函数初始值当前帧的聚合结果过渡值等详细信息。通过这些信息,系统可以正确地计算窗口聚合函数的结果,处置惩罚每个聚合利用的中心状态,确保聚合计算按预期执行。别的,该结构体还考虑了内存管理和函数调用的效率,使得聚合利用在处置惩罚大数据量时能够高效执行。源码如下所示(路径:src\backend\executor\nodeWindowAgg.c)
  1. /*
  2. * 对于普通的聚合窗口函数,我们也有一个这样的结构体。
  3. */
  4. typedef struct WindowStatePerAggData
  5. {
  6.         /* 聚合函数的过渡函数 OID */
  7.         Oid                        transfn_oid;                        /* 聚合函数的过渡函数的 OID */
  8.         Oid                        invtransfn_oid;                /* 反向过渡函数的 OID,可能是 InvalidOid */
  9.         Oid                        finalfn_oid;                        /* 最终函数的 OID,可能是 InvalidOid */
  10.         /*
  11.          * 聚合过渡函数的 fmgr 查找数据 --- 只有当对应的 OID 不为 InvalidOid 时才有效。
  12.          * 特别注意,函数的 fn_strict 标志在这里保存。
  13.          */
  14.         FmgrInfo        transfn;                                /* 聚合函数的过渡函数的 fmgr 查找数据 */
  15.         FmgrInfo        invtransfn;                                /* 反向过渡函数的 fmgr 查找数据 */
  16.         FmgrInfo        finalfn;                                /* 最终函数的 fmgr 查找数据 */
  17.         int                        numFinalArgs;                        /* 传递给最终函数的参数个数 */
  18.         /*
  19.          * 来自 pg_aggregate 入口的初始值
  20.          */
  21.         Datum                initValue;                                /* 初始值 */
  22.         bool                initValueIsNull;                /* 初始值是否为 NULL */
  23.         /*
  24.          * 当前帧边界的缓存值
  25.          */
  26.         Datum                resultValue;                        /* 当前计算帧的结果值 */
  27.         bool                resultValueIsNull;                /* 结果值是否为 NULL */
  28.         /*
  29.          * 需要输入、结果和过渡数据类型的长度和 byval 信息,
  30.          * 以便知道如何复制/删除值。
  31.          */
  32.         int16                inputtypeLen,                        /* 输入类型的长度 */
  33.                                 resulttypeLen,                        /* 结果类型的长度 */
  34.                                 transtypeLen;                        /* 过渡数据类型的长度 */
  35.         bool                inputtypeByVal,                        /* 输入类型是否按值传递 */
  36.                                 resulttypeByVal,                 /* 结果类型是否按值传递 */
  37.                                 transtypeByVal;                 /* 过渡数据类型是否按值传递 */
  38.         int                        wfuncno;                                /* 关联的 WindowStatePerFuncData 的索引 */
  39.         /* 持有过渡值和可能的其他附加数据的上下文 */
  40.         MemoryContext aggcontext;                        /* 聚合上下文,可能是私有的,或 winstate->aggcontext */
  41.         /* 当前的过渡值 */
  42.         Datum                transValue;                        /* 当前过渡值 */
  43.         bool                transValueIsNull;                /* 过渡值是否为 NULL */
  44.         int64                transValueCount;                /* 当前聚合的行数 */
  45.         /* eval_windowaggregates() 函数中使用的数据 */
  46.         bool                restart;                                /* 是否需要在本轮聚合中重新启动此聚合? */
  47. } WindowStatePerAggData;
复制代码
eval_windowaggregates 函数

  eval_windowaggregates 函数重要用于窗口聚合的计算,特别是普通聚合函数(如 SUM()、COUNT() 等)。它在处置惩罚窗口时,根据窗口帧的位置和聚合的需求,优化了聚合利用。在帧起始位置为 UNBOUNDED_PRECEDING 时,采用增量计算策略,在窗口帧发生变化时,使用反向过渡函数或重新聚合数据。同时,它通过复用已计算的结果来进步性能,在必要时重启聚合并重置相应的状态。
  别的,它还管理了差别聚合函数的上下文,确保在窗口帧的差别部门对每个聚合函数都进行正确的计算,并在计算结束后保存结果。源码如下所示(路径:src\backend\executor\nodeWindowAgg.c)
  1. /*
  2. * eval_windowaggregates
  3. * 评估作为窗口函数的普通聚合函数
  4. *
  5. * 这与 nodeAgg.c 不同的地方在于:首先,如果窗口的帧开始位置发生变化,我们使用反向过渡函数(如果存在)从过渡值中删除行。其次,我们希望在将更多数据聚合到同一过渡值后,可以多次调用聚合最终函数。这是 nodeAgg.c 中不要求的行为。
  6. */
  7. static void
  8. eval_windowaggregates(WindowAggState *winstate)
  9. {
  10.         WindowStatePerAgg peraggstate;                /* 用于存储每个聚合函数的状态 */
  11.         int                        wfuncno,                                        /* 窗口函数的索引 */
  12.                                 numaggs,                                        /* 聚合函数的数量 */
  13.                                 numaggs_restart,                        /* 需要重启的聚合函数数量 */
  14.                                 i;                                                        /* 循环变量 */
  15.         int64                aggregatedupto_nonrestarted;        /* 尚未聚合的行数 */
  16.         MemoryContext oldContext;                                /* 内存上下文的备份 */
  17.         ExprContext *econtext;                                        /* 当前表达式上下文 */
  18.         WindowObject agg_winobj;                                /* 窗口函数对象 */
  19.         TupleTableSlot *agg_row_slot;                        /* 用于存储聚合数据的行槽 */
  20.         TupleTableSlot *temp_slot;                                /* 临时槽,用于存储中间结果 */
  21.         numaggs = winstate->numaggs;        /* 获取窗口聚合函数的数量 */
  22.         if (numaggs == 0)
  23.                 return;                /* 如果没有聚合函数,直接返回 */
  24.         /* 获取执行上下文 */
  25.         econtext = winstate->ss.ps.ps_ExprContext;
  26.         agg_winobj = winstate->agg_winobj;
  27.         agg_row_slot = winstate->agg_row_slot;
  28.         temp_slot = winstate->temp_slot_1;
  29.         /*
  30.          * 如果窗口的帧起始位置为 UNBOUNDED_PRECEDING 且没有排除子句,
  31.          * 那么窗口帧由从分区开始处向前延伸的一组连续的行组成,随着当前行向前推进,行只进入帧内,而不会退出帧。
  32.          * 这样就可以使用增量策略来计算聚合值:我们为每个加入帧的行运行过渡函数,并在需要时运行最终函数来获取当前聚合值。
  33.          * 这种方法比每次处理当前行时都重新运行整个聚合计算更高效。前提是假设最终函数不会破坏正在运行的过渡值,这一点在 nodeAgg.c 中也有类似的假设。
  34.          *
  35.          * 如果帧起始位置有时会移动,我们仍然可以优化相邻的行,尽可能使用增量聚合策略,但如果帧头超出了上一个头,我们将尝试使用反向过渡函数删除这些行。
  36.          * 反向过渡函数会恢复聚合的当前状态,仿佛被移除的行从未被聚合过。如果反向过渡函数无法删除该行,或者根本没有反向过渡函数,我们需要重新计算所有位于新帧边界内的元组的聚合结果。
  37.          *
  38.          * 如果存在排除子句,我们可能需要在一个不连续的行集上聚合,因此需要重新计算每行的聚合。
  39.          */
  40.         /*
  41.          * 更新帧头位置
  42.          *
  43.          * 窗口的帧头位置不应该向后移动,如果发生这种情况,代码将无法处理,因此在安全起见,我们会检查并报告错误。
  44.          */
  45.         update_frameheadpos(winstate);
  46.         if (winstate->
  47.         frameheadpos < winstate->aggregatedbase)
  48.                 elog(ERROR, "window frame head moved backward");
  49.         /*
  50.          * 如果帧没有变化,我们可以重用之前保存的结果值。
  51.          * 如果帧结束模式是 UNBOUNDED FOLLOWING 或 CURRENT ROW 且没有排除子句,并且当前行位于前一行的帧内,那么当前帧和前一帧的结束位置必须重合。
  52.          * 这意味着我们可以复用结果值。
  53.          */
  54.         if (winstate->aggregatedbase == winstate->frameheadpos &&
  55.                 (winstate->frameOptions & (FRAMEOPTION_END_UNBOUNDED_FOLLOWING |
  56.                                                                    FRAMEOPTION_END_CURRENT_ROW)) &&
  57.                 !(winstate->frameOptions & FRAMEOPTION_EXCLUSION) &&
  58.                 winstate->aggregatedbase <= winstate->currentpos &&
  59.                 winstate->aggregatedupto > winstate->currentpos)
  60.         {
  61.                 for (i = 0; i < numaggs; i++)
  62.                 {
  63.                         peraggstate = &winstate->peragg[i];
  64.                         wfuncno = peraggstate->wfuncno;
  65.                         econtext->ecxt_aggvalues[wfuncno] = peraggstate->resultValue;
  66.                         econtext->ecxt_aggnulls[wfuncno] = peraggstate->resultValueIsNull;
  67.                 }
  68.                 return;
  69.         }
  70.         /* 初始化重启标志 */
  71.         numaggs_restart = 0;
  72.         for (i = 0; i < numaggs; i++)
  73.         {
  74.                 peraggstate = &winstate->peragg[i];
  75.                 /* 判断是否需要重启聚合函数 */
  76.                 if (winstate->currentpos == 0 ||
  77.                         (winstate->aggregatedbase != winstate->frameheadpos &&
  78.                          !OidIsValid(peraggstate->invtransfn_oid)) ||
  79.                         (winstate->frameOptions & FRAMEOPTION_EXCLUSION) ||
  80.                         winstate->aggregatedupto <= winstate->frameheadpos)
  81.                 {
  82.                         peraggstate->restart = true;
  83.                         numaggs_restart++;
  84.                 }
  85.                 else
  86.                         peraggstate->restart = false;
  87.         }
  88.         /*
  89.          * 如果有任何可能需要移动的聚合函数,尝试通过删除从帧顶部掉落的输入行来将 aggregatedbase 向前推进。
  90.          * 如果失败(即 advance_windowaggregate_base 返回 false),则需要重启聚合。
  91.          */
  92.         while (numaggs_restart < numaggs &&
  93.                    winstate->aggregatedbase < winstate->frameheadpos)
  94.         {
  95.                 /*
  96.                  * 获取要删除的元组。这应该永远不会失败,因为我们应该已经处理过这些行。
  97.                  */
  98.                 if (!window_gettupleslot(agg_winobj, winstate->aggregatedbase,
  99.                                                                  temp_slot))
  100.                         elog(ERROR, "could not re-fetch previously fetched frame row");
  101.                 /* 设置元组上下文,用于计算聚合函数的参数 */
  102.                 winstate->tmpcontext->ecxt_outertuple = temp_slot;
  103.                 /*
  104.                  * 为每个聚合函数执行反向过渡,除非该聚合已经标记为需要重启。
  105.                  */
  106.                 for (i = 0; i < numaggs; i++)
  107.                 {
  108.                         bool                ok;
  109.                         peraggstate = &winstate->peragg[i];
  110.                         if (peraggstate->restart)
  111.                                 continue;
  112.                         wfuncno = peraggstate->wfuncno;
  113.                         ok = advance_windowaggregate_base(winstate,
  114.                                                                                           &winstate->perfunc[wfuncno],
  115.                                                                                           peraggstate);
  116.                         if (!ok)
  117.                         {
  118.                                 /* 如果反向过渡函数失败,则需要重启聚合 */
  119.                                 peraggstate->restart = true;
  120.                                 numaggs_restart++;
  121.                         }
  122.                 }
  123.                 /* 重置每个输入元组的上下文 */
  124.                 ResetExprContext(winstate->tmpcontext);
  125.                 /* 进展到下一个聚合行 */
  126.                 winstate->aggregatedbase++;
  127.                 ExecClearTuple(temp_slot);
  128.         }
  129.         /*
  130.          * 如果我们成功推进了所有聚合的基准行,aggregatedbase 现在应该等于 frameheadpos;
  131.          * 如果失败了,我们必须强制更新 aggregatedbase。
  132.          */
  133.         winstate->aggregatedbase = winstate->frameheadpos;
  134.         /*
  135.          * 如果为聚合函数创建了标记指针,则将其推进到帧头,以便 tuplestore 可以丢弃不必要的行。
  136.          */
  137.         if (agg_winobj->markptr >= 0)
  138.                 WinSetMarkPosition(agg_winobj, winstate->frameheadpos);
  139.         /*
  140.          * 现在重启需要重启的聚合函数。
  141.          *
  142.          * 如果任何聚合函数需要重启,我们假设使用共享上下文的聚合函数也需要重启,
  143.          * 并且在这种情况下我们会清理共享的 aggcontext。
  144.          */
  145.         if (numaggs_restart > 0)
  146.                 MemoryContextResetAndDeleteChildren(winstate->aggcontext);
  147.         for (i = 0; i < numaggs; i++)
  148.         {
  149.                 peraggstate = &winstate->peragg[i];
  150.                 /* 如果共享上下文的聚合函数需要重启,则重启所有需要重启的聚合 */
  151.                 Assert(peraggstate->aggcontext != winstate->aggcontext ||
  152.                            numaggs_restart == 0 ||
  153.                            peraggstate->restart);
  154.                 if (peraggstate->restart)
  155.                 {
  156.                         wfuncno = peraggstate->wfuncno;
  157.                         initialize_windowaggregate(winstate,
  158.                                                                            &winstate->perfunc[wfuncno],
  159.                                                                            peraggstate);
  160.                 }
  161.                 else if (!peraggstate->resultValueIsNull)
  162.                 {
  163.                         if (!peraggstate->resulttypeByVal)
  164.                                 pfree(DatumGetPointer(peraggstate->resultValue));
  165.                         peraggstate->resultValue = (Datum) 0;
  166.                         peraggstate->resultValueIsNull = true;
  167.                 }
  168.         }
  169.         /*
  170.          * 非重启的聚合现在包含 aggregatedbase 和 aggregatedupto 之间的行,
  171.          * 而重启的聚合不包含任何行。如果有重启的聚合,我们必须从 frameheadpos 开始重新聚合,
  172.          * 否则可以从 aggregatedupto 开始继续聚合。
  173.          */
  174.         aggregatedupto_nonrestarted = winstate->aggregatedupto;
  175.         if (numaggs_restart > 0 &&
  176.                 winstate->aggregatedupto != winstate->frameheadpos)
  177.         {
  178.                 winstate->aggregatedupto = winstate->frameheadpos;
  179.                 ExecClearTuple(agg_row_slot);
  180.         }
  181.         /*
  182.          * 继续聚合直到遇到帧外的行(或分区结束)。
  183.          */
  184.         for (;;)
  185.         {
  186.                 int                        ret;
  187.                 /* 如果没有获取行,获取下一行 */
  188.                 if (TupIsNull(agg_row_slot))
  189.                 {
  190.                         if (!window_gettupleslot(agg_winobj, winstate->aggregatedupto,
  191.                                                                          agg_row_slot))
  192.                                 break;                        /* 到达分区结束 */
  193.                 }
  194.                 /*
  195.                  * 如果当前行不在帧内,跳过聚合。
  196.                  */
  197.                 ret = row_is_in_frame(winstate, winstate->aggregatedupto, agg_row_slot);
  198.                 if (ret < 0)
  199.                         break;
  200.                 if (ret == 0)
  201.                         goto next_tuple;
  202.                 /* 设置元组上下文 */
  203.                 winstate->tmpcontext->ecxt_outertuple = agg_row_slot;
  204.                 /* 将行累加到聚合中 */
  205.                 for (i = 0; i < numaggs; i++)
  206.                 {
  207.                         peraggstate = &winstate->peragg[i];
  208.                         /* 跳过未重启的聚合 */
  209.                         if (!peraggstate->restart &&
  210.                                 winstate->aggregatedupto < aggregatedupto_nonrestarted)
  211.                                 continue;
  212.                         wfuncno = peraggstate->wfuncno;
  213.                         advance_windowaggregate(winstate,
  214.                                                                         &winstate->perfunc[wfuncno],
  215.                                                                         peraggstate);
  216.                 }
  217. next_tuple:
  218.                 /* 重置每个输入元组的上下文 */
  219.                 ResetExprContext(winstate->tmpcontext);
  220.                 /* 进展到下一个聚合行 */
  221.                 winstate->aggregatedupto++;
  222.                 ExecClearTuple(agg_row_slot);
  223.         }
  224.         /* 确保帧的结束位置不会向后移动 */
  225.         Assert(aggregatedupto_nonrestarted <= winstate->aggregatedupto);
  226.         /*
  227.          * 最终化聚合并填充结果和空值字段
  228.          */
  229.         for (i = 0; i < numaggs; i++)
  230.         {
  231.                 Datum           *result;
  232.                 bool           *isnull;
  233.                 peraggstate = &winstate->peragg[i];
  234.                 wfuncno = peraggstate->wfuncno;
  235.                 result = &econtext->ecxt_aggvalues[wfuncno];
  236.                 isnull = &econtext->ecxt_aggnulls[wfuncno];
  237.                 finalize_windowaggregate(winstate,
  238.                                                                  &winstate->perfunc[wfuncno],
  239.                                                                  peraggstate,
  240.                                                                  result, isnull);
  241.                 /*
  242.                  * 如果下一个行共享同一帧,保存结果值
  243.                  */
  244.                 if (!peraggstate->resulttypeByVal && !*isnull)
  245.                 {
  246.                         oldContext = MemoryContextSwitchTo(peraggstate->aggcontext);
  247.                         peraggstate->resultValue =
  248.                                 datumCopy(*result,
  249.                                                   peraggstate->resulttypeByVal,
  250.                                                   peraggstate->resulttypeLen);
  251.                         MemoryContextSwitchTo(oldContext);
  252.                 }
  253.                 else
  254.                 {
  255.                         peraggstate->resultValue = *result;
  256.                 }
  257.                 peraggstate->resultValueIsNull = *isnull;
  258.         }
  259. }
复制代码
  让我们通过一个详细的例子来分析 eval_windowaggregates 函数的每一步利用。假设我们有一个销售数据表 sales,包含以下数据:
salesperson_idsale_datesale_amount12024-01-0110012024-01-0220012024-01-0330022024-01-0115022024-01-0225022024-01-03350   假设我们盼望计算每个销售人员的累计销售额,并且使用的是窗口聚合函数,按日期顺序(ORDER BY sale_date)来计算累计销售额。我们的窗口框架将从 UNBOUNDED PRECEDING 开始,直到当前行结束。
1. 初始化和设置
     在开始时,窗口函数会为每个聚合函数(在这个例子中是 SUM(sale_amount))创建一个 WindowStatePerAggData 结构体来保存当前的聚合状态。假设我们有两个销售人员的销售数据。对于每个销售人员,eval_windowaggregates 将会处置惩罚每个销售记录,维护其当前的聚合状态。
  初始化:numaggs = 1,因为只有一个聚合函数 SUM(sale_amount)。aggregatedbase 和 aggregatedupto 变量分别用于跟踪当前已聚合和尚未聚合的行。
  2. 更新帧头位置
     在窗口聚合中,frameheadpos 表示窗口帧的起始位置。update_frameheadpos 会根据窗口的当前状态更新这一位置。例如,假设当前处置惩罚的销售人员是销售员 1,并且当前销售记录是 2024-01-03
  帧头位置更新:frameheadpos 会根据查询的 PARTITION BY 和 ORDER BY 规则进行调整。这里,frameheadpos 会指向销售员 12024-01-03 的行。
  3. 优化增量计算
     如果当前的窗口帧没有发生变化,我们就可以复用之前保存的聚合结果,而不必重新计算。例如,在销售员 1 的数据中,假设前两天(2024-01-012024-01-02)已经聚合完成。
  复用结果:假设当前帧的结束位置是 2024-01-03,且没有清除子句(EXCLUSION),那么程序会查抄窗口帧是否变化。如果没有变化(即当前行仍然在上一帧内),则复用先前的聚合结果。
  4. 处置惩罚帧的变化
     如果窗口帧的头位置发生变化,我们必要做以下几步:
  

  • 查抄是否必要重启聚合:如果帧的头移动,大概窗口的范围发生变化(例如,加入了 EXCLUSION 子句),我们就必要重新聚合数据。eval_windowaggregates 会为每个聚合函数设置重启标记。
  • 更新聚合函数的状态:在此过程中,advance_windowaggregate_base 函数会根据新的帧头位置和数据,调整聚合的基准状态(aggregatedbase)。
    例如,如果帧的起始位置从 2024-01-01 移动到 2024-01-02,eval_windowaggregates 将使用反向过渡函数(invtransfn)删除帧头之前的行。
  5. 重新聚合数据
     如果 advance_windowaggregate_base 无法乐成移动聚合的基准行(即删除掉帧头之前的行),大概没有反向过渡函数,系统就会重新开始聚合。例如,在 2024-01-02 之后的帧头位置,可能必要重新的帧开始重新计算聚合结果。
  

  • 重启聚合:如果必要重启聚合(例如因为反向过渡失败),restart 标记会被设置为 true,然后聚合函数的状态会被重新初始化。
  6. 计算新行的聚合结果
     如果当前的聚合状态已经准备好,且没有出现必要重启的环境,eval_windowaggregates 会开始将新的一行数据添加到聚合中。
  

  • 逐行聚合:每次计算新的聚合值时,advance_windowaggregate 函数会根据当前行的数据更新聚合结果。例如,在 2024-01-03,销售员 1 的累计销售额将是 100 + 200 + 300 = 600。
  7. 最终化聚合结果
     当所有的行都被处置惩罚完后,finalize_windowaggregate 会被调用来计算窗口聚合的最终结果。例如,计算销售员 1 和销售员 2 的最终累计销售额。
  

  • 保存和返回结果:最终,eval_windowaggregates 会保存每个聚合函数的结果,并更新相应的输出字段。如果存在共享上下文(即多个聚合函数使用同一个上下文),它会进行清算,以确保没有内存泄漏。
  8. 返回结果
     函数会返回每个窗口聚合函数的在这里插入代码片最终结果,在每一行的输出中返回正确的累计销售额。
  示例执行:
假设我们在销售员 1 上执行上述利用:
  1. 初始时,销售员 1 在 2024-01-01 的销售额为 100,聚合值为 100。
  2. 接着,销售员 1 在 2024-01-02 的销售额为 200,聚合值为 100 + 200 = 300。
  3. 最后,在 2024-01-03,销售员 1 的销售额为 300,最终累计值为 100 + 200 + 300 = 600。
复制代码
update_frameheadpos 函数

  update_frameheadpos 函数的重要功能是更新窗口聚合的帧头位置 frameheadpos,确保其对于当前行有效。帧头的位置是窗口聚合计算的关键,因为它决定了每个窗口函数计算时所依据的数据范围。下面是详细的逐行注释和对每个步骤的解释。(路径:src\backend\executor\nodeWindowAgg.c)
  1. /*
  2. * update_frameheadpos
  3. * 使 frameheadpos 对当前行有效
  4. *
  5. * 注意,frameheadpos 计算时不考虑任何窗口排除子句;当前行和/或其同组行即使在后续需要被排除时,也会被视为帧的一部分。
  6. *
  7. * 可能会覆盖 winstate->temp_slot_2。
  8. */
  9. static void
  10. update_frameheadpos(WindowAggState *winstate)
  11. {
  12.         WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;   /* 获取窗口聚合节点 */
  13.         int                        frameOptions = winstate->frameOptions;  /* 获取当前的帧选项 */
  14.         MemoryContext oldcontext;  /* 保存当前的内存上下文 */
  15.         /* 如果帧头已经有效,则不需要更新,直接返回 */
  16.         if (winstate->framehead_valid)
  17.                 return;
  18.         /* 可能会在短生命周期的上下文中被调用,因此切换到合适的内存上下文 */
  19.         oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
  20.         /* 根据帧的起始选项来计算帧头 */
  21.         if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
  22.         {
  23.                 /* 在 UNBOUNDED PRECEDING 模式下,帧头始终是分区的第一行 */
  24.                 winstate->frameheadpos = 0;
  25.                 winstate->framehead_valid = true;
  26.         }
  27.         else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
  28.         {
  29.                 /* 如果是 CURRENT ROW 模式,根据排序模式计算帧头 */
  30.                 if (frameOptions & FRAMEOPTION_ROWS)
  31.                 {
  32.                         /* 在 ROWS 模式下,帧头与当前行相同 */
  33.                         winstate->frameheadpos = winstate->currentpos;
  34.                         winstate->framehead_valid = true;
  35.                 }
  36.                 else if (frameOptions & (FRAMEOPTION_RANGE | FRAMEOPTION_GROUPS))
  37.                 {
  38.                         /* 如果没有 ORDER BY,所有行是同行的 */
  39.                         if (node->ordNumCols == 0)
  40.                         {
  41.                                 winstate->frameheadpos = 0;
  42.                                 winstate->framehead_valid = true;
  43.                                 MemoryContextSwitchTo(oldcontext);
  44.                                 return;
  45.                         }
  46.                         /*
  47.                          * 在 RANGE 或 GROUPS START_CURRENT_ROW 模式下,帧头是当前行的同组中的第一行。
  48.                          * 我们保持帧头的最后已知位置,并根据需要前进。
  49.                          */
  50.                         tuplestore_select_read_pointer(winstate->buffer, winstate->framehead_ptr);
  51.                         if (winstate->frameheadpos == 0 && TupIsNull(winstate->framehead_slot))
  52.                         {
  53.                                 /* 如果尚未获取第一行,则将其获取到 framehead_slot */
  54.                                 if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))
  55.                                         elog(ERROR, "unexpected end of tuplestore");
  56.                         }
  57.                         /* 检查当前行是否是正确的帧头 */
  58.                         while (!TupIsNull(winstate->framehead_slot))
  59.                         {
  60.                                 if (are_peers(winstate, winstate->framehead_slot, winstate->ss.ss_ScanTupleSlot))
  61.                                         break;                /* 该行是正确的帧头 */
  62.                                
  63.                                 /* 即使获取失败,仍然推进帧头位置 */
  64.                                 winstate->frameheadpos++;
  65.                                 spool_tuples(winstate, winstate->frameheadpos);
  66.                                 if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))
  67.                                         break;                /* 到达分区末尾 */
  68.                         }
  69.                         winstate->framehead_valid = true;
  70.                 }
  71.                 else
  72.                         Assert(false);  /* 如果既不是 RANGE 也不是 GROUPS,应该抛出异常 */
  73.         }
  74.         else if (frameOptions & FRAMEOPTION_START_OFFSET)
  75.         {
  76.                 /* 在 OFFSET 模式下,帧头相对于当前行的位置是通过偏移量来决定的 */
  77.                 if (frameOptions & FRAMEOPTION_ROWS)
  78.                 {
  79.                         int64 offset = DatumGetInt64(winstate->startOffsetValue);
  80.                         if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
  81.                                 offset = -offset;  /* 如果是 PRECEDING,则是负偏移量 */
  82.                         winstate->frameheadpos = winstate->currentpos + offset;
  83.                         /* 帧头不能小于第一行 */
  84.                         if (winstate->frameheadpos < 0)
  85.                                 winstate->frameheadpos = 0;
  86.                         /* 确保帧头不超出分区末尾 */
  87.                         else if (winstate->frameheadpos > winstate->currentpos + 1)
  88.                         {
  89.                                 spool_tuples(winstate, winstate->frameheadpos - 1);
  90.                                 if (winstate->frameheadpos > winstate->spooled_rows)
  91.                                         winstate->frameheadpos = winstate->spooled_rows;
  92.                         }
  93.                         winstate->framehead_valid = true;
  94.                 }
  95.                 else if (frameOptions & FRAMEOPTION_RANGE)
  96.                 {
  97.                         /*
  98.                          * 在 RANGE START_OFFSET 模式下,帧头是满足范围约束的第一行。
  99.                          * 我们保持帧头的最后已知位置,并根据需要推进。
  100.                          */
  101.                         int sortCol = node->ordColIdx[0];
  102.                         bool sub, less;
  103.                         /* 确保有排序列 */
  104.                         Assert(node->ordNumCols == 1);
  105.                         /* 计算用于范围检查的标志 */
  106.                         if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
  107.                                 sub = true;
  108.                         else
  109.                                 sub = false;
  110.                         less = false;  /* 通常,帧头应满足 >= sum */
  111.                         if (!winstate->inRangeAsc)
  112.                         {
  113.                                 sub = !sub;
  114.                                 less = true;
  115.                         }
  116.                         tuplestore_select_read_pointer(winstate->buffer, winstate->framehead_ptr);
  117.                         if (winstate->frameheadpos == 0 && TupIsNull(winstate->framehead_slot))
  118.                         {
  119.                                 /* 如果尚未获取第一行,则将其获取到 framehead_slot */
  120.                                 if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))
  121.                                         elog(ERROR, "unexpected end of tuplestore");
  122.                         }
  123.                         /* 逐行检查,直到找到满足范围条件的帧头行 */
  124.                         while (!TupIsNull(winstate->framehead_slot))
  125.                         {
  126.                                 Datum headval, currval;
  127.                                 bool headisnull, currisnull;
  128.                                 headval = slot_getattr(winstate->framehead_slot, sortCol, &headisnull);
  129.                                 currval = slot_getattr(winstate->ss.ss_ScanTupleSlot, sortCol, &currisnull);
  130.                                 if (headisnull || currisnull)
  131.                                 {
  132.                                         /* 如果其中一行的值为 NULL,按照 nulls_first 设置推进帧头 */
  133.                                         if (winstate->inRangeNullsFirst)
  134.                                         {
  135.                                                 if (!headisnull || currisnull)
  136.                                                         break;
  137.                                         }
  138.                                         else
  139.                                         {
  140.                                                 if (headisnull || !currisnull)
  141.                                                         break;
  142.                                         }
  143.                                 }
  144.                                 else
  145.                                 {
  146.                                         if (DatumGetBool(FunctionCall5Coll(&winstate->startInRangeFunc,
  147.                                                                                                            winstate->inRangeColl,
  148.                                                                                                            headval,
  149.                                                                                                            currval,
  150.                                                                                                            winstate->startOffsetValue,
  151.                                                                                                            BoolGetDatum(sub),
  152.                                                                                                            BoolGetDatum(less))))
  153.                                                 break;  /* 该行是正确的帧头 */
  154.                                 }
  155.                                 /* 即使获取失败,仍然推进帧头位置 */
  156.                                 winstate->frameheadpos++;
  157.                                 spool_tuples(winstate, winstate->frameheadpos);
  158.                                 if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))
  159.                                         break;  /* 到达分区末尾 */
  160.                         }
  161.                         winstate->framehead_valid = true;
  162.                 }
  163.                 else if (frameOptions & FRAMEOPTION_GROUPS)
  164.                 {
  165.                         /*
  166.                          * 在 GROUPS START_OFFSET 模式下,帧头是满足偏移量约束的第一组的第一行。
  167.                          */
  168.                         int64 offset = DatumGetInt64(winstate->startOffsetValue);
  169.                         int64 minheadgroup;
  170.                         if (frameOptions & FRAMEOPTION_START_OFFSET_PRECEDING)
  171.                                 minheadgroup = winstate->currentgroup - offset;
  172.                         else
  173.                                 minheadgroup = winstate->currentgroup + offset;
  174.                         tuplestore_select_read_pointer(winstate->buffer, winstate->framehead_ptr);
  175.                         if (winstate->frameheadpos == 0 && TupIsNull(winstate->framehead_slot))
  176.                         {
  177.                                 /* 如果尚未获取第一行,则将其获取到 framehead_slot */
  178.                                 if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))
  179.                                         elog(ERROR, "unexpected end of tuplestore");
  180.                         }
  181.                         /* 逐组推进帧头 */
  182.                         while (!TupIsNull(winstate->framehead_slot))
  183.                         {
  184.                                 if (winstate->frameheadgroup >= minheadgroup)
  185.                                         break;  /* 找到满足条件的帧头行 */
  186.                                 ExecCopySlot(winstate->temp_slot_2, winstate->framehead_slot);
  187.                                 winstate->frameheadpos++;
  188.                                 spool_tuples(winstate, winstate->frameheadpos);
  189.                                 if (!tuplestore_gettupleslot(winstate->buffer, true, true, winstate->framehead_slot))
  190.                                         break;  /* 到达分区末尾 */
  191.                                 if (!are_peers(winstate, winstate->temp_slot_2, winstate->framehead_slot))
  192.                                         winstate->frameheadgroup++;
  193.                         }
  194.                         ExecClearTuple(winstate->temp_slot_2);
  195.                         winstate->framehead_valid = true;
  196.                 }
  197.                 else
  198.                         Assert(false);
  199.         }
  200.         else
  201.                 Assert(false);
  202.         /* 恢复原内存上下文 */
  203.         MemoryContextSwitchTo(oldcontext);
  204. }
复制代码
  依旧通过一个详细的例子来分析该函数的详细执行过程,案例参考函数eval_windowaggregates
案例背景:
     我们盼望计算每个销售员的累计销售额。使用窗口函数 SUM(sale_amount) OVER (PARTITION BY salesperson_id ORDER BY sale_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW),即每个销售员的累计销售额是从该销售员的第一个销售日期开始,到当前行的销售额的累积。
  SQL 查询:
  1. SELECT salesperson_id, sale_date, sale_amount,
  2.        SUM(sale_amount) OVER (PARTITION BY salesperson_id ORDER BY sale_date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cumulative_sales
  3. FROM sales;
复制代码
这个查询会根据 sale_date 排序每个销售员的数据,并为每一行计算累计销售额。为了计算窗口函数,update_frameheadpos 会在内部被调用来更新每个窗口的帧头位置。
详细步骤和代码说明:
  假设我们正在处置惩罚销售员 1 的数据,查询的当前行是 2024-01-02
第一步:更新帧头位置
  当函数 update_frameheadpos 被调用时,它的作用是更新 frameheadpos,即计算当前帧的起始位置。帧头位置决定了窗口函数计算时应包罗哪些行。
1. 查抄是否已经计算了帧头位置:
  1. if (winstate->framehead_valid)
  2.     return;  /* 如果帧头已经有效,直接返回 */
复制代码
  如果帧头已经计算过了,就跳过计算,制止重复计算。
2. 切换到符合的内存上下文:
  1. oldcontext = MemoryContextSwitchTo(winstate->ss.ps.ps_ExprContext->ecxt_per_query_memory);
复制代码
  这里,我们切换到符合的内存上下文,以确保计算不会泄漏内存。
3. 计算帧头位置: 接下来根据帧的选项 (frameOptions),我们来决定帧头的位置。


  • 如果是 UNBOUNDED PRECEDING:
  1. if (frameOptions & FRAMEOPTION_START_UNBOUNDED_PRECEDING)
  2. {
  3.     winstate->frameheadpos = 0;
  4.     winstate->framehead_valid = true;
  5. }
复制代码
  这里,UNBOUNDED PRECEDING 表示帧从分区的第一行开始。因此,帧头位置就是 0,即第一行。


  • 如果是 CURRENT ROW:
  1. else if (frameOptions & FRAMEOPTION_START_CURRENT_ROW)
  2. {
  3.     if (frameOptions & FRAMEOPTION_ROWS)
  4.     {
  5.         winstate->frameheadpos = winstate->currentpos;
  6.         winstate->framehead_valid = true;
  7.     }
  8. }
复制代码
  如果是 CURRENT ROW,那么帧头就是当前行的位置。在我们的例子中,假设当前行是 2024-01-02,frameheadpos 就是当前行的位置。
第二步:处置惩罚 RANGE 或 GROUPS 模式
  如果窗口界说了 RANGE 或 GROUPS,我们必要根据排序规则找到当前行地点的组,并确定该组的第一行作为帧头。
4. 如果没有排序列(ORDER BY):
  1. if (node->ordNumCols == 0)
  2. {
  3.     winstate->frameheadpos = 0;
  4.     winstate->framehead_valid = true;
  5.     MemoryContextSwitchTo(oldcontext);
  6.     return;
  7. }
复制代码
  如果没有界说排序列,那么所有行被以为是同一组,帧头位置就是 0,即分区的第一行。
5. 如果有排序列
  如果有排序列,我们会根据当前行的值和分区内其他行的值,找到与当前行同组的第一行作为帧头。例如,如果是 2024-01-02 的数据,程序会查找销售员 1 中销售额最早的那一行(即 2024-01-01)。
5. 查找同组的第一行
  1. while (!TupIsNull(winstate->framehead_slot))
  2. {
  3.     if (are_peers(winstate, winstate->framehead_slot, winstate->ss.ss_ScanTupleSlot))
  4.         break;  /* 找到当前行同组的第一行作为帧头 */
  5.     winstate->frameheadpos++;
  6.     spool_tuples(winstate, winstate->frameheadpos);
  7. }
复制代码
  这里,我们通过查抄每一行是否与当前行同组(are_peers 函数),找到属于同组的第一行,作为帧头。
第三步:更新帧头位置和返回
7. 设置帧头有效:
  1. winstate->framehead_valid = true;
复制代码
  一旦计算出帧头位置,就将 framehead_valid 设置为 true,表示帧头计算完成。
8. 规复内存上下文:
  1. MemoryContextSwitchTo(oldcontext);
复制代码
  最后,规复之前的内存上下文,确保内存管理的正确性。
详细例子:
  1. 假设当前行是 2024-01-02,销售员 1。
  2. 查询的窗口帧使用的是 ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW。
  3. 第一步:frameheadpos 将被设置为 0,即从 2024-01-01 开始。
  4. 第二步:在 RANGE 模式下,程序检查是否有排序列,并找到销售员 1 在 2024-01-01 的销售额作为帧头。
  5. 第三步:最终,帧头位置 frameheadpos 被设置为 0,并且标记为有效。
复制代码
因此,当前行的累计销售额将从 2024-01-012024-01-02,依此类推。
  窗口模式通过差别的帧界说方式,影响了窗口函数的计算范围,从而决定了聚合计算的结果。
   

  • UNBOUNDED PRECEDING:帧从分区的第一行开始,适用于计算从分区开始到当前行的累计值。
  • CURRENT ROW:帧仅包含当前行,适用于每行单独计算(如排名)。
  • RANGE:帧的起始位置是当前行地点同组的第一行,适用于基于排序的聚合(如销售排名)。
  • OFFSET:帧的起始位置是当前行位置的偏移,适用于计算行之间的偏移聚合。
  • GROUPS:帧的起始位置是当前行地点组的第一行,适用于按组聚合。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

老婆出轨

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表