大数据之数据湖Apache Hudi

打印 上一主题 下一主题

主题 692|帖子 692|积分 2076

一、Hudi框架概述

  Apahe Hudi (Hadoop Upserts delete and Incrementals) 是Uber主导开辟的开源数据湖框架,为了解决大数据生态系统中必要插入更新及增量消耗原语的摄取管道和ETL管道的低效问题,该项目在2016年开始开辟,并于2017年开源,2019年1月进入 Apache 孵化器,且2020年6月称为Apache 顶级项目
  官网:Apache Hudi | An Open Source Data Lake Platform | Apache Hudi
  一)数据湖Data Lake

   数据湖是大数据架构的新范式,以原始格式存储数据,可以满足用户的广泛需求,并能提供更快的洞察力,过细的数据编录和管理是成功实施数据湖的关键。

  仓库(WareHouse)是人为提前建造好的,有货架,还有过道,而且还可以进一步为放置到货架的物品指定位置。

  1、什么是数据湖

  数据湖(Data Lake)和数据库、数据仓库一样,都是数据存储的计划模式。数据库和数据仓库会以关系型的方式来计划存储、处置惩罚数据。但数据湖的计划理念是相反的,数据仓库是为了保障数据的质量、数据的一致性、数据的重用性等对数据进行结构化处置惩罚。

  


  数据湖是一个数据存储库,可以使用数据湖来存储大量的原始数据。现在企业的数据仓库都会通过分层的方式将数据存储在文件夹、文件中,而数据湖使用的是平面架构来存储数据。我们必要做的只是给每个数据元素分配一个唯一的标识符,并通过元数据标签来进行标注。当企业中出现业务问题时,可以从数据湖中查询数据,然后分析业务对应的那一小部分数据集来解决业务问题

  


  数据湖越来越多的用于形貌任何的大型数据池,数据都是以原始数据方式存储,知道必要查询应用数据的时间才会开始分析数据需求和应用架构

  数据湖是专注于原始数据生存以及低本钱长期存储的存储计划模式,它相当于是对数据仓库的补充。数据湖是用于长期存储数据容器的集合,通过数据湖可以大规模的捕获、加工、探索任何形式的原始数据。通过使用一些低本钱的技术,可以让下游办法可以更好地利用,下游办法包括像数据集市、数据仓库或者是机器学习模型。
  

  数据湖(DataLake)是形貌数据存储策略的方式,并不与具体的某个技术框架关联,与数据库、数据仓库也一样,它们都是数据的管理策略,数据湖最核心的本领包括:
  


  2、数据湖的优点

  数据湖(Data Lake)是一个以原始格式存储数据的存储库或系统。它按原样存储数据,而无需事先对数据进行结构化处置惩罚。一个数据湖可以存储结构化数据(如关系型数据库中的表),半结构化数据(如CSV、日志、XML、JSON),非结构化数据(如电子邮件、文档、PDF)和二进制数据(如图形、音频、视频)。数据湖有如下几个方面优点:


  • 提供不限数据类型的存储;
  • 开辟职员和数据科学家可以快速动态创建数据模型、构建应用、查询数据,非常灵活;
  • 由于数据湖没有固定的结构,以是更易于访问;
  • 长期存储数据的本钱低廉,数据湖可以安装在低本钱的硬件在,例如:在一般的X86机器上部署Hadoop;
  • 由于数据湖是非常灵活的,它允许使用多种不同的处置惩罚、分析方式来让数据发挥价值,例如:数据分析、及时分析、机器学习以及SQL查询都可以
  3、Data Lake vs Data warehouse

数据湖和数据仓库是用于存储大数据的两种不同策略,最大区别是:数据仓库是提前计划好模式(schema)的,由于数据仓库中存储的都是结构化数据。而在数据湖中,不一定是这样的,数据湖中可以存储结构化和非结构化的数据,是无法预先界说好结构的
  


  7个方面临比认识数据仓库和数据湖不同点


  • 第一点:数据的存储位置不同

    • 数据仓库由于是要有结构的,在企业中很多都是基于关系型模型;
    • 数据湖通常位于分布式存储,例如Hadoop或者类似的大数据存储中;

  • 第二点:数据源不同

    • 数据仓库的数据泉源很多时间来自于OLTP应用的结构化数据库中提取的,用于支持内部的业务部分(例如:销售、市场、运营等部分)进行业务分析;
    • 数据湖的数据泉源可以是结构化的、也可以是非结构化的,例如:业务系统数据库、 IOT装备、社交媒体、移动APP等;

  • 第三点:用户不同

    • 数据仓库主要是业务系统的大量业务数据进行统计分析,以是会应用数据分析的部分是数据仓库的主要用户,例如:销售部、市场部、运营部、总裁办等等。而当必要一个大型的存储,而当前没有明确的数据应用用户或者是目标,将来想要使用这些数据的人可以在使用时开始计划架构,此时,数据湖更得当;
    • 数据湖中的数据都是原始数据,是未经整理的,这对于平凡的用户几乎是不可用的。数据湖更得当数据科学家,由于数据科学家可以应用模型、技术发觉数据中的价值,去解决企业中的业务问题;

  • 第四点:数据质量不同

    • 数据仓库是非常重数据质量的,各人现在经常听说的数据中台,其中有一大块是数据质量管理、数据资产管理等。数据仓库中的数据都是颠末处置惩罚的;
    • 数据湖中的数据可靠性是较差的,这些数据大概是任意状态、形态的数据;

  • 第五点:数据模式

    • 数据仓库在数据写入之前就要界说好模式(schema),例如:会先创建模型、创建表结构,然后导入数据,可以把它称之为write-schema;
    • 数据湖中的数据是没有模式的,直到有效户要访问数据、使用数据才会创建schema,可以把它称之为read-schema;

  • 第六点:灵敏扩展性

    • 数据仓库的模式一旦创建,要重新调整模式,每每代价很大,牵一发而动全身,全部相关的ETL程序大概都必要调整;
    • 数据湖是非常灵活的,可以根据必要重新设置结构或者模式;

  • 第七点:应用不同

    • 数据仓库一般用于做批处置惩罚报告、BI、可视化;
    • 数据湖主要用于机器学习、猜测分析、数据探索和分析;

 
  基于上述内容,可以了解到,数据湖和数据仓库的应用点是不一样的。它们是两种相对独立的数据计划模式。在一些企业中,大概会既有数据湖、又有数据仓库。数据湖并不是要更换数据仓库,而是对企业的数据管理模式进行补充;
  3、数据湖框架

  Apache Hudi:提供的fast upsert/delete以及compaction等功能


  • 官网:Apache Hudi | An Open Source Data Lake Platform | Apache Hudi
  



  • Hudi 计划目标正如其名,Hadoop Upserts Deletes and Incrementals(原为 Hadoop Upserts anD Incrementals),强调其主要支持Upserts、Deletes和Incrementa数据处置惩罚,其主要提供的写入工具是 Spark HudiDataSource API 和自身提供的 DeltaStreamer。
  • 支持三种数据写入方式:UPSERT,INSERT 和 BULK_INSERT。其对 Delete 的支持也是通过写入时指定一定的选项支持的,并不支持纯粹的 delete 接口。
  

  4、湖仓一体(Data Lakehouse)

    背景概述

  Data Lakehouse(湖仓一体)是新出现的一种数据架构,它同时吸收了数据仓库和数据湖的优势,数据分析师和数据科学家可以在同一个数据存储中对数据进行利用,同时它也能为公司进行数据管理带来更多的便利性


  • 数据仓库曾不停做为决议支持系统的支持平台。数据仓库使用良好计划的模式规范数据,例如星形模型、雪花模型和正常范式等。数据仓库无法生成数据所需的洞察
  • Hadoop 生态系统迅速演进,进而出现了称为“数据湖”的非结构化数据存储和处置惩罚新范式。数据湖由于缺乏结构和管理,会迅速沦为“数据沼泽”
  

