基于Spark实现将MySQL数据导入Hive——替换sqoop

火影  论坛元老 | 2025-4-16 11:54:39 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 1749|帖子 1749|积分 5247


前言

近来在做数仓项目时在ods层导入数据时有个难题,sqoop连接不上hive,经查询是底层的hadoo与hive版本都太新了不兼容,但是仅仅因为一个sqoop工具就牵涉到集群配置那可太小题大做了,颠末翻阅网上课程得知sqoop就是运行在MR上的数据抽取工具,理解了这一层就简单了,我还熟悉个比MR快好多倍的引擎——spark,那么能否用spark实现出类似sqoop抽取的效果呢,不管了尝试一下 --> 用spark 手撕sqoop。

一、预备工作

1.创建spark程序

配置好相关依靠和hive元数据地址(hive-site.xml文件)

2.确定需求

本文是希望将MySQL某个数据库下的全部表导入Hive下的库中
20张表

3.初始化spark程序

  1.   /**
  2.    * 读取Mysql数据导入Hive
  3.    * 小脚本
  4.    * @param args
  5.    */
  6.     //临时设置日志打印级别
  7.     Logger.getLogger("org").setLevel(Level.WARN)
  8.     //初始化SparkSession,整个程序的入口
  9.     val spark = SparkSession.builder()
  10.       .enableHiveSupport()
  11.       .appName("读取Mysql导入Hive")
  12.       .config("spark.default.parallelism", 1)
  13.       .master("local")
  14.       .getOrCreate()
复制代码
  1.   //调大SparkSQL的默认打印字段个数,避免打印的json数据过长(根据需求,如果有超长数据就开)
  2. spark.conf.set("spark.sql.debug.maxToStringFields", 1000)
复制代码
二、具体实现

1.获取全部表名

循环jdbc获取表数据
因为jdbc连接必要传递库表名,这里的库是同一的,但是20张表名必要获取,直接简单粗暴复制全部上图表名到表名文本中tableNames.txt (简单,一整个全选cccv就行)

2.初始化jdbc参数

JDBC路径后的参数 zeroDateTimeBehavior=convertToNull 是防止导出数据有空值会报错,能确保MySQL中的数据足够干净可以不加,用户名暗码必要修改
  1. val URL = "jdbc:mysql://node1:3306/jrxd?zeroDateTimeBehavior=convertToNull"
  2. val properties = new Properties()
  3. properties.put("user", "********")
  4. properties.put("password", "********")
复制代码
3.获取全部表名便于后续拆分

调用sparkContext获取全部表数据,由于txt中表文件换行存储,这里不必要再额外清洗
由于sparkContext得到的是RDD迭代器运行在Executor进程中,后续读取操作必要Driver运行
调用行动算子collect收集成数组,后续可遍历数组得到表名
  1. // 手动获取mysql中的表名储存到文件,用RDD解析
  2. val tables:RDD[String] = spark.sparkContext.textFile("data/tableNames.txt")
  3. //不加这一条会报错,好像是什么在Executor端转到driver端运行
  4. val tableNames = tables.collect()
复制代码
4.用jdbc读取出并写入hive

遍历每张表名
JDBC格式( 数据库路径,表名 ,调用者信息 )
设置写入模式mode为覆盖写入Overwrite,这样测试过程不会报错(表已存在)
写入HIive的格式要注意fincredit为库名,注意后续表名的命名规范(ods层表命名通常为 {ods_原表名} )
添加写入标记提示,便于可视化
  1. ```scala
  2. //循环遍历表名,写入hive,注意ods层命名规范
  3.     tableNames.foreach(tableName => {
  4.       val df = spark.read.jdbc(URL, tableName, properties)
  5.       df.write.mode("Overwrite").saveAsTable(s"fincredit.ods_${tableName}")
  6.       println(s"写入ods_${tableName}完成...")//添加标记
  7.     })
  8. println("所有数据写入完毕,正在加载数据....")
复制代码
5.写入完毕后测试

读出hive中生成的每张表
show(5)每张表只展示5行 考虑性能,也没须要全表查看;
默认写入parquet格式,在其他地方大概率无法查看,所以录入后当场测试表数据既省时又方便
  1. //测试写入数据是否完整
  2. tableNames.foreach(tableName => {
  3.   spark.sql(s"select * from fincredit.ods_${tableName}").show(5)
  4.   println(s"加载ods_${tableName}完成...")
  5. })
  6. println("所有数据加载完毕...")
  7. //至此ods层数据写入完毕,加载完毕,程序结束(写入格式默认parquet)
复制代码
6.运行代码

写入hive过程统统顺利


工作台展示部门库数据

*每张表展示五行数据
*主动处置惩罚空值为null,在 二、2中的数据库参数配置. zeroDateTimeBehavior=convertToNull

查看Hive的fincredit库数据

20张ods表满满当当

至此数据导入部门竣事,要开始后续的数仓建设了。。。
总结

以上spark代码在实现功能方面能轻松取代sqoop,各人都知道sqoop依靠MepReduce实行,在速率上面肯定不如spark,所以接纳该方法可以轻松实现提高数据源抽取效率,在开发过程中可节约时间成本,也省去了工具配置的过程。`
那么问题来了,这么一个小脚本在缺点上还是有挺多的,比如说collect在收集多分区海量数据表名时大概会吃不消的,使用场景也是库中表名不多的情况。另外,输出的格式parquet大大低落了数据的可读性,不像sqoop可以指定输出为TextFile。还有一方面大概循环读表时反复调用JDBC会产生大量磁盘IO,这方面还能有很大改进,由于不是项目重点,这里就没有继续深挖性能。还有什么不敷,感谢大佬增补!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

火影

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表