Spark RDD通过persist方法或cache方法可以将盘算效果的缓存,但是并不是这两个方法被调用时立刻缓存,而是触发后面的action时,该RDD才会被缓存在盘算节点的内存中并供后面重用。下面是persist方法或cache方法的函数定义:
- def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
- def cache(): this.type = persist()
复制代码 视频解说如下 【赵渝强老师】Spark RDD的缓存机制
通过函数的定义发现,cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark在object StorageLevel中定义了缓存的存储级别。下面是在StorageLevel中的定义的缓存级别。
- val NONE = new StorageLevel(false, false, false, false)
- val DISK_ONLY = new StorageLevel(true, false, false, false)
- val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
- val MEMORY_ONLY = new StorageLevel(false, true, false, true)
- val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
- val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
- val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
- val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
- val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
- val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
- val MEMORY_AND_DISK_SER_2=new StorageLevel(true, true, false, false, 2)
- valOFF_HEAP = new StorageLevel(true, true, true, false, 1)
复制代码 需要阐明的是,使用RDD的缓存机制,数据可能丢失;或者会由于内存的不敷而造成数据被删除。可以通过使用RDD的查抄点机制了包管缓存的容错,纵然缓存丢失了也能包管盘算的正确执行。
下面是使用RDD缓存机制的一个示例。这里使用RDD读取一个大的文件,该文件中包罗918843条记录。通过Spark Web Console可以对比出在不使用缓存和使用缓存时,执行效率的差异。
(1)读取一个大文件。
- scala> val rdd1 = sc.textFile("/root/temp/sales")
复制代码 (2)触发一个盘算,这里没有使用缓存。
(3)调用cache方法标识该RDD可以被缓存。
(4)第二次触发盘算,盘算完成后会将效果缓存。
(5)第三次触发盘算,这里会直接从之前的缓存中获取效果。
(6)访问Spark的Web Console观察这三次count盘算的执行时间,可以当作末了一次count盘算只耗费了98ms,如下图所示。
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。 |