LakeHouse是一种结合了数据湖和数据仓库优势的新范式。LakeHouse使用新的系统计划:直接在用于数据湖的低本钱存储上实现与数据仓库中类似的数据结构和数据管理功能。假如你现在必要重新计划数据仓库,鉴于现在存储(以对象存储的形式)便宜且高可靠,不妨可以使用LakeHouse。
如下展示,从数据仓库DataWarehouse到数据湖DataLake,再到湖仓一体LakeHouse。
  

    概念界说

  湖仓一体架构力图结合数据仓库的弹性和数据湖的灵活性。人们创建数据仓库来支持商业智能,主要用例包括编制报表、发布下游数据集市(Data Marts),以及支持自助式商业智能等。数据湖的概念来自于数据科学对数据的探索,主要用例包括通过快速实行创建和检验假设,以及利用半结构化和非结构化数据等。
  

  湖仓一体具有以下五个关键特性:


  • 支持分析结构化和非结构化数据;
  • 实用于分析师和数据科学家,不光支持报表,而且支持机器学习和人工智能相关用例;
  • 数据可管理,制止产生沼泽;
  • 架构鲁棒安全,确保利益相关者能准确访问以数据为中央的安全架构;
  • 以公道代价实现有效扩展。
    架构原则

  LakeHouse是一种新的数据管理范式,从根本上简化了企业数据基础架构,而且有望在机器学习已渗透到每个行业的期间加速创新。
  LakeHouse的核心计划要素


  • 可靠的湖上数据管理
  • 支持机器学习与数据科学
  • 高性能的SQL引擎
  

湖仓一体概念架构中,各核心组件通过有效的组织,形成了全新的湖仓一体范式。
l  支持结构化和非结构化数据。无论是类关系数据库那样静态存储,还是以及时数据流方式提供,数据均可转化为洞察。
l  数据抽取(Data ingestion)服务提供多种抽取方式,将数据抽取到数据湖中,既可以满足批处置惩罚需求,也可以满足流式加载需求。一条经验法则是,数据抽取中不做任何数据转换。
l  抽取的数据存储在数据湖的原始数据区域,该区域也称为“青铜层”(bronze layer)。数据依照源数据结构进行管理,实现源数据和下游分析的解耦。
l  数据处置惩罚(data processing)服务处置惩罚原始数据区域中的数据,执行清洗、归并、复杂业务逻辑等利用,并进一步预备好实用于人工智能、商业智能等下游分析的数据格式。
l  数据同时周期性地暂时存放在已清洗数据区域,该区域也称为“白银层”(silver layer)。已清洗数据区域制止对数据重复做多次处置惩罚。处置惩罚完成的数据最终存储在已处置惩罚数据区域,该区域也称为“黄金层”(gold layer)。
l  数据都存储在数据湖中,可用于即席分析、机器学习和报表等多种用例。但数据湖不利于结构化报表或自助式商业智能,而数据仓库在此类需求上表现出色。这必要数据存储同时提供数据仓库本领。
l  数据编目(Data cataloging)服务确保全部源数据、数据湖和数据仓库中的数据、数据处置惩罚流水线管道以及从湖仓一体中抽取的输出都做了适当的编目,防止湖仓一体变成数据沼泽。
l  数据分析(Analytics)服务提供多种数据用途。数据科学家可创建分析沙箱,运行实行和假设测试。数据分析职员可创建沙箱,触发快速查询并对数据执行即席分析。人工智能和机器学习职员可运行和维护模型。商业智能为用户提供了具有丰富可视化效果的自助式商业智能。
 
  创建真实有效湖仓一体架构,应遵照如下五个关键原则:
