IT评测·应用市场-qidao123.com

标题: 【PySpark】Python 中进行大规模数据处置惩罚和分析 [打印本页]

作者: 不到断气不罢休    时间: 2024-7-12 07:25
标题: 【PySpark】Python 中进行大规模数据处置惩罚和分析
一、前言先容
二、基础预备
三、数据输入
四、数据盘算
五、数据输出
六、分布式集群运行
一、前言先容

Spark概述

Apache Spark 是一个开源的大数据处置惩罚框架,提供了高效、通用、分布式的大规模数据处置惩罚本领。Spark 的主要特点包括:
Spark 的核心概念包括:

总体而言,Spark 是一个灵活、强大且易于使用的大数据处置惩罚框架,实用于各种规模的数据处置惩罚和分析任务。
PySpark概述

PySpark 是 Apache Spark 的 Python API,用于在 Python 中进行大规模数据处置惩罚和分析。Spark 是一个用于快速、通用、分布式盘算的开源集群盘算系统,而 PySpark 则是 Spark 的 Python 版本。
以下是使用 PySpark 进行基本操作的扼要步调:
以上是一个简朴的 PySpark 示例。实际应用中,可以根据具体需求使用更多功能,比方连接差别数据源、使用呆板学习库(MLlib)进行呆板学习任务等。 PySpark 提供了强大的工具和库,实用于大规模数据处置惩罚和分析的场景。
   Spark作为全球顶级的分布式盘算框架,支持浩繁的编程语言进行开发。
而Python语言,则是Spark重点支持的方向。
  二、基础预备

1、PySpark库的安装

同其它的Python第三方库一样,PySpark同样可以使用pip程序进行安装。
在”CMD”命令提示符程序内,输入:
  1. pip install pyspark
复制代码
或使用国内署理镜像网站(清华大学源)
  1. pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark
复制代码
2、PySpark实行环境入口对象的构建

想要使用PySpark库完成数据处置惩罚,首先需要构建一个实行环境入口对象。
PySpark的实行环境入口对象是:类 SparkContext 的类对象
  1. """
  2. 演示获取PySpark的执行环境入库对象:SparkContext
  3. 并通过SparkContext对象获取当前PySpark的版本
  4. """
  5. # 导包
  6. from pyspark import SparkConf, SparkContext
  7. # 创建SparkConf类对象
  8. conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
  9. # 基于SparkConf类对象创建SparkContext对象
  10. sc = SparkContext(conf=conf)
  11. # 打印PySpark的运行版本
  12. print(sc.version)
  13. # 停止SparkContext对象的运行(停止PySpark程序)
  14. sc.stop()
复制代码
3、PySpark的编程模子



总结

三、数据输入

RDD对象

如图可见,PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象
RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)
PySpark针对数据的处置惩罚,都是以RDD对象作为载体,即:

PySpark数据输入的2种方法

Python数据容器转RDD对象


读取文件转RDD对象


总结


四、数据盘算

1、map方法

PySpark的数据盘算,都是基于RDD对象来进行的,那么如何进行呢?
天然是依靠,RDD对象内置丰富的:成员方法(算子)

  1. """
  2. 演示RDD的map成员方法的使用
  3. """
  4. from pyspark import SparkConf, SparkContext
  5. import os
  6. os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
  7. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  8. sc = SparkContext(conf=conf)
  9. # 准备一个RDD
  10. rdd = sc.parallelize([1, 2, 3, 4, 5])
  11. # 通过map方法将全部数据都乘以10
  12. # def func(data):
  13. #     return data * 10
  14. rdd2 = rdd.map(lambda x: x * 10).map(lambda x: x + 5)
  15. print(rdd2.collect())
  16. # (T) -> U
  17. # (T) -> T
  18. # 链式调用
复制代码

总结

2、flatMap方法


  1. """
  2. 演示RDD的flatMap成员方法的使用
  3. """
  4. from pyspark import SparkConf, SparkContext
  5. import os
  6. os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
  7. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  8. sc = SparkContext(conf=conf)
  9. # 准备一个RDD
  10. rdd = sc.parallelize(["itheima itcast 666", "itheima itheima itcast", "python itheima"])
  11. # 需求,将RDD数据里面的一个个单词提取出来
  12. rdd2 = rdd.flatMap(lambda x: x.split(" "))
  13. print(rdd2.collect())
