flinksql-Queries查询相关实战

打印 上一主题 下一主题

主题 801|帖子 801|积分 2403


  • 分组聚合
   
  --分组集
  --GROUPING SETS() 答应你定义特定的分组方式,这样你可以选择只对感兴趣的分组进行计算。
  --通过手动指定不同的分组组合,你能够灵活地控制数据的聚合结果。
  --与 ROLLUP 和 CUBE 不同,GROUPING SETS 不会主动天生全部子集组合,而是只天生你指定的那些。
  SELECT supplier_id, rating, COUNT(*) AS total
  FROM (VALUES
      ('supplier1', 'product1', 4),
      ('supplier1', 'product2', 3),
      ('supplier2', 'product3', 3),
      ('supplier2', 'product4', 4))
  AS Products(supplier_id, product_id, rating)
  GROUP BY GROUPING SETS ((supplier_id, rating), (supplier_id), ());
  
  --ROLLUP
  --ROLLUP() 用于实验分层级别的聚合,主要用于必要按顺序逐层汇总数据的场景。
  --与 CUBE() 不同,ROLLUP() 只天生按从左到右逐步减少维度的组合,而不是全部大概的子集组合。
  --例如,ROLLUP(a, b, c) 会天生 (a, b, c), (a, b), (a), 和 (),而不会像 CUBE() 那样天生全部的大概组--合。
  SELECT supplier_id, rating, COUNT(*)
  FROM (VALUES
      ('supplier1', 'product1', 4),
      ('supplier1', 'product2', 3),
      ('supplier2', 'product3', 3),
      ('supplier2', 'product4', 4))
  AS Products(supplier_id, product_id, rating)
  GROUP BY ROLLUP (supplier_id, rating);
  
  --立方体
  --CUBE() 是一种扩展的 GROUP BY 操纵,答应你针对多列进行分组聚合,并天生每种大概的维度组合的聚合结果。
  --假如使用了 CUBE(a, b, c),Flink 会计算出全部 a, b, c 及其子集的组合的聚合结果。
  --在数据分析和 OLAP(在线分析处理)场景中,CUBE 常用来计算多维数据的统计值。
  SELECT supplier_id, rating, COUNT(*)
  FROM (VALUES
      ('supplier1', 'product1', 4),
      ('supplier1', 'product2', 3),
      ('supplier2', 'product3', 3),
      ('supplier2', 'product4', 4))
  AS Products(supplier_id, product_id, rating)
  GROUP BY CUBE (supplier_id, rating);
  

  • 窗口函数TVF
   
  --注:不支持cdc模式,因为窗口函数只支持追加模式的,不支持update与delete操纵
  --模仿表
  CREATE TABLE bid (
      `id` string,
      bidtime TIMESTAMP(3),
      price DECIMAL(10, 2),
      item string,
      ts as bidtime,
      WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
      --proc_time AS PROCTIME(),  
      PRIMARY KEY (`id`) NOT ENFORCED
    )
  WITH
    (
      'connector' = 'jdbc',
      ${36},
      'table-name' = 'bid'
    );
  
    --滚动窗口
    --  SELECT cast(window_start as string) AS window_start, cast(window_end as string) AS window_end , SUM(price) AS total_price
    --  FROM TABLE(
    --  TUMBLE(TABLE  bid, DESCRIPTOR(ts), INTERVAL '10' MINUTES))
    --  GROUP BY window_start, window_end;
  
  --滑动窗口
  --  SELECT cast(window_start as string) AS window_start, cast(window_end as string) AS window_end , SUM(price) AS total_price
  --    FROM TABLE(
  --      HOP(TABLE bid, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '10' MINUTES))
  --    GROUP BY window_start, window_end;
  
  --累计窗口
  --  SELECT  cast(window_start as string) AS window_start, cast(window_end as string) AS window_end , SUM(price) AS total_price
  --    FROM TABLE(
  --      CUMULATE(TABLE bid, DESCRIPTOR(ts), INTERVAL '5' MINUTES, INTERVAL '20' MINUTES))
  --    GROUP BY window_start, window_end;
  
   --会话窗口(不支持批处理)
   SELECT window_start, window_end, item, SUM(price) AS total_price
    FROM TABLE(
        SESSION(TABLE bid PARTITION BY item, DESCRIPTOR(ts), INTERVAL '5' MINUTES))
    GROUP BY item, window_start, window_end;
  

  • 窗口聚合
        
  CREATE TABLE bid (
      `id` string,
      bidtime TIMESTAMP(3),
      price DECIMAL(10, 2),
      item string,
      supplier_id string,
      ts as bidtime,
      WATERMARK FOR ts AS ts - INTERVAL '5' SECOND,
      --proc_time AS PROCTIME(),  
      PRIMARY KEY (`id`) NOT ENFORCED
    )
  WITH
    (
      'connector' = 'jdbc',
      ${36},
      'table-name' = 'bid'
    );
  
  --分组集
  --  SELECT  cast(window_start as string) AS window_start, cast(window_end as string) AS window_end , supplier_id, SUM(price) AS total_price
  --    FROM TABLE(
  --      TUMBLE(TABLE bid, DESCRIPTOR(ts), INTERVAL '10' MINUTES))
  --    GROUP BY window_start, window_end, GROUPING SETS ((supplier_id), ());
  
  --ROLLUP
  --  SELECT cast(window_start as string) AS window_start, cast(window_end as string) AS window_end , supplier_id, SUM(price) AS total_price
  --  FROM TABLE(
  --      TUMBLE(TABLE bid, DESCRIPTOR(ts), INTERVAL '10' MINUTES))
  --  GROUP BY window_start, window_end, ROLLUP (supplier_id);
  
  --立方体
  SELECT cast(window_start as string) AS window_start, cast(window_end as string) AS window_end , supplier_id, item, SUM(price) AS total_price
    FROM TABLE(
      TUMBLE(TABLE bid, DESCRIPTOR(ts), INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end, CUBE (supplier_id, item);
  

  • Over聚合(FLINK-CDC不支持)
        
  CREATE TABLE bid (
      `id` string,
      bidtime TIMESTAMP(3),
      price DECIMAL(10, 2),
      item string,
      supplier_id string,
      --proc_time AS PROCTIME(),  
      WATERMARK FOR bidtime AS bidtime - INTERVAL '5' SECOND,
      PRIMARY KEY (`id`) NOT ENFORCED
    )
  WITH
    (
      'connector' = 'jdbc',
      ${36},
      'table-name' = 'bid'
    );
  
 
  --  SELECT
  --    agg_func(agg_col) OVER (
  --      [PARTITION BY col1[, col2, ...]]
  --      ORDER BY time_col
  --      range_definition),
  --    ...
  --  FROM ...
  --OVER 窗口必要数据是有序的。因为表没有固定的排序,所以 ORDER BY 子句是强制的。对于流式查询,Flink 目前只支持 OVER 窗口定义在升序(asc)的 时间属性 上。其他的排序不支持。
  --ORDER BY:必须是时间戳列(事件时间、处理时间),只能升序
  --range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定命据范围的方式。第一种为按照行数聚合,第二种为按照时间区间聚合
  --不指定 range_definition 时:Flink 默认使用 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW。
  --  SELECT supplier_id, cast(bidtime as string) as bidtime, price,
  --    SUM(price) OVER (
  --         PARTITION BY supplier_id
  --      ORDER BY bidtime
  --    ) AS sum_pri
  --  FROM bid
  --  ;
  
  --WINDOW 子句可用于在 SELECT 子句之外定义 OVER 窗口。它让查询可读性更好,也答应多个聚合共用一个窗口定义。
   SELECT supplier_id, cast(bidtime as string) as bidtime, price,
    SUM(price) OVER w AS sum_pri,
    avg(price)  OVER w AS avg_pri
  FROM bid
  WINDOW  w as (    PARTITION BY supplier_id
      ORDER BY bidtime )
  ;
  

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

惊落一身雪

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表