Spark之PySpark

打印 上一主题 下一主题

主题 1516|帖子 1516|积分 4548

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
PySpark 是 Apache Spark 的 Python API,它允许开发者使用 Python 编程语言进行大规模数据处理和分析。Apache Spark 是一个快速、通用、可扩展的大数据处理引擎,支持批处理、流处理、呆板学习、图计算等多种数据处理模式。PySpark 使得 Python 开发者可以或许利用 Spark 强大的分布式计算能力,处理大数据集,并执行高效的并行计算。

一、PySpark 焦点概念

1. RDD(弹性分布式数据集)



  • 定义:RDD 是 Spark 的焦点抽象,表示一个不可变的分布式数据集合。它是一个可并行操纵的对象,可以在集群的不同节点上进行计算。
  • 特性

    • 可以通过转换操纵(如 map、filter)生成新的 RDD。
    • 支持耽误计算(懒加载),即只有在执行行动操纵(如 collect、count)时,Spark 才会真正计算数据。
    • 可以长期化,存储到内存或磁盘,以提高性能。

2. DataFrame



  • 定义:DataFrame 是一个分布式数据集,它在 RDD 之上增加了结构化的数据表示,类似于关系型数据库中的表格。它具有行和列,并支持 SQL 查询。
  • 特性

    • 支持多种数据源(如 CSV、JSON、Parquet、Hive)。
    • 提供了丰富的 API,支持通过 SQL、DataFrame 和 Dataset 操纵进行数据处理。
    • 提供了自动优化的执行计划(Catalyst 查询优化器)。

3. SparkSession



  • 定义:SparkSession 是 Spark 2.x 引入的一个新特性,它是全部 Spark 功能的入口点。它取代了之前的 SQLContext 和 HiveContext。
  • 功能

    • 用于创建 DataFrame 和 SQL 查询。
    • 可以访问 Spark 的各种功能,如批处理、流处理、呆板学习等。

4. SparkContext



  • 定义:SparkContext 是连接 Spark 集群的入口点。它用于创建 RDD、广播变量、累加器等。
  • 功能

    • 管理集群资源和任务调度。
    • 连接 Spark 集群并处理各种作业。


二、PySpark 的基本用法

1. 初始化 SparkSession

在 PySpark 中,首先需要创建一个 SparkSession 对象,这是全部 PySpark 步调的起点。
  1. from pyspark.sql import SparkSession
  2. spark = SparkSession.builder \
  3.     .appName("PySpark Example") \
  4.     .getOrCreate()
复制代码
2. 创建 RDD

创建 RDD 可以通过以下方式:


  • 从现有的 Python 集合创建:
    1. rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
    复制代码
  • 通过读取外部数据源(如文件):
    1. rdd = spark.sparkContext.textFile("data.txt")
    复制代码
3. 创建 DataFrame

可以从 Python 数据结构(如 Pandas DataFrame 或字典)创建 DataFrame。
  1. data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
  2. df = spark.createDataFrame(data, ["name", "value"])
  3. df.show()
复制代码
4. 使用 DataFrame API 进行查询



  • 选择列
    1. df.select("name").show()
    复制代码
  • 过滤数据
    1. df.filter(df.value > 1).show()
    复制代码
  • 聚合操纵
    1. df.groupBy("value").count().show()
    复制代码
5. SQL 查询

Spark 支持 SQL 查询,可以通过注册 DataFrame 为临时视图,并执行 SQL 查询。
  1. df.createOrReplaceTempView("people")
  2. result = spark.sql("SELECT name, value FROM people WHERE value > 1")
  3. result.show()
复制代码

三、PySpark 中的常用操纵

1. 转换操纵(Transformations)

转换操纵会返回一个新的 RDD 或 DataFrame,而不会修改原有数据,常见的转换操纵包括:


  • map:对每个元素执行操纵并返回一个新的 RDD。
    1. rdd.map(lambda x: x * 2).collect()
    复制代码
  • filter:根据条件过滤元素。
    1. rdd.filter(lambda x: x > 2).collect()
    复制代码
  • flatMap:类似于 map,但每个输入元素可以映射到多个输出元素。
    1. rdd.flatMap(lambda x: (x, x*2)).collect()
    复制代码
2. 行动操纵(Actions)

行动操纵会触发计算并返回终极结果,常见的行动操纵包括:


  • collect:将 RDD 中的数据收集到驱动步调中。
    1. rdd.collect()
    复制代码
  • count:返回 RDD 中元素的数量。
    1. rdd.count()
    复制代码
  • reduce:对 RDD 中的元素进行聚合。
    1. rdd.reduce(lambda x, y: x + y)
    复制代码
3. 数据长期化

假如需要多次操纵某个 RDD,可以将其长期化到内存或磁盘,以提高性能:
  1. rdd.cache()  # 将数据缓存到内存中
复制代码

四、PySpark 在大数据处理中的应用

1. 批处理

PySpark 可以处理大规模的批量数据。通过将数据划分成多个分区,Spark 可以并行处理每个分区上的数据。
2. 流处理

PySpark 支持流数据处理,可以处理实时数据流。使用 Structured Streaming API,开发者可以通过类似 SQL 的语法进行实时数据分析。
  1. from pyspark.sql.functions import *
  2. # 读取实时数据流
  3. streaming_df = spark.readStream.format("csv").option("header", "true").load("streaming_data/")
  4. # 执行查询并输出结果
  5. query = streaming_df.select("name", "value").writeStream.outputMode("append").format("console").start()
  6. query.awaitTermination()
复制代码
3. 呆板学习

PySpark 提供了 MLlib 和 ML 两个库,支持常见的呆板学习算法,如回归、分类、聚类等。
  1. from pyspark.ml.classification import LogisticRegression
  2. from pyspark.ml.feature import VectorAssembler
  3. # 创建特征列
  4. assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
  5. training_data = assembler.transform(df)
  6. # 创建模型并训练
  7. lr = LogisticRegression(featuresCol="features", labelCol="label")
  8. model = lr.fit(training_data)
复制代码

五、PySpark 性能优化

1. 数据分区

得本地调整 RDD 或 DataFrame 的分区数量可以提高性能。可以通过 repartition 或 coalesce 来调整分区数量。
  1. rdd = rdd.repartition(4)
复制代码
2. 长期化战略

对于需要多次操纵的数据,使用 cache() 或 persist() 将数据存储在内存中,以淘汰计算开销。
3. 广播变量

在全部工作节点上共享只读数据时,可以使用广播变量。这样可以淘汰重复的数据传输。
  1. broadcast_var = spark.sparkContext.broadcast([1, 2, 3])
复制代码

六、PySpark 与 Hadoop 和 Spark 的关系



  • 与 Hadoop 的关系:PySpark 可与 Hadoop 共同使用,尤其是通过 HDFS 存储和 YARN 进行资源管理。Spark 提供了比 Hadoop MapReduce 更高效的数据处理能力。
  • 与 Spark 的关系:PySpark 是 Apache Spark 的 Python API,可以或许使用 Spark 提供的全部计算能力,但用 Python 语言进行开发

总结

PySpark 是一个强大的工具,可以或许资助 Python 开发者轻松地处理大数据。通过其简洁的 API,PySpark 使得分布式计算更加易于上手。无论是批处理、流处理照旧呆板学习,PySpark 都可以高效地完成任务,特殊适合在大数据环境中进行数据分析和实时处理。

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

灌篮少年

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表