Spark使用Python开发和RDD

打印 上一主题 下一主题

主题 689|帖子 689|积分 2067

使用PySpark

配置python环境

在所有节点上按照python3,版本必须是python3.6及以上版本
  1. yum install -y python3
复制代码
修改所有节点的环境变量
  1. export JAVA_HOME=/usr/local/jdk1.8.0_251
  2. export PYSPARK_PYTHON=python3
  3. export HADOOP_HOME=/bigdata/hadoop-3.2.1
  4. export HADOOP_CONF_DIR=/bigdata/hadoop-3.2.1/etc/hadoop
  5. export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin
复制代码
使用pyspark shell
  1. /bigdata/spark-3.2.3-bin-hadoop3.2/bin/pyspark \
  2. --master spark://node-1.51doit.cn:7077 \
  3. --executor-memory 1g --total-executor-cores 10
复制代码
在pyspark shell使用python编写wordcount
  1. sc.textFile("hdfs://node-1.51doit.cn:8020/data/wc").flatMap(lambda line: line.split(' ')).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda t: t[1], False).saveAsTextFile('hdfs://node-1.51doit.cn:8020/out01')
复制代码
在pycharm中使用python编写wordcount
  1. from pyspark import SparkConf, SparkContext
  2. if __name__ == '__main__':
  3.     conf = SparkConf().setAppName('WordCount').setMaster('local[*]')
  4.     sc = SparkContext(conf=conf)
  5.     lines = sc.textFile('file:///Users/star/Desktop/data.txt')
  6.     words = lines.flatMap(lambda line: line.split(' '))
  7.     wordAndOne = words.map(lambda word: (word, 1))
  8.     reduced = wordAndOne.reduceByKey(lambda x, y: x + y)
  9.     result = reduced.sortBy(lambda t: t[1], False)
  10.     print(result.collect())
复制代码
RDD

RDD的全称为Resilient Distributed Dataset,是一个弹性、可复原的分布式数据集,是Spark中最基本的抽象,是一个不可变的、有多个分区的、可以并行计算的集合。RDD中并不装真正要计算的数据,而装的是描述信息,描述以后从哪里读取数据,调用了用什么方法,传入了什么函数,以及依赖关系等。
RDD的特点

•        有一系列连续的分区:分区编号从0开始,分区的数量决定了对应阶段Task的并行度
•        有一个函数作用在每个输入切片上或对应的分区上: 每一个分区都会生成一个Task,对该分区的数据进行计算,这个函数就是具体的计算逻辑
•        RDD和RDD之间存在一系列依赖关系:RDD调用Transformation后会生成一个新的RDD,子RDD会记录父RDD的依赖关系,包括宽依赖(有shuffle)和窄依赖(没有shuffle)
•        (可选的)K-V的RDD在Shuffle会有分区器,默认使用HashPartitioner
•        (可选的)如果从HDFS中读取数据,会有一个最优位置:spark在调度任务之前会读取NameNode的元数据信息,获取数据的位置,移动计算而不是移动数据,这样可以提高计算效率。

RDD的算子(方法)分类

•        Transformation:即转换算子,调用转换算子会生成一个新的RDD,Transformation是Lazy的,不会触发job执行。
•        Action:行动算子,调用行动算子会触发job执行,本质上是调用了sc.runJob方法,该方法从最后一个RDD,根据其依赖关系,从后往前,划分Stage,生成TaskSet。
创建RDD的方法

•        从HDFS指定的目录据创建RDD
  1. val lines: RDD[String] = sc.textFile("hdfs://node-1.51doit.cn:9000/log")
复制代码
•        通过并行化方式,将Driver端的集合转成RDD
  1. val rdd1: RDD[Int] = sc.parallelize(Array(1,2,3,4,5,6,7,8,9))
复制代码
查看RDD的分区数量
  1. val rdd1: RDD[Int] = sc.parallelize(Array(1,2,3,4,5,6,7,8,9))
  2. rdd1.partitions.length
复制代码
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x
回复

使用道具 举报

0 个回复

倒序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

圆咕噜咕噜

金牌会员
这个人很懒什么都没写!

标签云

快速回复 返回顶部 返回列表