最详细数据仓库项目实现:从0到1的电商数仓创建(采集部门) ...

打印 上一主题 下一主题

主题 547|帖子 547|积分 1641

1、数据库和数据仓库的区别:
数据仓库就是data warehouse,数据小卖店,相称于是对数据加工,盘算然后对外提供服务,而不是单纯的存储
2、数据流转过程中数据仓库中的数据源部门
数据源部门的数据**不是只同步数据库当前状态的数据,**由于数据库中的数据不会生存海量数据,只会生存近几年的数据,
而数据源应该是汇总数据库中的数据,就是数据库中增长一些数据就汇总到数据仓库中的数据源中,
由于数据库只保存最近一些的数据,太久远的数据会存放到磁盘中,必要的时候数据库会拉取这部门数据。
所以数据源相称于必要一个独立的部门,单独用来存储数据,数据仓库不会直接从数据库拉取数据
3、必要明确的是,项目分为采集部门和数据仓库部门,是两个相互独立的部门
1、采集

   架构部门可以看第一个讲义文件,有项目架构的先容,版本的先容,和搭建集群的思路【这个重要相识一下就可以】
  采集涉及两部门:行为数据和业务数据。
  业务数据通过MySQL存储,而行为日志数据通过对日志文件处置惩罚利用flume以及kafka存储到集群中
  

  • 系统数据流程图:

1.1、行为数据

1)行为日志

  1. // 第一份讲义的第8页开始
  2. 这部分行为日志是json文件
  3.    
  4. 所以如果想要用的话,需要将文件转成一行的格式,就是一长行
  5. 因为在hive或者spark中,加载数据到表格中,解析数据的时候,就是只能解析一行json文件
  6.    
  7. // 这里可以用插件fehelper,进行整理json代码
复制代码
用户行为日志的内容,重要包括用户的各项行为信息以及行为所处的情况信息
收集这些信息的手段通常为埋点
   行为日志分为两类: 页面日志、启动日志
  1、页面浏览日志



  • common : 情况信息
  • actions : 动作信息
  • displays : 曝光信息
  • page : 页面信息
  • err : 错误信息
  • ts : 进入当前页面的时间戳
2、App启动日志



  • common : 情况信息
  • start : 启动信息
  • err : 错误信息
  • ts : 启动app的时间戳
2)行为数据采集模块

1、模块流程图


2、按照上图形貌安装步骤


  • 安装假造机,这里可以用课程资料里的模板机hadoop100,也可以用自己的hadoop_base
  • 之后克隆三台假造机,这里我们是hadoop106、hadoop107、hadoop108。更改主机名,ip地点,ip映射,主机映射
  • 设置免密登录
  • 安装JDK
  • 数据模拟:
    这个指的是,我们项目用到的行为日志和业务数据都是我们自己模拟的,这是必要java程序或者mysql数据,我们按照上图中的lg.sh脚本,已经将相关的java程序和设置文件加载到文件夹applog中
  • 安装hadoop、zookeeper、kafka、flume

    必要先装zookeeper再安装kafka,由于必要先启动zookeeper才气启动kafka。别的hadoop、zookeeper、kafka是集群搭建。意思是三台主机都必要,并且举行相应设置。
    **而flume是单点的,那里必要就在那里装,**就是一个传数据的。别的在设置flume的时候,选择TailDirSource和KafkaChannel,不要sink了
    比如我们如果在hadoop106这台主机上装了模拟数据天生的程序,天生数据后我们必要将数据上传到hdfs,这就只必要在hadoop106上装有flume

  • flume日志采集设置

    • 由于我们行为日志产生的程序,也就是模拟数据天生的java程序装在了hadoop106上,所以flume也装在hadoop106上。但由于模拟数据天生的日志json文件是竖着的,我们必要排列成一长行的样子,后面才可以用上,所以我们必要在flume的source部门加上一个拦截器举行处置惩罚数据
    • 编写flume设置文件,我们只必要taildir的source,以及kafka类型的channel
    • 之后先启动zookeeper,然后是启动Kafka,再启动flume
    • 就可以模拟数据了,lg.sh test 100 ,就会通过模拟数据天生的数据经过flume自动存到kafka中,之后如果想要及时处置惩罚或者离线处置惩罚,都可以从Kafka中拿去数据举行处置惩罚,盘算

1.2、业务数据

首先必要安装MySQL,这里有脚本,可以直接利用
但是最后更改权限部门必要自己再单独执行以下,就是脚本中有部门没有发挥作用
  1. mysql -uroot -p000000   -- 登录MySQL
  2. -- 然后执行下面两个命令
  3. alter user 'root'@'%' identified with mysql_native_password by '000000';
  4. flush privileges;
复制代码
1)模拟数据

