Write Spark job code in Scala to incrementally extract the data of tables customer_inf, order_detail, order_master, and product_info in the MySQL database ds_db01 into the corresponding tables customer_inf, order_detail, order_master, and product_info in the Hive ods database (which you must create yourself).
For the Hive source-data preparation, see:
Spark (preparing the Hive source data): extracting incremental data from a specified MySQL table into Hive - CSDN blog: https://blog.csdn.net/qq_31710395/article/details/144557049?spm=1001.2014.3001.5502
SQL file downloads:
123 Cloud Drive: https://www.123684.com/s/kgrAjv-z7q3d
Aliyun Drive: https://www.alipan.com/s/A5QjDUv8PXQ
Add the following dependencies to the pom.xml:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.37</version>
</dependency>
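Before running the extraction jobs, it can help to confirm that the driver and connection settings actually work. Below is a minimal connectivity check, assuming the same bigdata1 host, root/123456 credentials, and ds_db01 database used throughout this post (the object name JdbcSmokeTest is illustrative, not part of the original tasks):

import java.util.Properties
import org.apache.spark.sql.SparkSession

object JdbcSmokeTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("JdbcSmokeTest").getOrCreate()
    val props = new Properties()
    props.put("user", "root")
    props.put("password", "123456")
    props.put("driver", "com.mysql.jdbc.Driver")
    // Read the schema and a few rows to verify connectivity
    val df = spark.read.jdbc("jdbc:mysql://bigdata1:3306/ds_db01", "customer_inf", props)
    df.printSchema()
    df.show(5)
    spark.stop()
  }
}

If the schema and sample rows print, the pom dependencies and connection settings are in place.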
1. Extract the incremental data of customer_inf in the ds_db01 database into table customer_inf in Hive's ods database. Using modified_time in ods.customer_inf as the incremental field, load only the newly added rows, keeping field names and types unchanged, and add a static partition: the partition field is etl_date, of type String, with a value of the day before the current date (partition format yyyyMMdd). Then run the show partitions ods.customer_inf command in the hive cli;
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}

object zlcq1 {
  def main(args: Array[String]): Unit = {
    // Build the Spark session with Hive support
    val spark = SparkSession.builder().appName("zlcq1").enableHiveSupport().getOrCreate()
    // Get the current maximum of the incremental field in the target Hive table
    spark.sql("use ods")
    val max = spark.sql("select max(modified_time) from customer_inf").head().getTimestamp(0).toString
    // MySQL connection settings
    val jdbc = "jdbc:mysql://bigdata1:3306/ds_db01"
    val table = "customer_inf"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "123456")
    properties.put("driver", "com.mysql.jdbc.Driver")
    // Register the source table as a temporary view for easier querying
    spark.read.jdbc(jdbc, table, properties).createOrReplaceTempView("view")
    // Select only rows newer than the high-water mark
    val sqlMsg = s"select * from view where modified_time > '$max'"
    val dfMsg = spark.sql(sqlMsg)
    // Partition value format: yyyyMMdd
    val format = new SimpleDateFormat("yyyyMMdd")
    // The day before the current date
    val day = format.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)
    // Add the static partition column
    val df = dfMsg.withColumn("etl_date", lit(day))
    // Append into the Hive table ods.customer_inf, partitioned by etl_date
    df.write.mode("append").partitionBy("etl_date").saveAsTable("ods.customer_inf")
    // Stop Spark and release resources
    spark.stop()
  }
}
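One caveat with the code above: head().getTimestamp(0) throws a NullPointerException when ods.customer_inf has no rows yet. A hedged drop-in replacement for the max line that falls back to an early watermark when the table is empty (the 1970-01-01 fallback is an assumption, not part of the original task):

// Safely read the high-water mark; fall back when ods.customer_inf is still empty
val row = spark.sql("select max(modified_time) from customer_inf").head()
val max = if (row.isNullAt(0)) "1970-01-01 00:00:00" else row.getTimestamp(0).toString

The same caveat applies to the three jobs below, which follow an identical pattern.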
2. Extract the incremental data of product_info in the ds_db01 database into table product_info in Hive's ods database. Using modified_time in ods.product_info as the incremental field, load only the newly added rows, keeping field names and types unchanged, and add a static partition: the partition field is etl_date, of type String, with a value of the day before the current date (partition format yyyyMMdd). Then run the show partitions ods.product_info command in the hive cli;
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}

object zlcq2 {
  def main(args: Array[String]): Unit = {
    // Build the Spark session with Hive support
    val spark = SparkSession.builder().appName("zlcq2").enableHiveSupport().getOrCreate()
    // Get the current maximum of the incremental field in the target Hive table
    spark.sql("use ods")
    val max = spark.sql("select max(modified_time) from product_info").head().getTimestamp(0).toString
    // MySQL connection settings
    val jdbc = "jdbc:mysql://bigdata1:3306/ds_db01"
    val table = "product_info"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "123456")
    properties.put("driver", "com.mysql.jdbc.Driver")
    // Register the source table as a temporary view for easier querying
    spark.read.jdbc(jdbc, table, properties).createOrReplaceTempView("view")
    // Select only rows newer than the high-water mark
    val sqlMsg = s"select * from view where modified_time > '$max'"
    val dfMsg = spark.sql(sqlMsg)
    // Partition value format: yyyyMMdd
    val format = new SimpleDateFormat("yyyyMMdd")
    // The day before the current date
    val day = format.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)
    // Add the static partition column
    val df = dfMsg.withColumn("etl_date", lit(day))
    // Append into the Hive table ods.product_info, partitioned by etl_date
    df.write.mode("append").partitionBy("etl_date").saveAsTable("ods.product_info")
    // Stop Spark and release resources
    spark.stop()
  }
}
3. Extract the incremental data of order_master in the ds_db01 database into table order_master in Hive's ods database. Using modified_time in ods.order_master as the incremental field, load only the newly added rows, keeping field names and types unchanged, and add a static partition: the partition field is etl_date, of type String, with a value of the day before the current date (partition format yyyyMMdd). Then run the show partitions ods.order_master command in the hive cli;
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}

object zlcq3 {
  def main(args: Array[String]): Unit = {
    // Build the Spark session with Hive support
    val spark = SparkSession.builder().appName("zlcq3").enableHiveSupport().getOrCreate()
    // Get the current maximum of the incremental field in the target Hive table
    spark.sql("use ods")
    val max = spark.sql("select max(modified_time) from order_master").head().getTimestamp(0).toString
    // MySQL connection settings
    val jdbc = "jdbc:mysql://bigdata1:3306/ds_db01"
    val table = "order_master"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "123456")
    properties.put("driver", "com.mysql.jdbc.Driver")
    // Register the source table as a temporary view for easier querying
    spark.read.jdbc(jdbc, table, properties).createOrReplaceTempView("view")
    // Select only rows newer than the high-water mark
    val sqlMsg = s"select * from view where modified_time > '$max'"
    val dfMsg = spark.sql(sqlMsg)
    // Partition value format: yyyyMMdd
    val format = new SimpleDateFormat("yyyyMMdd")
    // The day before the current date
    val day = format.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)
    // Add the static partition column
    val df = dfMsg.withColumn("etl_date", lit(day))
    // Append into the Hive table ods.order_master, partitioned by etl_date
    df.write.mode("append").partitionBy("etl_date").saveAsTable("ods.order_master")
    // Stop Spark and release resources
    spark.stop()
  }
}
4. Extract the incremental data of order_detail in the ds_db01 database into table order_detail in Hive's ods database. Using modified_time in ods.order_detail as the incremental field, load only the newly added rows, keeping field names and types unchanged, and add a static partition: the partition field is etl_date, of type String, with a value of the day before the current date (partition format yyyyMMdd). Then run the show partitions ods.order_detail command in the hive cli.
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}

object zlcq4 {
  def main(args: Array[String]): Unit = {
    // Build the Spark session with Hive support
    val spark = SparkSession.builder().appName("zlcq4").enableHiveSupport().getOrCreate()
    // Get the current maximum of the incremental field in the target Hive table
    spark.sql("use ods")
    val max = spark.sql("select max(modified_time) from order_detail").head().getTimestamp(0).toString
    // MySQL connection settings
    val jdbc = "jdbc:mysql://bigdata1:3306/ds_db01"
    val table = "order_detail"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "123456")
    properties.put("driver", "com.mysql.jdbc.Driver")
    // Register the source table as a temporary view for easier querying
    spark.read.jdbc(jdbc, table, properties).createOrReplaceTempView("view")
    // Select only rows newer than the high-water mark
    val sqlMsg = s"select * from view where modified_time > '$max'"
    val dfMsg = spark.sql(sqlMsg)
    // Partition value format: yyyyMMdd
    val format = new SimpleDateFormat("yyyyMMdd")
    // The day before the current date
    val day = format.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)
    // Add the static partition column
    val df = dfMsg.withColumn("etl_date", lit(day))
    // Append into the Hive table ods.order_detail, partitioned by etl_date
    df.write.mode("append").partitionBy("etl_date").saveAsTable("ods.order_detail")
    // Stop Spark and release resources
    spark.stop()
  }
}
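The four jobs differ only in the table name, so as a design note they can be collapsed into one parameterized job. Below is a minimal sketch under the same connection assumptions as above; the object name IncrementalExtract and the hard-coded table list are illustrative, and the same empty-table caveat from task 1 still applies:

import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}

object IncrementalExtract {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("IncrementalExtract").enableHiveSupport().getOrCreate()
    val jdbc = "jdbc:mysql://bigdata1:3306/ds_db01"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "123456")
    properties.put("driver", "com.mysql.jdbc.Driver")
    // Partition value: the day before the current date, formatted yyyyMMdd
    val format = new SimpleDateFormat("yyyyMMdd")
    val day = format.format(Calendar.getInstance().getTime.getTime - 24 * 60 * 60 * 1000)

    val tables = Seq("customer_inf", "product_info", "order_master", "order_detail")
    for (table <- tables) {
      // High-water mark from the existing ods table (assumes it is non-empty)
      val max = spark.sql(s"select max(modified_time) from ods.$table").head().getTimestamp(0).toString
      spark.read.jdbc(jdbc, table, properties).createOrReplaceTempView("src")
      spark.sql(s"select * from src where modified_time > '$max'")
        .withColumn("etl_date", lit(day))
        .write.mode("append").partitionBy("etl_date").saveAsTable(s"ods.$table")
    }
    spark.stop()
  }
}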