复制代码
总结

flatMap算子

3、reduceByKey方法


  1. """
  2. 演示RDD的reduceByKey成员方法的使用
  3. """
  4. from pyspark import SparkConf, SparkContext
  5. import os
  6. os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
  7. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  8. sc = SparkContext(conf=conf)
  9. # 准备一个RDD
  10. rdd = sc.parallelize([('男', 99), ('男', 88), ('女', 99), ('女', 66)])
  11. # 求男生和女生两个组的成绩之和
  12. rdd2 = rdd.reduceByKey(lambda a, b: a + b)
  13. print(rdd2.collect())
复制代码
总结

reduceByKey算子
担当一个处置惩罚函数,对数据进行两两盘算

WordCount案例

  1. """
  2. 完成练习案例:单词计数统计
  3. """
  4. # 1. 构建执行环境入口对象
  5. from pyspark import SparkContext, SparkConf
  6. import os
  7. os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
  8. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  9. sc = SparkContext(conf=conf)
  10. # 2. 读取数据文件
  11. rdd = sc.textFile("D:/hello.txt")
  12. # 3. 取出全部单词
  13. word_rdd = rdd.flatMap(lambda x: x.split(" "))
  14. # 4. 将所有单词都转换成二元元组,单词为Key,value设置为1
  15. word_with_one_rdd = word_rdd.map(lambda word: (word, 1))
  16. # 5. 分组并求和
  17. result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b)
  18. # 6. 打印输出结果
  19. print(result_rdd.collect())
复制代码
4、filter方法


  1. """
  2. 演示RDD的filter成员方法的使用
  3. """
  4. from pyspark import SparkConf, SparkContext
  5. import os
  6. os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
  7. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  8. sc = SparkContext(conf=conf)
  9. # 准备一个RDD
  10. rdd = sc.parallelize([1, 2, 3, 4, 5])
  11. # 对RDD的数据进行过滤
  12. rdd2 = rdd.filter(lambda num: num % 2 == 0)
  13. print(rdd2.collect())
复制代码
总结

filter算子

5、distinct方法


  1. """
  2. 演示RDD的distinct成员方法的使用
  3. """
  4. from pyspark import SparkConf, SparkContext
  5. import os
  6. os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
  7. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  8. sc = SparkContext(conf=conf)
  9. # 准备一个RDD
  10. rdd = sc.parallelize([1, 1, 3, 3, 5, 5, 7, 8, 8, 9, 10])
  11. # 对RDD的数据进行去重
  12. rdd2 = rdd.distinct()
  13. print(rdd2.collect())
复制代码
总结

distinct算子
完成对RDD内数据的去重操作
6、sortBy方法


  1. """
  2. 演示RDD的sortBy成员方法的使用
  3. """
  4. from pyspark import SparkConf, SparkContext
  5. import os
  6. os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
  7. conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
  8. sc = SparkContext(conf=conf)
  9. # 1. 读取数据文件
  10. rdd = sc.textFile("D:/hello.txt")
  11. # 2. 取出全部单词
  12. word_rdd = rdd.flatMap(lambda x: x.split(" "))
  13. # 3. 将所有单词都转换成二元元组,单词为Key,value设置为1
  14. word_with_one_rdd = word_rdd.map(lambda word: (word, 1))
  15. # 4. 分组并求和
  16. result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b)
  17. # 5. 对结果进行排序
  18. final_rdd = result_rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=1)
  19. print(final_rdd.collect())
复制代码
总结

sortBy算子

五、数据输出

1、输出为Python对象

将RDD的结果输出为Python对象的各类方法

collect方法


reduce方法


take方法


count方法


总结

2、输出到文件中

将RDD的内容输出到文件中

saveAsTextFile方法


注意事项

调用生存文件的算子,需要配置Hadoop依靠

更改RDD的分区数为1


总结

六、分布式集群运行

