不用map而用mapPartitions的例子

不用map而用mapPartitions的例子

Content #

把 Word Count 的计数需求,从原来的对单词计数,改为对单词的哈希值计数:

// 把普通RDD转换为Paired RDD
import java.security.MessageDigest
val cleanWordRDD: RDD[String] = _
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map{ word =>
  // 获取MD5对象实例
  val md5 = MessageDigest.getInstance("MD5")
  // 使用MD5计算哈希值
  val hash = md5.digest(word.getBytes).mkString
  // 返回哈希值与数字1的Pair
  (hash, 1)
}

由于 map(f) 是以元素为单元做转换的,那么对于 RDD 中的每一条数据记录,我们都需要实例化一个 MessageDigest 对象来计算这个元素的哈希值。

在工业级生产系统中,一个 RDD 动辄包含上百万甚至是上亿级别的数据记录,如果处理每条记录都需要事先创建 MessageDigest,那么实例化对象的开销就会聚沙成塔,不知不觉地成为影响执行效率的罪魁祸首。

mapPartitions就是以数据分区为粒度,使用映射函数 f 对 RDD 进行数据转换。

import java.security.MessageDigest
val cleanWordRDD: RDD[String] = _
val kvRDD: RDD[(String, Int)] = cleanWordRDD.mapPartitions( partition => {
  // 注意!这里是以数据分区为粒度,获取MD5对象实例
  val md5 = MessageDigest.getInstance("MD5")
  val newPartition = partition.map( word => {
  // 在处理每一条数据记录的时候,可以复用同一个Partition内的MD5对象
    (md5.digest(word.getBytes()).mkString,1)
  })
  newPartition
})

相比前一个版本,我们把实例化 MD5 对象的语句挪到了 map 算子之外。如此一来,以数据分区为单位,实例化对象的操作只需要执行一次,而同一个数据分区中所有的数据记录,都可以共享该 MD5 对象,从而完成单词到哈希值的转换。

Viewpoints #

From #

03 | RDD常用算子(一):RDD内部的数据转换