二、数据离线处置惩罚场景化解决方案

打印 上一主题 下一主题

主题 890|帖子 890|积分 2670

https://connect.huaweicloud.com/courses/learn/Learning/sp:cloudEdu_?courseNo=course-v1:HuaweiX+CBUCNXE147+Self-paced&courseType=1
1.离线处置惩罚方案

**业务场景-安平范畴**

业务场景-金融范畴

离线批处置惩罚常用组件


  • HDFS:分布式文件体系,为各种批处置惩罚引擎提供数据存储,可以存储各种文件格式数据。
  • YARN:资源调度引擎,为各种批处置惩罚引擎提供资源调度本事。
  • MapReduce:大数据批处置惩罚引擎,用于处置惩罚海量数据,但是处置惩罚速率较慢。
  • Hive:大数据 SQL 批处置惩罚引擎,用于处置惩罚 SQL 类批处置惩罚作业,但是处置惩罚速率较慢。
  • Spark:基于内存的数据处置惩罚引擎,适合海量数据,处置惩罚速率高校。

    • Spark SQL:Spark 处置惩罚结构化数据的一个模块。

思考题

  • 以下哪些是离线批处置惩罚的特点?()

    • 处置惩罚数据格式多样
    • 支持SQL类作业和自界说作业
    • 处置惩罚数据量巨大
    • 处置惩罚时间要求高

  • 离线批处置惩罚常用的组件不包罗哪个?()

    • Flume
    • Hive
    • Spark
    • Storm

  • Hive是大数据SQL批处置惩罚引I擎月用于处置惩罚SQL类批处置惩罚作业,但是处置惩罚速率较慢。()
2.离线处置惩罚技术框架介绍

2.1. 数据存储-HDFS

`2024年9月26日21:18:26` HDFS 的概念


  • HDFS(Hadoop Distributed File System)基于 Google 发布的 GFS 论文计划开辟。
  • 其除具备其它分布式文件体系相同特性外,HDFS 另有自己特有的特性:

    • 高容错性:认为硬件总是不可靠的。
    • 高吞吐量:为大量数据访问的应用提供高吞吐量支持
    • 大文件存储:支持存储 TB-PB 级别的数据

HDFS 的优势和劣势


  • 优势:大文件存储与访问;流式数据访问
  • 劣势:大量小文件存储;随机写入;低延迟读取
HDFS 根本体系架构

常用 Shell 下令
** 下令类别 **** 下令 **** 下令阐明 **hdfs dfs
-cat显示文件内容-ls显示目录列表-rm删除文件-put上传目录/文件到 HDFS-get从 HDFS 下载目录/文件到本地-mkdir创建目录-chmod/-chown改变文件属组……hdfs dfsadmin
-safemode安全模式操纵-report报告服务状态 HDFS 回收站机制


  • 在HDFS里,删除文件时,不会真正的删除,其实是放入回收站,回收站里的文件可以用来快速恢复误删文件。
  • 可以设置一个时间阀值(单元:分钟),当回收站里文件的存放时间超过这个阀值或是回收站被清空时,文件才会被彻底删除,而且释放占用的数据块。
  • Hadoop回收站trash,默认是关闭的,若开启必要修改配置文件core-site.xml。
  1. <property>
  2.   <name>fs.trash.interval</name>
  3.   <value>1440</value>
  4. </property>
复制代码
思考题

  • HDFS里包罗哪些实例?()

    • NameNode
    • TaskManager
    • DataNode
    • JobManager

  • 在HDFS根目录下有一个文件a.txt,我们应该怎样删除?
  • Hadoop回收站trash,若开启必要修改配置文件core-site.xml。()
2.2 数据堆栈-Hive

2.2.1 数据堆栈-Hive

**Hive 的概述**

  • Hive 是基于 Hadoop 的数据堆栈软件,可以查询和管理 PB 级别的分布式数据
  • Hive 特性:

    • 灵活方便的 ETL(Extract/TransForm/Load)
    • 支持 MapReduce、Tez、Spark 多种盘算引擎
    • 可直接访问 HDFS 文件以及 HBase
    • 易用易编程

Hive 的架构

Hive 的数据存储模型

Hive 内部表和外部表的区别


  • 区别:
关键字内部表外部表CREATE/LOAD数据移到堆栈目录数据位置不移动DROP元数据和数据会被一起删除只删除元数据

  • 查询表的类型: desc formatted tableName;
  • 修改内部表 tableName 为外部表:alter table tableName set tblproperties('EXTERNAL'='TRUE');
  • 修改外部表 tableName 为外部表:alter table tableName set tblproperteis('EXTERNAL'='FALSE');
Hive 内置函数


  • 检察体系函数的用法:hive> show funcitons;
  • 显示函数的用法:hive> desc function upper;
  • 具体显示函数的用法:hive> desc function extended upper;
  • 常用函数:

    • 数学函数,如 round()、abs()、rand()等
    • 日期函数,如 to_date(),current_date()等
    • 字符串函数,如 trim(),length(),substr()等

Hive 自界说 UDF


  • 当Hive提供的内置函数无法满足业务处置惩罚必要时,此时就可以思量使用用户自界说函数编写处置惩罚代码并在查询中使用
  • UDF(User-Defined-Function)

    • 用于接收单个数据行,并产生一个数据行作为输出。

  • UDAF(User-DefinedAggregationFunction)

    • 用于接收多个数据行,并产生一个数据行作为输出。

  • UDTF(User-DefinedTable-GeneratingFunctions)

    • 用于接收单个数据行,并产生多个数据行作为输出。

UDF 开辟步调


  • 继承"org.apache.hadoop.hive.ql.exec.UDF"
  • 实现一个evaluateO方法,编写要实现的逻辑
  • 打包并上传到HDFS里
  • Hive创建临时函数
  • 调用该函数
Hive 调优


  • 数据倾斜

    • 数据倾斜指盘算数据的时候,数据的分散度不敷,导致大量的数据会集到了一台大概几台机器上盘算,这些数据的盘算速率远远低于均匀盘算速率,导致整个盘算过程过慢。
    • 日常使用过程中,容易造成数据倾斜的原因可以归纳为如下几点:

      • group by
      • distinct count(distinctxx)
      • join


  • 调优参数:set hive.map.aggr=true;

    • 在map中会做部分聚集操纵,效率更高但必要更多的内存。set hive.groupby.skewindata=true;
    • 此时生成的查询计划会有两个MRJob,可实现数据倾斜时负载均衡。

  • map side join set hive.auto.convert.join=true;

    • 当毗连一个较小和较大表的时候,把较小的表直接放到内存中去,然后再对较大的表举行map操纵。

  • 并行化执行

    • 每个查询会被Hive转化为多个阶段,当有些阶段关联性不大时,可以并行化执行,镌汰整个任务的执行时间。
    • 开启任务并行执行:set hive.exec.parallel=true;
    • 设置同一个sql允许并行任务的最大线程数(例如设置为8个):set hive.exec.parallel.thread.number=8;

2.2.2 Hive SQL 下令

HQL 开辟

+ 场景阐明 - 假定用户开辟一个基于Hive的数据分析应用,用于分析企业雇员信息。 - 假定必要创建三张表: * 雇员信息表:"employees_info" * 雇员联络信息表:"employees_concat” * 雇员信息扩展表:"employees_info_extended" 雇员信息表

雇员联络信息表


统计要求

**统计要求**

  • 检察薪水支付币种为美元的雇员接洽方式。
  • 查询入职时间为2019年的雇员编号、姓名和电话号码字段,并将查询结果加载进表employees_info_extended对应的分区中。
  • 统计表employees_info中有多少条记载。
  • 查询以“cn”结尾的邮箱的员工信息。
创建雇员信息表

创建雇员接洽表

创建雇员信息扩展表

加载数据

查询 1
——检察薪水支付币种为美元的雇员接洽方式

查询 2
——查询入职时间为2019年的雇员编号、姓名和电话号码字段,并将查询结果加载进表employees_info_extended中的入职时间为2019的分区中。

查询 3


  • 使用Hive中自带的函数COUNTO,6统计表employees_concat中有多少条记载
    SELECT COUNT (*) FROM employees_contact;
  • 查询以’cn’结尾的邮箱的员工信息
    SELECT a.name,b.tel_phone FROM employees_info a JoIN employees_contact b ON a.id=b.id WHERE b.email like '%cn';
2.2.3 Hive 数据堆栈计划

