ToB企服应用市场:ToB评测及商务社交产业平台

标题: MaxCompute 近及时增全量处理一体化新架构和使用场景先容 [打印本页]

作者: 冬雨财经    时间: 2024-6-15 01:57
标题: MaxCompute 近及时增全量处理一体化新架构和使用场景先容
随着当前数据处理业务场景日趋复杂,对于大数据处理平台底子架构的能力要求也越来越高,既要求数据湖的大存储能力,也要求具备海量数据高效批处理能力,同时还可能对延时敏感的近及时链路有强需求,本文重要介基于 MaxCompute 的离线近及时一体化新架构如何来支持这些综合的业务场景,提供近及时增全量一体的数据存储和计算(Transaction Table2.0)解决方案。

业务配景与近况

当前典范的数据处理业务场景中,对于时效性要求低的大规模数据全量批处理的单一场景,直接使用 MaxCompute 足以很好的满足业务需求。但随着 MaxCompute 承载的业务无论是规模,还是使用场景,都越来越丰富,在处理好大规模离线批处理链路的同时,用户对近及时和增量处理链路也有很多的需求,下图展示了部分业务场景。





好比近及时数据导入链路,依赖平台引擎具备事务隔离,小文件自动归并等能力,又好比增全量数据归并链路,还依赖增量数据存储和读写,主键等能力。MaxCompute以前不具备新架构能力之前,要支持这些复杂的综合业务场景,只能通过下图所示的三种解决方案,但无论使用单一引擎大概联邦多引擎都存在一些无法解决的痛点。




方案一,只使用单一的MaxCompute离线批处理解决方案,对于近及时链路大概增量处理链路通常需要转化成T+1的批处理链路,会一定水平上增加业务逻辑复杂度,且时效性也较差,存储成本也可能较高。

方案二,只使用单一的及时引擎,那资源成本会较高,性价比力低,且对于大规模数据批处理链路的稳固性和灵活性也存在一些瓶颈。

方案三,使用典范的Lambda架构,全量批处理使用MaxCompute链路,时效性要求比力高的增量处理使用及时引擎链路,但该架构也存在大家所熟知的一些固有缺陷,好比多套处理和存储引擎引发的数据不一致问题,多份数据冗余存储和计算引入的额外成本,架构复杂以及开辟周期长等问题。

这些解决方案在成本,易用性,低延时,高吞吐等方面相互制约,很难同时具备较好的效果,这也驱动着MaxCompute有必要开辟新的架构既能满足这些业务场景需求,也能提供较低的成本和较好的用户体验。

近几年在大数据开源生态中,针对这些问题已经形成了一些典范的解决方案,最盛行的就是Spark/Flink/Trino开源数据处理引擎,深度集成Hudi / Delta Lake / Iceberg / Paimon开源数据湖,践行开放统一的计算引擎和统一的数据存储思想来提供解决方案,解决Lamdba架构带来的一系列问题。同时MaxCompute近一年多在离线批处理计算引擎架构上,自研设计了离线&近及时数仓一体化架构,在保持经济高效的批处理上风下,同时具备分钟级的增量数据读写和处理的业务需求,另外,还可提供Upsert,Time travel等一系列实用功能来扩展业务场景,可有效地节省数据计算,存储和迁移成本,切实提高用户体验。

离线&近及时增全量一体化业务架构





上图所示即为MaxCompute高效支持上述综合业务场景的全新业务架构

写入端会融合多种数据集成工具将丰富的数据源近及时增量或批量导入到统一的MaxCompute表存储中,存储引擎的表数据管理服务会自动优化编排数据存储结构来治理小文件等问题;使用统一的计算引擎支持近及时增量和大规模离线批量分析处理链路;由统一的元数据服务支持事务机制和海量文件元数据管理。统一的新架构带来的上风也是非常显著,可有效解决纯离线系统处理增量数据导致的冗余计算和存储、时效低等问题,也能避免及时系统高昂的资源消耗成本,同时可消除Lambda架构多套系统的不一致问题,镌汰冗余多份存储成本以及系统间的数据迁移成本。

简言之,一体化新架构既可以满足增量处理链路的计算存储优化以及分钟级的时效性,又能包管批处理的团体高效性,还能有效节省资源使用成本。

目前新架构已支持了部分焦点能力,包括主键表,Upsert及时写入,Time travel查询,增量查询,SQL DML操纵,表数据自动治理优化等,更详细的架构原理和相关操纵引导请参考↓官网架构原理和用户操纵文档。


业务场景实践

本章节重点先容新架构如何支持一些典范的业务链路以及产生的优化效果。

1. 表存储和数据治理优化

