当前位置:网站首页>10 Spark on RDD Cache
10 Spark on RDD Cache
2022-08-08 23:31:00 【YaPengLi.】
RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 算子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用,写两个列子参考下:
CASE01:直接,复用RDD不使用Cache函数
val list = List("Hello Scala", "Hello Spark", "Hello Python")
val rdd = sparkContext.makeRDD(list)
val mapRDD: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map(m => {
println("..........")
(m, 1)
})
val reduceByKeyRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
reduceByKeyRDD.collect().foreach(print);
val groupByKeyRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()
groupByKeyRDD.collect().foreach(print)
CASE01:输出结果
[Stage 0:> (0 + 0) / 8]..........
..........
..........
..........
..........
..........
(Hello,3)(Python,1)(Spark,1)(Scala,1)..........
..........
..........
..........
..........
..........
(Hello,CompactBuffer(1, 1, 1))(Python,CompactBuffer(1))(Spark,CompactBuffer(1))(Scala,CompactBuffer(1))
通过,以上代码print观察,map算子被重复调用。按照JAVA语言理解,我们只是使用了mapRDD算子,map算子应当不会重新被调用。但是,在Spark中map算子被多次调用。原因是什么呢?是在Spark中RDD只保存了依赖关系、计算逻辑,得到的RDD是不存储数据的,从而导致,每一次调用都需要重新执行全部流程。那么我们加上Cache算子来看下:
val list = List("Hello Scala", "Hello Spark", "Hello Python")
val rdd = sparkContext.makeRDD(list)
val mapRDD: RDD[(String, Int)] = rdd.flatMap(_.split(" ")).map(m => {
println("..........")
(m, 1)
})
mapRDD.cache()
val reduceByKeyRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)
reduceByKeyRDD.collect().foreach(print);
val groupByKeyRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()
groupByKeyRDD.collect().foreach(print)
[Stage 0:> (0 + 0) / 8]..........
..........
..........
..........
..........
..........
(Hello,3)(Python,1)(Spark,1)(Scala,1)(Hello,CompactBuffer(1, 1, 1))
(Python,CompactBuffer(1))(Spark,CompactBuffer(1))(Scala,CompactBuffer(1))
然后在血缘关系层面, Cache算子会增加血缘关系,不改变原有的血缘关系。
println(mapRDD.toDebugString)
最后,看下存储级别:
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部 Partition。Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时候,如果想重用数据,仍然建议调用 persist 或 cache。
边栏推荐
- Introduction to Qt (5) - file operation, hotkey and mouse reading (implementation of txt window)
- LightningChart .NET 10.3.2 Crack 支持旧项目直接升级
- LeetCode:正则表达式匹配
- 【Tensorflow2】tensorflow1.x-tensorflow2.x一些接口的转变
- 使用Mongoose populate实现多表关联存储与查询,内附完整代码
- 待完善:tf.name_scope() 和 tf.variable_scope()的区别
- PHP regular to img SRC to add the domain name
- 【Verilog基础】PPA优化问题总结(含面积优化、速度优化)
- 【CUDA】版本自由切换
- 微信小程序开发一些函数使用方法
猜你喜欢
Low-Light Image Enhancement via a Deep Hybrid Network阅读札记
跨域请求浏览器无法显示set-cookie,坑了我一晚上
【CUDA】version switch freely
2022牛客多校六 B-Eezie and Pie (dfs)
Use Mongoose populate to implement multi-table associative storage and query, with complete code included
【瑞吉外卖】day04:员工分页查询、启用/禁用员工账号、编辑员工信息
域前置通信过程和溯源思路
牛客多校2 G League of Legends
循环神经网络实现股票预测
【PP-YOLOv2】测试自定义的数据集
随机推荐
【CUDA】version switch freely
2021 RoboCom 世界机器人开发者大赛-本科组(决赛)7-4猛犸不上 Ban(最短路)
Small program figure display banner
stm32使用spi1在slave 模式下 dma 读取数据
stm32 利用 串口接收空闲中断 + dma 实现不定长度dma 接收
用工具实现 Mock API 的整个流程
LightningChart .NET 10.3.2 Crack 支持旧项目直接升级
2022牛客多校六 M-Z-Game on grid(动态规划)
记录一些 PostgreSQL问题分析思路
(2022牛客多校五)B-Watches(二分)
stm32 uses serial port to receive idle interrupt + dma to achieve variable length dma reception
牛客练习赛88 D 克鲁斯卡尔重构树
Casbin 进行权限控制验证
-Wl,--start-group ... -Wl,--end-group for resolving circular dependencies of several libraries
-Wl,--start-group ... -Wl,--end-group 用于解决几个库的循环依赖关系
Tp5 in cache cache, storage cell phone text message authentication code
AsyncTask的替代方案
最小生成树prim 求得 树上两点之间的最大最小值
Introduction to Qt (5) - file operation, hotkey and mouse reading (implementation of txt window)
Qt入门(五)——文件操作、热键和鼠标的读取(txt窗口的实现)