利用远程连接软件连接上数据库后,将数据库脚本文件gmall导入,里面都是表结构和一些默认数据
后续会通过模拟程序,继续天生业务数据
gmall中会有全部的表结构文件
但是只有必要的数据,并不是全部的表都有数据,之后照旧必要json模拟天生数据加到数据库中
2)业务结构关系

重要涉及电商业务以及后台管理两部门
  1. // 看业务数据部分讲义的第13页
复制代码


增补一个知识点:
对于付出表中的数据,在订单下单时自动插入付出记载,这是一个系统自动执行的操作,通常不会被以为是逐日的增量数据。原因在于,这种插入操作是订单下单时自动触发的,而不是在逐日的数据处置惩罚流程中产生的。
逐日的增量数据通常指的是在逐日的数据处置惩罚过程中,通过比力昨日和今日数据的差异,来确定今日新增、更新或删除的数据。而订单下单时自动插入付出记载这个操作通常不会被包括在这个范畴内,由于它不是在逐日的数据处置惩罚过程中产生的变化。
所以,固然订单下单时会有付出记载插入,但这个操作不会被当做逐日的增量数据。相反,逐日的增量数据更关注于逐日数据处置惩罚周期内的变化,比如订单状态的更新、付出状态的改变等。
1.3、数据采集流程

不外这个hadoop启动的脚本不是 hdp.sh 而是hadoop.sh

增量数据重要用于及时数仓的处置惩罚
全量数据离线数仓项目中用到比力多
别的这两个数据采集的软件各不雷同,Maxwell是用于采集增量数据然后放置于Kafka中用于及时数仓的项目处置惩罚
1)Maxwell同步数据

先打开zookeeper,然后打开Kafka,执行msw.sh脚本,运行Maxwell,开始同步数据
但是汗青数据的同步分为全量和增量数据的同步,直接执行msw.sh脚本是及时同步MySql的变动数据【增量数据】
  1. 但有时只有增量数据是不够的,我们可能需要使用到MySQL数据库中从历史至今的一个完整的数据集。这就需要我们在进行增量同步之前,先进行一次历史数据的全量同步。这样就能保证得到一个完整的数据集。
  2.    
  3.     /* 但其实业务数据去采集全量数据可以用不同的软件,不是用Maxwell  !!!!!!!!!!!! */
  4.    
  5.     //提供了一个bootstrap功能
  6.     // 这个可以同步一张表得全部数据
  7. /opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table activity_info --config /opt/module/maxwell/config.properties
  8. //但是上面的table 后面跟的是需要同步的表名字,需要自己根据情况去更改   
复制代码
2)业务数据部门必要别的一个flume进程


   这是在hadoop107上面,不是Hadoop106上举行设置
  但是这里有一个零点漂移的题目。就是有一个几秒的时间差,就会把文件存储错位置,由于他是按照时间戳【这是flume采集到数据自动加上的时间戳】,时间,就是详细的日期去存储的,可以看下面的图
解决方式就是通过加上一个拦截器
如许就会让时间戳,也就是header的时间和body的时间是一样的
所以最后加上拦截器的目的就是包管时间同等,别的json的格式是正确的

这里从kafka中拉取得数据只有行为数据,可以看flume设置文件中,拉去Kafka中得主题只是行为数据得主题
之后会通过别的软件拉取业务数据得全量数据
  1. // 配置source1
  2. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
  3. a1.sources.r1.batchSize = 5000
  4. a1.sources.r1.batchDurationMillis = 2000
  5. a1.sources.r1.kafka.bootstrap.servers = hadoop106:9092,hadoop107:9092,hadoop108:9092
  6. a1.sources.r1.kafka.topics=topic_log
  7. a1.sources.r1.interceptors = i1
  8. a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder
复制代码
1.4、数据同步策略

   本项目中,全量同步采用DataX,增量同步采用Maxwell
  1)增量方案对比


2)dataX同步全量数据(存在两种模式)

解决异构数据源同步题目,DataX将复杂的网状的同步链路变成了星型数据链路,DataX作为中间传输载体负责连接各种数据源。当必要接入一个新的数据源的时候,只必要将此数据源对接到DataX,便能跟已有的数据源做到无缝数据同步。

利用方法照旧比力简单的,只需根据自己同步数据的数据源和目的地选择相应的Reader和Writer,并将Reader和Writer的信息设置在一个json文件中,然后执行如下下令提交数据同步任务即可。
   但是必要注意的是,利用dataX的时候,肯定要先提前在hdfs中创建目标路径下必要的文件夹!!!!!,否则会提示找不到目标路径
  1. python bin/datax.py path/to/your/job.json  //最后这个json文件名是自己定义的
  2.    
  3. //可以使用如下命名查看DataX配置文件模板。
  4. python bin/datax.py -r mysqlreader -w hdfswriter   //其中setting用于对整个job进行配置,content用户配置数据源和目的地。
