对于函数中次数的累加,我们寻常再认识不外,但在spark的rdd中,累加却有其独特的形式,让我们一起来探究一下。
现在我们可以先利用寻常较为常用的累加方案来对pyspark中的rdd进行累加,看看与寻常我们编写代码时的累加有何不同(这里利用pyspark来对数据进行处理)
- #coding:utf8
- import time
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setAppName("test").setMaster("local[*]")
- sc = SparkContext(conf=conf)
- rdd = sc.parallelize(([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2)
- count = 0
- def map_func(data):
- global count
- count += 1
- print(count)
- rdd.map(map_func).collect()
- print(count)
复制代码 此处可以看到,我们创建了一个函数,用来对count进行累加,在对rdd进行map操作时,操作一次累加一次,最后打印效果
这里可以看到我们是在线创建了一个数据集而且将这个数据进行了分区,将其作为了spark的rdd,由于分区为2,spark的driver给了两个线程来并行式处理数据,所以最后我们会看到效果是两个并列的累加12345。
但是汇总到最后打印输出为什么是0呢,理论上当地的count变量应该被赋予10的效果才符合我们的需求。
这里我们可以画一个图来表示:
可以看到我们确实是有将count=0传入每个分区线程中,但到最后并没有汇总到driver,所以driver当地的count依旧是0,没有被累加。
对于这个问题,spark提供了一个累加器供我们利用,以满足我们累加的需求,其原理类似于driver发送给分区线程的是count的内存指针而非只是一个形式参数,此时只要分区线程对count进行处理,其效果就会同步到driver中,如许就可以实现分布式的累加。
那么如许的话我们可以不用设置一个变量就进行累加,直接利用spark提供的累加器变量即可
- #coding:utf8
- import time
- from pyspark import SparkConf,SparkContext
- if __name__ == '__main__':
- conf = SparkConf().setAppName("test").setMaster("local[*]")
- sc = SparkContext(conf=conf)
- rdd = sc.parallelize(([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), 2)
- #spark提供的累加器变量,参数是初始值
- acmlt = sc.accumulator(0)
- def map_func(data):
- global acmlt
- acmlt += 1
- print(acmlt)
- rdd.map(map_func).collect()
- print(acmlt)
复制代码 这里我们将变量进行了修改,利用了spark内置提供的累加器变量,我们来看看效果
可以看到,累加成功。
注意事项:由于rdd的特性,在调用map之后会得到一个新的rdd2,然后再对该rdd2进行行动算子盘算后(如collect()),该rdd2便会被烧毁。如许问题就来了,rdd2这个中间变量被烧毁了,过后想要再利用这个rdd2,driver会沿着rdd2的血缘关系去溯源rdd2,那么就会先获取rdd1,再得来rdd2,此时累加过程会重复。故而在rdd失效的时候,假如重新构建rdd,必要注意其累加效果。
当然可以利用缓存对rdd进行保留,如许也可以解决上述问题(不外要注意在行动算子之前进行缓存)。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |