ScalaでSparkのアプリケーションを開発してGitHub ActionsでデプロイしEMRでリモートデバッグする

sparkscalaawsgithubetl

Spark は開発で用いられている Scala に加えて Java や Python のAPIを提供しており、技術スタックや他コンポーネントとの兼ね合いなどによって選択することができる。

Python は データ分析や機械学習のスキルセットとの親和性の高さや Glue Studio 上で編集して実行できる手軽さがある一方、エラーが分かりづらく JVM と Python Worker 間でデータをやり取りする必要があるのでパフォーマンスの点でも不利。 また、JVM の制御の外である Python インタプリタが YARN などのリソースマネージャによって割り当てられた以上のメモリを確保してしまうと executor が kill されてしまう。

Scala は 敷居が高いとされがちだが、Spark の API を呼ぶという点では Python とさほど変わらない。 もし Scala の経験があれば API が 標準ライブラリと近しいので書きやすく感じると思う。 また、JDWP によってコードに手を入れることなくリモートデバッグができるのも大きい。

Spark 3.2.0 から Scala 2.12/2.13 対応になったので、 CrossVersion.for3Use2_13 によって Scala 3 で追加された enumextension が使えるかと思いきや、依然デフォルトのバージョンは 2.12 のようで、実際 EMR の Spark も 2.12 でビルドされており現状動かなかった。

Scala 2/3 の列挙型と既存の型へのフィールドの追加 - sambaiz-net

$ spark-shell --version
...
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 1.8.0_342
...

$ spark-submit --class "TestApp" --master "local[4]" target/scala-3.2.1/simple-project_3-1.0.jar
...
Exception in thread "main" java.lang.NoSuchMethodError: 'java.lang.Object scala.Predef$.refArrayOps(java.lang.Object[])'
        at TestApp$.main(Main.scala:6)
        at TestApp.main(Main.scala)
...

Scalaの環境構築

cs setup で sbt などをインストールする。

$ brew install coursier/formulas/coursier && cs setup
...
Checking if the standard Scala applications are installed
  Installed ammonite
  Installed cs
  Installed coursier
  Installed scala
  Installed scalac
  Installed scala-cli
  Installed sbt
  Installed sbtn
  Installed scalafmt

hello world project を作成して実行してみる。

$ sbt new scala/hello-world.g8
$ cd hello-world-template
$ tree .
.
├── build.sbt
├── project
│ └── build.properties
└── src
└── main
└── scala
└── Main.scala

$ cat build.sbt | grep "scalaVersion"
scalaVersion := "2.13.8"

$ sbt run
[info]   Compilation completed in 6.913s.
[info] running Main 
Hello, World!

実装

spark-sql を依存に追加する。

$ cat build.sbt
name := "Simple Project"

version := "1.0"
scalaVersion := "2.12.17"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.0" % "provided"

簡単なSQLを実行してみる。

import org.apache.spark.sql.SparkSession

object TestApp {
  def main(args: Array[String]) {
    val spark = SparkSession.builder.appName("Test Application").getOrCreate()
    val out = spark.sql("SELECT 1 as a, 2 as b, 3 as c").collect().map(row => row.mkString(","))
    println(out.head)
    spark.stop()
  }
}

package して jar を生成する。

$ sbt package
$ ls target/scala-2.12 | grep ".jar"
simple-project_2.12-1.0.jar

実行

ローカルで spark-submit して実行する。

$ brew install apache-spark
$ spark-submit --class "TestApp" --master "local[4]" target/scala-2.12/simple-project_2.12-1.0.jar
...
1,2,3
...

デプロイ

GitHub Actions で S3 に jar をアップロードする。

GitHub ActionsからOIDCでassumeできるRoleをCDKで作成する - sambaiz-net

$ cat .github/workflows/ci.yml 
name: CI
on:
  push:
env:
  DEPLOY_ROLE_NAME: "arn:aws:iam::<aws_account_id>:role/ScalaSparkCiRemoteDebugStack_DeployRole"
  DEPLOY_BUCKET_NAME: "scalasparkciremotedebugstack-<aws_account_id>-deploy-bucket"
  AWS_REGION: "ap-northeast-1"
permissions:
  id-token: write # This is required for requesting the JWT
  contents: read  # This is required for actions/checkout
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v2
      - name: Setup JDK
        uses: actions/setup-java@v3
        with:
          distribution: temurin
          java-version: 8
      - name: Creates a jar file
        run: sbt package
      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v1
        with:
          role-to-assume: ${{ env.DEPLOY_ROLE_NAME }}
          aws-region: ${{ env.AWS_REGION }}
      - name: Deploy the jar file
        run: aws s3 cp target/scala-2.12/simple-project_2.12-1.0.jar s3://${{ env.DEPLOY_BUCKET_NAME }}/

リモートデバッグ

server=y,suspend=y でデバッガーが接続するまで実行されないようにして EMR に spark-submit する。

JDWPを有効にしてリモートマシンで動いているJavaアプリケーションをデバッグする - sambaiz-net

driver を master instance で動かすために deploy-mode は client にしている。

AWS CLIでEMRクラスタを立ち上げSparkのアプリケーションを実行する - sambaiz-net

$ aws emr add-steps \
  --cluster-id <cluster_id> \
  --steps '[{
    "Type": "CUSTOM_JAR",
    "Name": "spark-submit",
    "Jar": "command-runner.jar",
    "Args": [
      "spark-submit",
      "--master",
      "yarn",
      "--deploy-mode",
      "client",
      "--conf",
      "spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005",
      "s3://<bucket>/simple-project_2.12-1.0.jar"
    ]
}]'

master instance に Session Manager などでポートフォワーディングして接続すればリモートデバッグできる。 handshake failed する場合はポートが正しいかなどを確認する。

$ aws ssm start-session --target <master_instance_id> --document-name AWS-StartPortForwardingSession --parameters '{"portNumber":["5005"], "localPortNumber":["5005"]}'

参考

Using Scala 3 with Spark | 47 Degrees

Getting The Best Performance With PySpark