头歌实践教学平台 大数据编程 实训答案(二)

张春  金牌会员 | 2024-10-24 15:20:56 | 显示全部楼层 | 阅读模式
打印 上一主题 下一主题

主题 899|帖子 899|积分 2697

第三阶段 Spark算子综合案例

Spark算子综合案例 - JAVA篇


第1关:WordCount - 词频统计

任务形貌
本关任务:使用 Spark Core 知识编写一个词频统计程序。
相干知识

编程要求
请仔细阅读右侧代码,根据方法内的提示,在Begin - End区域内进行代码补充,详细任务如下:
对文本文件内的每个单词都统计出其出现的次数;
按照每个单词出现次数的数量,降序排序。
  1. package net.educoder;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.api.java.JavaSparkContext;
  6. import scala.Tuple2;
  7. import java.util.Arrays;
  8. /**
  9. * 1、对文本文件内的每个单词都统计出其出现的次数。
  10. * 2、按照每个单词出现次数的数量,降序排序。
  11. */
  12. public class Step1 {
  13.     private static SparkConf conf;
  14.     private static JavaSparkContext sc;
  15.     static {
  16.         conf = new SparkConf().setAppName("step1").setMaster("local");
  17.         sc = new JavaSparkContext(conf);
  18.     }
  19.     /**
  20.      *
  21.      * @return JavaRDD<Tuple2>
  22.      */
  23.     public static JavaRDD<Tuple2> fun1() {
  24.         JavaRDD<String> rdd = sc.textFile("/root/wordcount.txt");
  25.         /**-----------------------------------begin----------------------------------------------**/
  26.         JavaRDD<String> rdd1 = rdd.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
  27.         JavaPairRDD<String, Integer> rdd2 = rdd1.mapToPair(x -> new Tuple2<>(x, 1));
  28.         JavaPairRDD<String, Integer> rdd3 = rdd2.reduceByKey((x, y) -> x + y);
  29.         JavaRDD<Tuple2> rdd4 = rdd3.map(x -> new Tuple2(x._2(), x._1()));
  30.         JavaRDD<Tuple2> rdd5 = rdd4.sortBy(x -> x._1(), false, 1);
  31.         JavaRDD<Tuple2> rdd6 = rdd5.map(x -> new Tuple2(x._2(), x._1()));
  32.         return rdd6;
  33.         /**-----------------------------------end----------------------------------------------**/
  34.     }
  35. }
复制代码
第2关:Friend Recommendation - 好友推荐