本章节重要先容建表操纵和关键表属性的寄义,以及根据业务场景如何设置表属性值以达到最佳效果,也会简单描述一下存储引擎配景如何自动优化表数据。

▶ 建表




首先,一体化新架构需要设计统一的表格式来存储差别格式的数据以支撑差别业务场景的数据读写,这里称为Transaction Table2.0,简称TT2,可以同时支持既有的批处理链路,以及近及时增量等新链路的全部功能。
建表语法参考官网,简单示例:
  1. createtable tt2 (pk bigint notnullprimarykey, val string) tblproperties ("transactional"="true");
  2. createtable par_tt2 (pk bigint notnullprimarykey, val string)
  3. partitioned by (pt string) tblproperties ("transactional"="true");
复制代码
只需要设置主键Primary Key(PK),以及表属性transactional为true,就可以创建一张TT2。PK用来保障数据行的unique属性,transactional属性用来配置ACID事务机制,满足读写快照隔离。

▶ 关键的表属性配置

详细属性配置参考官网,简单示例:
  1. createtable tt2 (pk bigint notnullprimarykey, val string)  tblproperties ("transactional"="true", "write.bucket.num" = "32", "acid.data.retain.hours"="48");
复制代码

此属性非常告急,体现每个partition大概非分区表的分桶数量,默认值为16,全部写入的记录会根据PK值对数据进行分桶存储,类似PK值的记录会落在同一个桶中。非分区表不支持修改,分区表可修改,但只有新分区生效。

数据写入和查询的并发度可通过bucket数量来水平扩展,每个并发可至少处理一个桶数据。但桶数量并不是越多越好,对于每个数据文件只会归属一个桶,因此桶数量越多,越容易产生更多的小文件,进一步可能增加存储成本和压力,以及读取效率。因此需要结合数据写入的吞吐,延时,总数据的巨细,分区数,以及读取延时来团体评估合理的桶数量。

别的,数据分桶存储也非常有助于提拔点查场景性能,如果查询语句的过滤条件为详细的PK值,那查询时可进行高效的桶裁剪和数据文件裁剪,极大镌汰查询的数据量。





此属性也很告急,代表time travel查询时可以读取的历史数据实践范围,默认值是1天,最大支持7天。

建议用户按真实的业务场景需求来设置合理的时间周期,设置的时间越长,保存的历史数据越多,产生的存储费用就越多,而且也会一定水平上影响查询效率,如果用户不需要time travel查询历史数据,建议此属性值设置为0,代表关掉time travel功能,如许可以有效节省数据历史状态的存储成本。
 
▶ Schema Evolution操纵

TT2支持完整的Schema Evolution操纵,包括增加和删除列。在time travel查询历史数据时,会根据历史数据的Schema来读取数据。另外PK列不支持修改。

详细DDL语法参考官网,简单示例:
  1. altertable tt2 add columns (val2 string);altertable tt2 drop columns val;
复制代码

▶ 表数据自动治理优化


TT2典范场景之一是支持分钟级近及时增量数据导入,因此可能导致增量小文件数量膨胀,尤其是桶数量较大的环境,从而引发存储访问压力大、成本高,数据读写IO效率低下,文件元数据分析慢等问题,如果Update/Delete格式的数据较多,也会造成数据中心状态的冗余记录较多,进一步增加存储和计算的成本,查询效率低落等问题。

为此,配景存储引擎配套支持了合理高效的表数据服务对存储数据进行自动治理和优化,低落存储和计算成本,提拔分析处理性能。





如上图所示,展示了分区表的数据结构,先按照分区对数据文件进行物理隔离,差别分区的数据在差别的目录之下; 每个分区内的数据按照桶数量来切分数据,每个桶的数据文件单独存放; 每个桶内的数据文件范例重要分成三种:







如上图所示,TT2的表数据服务重要分成Auto Sort / Auto Merge / Auto Compact / Auto Clean四种,用户无需主动配置,存储引擎配景服务会智能的自动网络各个维度的数据信息,配置合理的计谋自动实行。



如果用户对于查询性能的要求非常高,也可实验手动实行全量数据的major compact操纵,每个桶的全部数据会消除全部的历史状态,而且额外生成一个新的Aliorc列存数据文件,用于高效查询,但也会产生额外的实行成本,以及新文件的存储成本,因此非必要尽量不实行。

详细语法参考官网,简单示例:
  1. set odps.merge.task.mode=service;altertable tt2 compact major;
复制代码


2. 数据写入场景业务实践

本章节重要先容部分典范的写入场景业务实践。

▶ 分钟级近及时 Upsert 写入链路

