基于Spark实现将MySQL数据导入Hive——替换sqoop
前言近来在做数仓项目时在ods层导入数据时有个难题,sqoop连接不上hive,经查询是底层的hadoo与hive版本都太新了不兼容,但是仅仅因为一个sqoop工具就牵涉到集群配置那可太小题大做了,颠末翻阅网上课程得知sqoop就是运行在MR上的数据抽取工具,理解了这一层就简单了,我还熟悉个比MR快好多倍的引擎——spark,那么能否用spark实现出类似sqoop抽取的效果呢,不管了尝试一下 --> 用spark 手撕sqoop。
一、预备工作
1.创建spark程序
配置好相关依靠和hive元数据地址(hive-site.xml文件)
https://i-blog.csdnimg.cn/direct/dcbdd29e340549d1b1328519afa42da4.png
2.确定需求
本文是希望将MySQL某个数据库下的全部表导入Hive下的库中
20张表
https://i-blog.csdnimg.cn/direct/844e8fb6c8a549b0a277809c617c177e.png
3.初始化spark程序
/**
* 读取Mysql数据导入Hive
* 小脚本
* @param args
*/
//临时设置日志打印级别
Logger.getLogger("org").setLevel(Level.WARN)
//初始化SparkSession,整个程序的入口
val spark = SparkSession.builder()
.enableHiveSupport()
.appName("读取Mysql导入Hive")
.config("spark.default.parallelism", 1)
.master("local")
.getOrCreate()
//调大SparkSQL的默认打印字段个数,避免打印的json数据过长(根据需求,如果有超长数据就开)
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)
二、具体实现
1.获取全部表名
循环jdbc获取表数据
因为jdbc连接必要传递库表名,这里的库是同一的,但是20张表名必要获取,直接简单粗暴复制全部上图表名到表名文本中tableNames.txt (简单,一整个全选cccv就行)
https://i-blog.csdnimg.cn/direct/0312075fcd1344ddbcf454320cdcd773.png
2.初始化jdbc参数
JDBC路径后的参数 zeroDateTimeBehavior=convertToNull 是防止导出数据有空值会报错,能确保MySQL中的数据足够干净可以不加,用户名暗码必要修改
val URL = "jdbc:mysql://node1:3306/jrxd?zeroDateTimeBehavior=convertToNull"
val properties = new Properties()
properties.put("user", "********")
properties.put("password", "********")
3.获取全部表名便于后续拆分
调用sparkContext获取全部表数据,由于txt中表文件换行存储,这里不必要再额外清洗
由于sparkContext得到的是RDD迭代器运行在Executor进程中,后续读取操作必要Driver运行
调用行动算子collect收集成数组,后续可遍历数组得到表名
// 手动获取mysql中的表名储存到文件,用RDD解析
val tables:RDD = spark.sparkContext.textFile("data/tableNames.txt")
//不加这一条会报错,好像是什么在Executor端转到driver端运行
val tableNames = tables.collect()
4.用jdbc读取出并写入hive
遍历每张表名
JDBC格式( 数据库路径,表名 ,调用者信息 )
设置写入模式mode为覆盖写入Overwrite,这样测试过程不会报错(表已存在)
写入HIive的格式要注意fincredit为库名,注意后续表名的命名规范(ods层表命名通常为 {ods_原表名} )
添加写入标记提示,便于可视化
```scala
//循环遍历表名,写入hive,注意ods层命名规范
tableNames.foreach(tableName => {
val df = spark.read.jdbc(URL, tableName, properties)
df.write.mode("Overwrite").saveAsTable(s"fincredit.ods_${tableName}")
println(s"写入ods_${tableName}完成...")//添加标记
})
println("所有数据写入完毕,正在加载数据....")
5.写入完毕后测试
读出hive中生成的每张表
show(5)每张表只展示5行 考虑性能,也没须要全表查看;
默认写入parquet格式,在其他地方大概率无法查看,所以录入后当场测试表数据既省时又方便
//测试写入数据是否完整
tableNames.foreach(tableName => {
spark.sql(s"select * from fincredit.ods_${tableName}").show(5)
println(s"加载ods_${tableName}完成...")
})
println("所有数据加载完毕...")
//至此ods层数据写入完毕,加载完毕,程序结束(写入格式默认parquet)
6.运行代码
写入hive过程统统顺利
https://i-blog.csdnimg.cn/direct/07ff868295024361b8491109fe205aa2.png
工作台展示部门库数据
*每张表展示五行数据
*主动处置惩罚空值为null,在 二、2中的数据库参数配置. zeroDateTimeBehavior=convertToNull
https://i-blog.csdnimg.cn/direct/5e21061101d4401fa08d728c54d4e7dc.png
查看Hive的fincredit库数据
20张ods表满满当当
https://i-blog.csdnimg.cn/direct/59203603a7494e2eb451f6fa1ed5f464.png
至此数据导入部门竣事,要开始后续的数仓建设了。。。
总结
以上spark代码在实现功能方面能轻松取代sqoop,各人都知道sqoop依靠MepReduce实行,在速率上面肯定不如spark,所以接纳该方法可以轻松实现提高数据源抽取效率,在开发过程中可节约时间成本,也省去了工具配置的过程。`
那么问题来了,这么一个小脚本在缺点上还是有挺多的,比如说collect在收集多分区海量数据表名时大概会吃不消的,使用场景也是库中表名不多的情况。另外,输出的格式parquet大大低落了数据的可读性,不像sqoop可以指定输出为TextFile。还有一方面大概循环读表时反复调用JDBC会产生大量磁盘IO,这方面还能有很大改进,由于不是项目重点,这里就没有继续深挖性能。还有什么不敷,感谢大佬增补!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
页:
[1]