任务形貌
本关任务:使用 Spark Core 知识完成 " 好友推荐 " 的程序。
相干知识
直接好友与间接好友
参照数据如下:
hello hadoop cat
world hadoop hello hive
cat tom hive
...
...
数听说明(第二行为例):
这个人叫 world ,他有三个好友,分别是:hadoop、hello 和 hive。hadoop、hello 和 hive 之间就是间接好友。word 与 hadoop 、 hello 、hive 属于直接好友。
  1. package net.educoder;
  2. import org.apache.spark.SparkConf;
  3. import org.apache.spark.api.java.JavaPairRDD;
  4. import org.apache.spark.api.java.JavaRDD;
  5. import org.apache.spark.api.java.JavaSparkContext;
  6. import scala.Tuple2;
  7. import java.util.ArrayList;
  8. import java.util.List;
  9. public class Step2 {
  10.     private static SparkConf conf;
  11.     private static JavaSparkContext sc;
  12.     static {
  13.         conf = new SparkConf().setAppName("step2").setMaster("local");
  14.         sc = new JavaSparkContext(conf);
  15.     }
  16.     /**
  17.      *
  18.      * @return JavaPairRDD<String, Integer>
  19.      */
  20.     public static JavaPairRDD<String, Integer> fun2() {
  21.         JavaRDD<String> rdd = sc.textFile("/root/friend.txt");
  22.         /**-------------------------------beign-----------------------------------**/
  23.     JavaPairRDD<String,Integer> rdd1 = rdd.flatMapToPair(line->{
  24.         List<Tuple2<String,Integer>> list = new ArrayList<Tuple2<String,Integer>>();
  25.         String[] split = line.split(" ");
  26.         String me = split[0];
  27.         for (int i = 1;i < split.length;i++){
  28.             String s = me.hashCode() > split[i].hashCode() ? me +"_"+split[i]:split[i] + "_" + me;
  29.             list.add(new Tuple2<>(s,0));
  30.             for(int j=i+1;j<split.length;j++){
  31.             String ss = split[j].hashCode() > split[i].hashCode() ? split[j] + "_" + split[i]:split[i] + "_" +split[j];
  32.             list.add(new Tuple2<>(ss,1));
  33.         }
  34.         }
  35.         return list.iterator();
  36.     });
  37.     JavaPairRDD<String,Iterable<Integer>> rdd2 = rdd1.groupByKey();
  38.     JavaPairRDD<String,Integer> javaPairRDD = rdd2.mapToPair(x ->{
  39.         boolean bool = false;
  40.         int count = 0;
  41.         Iterable<Integer> flags = x._2();
  42.         String name = x._1();
  43.         for(Integer flag:flags){
  44.             if(flag == 0){
  45.                 bool = true;
  46.             }
  47.             count++;
  48.         }
  49.         if(bool == false){
  50.             return new Tuple2<String,Integer>(name,count);
  51.         }else{
  52.             return new Tuple2<String,Integer>("直接好友",0);
  53.         }
  54.     });
  55.     JavaPairRDD<String,Integer> filter = javaPairRDD.filter(x -> x._2()!=0?true : false);
  56.     return filter;
  57.         /**-------------------------------end-----------------------------------**/
  58.     }
  59. }
复制代码
第四阶段 SparkSQL

Spark SQL 自定义函数(Scala)

第1关:Spark SQL 自定义函数

任务形貌
本关任务:根据编程要求,创建自定义函数,实现功能。
相干知识
为了完本钱关任务,你需要掌握:
自定义函数分类;
自定义函数的实现方式;
弱类型的 UDAF 与 强类型的 UDAF 区分;
实现弱类型的 UDAF 与 强类型的 UDAF。
  1. import org.apache.spark.sql.api.java.UDF1
  2. import org.apache.spark.sql.types.StringType
  3. import org.apache.spark.sql.{DataFrame,SparkSession}
  4. object First_Question {
  5.   def main(args: Array[String]): Unit = {
  6.    
  7.     val spark: SparkSession = SparkSession
  8.       .builder()
  9.       .appName("First_Question")
  10.       .master("local[*]")
  11.       .getOrCreate()
  12.     /******************* Begin *******************/  
  13.     val dataFrame: DataFrame = spark.read.text("file:///data/bigfiles/test.txt")
  14.     dataFrame.createTempView("data")
  15.     spark.sql("""select value[0] as name,
  16.     value[1] as chinese, value[2] as math,value[3] as english from(select split(value,' ')value from data)""".stripMargin).createOrReplaceTempView("clean_data")
  17.     spark.udf.register("nameToUp",(x:String)=>x.toUpperCase)
  18.     spark.udf.register("add",(chinese:Integer,math:Integer,english:Integer)=>chinese + math+english)
  19.     spark.sql("""select nameToUp(name) as name,add(chinese,math,english) as total from clean_data order by total desc""".stripMargin).show()
  20.     /******************* End *******************/
  21.     spark.stop()
  22.   }
  23. }
复制代码
Spark SQL 多数据源操纵(Scala)

第1关:加载与生存操纵

任务形貌
本关任务:根据编程要求,编写 Spark 程序读取指定命据源,完成任务。
相干知识
为了完本钱关任务,你需要掌握:
数据加载;
SQL 语句加载数据;
文件生存;
生存模式;
恒久化存储到 Hive;
分区与排序。
  1. import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
  2. object First_Question {
  3.   def main(args: Array[String]): Unit = {
  4.    
  5.     val spark: SparkSession = SparkSession
  6.       .builder()
  7.       .appName("First_Question")
  8.       .master("local[*]")
  9.       .getOrCreate()
  10.     /******************* Begin *******************/  
  11.     val dataFrame = spark.read.json("file:///data/bigfiles/demo.json");
  12.     dataFrame.orderBy(dataFrame.col("age").desc).show();
  13.       
  14.     /******************* End *******************/
  15.     spark.stop()
  16.   }
  17. }