MaxCompute离线架构一般在小时或天级别批量导入增量数据到一张新表大概新分区中,然后配置对应的离线ETL处理链路,将增量数据和存量表数据实行Join Merge操纵,生成最新的全量数据,此离线链路的延时较长,计算和存储也会消耗一定的成本。

使用新架构的upsert及时导入链路根本可以保持数据从写入到查询可见的延时在5-10分钟,满足分钟级近及时业务需求,而且不需要复杂的ETL链路来进行增全量的Merge操纵,节省相应的计算和存储成本。

实际业务数据处理场景中,涉及的数据源丰富多样,可能存在数据库、日志系统大概其他消息队列等系统,为了方便用户数据写入TT2, MaxCompute深度定制开辟了开源Flink Connector工具,针对高并发、容错、事务提交等场景做了定制化的设计及开辟优化,以满足延时低、精确性高等要求,同时也能很好的对接融合Flink生态。详细使用细节可以参考官网产品说明





上图简单展示了团体写入的流程,可总结如下重要关键点:


▶ 部分列增量更新链路

该链路可用来优化将多张增量表的数据列拼接到一张大宽表的场景,比力类似多流join的业务场景。



如上图所示,左边展示了MaxCompute的离线ETL链路处理此类场景,将多张增量表按照比力固定的时间来对齐数据,通常小时/天级别,然后触发一个join使命,把全部表的数据列拼接起来生成大宽表,如果有存量数据,还需要实行类似upsert的ETL链路。因此团体ETL链路延时较长,流程复杂,也比力消耗计算和存储资源,数据也容易遇到无法对齐的场景。

右边展示了通过TT2表支持部分列更新的能力,只需要将各个表的数据列及时增量更新到TT2大宽表中即可,TT2表的配景Compact服务以及查询时,会自动把类似PK值的数据行拼接成一行数据。该链路根本完全解决了离线链路遇到的问题,延时从小时/天级别低落到分钟级,而且链路简单,几乎是ZeroETL,也能成倍节省计算和存储成本。

目前支持以下两种方式进行部分列更新,功能还在灰度上线中,还未发布到官网(预计两个月内在公共云发布)。

  1. createtable tt2 (pk bigint notnullprimarykey, val1 string, val2 string, val3 string) tblproperties ("transactional"="true");
  2. insertinto tt2 (pk, val1) select pk, val1 from table1;
  3. insertinto tt2 (pk, val2) select pk, val2 from table2;
  4. insertinto tt2 (pk, val3) select pk, val3 from table3;
复制代码


▶ SQL DML / Upsert 批处理链路

为了方便用户操纵TT2表,MaxCompute计算引擎对SQL全套的数据查询DQL语法和数据操纵DML语法进行了支持,保障离线链路的高可用和精良的用户体验。SQL引擎的内核模块包括Compiler、Optimizer、Runtime等都做了专门适配开辟以支持相关功能和优化,包括特定语法的剖析,特定算子的Plan优化,针对pk列的去重逻辑,以及runtime upsert并发写入等。



数据处理完成之后,会由Meta Service来实行事务辩论检测,原子更新数据文件元信息等,保障读写隔离和事务一致性。

SQL DML详细语法可参考官网文档,对于Insert / Update / Delete / Merge Into都有详细的先容和示例。

对于Upsert批式写入能力,由于TT2表配景服务大概查询时会自动根据PK值来归并记录,因此对于Insert + Update场景,不需要使用复杂的Update/Merge Into语法,可统一使用Insert into插入新数据即可,使用简单,而且能节省一些读取IO和计算资源。

3. 数据查询场景业务实践

本章节重要先容部分典范的查询场景业务实践。

▶ Time travel查询

基于TT2,计算引擎可高效支持Time travel查询的典范业务场景,即查询历史版本的数据,可用于回溯业务数据的历史状态,或数据出错时,用来恢复历史状态数据进行数据改正。

详细语法参考官网,简单示例:​​​​​​​
  1. //查询指定时间戳的历史数据select * from tt2 timestampasof'2024-04-01 01:00:00';//查询5分钟之间的历史数据select * from tt2 timestampasofcurrent_timestamp() - 300;//查询截止到最近第二次Commit写入的历史数据select * from tt2 timestampasof get_latest_timestamp('tt2', 2);
复制代码

可查询的历史数据时间范围,可通过表属性acid.data.retain.hours来配置,配置计谋上文已先容,配置参数详解参考官网。





SQL引擎吸收到用户侧输入的time travel查询语法后,会先从Meta服务中剖析出来要查询的历史数据版本,然后过滤出来要读取的Compacted file和Delta file,进行归并merge输出,Compacted file可极大提拔读取效率。

