aggregateByKey聚合算子

aggregateByKey聚合算子

Content #

aggregateByKey 的调用形式如下所示:

val rdd: RDD[(Key类型,Value类型)] = _
rdd.aggregateByKey(初始值)(f1, f2)

初始值可以是任意数值或是字符串,而聚合函数都是带有两个形参和一个输出结果的普通函数。这 3 个参数来说之间的类型需要保持一致,具体来说:

  1. 初始值类型,必须与 f2 的结果类型保持一致;
  2. f1 的形参类型,必须与 Paired RDD 的 Value 类型保持一致;
  3. f2 的形参类型,必须与 f1 的结果类型保持一致。

我们用 aggregateByKey 这个算子来实现“先加和,再取最大值”的计算逻辑,代码实现如下所示:

// 把RDD元素转换为(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))

// 显示定义Map阶段聚合函数f1
def f1(x: Int, y: Int): Int = {
return x + y
}

// 显示定义Reduce阶段聚合函数f2
def f2(x: Int, y: Int): Int = {
return math.max(x, y)
}

// 调用aggregateByKey,实现先加和、再求最大值
val wordCounts: RDD[(String, Int)] = kvRDD.aggregateByKey(0) (f1, f2)

Viewpoints #

From #

07 | RDD常用算子(二):Spark如何实现数据聚合?