【SparkJavaAPI】Transformation(13)—zipWithIndex、zipWithUniqueId



Spark Java Api Transformation Zipwithindex Zipwithuniqueid




公式文書の説明:

Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].

関数プロトタイプ:

def zipWithUniqueId(): JavaPairRDD[T, JLong]

この関数は、RDDの要素と対応する一意のIDをキーと値のペアに結合します。ここで、ID生成アルゴリズムは、各パーティションの最初の要素のIDがパーティションのインデックス番号であり、各パーティションのN番目の要素がIDは(N * RDDのパーティションの総数)+(パーティションインデックス番号)です。

ソースコード分析:

def zipWithUniqueId(): RDD[(T, Long)] = withScope { val n = this.partitions.length.toLong this.mapPartitionsWithIndex { case (k, iter) => iter.zipWithIndex.map { case (item, i) => (item, i * n + k) } } }

ソースコードからわかるように、zipWithUniqueId()関数はmapPartitionsWithIndex()関数を使用して各要素のパーティションインデックス番号を取得し、対応する計算に(i * n + k)を使用します。



例:

List data = Arrays.asList(5, 1, 1, 4, 4, 2, 2) JavaRDD javaRDD = javaSparkContext.parallelize(data,3) List data1 = Arrays.asList(3,2,12,5,6,1,7) JavaRDD javaRDD1 = javaSparkContext.parallelize(data1) JavaPairRDD zipWithIndexRDD = javaRDD.zipWithUniqueId() System.out.println('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~' + zipWithIndexRDD.collect())