Spark大数据分析与实战笔记(第四章 Spark SQL结构化数据文件处理-05)
https://i-blog.csdnimg.cn/direct/4426ad7a71d04d97b7808dda243be143.png#pic_center逐日一句正能量
努力学习,勤奋工作,让芳华更加光彩。
第4章 Spark SQL结构化数据文件处理
章节概要
在许多情况下,开发工程师并不了解Scala语言,也不了解Spark常用API,但又非常想要利用Spark框架提供的强盛的数据分析能力。Spark的开发工程师们考虑到了这个标题,利用SQL语言的语法简便、学习门槛低以及在编程语言遍及程度和流行程度高等诸多优势,从而开发了Spark SQL模块,通过Spark SQL,开发职员可以或许通过利用SQL语句,实现对结构化数据的处理。本章将针对Spark SQL的根本原理、利用方式举行详细解说。
4.5 Spark SQL操作数据源
Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame举行一系列的操作后,还可以将数据重新写入到关系型数据中。关于Spark SQL对MySQL数据库的干系操作具体如下。
4.5.1 Spark SQL操作MySQL
[*]读取MysQL数据库
通过SQLyog工具远程毗连hadoop01节点的MySQL服务(这里选择的是SQLyog,用其它的工具也是一样的),并利用可视化操作界面创建名称为“spark"的数据库,并创建名称为“person”的数据表,以及向表中添加数据。
同样也可以在hadoop01节点上利用MySQL客户端创建数据库、数据表以及插入数据,具体下令如下。
[*]启动mysql客户端
mysql -u root -p #屏幕提示输入暗码
结果如下图所示
https://i-blog.csdnimg.cn/blog_migrate/5b14e4ca861a9051ebfbf02143a438dd.png#pic_center
[*]创建spark数据库
mysql > CREATE database spark ;
结果如下图所示
https://i-blog.csdnimg.cn/blog_migrate/181f20d29c2c8feb57d84e46ac0fcfd6.png#pic_center
[*]创建person数据表
mysql > CREATE TABLE person (id INT(4) , NAME CHAR(20) , age INT(4));
结果如下图所示
https://i-blog.csdnimg.cn/blog_migrate/4a3e8542a97a60c36299591cd73f5717.png#pic_center
[*]插入数据
mysql > INSERT INTO person VALUE( 1 , ' zhangsan' , 18);
mysql > INSERT INTO person VALUE(2, ' lisi ' ,20);
mysql > SELECT * FROM person;
结果如下图所示
https://i-blog.csdnimg.cn/blog_migrate/44d6ffe9b2fe118820ba4a30d071d0c3.png#pic_center
数据库和数据表创建乐成后,如果想通过Spark SQL API方式访问MySQL数据库,必要在pom.xml设置文件中添加MySQL驱动毗连包,依赖参数如下。
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version> 8.0.30 </version>
</ dependency>
当所需依赖添加完毕后,就可以编写代码读取MySQL数据库中的数据,具体代码如文件所示:
SparkSqlToMysql.scala
package cn.itcast.sql
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
//需要MySQL连接驱动包
object DataFromMysql {
def main(args: Array): Unit = {
//1、创建sparkSession对象
val spark: SparkSession = SparkSession.builder()
.appName("DataFromMysql")
.master("local")
.getOrCreate()
//2、创建Properties对象,设置连接mysql的用户名和密码
val properties: Properties =new Properties()
properties.setProperty("user","root")
properties.setProperty("password","12345678")
//读取mysql中的数据
val mysqlDF : DataFrame = spark.read.jdbc("jdbc:mysql://192.168.121.128:3306/spark?useSSL=false","person",properties)
var str = ""
mysqlDF.collect().foreach(rdd => {
str = str+rdd.get(0).toString+":"+rdd.get(1).toString
val id = rdd
})
print(str)
mysqlDF.show()
spark.stop()
}
}
运行结果如下图所示
https://i-blog.csdnimg.cn/blog_migrate/a2502bf793e971ca5283e6f7e6027305.png#pic_center
2. 向MySQL数据库写入数据
Spark SQL不仅可以或许查询MySQL数据库中的数据,还可以向表中插入新的数据,实现方式的具体代码如文件所示。
SparkSgIToMysql.scala
package cn.itcast.sql
import java.util.Properties
import org.apache.calcite.avatica.ColumnMetaData.StructType
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StructField}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object SparkSqlToMysql01 {
def main(args: Array): Unit = {
//1.创建sparkSession对象
val spark: SparkSession = SparkSession.builder()
.appName("SparkSqlToMysql")
.master("local")
.getOrCreate()
//2.读取数据
//val data: RDD = spark.sparkContext.textFile("D://spark//student.txt")
val data: RDD = spark.sparkContext.parallelize(Array("3,wangwu,22","4,zhaoliu,26"))
//3.切分每一行,
val arrRDD: RDD] = data.map(_.split(","))
//4.RDD关联Student
val personRDD: RDD = arrRDD.map(x=>Student(x(0).toInt,x(1),x(2).toInt))
//导入隐式转换
import spark.implicits._
//5.将RDD转换成DataFrame
val personDF: DataFrame = personRDD.toDF()
//6.创建Properties对象,配置连接mysql的用户名和密码
val prop =new Properties()
prop.setProperty("user","root")
prop.setProperty("password","12345678")
prop.setProperty("driver","com.mysql.jdbc.Driver")
personDF.write.mode("append").jdbc("jdbc:mysql://192.168.121.128:3306/spark?useUnicode=true&characterEncoding=utf8","spark.person",prop)
personDF.show()
spark.stop()
}
}
运行结果如下图所示
https://i-blog.csdnimg.cn/blog_migrate/db004da4cda74f1c139396d98414be51.png#pic_center
查察mysql中的数据表
https://i-blog.csdnimg.cn/blog_migrate/f43caa1403bf5518fc884c37deba4a9f.png#pic_center
4.5.2 操作Hive数据集
Apache Hive是Hadoop上的SQL引擎,也是大数据系统中紧张的数据仓库工具,Spark SQL支持访问Hive数据仓库,然后在Spark引擎中举行统计分析。接下来先容通过Spark SQL操作Hive数据仓库的具体实现步骤。
[*]预备情况
Hive采用MySQL数据库存放Hive元数据,因此为了可以或许让Spark访问Hive,就必要将MySQL驱动包拷贝到Spark安装路径下的jars目录下,具体下令如下。
cp mysql-connector-java-5.1.32.jar /export/servers/spark/jars/
结果如下图所示
https://i-blog.csdnimg.cn/blog_migrate/cd2b64959fbfe6ddca5d81f29ce20864.png#pic_center
https://i-blog.csdnimg.cn/blog_migrate/2182f1f7fceaaf826b143cc81293a67f.png#pic_center
要将Spark SQL毗连到一个摆设好的Hive时,就必须要把hive
-site.xml设置文件复制到Spark的设置文件目录中,这里采用软毗连方式,具体下令如下。
ln -s /export/servers/apache-hive
-1.2.1-bin/conf/hive
-site.xml \ /export/servers/spark/conf/hive
-site.xml
结果如下图所示
https://i-blog.csdnimg.cn/blog_migrate/0884125239805ce265af09e4e8bae8d6.png#pic_center
[*]在Hive中创建数据库和表
接下来,我们首先在hadoop01节点上启动Hive服务,创建数据库和表,具体下令如下所示。
启动hive
步伐
hive
结果如下图所示
https://i-blog.csdnimg.cn/blog_migrate/d82e207d4d6615d424296a3d986cb7e9.png#pic_center
创建数据仓库
create database sparksqltest;
结果如下图所示
https://i-blog.csdnimg.cn/blog_migrate/b577a1fef416284115be575622766ff5.png#pic_center
创建数据表
create table if not exists sparksqltest.person(id int,name string,age int);
结果如下图所示
https://i-blog.csdnimg.cn/blog_migrate/dfb15719fe948b6d9c6fd488ef43b0be.png#pic_center
切换数据库
use sparksqltest;
结果如下图所示
https://i-blog.csdnimg.cn/blog_migrate/b02ee5662d0ba9b6121bb67cb8eb029b.png#pic_center
向数据表中添加数据
insert into person values(1, "tom",29);
insert into person values(2, "jerry",20);
结果如下图所示
https://i-blog.csdnimg.cn/blog_migrate/752527d29c12305479e3be27754f6283.png#pic_center
目前,我们创建乐成了person数据表,并在该表中插入了两条数据,下面克隆hadoop01会话窗口,执行Spark-Shell。
[*]Spark SQL操作Hive数据库
执行Spark-Shell,首先进入sparksqltest数据仓库,查察当前数据仓库中是否存在person表,具体代码如下所示。
spark-shell --master spark://hadoop01:7077
spark.sql("use sparksqltest")
spark.sql("show tables").show;
结果如下图所示
https://i-blog.csdnimg.cn/blog_migrate/b8101707c9db17626fe77a8f5a089ca4.png#pic_center
如果spark没有启动的话,必要启动一下
https://i-blog.csdnimg.cn/blog_migrate/89abb3fceb382ba1c9c8747dbc180664.png#pic_center
https://i-blog.csdnimg.cn/blog_migrate/6651ac8dc0a68246132e07d5f5623daa.png#pic_center
show()方法背面的()因为没有参数,是可以去掉的
https://i-blog.csdnimg.cn/blog_migrate/2aa6a5eb7aff451b097e05c49fbd8d5e.png#pic_center
从上述返回结果看出,当前Spark-Shell乐成显示出Hive数据仓库中的person表。
[*]向Hive表写入数据
在插入数据之前,首先查察当前表中数据,具体代码如下所示。
spark.sql("select * from person").show
https://i-blog.csdnimg.cn/blog_migrate/9700ff2daf0b75a866af1e9c88a16d6b.png#pic_center
从上述返回结果看出,当前person表中仅有两条数据信息。
下面在Spark-Shell中编写代码,添加两条数据到person表中,代码具体如下所示。
import java.util.Proerties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
#创建数据
val personRDD = spark.sparkContest.parallelize(Array("3 zhangsan 22", "4 lisi 29")).map(_.split(" "))
#设置personRDD的Schema
val schema = StructType(List(StructField("id", IntegerType, true), StructField("name", StringType,true),StructField("age", IntegerType,true)))
#创建Row对象,每个Row对象都是rowRDD中的一行
val rowRDD = personRdd.map(p=>Row(p(0).toInt, p(1).trim, p(2).toInt))
#建立rowRDD与Schema对应关系,创建DataFrame
val personDF = spark.createDataFrame(rowRDD,schema)
#注册临时表
personDF.registerTempTable("t_person")
#将数据插入Hive表
spark.sql("insert into person select * from t_person")
#查询表数据
spark.sql("select * from person").show
https://i-blog.csdnimg.cn/blog_migrate/f815241fb57f0f2f16292b725fada873.png#pic_center
https://i-blog.csdnimg.cn/blog_migrate/a5e6dbf6acefd255416d006206f34aa5.png#pic_center
https://i-blog.csdnimg.cn/blog_migrate/90423c05d3a8a3069d53790193725c5d.png#pic_center
转载自:https://blog.csdn.net/u014727709/article/details/132515279
欢迎
页:
[1]