**数据集市和数据堆栈的区别**

  • 数据集市

    • 数据集市(DataMart),也叫数据市场,数据集市就是满足特定的部分大概用户的需求按照多维的方式举行存储,包罗界说维度、必要盘算的指标、维度的条理等,生成面向决策分析需求的数据立方体。

  • 数据堆栈

    • 为满足各类零散分析的需求,通过数据分层和数据模型的方式,并以基于业务和应用的角度将数据举行模块化的存储。

Hive 数据堆栈
数据堆栈分层:


  • ODS层:原始数据层。
  • DWD层:结构和粒度与原始表保持一致,简单清洗。
  • DWS层:以DWD为基础,举行轻度汇总。
  • ADS层:为各种统计报表提供数据。

分层的优点


  • 复杂题目简单化

    • 将任务分解成多个步调完成,每一层只处置惩罚单一的步调,比力简单,而且方便定位题目。

  • 镌汰重复开辟

    • 规范数据分层,通过中间层数据,镌汰最大的重复盘算,增加一次盘算结果的复用性。

  • 隔离原始数据

    • 避免数据异常大概数据敏感,使真实数据与统计数据解耦。

思考题

  • Hive的自界说函数有哪几种?()

    • UDF
    • UDTF
    • UDCF
    • UDAF

  • Hive在删除表的时候,内部表的元数据和实际数据不会被一起删除。()
  • Hive构建数据堆栈时通常必要举行分层。()
2.3 离线分析-Spark SQL

2.3.1 离线分析-Spark SQL

**Spark 简介** ——Spark是基于内存的分布式批处置惩罚体系,它把任务拆分,然后分配到多个的CPU上举行处置惩罚,处置惩罚数据时产生的中间产物(盘算结果)存放在内存中,镌汰了对磁盘的I/O操纵,大大的提拔了数据的处置惩罚速率,在数据处置惩罚和数据发掘方面比力占优势。
Spark 应用场景


  • 数据处置惩罚(DataProcessing):可以用来快速处置惩罚数据,兼具容错性和可扩展性。
  • 选代盘算(lterativeComputation):支持迭代盘算,有用应对复杂的数据处置惩罚逻辑。
  • 数据发掘(DataMining):在海量数据基础上举行复杂的发掘分析,可支持多种数据发掘和机器学习算法。
  • 流式处置惩罚(StreamingProcessing):支持秒级延迟的流处置惩罚,可支持多种外部数据源。
  • 查询分析(QueryAnalysis):支持SQL的查询分析,同时提供范畴特定语言(DSL)以方便操纵结构化数据,并支持多种外部数据源。
Spark 对比 MapReduce


  • 性能上提拔了100倍
  • Spark的中间数据放在内存中,对于迭代运算的效率更高;举行批处置惩罚时更高效,同时有着更低的延迟
  • Spark提供更多的数据集操纵类型,编程模型比MapReduce更灵活,开辟效率更高
  • 更高的容错本事(血统机制)
RDD


  • RDD是分布式弹性数据集,可以理解一个存储数据的数据结构。Spark会把所要操纵的数据,加载到RDD上,即RDD所有操纵都是基于RDD来举行的。RDD是只读和可分区。要想对RDD举行操纵,只能重新生成一个新的RDD。

    • 从HDFS输入创建,或从与Hadoop兼容的其他存储体系中输入创建。
    • 从父的RDD转换的到新的RDD。
    • 从数据聚集转换而来,通过编码实现。

  • RDD的存储:

    • 用户可以选择不同的存储级别缓存RDD以便重用。
    • 当前RDD默认是存储于内存,但当内存不足时,RDD会溢出到磁盘中。

Shuffle


  • Shuffle是划分DAG中stage的标识,同时影响Spark执行速率的关键步调

    • RDD的Transformation函数中,分为窄依靠(narrow dependency)和宽依靠(widedependency)的操纵。
    • 窄依靠跟宽依靠的区别是是否发生Shuffle(洗牌)操纵

窄依靠


  • 窄依靠是指父RDD的每个分区只被子RDD的一个分区所使用
  • 表现为:

    • 一个父RDD的每一个分区对应于一个子RDD分区


stage

TransFormation


  • Transformation是RDD的算子类型,它的返回值还是一个RDD。
  • Transformation操纵属于懒操纵(算子),不会真正触发RDD的处置惩罚盘算。
  • 变换方法的共同点:

    • 不会马上触发盘算。
    • 每当调用一次变换方法,都会产生一个新的RDD。

  • 例如:map(func), flatMap(func)
