Sparkストリーミング演算子:reduceByKeyAndWindow
Spark Streaming Operator
注意:
- ウィンドウの長さ-ウィンドウの長さ(図3)
- スライド間隔-ウィンドウ操作が実行される時間間隔(図2)
これらの2つのパラメーターは、DStreamバッチの時間間隔の倍数である必要があります。
例
- コード
object WindowApp { def main(args: Array[String]) { System.setProperty('hadoop.home.dir', 'D:\hadoop') // Ready to work val conf = new SparkConf().setMaster('local[2]').setAppName('WindowApp') val ssc = new StreamingContext(conf, Seconds(10)) // business logic processing val lines = ssc.socketTextStream('hadoop000', 9999) // 1 thread lines.flatMap(_.split(',')) .map((_,1)).reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(10), Seconds(5)) .print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate }
-
上記のように:val ssc = new StreamingContext(conf、Seconds(5))
このバッチ処理時間は10秒に設定されており、エラーが報告されます。
-
理由:以下に設定されているスライド時間は5秒です(10の倍数である必要があります)
.map((_、1))。reduceByKeyAndWindow((a:Int、b:Int)=>(a + b)、
秒(10)、秒(5))