复制代码
第2关:Parquet 格式文件

任务形貌
本关任务:根据编程要求,编写 Spark 程序读取指定命据源,完成 Parquet 分区任务。
相干知识
为了完本钱关任务,你需要掌握:
什么是 Parquet 文件;
Parquet 分区自动辨认;
Parquet 模式合并;
Hive Metastore Parquet 表转换;
元数据刷新。
什么是 Parquet 文件?
Apache Parquet 是面向分析型业务的列式存储格式,由 Twitter 和 Cloudera 相助开发,Parquet 是一种与语言无关的列式存储文件类型,可以适配多种计算框架。在  Spark SQL 中提供了对 Parquet 文件的读写支持,读取 Parquet 文件时会自动保留原始数据的布局。需要注意的一点,我们在编写 Parquet 文件时,出于兼容性缘故原由,全部列都将自动转换为 NULL 类型。
  1. import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
  2. object Second_Question {
  3.   def main(args: Array[String]): Unit = {
  4.    
  5.     val spark: SparkSession = SparkSession
  6.       .builder()
  7.       .appName("Second_Question")
  8.       .master("local[*]")
  9.       .getOrCreate()
  10.     /******************* Begin *******************/  
  11.     spark.read.json("file:///data/bigfiles/demo.json").write.parquet("file:///result/student=1")
  12.     spark.read.json("file:///data/bigfiles/demo2.json").write.parquet("file:///result/student=2")
  13.     /******************* End *******************/
  14.     spark.stop()
  15.   }
  16. }
复制代码
第3关:ORC 格式文件

任务形貌
本关任务:根据编程要求,美满程序,实现 Spark SQL 读取 ORC 格式的 Hive 数据表。
相干知识
为了完本钱关任务,你需要掌握:
什么是 ORC 文件?
ORC 组成
读取 ORC 文件数据流程
Spark SQL 读取 ORC 格式的 Hive 数据表
什么是 ORC 文件?
ORC 全称 Optimized Row Columnar,是一种为 Hadoop 工作负载设计的自形貌类型感知列文件格式。它针对大型流式读取进行了优化,但集成了对快速查找所需行的支持。以列格式存储数据使读者可以仅读取、解压缩和处理当前查询所需的值。因为 ORC 文件是类型感知的,所以编写者为该类型选择最合适的编码,并在写入文件时创建一个内部索引。谓词下推使用这些索引来确定需要为特定查询读取文件中的哪些条带,并且行索引可以将搜刮范围缩小到 10,000 行的特定集合。ORC 支持 Hive 中的完备类型集,包罗复杂类型:布局、列表、映射和联合。
下令行窗口
  1. # 启动 Hadoop
  2. start-all.sh
  3. # 启动 Hive 元数据服务
  4. nohup hive --service metastore &
  5. # 进入 Hive
  6. hive
  7. # 创建 ORC 格式的 Hive 数据表
  8. create table student(
  9.     id int,
  10.    name string,
  11.    age int,
  12.    class string
  13. )stored as orc;
  14. # 插入数据
  15. insert into table student values(1001,"王刚",19,"大数据一班");
  16. insert into table student values(1002,"李虹",18,"大数据一班");
  17. insert into table student values(1003,"张子萱",20,"大数据一班");
  18. insert into table student values(1004,"赵云",18,"大数据一班");
  19. insert into table student values(1005,"李晓玲",19,"大数据一班");
  20. insert into table student values(1006,"张惠",18,"大数据二班");
  21. insert into table student values(1007,"秦散",19,"大数据二班");
  22. insert into table student values(1008,"王丽",18,"大数据二班");
  23. insert into table student values(1009,"田忌",20,"大数据二班");
  24. insert into table student values(1010,"张花",18,"大数据二班");