l  盘算和存储的解耦:首要原则是加入解耦和存储。存储自制且长期,盘算昂贵且短暂。盘算和存储的解耦,可使系统灵活地按需升级并扩展盘算服务。
l  目标驱动的存储层:数据以多种形态和形式出现,因此数据的存储方式应具灵活性,以适应数据的不同形态和用途。灵活性包括根据数据的种类及提供方式不同,提供关系层、图数据层、文档层以及 Blob 等多模态存储层。
l  模块化的体系架构:该原则源自于 SOA,确保数据处于核心职位,以围绕数据开展所需服务为关键。基于数据开展数据抽取、处置惩罚、编目和分析等不同类型的服务,而不是借助流水线将数据提供给服务。
l  聚焦于功能,而非技术:该原则体现了灵活性。功能的变革迟钝,但技术的厘革日新月异。因此一定要聚焦于组件所完成的功能,进而可轻易追随技术的发展而更换旧技术。
l  运动编目(Active cataloging):该项基本原则是制止数据湖沦为数据沼泽的关键。编目上需具有明确的管理原则,有助于确保数据充分记录到数据湖中。
数据是复杂的,而且在不断地发展。业务也在迅速地变革,需求同样再不断地变革,架构必须具备能适应全部变革的灵活性,上述五个架构原则有助于创建切实有效的湖仓一体架构。

  二)Hudi先容

  Hudi是Hadoop Upserts anD Incrementals的缩写,用于管理HDFS上的大型分析数据集存储。 Hudi的主要目标是高效的减少入库延时,Hudi是Uber开辟的一个开源项目。存储于HDFS上的分析数据集一般通过两种类型的表来提供,即Copy-On-Write Table写时拷贝和Merge-On-Read Table读时合并。 
   

  Apache Hudi是在大数据存储上的一个数据集,可以将 Change Logs 通过 upsert 的方式合并进 Hudi;Hudi 对上可以暴露成一个平凡的 Hive 或 Spark 的表,通过 API 或下令行可以获取到增量修改的信息,继续供下游消耗;Hudi 还保管了修改汗青,可以做时间观光或回退;Hudi 内部有主键到文件级的索引,默认是记录到文件的布隆过滤器,高级的有存储到 HBase 索引提供更高的服从。
  1、官方界说

  Apache Hudi通过分布式文件系统(HDFS或者云存储)来摄取(Ingests)、管理(Manages)大型分析型数据集。Hudi是可以借助于DFS之上,提供了一些数据提取、管理功能。
  一言以蔽之,Hudi是一种针对分析型业务的、扫描优化的数据存储抽象,它能够使DFS数据集在分钟级的时延内支持变动,也支持下游系统对这个数据集的增量处置惩罚。
  Hudi 数据湖的基础架构:
  


  • 通过DeltaStreammer、Flink、Spark等工作,将数据摄取到数据湖的数据存储,例如:可以使用HDFS作为数据湖的数据存储;
  • 基于HDFS可以构建Hudi的数据湖;
  • Hudi提供同一的访问Spark数据源和Flink数据源;
  • 外部通过不同引擎,例如:Spark、Flink、Presto、Hive、Impala、Aliyun DLA、AW
   Redshit访问接口
  下载地点:https://hudi.apache.org/download.html
   2、Hudi特性

  Apache Hudi使得用户能在Hadoop兼容的存储之上存储大量数据,同时它还提供两种原语,使得除了经典的批处置惩罚之外,还可以在数据湖上进行流处置惩罚。
  这两种原语分别是:


  • Update/Delete记录:Hudi使用细粒度的文件/记录级别索引来支持Update/Delete记录,同时还提供写利用的事件包管。查询会处置惩罚末了一个提交的快照,并基于此输出结果。
  • 变动流:Hudi对获取数据变动提供了一流的支持:可以从给定的时间点获取给定表中已updated/inserted/deleted的全部记录的增量流,并解锁新的查询姿势(类别)。
   

  
  Hudi提供的功能特性如下:
  第一、支持使用索引方式Upsert
    Upsert support with fast, pluggable indexing
  第二、可以原子性的发布数据并支持回滚
    Atomically publish data with rollback support
  第三、写入和查询使用快照进行隔离,包管数据的一致性
    Snapshot isolation between writer & queries
  第四、可以用Savepoint进行数据恢复
    Savepoints for data recovery
  第五、支持基于统计数据管理文件巨细和分布
    Manages file sizes, layout using statistics
  第六、支持对基于行、列的数据进行异步压缩
    Async compaction of row & columnar data
  第七、支持时间轴元数据进行数据血统追踪
    Timeline metadata to track lineage
  利用聚类优化数据湖布局
    Optimize data lake layout with clustering
  可以说,Hudi支持了数据湖的数据存储以及一定的管理功能。
  3、应用场景

  Apache Hudi作为Uber开源的数据湖框架,抽象了存储层(支持数据集的变动,增量处置惩罚);为Spark的一个Lib(任意水平扩展,支持将数据存储至HDFS);开源(现已在Apache顶级项目)。
  场景一:近及时摄取(Near Real-Time Ingestion)


  • 对于RDBMS摄取,Hudi通过Upserts提供了更快的负载;
  • 对于像Cassandra / Voldemort / HBase这样的NoSQL数据库,采用更有效的方法使得摄取速度与较频繁的更新数据量相匹配;
  • 对于像Kafka这样的不可变数据源,Hudi也会逼迫在DFS上保持最小文件巨细,从而解决Hadoop范畴中的古老问题以便改善NameNode的运行状况。
  • 对于全部数据源,Hudi都提供了通过提交将新数据原子化地发布给消耗者,从而制止部分提取失败。
  场景二:增量处置惩罚管道(Incremental Processing Pipelines)


  • 通过记录粒度(而非文件夹或分区)来消耗上游Hudi表HU中的新数据,下游的Hudi表HD应用处置惩罚逻辑并更新/和谐延迟数据,这里HU和HD可以以更频繁的时间(例如15分钟)连续进行调度,并在HD上提供30分钟的端到端延迟。
  场景三:同一存储分析(Unified Storage For Analytics)


  • 将流式原始数据带到数据湖存储中,Hudi能够在几分钟内提取数据,并编写比传统批处置惩罚速度快几个数量级的增量数据管道,从而开辟了新的大概性。
  • Hudi没有前期服务器基础办法投资,因此可以在不增加运营开销的情况下,对更新鲜的分析进行更快的分析。
  场景四:数据删除(Data Deletion)


  • Hudi还提供了删除存储在数据湖中的数据的功能,而且还提供了处置惩罚大型写放大的有效方法,该写放大是由于用户通过基于user_id(或任何辅助键)的Merge On Read表类型进行的随机删除而导致的。
  • Hudi优雅的基于日志的并发控制,确保了提取/写入可以继续进行,由于背景压缩作业可分摊重写数据/逼迫执行删除的本钱。
  三)Hudi发展及特性

  Apache Hudi代表Hadoop Upserts anD Incrementals,管理大型分析数据集在HDFS上的存储。Hudi的主要目标是高效减少摄取过程中的数据延迟。
  1、Hudi发展汗青

  从2015年提出增量处置惩罚数据模型思想开始,到至今Hudi发展成熟,经历如下几个阶段:


  • 2015 年:发表了增量处置惩罚的核心思想/原则(O'reilly 文章)
  • 2016 年:由 Uber 创建并为全部数据库/关键业务提供支持
  • 2017 年:由 Uber 开源,并支持 100PB 数据湖
  • 2018 年:吸引大量使用者,并因云盘算遍及
  • 2019 年:成为 ASF 孵化项目,并增加更多平台组件
  • 2020 年:毕业成为 Apache 顶级项目,社区、下载量、采用率增长超过 10 倍
  • 2021 年:支持 Uber 500PB 数据湖,SQL DML、Flink 集成、索引、元服务器、缓存
   

  国内大部分互联网公司都在使用Hudi数据湖框架,进行数据存储,管理数据。
二、体验Hudi

  一)编译Hudi

  Apache Hudi数据湖框架开辟时添加MAVEN依赖即可,使用下令管理Hudi表数据,必要下载Hudi 源码包编译,利用步调如下。
  第一步、Maven 安装

  利用上下载和安装Maven,直接将Maven软件包解压,然后设置系统情况变量即可。Maven版本为:3.5.4,仓库目录名称:m2,如下图所示:
  设置Maven情况变量以后,执行:mvn -version
  

  第二步、下载源码包

  到Apache 软件归档目录下载Hudi 指定源码包:http://archive.apache.org/dist/hudi
  
  1. wget https://archive.apache.org/dist/hudi/0.9.0/hudi-0.9.0.src.tgz
复制代码
  别的,也可以从Github上下载Hudi源码
  
  1. https://github.com/apache/hudi
复制代码
  其中说明如何编译Hudi源码:
  

  第三步、添加Maven镜像

  由于Hudi编译时,必要下载相关依赖包,必要添加Maven镜像仓库路径,以便下载JAR包。
  编辑$MAVEN_HOME/conf/settings.xml文件,添加如下镜像:
  
  1. <mirror>
  2.     <id>alimaven</id>
  3.     <name>aliyun maven</name>
  4.     <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
  5.     <mirrorOf>central</mirrorOf>
  6. </mirror>
  7. <mirror>
  8.     <id>aliyunmaven</id>
  9.     <mirrorOf>*</mirrorOf>
  10.     <name>阿里云spring插件仓库</name>
  11.     <url>https://maven.aliyun.com/repository/spring-plugin</url>
  12. </mirror>
  13. <mirror>
  14.     <id>repo2</id>
  15.     <name>Mirror from Maven Repo2</name>
  16.     <url>https://repo.spring.io/plugins-release/</url>
  17.     <mirrorOf>central</mirrorOf>
  18. </mirror>
  19. <mirror>
  20.     <id>UK</id>
  21.     <name>UK Central</name>
  22.     <url>http://uk.maven.org/maven2</url>
  23.     <mirrorOf>central</mirrorOf>
  24. </mirror>
  25. <mirror>
  26.     <id>jboss-public-repository-group</id>
  27.     <name>JBoss Public Repository Group</name>
  28.     <url>http://repository.jboss.org/nexus/content/groups/public</url>
  29.     <mirrorOf>central</mirrorOf>
  30. </mirror>
  31. <mirror>
  32.     <id>CN</id>
  33.     <name>OSChina Central</name>
  34.     <url>http://maven.oschina.net/content/groups/public/</url>
  35.     <mirrorOf>central</mirrorOf>
  36. </mirror>
  37. <mirror>
  38.     <id>google-maven-central</id>
  39.     <name>GCS Maven Central mirror Asia Pacific</name>
  40.     <url>https://maven-central-asia.storage-download.googleapis.com/maven2/</url>
  41.     <mirrorOf>central</mirrorOf>
  42. </mirror>
  43. <mirror>
  44.     <id>confluent</id>
  45.     <name>confluent maven</name>
  46.     <url>http://packages.confluent.io/maven/</url>
  47.     <mirrorOf>confluent</mirrorOf>
  48. </mirror>
