ToB企服应用市场:ToB评测及商务社交产业平台

标题: Spark抽取MySQL指定命据表中的增量数据到Hive [打印本页]

作者: 光之使者    时间: 前天 08:56
标题: Spark抽取MySQL指定命据表中的增量数据到Hive
   使用Scala编写spark工程代码,将MySQL的ds_db01库中表customer_inf、customer_inf、order_detail、order_master、product_info的数据增量抽取到Hive的ods库(需自建)中对应表customer_inf、order_detail、order_master、product_info中。
hive原数据准备详见:
Spark(hive原数据准备)抽取MySQL指定命据表中的增量数据到Hive-CSDN博客
https://blog.csdn.net/qq_31710395/article/details/144557049?spm=1001.2014.3001.5502
SQL文件下载:
        123云盘:https://www.123684.com/s/kgrAjv-z7q3d
        阿里云盘:https://www.alipan.com/s/A5QjDUv8PXQ
pom文件配置添加:
  1. <dependency>
  2.       <groupId>org.apache.spark</groupId>
  3.       <artifactId>spark-core_2.12</artifactId>
  4.       <version>3.1.2</version>
  5.     </dependency>
  6.     <dependency>
  7.       <groupId>org.apache.spark</groupId>
  8.       <artifactId>spark-hive_2.12</artifactId>
  9.       <version>3.1.2</version>
  10.     </dependency>
  11.     <dependency>
  12.       <groupId>org.apache.spark</groupId>
  13.       <artifactId>spark-sql_2.12</artifactId>
  14.       <version>3.1.2</version>
  15.     </dependency>
  16.     <dependency>
  17.       <groupId>mysql</groupId>
  18.       <artifactId>mysql-connector-java</artifactId>
  19.       <version>5.1.37</version>
  20. </dependency>
复制代码
1.抽取ds_db01库中customer_inf的增量数据进入Hive的ods库中表customer_inf。根据ods.customer_inf表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli实行show partitions ods.customer_inf下令;
  1. import org.apache.spark.sql._
  2. import org.apache.spark.sql.functions.lit
  3. import java.text.SimpleDateFormat
  4. import java.util.Properties
  5. import java.util.Calendar
  6. object zlcq1 {
  7.   def main(args: Array[String]): Unit = {
  8. //  建立spark环境
  9.     val spark = SparkSession.builder().appName("zlcq1").enableHiveSupport().getOrCreate()
  10. //  获取hive指定表指定字段的最大值
  11.     spark.sql("use ods")
  12.     val max = spark.sql("select max(modified_time) from customer_inf").head().getTimestamp(0).toString
  13. //  mysql连接配置
  14.     val jdbc="jdbc:mysql://bigdata1:3306/ds_db01"
  15.     val table = "customer_inf"
  16.     val properties =new Properties()
  17.     properties.put("user","root")
  18.     properties.put("password","123456")
  19.     properties.put("driver","com.mysql.jdbc.Driver")
  20. //  为被抽取表建立临时表,方便后续操作
  21.     spark.read.jdbc(jdbc, table, properties).createOrReplaceTempView("view")
  22. //  抽取符合要求的表数据
  23.     val sqlMsg=s"select * from view where modified_time> '$max'"
  24.     val dfMsg = spark.sql(sqlMsg)
  25. //  设置分区字段格式为-yyyyMMdd
  26.     val format = new SimpleDateFormat("yyyyMMdd")
  27. //  获取当前时间的前一天
  28.     val day = format.format(Calendar.getInstance().getTime.getTime-24*60*60*1000)
  29. //  添加分区字段
  30.     val df = dfMsg.withColumn("etl_date",lit(day))
  31. //  写入hive,mode方式:append,指定分区字段:etl_date,目标表:ods层的customer_inf
  32.     df.write.mode("append").partitionBy("etl_date").saveAsTable("ods.customer_inf")
  33. //  停止spark,释放内存
  34.     spark.stop()
  35.   }
  36. }
复制代码

2.抽取ds_db01库中product_info的增量数据进入Hive的ods库中表product_info。根据ods.product_info表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli实行show partitions ods.product_info下令;
  1. import org.apache.spark.sql._
  2. import org.apache.spark.sql.functions.lit
  3. import java.text.SimpleDateFormat
  4. import java.util.{Calendar, Properties}
  5. object zlcq2 {
  6.   def main(args: Array[String]): Unit = {
  7. //  建立spark环境
  8.     val spark = SparkSession.builder().appName("zlcq2").enableHiveSupport().getOrCreate()
  9. //  获取hive指定表指定字段的最大值
  10.     spark.sql("use ods")
  11.     val max = spark.sql("select max(modified_time) from product_info").head().getTimestamp(0).toString
  12. //  mysql连接配置
  13.     val jdbc="jdbc:mysql://bigdata1:3306/ds_db01"
  14.     val table = "product_info"
  15.     val properties =new Properties()
  16.     properties.put("user","root")
  17.     properties.put("password","123456")
  18.     properties.put("driver","com.mysql.jdbc.Driver")
  19. //  为被抽取表建立临时表,方便后续操作
  20.     spark.read.jdbc(jdbc, table, properties).createOrReplaceTempView("view")
  21. //  抽取符合要求的表数据
  22.     val sqlMsg=s"select * from view where modified_time> '$max'"
  23.     val dfMsg = spark.sql(sqlMsg)
  24. //  设置分区字段格式为-yyyyMMdd
  25.     val format = new SimpleDateFormat("yyyyMMdd")
  26. //  获取当前时间的前一天
  27.     val day = format.format(Calendar.getInstance().getTime.getTime-24*60*60*1000)
  28. //  添加分区字段
  29.     val df = dfMsg.withColumn("etl_date",lit(day))
  30. //  写入hive,mode方式:append,指定分区字段:etl_date,目标表:ods层的product_info
  31.     df.write.mode("append").partitionBy("etl_date").saveAsTable("ods.product_info")
  32. //  停止spark,释放内存
  33.     spark.stop()
  34.   }
  35. }
