Spark の Broadcast variables や Accumulator で Executor に変数を共有する

spark

Executor に変数を共有する仕組みとして Broadcast variablesAccumulator がある。

Broadcast variables はノードにキャッシュされる read-only な変数。Stageごとに共通で必要なデータは自動で broadcast され、Task の実行前にデシリアライズされるが、複数の stage で必要なデータがある場合やデシリアライズされた状態で持っておきたい場合、 SparkContext.broadcast() で明示的に変数を broadcast することが有効にはたらく。

SparkのWeb UIでJobのStageとExecutorによるTask分散、SQLのplanを確認する - sambaiz-net

Accumulator は追加しかできない変数。

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable.HashSet

object Sandbox {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("sandbox").getOrCreate()

    val broadcastVar = spark.sparkContext.broadcast(Array(1, 2, 3))
    val sumAccumulator = spark.sparkContext.longAccumulator("sumAccumulator")
    val data = spark.sparkContext.parallelize(1 to 100, 4)

    data.foreach { number =>
      sumAccumulator.add(broadcastVar.value(1) * number)
    }

    println(s"Result: ${sumAccumulator.value}") // -> 5050 * 2 = 10100

    spark.stop()
  }
}

その値は Web UI から Task ごとに確認できる。