Spark演算子:mapPartitions、mapPartitionsWithIndexの変換



Spark Operator Mappartitions



1、 mapPartitions

def mapPartitions [U](f:(Iterator [T])=> Iterator [U]、preservesPartitioning:Boolean = false)(デフォルトarg0:ClassTag [U]):RDD [U]



この関数はmapに似ていますが、mapping関数のパラメーターがRDDの各パーティションのイテレーターである点が異なります。マッピングプロセス中に追加のオブジェクトを頻繁に作成する必要がある場合は、mapPartitionsを使用する方がmapよりも効率的です。 preparesPartitioningパラメーターは、親RDDのパーティション情報を保持するかどうかを示します。

var rdd1 = sc.makeRDD(1 to 5,2) / / rdd1 has two partitions, respectively calculate the elements of the two partitions and save to result scala> var rdd3 = rdd1.mapPartitions{ x => { var result = List[Int]() var i = 0 while(x.hasNext){ i += x.next() } result.::(i).iterator }} rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[84] at mapPartitions at :23 //rdd3 accumulates the values ​​in each partition in rdd1 scala> rdd3.collect res65: Array[Int] = Array(3, 12) scala> rdd3.partitions.size res66: Int = 2

二、 mapPartitionsWithIndex



def mapPartitionsWithIndex [U](f :(Int、Iterator [T])=> Iterator [U]、preservesPartitioning:Boolean = false)(implicit arg0:ClassTag [U]):RDD [U]

この関数はmapPartitionsと同じ効果がありますが、パーティションインデックスパラメーターがもう1つあります。

var rdd1 = sc.makeRDD(1 to 5,2) //rdd1 has two partitions var rdd2 = rdd1.mapPartitionsWithIndex{ (x,iter) => { var result = List[String]() var i = 0 while(iter.hasNext){ i += iter.next() } result.::(x + '|' + i).iterator } } //rdd2 accumulates the number of each partition in rdd1 and adds a partitioned index in front of the accumulated result of each partition. scala> rdd2.collect res13: Array[String] = Array(0|3, 1|12)