PySpark 当地开发情况搭建与实践

打印 上一主题 下一主题

主题 910|帖子 910|积分 2730

Spark 的介绍与搭建:从理论到实践-CSDN博客
Spark 的Standalone集群情况安装与测试-CSDN博客
目录
一、PySpark 当地开发情况搭建
(一)Windows 当地 JDK 和 Hadoop 的安装
(二)Windows 安装 Anaconda
(三)Anaconda 中安装 PySpark
(四)Pycharm 中创建工程
二、编写代码
(一)编写情况变量的代码
(二)获取 SparkContext 对象
(三)将代码模板化
完整的模板:记得给模板起个名字pyspark_local_script
三、当地开发案例
(一)WordCount 案例
(二)使用正则办理特殊分隔符问题
(三)当地开发 - 读取 hdfs 上的数据
(四)当地开发 - 获取外部的变量
通报数据
参数设置界面
四、Spark 程序的监控
4040 界面的使用
五、local 和效果文件的数量问题
(一)local 模式并行度
(二)效果文件数量与 local 模式的关系
六、总结





        在大数据处理范畴,PySpark 作为一个强大的工具,为数据科学家和开发人员提供了便捷的方式来处理大规模数据。本文将详细介绍如安在 Windows 情况下搭建 PySpark 当地开发情况,并深入探究在这个情况下的代码编写、案例实践、程序监控以及一些常见问题的处理,帮助读者快速上手 PySpark 当地开发。

一、PySpark 当地开发情况搭建


(一)Windows 当地 JDK 和 Hadoop 的安装


        JDK(Java Development Kit)是 Java 开发的根本,而 Hadoop 是处理大数据的重要框架。在 Windows 上安装它们是后续搭建 PySpark 情况的第一步。安装过程必要注意选择符合的版本,并按照安装向导举行操作,确保安装路径等设置精确。
JDK安装设置教程_jdk64位安装-CSDN博客
Windows 体系安装 Hadoop 详细教程-CSDN博客

(二)Windows 安装 Anaconda


         Anaconda 是一个开源的 Python 发行版本,它包含了 conda、Python 等 180 多个科学包及其依赖项。右键以管理员身份运行安装程序,默认安装到了 ProgramData 文件夹(这是一个非空文件夹)。在安装过程中,要注意各种安装选项,确保安装顺遂举行。Anaconda 的安装为后续在其情况中安装 PySpark 等相关包提供了根本。
通过网盘分享Miniconda3的:Miniconda3-py38_4.11.0-Windows-x86_64.exe


(三)Anaconda 中安装 PySpark


         在下令提示符(cmd)中举行操作。在安装过程中,如果碰到必要输入 y 或者 n 的情况,输入 y。安装完成后,可以通过 conda list 或者 pip list 查抄是否包含 py4j 和 pyspark 两个包。PySpark 的安装路径在 $ANACONDA_HOME/Lib/site - packages。这里必要夸大的是,这现实上是在当地安装一个 Spark 软件,如果没有 Spark 情况,仅仅安装了 PySpark 是无法运行 Spark 代码的。


(四)Pycharm 中创建工程



  • 选择 Conda:在 Pycharm 中创建工程时,选择 Conda。直接点确定即可。因为 Anaconda 包含了 Python 而且可以安装各种情况,比如 pyspark,通过这种关联,Pycharm 可以使用 Anaconda 中的工具。

  • 办理辨认问题:如果 Anaconda 没有安装在 C 盘,可能会出现辨认不了的情况。此时必要手动选择。

  • 查抄安装包中是否有相关软件,并验证选择的解释器是否精确。

  • 创建文件夹,为后续代码编写做好准备。
   main :用于存放天天开发的一些代码文件
resources :用于存放程序中必要用到的设置文件
datas :用于存放天天用到的一些数据文件
test :用于存放测试时的一些代码文件
  


二、编写代码


(一)编写情况变量的代码


        情况变量的设置对于 PySpark 程序的运行至关重要。它确保程序可以或许找到所需的资源和设置。在代码中,要精确地设置与 Spark 相关的情况变量,包括 Spark 的安装路径、设置文件路径等。
  1. import os
  2. if __name__ == '__main__':
  3.     # 你自己的JDK路径
  4.     os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
  5.     # 配置Hadoop的路径,就是前面解压的那个路径
  6.     os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
  7.     # 配置base环境Python解析器的路径
  8.     os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  
  9.     # 配置base环境Python解析器的路径
  10.     os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