复制代码
 
  第四步、执行编译下令

  上传下载Hudi源码至CentOS系统目录:/root,解压tar包,进入软件包,执行编译下令:
  
  1. ]# mvn clean install -DskipTests -DskipITs -Dscala-2.12 -Dspark3
复制代码
  

  编译成功以后,截图如下所示:
  

  第五步、Hudi CLI测试

  编译完成以后,进入$HUDI_HOME/hudi-cli目录,运行hudi-cli脚本,假如可以运行,说明编译成功
  

三、Hudi核心概念

  Hudi数据湖框架的基本概念及表类型,属于Hudi框架计划原则和表的计划核心。
  文档:https://hudi.apache.org/docs/concepts.html
  一)基本概念

  Hudi 提供了Hudi 表的概念,这些表支持CRUD利用,可以利用现有的大数据集群比如HDFS做数据文件存储,然后使用SparkSQL或Hive平分析引擎进行数据分析查询
  

  Hudi表的三个主要组件:

  • 有序的时间轴元数据,类似于数据库事件日志。
  • 分层布局的数据文件:实际写入表中的数据;
  • 索引(多种实现方式):映射包罗指定记录的数据集
  1、时间轴Timeline

  Hudi 核心是在全部的表中维护了一个包罗在不同的即时(Instant)时间对数据集利用(比如新增、修改或删除)的时间轴(Timeline),在每一次对Hudi表的数据集利用时都会在该表的Timeline上生成一个Instant,从而可以实现在仅查询某个时间点之后成功提交的数据,或是仅查询某个时间点之前的数据,有效制止了扫描更大时间范围的数据。同时,可以高效地只查询更改前的文件(如在某个Instant提交了更改利用后,仅query某个时间点之前的数据,则仍可以query修改前的数据)
  

  Timeline 是 Hudi 用来管理提交(commit)的抽象,每个 commit 都绑定一个固定时间戳,分散到时间线上。在 Timeline 上,每个 commit 被抽象为一个 HoodieInstant,一个 instant 记录了一次提交 (commit) 的行为、时间戳、和状态。HUDI 的读写 API 通过 Timeline 的接口可以方便的在 commits 上进行条件筛选,对 history 和 on-going 的 commits 应用各种策略,快速筛选出必要利用的目标 commit
  

上图中采用时间(小时)作为分区字段,从 10:00 开始陆续产生各种 commits,10:20 来了一条 9:00 的数据,该数据仍然可以落到 9:00 对应的分区,通过 timeline 直接消耗 10:00 之后的增量更新(只消耗有新 commits 的 group),那么这条延迟的数据仍然可以被消耗到。
  时间轴(Timeline)的实现类(位于hudi-common-xx.jar中),时间轴相关的实现类位于org.apache.hudi.common.table.timeline包下
  

    hudi的timeline(时间线构成)

  
  1. 1)、Instant action: 在表上执行的操作类型
  2. 2)、Instant time: 即时时间,通常是一个时间戳,它按照action的开始时间单调递增
  3. 3)、State: 时刻的当前状态
复制代码
    时间线上的Instant action利用类型

  (hudi包管在时间线上的利用都是基于即时时间的,两者的时间保持一致而且是原子性的)
  
  1. 1)、commits: 表示将一批数据原子写入表中
  2. 2)、cleans: 清除表中不在需要的旧版本文件的后台活动。
  3. 3)、delta_commit:增量提交是指将一批数据原子性写入MergeOnRead类型的表中,其中部分或者所有数据可以写入增量日志中。
  4. 4)、compaction: 协调hudi中差异数据结构的后台活动,例如:将更新从基于行的日志文件变成列格式。在内部,压缩的表现为时间轴上的特殊提交。
  5. 5)、rollback:表示提交操作不成功且已经回滚,会删除在写入过程中产生的数据
  6. 6)、savepoint:将某些文件标记为“已保存”,以便清理程序时不会被清楚。在需要数据恢复的情况下,有助于将数据集还原到时间轴上某个点。
复制代码
 
    时间线上State状态类型

  
  1. 1)、requested:表示一个动作已被安排,但尚未启动
  2. 2)、inflight:表是当前正在执行操作
  3. 3)、completed:表是在时间线上完成了操作
复制代码
 
  2、文件管理

  Hudi将DFS上的数据集组织到基本路径(HoodieWriteConfig.BASEPATHPROP)下的目录结构中。数据集分为多个分区(DataSourceOptions.PARTITIONPATHFIELDOPT_KEY),这些分区与Hive表非常相似,是包罗该分区的数据文件的文件夹
  

  在每个分区内,文件被组织为文件组,由文件id充当唯一标识。每个文件组包罗多个文件切片,其中每个切片包罗在某个即时时间的提交/压缩生成的基本列文件(.parquet)以及一组日志文件(.log),该文件包罗自生成基本文件以来对基本文件的插入/更新
l  一个新的 base commit time 对应一个新的 FileSlice,实际就是一个新的数据版本。
l  Hudi 的每个 FileSlice 中包罗一个 base file (merge on read 模式大概没有)和多个 log file (copy on write 模式没有)。
l  每个文件的文件名都带有其归属的 FileID(即 FileGroup Identifier)和 base commit time(即 InstanceTime)。通过文件名的 group id 组织 FileGroup 的 logical 关系;通过文件名的 base commit time 组织 FileSlice 的逻辑关系。
l  Hudi 的 base file (parquet 文件) 在 footer 的 meta 去记录了 record key 构成的 BloomFilter,用于在 file based index 的实现中实现高服从的 key contains 检测。只有不在 BloomFilter 的 key 才必要扫描整个文件消灭假阳。
l  Hudi 的 log (avro 文件)是自己编码的,通过积攒数据 buffer 以 LogBlock 为单位写出,每个 LogBlock 包罗 magic number、size、content、footer 等信息,用于数据读、校验和过滤。
  Hudi采用MVCC(多版本并发控制)计划,其中压缩利用将日志和基本文件合并以产生新的文件切片,而清理利用则将未使用的/较旧的文件片删除以回收DFS上的空间
    表存储路径目录结构

  hudi_trips_cow表在HDFS的存储组织(hudi表数据可以存储在hdfs分布式系统,也可以存储在本地)


  • .hoodie 文件:由于CRUD的零散性,每一次的利用都会生成一个文件,这些小文件越来越多后,会严重影响HDFS的性能,Hudi计划了一套文件合并机制。 .hoodie文件夹中存放了对应的文件合并利用相关的日志文件
  • amricas和asia相关的路径是实际的数据文件,按分区存储,分区的路径key是可以指定的,数据文件使用Parquet文件格式存储,其中包罗一个metadata元数据文件和数据文件
      

  • .hoodie_partition_metadata 元数据:里面生存了commit的时间和分区深度信息
        数据存储结构

  Hudi数据集的目录结构与Hive表示非常相似,一份数据集对应这一个根目录。数据集被打散为多个分区,分区字段以文件夹形式存在,该文件夹包罗该分区的全部文件
  特点:在每个分区内,文件被组织为文件组,由文件id充当唯一标识。每个文件组包罗多个文件切片,其中每个切片包罗在某个即时时间的提交/压缩生成的基本列文件(.parquet)以及一组日志文件(.log),该文件包罗自生成基本文件以来对基本文件的插入/更新
  3、索引index