复制代码
代码文件窗口
  1. import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
  2. object Third_Question {
  3.   def main(args: Array[String]): Unit = {
  4.    
  5.     val spark: SparkSession = SparkSession
  6.       .builder()
  7.       .appName("Third_Question")
  8.       .master("local[*]")
  9.       .enableHiveSupport()
  10.       .getOrCreate()
  11.     /******************* Begin *******************/  
  12.     spark.sql("select * from student").orderBy("id").show()
  13.     /******************* End *******************/
  14.     spark.stop()
  15.   }
  16. }
复制代码
第4关:JSON 格式文件

任务形貌
本关任务:根据编程要求,读取 JSON 文件,完成任务。
相干知识
为了完本钱关任务,你需要掌握:
什么是 JSON 格式;
JSON 的标准格式;
JSON 与 Spark SQL。
什么是 JSON 格式?
JSON(JavaScript Object Notation, JS 对象简谱)是一种轻量级的数据互换格式。它基于 ECMAScript(European Computer Manufacturers Association, 欧洲计算机协会制定的 js 规范)的一个子集,接纳完全独立于编程语言的文本格式来存储和表现数据。简洁和清楚的层次布局使得 JSON 成为抱负的数据互换语言。 易于人阅读和编写,同时也易于呆板解析和天生,并有效地提升网络传输服从。
  1. import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
  2. object Forth_Question {
  3.   def main(args: Array[String]): Unit = {
  4.    
  5.     val spark: SparkSession = SparkSession
  6.       .builder()
  7.       .appName("Forth_Question")
  8.       .master("local[*]")
  9.       .getOrCreate()
  10.     /******************* Begin *******************/  
  11.     val dataFrame:DataFrame = spark.read.json("file:///data/bigfiles/test.json")
  12.     dataFrame.createOrReplaceTempView("data")
  13.     spark.sql("select id,name,age,class from data").orderBy("id").show()
  14.     /******************* End *******************/
  15.     spark.stop()
  16.   }
  17. }
复制代码
第5关:JDBC 操纵数据库

任务形貌
本关任务:根据编程要求,读取当地文件,将数据使用 JDBC 方式进行生存。
相干知识
为了完本钱关任务,你需要掌握:
JDBC 的定义;
Spark SQL 与 JDBC;
Spark SQL 加载 JDBC 数据;
Spark SQL 写入 JDBC 数据。
JDBC 的定义
Java 数据库连接(Java Database Connectivity,简称 JDBC)是 Java 语言中用来规范客户端程序怎样来访问数据库的应用程序接口,提供了诸如查询和更新数据库中数据的方法。JDBC 也是 Sun Microsystems 的商标。我们通常说的 JDBC 是面向关系型数据库的。
  1. import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
  2. object Fifth_Question {
  3.   def main(args: Array[String]): Unit = {
  4.    
  5.     val spark: SparkSession = SparkSession
  6.       .builder()
  7.       .appName("Fifth_Question")
  8.       .master("local[*]")
  9.       .getOrCreate()
  10.     /******************* Begin *******************/  
  11.     val dataFrame:DataFrame = spark.read.option("header","true").csv("file:///data/bigfiles/job58_data.csv")
  12.     dataFrame.write.format("jdbc")
  13.       .option("url", "jdbc:mysql://localhost:3306/work?useSSL=false")
  14.       .option("driver", "com.mysql.jdbc.Driver")
  15.       .option("user", "root")
  16.       .option("password", "123123")
  17.       .option("dbtable", "job_data")
  18.       .mode(SaveMode.Overwrite)  
  19.       .save()
  20.     /******************* End *******************/
  21.     spark.stop()
  22.   }
  23. }
复制代码
第6关:Hive 表操纵