复制代码

3.抽取ds_db01库中order_master的增量数据进入Hive的ods库中表order_master,根据ods.order_master表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli实行show partitions ods.order_master下令;
  1. import org.apache.spark.sql._
  2. import org.apache.spark.sql.functions.lit
  3. import java.text.SimpleDateFormat
  4. import java.util.{Calendar, Properties}
  5. object zlcq3 {
  6.   def main(args: Array[String]): Unit = {
  7. //  建立spark环境
  8.     val spark = SparkSession.builder().appName("zlcq3").enableHiveSupport().getOrCreate()
  9. //  获取hive指定表指定字段的最大值
  10.     spark.sql("use ods")
  11.     val max = spark.sql("select max(modified_time) from order_master").head().getTimestamp(0).toString
  12. //  mysql连接配置
  13.     val jdbc="jdbc:mysql://bigdata1:3306/ds_db01"
  14.     val table = "order_master"
  15.     val properties =new Properties()
  16.     properties.put("user","root")
  17.     properties.put("password","123456")
  18.     properties.put("driver","com.mysql.jdbc.Driver")
  19. //  为被抽取表建立临时表,方便后续操作
  20.     spark.read.jdbc(jdbc, table, properties).createOrReplaceTempView("view")
  21. //  抽取符合要求的表数据
  22.     val sqlMsg=s"select * from view where modified_time> '$max'"
  23.     val dfMsg = spark.sql(sqlMsg)
  24. //  设置分区字段格式为-yyyyMMdd
  25.     val format = new SimpleDateFormat("yyyyMMdd")
  26. //  获取当前时间的前一天
  27.     val day = format.format(Calendar.getInstance().getTime.getTime-24*60*60*1000)
  28. //  添加分区字段
  29.     val df = dfMsg.withColumn("etl_date",lit(day))
  30. //  写入hive,mode方式:append,指定分区字段:etl_date,目标表:ods层的order_master
  31.     df.write.mode("append").partitionBy("etl_date").saveAsTable("ods.order_master")
  32. //  停止spark,释放内存
  33.     spark.stop()
  34.   }
  35. }
复制代码

4. 抽取ds_db01库中order_detail的增量数据进入Hive的ods库中表order_detail,根据ods.order_detail表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前角逐日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli实行show partitions ods.order_detail下令。
  1. import org.apache.spark.sql._
  2. import org.apache.spark.sql.functions.lit
  3. import java.text.SimpleDateFormat
  4. import java.util.{Calendar, Properties}
  5. object zlcq4 {
  6.   def main(args: Array[String]): Unit = {
  7. //  建立spark环境
  8.     val spark = SparkSession.builder().appName("zlcq4").enableHiveSupport().getOrCreate()
  9. //  获取hive指定表指定字段的最大值
  10.     spark.sql("use ods")
  11.     val max = spark.sql("select max(modified_time) from order_detail").head().getTimestamp(0).toString
  12. //  mysql连接配置
  13.     val jdbc="jdbc:mysql://bigdata1:3306/ds_db01"
  14.     val table = "order_detail"
  15.     val properties =new Properties()
  16.     properties.put("user","root")
  17.     properties.put("password","123456")
  18.     properties.put("driver","com.mysql.jdbc.Driver")
  19. //  为被抽取表建立临时表,方便后续操作
  20.     spark.read.jdbc(jdbc, table, properties).createOrReplaceTempView("view")
  21. //  抽取符合要求的表数据
  22.     val sqlMsg=s"select * from view where modified_time> '$max'"
  23.     val dfMsg = spark.sql(sqlMsg)
  24. //  设置分区字段格式为-yyyyMMdd
  25.     val format = new SimpleDateFormat("yyyyMMdd")
  26. //  获取当前时间的前一天
  27.     val day = format.format(Calendar.getInstance().getTime.getTime-24*60*60*1000)
  28. //  添加分区字段
  29.     val df = dfMsg.withColumn("etl_date",lit(day))
  30. //  写入hive,mode方式:append,指定分区字段:etl_date,目标表:ods层的order_detail
  31.     df.write.mode("append").partitionBy("etl_date").saveAsTable("ods.order_detail")
  32. //  停止spark,释放内存
  33.     spark.stop()
  34.   }
  35. }
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4