大数据学习18之Spark-SQL

打印 上一主题 下一主题

主题 802|帖子 802|积分 2406

1.概述

1.1.简介

        Spark SQL 是 Apache Spark 用于处理结构化数据的模块。
1.2.历史

1.2.1.Shark

        Hadoop诞生初期,Hive是唯一在Hadoop上运行的SQL-on-Hadoop工具,MR的中间盘算过程产生了大量的磁盘落地操纵,消耗了大量的I/O,降低了步伐的运行服从。
        为了提高SQL-on-Hadoop的服从,大量的SQL-on-Hadoop工具被开发,Spark-SQL便是其中的一种,Spark-SQL的前身就是Shark。
1.2.2.Hive on Spark*

        Spark的开发较晚,那时间的主流数据仓库便是Hive,为了通用性,Spark就与Hive结合起来了,对于SQL语句的解析都交给Hive来举行处理,并且Spark步伐一定程度上替换了Hive底层的MapReduce步伐,提高了作业的盘算服从。
1.2.3.Spark on Hive*

        随着Spark发展,Shark 对于 Hive 的太多依赖(如采用 Hive 的语法解析器、查询优化器等等)制约了 Spark的 One stack to rule them all 的既定方针,制约了 Spark 各个组件的相互集成,所以就提出了 SparkSQL 项目。
        并且Hive本身的迭代更新速度较慢,就算是如今的最新版本的Hive支持的Spark也才2.x.x,
同时Spark在3.0.0版本时做出了一系列的优化,假如还是依赖于与Hive的化Spark3.0以上的版本的是用不了的,那么Spark的优化就没有意义了。
        为了让Spark的优化变得可用,Spark就自己开发了一套用于SQL操纵的模块,由之前的Shark来到了如今的Spark-SQL。
        经过这次的变革,Spark由原来的依赖Hive解析SQL酿成了由自己的Spark-SQL模块解析的方式,但是保留了对Hive的元数据访问。
        也就是说,如今的Spark除了元数据外,险些可以说是一个一栈式大数据框架了。
1.2.4.Hive on Spark vs. Spark on Hive

        Hive on Spark:Hive为主体,在Hive中继续Spark,Hive即存储元数据,也解析SQL语句,只是Hive将引擎从MR更换为Spark由 ,Spark 负责运算工作,但部署较为复杂。
        Spark on Hive:Spark为主体,Hive只负责元数据的存储,由Spark来解析和执行SQL语句,其中SQL语法为Spark-SQL语法,且部署简单。Spark on Hive 的优点在于它提供了更灵活的编程接口,实用于各种数据处理需求。

2.数据模子

2.1. RDD 和 DataFrame

2.1.1.RDD转DataFrame

  1. //创建样例类
  2. scala> case class User(id: Int, name: String, age: Int, gender: Int)
  3. defined class User
  4. //创建 RDD
  5. scala> val rdd = sc.makeRDD(List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu",20, 1)))
  6. //RDD 转 DataFrame
  7. scala> val df = rdd.toDF
复制代码
2.1.2.DataFrame 转 RDD

  1. //创建 DataFrame
  2. scala> val df = spark.read.json("file:///opt/spark-local/data/user/user.json")
  3. //DataFrame 转 RDD
  4. scala> val rdd = df.rdd
复制代码
 2.2.RDD 和 Dataset

2.2.1. RDD 转 Dataset

        RDD 和 Dataset 两个都是强类型模子,所以可以相互直接转换。
  1. //创建样例类
  2. scala> case class User(id: Int, name: String, age: Int, gender: Int)
  3. //创建 RDD
  4. scala> val rdd = sc.makeRDD(List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu",20, 1)))
  5. //RDD 转 Dataset。
  6. scala> val ds = rdd.toDS
复制代码
2.2.2.Dataset 转 RDD

  1. scala> val rdd = ds.rdd
复制代码
2.3. DataFrame 和 Dataset

2.3.1.DataFrame 转 Dataset

        配合样例类利用 as[类型] 转换为 DataSet。
  1. scala> val df = spark.read.json("file:///opt/yjx/spark-scalocal/data/user/user.json")
  2. scala> val ds = df.as[User]
复制代码
2.3.2.Dataset 转 DataFrame

  1. //创建 Dataset
  2. scala> case class User(id: Int, name: String, age: Int, gender: Int)defined class User
  3. scala> val list = List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu", 20, 1))
  4. list: List[User] = List(User(1,zhangsan,18,1), User(2,lisi,19,0), User(3,wangwu,20,1))
  5. scala> val ds = list.toDS
  6. //Dataset 转 DataFrame
  7. scala> val df = ds.toDF
复制代码
3. IDEA 开发 SparkSQL

        创建普通 Maven 项目,添加以下依赖。
  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-core_2.12</artifactId>
  4. <version>3.3.2</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.spark</groupId>
  8. <artifactId>spark-sql_2.12</artifactId>
  9. <version>3.3.2</version>
  10. </dependency>
复制代码
3.1.DataFrame

  1. object DataFrameDemo {
  2. case class User(id: Int, name: String, age: Int, gender: Int)
  3. def main(args: Array[String]): Unit = {
  4. // ==================== 建立连接 ====================
  5. // 初始化配置对象并设置运行模式与 AppName
  6. val conf = new SparkConf().setMaster("local[*]").setAppName("DataFrameDemo")
  7. // 根据配置对象初始化 SparkSession 对象
  8. val spark = SparkSession.builder().config(conf).getOrCreate()
  9. // 日志级别
  10. val sc = spark.sparkContext
  11. sc.setLogLevel("ERROR")
  12. // 在 IDEA 中开发 SparkSQL 如果遇到模型转换,需要导入隐式转换
  13. import spark.implicits._
  14. // ==================== 业务处理 ====================
  15. // RDD 转 DataFrame
  16. val rdd = sc.makeRDD(List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu",
  17. 20, 1)))
  18. val df1: DataFrame = rdd.toDF()
  19. df1.show()
  20. // 直接创建 DataFrame
  21. val df2 = spark.read.json("data/user/user.json")
  22. df2.show()
  23. // 创建临时表
  24. df2.createOrReplaceTempView("t_user")
  25. // 编写 SQL
  26. lazy val sql = "SELECT id, name, age, gender FROM t_user ORDER BY age DESC"
  27. // 执行 SQL
  28. spark.sql(sql).show()
  29. // ==================== 关闭连接 ====================
  30. spark.stop
  31. }
复制代码
3.2.Dataset

  1. def main(args: Array[String]): Unit = {
  2. // ==================== 建立连接 ====================
  3. // 初始化配置对象并设置运行模式与 AppName
  4. val conf = new SparkConf().setMaster("local[*]").setAppName("DatasetDemo")
  5. // 根据配置对象初始化 SparkSession 对象
  6. val spark = SparkSession.builder().config(conf).getOrCreate()
  7. // 日志级别
  8. val sc = spark.sparkContext
  9. sc.setLogLevel("ERROR")
  10. // 在 IDEA 中开发 SparkSQL 如果遇到模型转换,需要导入隐式转换
  11. import spark.implicits._
  12. // ==================== 业务处理 ====================
  13. // RDD 转 Dataset
  14. val rdd = sc.makeRDD(List(User(1, "zhangsan", 18, 1), User(2, "lisi", 19, 0), User(3, "wangwu",
  15. 20, 1)))
  16. val ds1: Dataset[User] = rdd.toDS()
  17. ds1.show()
  18. // 创建 DataFrame
  19. val df: DataFrame = spark.read.json("data/user/user.json")
  20. // 通过 DataFrame 使用 as[类型] 转换为 DataSet
  21. val ds2: Dataset[User] = df.as[User]
  22. ds2.show()
  23. // 创建临时表
  24. ds2.createOrReplaceTempView("t_user")
  25. // 编写 SQL
  26. lazy val sql = "SELECT id, name, age, gender FROM t_user ORDER BY age DESC"
  27. // 执行 SQL
  28. spark.sql(sql).show
  29. // ==================== 关闭连接 ====================
  30. spark.stop
  31. }
复制代码
4.DSL 领域特定语言

        DSL 为 Domain Specific Language 的缩写,翻译过来为领域特定语言。简单理解就是 Spark 独有的结构化数据操纵语法。
        此处不做赘述。

5.自定义函数

5.1.UDF用户定义普通函数

案例: 
  1. object UDFDemo {
  2. case class Emp(empno: Int, ename: String, job: String, mgr: Int, hiredate: String, sal: Double,
  3. comm: Double, deptno: Int)
  4. def main(args: Array[String]): Unit = {
  5. // ==================== 建立连接 ====================
  6. // 初始化配置对象并设置运行模式与 AppName
  7. val conf = new SparkConf().setMaster("local[*]").setAppName("UDFDemo")
  8. // 根据配置对象初始化 SparkSession 对象
  9. val spark = SparkSession.builder().config(conf).getOrCreate()
  10. // 日志级别
  11. val sc = spark.sparkContext
  12. sc.setLogLevel("ERROR")
  13. // 在 IDEA 中开发 SparkSQL 如果遇到模型转换,需要导入隐式转换
  14. import spark.implicits._
  15. // ==================== 业务处理 ====================
  16. // 数据准备
  17. val df: DataFrame = spark.read
  18. .option("header", "true")
  19. .option("sep", ",")
  20. .option("inferSchema", "true")
  21. .csv("data/scott/emp.csv")
  22. val emp: Dataset[Emp] = df.as[Emp]
  23. emp.createOrReplaceTempView("emp")
  24. // 注册 UDF 函数
  25. val prefix_name = spark.udf.register("prefix_name", (name: String) => {
  26. "Hello: " + name
  27. })
  28. // 在 SQL 中使用
  29. val sql =
  30. """
  31. |SELECT ename, prefix_name(ename) AS new_name FROM emp
  32. |""".stripMargin
  33. spark.sql(sql).show(5)
  34. // 在 DSL 中使用
  35. emp.select('job, prefix_name('job).as("new_job")).show(5)
  36. // ==================== 关闭连接 ====================
  37. spark.stop
  38. }
  39. }
复制代码
5.2.UDAF用户定义聚合函数

案例:
  1. object UDAFDemo03_Spark3 {
  2. case class Emp(empno: Int, ename: String, job: String, mgr: Int, hiredate: String, sal: Double,
  3. comm: Double, deptno: Int)
  4. def main(args: Array[String]): Unit = {
  5. // ==================== 建立连接 ====================
  6. // 初始化配置对象并设置运行模式与 AppName
  7. val conf = new SparkConf().setMaster("local[*]").setAppName("UDAFDemo02")
  8. // 根据配置对象初始化 SparkSession 对象
  9. val spark = SparkSession.builder().config(conf).getOrCreate()
  10. // 日志级别
  11. val sc = spark.sparkContext
  12. sc.setLogLevel("ERROR")
  13. // 在 IDEA 中开发 SparkSQL 如果遇到模型转换,需要导入隐式转换
  14. import spark.implicits._
  15. // ==================== 业务处理 ====================
  16. // 数据准备
  17. val df: DataFrame = spark.read
  18. .option("header", "true")
  19. .option("sep", ",")
  20. .option("inferSchema", "true")
  21. .csv("data/scott/emp.csv")
  22. val emp: Dataset[Emp] = df.as[Emp]
  23. emp.createOrReplaceTempView("emp")
  24. // 注册 UDAF 函数(强类型自定义 UDAF 在 Spark 3.0.0 中的使用方式)
  25. val my_avg = spark.udf.register("my_avg", functions.udaf(MyAvg))
  26. // 在 SQL 中使用
  27. val sql =
  28. """
  29. |SELECT my_avg(sal) AS avg_sal FROM emp
  30. |""".stripMargin
  31. spark.sql(sql).show()
  32. // 在 DSL 中使用
  33. emp.select(my_avg('sal).as("avg_sal")).show()
  34. // ==================== 关闭连接 ====================
  35. spark.stop
  36. }
  37. // 缓存区数据的结构 Buff(求和, 计数)
  38. case class Buff(var sum: Double, var count: Long)
  39. /**
  40. * 自定义 UDAF 聚合函数:计算薪资的平均值
  41. * IN:输入数据的类型
  42. * BUFF:缓存区数据的类型
  43. * OUT:返回值数据的类型
  44. */
  45. object MyAvg extends Aggregator[Double, Buff, Double] {
  46. // 初始化缓冲区 Buff(求和, 计数)
  47. override def zero: Buff = Buff(0D, 0L)
  48. // 根据输入的数据更新缓冲区的数据
  49. override def reduce(b: Buff, in: Double): Buff = {
  50. // 累加每次输入的数据
  51. b.sum += in
  52. // 计数器每次 +1
  53. b.count += 1
  54. // 返回缓冲区对象
  55. b
  56. }
  57. // 合并缓冲区
  58. override def merge(b1: Buff, b2: Buff): Buff = {
  59. b1.sum += b2.sum
  60. b1.count += b2.count
  61. b1
  62. }
  63. // 计算最终结果
  64. override def finish(b: Buff): Double = b.sum / b.count
  65. // 缓冲区数据的编码处理
  66. // Encoders.product 是进行 Scala 元组和 case 类转换的编码器
  67. //override def bufferEncoder: Encoder[Buff] = Encoders.product
  68. // 或者
  69. override def bufferEncoder: Encoder[Buff] = Encoders.kryo(classOf[Buff])
  70. // 输出数据的编码处理
  71. override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
  72. }
复制代码
5.3.UDTF用户定义表创建函数

        先添加依赖:
  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-hive_2.12</artifactId>
  4. <version>3.3.2</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.json</groupId>
  8. <artifactId>json</artifactId>
  9. <version>20220924</version>
  10. </dependency>
复制代码
     案例:
  1. /**
  2. * 数据:{"movie": [{"movie_name": "肖申克的救赎", "movie_type": "犯罪" }, {"movie_name": "肖申克的救赎",
  3. "movie_type": "剧情" }]}
  4. * 需求:从一行 JSON 格式数据中取出 movie_name 和 movie_type 两个 Key 及其对应的 Value。K-V 输出的格式为:
  5. * movie_name movie_type
  6. * 肖申克的救赎 犯罪
  7. * 肖申克的救赎 剧情
  8. */
  9. class MyUDTF extends GenericUDTF {
  10. // 实例化 UDTF 对象,判断传入参数的长度以及数据类型
  11. // 和 Hive 的自定义 UDTF 不一样的是,Spark 使用的是已经过时的 initialize(ObjectInspector[] argOIs)
  12. override def initialize(argOIs: Array[ObjectInspector]): StructObjectInspector = {
  13. // 获取入参
  14. // 参数校验,判断传入参数的长度以及数据类型
  15. if (argOIs.length != 1) throw new UDFArgumentLengthException("参数个数必须为 1")
  16. if (ObjectInspector.Category.PRIMITIVE != argOIs(0).getCategory) {
  17. /*
  18. UDFArgumentTypeException(int argumentId, String message)
  19. 异常对象需要传入两个参数:
  20. int argumentId:参数的位置,ObjectInspector 中的下标
  21. String message:异常提示信息
  22. */
  23. throw new UDFArgumentTypeException(0, "参数类型必须为 String")
  24. }
  25. // 自定义函数输出的字段和类型
  26. // 创建输出字段名称的集合
  27. val columNames = new util.ArrayList[String]
  28. // 创建字段数据类型的集合
  29. val columType = new util.ArrayList[ObjectInspector]
  30. columNames.add("movie_name")
  31. columType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)
  32. columNames.add("movie_type")
  33. columType.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector)
  34. ObjectInspectorFactory.getStandardStructObjectInspector(columNames, columType)
  35. }
  36. // 处理数据
  37. override def process(objects: Array[AnyRef]): Unit = {
  38. val outline = new Array[String](2)
  39. if (objects(0) != null) {
  40. val jsonObject = new JSONObject(objects(0).toString)
  41. val jsonArray: JSONArray = jsonObject.getJSONArray("movie")
  42. var i = 0
  43. while ( {
  44. i < jsonArray.length
  45. }) {
  46. outline(0) = jsonArray.getJSONObject(i).getString("movie_name")
  47. outline(1) = jsonArray.getJSONObject(i).getString("movie_type")
  48. // 将处理好的数据通过 forward 方法将数据按行写出
  49. forward(outline)
  50. i += 1
  51. }
  52. }
  53. }
  54. override def close(): Unit = {}
  55. }
复制代码



免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

耶耶耶耶耶

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表