任务形貌
本关任务:根据编程要求,创建 Hive 数据表,完成读取操纵。
相干知识
为了完本钱关任务,你需要掌握:
什么是 Hive;
Spark on Hive;
Spark SQL 与 Hive;
Spark SQL 加载 Hive 数据;
Spark SQL 生存数据到 Hive。
什么是 Hive?
Hive 是基于 Hadoop 的一个数据堆栈工具,用来进行数据提取、转化、加载,这是一种可以存储、查询和分析存储在 Hadoop 中的大规模数据的机制。Hive 数据堆栈工具能将布局化的数据文件映射为一张数据库表,并提供 SQL 查询功能,能将 SQL 语句转酿成 MapReduce 任务来实行。Hive 的优点是学习本钱低,可以通过雷同 SQL 语句实现快速 MapReduce 统计,使 MapReduce 变得更加简单,而不必开发专门的 MapReduce 应用程序。
下令行窗口
  1. # 启动 Hadoop
  2. start-all.sh
  3. # 启动 Hive 元数据服务
  4. nohup hive --service metastore &
  5. # 进入 Hive
  6. hive
  7. # 创建 Hive 数据表
  8. create table employee(
  9.    eid string,
  10.    ename string,
  11.    age int,
  12.    part string
  13. );
  14. # 插入数据
  15. insert into table employee values("A568952","王晓",25,"财务部");
  16. insert into table employee values("B256412","张天",28,"人事部");
  17. insert into table employee values("C125754","田笑笑",23,"销售部");
  18. insert into table employee values("D265412","赵云",24,"研发部");
  19. insert into table employee values("F256875","李姿姿",26,"后勤部");
复制代码
代码文件窗口
  1. import org.apache.spark.sql.{DataFrame, SparkSession}
  2. object Sixth_Question {
  3.   def main(args: Array[String]): Unit = {
  4.    
  5.     /******************* Begin *******************/  
  6.     val spark: SparkSession = SparkSession
  7.       .builder()
  8.       .appName("Sixth_Question")
  9.       .master("local[*]")
  10.       .enableHiveSupport()
  11.       .getOrCreate()
  12.    
  13.     val dataFrame : DataFrame = spark.sql("select * from employee order by eid")
  14.     dataFrame.show()
  15.    
  16.     spark.stop()
  17.     /******************* End *******************/
  18.   }
  19. }
复制代码
RDD、DataSet 与 DataFrame 的转换(Scala)

第1关:RDD、DataSet 与 DataFrame 的相互转换

任务形貌
本关任务:完成 RDD、DataSet 与 DataFrame 之间的相互转换。
相干知识
为了完本钱关任务,你需要掌握:
熟悉基础的 Scala 代码编写;
创建 RDD、DataSet、DataFrame 数据集。
  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.sql.{DataFrame,SparkSession}
  3. object First_Question {
  4.   case class Employee(id:Int,e_name:String,e_part:String,salary:Int)
  5.   def main(args: Array[String]): Unit = {
  6.    
  7.     val spark: SparkSession = SparkSession
  8.       .builder()
  9.       .appName("First_Question")
  10.       .master("local[*]")
  11.       .getOrCreate()
  12.     val rdd: RDD[(Int, String, String, Int)] = spark.sparkContext.parallelize(List((1001, "李晓", "运营部", 6000), (1002, "张花", "美术部", 6000), (1003, "李强", "研发部", 8000), (1004,"田美", "营销部", 5000), (1005, "王菲", "后勤部", 4000)))
  13.     /******************* Begin *******************/  
  14.     import spark.implicits._
  15.     val df:DataFrame=rdd.map(line=>{
  16.     Employee(line._1,line._2,line._3,line._4)
  17.     }).toDF()
  18.     df.as[Employee].show()
  19.     /******************* End *******************/
  20.     spark.stop()
  21.   }
  22. }
复制代码

DataFrame 基础操纵(Scala)

第1关:DataFrame 基础操纵

