from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
if __name__ == "__main__":
    # Path to your own MySQL JDBC driver jar; adjust as needed.
    spark = SparkSession.builder \
        .appName("MySQL JDBC Example") \
        .config("spark.driver.extraClassPath", "/usr/local/spark/jars/mysql-connector-j-8.0.33.jar") \
        .getOrCreate()
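    # Read the existing employee table from MySQL into a DataFrame.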
    jdbcDF = spark.read.format("jdbc") \
        .option("url", "jdbc:mysql://localhost:3306/sparktest") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("dbtable", "employee") \
        .option("user", "root") \
        .option("password", "your MySQL password") \
        .load()
    jdbcDF.filter(jdbcDF.age > 20).collect()  # check that the connection works
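    # Build a DataFrame for the two new rows, parsing each space-separated string
    # into (id, name, gender, age) and applying an explicit schema.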
    studentRDD = spark.sparkContext.parallelize(["3 Mary F 26", "4 Tom M 23"]).map(lambda line: line.split(" "))
    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("name", StringType(), True),
        StructField("gender", StringType(), True),
        StructField("age", IntegerType(), True)
    ])
    rowRDD = studentRDD.map(lambda p: Row(int(p[0]), p[1].strip(), p[2].strip(), int(p[3])))
    employeeDF = spark.createDataFrame(rowRDD, schema)
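    # JDBC connection properties used when appending the new rows to MySQL.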
    prop = {
        'user': 'root',
        'password': 'your MySQL password',
        'driver': "com.mysql.cj.jdbc.Driver"
    }
    employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest", 'employee', 'append', prop)
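    # jdbcDF is not cached, so collecting it again re-reads the table from MySQL
    # and should now include the appended rows.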
    jdbcDF.collect()
    jdbcDF.agg({"age": "max"}).show()
    jdbcDF.agg({"age": "sum"}).show()