复制代码

(二)获取 SparkContext 对象


         SparkContext 是 Spark 中的焦点类,任何一个 Spark 的程序都必须包含一个 SparkContext 类的对象。通过获取这个对象,我们可以进一步构建和执行 Spark 任务。例如:
  1. import os
  2. # 导入pyspark模块
  3. from pyspark import SparkContext,SparkConf
  4. if __name__ == '__main__':
  5.         # 配置环境
  6.         os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
  7.         # 配置Hadoop的路径,就是前面解压的那个路径
  8.         os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
  9.         # 配置base环境Python解析器的路径
  10.         os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
  11.         os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
  12.         # 获取 conf 对象
  13.         conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")
  14.         # 假如我想设置压缩
  15.         # conf.set("spark.eventLog.compression.codec","snappy")
  16.         # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
  17.         sc = SparkContext(conf=conf)
  18.         print(sc)
  19.         # 使用完后,记得关闭
  20.         sc.stop()
复制代码

(三)将代码模板化


         创建一个名为 pyspark_local_script 的模板,并在此中添加必要的内容。模板化代码有助于提高代码的复用性和规范性。在模板中,可以将一些常用的代码结构和函数界说好,方便在不同的项目中使用。

完整的模板:记得给模板起个名字pyspark_local_script

  1. import os
  2. # 导入pyspark模块
  3. from pyspark import SparkContext,SparkConf
  4. """
  5. ------------------------------------------
  6.   Description : TODO:
  7.   SourceFile : ${NAME}
  8.   Author  : ${USER}
  9.   Date  : ${DATE}
  10. -------------------------------------------
  11. """
  12. if __name__ == '__main__':
  13.         # 配置环境
  14.         os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
  15.         # 配置Hadoop的路径,就是前面解压的那个路径
  16.         os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
  17.         # 配置base环境Python解析器的路径
  18.         os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
  19.         os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
  20.         # 获取 conf 对象
  21.         conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")
  22.         # 假如我想设置压缩
  23.         # conf.set("spark.eventLog.compression.codec","snappy")
  24.         # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
  25.         sc = SparkContext(conf=conf)
  26.         print(sc)
  27.         # 使用完后,记得关闭
  28.         sc.stop()
复制代码
模板的使用:



三、当地开发案例


(一)WordCount 案例


代码编写:这是一个经典的大数据处理案例。通过读取文本文件,将此中的单词举行拆分、计数。代码实现如下:

  1. import os
  2. # 导入pyspark模块
  3. from pyspark import SparkContext, SparkConf
  4. if __name__ == '__main__':
  5.     # 配置环境
  6.     os.environ['JAVA_HOME'] = 'C:/Program Files/java/jdk1.8.0_181'
  7.     # 配置Hadoop的路径,就是前面解压的那个路径
  8.     os.environ['HADOOP_HOME'] = 'D:/Linux/hadoop/hadoop-3.3.1'
  9.     # 配置base环境Python解析器的路径
  10.     os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
  11.     os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
  12.     # 获取 conf 对象
  13.     # setMaster  按照什么模式运行,local  bigdata01:7077  yarn
  14.     #  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
  15.     #  appName 任务的名字
  16.     conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")
  17.     # 假如我想设置压缩
  18.     # conf.set("spark.eventLog.compression.codec","snappy")
  19.     # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
  20.     sc = SparkContext(conf=conf)
  21.     print(sc)
  22.     fileRdd = sc.textFile("../../datas/WordCount/data.txt")
  23.     rsRdd = fileRdd.filter(lambda x: len(x) > 0) \
  24.         .flatMap(lambda line: line.strip().split()) \
  25.         .map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
  26.     rsRdd.saveAsTextFile("../datas/WordCount/result2")
  27.     # 使用完后,记得关闭
  28.     sc.stop()
  29.     # 注意,在cmd窗口中执行spark-submit命令时,需要将以下路径添加到环境变量中
复制代码
建议安装一个工具psutil,如果不想看到就在 Python解释器中安装:pip install psutil
查看运行效果:运行代码后,可以在指定的输出路径中查看效果文件。效果文件中包含了每个单词及其出现的次数。

常见的其他错误:






(二)使用正则办理特殊分隔符问题


        在现实数据处理中,可能会碰到特殊的分隔符。这时可以使用正则表达式来改造 WordCount 代码。例如,如果数据是用特定的非空格字符分隔的,可以通过修改 flatMap 函数中的分隔逻辑,使用正则表达式来精确拆分单词。

  1. import os
  2. import re
  3. # 导入pyspark模块
  4. from pyspark import SparkContext, SparkConf
  5. if __name__ == '__main__':
  6. # 配置环境
  7. os.environ['JAVA_HOME'] = 'C:/Program Files/java/jdk1.8.0_181'
  8. # 配置Hadoop的路径,就是前面解压的那个路径
  9. os.environ['HADOOP_HOME'] = 'D:/Linux/hadoop/hadoop-3.3.1'
  10. # 配置base环境Python解析器的路径
  11. os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
  12. os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
  13. # 获取 conf 对象
  14. # setMaster 按照什么模式运行,local bigdata01:7077 yarn
  15. # local[2] 使用2核CPU * 你本地资源有多少核就用多少核
  16. # appName 任务的名字
  17. conf = SparkConf().setMaster("local[*]").setAppName("")
  18. # 假如我想设置压缩
  19. # conf.set("spark.eventLog.compression.codec","snappy")
  20. # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
  21. sc = SparkContext(conf=conf)
  22. print(sc)
  23. fileRdd = sc.textFile("../../datas/WordCount/data.txt")
  24. rsRdd = fileRdd.filter(lambda x: len(x) > 0) \
  25. .flatMap(lambda line: re.split("\\s+", line.strip())) \
  26. .map(lambda word: (word, 1)) \
  27. .reduceByKey(lambda sum, num: sum+num)
  28. rsRdd.saveAsTextFile("../datas/WordCount/result3")
  29. # 使用完后,记得关闭
  30. sc.stop()
复制代码


(三)当地开发 - 读取 hdfs 上的数据


        在 Windows 情况下,用户通常没有权限访问 hdfs 文件体系。这必要举行一些额外的设置,比如设置 Hadoop 的相关权限,或者通过一些署理工具来实现访问。在代码中,要精确设置 Hadoop 的设置参数,以确保可以或许读取 hdfs 上的数据。
  1.         fileRdd = sc.textFile("hdfs://bigdata01:9820/spark/wordcount/input/*")
  2.         rsRdd = fileRdd.filter(lambda line: len(line.strip()) > 0 ).flatMap(lambda line: re.split("\\s+",line.strip())).map(lambda word: (word,1)).reduceByKey(lambda sum,num : sum+num)
  3.         rsRdd.saveAsTextFile("hdfs://bigdata01:9820/spark/wordcount/output4")
复制代码

以上这个阐明,windows用户没有权限访问hdfs文件体系
   # 申明当前以root用户的身份来执行操作
os.environ['HADOOP_USER_NAME'] = 'root'
  完整代码
  1. import os
  2. import re
  3. # 导入pyspark模块
  4. from pyspark import SparkContext, SparkConf
  5. if __name__ == '__main__':
  6.     # 配置环境
  7.     os.environ['JAVA_HOME'] = 'C:/Program Files/java/jdk1.8.0_181'
  8.     # 配置Hadoop的路径,就是前面解压的那个路径
  9.     os.environ['HADOOP_HOME'] = 'D:/Linux/hadoop/hadoop-3.3.1'
  10.     # 配置base环境Python解析器的路径
  11.     os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
  12.     os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
  13.     # 申明当前以root用户的身份来执行操作
  14.     os.environ['HADOOP_USER_NAME'] = 'root'
  15.     conf = SparkConf().setMaster("local[*]").setAppName("")
  16.     sc = SparkContext(conf=conf)
  17.     fileRdd = sc.textFile("hdfs://bigdata01:9820/spark/wordcount/input/*")
  18.     rsRdd = fileRdd.filter(lambda line: len(line.strip()) > 0) \
  19.         .flatMap(lambda line: re.split("\\s+", line.strip())) \
  20.         .map(lambda word: (word, 1))\
  21.         .reduceByKey(lambda sum, num: sum + num)
  22.     rsRdd.saveAsTextFile("hdfs://bigdata01:9820/spark/wordcount/output4")
  23.     # 使用完后,记得关闭
  24.     sc.stop()
复制代码
运行


