from pyspark import SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import *
import os

# Environment setup for the PySpark job.
os.environ["JAVA_HOME"] = "/home/spark021/servers/jdk"
# FIX: pyspark reads PYSPARK_PYTHON / PYSPARK_DRIVER_PYTHON; the original
# "YSPARK_*" spellings were silently ignored, so the intended conda
# interpreter was never picked up by the workers or the driver.
os.environ["PYSPARK_PYTHON"] = "/home/spark021/servers/anaconda3/envs/pyspark/bin/python3.6"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/home/spark021/servers/anaconda3/envs/pyspark/bin/python3.6"
# Configure the Spark application (garbled markdown "*#*" markers from the
# original paste removed so the script actually parses).
spark = SparkSession.builder.appName("RDD转换成DataFrame").config(conf=SparkConf()).getOrCreate()

# Build an RDD of Row objects from the comma-separated text file.
# Each input line is assumed to be "id,name,age" — TODO confirm against the data file.
employee021 = spark.sparkContext.\
    textFile("file:///home/spark021/data/employee021.txt").\
    map(lambda line: line.split(",")).\
    map(lambda x: Row(id=x[0], name=x[1], age=x[2]))  # FIX: age was x[1], duplicating the name column
employee021.foreach(print)

# Create a DataFrame (schema inferred from the Row fields).
schemaPeople = spark.createDataFrame(employee021)
# Register as a temporary view so it can be queried with SQL.
schemaPeople.createOrReplaceTempView("people")
peopleDF = spark.sql("select * from people ")
peopleDF.show()

# Map each row back to a "id:…,name:…,age:…" string and print every record.
peopleRDD = peopleDF.rdd.map(lambda p: "id"+":"+p.id+","+"name"+":"+p.name+","+"age"+":"+p.age)
peopleRDD.foreach(print)
3.编码实现利用DataFrame读写MySQL的数据
(1)创建数据库
创建表
create table employee021 (id int(4), name char(20), gender char(4), age int(4));
插入数据
insert into employee021 values(1,'Alice','F',22);
insert into employee021 values(2,'John','M',25);
查看表
select * from employee021;
(2)进入Spark,配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import os

# Environment setup for the JDBC/MySQL PySpark job.
os.environ["JAVA_HOME"] = "/home/spark021/servers/jdk"
# FIX: same typo as the first script — the variables pyspark actually reads
# are PYSPARK_PYTHON / PYSPARK_DRIVER_PYTHON, not "YSPARK_*".
os.environ["PYSPARK_PYTHON"] = "/home/spark021/servers/anaconda3/envs/pyspark/bin/python3.6"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/home/spark021/servers/anaconda3/envs/pyspark/bin/python3.6"