Hudi通过索引机制提供高效的Upsert利用,该机制会将一个RecordKey+PartitionPath组合的方式作为唯一标识映射到一个文件ID,而且,这个唯一标识和文件组/文件ID之间的映射自记录被写入文件组开始就不会再改变。
Hudi内置了4类(6个)索引实现,均是继承自顶层的抽象类HoodieIndex而来,如下注意:
l  全局索引:指在全表的全部分区范围下逼迫要求键保持唯一,即确保对给定的键有且只有一个对应的记录。全局索引提供了更强的包管,也使得更删的斲丧随着表的巨细增加而增加(O(表的巨细)),更实用于是小表。
l  非全局索引:仅在表的某一个分区内逼迫要求键保持唯一,它依靠写入器为同一个记录的更删提供一致的分区路径,但由此同时大幅提高了服从,由于索引查询复杂度成了O(更删的记录数量)且可以很好地应对写入量的扩展。
Hoodie key (record key + partition path) 和 file id (FileGroup) 之间的映射关系,数据第一次写入文件后保持不变,以是,一个 FileGroup 包罗了一批 record 的全部版本记录。Index 用于区分消息是 INSERT 还是 UPDATE。
l  BloomFilter Index(布隆过滤器索引)
n  新增 records 找到映射关系:record key => target partition
n  当前最新的数据 找到映射关系:partition => (fileID, minRecordKey, maxRecordKey) LIST (假如是 base files 可加速)
n  新增 records 找到必要搜索的映射关系:fileID => HoodieKey(record key + partition path) LIST,key 是候选的 fileID
n  通过 HoodieKeyLookupHandle 查找目标文件(通过 BloomFilter 加速)
l  Flink State-based Index(基于状态Index)
n  HUDI 在 0.8.0 版本中实现的 Flink witer,采用了 Flink 的 state 作为底层的 index 存储,每个 records 在写入之前都会先盘算目标 bucket ID,不同于 BloomFilter Index,制止了每次重复的文件 index 查找。
  二)存储类型(表类型)

Hudi提供两类型表:写时复制(Copy on Write,COW)表读时合并(Merge On Read,MOR)表,主要区别如下:
l  对于 Copy-On-Write Table,用户的 update 会重写数据地点的文件,以是是一个写放大很高,但是读放大为 0,得当写少读多的场景。
l  对于 Merge-On-Read Table,整体的结构有点像 LSM-Tree,用户的写入先写入到 delta data 中,这部分数据使用行存,这部分 delta data 可以手动 merge 到存量文件中,整理为 parquet 的列存结构。
  

      
  Hudi 是 Uber 主导开辟的开源数据湖框架,以是大部分的出发点都泉源于 Uber 自身场景,比如司机数据和搭客数据通过订单 Id 来做 Join 等。在 Hudi 过去的使用场景里,和大部分公司的架构类似,采用批式和流式共存的 Lambda 架构,从延迟,数据完整度还有本钱 三个方面来对比一下批式(Batch)和流式(Stream)盘算模型的区别。

  
 

  
  批式模型(Batch)

  
批式模型就是使用 MapReduce、Hive、Spark 等典范的批盘算引擎,以小时任务或者天任务的形式来做数据盘算

  
l  延迟:小时级延迟或者天级别延迟。这里的延迟不单单指的是定时任务的时间,在数据架构里,这里的延迟时间通常是定时任务隔断时间 + 一系列依赖任务的盘算时间 + 数据平台最终可以展示结果的时间。数据量大、逻辑复杂的情况下,小时任务盘算的数据通常真正延迟的时间是 2-3 小时。

  
l  数据完整度:数据较完整。以处置惩罚时间为例,小时级别的任务,通常盘算的原始数据已经包罗了小时内的全部数据,以是得到的数据相对较完整。但假如业务需求是事件时间,这里涉及到终端的一些延迟上报机制,在这里,批式盘算任务就很难派上用场。

  
l  本钱:本钱很低。只有在做任务盘算时,才会占用资源,假如不做任务盘算,可以将这部分批式盘算资源出让给在线业务使用。但从另一个角度来说本钱是挺高的,比如原始数据做了一些增编削查,数据晚到的情况,那么批式任务是要全量重新盘算。

  
 

  
  流式模型(Stream)

  
流式模型,典范的就是使用 Flink 来进行及时的数据盘算

  
l  延迟:很短,乃至是及时。

  
l  数据完整度:较差。由于流式引擎不会比及全部数据到齐之后再开始盘算,以是有一个 watermark 的概念,当数据的时间小于 watermark 时,就会被抛弃,这样是无法对数据完整度有一个绝对的报障。在互联网场景中,流式模型主要用于运动时的数据大盘展示,对数据的完整度要求并不算很高。在大部分场景中,用户必要开辟两个程序,一是流式数据生产流式结果,二是批式盘算任务,用于次日修复及时结果。

  
l  本钱:很高。由于流式任务是常驻的,而且对于多流 Join 的场景,通常要借助内存或者数据库来做 state 的存储,不管是序列化开销,还是和外部组件交互产生的额外 IO,在大数据量下都是不容忽视的。

  
 

  
  增量模型(Incremental)

  
针对批式和流式的优缺点,Uber 提出了增量模型(Incremental Mode),相对批式来讲,更加及时;相对流式而言,更加经济。

  
增量模型,简朴来讲,是以 mini batch 的形式来跑准及时任务。Hudi 在增量模型中支持了两个最紧张的特性:

  
l  Upsert:这个主要是解决批式模型中,数据不能插入、更新的问题,有了这个特性,可以往 Hive 中写入增量数据,而不是每次进行完全的覆盖。(Hudi 自身维护了 key->file 的映射,以是当 upsert 时很轻易找到 key 对应的文件)

  
l  Incremental Query:增量查询,减少盘算的原始数据量。以 Uber 中司机和搭客的数据流 Join 为例,每次抓取两条数据流中的增量数据进行批式的 Join 即可,相比流式数据而言,本钱要低沉几个数量级。

  
在增量模型中,Hudi 提供了两种 Table,分别为 Copy-On-Write和 Merge-On-Read两种

    2、查询类型(Query Type)

  
  Hudi能够支持三种不同的查询表的方式(Snapshot Queries、Incremental Queries和Read Optimized Queries),具体取决于表的类型。
  

  

  
l  类型一:Snapshot Queries(快照查询)

  
n  查询某个增量提交利用中数据集的最新快照,会先进办法态合并最新的基本文件(Parquet)和增量文件(Avro)来提供近及时数据集(通常会存在几分钟的延迟)。

  
n  读取全部 partiiton 下每个 FileGroup 最新的 FileSlice 中的文件,Copy On Write 表读 parquet 文件,Merge On Read 表读 parquet + log 文件

  
 

  
l  类型二:Incremental Queries(增量查询)

  
n  仅查询新写入数据集的文件,必要指定一个Commit/Compaction的即时时间(位于Timeline上的某个Instant)作为条件,来查询此条件之后的新数据。

  
n  可检察自给定commit/delta commit即时利用以来新写入的数据。有效的提供变动流来启用增量数据管道。

  
 

  
l  类型三:Read Optimized Queries(读优化查询)

  
n  直接查询基本文件(数据集的最新快照),实在就是列式文件(Parquet)。并包管与非Hudi列式数据集相比,具有相同的列式查询性能。

  
n  可检察给定的commit/compact即时利用的表的最新快照。

  
  读优化查询和快照查询相同仅访问基本文件,提供给定文件片自上次执行压缩利用以来的数据。通常查询数据的最新程度的包管取决于压缩策略

  
 

    3、Copy On Write

  
  简称COW,顾名思义,它是在数据写入的时间,复制一份原来的拷贝,在其基础上添加新数据。正在读数据的哀求,读取的是最近的完整副本,这类似Mysql 的MVCC的思想。

  
  

  

  
