Share variables with executors using Spark's Broadcast variables and Accumulators

spark

There are two mechanisms for sharing variables among executors: Broadcast variables and Accumulators.

Broadcast variables are read-only variables cached on each node. Spark automatically broadcasts the common data that tasks need within each stage, and deserializes it before each task runs. When the same data is used across multiple stages, or should be kept on the nodes in deserialized form, broadcasting it explicitly with SparkContext.broadcast() can be helpful.
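As a sketch of explicit broadcasting (the lookup map and names here are made up for illustration), broadcasting a small table once lets every task read it from the executor-local cache instead of shipping it inside each task's closure:

```scala
import org.apache.spark.sql.SparkSession

object BroadcastSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("broadcast-sketch").getOrCreate()

    // A small lookup table (hypothetical) referenced by multiple stages.
    // Broadcasting sends it to each executor once and caches it deserialized.
    val countryNames = spark.sparkContext.broadcast(
      Map("JP" -> "Japan", "US" -> "United States")
    )

    val codes = spark.sparkContext.parallelize(Seq("JP", "US", "JP"))
    // Tasks read the broadcast value via .value; they must not modify it.
    val named = codes.map(code => countryNames.value.getOrElse(code, "Unknown"))
    named.collect().foreach(println)

    // Release the broadcast data on the executors once it is no longer needed.
    countryNames.destroy()
    spark.stop()
  }
}
```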

Spark Web UI: Monitor Job Stages, Tasks distribution and SQL plan - sambaiz-net

Accumulators are variables that can only be added to.

import org.apache.spark.sql.SparkSession

object Sandbox {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("sandbox").getOrCreate()

    val broadcastVar = spark.sparkContext.broadcast(Array(1, 2, 3)) // cached read-only on each executor
    val sumAccumulator = spark.sparkContext.longAccumulator("sumAccumulator") // named, so it appears in the Web UI
    val data = spark.sparkContext.parallelize(1 to 100, 4)

    data.foreach { number =>
      sumAccumulator.add(broadcastVar.value(1) * number) // broadcastVar.value(1) == 2
    }

    println(s"Result: ${sumAccumulator.value}") // -> 5050 * 2 = 10100

    spark.stop()
  }
}

The value added by each task can be seen in the Web UI.
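Beyond the built-in long, double, and collection accumulators, AccumulatorV2 can be extended for custom aggregation. A minimal sketch that collects distinct values into a HashSet (the class and variable names are illustrative, not from Spark):

```scala
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable.HashSet

// Accumulates distinct Int values seen across tasks.
class DistinctAccumulator extends AccumulatorV2[Int, HashSet[Int]] {
  private val set = new HashSet[Int]()

  override def isZero: Boolean = set.isEmpty
  override def copy(): DistinctAccumulator = {
    val acc = new DistinctAccumulator
    acc.set ++= set
    acc
  }
  override def reset(): Unit = set.clear()
  override def add(v: Int): Unit = set += v
  // Called on the driver to combine per-task results.
  override def merge(other: AccumulatorV2[Int, HashSet[Int]]): Unit =
    set ++= other.value
  override def value: HashSet[Int] = set
}
```

Register an instance with spark.sparkContext.register(accumulator, "name") before using it. As with the built-in accumulators, updates are only guaranteed to be applied exactly once when performed inside actions, not transformations.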