复制代码
  1. 在讲义中举了例子,MySQL同步到hdfs中
  2. 需要注意的是,null值的处理
  3. //这部分去看同步数据讲义的第7-8页
复制代码
MySQLreader这里分为两种模式,一种是写清楚详细的id属性的TableMode,另一种是在json中写查询语句即可的QuerySQLMode,在编辑好json文件后,按照模板编写
之后就是运行,可以直接输入下令行
  1. python bin/datax.py job/base_province_sql.json  //后面job/base_province_sql.json,是路径和自己编写的文件名
复制代码
3)dataX传参

dataX是可以传参的
通常情况下,离线数据同步任务必要逐日定时重复执行,故HDFS上的目标路径通常会包含一层日期,以对逐日同步的数据加以区分,也就是说逐日同步数据的目标路径不是固定稳定的,因此DataX设置文件中HDFS Writer的path参数的值应该是动态的。为实现这一结果,就必要利用DataX传参的功能。
DataX传参的用法如下,在JSON设置文件中利用${param}引用参数,在提交任务时利用-p"-Dparam=value"传入参数值,详细示例如下。

4)dataX同步全量数据

由于dataX同步数据必要类似flume一样的设置文件,但是每一个设置文件都必要写数据库中表的格式
有很多表,每一个表都必要写列名字和属性种别,比力多,所以就写了一个dataX设置天生器,这里由于数据库是在hadoop106上,所以把代码中的主机名改成hadoop106,在资料中的就是已经改好的
之后编写一个脚本文件,就可以执行下令,对17张表利用datax举行同步全量数据
  1. // 在同步数据的讲义第11到18页
复制代码
5)增量表数据同步

  1. // 在第三个同步数据的讲义中第19页到26页
复制代码
1、同步从Maxwell获取的MySQL业务增量数据



  • 设置kafka到hdfs的flume设置文件,之前设置的Kafka到hdfs的是关于行为日志数据的,现在是关于业务增量数据的,这个的脚本下令是叫做:f3.sh。【Flume必要将Kafka中topic_db主题的数据传输到HDFS】
  • 别的必要编写拦截器
    这个是涉及到时间戳的题目,由于利用Maxwell会涉及到一个ts的参数,不像行为日志采集的时候直接把时间就获取到,Maxwell获取时间不是操作时间,是由于他涉及到一系列步骤,从你写下sql语句,到binlog获取时间,再到Maxwell获取时间,再去链接MySQL等等不像行为数据直接同步数据,由于sql涉及到代码操作和连接mysql以及sql语句生效运作这些环节
    别的,就是我们在对表举行同步的时候,涉及到多个表,不像行为日志,就是只有一个日志文件,在Kafka中也就是一个主题,主题下不用再去分别,所以必要获取各个表的名字,所以就必要编写拦截器

2、增量表首日全量同步

通常情况下,增量表必要在首日举行一次全量同步,后续逐日再举行增量同步,首日全量同步可以利用Maxwell的bootstrap功能,方便起见,下面编写一个增量表首日全量同步脚本。
就是之前说的那个bootstrap

1.5、采集流程总结


   涉及两类数据:行为数据(行为日志)业务数据(MySQL)
  这两类数据,都是通过模拟数据的程序天生,通过脚本文件执行lg.sh,模拟天生行为日志数据和业务数据。
日志数据天生后直接写入/opt/module/applog/log目次下,而业务数据是在下载设置好MySQL后创建名为gmall的数据库,直接将业务数据通过jdbc远程连接,写入数据库中对应的表中
业务数据还必要注意的一个是,它分为增量数据和全量数据,增量数据是通过Maxwell写入到Kafka中,之后Kafka可以通过flume流到hdfs,这一部门的hdfs承载两部门数据,业务数据的增量数据,以及行为日志的数据
增量数据的同步操作还分为两步骤,分别是第一日的全量数据同步,是利用的Maxwell中的bootstrap,之后便是逐日的增量数据同步。
别的业务数据的全量数据同步,是利用的dataX,这个是类似flume的一个程序文件,设置好即可利用,只不外每一个表都必要一个设置文件,所以我们就用java写了一个设置文件天生器,由于设置文件必要的其实就是表的名字和id名字和属性种别,
利用编写好的拦截器,运行java -jar datax-config-generator-1.0-SNAPSHOT-jar-with-dependencies.jar
有多少表就会天生多少表的json设置文件
编写利用好的设置文件运行的脚本,就可以执行dataX了,利用下令 full.sh all。如许就可以做到将表中全量数据同一同步到hdfs中
必要明确的是,业务数据分为:增量数据和全量数据,这两类数据分别拥有一些表,没有重复,由于有的表适合增量处置惩罚,有的表适适用全量数据去处置惩罚。在hdfs中增量数据后面文件名会加上一个“inc”,而全量数据会加上一个“full”


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

络腮胡菲菲

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表