ToB企服应用市场:ToB评测及商务社交产业平台

标题: Spark SQL [打印本页]

作者: 天津储鑫盛钢材现货供应商    时间: 2024-7-13 02:29
标题: Spark SQL


Spark SQL

一、Spark SQL架构


二、Spark SQL运行原理




  1. SELECT name FROM(
  2.   SELECT id, name FROM people
  3. ) p
  4. WHERE p.id = 1
复制代码


三、Spark SQL API

详细解释

创建SparkSession的代码

  1. val conf: SparkConf = new SparkConf()
  2.         .setMaster("local[4]")
  3.         .setAppName("SparkSql")
  4. def main(args: Array[String]): Unit = {
  5.         SparkSession.builder()
  6.                 .config(conf)
  7.                 .getOrCreate()
  8. }
复制代码
优化:减少创建代码,SparkSessionBuilder工具类

  1. package com.ybg
  2. import org.apache.spark.{SparkConf, SparkContext}
  3. import org.apache.spark.sql.SparkSession
  4. // 封装SparkSession的创建方法
  5. class SparkSessionBuilder(master:String,appName:String){
  6.   lazy val config:SparkConf = {
  7.     new SparkConf()
  8.       .setMaster(master)
  9.       .setAppName(appName)
  10.   }
  11.   lazy val spark:SparkSession = {
  12.     SparkSession.builder()
  13.       .config(config)
  14.       .getOrCreate()
  15.   }
  16.   lazy val sc:SparkContext = {
  17.     spark.sparkContext
  18.   }
  19.   def stop(): Unit = {
  20.     if (null != spark) {
  21.       spark.stop()
  22.     }
  23.   }
  24. }
  25. object SparkSessionBuilder {
  26.   def apply(master: String, appName: String): SparkSessionBuilder = new SparkSessionBuilder(master, appName)
  27. }
复制代码
四、Spark SQL依赖

pom.xml

  1. <properties>
  2.   <maven.compiler.source>8</maven.compiler.source>
  3.   <maven.compiler.target>8</maven.compiler.target>
  4.   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  5.   <spark.version>3.1.2</spark.version>
  6.   <spark.scala.version>2.12</spark.scala.version>
  7.   <hadoop.version>3.1.3</hadoop.version>
  8.   <mysql.version>8.0.33</mysql.version>
  9.   <hive.version>3.1.2</hive.version>
  10.   <hbase.version>2.3.5</hbase.version>
  11.   <jackson.version>2.10.0</jackson.version>
  12. </properties>
  13. <dependencies>
  14.   <!-- spark-core -->
  15.   <dependency>
  16.     <groupId>org.apache.spark</groupId>
  17.     <artifactId>spark-core_${spark.scala.version}</artifactId>
  18.     <version>${spark.version}</version>
  19.   </dependency>
  20.   <!-- spark-sql -->
  21.   <dependency>
  22.     <groupId>org.apache.spark</groupId>
  23.     <artifactId>spark-sql_${spark.scala.version}</artifactId>
  24.     <version>${spark.version}</version>
  25.   </dependency>
  26.   若出现如下异常:
  27.   Caused by: com.fasterxml.jackson.databind.JsonMappingException:
  28.   Scala module 2.10.0 requires Jackson Databind version >= 2.10.0 and < 2.11.0
  29.     追加如下依赖:
  30.         -->
  31.   <!-- jackson-databind -->
  32.   <dependency>
  33.       <groupId>com.fasterxml.jackson.core</groupId>
  34.       <artifactId>jackson-databind</artifactId>
  35.       <version>2.10.0</version>
  36.   </dependency>
  37.   
  38.   <!-- mysql -->
  39.   <dependency>
  40.     <groupId>com.mysql</groupId>
  41.     <artifactId>mysql-connector-j</artifactId>
  42.     <version>${mysql.version}</version>
  43.   </dependency>
  44. </dependencies>
复制代码
log4j.properties

log4j.properties应该放在资源包下。
  1. log4j.rootLogger=ERROR, stdout, logfile # 设置可显示的信息等级
  2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
  3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
  4. log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
  5. log4j.appender.logfile=org.apache.log4j.FileAppender
  6. log4j.appender.logfile.File=log/spark_first.log
  7. log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
  8. log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
复制代码
五、Spark SQL数据集

1、DataSet


2、DataFrame


详细解释

