Blog

RDD Cache

Content #

当同一个 RDD 被引用多次时,就可以考虑对其进行 Cache,从而提升作业的执行效率。

Word Count完整的代码如下所示:

import org.apache.spark.rdd.RDD

val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"

// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)

// 以行为单位做分词
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
val cleanWordRDD: RDD[String] = wordRDD.filter(word => !word.equals(""))

// 把RDD元素转换为(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))

// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)

// 打印词频最高的5个词汇
wordCounts.map{case (k, v) => (v, k)}.sortByKey(false).take(5)

// 将分组计数结果落盘到文件
val targetPath: String = _
wordCounts.saveAsTextFile(targetPath)

我们在最后追加了 saveAsTextFile 落盘操作,这样一来,wordCounts 这个 RDD 在程序中被引用了两次。

...

Execution Memory与Storage Memory的抢占规则

Content #

两者可以互相抢占,抢占逻辑有如下 3 条:

  1. 如果对方的内存空间有空闲,双方可以互相抢占;

  2. 对于 Storage Memory 抢占的 Execution Memory 部分,当分布式任务有计算需要时,Storage Memory 必须立即归还抢占的内存,涉及的缓存数据要么落盘、要么清除;

  3. 对于 Execution Memory 抢占的 Storage Memory 部分,即便 Storage Memory 有收回内存的需要,也必须要等到分布式任务执行完毕才能释放。

Viewpoints #

From #

08 | 内存管理:Spark如何使用内存?

内存配置项(Spark)

Content #

Executor JVM Heap 的划分,由图中的 3 个配置项来决定:

其中 spark.executor.memory 是绝对值,它指定了 Executor 进程的 JVM Heap 总大小。另外两个配置项,spark.memory.fraction 和 spark.memory.storageFraction 都是比例值,它们指定了划定不同区域的空间占比。

spark.memory.fraction 用于标记 Spark 处理分布式数据集的内存总大小,这部分内存包括 Execution Memory 和 Storage Memory 两部分,也就是图中绿色的矩形区域。(M – 300)* (1 – mf)刚好就是 User Memory 的区域大小,也就是图中蓝色区域的部分。

Storage Memory 的初始大小。

Viewpoints #

From #

08 | 内存管理:Spark如何使用内存?

Spark内存区域划分

Content #

对于任意一个 Executor 来说,Spark 会把内存分为 4 个区域,分别是 Reserved Memory、User Memory、Execution Memory 和 Storage Memory。

其中,Reserved Memory 固定为 300MB,不受开发者控制,它是 Spark 预留的、用来存储各种 Spark 内部对象的内存区域;User Memory 用于存储开发者自定义的数据结构,例如 RDD 算子中引用的数组、列表、映射等等。

Execution Memory 用来执行分布式任务。分布式任务的计算,主要包括数据的转换、过滤、映射、排序、聚合、归并等环节,而这些计算环节的内存消耗,统统来自于 Execution Memory。

Storage Memory 用于缓存分布式数据集,比如 RDD Cache、广播变量等等。RDD Cache 指的是 RDD 物化到内存中的副本。在一个较长的 DAG 中,如果同一个 RDD 被引用多次,那么把这个 RDD 缓存到内存中,往往会大幅提升作业的执行性能。我们在这节课的最后会介绍 RDD Cache 的具体用法。

不难发现,Execution Memory 和 Storage Memory 这两块内存区域,对于 Spark 作业的执行性能起着举足轻重的作用。因此,在所有的内存区域中, Execution Memory 和 Storage Memory 是最重要的,也是开发者最需要关注的。

在 Spark 1.6 版本之前,Execution Memory 和 Storage Memory 的空间划分是静态的,一旦空间划分完毕,不同内存区域的用途与尺寸就固定了。在 1.6 版本之后,Spark 推出了统一内存管理模式,在这种模式下,Execution Memory 和 Storage Memory 之间可以相互转化。

...

aggregateByKey聚合算子

Content #

aggregateByKey 的调用形式如下所示:

val rdd: RDD[(Key类型,Value类型)] = _
rdd.aggregateByKey(初始值)(f1, f2)

初始值可以是任意数值或是字符串,而聚合函数都是带有两个形参和一个输出结果的普通函数。这 3 个参数来说之间的类型需要保持一致,具体来说:

  1. 初始值类型,必须与 f2 的结果类型保持一致;
  2. f1 的形参类型,必须与 Paired RDD 的 Value 类型保持一致;
  3. f2 的形参类型,必须与 f1 的结果类型保持一致。

我们用 aggregateByKey 这个算子来实现“先加和,再取最大值”的计算逻辑,代码实现如下所示:

// 把RDD元素转换为(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))

// 显示定义Map阶段聚合函数f1
def f1(x: Int, y: Int): Int = {
return x + y
}

// 显示定义Reduce阶段聚合函数f2
def f2(x: Int, y: Int): Int = {
return math.max(x, y)
}

// 调用aggregateByKey,实现先加和、再求最大值
val wordCounts: RDD[(String, Int)] = kvRDD.aggregateByKey(0) (f1, f2)

Viewpoints #

From #

07 | RDD常用算子(二):Spark如何实现数据聚合?

...

reducebyKey的Map端聚合与Reduce端聚合

Content #

我们把 Word Count 的计算逻辑,改为随机赋值、提取同一个 Key 的最大值。

import scala.util.Random._
// 把RDD元素转换为(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, nextInt(100)))
// 显示定义提取最大值的聚合函数f
def f(x: Int, y: Int): Int = {
    return math.max(x, y)
}
// 按照单词提取最大值
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey(f)

reduceByKey 的计算过程如下图:

尽管 reduceByKey 也会引入 Shuffle,但相比 groupByKey 以全量原始数据记录的方式消耗磁盘与网络,reduceByKey 在落盘与分发之前,会先在 Shuffle 的 Map 阶段做初步的聚合计算。

比如,在数据分区 0 的处理中,在 Map 阶段,reduceByKey 把 Key 同为 Streaming 的两条数据记录聚合为一条,聚合逻辑就是由函数 f 定义的、取两者之间 Value 较大的数据记录,这个过程我们称之为“Map 端聚合”。相应地,数据经由网络分发之后,在 Reduce 阶段完成的计算,我们称之为“Reduce 端聚合”。

量变引起质变,在工业级的海量数据下,相比 groupByKey,reduceByKey 通过在 Map 端大幅削减需要落盘与分发的数据量,往往能将执行效率提升至少一倍。

reduceByKey 算子的局限性,在于其 Map 阶段与 Reduce 阶段的计算逻辑必须保持一致,这个计算逻辑统一由聚合函数 f 定义。当一种计算场景需要在两个阶段执行不同计算逻辑的时候,reduceByKey 就爱莫能助了。

...

Shuffle(Spark)

Content #

在分布式计算场景中,Shuffle 指的是集群范围内跨节点、跨进程的数据分发。

Shuffle 的计算会消耗所有类型的硬件资源。具体来说,Shuffle 中的哈希与排序操作会大量消耗 CPU,而 Shuffle Write 生成中间文件的过程,会消耗宝贵的内存资源与磁盘 I/O,最后,Shuffle Read 阶段的数据拉取会引入大量的网络 I/O。Shuffle 是资源密集型计算。

Shuffle 中间文件包含两类文件,一个是记录(Key,Value)键值对的 data 文件,另一个是记录键值对所属 Reduce Task 的 index 文件。计算图 DAG 中的 Map 阶段与 Reduce 阶段,正是通过中间文件来完成数据的交换。

Shuffle Write 过程中生成中间文件,分为 4 个步骤:

  1. 对于数据分区中的数据记录,逐一计算其目标分区,然后填充内存数据结构;

  2. 当数据结构填满后,如果分区中还有未处理的数据记录,就对结构中的数据记录按(目标分区 ID,Key)排序,将所有数据溢出到临时文件,同时清空数据结构;

  3. 重复前 2 个步骤,直到分区中所有的数据记录都被处理为止;

  4. 对所有临时文件和内存数据结构中剩余的数据记录做归并排序,生成数据文件和索引文件。

