Spark APIのmap、mapPartitions、mapValues、flatMap、flatMapValuesの詳細な説明



Detailed Explanation Map



Spark APIのmap、mapPartitions、mapValues、flatMap、flatMapValuesの詳細な説明

http://spark.apache.org/docs/latest/api/python/pyspark.html



1. RDD変数を作成し、ヘルプ関数を使用して、関連する関数の定義と例を表示します。

>>> a = sc.parallelize([(1,2),(3,4),(5,6)]) >>> a ParallelCollectionRDD[21] at parallelize at PythonRDD.scala:475 >>> help(a.map) Help on RDD in module pyspark.rdd object: class RDD(__builtin__.object) | A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. | Represents an immutable, partitioned collection of elements that can be | operated on in parallel. | | Methods defined here: | | __add__(self, other) | Return the union of this RDD and another one. | | >>> rdd = sc.parallelize([1, 1, 2, 3]) | >>> (rdd + rdd).collect() | [1, 1, 2, 3, 1, 1, 2, 3] RDDとは何ですか?

RDDは、Sparkの抽象データ構造タイプです。 SparkではすべてのデータがRDDとして表されます。プログラミングの観点からは、RDDは単純に配列と見なすことができます。通常の配列との違いは、RDDのデータがパーティションに格納されるため、異なるパーティションのデータを異なるマシンに分散し、同時に並行して処理できることです。したがって、Sparkアプリケーションが行うことは、処理するデータをRDDに変換してから、RDDに対して一連の変換と操作を実行して結果を取得することです。



2、map(関数)

マップは、RDDの各要素に対して指定された機能を実行して、新しいRDDを生成することです。元のRDDの要素はすべて新しいRDDにあり、それに対応する要素は1つだけです。

map(self, f, preservesPartitioning=False) method of pyspark.rdd.RDD instance Return a new RDD by applying a function to each element of this RDD. >>> rdd = sc.parallelize(['b', 'a', 'c']) >>> sorted(rdd.map(lambda x: (x, 1)).collect()) [('a', 1), ('b', 1), ('c', 1)]

3、mapPartitions(関数)



map()の入力関数はRDDの各要素に適用され、mapPartitions()の入力関数は各パーティションに適用されます

mapPartitionsはmapの変形です。 mapの入力関数はRDDの各要素に適用され、mapPartitionsの入力関数は各パーティションに適用されます。つまり、各パーティションのコンテンツは全体として処理されます。

mapPartitions(self, f, preservesPartitioning=False) method of pyspark.rdd.RDD instance Return a new RDD by applying a function to each partition of this RDD. >>> rdd = sc.parallelize([1, 2, 3, 4], 2) >>> def f(iterator): yield sum(iterator) >>> rdd.mapPartitions(f).collect() [3, 7]

4、mapValues(関数)

元のRDDのキーは変更されず、新しい値とともに新しいRDDの要素を形成します。したがって、この機能は、要素がKVペアであるRDDにのみ適用できます。
mapValues(self, f) method of pyspark.rdd.RDD instance Pass each value in the key-value pair RDD through a map function without changing the keys this also retains the original RDD's partitioning. >>> x = sc.parallelize([('a', ['apple', 'banana', 'lemon']), ('b', ['grapes'])]) >>> def f(x): return len(x) >>> x.mapValues(f).collect() [('a', 3), ('b', 1)] 5、flatMap(関数)

マップと同様に、元のRDDの要素はマップ処理後に1つの要素しか生成できず、元のRDDの要素はフラットマップ処理後に複数の要素を生成できるという違いがあります。

Help on method flatMap in module pyspark.rdd: flatMap(self, f, preservesPartitioning=False) method of pyspark.rdd.RDD instance Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results. >>> rdd = sc.parallelize([2, 3, 4]) >>> sorted(rdd.flatMap(lambda x: range(1, x)).collect()) [1, 1, 1, 2, 2, 3] >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect()) [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]

6、flatMapValues(関数)

flatMapValuesはmapValuesに似ていますが、flatMapValuesが要素がKVペアであるRDDの値に適用される点が異なります。各1要素の値は、入力関数によって一連の値にマッピングされ、これらの値は、元のRDDのキーと一連の新しいKVペアを形成します。

flatMapValues(self, f) method of pyspark.rdd.RDD instance Pass each value in the key-value pair RDD through a flatMap function without changing the keys this also retains the original RDD's partitioning. >>> x = sc.parallelize([('a', ['x', 'y', 'z']), ('b', ['p', 'r'])]) >>> def f(x): return x >>> x.flatMapValues(f).collect() [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]

7、機能を減らす

Reduceは、RDDの要素をペアで入力関数に渡し、同時に新しい値を生成します。新しく生成された値とRDDの次の要素は、最後に値が1つだけになるまで入力関数に渡されます。

reduce(self, f) method of pyspark.rdd.RDD instance Reduces the elements of this RDD using the specified commutative and associative binary operator. Currently reduces partitions locally. >>> from operator import add >>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add) 15 >>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add) 10 >>> sc.parallelize([]).reduce(add) Traceback (most recent call last): ... ValueError: Can not reduce() empty RDD

例は次のとおりです。

>>> from operator import add >>> b.collect() [1, 2, 3, 4, 5, 6] >>> b.reduce(add) # Introduce built-in functions 21 >>> b.reduce(lambda a,b:a+b) # lambda custom anonymous function 21

8.reduceByKey関数

名前が示すように、reduceByKeyは、要素がKVペアであるRDD内の同じキーの値を減らすことです。したがって、同じキーを持つ複数の要素の値は1つの値に削減され、元のRDDのキーと新しいKVペアを形成します。

Help on method reduceByKey in module pyspark.rdd: reduceByKey(self, func, numPartitions=None, partitionFunc=) method of pyspark.rdd.RDD instance Merge the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a 'combiner' in MapReduce. Output will be partitioned with C{numPartitions} partitions, or the default parallelism level if C{numPartitions} is not specified. Default partitioner is hash-partition. >>> from operator import add >>> rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)]) >>> sorted(rdd.reduceByKey(add).collect()) [('a', 2), ('b', 1)]参照:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html