Action
——Action是RDD的算子,它的返回值不是一个RDD。Action操纵是返回结果大概将结果写入存储的操纵。Action是Spark应用启动执行的触发动作,得到RDD的相干盘算结果或将RDD保存到文件体系中。

SparkConf


  • SparkConf是用来对Spark举行任务参数配置的对象
  • 是通过键值对的情势,设置Spark任务执行时所必要的参数。
  • Spark读取任务参数的优先级是:
  • 代码配置>动态参数>配置文件
SparkContext


  • SparkContext是Spark的入口,相称于应用步伐的main函数。
  • SparkContext表现与Spark集群的毗连,可用于在该集群上创建RDD,记载盘算结果和情况配置等信息。
  • Spark2.0中引l入了SparkSession的概念,为用户提供了一个同一的切入点来使用Spark的各项功能。
  • 封装了SparkConf和SparkContext对象,方便用户使用Spark的各种API。


2.3.2 Spark SQL 架构原理

**SparkSQL 简介**

  • SparkSQL是Spark用来处置惩罚结构化数据的一个模块,可以在Spark应用中直接使用SQL语句对数据举行操纵。
  • SQL语句通过SparkSQL模块解析为RDD执行计划,交给SparkCore执行。

SparkSQL 使用方式


  • 通过SparkSession提交SQL语句任务像平凡Spark应用一样,提交到集群中分布式运行。
  • JDBC:

    • 应用加载JDBC驱动然后同一提交到集群的JDBCServer执行
    • JDBCServer是单点服务,会成为任务执行的瓶颈,不能处置惩罚海量数据和高并发任务


SparkSQL 关键概念 DataSet


  • DataSet

    • DataSet是一个由特定域的对象构成的强类型聚集,可通过功能或关系操纵并行转换其中的对象
    • DataSet以Catalyst逻辑执行计划表现,而且数据以编码的二进制情势存储,不必要反序列化就可以执行sort、wefilter、huffle等操纵。
    • Dataset是“懒惰”的,只在执行action操纵时触发盘算。当执行action操纵时,Spark用查询优化步伐来优化逻辑计划,并生成一个高效的并行分布式的物理计

Spark SQL 简单查询


  • 查询:df.select("id","name").show()
  • 带条件的查询:df.select($"id",$"name").where($"name"==="bbb").show()
  • 排序查询:df.select($"id",$"name").orderBy($"name".desc).show
    df.select($"id",$"name").sort($"name".desc).show
2.3.3 Spark SQL 开辟

**场景阐明:**

  • 假定用户有网民网购时停留网站的日记文本,基于某些业务要求,必要开辟Spark应用步伐并实现如下功能:

    • 统计日记文件中网购停留总时间超过2个小时的女性网民信息。

log1.txt:网民停留日记


  • 日记文件第一列为姓名,第二列为性别,第第三列为本次停留时间,单元为分钟

    • 分隔符为","


开辟思绪


  • 目标

    • 统计日记文件中网购停留总时间超过2个小时的女性网民信息。

  • 大致步调:

    • 创建表,将日记文件数据导入到表中
    • 筛选女性网民,提取停留时间数据信息。
    • 汇总每个女性停留总时间。
    • 筛选出停留时间大于2个小时的女性网民信息。