创建DataSet的代码

  1. val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  2. // 提供了一组隐式转换,这些转换允许将Scala的本地集合类型(如Seq、Array、List等)转换为Spark的DataSet。
  3. import spark.implicits._
  4. val dsPhone: Dataset[Product] = spark.createDataset(Seq(
  5.   Product(1, "Huawei Mate60", 5888.0f),
  6.   Product(2, "IPhone", 5666.0f),
  7.   Product(3, "OPPO", 1888.0f)
  8. ))
  9. dsPhone.printSchema()
  10. /**
  11. * root
  12. * |-- id: integer (nullable = false)
  13. * |-- name: string (nullable = true)
  14. * |-- price: float (nullable = false)
  15. */
复制代码


创建DataFrame的代码


  1. val spark: SparkSession = SparkSession.builder()
  2. .config(conf)
  3. .getOrCreate()
  4. import spark.implicits._
  5. val schema: StructType = StructType(
  6.   Seq(
  7.     StructField("user_id", LongType),
  8.     StructField("locale", StringType),
  9.     StructField("birthYear", IntegerType),
  10.     StructField("gender", StringType),
  11.     StructField("joinedAt", StringType),
  12.     StructField("location", StringType),
  13.     StructField("timezone", StringType)
  14.   )
  15. )
  16. val frmUsers: DataFrame = spark.read
  17. .schema(schema)
  18. .option("separator", ",") // 指定文件分割符
  19. .option("header", "true") // 指定CSV文件包含表头
  20. .option("quoteChar", """)
  21. .option("escapeChar", "\")
  22. .csv("C:\\Users\\lenovo\\Desktop\\users.csv")
  23. .repartition(4)
  24. .cache()
复制代码

  1. val frmUsers2: DataFrame = spark.read.json("hdfs://single01:9000/spark/cha02/users.json")
  2. frmUsers2.show()
复制代码

  1. val url = "jdbc:mysql://single01:3306/test_db_for_bigdata" // 数据库连接地址
  2. val mysql = new Properties()
  3. mysql.setProperty("driver", "com.mysql.cj.jdbc.Driver")
  4. mysql.setProperty("user", "root")
  5. mysql.setProperty("password", "123456")
  6. spark
  7.   .read
  8.   .jdbc(url,"test_table1_for_hbase_import",mysql) // (url,TableName,连接属性)
  9.   .show(100)
复制代码
六、Spark_SQL的两种编码方式

  1. val spark: SparkSession = SparkSession.builder()
  2. .config(conf)
  3. .getOrCreate()
  4. import spark.implicits._
  5. val schema: StructType = StructType(
  6.   Seq(
  7.     StructField("user_id", LongType),
  8.     StructField("locale", StringType),
  9.     StructField("birthYear", IntegerType),
  10.     StructField("gender", StringType),
  11.     StructField("joinedAt", StringType),
  12.     StructField("location", StringType),
  13.     StructField("timezone", StringType)
  14.   )
  15. )
  16. val frmUsers: DataFrame = spark.read
  17. .schema(schema)
  18. .option("separator", ",") // 指定文件分割符
  19. .option("header", "true") // 指定CSV文件包含表头
  20. .option("quoteChar", """)
  21. .option("escapeChar", "\")
  22. .csv("C:\\Users\\lenovo\\Desktop\\users.csv")
  23. .repartition(4)
  24. .cache()
复制代码
此处已经创建好了DataFrame
1. 面向标准SQL语句(偷懒用)

  1. frmUsers.registerTempTable("user_info") // 此方法已过期
  2. spark.sql(
  3.       """
  4.         |select * from user_info
  5.         |where gender='female'
  6.         |""".stripMargin)
  7.                 .show(10)
复制代码
2. 使用Spark中的SQL算子(更规范)

  1. frmUsers
  2.       .where($"birthYear">1990)
  3.       .groupBy($"locale")
  4.       .agg(
  5.         count($"locale").as("locale_count"),
  6.         round(avg($"birthYear"),2).as("avg_birth_year")
  7.       )
  8.       .where($"locale_count">=10 and $"avg_birth_year">=1993)
  9.       .orderBy($"locale_count".desc)
  10.       .select(
  11.         $"locale", $"locale_count", $"avg_birth_year",
  12.         dense_rank()
  13.           .over(win)
  14.           .as("rnk_by_locale_count"),
  15.         lag($"locale_count",1)
  16.           .over(win)
  17.           .as("last_locale_count")
  18.       )
  19.       .show(10)
复制代码
七、常用算子

1.基本SQL模板

  1. select
  2.                 col,cols*,agg*
  3. where
  4.                 conditionCols
  5. group by
  6.                 col,cols*
  7. having
  8.                 condition
  9. order by
  10.                 col asc|desc
  11. limit
  12.                 n
复制代码
2.select

select语句在代码的开头可以不写,因为有后续的类似where和group by语句已经对列进行了操纵,指明确列名。假如后续有select语句,则优先按照后面的select语句进行。
  1. frmUsers.select(
  2.         $"locale",$"locale_count"
  3. )
复制代码
3.agg

  1. .agg(
  2.   count($"locale").as("locale_count"),
  3.   round(avg($"birthYear"),2).as("avg_birth_year")
  4. )
复制代码
4.窗口函数


注意:over子句中的分区信息是可以被重用的
  1. val win: WindowSpec = Window.partitionBy($"gender").orderBy($"locale_count".desc)
  2. frmUsers
  3.   ...
  4.   .select(
  5.                 dense_rank()
  6.                   .over(win)
  7.                   .as("rnk_by_locale_count")
  8.         )
复制代码
5.show

show(N)表示显示符合条件的至多N条数据。(不是取前N条再提取出其中符合条件的数据)
  1. frmUsers
  2.   ...
  3.   .show(10)
复制代码
6.条件筛选 where

  1. newCol:Column = $"cus_state".isNull
  2. newCol:Column = $"cus_state".isNaN
  3. newCol:Column = $"cus_state".isNotNull
  4. newCol:Column = $"cus_state".gt(10)                <=>        $"cus_state">10
  5. newCol:Column = $"cus_state".geq(10)        <=>        $"cus_state">=10
  6. newCol:Column = $"cus_state".lt(10)                <=>        $"cus_state"<10
  7. newCol:Column = $"cus_state".leq(10)        <=>        $"cus_state"<=10
  8. newCol:Column = $"cus_state".eq(10)                <=>        $"cus_state"===10
  9. newCol:Column = $"cus_state".ne(10)                <=>        $"cus_state"=!=10
  10. newCol:Column = $"cus_state".between(10,20)
  11. newCol:Column = $"cus_state".like("张%")
  12. newCol:Column = $"cus_state".rlike("\\d+")
  13. newCol:Column = $"cus_state".isin(list:Any*)
  14. newCol:Column = $"cus_state".isInCollection(values:Itrable[_])
  15. 多条件:
  16. newCol:Column = ColOne and ColTwo
  17. newCol:Column = ColOne or ColTwo
复制代码
在Spark SQL中,不存在Having子句,Where子句的现实作用根据相对于分组语句的前后决定。
7.分组

  1. // 多重分组
  2. /**
  3. rollup的效果:
  4. select birthYear,count(*) from user group by birthYear
  5. union all
  6. select gender,birthYear,count(*) from user group by gender,birthYear
  7. 存在"字段不对应"的情况:
  8. 空缺的字段会自动补全为null
  9. */
  10. frmUsers
  11.         .rollup("gender", "birthYear")
  12.         .count()
  13.         .show(100)
复制代码
  1. // 为了方便查找到每个数据行所对应的分组方式
  2. spark.sql(
  3.   """
  4.   |select grouping__id,gender,birthYear,count(8) as cnt from user_info
  5.   |group by gender,birthday,
  6.   |grouping sets(gender,birthday,(gender,birthYear))
  7.   |""".stripMargin)
  8. .show(100)
  9. // 这里的group by子句定义了分组的列,到grouping sets明确指定了分组的组合
  10. // 因而,在数仓设计的过程中,我们能够对不同分组依据下的不同数据依据grouping__id做分区。
复制代码

8.关联查询

  1. val frmClass: DataFrame = spark.createDataFrame(
  2.   Seq(
  3.     Class(1, "yb12211"),
  4.     Class(2, "yb12309"),
  5.     Class(3, "yb12401")
  6.   )
  7. )
  8. val frmStu: DataFrame = spark.createDataFrame(
  9.   Seq(
  10.     Student("henry", 1),
  11.     Student("ariel", 2),
  12.     Student("jack", 1),
  13.     Student("rose", 4),
  14.     Student("jerry", 2),
  15.     Student("mary", 1)
  16.   )
  17. )
  18. // 1.笛卡尔积(默认情况下)
  19. frmStu.as("S")
  20. .join(frmClass.as("C"))
  21. .show(100)
  22. /**
  23. +-----+-------+-------+---------+
  24. | name|classId|classId|className|
  25. +-----+-------+-------+---------+
  26. |henry|    1 |    1 |  yb12211|
  27. |henry|    1 |    2 |  yb12309|
  28. |henry|    1 |    3 |  yb12401|
  29. |ariel|    2 |    1 |  yb12211|
  30. |ariel|    2 |    2 |  yb12309|
  31. |ariel|    2 |    3 |  yb12401|
  32. | jack|    1 |    1 |  yb12211|
  33. | jack|    1 |    2 |  yb12309|
  34. | jack|    1 |    3 |  yb12401|
  35. | rose|    4 |    1 |  yb12211|
  36. | rose|    4 |    2 |  yb12309|
  37. | rose|    4 |    3 |  yb12401|
  38. |jerry|    2 |    1 |  yb12211|
  39. |jerry|    2 |    2 |  yb12309|
  40. |jerry|    2 |    3 |  yb12401|
  41. | mary|    1 |    1 |  yb12211|
  42. | mary|    1 |    2 |  yb12309|
  43. | mary|    1 |    3 |  yb12401|
  44. +-----+-------+-------+---------+
  45. */
  46. // 2.内连接
  47. frmStu.as("S")
  48. .join(frmClass.as("C"), $"S.classId" === $"C.classId","inner")
  49. .show(100)
  50. /**
  51. +-----+-------+-------+---------+
  52. | name|classId|classId|className|
  53. +-----+-------+-------+---------+
  54. |henry|    1 |    1 |  yb12211|
  55. |ariel|    2 |    2 |  yb12309|
  56. | jack|    1 |    1 |  yb12211|
  57. |jerry|    2 |    2 |  yb12309|
  58. | mary|    1 |    1 |  yb12211|
  59. +-----+-------+-------+---------+
  60. */
  61. // 启用using:使用Seq("Column")代表关联字段
  62. frmStu.as("S")
  63. .join(frmClass.as("C"), Seq("classId"),"right")
  64. .show(100)
  65. // 3.外连接
  66. frmStu.as("S")
  67. .join(frmClass.as("C"), $"S.classId" === $"C.classId","outer") // left | right | outer
  68. .show(100)
  69. /**
  70. +-----+-------+-------+---------+
  71. | name|classId|classId|className|
  72. +-----+-------+-------+---------+
  73. |henry|    1 |    1 |  yb12211|
  74. | jack|    1 |    1 |  yb12211|
  75. | mary|    1 |    1 |  yb12211|
  76. | null|  null |    3 |  yb12401|
  77. | rose|    4 |  null |    null|
  78. |ariel|    2 |    2 |  yb12309|
  79. |jerry|    2 |    2 |  yb12309|
  80. +-----+-------+-------+---------+
  81. */
  82. // 4.反连接:返回左数据集中所有没有关联字段匹配记录的左数据集的行
  83. frmStu.as("S")
  84. .join(frmClass.as("C"), $"S.classId" === $"C.classId","anti")
  85. .show(100)
  86. /**
  87. +----+-------+
  88. |name|classId|
  89. +----+-------+
  90. |rose|    4 |
  91. +----+-------+
  92. */
  93. // 5.半连接:返回左数据集中所有有关联字段匹配记录的左数据集的行
  94. frmStu.as("S")
  95. .join(frmClass.as("C"), $"S.classId" === $"C.classId","semi")
  96. .show(100)
  97. /**
  98. +-----+-------+
  99. | name|classId|
  100. +-----+-------+
  101. |henry|    1 |
  102. |ariel|    2 |
  103. | jack|    1 |
  104. |jerry|    2 |
  105. | mary|    1 |
  106. +-----+-------+
  107. */
复制代码
9.排序

  1. frmStu.orderBy(cols:Column*)
复制代码
10.数据截取

  1. frmStu.tail(n:Int)
  2. frmStu.take(n:Int)
复制代码


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 ToB企服应用市场:ToB评测及商务社交产业平台 (https://dis.qidao123.com/) Powered by Discuz! X3.4