Spark の Broadcast variables や Accumulator で Executor に変数を共有する
sparkExecutor に変数を共有する仕組みとして Broadcast variables と Accumulator がある。
Broadcast variables はノードにキャッシュされる read-only な変数。Stageごとに共通で必要なデータは自動で broadcast され、Task の実行前にデシリアライズされるが、複数の stage で必要なデータがある場合やデシリアライズされた状態で持っておきたい場合、 SparkContext.broadcast() で明示的に変数を broadcast することが有効にはたらく。
SparkのWeb UIでJobのStageとExecutorによるTask分散、SQLのplanを確認する - sambaiz-net
Accumulator は追加しかできない変数。
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable.HashSet
object Sandbox {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("sandbox").getOrCreate()
val broadcastVar = spark.sparkContext.broadcast(Array(1, 2, 3))
val sumAccumulator = spark.sparkContext.longAccumulator("sumAccumulator")
val data = spark.sparkContext.parallelize(1 to 100, 4)
data.foreach { number =>
sumAccumulator.add(broadcastVar.value(1) * number)
}
println(s"Result: ${sumAccumulator.value}") // -> 5050 * 2 = 10100
spark.stop()
}
}
その値は Web UI から Task ごとに確認できる。