任务形貌
本关任务:根据编程要求,完成对指定 DataFrame 数据集的基础操纵。
相干知识
为了完本钱关任务,你需要掌握:
熟悉基础的 Scala 代码编写;
DataFrame 的基础操纵。
下面让我来领导各人一起来学习 DataFrame 中丰富的方法,首先创建 DataFrame,做为下面各个方法中的示例数据。
  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.sql.{DataFrame,SparkSession}
  3. object First_Question {
  4.   case class Student(name:String,age:String,sex:String)
  5.   def main(args: Array[String]): Unit = {
  6.    
  7.     val spark: SparkSession = SparkSession
  8.       .builder()
  9.       .appName("First_Question")
  10.       .master("local[*]")
  11.       .getOrCreate()
  12.      val rdd: RDD[String] = spark.sparkContext.parallelize(List("张三,20,男", "李四,22,男", "李婷,23,女","赵六,21,男"))
  13.     val temp: RDD[Student] = rdd.map(s => {
  14.       val split_rdd: Array[String] = s.split(",")
  15.       Student(split_rdd(0), split_rdd(1), split_rdd(2))
  16.     })
  17.     import spark.implicits._
  18.     // DataFrame 源数据
  19.     val dataFrame: DataFrame = temp.toDF()
  20.     /******************* Begin *******************/  
  21.     dataFrame.where("age>=18 and age<25").groupBy("sex").count().show()
  22.   
  23.         
  24.     /******************* End *******************/
  25.     spark.stop()
  26.   }
  27. }
复制代码

DataFrame 创建(Scala)

第1关:DataFrame 创建

任务形貌
本关任务:了解什么是 DataFrame 以及创建 DataFrame 的方式。
相干知识
为了完本钱关任务,你需要掌握:
熟悉基础的 Scala 代码编写;
创建 DataFrame 数据集。
什么是 DataFrame?
DataFrame 的前身是 SchemaRDD, 从 Spark 1.3.0 开始 SchemaRDD 更名为 DataFrame。与 SchemaRDD 的主要区别是: DataFrame 不再直接继续自 RDD, 而是自己实现了 RDD 的绝大多数功能。但仍然可以在 DataFrame 上调用 RDD 方法将其转换为一个 RDD。DataFrame 是一种以 RDD 为基础的分布式数据集, 雷同于传统数据库的二维表格, DataFrame 带有 Schema 元信息, 即 DataFrame 所表现的二维表数据集的每一列都带有名称和类型, 但底层做了更多的优化。DataFrame 可以从许多数据源构建, 比如: 已存在的 RDD, 布局化文件, 外部数据库, Hive 表等。
  1. import org.apache.spark.rdd.RDD
  2. import org.apache.spark.sql.{DataFrame,SparkSession}
  3. object First_Question {
  4.   /******************* Begin *******************/
  5.   case class Student(name:String,age:String,sex:String)
  6.   /******************* End *******************/
  7.   def main(args: Array[String]): Unit = {
  8.    
  9.     val spark: SparkSession = SparkSession
  10.       .builder()
  11.       .appName("First_Question")
  12.       .master("local[*]")
  13.       .getOrCreate()
  14.     val rdd: RDD[String] = spark.sparkContext.parallelize(List("张三,20,男", "李四,22,男", "李婷,23,女","赵六,21,男"))
  15.     /******************* Begin *******************/  
  16.     val result: RDD[Student] = rdd.map(s => {
  17.        val split_rdd: Array[String] = s.split(",")
  18.        Student(split_rdd(0), split_rdd(1), split_rdd(2))
  19.     })
  20.     import spark.implicits._
  21.     result.toDF().show()
  22.   
  23.         
  24.     /******************* End *******************/
  25.     spark.stop()
  26.   }
  27. }
复制代码

SparkSQL数据源

第1关:SparkSQL加载和生存

任务形貌
本关任务:编写一个SparkSQL程序,完成加载和生存数据。
相干知识
为了完本钱关任务,你需要掌握:
加载数据
直接在文件上运行SQL
生存到路径
生存模式介绍
生存到恒久表
存储和排序或分区
  1. package com.educoder.bigData.sparksql2;
  2. import org.apache.spark.sql.AnalysisException;
  3. import org.apache.spark.sql.SaveMode;
  4. import org.apache.spark.sql.SparkSession;
  5. public class Test1 {
  6.        
  7.         public static void main(String[] args) throws AnalysisException {
  8.                 SparkSession  spark  =  SparkSession
  9.                                   .builder()
  10.                                   .appName("test1")
  11.                                   .master("local")
  12.                                   .getOrCreate();
  13.                 /********* Begin *********/
  14.                 spark.read().format("json").load("people.json").write().mode(SaveMode.Append).save("people");
  15.         spark.read().format("json").load("people1.json").write().mode(SaveMode.Append).save("people");
  16.         spark.read().load("people").show();
  17.                
  18.                 /********* End *********/
  19.         }
  20.        
  21. }
