1. Load Data(数据加载)
这里我们主要使用的是Sqoop进行数据加载,但是在项目实现中,我们遇到过一个关于Sqoop的问题:在我们的第一个项目中,我们使用Sqoop的query进行数据导入成功进行,可是当我们在第二个项目中使用同样的方法就遇到了需要target-dir的问题。
经过上网搜索和使用chatgpt搜索得出结论:Sqoop使用query导入数据的时间,需要指定hive的库的地点,在第二个项目中,我们仅仅需要一次全量导入,所以我们使用以下方法:
- sqoop import \
- --connect jdbc:mysql://node1:3306/insurance \
- --username root \
- --password 123456 \
- --table area \
- --hive-table insurance_ods.area \
- --hive-import \
- --hive-overwrite \
- --fields-terminated-by '\t' \
- --delete-target-dir \
- -m 1
复制代码 重点:接下来,我们主要讲解的是我们第一个项目中使用query的方法。
首先,我们来解释一下为什么在我们的第一个项目中可以使用query这种方法,而在第二个项目中不可,缘故原由是:第一个项目中,我们使用的CDH配置的大数据集群,而第二个项目我们并没有使用CDH,经过一番查找,我们得出结论,CDH配置的sqoop会自动查询你的hive数据库的位置,所以不需要指定target-dir,但是,在我们的第一个项目中,我们需要进行增量导入,因为我们的第一个项目为新零售项目,订单天天都会在mysql中更新和新增,假如我们将表全部删除再全部添加,第一,这太费时间了,假如公司的数据量大了,这样天天导入的时间太久,第二,我们的hive数据库中的ods层除了是hive和mysql的一个通道,还会将mysql中的汗青数据全都保存下来,我们举个例子:我们如今有一个表,字段为:身份证号,姓名,性别,住址,创建时间,修改时间。那么在mysql中,假如有一个人改变了住址,我们会使用sql语句:
- update 表
- set 住址 = '新地址', 修改时间 = now()
- where 身份证号 = 'xxxx';
复制代码 那么这样会出现一个问题,假如我想知道,这个人在来这个新地点之前住在什么地方,应该怎么办?
这显然是mysql无法办到的事情,而在ods层中,我们使用的逻辑为,添加字段:dt,这个字段用来分区,也用来记录是哪一天加进来的数据,这样我们就可以做到备份的结果了。所以,接下来我们来讲一讲假如进行增量导入和全量导入:
- # 增量导入的第一步永远是全量导入
- # 全量导入
- sqoop import \
- --connect jabc:mysql://node1:3306/Test \
- --username root \
- --password 123456 \
- --query "select * from test where 1 = 1 and \$CONDITIONS" \
- --hive-database Test \
- --hive-table test \
- --m 1
- # 格式:
- sqoop import \
- --connect jabc:mysql://你的服务器的地址:3306(mysql的端口号)/要导入的表所在的数据库 \
- --username mysql账号 \
- --password mysql密码 \
- --query "select * from test where 1 = 1 and \$CONDITIONS" (重点: select 查询的字段 from mysql中的表 where 如果是全量导入没有过滤条件就写1 = 1 and \$CONDITIONS(必须要的, 如果你使用的是单引号,就可以不用这个\转义字符)) \
- --hive-database hive中的数据库, 如果你要导入的hive表为orc列存储, 前面的hive需要改成hcatalog \
- --hive-table 需要导入的表, 与数据库一样, 如果是orc列存储, 改成hcatalog-table \
- --m 需要几个线程
复制代码- # 增量导入
- sqoop import \
- --connect jabc:mysql://node1:3306/Test \
- --username root \
- --password 123456 \
- --query "select * from test where ((create_time between '2024-10-03 00:00:00' and '2024-10-03 23:59:59') or (update_time between '2024-10-03 00:00:00' and '2024-10-03 23:59:59')) and \$CONDITIONS" \
- --hive-database Test \
- --hive-table test \
- --m 1
复制代码 这里我们发现,全量导入和增量导入的区别就是where条件不同,在mysql中create_time和update_time用来记录数据创建的时间和修改的时间,所以我们只需要导入本日的新增的数据和修改的数据即可,所以得出了我们的增量导入的写法。
2. 分桶
作用:
- 支持事务表
- JOIN服从性能变高(Bucket-Mapjoin、SMB-Mapjoin)
- 数据采样
分桶的底层: 将数据按照hash取余划分为不同的文件, 第一个桶的余数是0
2.1 分桶表的创建
- create table if not exists Test.t_bucket(
- id int,
- name string
- ) clustered by (id) into 4 buckets
- row format delimited fields terminated by '\t';
复制代码 此中, clustered by (id) into 4 buckets 为分桶语句, 将表通过id字段分为了4个桶。
2.2 分桶表的加载数据
- -- 分桶表一共有三种加载数据的方法
- -- 方法1:直接load一个文档里面的数据到分桶表中
- load data local inpath '/root/data/test.txt' into table t_bucket;
- -- 方法2:使用insert into(overwrite)方式来加载,前提是先有一个跟分桶表字段一致的普通表,将普通表的数据加载到分桶表中
- insert into Test.t_bucket
- select * from Test.t_tmp cluster by (id) ;
- -- 方法3:与方法2类似,但是如果需要按照一定的规律排序添加到分桶表中可以使用以下方法
- -- 默认为按照字段顺序添加,当然,也可以使用desc倒序添加
- insert overwrite table Test.t_bucket
- select * from Test.t_tmp distribute by (id) sort by (id);
- -- 倒序添加
- insert overwrite table Test.t_bucket
- select * from Test.t_tmp distribute by (id) sort by (id desc);
复制代码 2.3 数据采样用法
- -- 注意:表的别名,一定要写在采样的后面,否则报错
- select * from table tablesample(bucket x out of y on column) as t;
- 可以通过执行计划来验证查询性能是否有提升
- explain select ...
- 栗子:
- --启用强制分桶
- set hive.enforce.bucketing=true;
- DROP table sqooptohive.emp3;
- CREATE TABLE emp3
- (
- id int,
- name string,
- deg STRING,
- salary int,
- dept string
- )
- CLUSTERED BY (id) sorted by (id DESC) INTO 5 BUCKETS
- ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
- STORED AS ORC
- TBLPROPERTIES ('orc.compress'='SNAPPY');
- INSERT into sqooptohive.emp3
- SELECT * from sqooptohive.emp;
- explain SELECT count(1), dept from sqooptohive.emp3 GROUP BY dept;
- explain SELECT count(1), dept from sqooptohive.emp3 TABLESAMPLE(BUCKET 1 OUT OF 5 on id)
- GROUP BY dept;
复制代码 3 数据倾斜
3.1 什么是数据倾斜?
数据倾斜是由于数据分布不匀称,造成数据大量的集中到一点,造成数据热点的征象。
主要体现为:使命进度长时间维持在 99%或者 100%的附近,查看使命监控页面,发现只有少量 reduce 子使命未完成,因为其处理惩罚的数据量和其他的 reduce 差异过大。 单一 reduce 处理惩罚的记录数宁静均记录数相差太大,通常到达好几倍之多,最长时间弘大于平均时长。
3.2 产生数据倾斜的常见缘故原由
3.2.1 join:大表关联小表时,轻易尝试数据倾斜
(概念泉源于:https://blog.csdn.net/qq_41018861/article/details/112238592)
这里的解决方案就是我们熟知的mapjoin。
Map Join是Hive的一种优化操作,其实用于小表JOIN大表的场景,由于表的JOIN操作是在Map端且在内存进行的,所以其并不需要启动Reduce使命也就不需要经过shuffle阶段,从而能在一定水平上节流资源提高JOIN服从
这里我们需要知道mapjoin的原理:
MAPJION会把小表全部读入内存中,在map阶段直接拿别的一个表的数据和内存中表数据做匹配,而普通的equality join则是类似于mapreduce模子中的file join,需要先分组,然后再reduce端进行连接,使用的时间需要联合着场景;由于mapjoin是在map是进行了join操作,省去了reduce的运行,服从也会高许多。这样就不会由于数据倾斜导致某个reduce上落数据太多而失败。于是原来的sql可以通过使用hint的方式指定join时使用mapjoin。
先发出来,后续进行补充
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |