光之使者 posted on 2024-12-25 08:56:45

Using Spark to Extract Incremental Data from Specified MySQL Tables into Hive

   Write a Spark project in Scala that incrementally extracts the data of the tables customer_inf, order_detail, order_master and product_info in the MySQL database ds_db01 into the corresponding tables customer_inf, order_detail, order_master and product_info in the Hive ods database (which you need to create yourself).
For how to prepare the original Hive data, see:
Spark (Hive source data preparation): extracting incremental data from specified MySQL tables into Hive, CSDN blog: https://blog.csdn.net/qq_31710395/article/details/144557049?spm=1001.2014.3001.5502
SQL file download:
        123云盘 (123 Cloud Drive): https://www.123684.com/s/kgrAjv-z7q3d
        阿里云盘 (Aliyun Drive): https://www.alipan.com/s/A5QjDUv8PXQ
Add the following dependencies to the pom file:
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.12</artifactId>
  <version>3.1.2</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.12</artifactId>
  <version>3.1.2</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.12</artifactId>
  <version>3.1.2</version>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.37</version>
</dependency>

1. Extract the incremental data of customer_inf in the ds_db01 database into the customer_inf table of the Hive ods database. Use modified_time in ods.customer_inf as the incremental field and load only the new rows, keeping field names and types unchanged. Also add a static partition whose field is etl_date, of type String, with its value set to the day before the current date (partition field format yyyyMMdd). Then run the show partitions ods.customer_inf command in the hive cli.
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit
import java.text.SimpleDateFormat
import java.util.Properties
import java.util.Calendar
object zlcq1 {
  def main(args: Array[String]): Unit = {
    // Set up the Spark environment
    val spark = SparkSession.builder().appName("zlcq1").enableHiveSupport().getOrCreate()
    // Get the maximum value of the incremental field in the Hive table
    spark.sql("use ods")
    val max = spark.sql("select max(modified_time) from customer_inf").head().getTimestamp(0).toString
    // MySQL connection settings
    val jdbc = "jdbc:mysql://bigdata1:3306/ds_db01"
    val table = "customer_inf"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "123456")
    properties.put("driver", "com.mysql.jdbc.Driver")
    // Register the source table as a temporary view for the following steps
    spark.read.jdbc(jdbc, table, properties).createOrReplaceTempView("view")
    // Select only the rows newer than the maximum already in Hive
    val sqlMsg = s"select * from view where modified_time> '$max'"
    val dfMsg = spark.sql(sqlMsg)
    // Partition value format: yyyyMMdd
    val format = new SimpleDateFormat("yyyyMMdd")
    // Get the day before the current time
    val day = format.format(Calendar.getInstance().getTime.getTime-24*60*60*1000)
    // Add the partition column
    val df = dfMsg.withColumn("etl_date", lit(day))
    // Write to Hive: mode append, partition column etl_date, target table ods.customer_inf
    df.write.mode("append").partitionBy("etl_date").saveAsTable("ods.customer_inf")
    // Stop Spark and release its resources
    spark.stop()
  }
}
https://i-blog.csdnimg.cn/direct/d1658597c16d48bba2af5523e0666652.png
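The job above derives the etl_date value by subtracting 24 hours, expressed in milliseconds, from the current time. As a minimal alternative sketch, the same yyyyMMdd string can be produced with java.time, which subtracts one calendar day instead of a fixed number of milliseconds. The names EtlDate and previousDay are hypothetical and used only for illustration; they are not part of the original post.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Hypothetical helper (not from the original post): formats the day
// before a given date as yyyyMMdd, the format required for etl_date.
object EtlDate {
  private val fmt = DateTimeFormatter.ofPattern("yyyyMMdd")

  // Taking the date as a parameter keeps the function deterministic;
  // a job would normally pass LocalDate.now().
  def previousDay(today: LocalDate): String =
    today.minusDays(1).format(fmt)
}
```

In the job, the partition column could then be added with lit(EtlDate.previousDay(LocalDate.now())) in place of the SimpleDateFormat and Calendar lines.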
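2. Extract the incremental data of product_info in the ds_db01 database into the product_info table of the Hive ods database. Use modified_time in ods.product_info as the incremental field and load only the new rows, keeping field names and types unchanged. Also add a static partition whose field is etl_date, of type String, with its value set to the day before the current date (partition field format yyyyMMdd). Then run the show partitions ods.product_info command in the hive cli.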
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}
object zlcq2 {
  def main(args: Array[String]): Unit = {
    // Set up the Spark environment
    val spark = SparkSession.builder().appName("zlcq2").enableHiveSupport().getOrCreate()
    // Get the maximum value of the incremental field in the Hive table
    spark.sql("use ods")
    val max = spark.sql("select max(modified_time) from product_info").head().getTimestamp(0).toString
    // MySQL connection settings
    val jdbc = "jdbc:mysql://bigdata1:3306/ds_db01"
    val table = "product_info"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "123456")
    properties.put("driver", "com.mysql.jdbc.Driver")
    // Register the source table as a temporary view for the following steps
    spark.read.jdbc(jdbc, table, properties).createOrReplaceTempView("view")
    // Select only the rows newer than the maximum already in Hive
    val sqlMsg = s"select * from view where modified_time> '$max'"
    val dfMsg = spark.sql(sqlMsg)
    // Partition value format: yyyyMMdd
    val format = new SimpleDateFormat("yyyyMMdd")
    // Get the day before the current time
    val day = format.format(Calendar.getInstance().getTime.getTime-24*60*60*1000)
    // Add the partition column
    val df = dfMsg.withColumn("etl_date", lit(day))
    // Write to Hive: mode append, partition column etl_date, target table ods.product_info
    df.write.mode("append").partitionBy("etl_date").saveAsTable("ods.product_info")
    // Stop Spark and release its resources
    spark.stop()
  }
}
https://i-blog.csdnimg.cn/direct/24209302d4364f96985092f569185eaa.png
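One edge case in the jobs on this page: if the Hive table has no rows yet, max(modified_time) is NULL, getTimestamp(0) returns null, and the chained .toString throws a NullPointerException. A minimal sketch of a guard follows. The names IncrementBound, lowerBound and Epoch, and the choice of 1970-01-01 as the fallback bound, are assumptions made for illustration and do not come from the original post.

```scala
import java.sql.Timestamp

// Hypothetical helper (not from the original post): turns the possibly
// null result of max(modified_time) into a usable lower bound.
object IncrementBound {
  // Assumed fallback: early enough that every source row passes the filter.
  val Epoch = "1970-01-01 00:00:00"

  // Option(null) is None, so an empty Hive table yields the fallback.
  def lowerBound(maxModified: Timestamp): String =
    Option(maxModified).map(_.toString).getOrElse(Epoch)
}
```

In the job it could be used as val max = IncrementBound.lowerBound(spark.sql("select max(modified_time) from product_info").head().getTimestamp(0)), so that the first run against an empty table loads all rows.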
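3. Extract the incremental data of order_master in the ds_db01 database into the order_master table of the Hive ods database. Use modified_time in ods.order_master as the incremental field and load only the new rows, keeping field names and types unchanged. Also add a static partition whose field is etl_date, of type String, with its value set to the day before the current date (partition field format yyyyMMdd). Then run the show partitions ods.order_master command in the hive cli.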
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}
object zlcq3 {
  def main(args: Array[String]): Unit = {
    // Set up the Spark environment
    val spark = SparkSession.builder().appName("zlcq3").enableHiveSupport().getOrCreate()
    // Get the maximum value of the incremental field in the Hive table
    spark.sql("use ods")
    val max = spark.sql("select max(modified_time) from order_master").head().getTimestamp(0).toString
    // MySQL connection settings
    val jdbc = "jdbc:mysql://bigdata1:3306/ds_db01"
    val table = "order_master"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "123456")
    properties.put("driver", "com.mysql.jdbc.Driver")
    // Register the source table as a temporary view for the following steps
    spark.read.jdbc(jdbc, table, properties).createOrReplaceTempView("view")
    // Select only the rows newer than the maximum already in Hive
    val sqlMsg = s"select * from view where modified_time> '$max'"
    val dfMsg = spark.sql(sqlMsg)
    // Partition value format: yyyyMMdd
    val format = new SimpleDateFormat("yyyyMMdd")
    // Get the day before the current time
    val day = format.format(Calendar.getInstance().getTime.getTime-24*60*60*1000)
    // Add the partition column
    val df = dfMsg.withColumn("etl_date", lit(day))
    // Write to Hive: mode append, partition column etl_date, target table ods.order_master
    df.write.mode("append").partitionBy("etl_date").saveAsTable("ods.order_master")
    // Stop Spark and release its resources
    spark.stop()
  }
}
https://i-blog.csdnimg.cn/direct/993ef723bef54649b8f2b408d8c72ca0.png
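The jobs here register the whole MySQL table as a temporary view and filter it with Spark SQL. A possible variation, sketched below, is to pass a parenthesized subquery with an alias as the table argument of spark.read.jdbc, which Spark's JDBC source accepts in place of a table name, so that the modified_time filter is stated explicitly in the query sent to MySQL. The names JdbcSubquery and incrementalTable and the alias t are hypothetical, chosen for illustration only.

```scala
// Hypothetical helper (not from the original post): builds the string
// passed as the `table` argument of spark.read.jdbc(url, table, props).
object JdbcSubquery {
  // The subquery must be wrapped in parentheses and given an alias,
  // because Spark places it in the FROM clause of the query it generates.
  def incrementalTable(table: String, lowerBound: String): String =
    s"(select * from $table where modified_time > '$lowerBound') as t"
}
```

With this, spark.read.jdbc(jdbc, JdbcSubquery.incrementalTable("order_master", max), properties) would return only the incremental rows, and the temporary view step would no longer be needed. Table names here are fixed constants; building SQL by string interpolation would not be appropriate for untrusted input.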
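4. Extract the incremental data of order_detail in the ds_db01 database into the order_detail table of the Hive ods database. Use modified_time in ods.order_detail as the incremental field and load only the new rows, keeping field names and types unchanged. Also add a static partition whose field is etl_date, of type String, with its value set to the day before the current (competition) date (partition field format yyyyMMdd). Then run the show partitions ods.order_detail command in the hive cli.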
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}
object zlcq4 {
  def main(args: Array[String]): Unit = {
    // Set up the Spark environment
    val spark = SparkSession.builder().appName("zlcq4").enableHiveSupport().getOrCreate()
    // Get the maximum value of the incremental field in the Hive table
    spark.sql("use ods")
    val max = spark.sql("select max(modified_time) from order_detail").head().getTimestamp(0).toString
    // MySQL connection settings
    val jdbc = "jdbc:mysql://bigdata1:3306/ds_db01"
    val table = "order_detail"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "123456")
    properties.put("driver", "com.mysql.jdbc.Driver")
    // Register the source table as a temporary view for the following steps
    spark.read.jdbc(jdbc, table, properties).createOrReplaceTempView("view")
    // Select only the rows newer than the maximum already in Hive
    val sqlMsg = s"select * from view where modified_time> '$max'"
    val dfMsg = spark.sql(sqlMsg)
    // Partition value format: yyyyMMdd
    val format = new SimpleDateFormat("yyyyMMdd")
    // Get the day before the current time
    val day = format.format(Calendar.getInstance().getTime.getTime-24*60*60*1000)
    // Add the partition column
    val df = dfMsg.withColumn("etl_date", lit(day))
    // Write to Hive: mode append, partition column etl_date, target table ods.order_detail
    df.write.mode("append").partitionBy("etl_date").saveAsTable("ods.order_detail")
    // Stop Spark and release its resources
    spark.stop()
  }
}
https://i-blog.csdnimg.cn/direct/03d4044959244d1fa377a652e64bfae5.png
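The four jobs differ only in the table name. As a rough sketch of how the varying parts could be captured in one place, the following models a job by its table name and derives the two queries and the write target from it. IncrementJob and IncrementJobs are hypothetical names introduced for illustration; the original post keeps four separate objects.

```scala
// Hypothetical model (not from the original post) of the parts of each
// job that depend on the table name.
final case class IncrementJob(table: String) {
  // Query run against Hive to find the newest row already loaded
  def maxQuery: String = s"select max(modified_time) from ods.$table"

  // Filter applied to the temporary view named "view", as in the jobs above
  def filterQuery(max: String): String =
    s"select * from view where modified_time> '$max'"

  // Target passed to saveAsTable
  def target: String = s"ods.$table"
}

object IncrementJobs {
  // The four tables named in the task description
  val all: Seq[IncrementJob] =
    Seq("customer_inf", "product_info", "order_master", "order_detail").map(IncrementJob(_))
}
```

A single main method could then iterate over IncrementJobs.all and run the same read, filter and write steps for each entry.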