复制代码
第2关:Parquet文件介绍

任务形貌
本关任务:编写Parquet分区文件,并输出表格内容
相干知识
为了完本钱关任务,你需要掌握:
编程方式加载Parquet文件
Parquet分区
布局合并
元数据刷新
Parquet参数配置
  1. package com.educoder.bigData.sparksql2;
  2. import org.apache.spark.sql.AnalysisException;
  3. import org.apache.spark.sql.SparkSession;
  4. public class Test2 {
  5.         public static void main(String[] args) throws AnalysisException {
  6.                 SparkSession  spark  =  SparkSession
  7.                                   .builder()
  8.                                   .appName("test1")
  9.                                   .master("local")
  10.                                   .getOrCreate();
  11.                 /********* Begin *********/
  12.                 spark.read().json("people.json").write().parquet("people/id=1");
  13.                 spark.read().json("people1.json").write().parquet("people/id=2");
  14.         spark.read().load("people").show();
  15.                 /********* End *********/
  16.         }
  17.        
  18. }
复制代码
第3关:json文件介绍

任务形貌
本关任务:编写一个sparksql程序,统计均匀薪水。
相干知识
为了完本钱关任务,你需要掌握json文件介绍及使用。
json文件介绍
Spark SQL可以自动推断JSON数据集的模式并将其加载为Dataset<Row>。可以使用SparkSession.read().json()。
请注意,作为json文件提供的文件不是典型的JSON文件。每行必须包含一个单独的,自包含的有效JSON对象。有关更多信息,请参阅JSON Lines文本格式,也称为换行符分隔的JSON。
  1. package com.educoder.bigData.sparksql2;
  2. import org.apache.spark.sql.AnalysisException;
  3. import org.apache.spark.sql.SparkSession;
  4. public class Test3 {
  5.        
  6.         public static void main(String[] args) throws AnalysisException {
  7.                 SparkSession  spark  =  SparkSession
  8.                                   .builder()
  9.                                   .appName("test1")
  10.                                   .master("local")
  11.                                   .getOrCreate();
  12.                 /********* Begin *********/
  13.                 spark.read().format("json").load("people.json").createOrReplaceTempView("people");
  14.         spark.read().format("json").load("people1.json").createOrReplaceTempView("people1");
  15.                 spark.sql("select avg(salary) from ( select salary from people union all select salary from people1) a").show();
  16.                 /********* End *********/
  17.                
  18.         }
  19.        
  20. }
复制代码
第4关:JDBC读取数据源

任务形貌
本关任务:编写sparksql程序,生存文件信息到mysql,并从mysql进行读取。
相干知识
为了完本钱关任务,你需要掌握怎样使用JDBC读取数据源。
使用JDBC怎样读取数据源
Spark SQL 还包罗一个可以使用JDBC从其他数据库读取数据的数据源,与使用JdbcRDD相比,此功能应该更受欢迎。这是因为结果作为DataSet返回,可以在Spark SQL中轻松处理,也可以与其他数据源连接。
JDBC数据源也更易于使用Java或Python,因为它不需要用户提供 ClassTag。(请注意,这与Spark SQL JDBC服务器不同,后者允许其他应用程序使用Spark SQL运行查询)。
  1. package com.educoder.bigData.sparksql2;
  2. import org.apache.spark.sql.Dataset;
  3. import org.apache.spark.sql.Row;
  4. import org.apache.spark.sql.SaveMode;
  5. import org.apache.spark.sql.SparkSession;
  6. public class Test4 {
  7.         public static void case4(SparkSession spark) {
  8.                
  9.                 /********* Begin *********/
  10.         //people.json保存至mysql的people
  11.                 Dataset<Row> load = spark
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

张春

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表