(四)当地开发 - 获取外部的变量



        雷同于 Java 中的 String[] args,在 PySpark 中也可以获取外部变量。可以通过下令行参数通报的方式来实现。例如,在运行 pyspark 脚本时,可以使用 spark - sumit xxxxx.py 参数 1, 参数 2 的形式通报参数。在代码中,必要对这些参数举行剖析和使用。
  1. import os
  2. import re
  3. import sys
  4. # 导入pyspark模块
  5. from pyspark import SparkContext, SparkConf
  6. if __name__ == '__main__':
  7.     # 配置环境
  8.     os.environ['JAVA_HOME'] = 'C:/Program Files/java/jdk1.8.0_181'
  9.     # 配置Hadoop的路径,就是前面解压的那个路径
  10.     os.environ['HADOOP_HOME'] = 'D:/Linux/hadoop/hadoop-3.3.1'
  11.     # 配置base环境Python解析器的路径
  12.     os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
  13.     os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
  14.     # 申明当前以root用户的身份来执行操作
  15.     os.environ['HADOOP_USER_NAME'] = 'root'
  16.     # 获取 conf 对象
  17.     conf = SparkConf().setMaster("local[*]").setAppName("")
  18.     # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
  19.     sc = SparkContext(conf=conf)
  20.     print(sc)
  21.     # 获取第一个参数
  22.     fileRdd = sc.textFile(sys.argv[1])
  23.     rsRdd = fileRdd.filter(lambda line: len(line.strip()) > 0) \
  24.         .flatMap(lambda line: re.split("\\s+", line.strip())) \
  25.         .map(lambda word: (word, 1)) \
  26.         .reduceByKey(lambda sum, num: sum + num)
  27.     # 获取第二个参数
  28.     rsRdd.saveAsTextFile(sys.argv[2])
  29.     # 使用完后,记得关闭
  30.     sc.stop()
复制代码
通报数据

   参数一: hdfs://bigdata01:9820/spark/wordcount/input/*
参数二: hdfs://bigdata01:9820/spark/wordcount/output4
  参数设置界面



四、Spark 程序的监控


4040 界面的使用


        因为是当地的程序,以是可以通过访问地址 http://localhost:4040 来监控程序。每个 Spark 程序都有一个对应的 4040 界面。这个界面提供了丰富的信息:


  • RDD 相关信息:每个斑点表现一个 RDD,每个矩形框中的 RDD 的转换都是在内存中完成的,曲线代表经过了 Shuffle,灰色代表没有执行(因为之前执行过)。

  • 进程信息:显示当前这个程序的运行进程的信息。每个 Spark 程序都由两种进程构成:一个 Driver 和多个 Executors。Driver 进程负责剖析程序,构建 DAG 图,构建 Stage,构建、调度、监控 Task 任务的运行;Executor 进程负责运行程序中的所有 Task 任务。

  • 存储信息:Storage 部分显示当前这个程序在内存缓存的数据信息。
  • 设置信息:Environment 显示当前这个程序所有的设置信息。

五、local 和效果文件的数量问题


(一)local 模式并行度



  • local:使用当地模式,并行度是 1。
  • local[3]:使用当地模式,并行度是 3,这个并行度最好和 CPU 的核数一致,一般并行度 <= CPU 的核数。
  • local
  • :并行度 = CPU 的核数。

(二)效果文件数量与 local 模式的关系


        文件的效果经常是 2 个文件,这跟分区数有关系,跟 local = N 也有肯定的关系。其规律是 min(N,2),例如如果是 local [1],最后的文件数量就是 1。
        如果在 local 模式下,想要效果文件是 10,可以使用 sc.textFile("../datas/wordcount/data.txt",10) 的方式来设置分区数。

六、总结


        本文详细介绍了 PySpark 当地开发情况的搭建过程,包括 JDK、Hadoop、Anaconda、PySpark 的安装以及 Pycharm 工程的创建。同时,深入讲解了代码编写、当地开发案例(如 WordCount、处理特殊分隔符、读取 hdfs 数据、获取外部变量)、Spark 程序的监控和 local 模式下效果文件数量问题等内容。通过掌握这些知识和技能,读者可以在 Windows 当地情况中高效地举行 PySpark 开发,处理大规模数据,办理现实业务中的数据分析和处理问题。渴望本文能为读者在 PySpark 学习和实践的道路上提供有力的帮助,让读者可以或许更好地使用这个强大的工具来发掘数据的代价。


免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

tsx81429

金牌会员
这个人很懒什么都没写!
快速回复 返回顶部 返回列表