上图中,每一个颜色都包罗了停止到其地点时间的全部数据。老的数据副本在超过一定的个数限制后,将被删除。这种类型的表,没有compact instant,由于写入时相当于已经compact了。

  
l  优点:读取时,只读取对应分区的一个数据文件即可,较为高效;

  
l  缺点:数据写入的时间,必要复制一个先前的副本再在其基础上生成新的数据文件,这个过程比力耗时。由于耗时,读哀求读取到的数据相对就会滞后;

  
  

  

  
对于这种 Table,提供了两种查询:

  
l  Snapshot Query: 查询最近一次 snapshot 的数据,也就是最新的数据。

  
l  Incrementabl Query:用户必要指定一个 commit time,然后 Hudi 会扫描文件中的记录,过滤出 commit_time > 用户指定的 commit time 的记录。

  
COW表主要使用列式文件格式(Parquet)存储数据,在写入数据过程中,执行同步合并,更新数据版本并重写数据文件,类似RDBMS中的B-Tree更新。

  
l  1)、更新update:在更新记录时,Hudi会先找到包罗更新数据的文件,然后再使用更新值(最新的数据)重写该文件,包罗其他记录的文件保持不变。当忽然有大量写利用时会导致重写大量文件,从而导致极大的I/O开销。

  
l  2)、读取read:在读取数据集时,通过读取最新的数据文件来获取最新的更新,此存储类型实用于少量写入和大量读取的场景。

  
  Copy On Write 类型表每次写入都会生成一个新的持有 base file(对应写入的 instant time) 的 FileSlice。用户在 snapshot 读取的时间会扫描全部最新的 FileSlice 下的 base file

    4、Merge On Read

  
  简称MOR,新插入的数据存储在delta log 中,定期再将delta log合并进行parquet数据文件。读取数据时,会将delta log跟老的数据文件做merge,得到完整的数据返回。下图演示了MOR的两种数据读写方式。

  
  

  

  
MOR表也可以像COW表一样,忽略delta log,只读取最近的完整数据文件。

  
l  优点:由于写入数据先写delta log,且delta log较小,以是写入本钱较低;

  
l  缺点:必要定期合并整理compact,否则碎片文件较多。读取性能较差,由于必要将delta log 和 老数据文件合并;

  
对于这类 Table,提供了三种查询:

  
l  Snapshot Query: 查询最近一次 snapshot 的数据,也就是最新的数据。这里是一个行列数据混淆的查询。

  
l  Incrementabl Query用户必要指定一个 commit time,然后 Hudi 会扫描文件中的记录,过滤出 commit_time > 用户指定的 commit time 的记录。这里是一个行列数据混淆的查询。

  
l  Read Optimized Query: 只查存量数据,不查增量数据,由于使用的都是列式文件格式,以是服从较高。

  
  

  

  
MOR表是COW表的升级版,它使用列式(parquet)与行式(avro)文件混淆的方式存储数据。在更新记录时,类似NoSQL中的LSM-Tree更新。

  
l  1) 更新:在更新记录时,仅更新到增量文件(Avro)中,然后进行异步(或同步)的compaction,末了创建列式文件(parquet)的新版本。此存储类型得当频繁写的工作负载,由于新记录是以追加的模式写入增量文件中。

  
l  2) 读取:在读取数据集时,必要先将增量文件与旧文件进行合并,然后生成列式文件成功后,再进行查询。

    5、COW和MOR对比

  
对于写时复制(COW)和读时合并(MOR)writer来说,Hudi的WriteClient是相同的。

  
l  COW表,用户在 snapshot 读取的时间会扫描全部最新的 FileSlice 下的 base file。

  
l  MOR表,在 READ OPTIMIZED 模式下,只会读最近的颠末 compaction 的 commit。

  
权衡CopyOnWriteMergeOnRead
数据延迟
查询延迟
Update(I/O)更新本钱高(重写整个Parquet文件)  低(追加到增量日志) 
Parquet File Size低(更新本钱I/O高)较大(低更新本钱)
Write Amplification(WA写入放大)低(取决于压缩策略)
   
    三)数据写利用流程

  在Hudi数据湖框架中支持三种方式写入数据:UPSERT(插入更新)、INSERT(插入)和BULK INSERT(写排序)。
  l  UPSERT:默认行为,数据先通过 index 打标(INSERT/UPDATE),有一些启发式算法决定消息的组织以优化文件的巨细
  l  INSERT:跳过 index,写入服从更高
  l  BULK_INSERT:写排序,对大数据量的 Hudi 表初始化友好,对文件巨细的限制 best effort(写 HFile)
    1、UPSERT 写流程

      Copy On Write

  l  第一步、先对 records 按照 record key 去重;
  l  第二步、首先对这批数据创建索引 (HoodieKey => HoodieRecordLocation);通过索引区分哪些 records 是 update,哪些 records 是 insert(key 第一次写入);
  l  第三步、对于 update 消息,会直接找到对应 key 地点的最新 FileSlice 的 base 文件,并做 merge 后写新的 base file (新的 FileSlice);
  l  第四步、对于 insert 消息,会扫描当前 partition 的全部 SmallFile(小于一定巨细的 base file),然后 merge 写新的 FileSlice;假如没有 SmallFile,直接写新的 FileGroup + FileSlice;
      Merge On Read

  l  第一步、先对 records 按照 record key 去重(可选)
  l  第二步、首先对这批数据创建索引 (HoodieKey => HoodieRecordLocation);通过索引区分哪些 records 是 update,哪些 records 是 insert(key 第一次写入)
  l  第三步、假如是 insert 消息,假如 log file 不可建索引(默认),会尝试 merge 分区内最小的 base file (不包罗 log file 的 FileSlice),生成新的 FileSlice;假如没有 base file 就新写一个 FileGroup + FileSlice + base file;假如 log file 可建索引,尝试 append 小的 log file,假如没有就新写一个 FileGroup + FileSlice + base file
  l  第四步、假如是 update 消息,写对应的 file group + file slice,直接 append 最新的 log file(假如碰巧是当前最小的小文件,会 merge base file,生成新的 file slice)log file 巨细达到阈值会 roll over 一个新的
    2、INSERT 写流程

    同样由于Hudi中表的类型分为:COW和MOR,以是INSERT写入数据时,流程也是有区别的。      Copy On Write

  l  第一步、先对 records 按照 record key 去重(可选);
  l  第二步、不会创建 Index;
  l  第三步、假如有小的 base file 文件,merge base file,生成新的 FileSlice + base file,否则直接写新的 FileSlice + base file;
      Merge On Read

  l  第一步、先对 records 按照 record key 去重(可选);
  l  第二步、不会创建 Index;
  l  第三步、假如 log file 可索引,而且有小的 FileSlice,尝试追加或写最新的 log file;假如 log file 不可索引,写一个新的 FileSlice + base file;
   
  三、Hudi CDC

    CDC的全称是Change data Capture,即变动数据捕获,主要面向数据库的变动,是是数据库范畴非经常见的技术,主要用于捕获数据库的一些变动,然后可以把变动数据发送到下游。  
    
  

    对于CDC,业界主要有两种类型
    一是基于查询的,客户端会通过SQL方式查询源库表变动数据,然后对外发送
    二是基于日志,这也是业界广泛使用的一种方式,一般是通过binlog方式,变动的记录会写入binlog,剖析binlog后会写入消息系统,或直接基于Flink CDC进行处置惩罚  
  

  l  基于查询:这种 CDC 技术是入侵式的,必要在数据源执行 SQL 语句。使用这种技术实现CDC 会影响数据源的性能。通常必要扫描包罗大量记录的整个表。
   
  l  基于日志:这种 CDC 技术是非侵入性的,不必要在数据源执行 SQL 语句。通过读取源数据库的日志文件以辨认对源库表的创建、修改或删除数据。
    一)CDC入湖

    基于CDC数据的入湖,这个架构非常简朴:上游各种各样的数据源,比如DB的变动数据、事件流,以及各种外部数据源,都可以通过变动流的方式写入表中,再进行外部的查询分析
    
  

    典范CDC入湖的链路:
  

  • 上面的链路是大部分公司采取的链路,前面CDC的数据先通过CDC工具导入Kafka或者Pulsar,再通过Flink或者是Spark流式消耗写到Hudi里。
  • 第二个架构是通过Flink CDC直联到MySQL上游数据源,直接写到下游Hudi表
   
    
  

   
    二)Flink CDC Hudi

    基于Flink CDC技术,及时采集MySQL数据库表数据,进行过ETL转换处置惩罚,最终存储Hudi表。
    
  

    1、业务需求

    MySQL数据库创建表,及时添加数据,通过Flink CDC将数据写入Hudi表,而且Hudi与Hive集成,主动在Hive中创建表与添加分区信息,末了Hive终端Beeline查询分析数据
    
  

  Hudi 表与Hive表,主动关联集成,必要重新编译Hudi源码,指定Hive版本及编译时包罗Hive依赖jar包,具体步调如下。
  l  修改Hudi集成flink和Hive编译依赖版本设置
  缘故因由:现在版本Hudi,在编译的时间本身默认已经集成的flink-SQL-connector-hive的包,会和Flink lib包下的flink-SQL-connector-hive冲突。以是,编译的过程中只修改hive编译版本。
   
    文件:hudi-0.9.0/packaging/hudi-flink-bundle/pom.xml
  
  

  l  编译Hudi源码
   
  1. mvn clean install -DskipTests -Drat.skip=true -Dscala-2.12 -Dspark3 -Pflink-bundle-shade-hive2
