Develop Spark Applications in Scala, deploy with GitHub Actions, and perform remote debugging on EMR

spark, scala, aws, etl

Spark provides Java and Python APIs in addition to Scala, the language Spark itself is written in. You can choose among them depending on your technical stack, the technologies used in other components, and so on.

While Python is highly compatible with data analysis and machine learning skill sets and is easy to edit and run on Glue Studio, its errors can be hard to understand, and performance is at a disadvantage because data has to be exchanged between the JVM and the Python workers. Also, if the Python interpreter, which is not controlled by the JVM, tries to use more memory than a resource manager such as YARN has allocated, the executor can be killed.

Scala is often said to be a relatively hard language, but in terms of calling the Spark API it is not much different from PySpark. The Spark API is close to the standard library, so you may find it easy to write if you already have Scala experience. It also has the advantage that remote debugging can be performed with JDWP without inserting any code.

Since Spark 3.2.0, Scala 2.13 is supported in addition to 2.12, so you might expect that enum, extension, and other features added in Scala 3 could be used via CrossVersion.for3Use2_13, but the default version still seems to be 2.12, and Spark on EMR is also built with 2.12, so this doesn't currently work.

Enumerated types and extending existing types in Scala 2/3 - sambaiz-net
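
For example, a hypothetical build.sbt for such an attempt might look like the following (the Scala and project versions are taken from the jar path in the error output below; the Spark version is assumed to be the same 3.3.0 used later in this post). Submitting the resulting jar to a Scala 2.12 build of Spark fails as shown below.

// Hypothetical build.sbt: use Spark from Scala 3 via the Scala 2.13 artifacts
name := "Simple Project"

version := "1.0"
scalaVersion := "3.2.1"

libraryDependencies += ("org.apache.spark" %% "spark-sql" % "3.3.0" % "provided")
  .cross(CrossVersion.for3Use2_13)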

$ spark-shell --version
...
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 1.8.0_342
...

$ spark-submit --class "TestApp" --master "local[4]" target/scala-3.2.1/simple-project_3-1.0.jar
...
Exception in thread "main" java.lang.NoSuchMethodError: 'java.lang.Object scala.Predef$.refArrayOps(java.lang.Object[])'
        at TestApp$.main(Main.scala:6)
        at TestApp.main(Main.scala)
...

Set up Scala

Install sbt etc. with cs setup.

$ brew install coursier/formulas/coursier && cs setup
...
Checking if the standard Scala applications are installed
  Installed ammonite
  Installed cs
  Installed coursier
  Installed scala
  Installed scalac
  Installed scala-cli
  Installed sbt
  Installed sbtn
  Installed scalafmt

Create a hello world project and try running it.

$ sbt new scala/hello-world.g8
$ cd hello-world-template
$ tree .
.
├── build.sbt
├── project
│   └── build.properties
└── src
    └── main
        └── scala
            └── Main.scala
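
The template's Main.scala is just a minimal application that prints the greeting (the exact contents may vary slightly between template versions):

object Main extends App {
  println("Hello, World!")
}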

$ cat build.sbt | grep "scalaVersion"
scalaVersion := "2.13.8"

$ sbt run
[info]   Compilation completed in 6.913s.
[info] running Main 
Hello, World!

Implementation

Add spark-sql to dependencies.

$ cat build.sbt
name := "Simple Project"

version := "1.0"
scalaVersion := "2.12.17"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.0" % "provided"
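
Since spark-sql is scoped as provided, it is available at compile time but excluded from the runtime classpath of sbt run, so running the application directly from sbt will typically fail with NoClassDefFoundError; spark-submit supplies Spark at runtime instead. If you also want sbt run to work locally, one common workaround is to put the provided dependencies back on the run task's classpath, as a sketch:

// Optional: include "provided" dependencies when running with `sbt run`
// (this only affects the run task; the packaged jar is unchanged)
Compile / run := Defaults.runTask(
  Compile / fullClasspath,
  Compile / run / mainClass,
  Compile / run / runner
).evaluated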

Run a simple SQL query.

import org.apache.spark.sql.SparkSession

object TestApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("Test Application").getOrCreate()
    val out = spark.sql("SELECT 1 as a, 2 as b, 3 as c").collect().map(row => row.mkString(","))
    println(out.head)
    spark.stop()
  }
}
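
For reference, the same single row can be produced with the DataFrame API instead of a SQL string; a minimal sketch assuming the same SparkSession as above:

// Equivalent result via the DataFrame API (sketch)
import spark.implicits._

val df = Seq((1, 2, 3)).toDF("a", "b", "c")
println(df.collect().map(_.mkString(",")).head) // => 1,2,3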

Package it and generate a jar.

$ sbt package
$ ls target/scala-2.12 | grep ".jar"
simple-project_2.12-1.0.jar

Execution

Execute it with spark-submit.

$ brew install apache-spark
$ spark-submit --class "TestApp" --master "local[4]" target/scala-2.12/simple-project_2.12-1.0.jar
...
1,2,3
...

Deploy

Upload the jar file to S3 with GitHub Actions.

Create a role that can assume with OIDC from GitHub Actions with CDK - sambaiz-net

$ cat .github/workflows/ci.yml 
name: CI
on:
  push:
env:
  DEPLOY_ROLE_NAME: "arn:aws:iam::<aws_account_id>:role/ScalaSparkCiRemoteDebugStack_DeployRole"
  DEPLOY_BUCKET_NAME: "scalasparkciremotedebugstack-<aws_account_id>-deploy-bucket"
  AWS_REGION: "ap-northeast-1"
permissions:
  id-token: write # This is required for requesting the JWT
  contents: read  # This is required for actions/checkout
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@<version>
      - name: Setup JDK
        uses: actions/setup-java@<version>
        with:
          distribution: temurin
          java-version: 8
      - name: Create a jar file
        run: sbt package
      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@<version>
        with:
          role-to-assume: ${{ env.DEPLOY_ROLE_NAME }}
          aws-region: ${{ env.AWS_REGION }}
      - name: Deploy the jar file
        run: aws s3 cp target/scala-2.12/simple-project_2.12-1.0.jar s3://${{ env.DEPLOY_BUCKET_NAME }}/

Remote debugging

Prevent the application from running until a debugger connects by setting server=y,suspend=y, and spark-submit it to EMR.

Debug a Java application running on a remote machine by enabling JDWP - sambaiz-net

Pass client as the deploy-mode so that the driver runs on the master instance; in cluster mode it would run on one of the other nodes, and you would not know in advance where to connect the debugger.

Launch an EMR cluster with AWS CLI and run Spark applications - sambaiz-net

$ aws emr add-steps \
  --cluster-id <cluster_id> \
  --steps '[{
    "Type": "CUSTOM_JAR",
    "Name": "spark-submit",
    "Jar": "command-runner.jar",
    "Args": [
      "spark-submit",
      "--master",
      "yarn",
      "--deploy-mode",
      "client",
      "--conf",
      "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005",
      "s3://<bucket>/simple-project_2.12-1.0.jar"
    ]
}]'

Start port forwarding to the master instance with Session Manager or the like, and then start remote debugging. If handshake failed occurs, check that the port number is correct, and so on.

$ aws ssm start-session --target <master_instance_id> --document-name AWS-StartPortForwardingSession --parameters '{"portNumber":["5005"], "localPortNumber":["5005"]}'

References

Using Scala 3 with Spark | 47 Degrees

Getting The Best Performance With PySpark