Scala 样例代码
  1. objectCollectFemaleInfo{
  2. //表结构,后面用来将文本数据映射为df
  3. case class FemaleInfo(name: String, gender: String, stayTime: Int)
  4. def main(args:Array[String]){
  5. //配置Spark应用名称
  6. val sparkConf =new SparkConf().setAppName("FemaleInfo")
  7. val ss = SparkSession.builder().config(sparkconf).getorCreate()
  8. val sc = ss.sparkContext
  9. sc.textFile(args(0) ).map(_.split(";")).map(p => FemaleInfo(p(0), p(1),
  10. importss.implicits·_//通过隐式转换,将RDD转换成DataFrame,然后注册表
  11. p(2).trim.toInt)).toDF.registerTempTable("FemaleInfoTable")
  12. val femaleTimeInfo=sqlContext.sql(
  13. //通过sql语句筛选女性上网时间数据,对相同名字行进行聚合
  14. select name,sum(stayTime) as stayTime from FemaleInfoTable where
  15. gender = 'female' group by name")
  16. //筛选出时间大于两个小时的女性网民信息,并输出
  17. val c= femaleTimeInfo.filter("stayTime >=120").collect()
  18. c.foreach(println)
  19. sc.stop()
复制代码

Yarn-cluster 作业提交


  • 打 jar 包
  • 上传到Linux某个目录下
  • Yarn-cluster提交方式
  1. ./spark-submit
  2. --master yarn-cluster
  3. --class com.huawei.bigdata.spark.CollectFemaleInfo
  4. ../lib/spark-examples.jar
  5. /user/logl.txt
复制代码
思考题

  • Spark的应用场景有哪些?()

    • 查询分析
    • 流式处置惩罚
    • 迭代盘算
    • 数据发掘

  • Spark2.0中引入了SparkSession的概念,为用户提供了一个同一的切入点来使用Spark的各项功能。()
  • SparkSQL可以举行实时查询()
2.4 数据收罗工具

**常用收罗工具** ——由于大数据的数据源各种各样;6由此对数据收罗的挑衅变的尤为突出。这里介绍几款常用数据收罗工具:
——Sqoop
——Loader
Sqoop 简介


  • Sqoop项目开始于2009年,最早是作为Hadoop的一个第三方模块存在,厥后为了让使用者能够快速摆设,也为了让开辟职员能够更快速的迭代开辟,Sqoop独立成为一个Apache项目。
  • Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(MySQL、PostgreSQL…)间举行数据的传递,可以将一个关系型数据库(例如MySQL,Oracle,PostgreSQL等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。

Sqoop 原理

  • Import 原理

    • Sqoop在import时,必要指定split-by参数。Sqoop根据不同的split-by参数值来举行切分,然后将切分出来的区域分配到不同map中。
    • 每个map中再处置惩罚数据库中获取的一行一行的值,写入到HDFS中。
    • 同时split-by根据不同的参数类型有不同的切分方法,如比力简单的int型,Sqoop会取最大和最小split-by字段值,然后根据传入的num-mappers来确定划分几个区域。

  • Export原理:

    • 获取导出表的schema、meta信息,和Hadoop中的字段match;
    • 并行导入数据:将Hadoop上文件划分成多少个分片,每个分片由一个MapTask举行数据导入。

Loader 简介


  • Loader是实现FusionlnsightHD与关系型数据库、文件体系之间互换数据和文件的数据加载工具。

    • 提供可视化向导式的作业配置管理界面;
    • 提供定时调度任务,周期性执行Loader作业;
    • 在界面中可指定多种不同的数据源、配置数据的清洗和转换步调、配置集群存储体系等。

  • 基于开源Sqoop研发,做了大量优化和扩展。


思考题

  • Sqoop主要用来做实时流处置惩罚()
  • Sqoop可以用于Hadoop(Hive)与传统的数据库(MySQL、PostgreSQL…)间举行数据的传递。()
  • Loader不能提供图形化操纵。()
3.离线批处置惩罚实战

**场景阐明**

  • 某公司拥有一个购物网站,用户在登录网站、点击商品、收藏商品、购买商品时都会产生举动日记,该公司渴望结合业务数据库中的数据和日记数据,举行大数据分析,得到分析结果作为公司决策依据。

客户需求


  • 网络产生的日记包罗如下特点:

    • 数据量大
    • 价值密度低
    • 数据的业务种类多

  • 客户渴望通过数据分层,让底层数据和指标数据分层,最终得到相干业务指标的数
据。
数据源
——数据来自于网站,网站通过在网页内代码埋点,用户每一次操纵,都会将相干信息传到日记服务器,日记收罗工具收罗日记服务器上的数据,然后会对数据举行简单的处置惩罚过滤,末了将数据按照日期存到HDFS上。
计划分析


  • 数据源:网站产生的日记
  • 存储数据:HDFS上的不同目录下的数据。
  • 构建数据堆栈:根据业务数据分层
  • 分析数据:相干指标,GMV,活跃度等
  • 结果的展示:网页版报表显示
方案架构-组件选取


  • 数据导入导出:Loader
  • 数据存储:HDFS、MySQL
  • 数据盘算:Hive
方案架构-方案流程

效果展示

思考题

  • 本实验的网站日记数据必要存放到HDFS。()
  • 本实验的数据源包罗网站日记数据和数据库业务数据()
  • 本实验对数据处置惩罚过程中,举行了数据分层处置惩罚。()

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

河曲智叟

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表