Spark缓存

打印 上一主题 下一主题

主题 1799|帖子 1799|积分 5397

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有账号?立即注册

x
在 Spark 中,缓存(Caching)是一种优化技术,用于将中间计算效果存储在内存或磁盘中,制止重复计算,从而明显提升迭代计算或交互式查询的性能。以下是关于 Spark 缓存的详细介绍:
一、为什么必要缓存?

Spark 的 RDD 操作是惰性的,每次触发举措算子(如collect()、count())时都会重新计算整个 DAG(有向无环图)。对于必要多次使用的 RDD(如迭代算法或交互式查询),这种重复计算会造成极大的资源浪费。缓存可以将中间效果持久化,制止重复计算。
二、缓存的根本用法

Spark 提供了两种缓存方法:


  • cache():默认将 RDD 存储在内存中(等同于persist(StorageLevel.MEMORY_ONLY))。
  • persist(StorageLevel):可指定存储级别(如内存、磁盘、序列化等)。
示例:缓存 RDD

python
运行
  1. from pyspark import SparkContext
  2. sc = SparkContext("local", "CacheExample")
  3. # 创建RDD
  4. rdd = sc.textFile("hdfs://path/to/large/file.txt")
  5. # 缓存RDD
  6. rdd.cache()  # 等同于 rdd.persist(StorageLevel.MEMORY_ONLY)
  7. # 第一次行动操作:触发计算并缓存结果
  8. count1 = rdd.count()  # 第一次计算,耗时较长
  9. # 第二次行动操作:直接使用缓存结果
  10. count2 = rdd.count()  # 直接从缓存读取,耗时极短
复制代码
三、存储级别(StorageLevel)

Spark 支持多种存储级别,可根据数据规模和内存情况选择:

存储级别说明MEMORY_ONLY默认级别,将 RDD 作为反序列化的 Java 对象存储在内存中。内存不足时部分数据会被抛弃。MEMORY_ONLY_SER将 RDD 作为序列化的 Java 对象存储(占用空间更小)。MEMORY_AND_DISK优先存储在内存中,内存不足时溢写到磁盘。MEMORY_AND_DISK_SER类似MEMORY_AND_DISK,但数据序列化存储。DISK_ONLY只存储在磁盘上。MEMORY_ONLY_2类似MEMORY_ONLY,但数据复制到两个节点。OFF_HEAP存储在 Tungsten 堆外内存中(必要启用堆外内存)。 示例:指定存储级别

python
运行
  1. from pyspark import StorageLevel
  2. # 存储在内存和磁盘,序列化
  3. rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
  4. # 只存储在磁盘
  5. rdd.persist(StorageLevel.DISK_ONLY)
复制代码
四、缓存的生命周期



  • 缓存何时触发?
    初次触发举措算子时,Spark 管帐算 RDD 并将效果缓存。
  • 缓存何时失效?

    • 手动调用unpersist()释放缓存。
    • 内存不足时,Spark 会根据 LRU(最近最少使用)战略自动删除部分缓存数据。
    • Spark 应用结束时,缓存自动释放。

示例:释放缓存

python
运行
  1. rdd.unpersist()  # 释放缓存
复制代码
五、缓存的最佳实践


  • 优先缓存频仍使用的 RDD
    如迭代算法中的中间效果(如 MLlib 模子训练)或交互式查询中的数据集。
  • 选择合适的存储级别

    • 内存充足时,使用MEMORY_ONLY(性能最优)。
    • 数据量大时,使用MEMORY_ONLY_SER或MEMORY_AND_DISK_SER(节省内存)。
    • 对容错要求高的场景,使用带副本的存储级别(如MEMORY_ONLY_2)。

  • 制止过度缓存
    缓存过多数据会导致内存压力,触发频仍的 GC 或数据溢写磁盘,反而降低性能。
  • 缓存后举行重分区
    缓存后可使用coalesce()减少分区数,降低后续任务的调治开销:
    python
    运行
    1. rdd_cached = rdd.cache()
    2. rdd_optimized = rdd_cached.coalesce(10)  # 合并为10个分区
    复制代码
  • 监控缓存使用情况
    通过 Spark UI 查看缓存状态:

    • Storage 标签页显示各 RDD 的缓存大小、分区数和存储位置。
    • 监控内存使用,制止 OOM(内存溢出)。

六、缓存 vs 查抄点(Checkpointing)

特性缓存(Cache)查抄点(Checkpoint)存储位置内存或磁盘(Executor 节点)HDFS 等可靠存储(外部体系)容错性节点故障时可能丢失数据,需重新计算数据永久存储,节点故障不影响性能速率快(内存读取)速率较慢(需写入外部存储)用途短期重用中间效果恒久生存关键效果(如长时间迭代的中间点)触发方式初次举措算子自动触发手动调用checkpoint()并触发举措算子 示例:使用查抄点

python
运行
  1. # 设置检查点目录
  2. sc.setCheckpointDir("hdfs://path/to/checkpoint_dir")
  3. # 标记RDD为检查点
  4. rdd.checkpoint()
  5. # 触发检查点(必须有行动算子)
  6. rdd.count()  # 此时RDD会被写入检查点目录
复制代码
七、常见题目与办理方法

  • 内存不足导致缓存失效

    • 办理方案:改用MEMORY_AND_DISK_SER存储级别,或增加内存资源。

  • 缓存数据丢失

    • 办理方案:使用MEMORY_ONLY_2存储级别(数据复制到两个节点),或联合查抄点。

  • 缓存未生效

    • 查抄是否在举措算子前调用了cache()或persist()。
    • 确认 RDD 是否被重复创建(每次map()、filter()等转换操作都会生成新 RDD)。

八、总结

缓存是 Spark 中提升性能的告急手段,尤其得当迭代计算和交互式查询。合理使用缓存可以明显减少计算开销,但需根据数据规模和内存情况选择合适的存储级别,并留意监控和管理缓存数据。
分享
除了textFile,Spark另有哪些方法可以创建RDD?
除了内存,还可以将RDD持久化到哪些存储介质上?
缓存的RDD是否会不绝占用内存或磁盘空间?

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!更多信息从访问主页:qidao123.com:ToB企服之家,中国第一个企服评测及商务社交产业平台。
回复

使用道具 举报

0 个回复

正序浏览

快速回复

您需要登录后才可以回帖 登录 or 立即注册

本版积分规则

玛卡巴卡的卡巴卡玛

论坛元老
这个人很懒什么都没写!
快速回复 返回顶部 返回列表