Spark大数据分析与实战笔记(第四章 Spark SQL结构化数据文件处理-04) ...

打印 上一主题 下一主题

主题 1687|帖子 1687|积分 5061


每日一句正能量

   一个人若想拥有智慧才智,便需要不断地学习积累。
  第4章 Spark SQL结构化数据文件处理

章节概要

   在很多情况下,开发工程师并不了解Scala语言,也不了解Spark常用API,但又非常想要利用Spark框架提供的强大的数据分析本领。Spark的开发工程师们考虑到了这个问题,利用SQL语言的语法简洁、学习门槛低以及在编程语言普及程度和流行程度高等诸多优势,从而开发了Spark SQL模块,通过Spark SQL,开发人员能够通过利用SQL语句,实现对结构化数据的处理。本章将针对Spark SQL的根本原理、利用方式举行具体讲解。
  4.4 RDD转换DataFrame



  • Spark官方提供了两种方法实现从RDD转换得到DataFrame。
  • 第一种方法是利用反射机制来推断包含特定范例对象的Schema,这种方式适用于对已知数据结构的RDD转换
  • 第二种方法通过编程接口构造一个Schema,并将其应用在已知的RDD数据中。
4.4.1 反射机制推断Schema

Windows体系开发Scala代码,可利用当地环境测试(需要先预备当地数据文件)。我们可以很容易的分析出当前数据文件中字段的信息,但盘算机无法直观感受字段的实际含义,因此需要通过反射机制来推断包含特定范例对象的Schema信息,实现将RDD转换成DataFrame。
在Windows体系下开发Scala代码,可以利用当地环境测试,因此我们首先需要在当地磁盘预备文本数据文件,这里将HDFS中的/spark/person.txt文件下载到当地D:/spark/person.txt路径下。我们需要通过反射机制来推断包含特定范例对象的Schema信息。
接下来我们打开IDEA开发工具,创建名为"“spark_chapter04""的Maven工程,讲解实现反射机制推断Schema的开发流程。
具体步调
1.创建Maven工程。
打开IDEA开发工具,创建名为“spark_chapter04”的Maven工程。
2.添加依赖。在pom.xml文件中添加Spark SQL依赖。,代码片段如下所示。
  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-sql_2.11</artifactId>
  4. <version>2.3.2</version>
  5. </dependency>
复制代码
3.界说case class样例类、字段和属性,样例类的参数名会被利用反射机制作为列名。通过sc对象读取文件天生一个RDD,将RDD 与样例类匹配,调用toDF()方法将RDD转换为DataFrame。代码如下所示
  1. package cn.itcast.sql
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.sql.{DataFrame, Row, SparkSession}
  5. case class Person(id:Int,name:String,age:Int)
  6. object CaseClassSchema {
  7.     def main(args: Array[String]): Unit = {
  8.         //1.构建SparkSession
  9.         val spark : SparkSession = SparkSession.builder()
  10.                 .appName("CaseClassSchema")
  11.                 .master("local[2]")
  12.                 .getOrCreate();
  13.         //2.获取SparkContext
  14.         val sc : SparkContext =spark.sparkContext;
  15.         //设置日志打印级别
  16.         sc.setLogLevel("WARN")
  17.         //3.读取文件
  18.         val data: RDD[Array[String]] =
  19.   sc.textFile("D://spark//person.txt").map(x=>x.split(" "));
  20.         //4.将RDD与样例类关联
  21.         val personRdd: RDD[Person] = data.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
  22.         //5.获取DF
  23.         //手动导入隐式转换
  24.         import spark.implicits._
  25.         val personDF: DataFrame = personRdd.toDF
  26.         //------------DSL语法操作开始-------------
  27.         //1、显示DataFrame的数据,默认显示20行
  28.         personDF.show()
  29.         //2、显示DataFrame的schema信息
  30.         personDF.printSchema()
  31.         //3、显示DataFrame记录数
  32.         println(personDF.count())
  33.         //4、显示DataFrame的所有字段
  34.         personDF.columns.foreach(println)
  35.         //5、取出DataFrame的第一行记录
  36.         println(personDF.head())
  37.         //6、显示DataFrame中name字段的所有值
  38.         personDF.select("name").show()
  39.         //7、过滤出DataFrame中年龄大于30的记录
  40.         personDF.filter($"age" > 30).show()
  41.         //8、统计DataFrame中年龄大于30的人数
  42.         println(personDF.filter($"age">30).count())
  43.         //9、统计DataFrame中按照年龄进行分组,求每个组的人数
  44.         personDF.groupBy("age").count().show()
  45.         //-----------DSL语法操作结束-------------
  46.         //-----------SQL操作风格开始-------------
  47.         //将DataFrame注册成表
  48.         personDF.createOrReplaceTempView("t_person")
  49.         //传入sql语句,进行操作
  50.         spark.sql("select * from t_person").show()
  51.         spark.sql("select * from t_person where name='zhangsan'").show()
  52.         spark.sql("select * from t_person order by age desc").show()
  53.         //-----------SQL操作风格结束-------------
  54.         //关闭操作
  55.         sc.stop()
  56.         spark.stop()
  57.     }
  58. }
复制代码
运行效果如下图所示

4.4.2 编程方式界说Schema

当Case类不能提前界说的时候,就需要接纳编程方式界说Schema信息,实现RDD转换DataFrame的功能主要包含3个步调,具体如下:
1.创建一个Row对象结构的RDD;
2.基于StructType范例创建Schema;
3.通过SparkSession提供的createDataFrame()方法来拼接Schema。
根据上述步调,创建SparkSqISchema.scala文件,利用编程方式界说Schema信息的具体代码如下所示。
  1. package cn.itcast.sql
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  5. import org.apache.spark.sql.{DataFrame, Row, SparkSession}
  6. object SparkSqlSchema {
  7.     def main(args: Array[String]): Unit = {
  8.         //1.创建SparkSession
  9.         val spark: SparkSession = SparkSession.builder()
  10.                 .appName("SparkSqlSchema")
  11.                 .master("local[2]")
  12.                 .getOrCreate()
  13.         //2.获取sparkContext对象
  14.         val sc: SparkContext = spark.sparkContext
  15.         //设置日志打印级别
  16.         sc.setLogLevel("WARN")
  17.         //3.加载数据
  18.         val dataRDD: RDD[String] = sc.textFile("D://spark//person.txt")
  19.         //4.切分每一行
  20.         val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))
  21.         //5.加载数据到Row对象中
  22.         val personRDD: RDD[Row] = dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt))
  23.         //6.创建Schema
  24.         val schema:StructType= StructType(Seq(
  25.             StructField("id", IntegerType, false),
  26.             StructField("name", StringType, false),
  27.             StructField("age", IntegerType, false)
  28.         ))
  29.         //7.利用personRDD与Schema创建DataFrame
  30.         val personDF: DataFrame = spark.createDataFrame(personRDD,schema)
  31.         //8.DSL操作显示DataFrame的数据结果
  32.         personDF.show()
  33.         //9.将DataFrame注册成表
  34.         personDF.createOrReplaceTempView("t_person")
  35.         //10.sql语句操作
  36.         spark.sql("select * from t_person").show()
  37.         //11.关闭资源
  38.         sc.stop()
  39.         spark.stop()
  40.     }
  41. }
复制代码
转载自:https://blog.csdn.net/u014727709/article/details/136033354
欢迎

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

大连密封材料

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表