结合上图示例进一步描述查询细节:





▶ 增量查询

TT2表支持增量写入和存储,最告急的一个思量就是支持增量查询以及增量计算链路,为此,也专门设计开辟了新的SQL增量查询语法来支持近及时增量处理链路。用户通过增量查询语句可灵活构建增量数仓业务链路,近期正在规划开辟支持增量物化视图来进一步简化使用门槛,提拔用户体验,低落用户成本。

支持两种增量查询语法:

  1. //查询2024-04-0101:00:00-01:10:00之间十分钟的增量数据
  2. select * from tt2 timestampbetween'2024-04-01 01:00:00'and'2024-04-01 01:10:00';
  3. //查询前10分钟到前5分钟之间的增量数据
  4. select * from tt2 timestampbetweencurrent_timestamp() - 601andcurrent_timestamp() - 300;
  5. //查询最近一次commit的增量数据
  6. select * from tt2 timestampbetween get_latest_timestamp('tt2', 2) and get_latest_timestamp('tt2');
复制代码


  1. //绑定一个stream对象到tt2表上create stream tt2_stream ontable tt2;insertinto tt2 values (1, 'a'), (2, 'b');//自动查询出来新增的两条记录(1, 'a'), (2, 'b'), 并把下一次的查询版本更新到最新的数据版本insert overwrite dest select * from tt2_stream;insertinto tt2 values (3, 'c'), (4, 'd');//自动查询出来新增的两条记录(3, 'c'), (4, 'd')insert overwrite dest select * from tt2_stream;
复制代码




SQL引擎吸收到用户侧输入的增量查询语法后,会先从Meta服务中剖析出来要查询的历史增量数据版本,然后过滤出来要读取的Delta file列表,进行归并merge输出。

结合上图示例进一步描述查询细节:


▶ PK 点查 DataSkipping 优化




上文提到,TT2表的数据分布和索引根本是按照PK列值进行构建的,因此如果对TT2表进行点查,并指定了PK值进行过滤的话,将会极大镌汰要读取的数据量和读取耗时,资源消耗可能也会成百上千倍的镌汰。好比,TT2表总的数据记录是1亿,颠末过滤后真正从数据文件中读取的数据记录可能只有一万条。

重要的DataSkipping优化包括:


遵照通例的SQL查询语法,简单示例:
  1. select * from tt2 where pk = 1;
复制代码

▶ SQL查询分析Plan优化




由于TT2表数据按照PK值进行分桶分布的,而且桶内部数据查询出来具备Unique属性和Sort有序性,因此SQL Optimizer利用这些属性可以做大量的优化。

好比图中示例的SQL语句 (假设tt2_t1和tt2_t2的桶数量类似),SQL Optimizer可做的重要优化如下:


这些消除的算子都极为消耗资源,因此这些优化可团体让性能提拔1倍以上。

遵照通例的SQL查询语法,简单示例:​​​​​​​
  1. select * from (selectdistinct pk from tt2_t1) tjoin (selectdistinct pk from tt2_t2) t2 on t.pk = t2.pk;
复制代码
4. 数据库整库及时同步写入 MaxCompute

当前数据库和大数据处理引擎都有各自擅长的数据处理场景,部分复杂的业务场景同时需要OLTP/OLAP/离线分析引擎对数据进行分析处理,因此数据也需要在各个引擎之间活动。将数据库的单表大概整库的变更记录及时同步到MaxCompute进行分析处理是目前比力典范的业务链路。




如上图所示,左边流程是之前MaxCompute支持此类场景的典范ETL处理链路,按照小时/天级别读取数据库的变更记录写入到MaxCompute一张临时的增量表中,然后将临时表和存量的全量表进行Join Merge处理,生成新的全量数据。此链路较复杂,而且延时较长,也会消耗一定的计算和存储成本。

右边流程则是使用新架构支持该场景,直接按照分钟级别及时读取数据库的变更记录upsert写入到TT2表即可。链路极简单,数据可见低落到分钟级,只需要一张TT2表即可,计算和存储成本降到最低。

目前MaxCompute集成了两种方式支持该链路:


上风

MaxCompute离线&近及时数仓一体化新架构会尽量覆盖部分近及时数据湖(HUDI/ICEBERG等)的通用功能,别的,作为完全自研设计的新架构,在低成本,功能,性能,稳固性,集成等方面也具备很多独特亮点:


生产近况和未来规划
团体功能邀测运行大概半年时间,单中国公共云已经超过100+ project, 700+张TT2表存在有效数据存储和读写,近及时链路和Upsert能力已经在部分客户的生产链路上得到充实验证。

未来半年规划:


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4