RDD Cache

RDD Cache

Content #

当同一个 RDD 被引用多次时,就可以考虑对其进行 Cache,从而提升作业的执行效率。

Word Count完整的代码如下所示:

import org.apache.spark.rdd.RDD

val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"

// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)

// 以行为单位做分词
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
val cleanWordRDD: RDD[String] = wordRDD.filter(word => !word.equals(""))

// 把RDD元素转换为(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))

// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)

// 打印词频最高的5个词汇
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)

// 将分组计数结果落盘到文件
val targetPath: String = _
wordCounts.saveAsTextFile(targetPath)

我们在最后追加了 saveAsTextFile 落盘操作,这样一来,wordCounts 这个 RDD 在程序中被引用了两次。

如果你把这份代码丢进 spark-shell 去执行,会发现 take 和 saveAsTextFile 这两个操作执行得都很慢。这个时候,我们就可以考虑通过给 wordCounts 加 Cache 来提升效率。

你只需要在 wordCounts 完成定义之后,在这个 RDD 之上依次调用 cache 和 count 即可,如下所示:

// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)

wordCounts.cache// 使用cache算子告知Spark对wordCounts加缓存
wordCounts.count// 触发wordCounts的计算,并将wordCounts缓存到内存

// 打印词频最高的5个词汇
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)

// 将分组计数结果落盘到文件
val targetPath: String = _
wordCounts.saveAsTextFile(targetPath)

由于 cache 函数并不会立即触发 RDD 在内存中的物化,因此我们还需要调用 count 算子来触发这一执行过程。添加上面的两条语句之后,你会发现 take 和 saveAsTextFile 的运行速度明显变快了很多。

在上面的例子中,我们通过在 RDD 之上调用 cache 来为其添加缓存,而在背后, cache 函数实际上会进一步调用 persist(MEMORY_ONLY)来完成计算。换句话说,下面的两条语句是完全等价的,二者的含义都是把 RDD 物化到内存。

wordCounts.cache
wordCounts.persist(MEMORY_ONLY)

就添加 Cache 来说,相比 cache 算子,persist 算子更具备普适性,结合多样的存储级别(如这里的 MEMORY_ONLY),persist 算子允许开发者灵活地选择 Cache 的存储介质、存储形式以及副本数量。

Viewpoints #

From #

08 | 内存管理:Spark如何使用内存?