最后,在 Reduce 阶段,Reduce Task 通过 index 文件来“定位”属于自己的数据内容,并通过网络从不同节点的 data 文件中下载属于自己的数据记录。

Viewpoints #

From #

06 | Shuffle管理:为什么Shuffle是性能瓶颈?

任务调度的步骤(Spark)

Content #

1.DAGScheduler 以 Shuffle 为边界,将开发者设计的计算图 DAG 拆分为多个执行阶段 Stages,然后为每个 Stage 创建任务集 TaskSet。

2.SchedulerBackend 通过与 Executors 中的 ExecutorBackend 的交互来实时地获取集群中可用的计算资源,并将这些信息记录到 ExecutorDataMap 数据结构。

3.与此同时,SchedulerBackend 根据 ExecutorDataMap 中可用资源创建 WorkerOffer,以 WorkerOffer 为粒度提供计算资源。

4.对于给定 WorkerOffer,TaskScheduler 结合 TaskSet 中任务的本地性倾向,按照 PROCESS_LOCAL、NODE_LOCAL、RACK_LOCAL 和 ANY 的顺序,依次对 TaskSet 中的任务进行遍历,优先调度本地性倾向要求苛刻的 Task。

5.被选中的 Task 由 TaskScheduler 传递给 SchedulerBackend,再由 SchedulerBackend 分发到 Executors 中的 ExecutorBackend。Executors 接收到 Task 之后,即调用本地线程池来执行分布式任务。

Viewpoints #

From #

05 | 调度系统:如何把握分布式计算的精髓?

sub:Spark

Content #

数组与RDD的对比 RDD(Resilient Distributed Dataset) 创建RDD RDD的结构 依赖关系(RDD) Spark的延迟计算(Lazy Evaluation) Transformations类算子和Actions类算子 不用map而用mapPartitions的例子 相邻单词出现次数统计(flatMap) Driver和Executors 任务调度的步骤(Spark) Shuffle(Spark) reducebyKey的Map端聚合与Reduce端聚合 aggregateByKey聚合算子 Spark内存区域划分 内存配置项(Spark) Execution Memory与Storage Memory的抢占规则 RDD Cache repartition与coalesce的区别 广播变量 存储系统的三个服务对象(BlockManager) RDD、DataFrame、DataSet的对比 事件时间和处理时间 Structured Streaming 与 Spark Streaming 对比 每隔10秒输出过去60秒产生的前十热点词 withColumn与withColumnRenamed

Driver和Executors

Content #

分布式计算的精髓在于,如何把抽象的计算流图,转化为实实在在的分布式计算任务,然后以并行计算的方式交付执行。

进程模型的核心是 Driver 和 Executors。任何一个 Spark 应用程序的入口,都是带有 SparkSession 的 main 函数,而在 Spark 的分布式计算环境中,运行这样 main 函数的 JVM 进程有且仅有一个,它被称为 “Driver”。

Driver 最核心的作用,在于解析用户代码、构建计算流图,然后将计算流图转化为分布式任务,并把任务分发给集群中的 Executors 交付执行。接收到任务之后,Executors 调用内部线程池,结合事先分配好的数据分片,并发地执行任务代码。 Driver 进程的 3 个核心组件:

  1. 根据用户代码构建计算流图(DAGScheduler)
  2. 根据计算流图拆解出分布式任务(TaskScheduler)
  3. 将分布式任务分发到 Executors 中去(SchedulerBackend)

对于一个完整的 RDD,每个 Executors 负责处理这个 RDD 的一个数据分片子集。每当任务执行完毕,Executors 都会及时地与 Driver 进行通信、汇报任务状态。 Driver 在获取到 Executors 的执行进度之后,结合计算流图的任务拆解,依次有序地将下一阶段的任务再次分发给 Executors 付诸执行,直至整个计算流图执行完毕。

Viewpoints #

From #

04 | 进程模型与分布式部署:分布式计算是怎么回事?