ToB企服应用市场:ToB评测及商务社交产业平台
标题:
Spark抽取MySQL指定命据表中的增量数据到Hive
[打印本页]
作者:
光之使者
时间:
前天 08:56
标题:
Spark抽取MySQL指定命据表中的增量数据到Hive
使用Scala编写spark工程代码,将MySQL的ds_db01库中表customer_inf、customer_inf、order_detail、order_master、product_info的数据增量抽取到Hive的ods库(需自建)中对应表customer_inf、order_detail、order_master、product_info中。
hive原数据准备详见:
Spark(hive原数据准备)抽取MySQL指定命据表中的增量数据到Hive-CSDN博客
https://blog.csdn.net/qq_31710395/article/details/144557049?spm=1001.2014.3001.5502
SQL文件下载:
123云盘:https://www.123684.com/s/kgrAjv-z7q3d
阿里云盘:https://www.alipan.com/s/A5QjDUv8PXQ
pom文件配置添加:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.2</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.37</version>
</dependency>
复制代码
1.抽取ds_db01库中customer_inf的增量数据进入Hive的ods库中表customer_inf。根据ods.customer_inf表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli实行show partitions ods.customer_inf下令;
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit
import java.text.SimpleDateFormat
import java.util.Properties
import java.util.Calendar
object zlcq1 {
def main(args: Array[String]): Unit = {
// 建立spark环境
val spark = SparkSession.builder().appName("zlcq1").enableHiveSupport().getOrCreate()
// 获取hive指定表指定字段的最大值
spark.sql("use ods")
val max = spark.sql("select max(modified_time) from customer_inf").head().getTimestamp(0).toString
// mysql连接配置
val jdbc="jdbc:mysql://bigdata1:3306/ds_db01"
val table = "customer_inf"
val properties =new Properties()
properties.put("user","root")
properties.put("password","123456")
properties.put("driver","com.mysql.jdbc.Driver")
// 为被抽取表建立临时表,方便后续操作
spark.read.jdbc(jdbc, table, properties).createOrReplaceTempView("view")
// 抽取符合要求的表数据
val sqlMsg=s"select * from view where modified_time> '$max'"
val dfMsg = spark.sql(sqlMsg)
// 设置分区字段格式为-yyyyMMdd
val format = new SimpleDateFormat("yyyyMMdd")
// 获取当前时间的前一天
val day = format.format(Calendar.getInstance().getTime.getTime-24*60*60*1000)
// 添加分区字段
val df = dfMsg.withColumn("etl_date",lit(day))
// 写入hive,mode方式:append,指定分区字段:etl_date,目标表:ods层的customer_inf
df.write.mode("append").partitionBy("etl_date").saveAsTable("ods.customer_inf")
// 停止spark,释放内存
spark.stop()
}
}
复制代码
2.抽取ds_db01库中product_info的增量数据进入Hive的ods库中表product_info。根据ods.product_info表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli实行show partitions ods.product_info下令;
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}
object zlcq2 {
def main(args: Array[String]): Unit = {
// 建立spark环境
val spark = SparkSession.builder().appName("zlcq2").enableHiveSupport().getOrCreate()
// 获取hive指定表指定字段的最大值
spark.sql("use ods")
val max = spark.sql("select max(modified_time) from product_info").head().getTimestamp(0).toString
// mysql连接配置
val jdbc="jdbc:mysql://bigdata1:3306/ds_db01"
val table = "product_info"
val properties =new Properties()
properties.put("user","root")
properties.put("password","123456")
properties.put("driver","com.mysql.jdbc.Driver")
// 为被抽取表建立临时表,方便后续操作
spark.read.jdbc(jdbc, table, properties).createOrReplaceTempView("view")
// 抽取符合要求的表数据
val sqlMsg=s"select * from view where modified_time> '$max'"
val dfMsg = spark.sql(sqlMsg)
// 设置分区字段格式为-yyyyMMdd
val format = new SimpleDateFormat("yyyyMMdd")
// 获取当前时间的前一天
val day = format.format(Calendar.getInstance().getTime.getTime-24*60*60*1000)
// 添加分区字段
val df = dfMsg.withColumn("etl_date",lit(day))
// 写入hive,mode方式:append,指定分区字段:etl_date,目标表:ods层的product_info
df.write.mode("append").partitionBy("etl_date").saveAsTable("ods.product_info")
// 停止spark,释放内存
spark.stop()
}
}
复制代码
3.抽取ds_db01库中order_master的增量数据进入Hive的ods库中表order_master,根据ods.order_master表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli实行show partitions ods.order_master下令;
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}
object zlcq3 {
def main(args: Array[String]): Unit = {
// 建立spark环境
val spark = SparkSession.builder().appName("zlcq3").enableHiveSupport().getOrCreate()
// 获取hive指定表指定字段的最大值
spark.sql("use ods")
val max = spark.sql("select max(modified_time) from order_master").head().getTimestamp(0).toString
// mysql连接配置
val jdbc="jdbc:mysql://bigdata1:3306/ds_db01"
val table = "order_master"
val properties =new Properties()
properties.put("user","root")
properties.put("password","123456")
properties.put("driver","com.mysql.jdbc.Driver")
// 为被抽取表建立临时表,方便后续操作
spark.read.jdbc(jdbc, table, properties).createOrReplaceTempView("view")
// 抽取符合要求的表数据
val sqlMsg=s"select * from view where modified_time> '$max'"
val dfMsg = spark.sql(sqlMsg)
// 设置分区字段格式为-yyyyMMdd
val format = new SimpleDateFormat("yyyyMMdd")
// 获取当前时间的前一天
val day = format.format(Calendar.getInstance().getTime.getTime-24*60*60*1000)
// 添加分区字段
val df = dfMsg.withColumn("etl_date",lit(day))
// 写入hive,mode方式:append,指定分区字段:etl_date,目标表:ods层的order_master
df.write.mode("append").partitionBy("etl_date").saveAsTable("ods.order_master")
// 停止spark,释放内存
spark.stop()
}
}
复制代码
4. 抽取ds_db01库中order_detail的增量数据进入Hive的ods库中表order_detail,根据ods.order_detail表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前角逐日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli实行show partitions ods.order_detail下令。
import org.apache.spark.sql._
import org.apache.spark.sql.functions.lit
import java.text.SimpleDateFormat
import java.util.{Calendar, Properties}
object zlcq4 {
def main(args: Array[String]): Unit = {
// 建立spark环境
val spark = SparkSession.builder().appName("zlcq4").enableHiveSupport().getOrCreate()
// 获取hive指定表指定字段的最大值
spark.sql("use ods")
val max = spark.sql("select max(modified_time) from order_detail").head().getTimestamp(0).toString
// mysql连接配置
val jdbc="jdbc:mysql://bigdata1:3306/ds_db01"
val table = "order_detail"
val properties =new Properties()
properties.put("user","root")
properties.put("password","123456")
properties.put("driver","com.mysql.jdbc.Driver")
// 为被抽取表建立临时表,方便后续操作
spark.read.jdbc(jdbc, table, properties).createOrReplaceTempView("view")
// 抽取符合要求的表数据
val sqlMsg=s"select * from view where modified_time> '$max'"
val dfMsg = spark.sql(sqlMsg)
// 设置分区字段格式为-yyyyMMdd
val format = new SimpleDateFormat("yyyyMMdd")
// 获取当前时间的前一天
val day = format.format(Calendar.getInstance().getTime.getTime-24*60*60*1000)
// 添加分区字段
val df = dfMsg.withColumn("etl_date",lit(day))
// 写入hive,mode方式:append,指定分区字段:etl_date,目标表:ods层的order_detail
df.write.mode("append").partitionBy("etl_date").saveAsTable("ods.order_detail")
// 停止spark,释放内存
spark.stop()
}
}
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/)
Powered by Discuz! X3.4