复制代码
    
  编译完成以后,有2个jar包,至关紧张:
  n  hudi-flink-bundle_2.12-0.9.0.jar,位于hudi-0.9.0/packaging/hudi-flink-bundle/target,flink 用来写入和读取数据,将其拷贝至$FLINK_HOME/lib目录中,假如以前有同名jar包,先删除再拷贝。
  n  hudi-hadoop-mr-bundle-0.9.0.jar,位于hudi-0.9.0/packaging/hudi-hadoop-mr-bundle/target,hive 必要用来读hudi数据,将其拷贝至$HIVE_HOME/lib目录中。
  l  将Flink CDC MySQL对应jar包,放到$FLINK_HOME/lib目录中
   
  1. flink-sql-connector-mysql-cdc-1.3.0.jar
复制代码
   
    至此,$FLINK_HOME/lib目录中,有如下所需的jar包,缺一不可,注意版本号
    

    2、创建MySQL表

    首先开启MySQL数据库binlog日志,再重启MySQL数据库服务,末了创建表。  

  • 第一步、开启MySQL binlog日志
   
  1. ]# vim /etc/my.cnf
  2. 在[mysqld]下面添加内容:
  3. server-id=2
  4. log-bin=mysql-bin
  5. binlog_format=row
  6. expire_logs_days=15
  7. binlog_row_image=full
复制代码
    
  

  • 第二步、重启MySQL Server
   
  1. service mysqld restart
复制代码
     登录MySQL Client下令行,检察是否生效。
   
  1. show master logs;
复制代码
    
  

  • 第三步、在MySQL数据库,创建表
   
  1. -- MySQL 数据库创建表
  2. create database test ;
  3. create table test.tbl_users(
  4.    id bigint auto_increment primary key,
  5.    name varchar(20) null,
  6.    birthday timestamp default CURRENT_TIMESTAMP not null,
  7.    ts timestamp default CURRENT_TIMESTAMP not null
  8. );
复制代码
     检察表结构
   
  1. desc test.tb1_users;
复制代码
    
    3、创建CDC表

    先启动HDFS服务、Hive MetaStore和HiveServer2服务和Flink Standalone集群,再运行SQL Client,末了创建表关联MySQL表,采用MySQL CDC方式。
   
    启动HDFS服务,分别启动NameNode和DataNode
   
  1. -- 启动HDFS服务
  2. hadoop-daemon.sh start namenode
  3. hadoop-daemon.sh start datanode
复制代码
    
    启动Hive服务:元数据MetaStore和HiveServer2
   
  1. -- Hive服务
  2. /export/server/hive/bin/start-metastore.sh
  3. /export/server/hive/bin/start-hiveserver2.sh
复制代码
    
    启动Flink Standalone集群
   
  1. -- 启动Flink Standalone集群
  2. export HADOOP_CLASSPATH=`/export/server/hadoop/bin/hadoop classpath`
  3. /export/server/flink/bin/start-cluster.sh
复制代码
    
    启动SQL Client客户端
   
  1. /export/server/flink/bin/sql-client.sh embedded -j /export/server/flink/lib/hudi-flink-bundle_2.12-0.9.0.jar shell
复制代码
    
    设置属性
   
  1. set execution.result-mode=tableau;
  2. set execution.checkpointing.interval=3sec;
复制代码
    
    创建输入表,关联MySQL表,采用MySQL CDC 关联
   
  1. -- Flink SQL Client创建表
  2. CREATE TABLE users_source_mysql (
  3.   id BIGINT PRIMARY KEY NOT ENFORCED,
  4.   name STRING,
  5.   birthday TIMESTAMP(3),
  6.   ts TIMESTAMP(3)
  7. ) WITH (
  8. 'connector' = 'mysql-cdc',
  9. 'hostname' = 'node1.itcast.cn',
  10. 'port' = '3306',
  11. 'username' = 'root',
  12. 'password' = '123456',
  13. 'server-time-zone' = 'Asia/Shanghai',
  14. 'debezium.snapshot.mode' = 'initial',
  15. 'database-name' = 'test',
  16. 'table-name' = 'tbl_users'
  17. );
复制代码
    
    查询表的结构,其中id为主键,ts为数据合并字段。
    
  

    查询CDC表数据
   
  1. -- 查询数据
  2. select * from users_source_mysql;
复制代码
    
    开启MySQL Client客户端,执行DML语句,插入数据
   
  1. insert into test.tbl_users (name) values ('zhangsan')
  2. insert into test.tbl_users (name) values ('lisi');
  3. insert into test.tbl_users (name) values ('wangwu');
  4. insert into test.tbl_users (name) values ('laoda');
  5. insert into test.tbl_users (name) values ('laoer');