在 Spark 中,分布式集群运行是其强大性能的表现。下面是使用 Spark 进行分布式集群运行的基本步调:
这些步调涵盖了在分布式集群上运行 Spark 应用程序的基本流程。确保配置精确、节点正常连接,以及应用程序能够充实使用集群中的盘算资源。 Spark 提供了灵活的配置选项,可以根据具体的集群规模和需求进行调整。
将案例提交到YARN集群中运行

提交命令:
  1. bin/spark-submit --master yarn --num-executors 3 --queue root.teach --executor-cores 4 --executor-memory 4g /home/hadoop/demo.py
复制代码
上面的 Spark 提交命令已经包括了提交到 YARN 集群的必要参数。
以下是命令的表明:
  1. bin/spark-submit
  2. --master yarn                # 指定 Spark 的主节点为 YARN
  3. --num-executors 3             # 指定执行器的数量
  4. --queue root.teach            # 指定 YARN 队列
  5. --executor-cores 4            # 指定每个执行器的核心数
  6. --executor-memory 4g          # 指定每个执行器的内存大小
  7. /home/hadoop/demo.py          # 提交的 Spark 应用程序的路径
复制代码
表明一下每个参数的作用:

请确保在提交之前,Spark 相关的配置精确,并且 YARN 集群正常运行。如果有额外的依靠项,确保它们在集群中的每个节点上都可用。
代码

  1. """
  2. 演示PySpark综合案例
  3. """
  4. from pyspark import SparkConf, SparkContext
  5. import os
  6. os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python'
  7. os.environ['HADOOP_HOME'] = "/export/server/hadoop-3.3.1"
  8. conf = SparkConf().setAppName("spark_cluster")
  9. conf.set("spark.default.parallelism", "24")
  10. sc = SparkContext(conf=conf)
  11. # 读取文件转换成RDD
  12. file_rdd = sc.textFile("hdfs://m1:8020/data/search_log.txt")
  13. # TODO 需求1: 热门搜索时间段Top3(小时精度)
  14. # 1.1 取出全部的时间并转换为小时
  15. # 1.2 转换为(小时, 1) 的二元元组
  16. # 1.3 Key分组聚合Value
  17. # 1.4 排序(降序)
  18. # 1.5 取前3
  19. result1 = file_rdd.map(lambda x: (x.split("\t")[0][:2], 1)).\
  20.     reduceByKey(lambda a, b: a + b).\
  21.     sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
  22.     take(3)
  23. print("需求1的结果:", result1)
  24. # TODO 需求2: 热门搜索词Top3
  25. # 2.1 取出全部的搜索词
  26. # 2.2 (词, 1) 二元元组
  27. # 2.3 分组聚合
  28. # 2.4 排序
  29. # 2.5 Top3
  30. result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\
  31.     reduceByKey(lambda a, b: a + b).\
  32.     sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
  33.     take(3)
  34. print("需求2的结果:", result2)
  35. # TODO 需求3: 统计黑马程序员关键字在什么时段被搜索的最多
  36. # 3.1 过滤内容,只保留黑马程序员关键词
  37. # 3.2 转换为(小时, 1) 的二元元组
  38. # 3.3 Key分组聚合Value
  39. # 3.4 排序(降序)
  40. # 3.5 取前1
  41. result3 = file_rdd.map(lambda x: x.split("\t")).\
  42.     filter(lambda x: x[2] == '黑马程序员').\
  43.     map(lambda x: (x[0][:2], 1)).\
  44.     reduceByKey(lambda a, b: a + b).\
  45.     sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
  46.     take(1)
  47. print("需求3的结果:", result3)
  48. # TODO 需求4: 将数据转换为JSON格式,写出到文件中
  49. # 4.1 转换为JSON格式的RDD
  50. # 4.2 写出为文件
  51. file_rdd.map(lambda x: x.split("\t")).\
  52.     map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\
  53.     saveAsTextFile("hdfs://m1:8020/output/output_json")
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。




欢迎光临 IT评测·应用市场-qidao123.com (https://dis.qidao123.com/) Powered by Discuz! X3.4