复制代码
    
    4、创建视图

    创建一个暂时视图,增加分区列part,方便后续同步hive分区表
   
  1. -- 创建一个临时视图,增加分区列 方便后续同步hive分区表
  2. create view view_users_cdc
  3. AS
  4. SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as part FROM users_source_mysql;
复制代码
    
    检察视图view中数据
   
  1. select * from view_users_cdc;
复制代码
    
    5、创建Hudi表

    创建 CDC Hudi Sink表,并主动同步hive分区表,具体DDL语句。
   
  1. CREATE TABLE users_sink_hudi_hive(
  2. id bigint ,
  3. name string,
  4. birthday TIMESTAMP(3),
  5. ts TIMESTAMP(3),
  6. part VARCHAR(20),
  7. primary key(id) not enforced
  8. )
  9. PARTITIONED BY (part)
  10. with(
  11. 'connector'='hudi',
  12. 'path'= 'hdfs://node1.itcast.cn:8020/users_sink_hudi_hive',
  13. 'table.type'= 'MERGE_ON_READ',
  14. 'hoodie.datasource.write.recordkey.field'= 'id',
  15. 'write.precombine.field'= 'ts',
  16. 'write.tasks'= '1',
  17. 'write.rate.limit'= '2000',
  18. 'compaction.tasks'= '1',
  19. 'compaction.async.enabled'= 'true',
  20. 'compaction.trigger.strategy'= 'num_commits',
  21. 'compaction.delta_commits'= '1',
  22. 'changelog.enabled'= 'true',
  23. 'read.streaming.enabled'= 'true',
  24. 'read.streaming.check-interval'= '3',
  25. 'hive_sync.enable'= 'true',
  26. 'hive_sync.mode'= 'hms',
  27. 'hive_sync.metastore.uris'= 'thrift://node1.devops.cn:9083',
  28. 'hive_sync.jdbc_url'= 'jdbc:hive2://node1.devops.cn:10000',
  29. 'hive_sync.table'= 'users_sink_hudi_hive',
  30. 'hive_sync.db'= 'default',
  31. 'hive_sync.username'= 'root',
  32. 'hive_sync.password'= '123456',
  33. 'hive_sync.support_timestamp'= 'true'
  34. );
复制代码
     此处Hudi表类型:MOR,Merge on Read (读时合并),快照查询+增量查询+读取优化查询(近及时)。使用列式存储(parquet)+行式文件(arvo)组合存储数据。更新记录到增量文件中,然后进行同步或异步压缩来生成新版本的列式文件。
    检察表结构
    6、数据写入Hudi表

    编写INSERT语句,从视图中查询数据,再写入Hudi表中,语句如下
   
  1. insert into users_sink_hudi_hive select id, name, birthday, ts, part from view_users_cdc;
复制代码
     
  

    HDFS上Hudi文件目录情况:50070
    Flink上执行SQL
   
  1. select * from users_sink_hudi_hive;
复制代码
    
   
    7、Hive表查询

    必要引入hudi-hadoop-mr-bundle-0.9.0.jar包,放到$HIVE_HOME/lib下。
    
  

    启动Hive中beeline客户端,连接HiveServer2服务:
   
  1. /export/server/hive/bin/beeline -u jdbc:hive2://node1.devops.cn:10000 -n root -p 123456
复制代码
     已主动生产hudi MOR模式的2张表:
  

  • users_sink_hudi_hive_ro,ro 表全称 read oprimized table,对于 MOR 表同步的 xxx_ro 表,只暴露压缩后的 parquet。其查询方式和COW表类似。设置完 hiveInputFormat 之后 和平凡的 Hive 表一样查询即可;
  • users_sink_hudi_hive_rt,rt表示增量视图,主要针对增量查询的rt表;ro表只能查parquet文件数据, rt表 parquet文件数据和log文件数据都可查;
    检察主动生成表users_sink_hudi_hive_ro结构:
   
  1. CREATE EXTERNAL TABLE `users_sink_hudi_hive_ro`(
  2.   `_hoodie_commit_time` string COMMENT '',
  3.   `_hoodie_commit_seqno` string COMMENT '',
  4.   `_hoodie_record_key` string COMMENT '',
  5.   `_hoodie_partition_path` string COMMENT '',
  6.   `_hoodie_file_name` string COMMENT '',
  7.   `_hoodie_operation` string COMMENT '',
  8.   `id` bigint COMMENT '',
  9.   `name` string COMMENT '',
  10.   `birthday` bigint COMMENT '',
  11.   `ts` bigint COMMENT '')
  12. PARTITIONED BY (
  13.   `part` string COMMENT '')
  14. ROW FORMAT SERDE
  15.   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
  16. WITH SERDEPROPERTIES (
  17.   'hoodie.query.as.ro.table'='true',
  18.   'path'='hdfs://node1.itcast.cn:8020/users_sink_hudi_hive')
  19. STORED AS INPUTFORMAT
  20.   'org.apache.hudi.hadoop.HoodieParquetInputFormat'
  21. OUTPUTFORMAT
  22.   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
  23. LOCATION
  24.   'hdfs://node1.devops.cn:8020/users_sink_hudi_hive'
  25. TBLPROPERTIES (
  26.   'last_commit_time_sync'='20211125095818',
  27.   'spark.sql.sources.provider'='hudi',
  28.   'spark.sql.sources.schema.numPartCols'='1',
  29.   'spark.sql.sources.schema.numParts'='1',
  30. 'spark.sql.sources.schema.part.0'='{"type":"struct","fields":[{"name":"_hoodie_commit_time","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_commit_seqno","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_record_key","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_partition_path","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_file_name","type":"string","nullable":true,"metadata":{}},{"name":"_hoodie_operation","type":"string","nullable":true,"metadata":{}},{"name":"id","type":"long","nullable":false,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"birthday","type":"timestamp","nullable":true,"metadata":{}},{"name":"ts","type":"timestamp","nullable":true,"metadata":{}},{"name":"part","type":"string","nullable":true,"metadata":{}}]}',
  31.   'spark.sql.sources.schema.partCol.0'='partition',
  32.   'transient_lastDdlTime'='1637743860')
复制代码
     检察主动生成表的分区信息:
   
  1. show partitions users_sink_hudi_hive_ro ;
  2. show partitions users_sink_hudi_hive_rt ;
复制代码
     查询Hive 分区表数据
   
  1. set hive.exec.mode.local.auto=true;
  2. set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
  3. set hive.mapred.mode=nonstrict ;
  4. select id, name, birthday, ts, `part` from users_sink_hudi_hive_ro;
复制代码
     指定分区字段过滤,查询数据
   
  1. select name, ts from users_sink_hudi_hive_ro where part ='20211125';
  2. select name, ts from users_sink_hudi_hive_rt where part ='20211125';
复制代码
    
    三)Hudi Client利用Hudi表

    进入Hudi客户端下令行:hudi-0.9.0/hudi-cli/hudi-cli.sh
    
  

   
    连接Hudi表,检察表信息
   
  1. connect --path hdfs://node1.devops.cn:8020/users_sink_hudi_hive
复制代码
     检察Hudi commit信息
   
  1. commits show --sortBy "CommitTime"
复制代码
     
  

    检察Hudi compactions 计划
   
  1. compactions show all
复制代码
     
  

   
  

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

尚未崩坏

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表