Spark は開発で用いられている Scala に加えて Java や Python のAPIを提供しており、技術スタックや他コンポーネントとの兼ね合いなどによって選択することができる。
Python は データ分析や機械学習のスキルセットとの親和性の高さや Glue Studio 上で編集して実行できる手軽さがある一方、エラーが分かりづらく JVM と Python Worker 間でデータをやり取りする必要があるのでパフォーマンスの点でも不利。 また、JVM の制御の外である Python インタプリタが YARN などのリソースマネージャによって割り当てられた以上のメモリを確保してしまうと executor が kill されてしまう。
Scala は 敷居が高いとされがちだが、Spark の API を呼ぶという点では Python とさほど変わらない。 もし Scala の経験があれば API が 標準ライブラリと近しいので書きやすく感じると思う。 また、JDWP によってコードに手を入れることなくリモートデバッグができるのも大きい。
Spark 3.2.0 から Scala 2.12/2.13 対応になったので、 CrossVersion.for3Use2_13 によって Scala 3 で追加された enum や extension が使えるかと思いきや、依然デフォルトのバージョンは 2.12 のようで、実際 EMR の Spark も 2.12 でビルドされており現状動かなかった。
Scala 2/3 の列挙型と既存の型へのフィールドの追加 - sambaiz-net
$ spark-shell --version
...
Using Scala version 2.12.15, OpenJDK 64-Bit Server VM, 1.8.0_342
...
$ spark-submit --class "TestApp" --master "local[4]" target/scala-3.2.1/simple-project_3-1.0.jar
...
Exception in thread "main" java.lang.NoSuchMethodError: 'java.lang.Object scala.Predef$.refArrayOps(java.lang.Object[])'
at TestApp$.main(Main.scala:6)
at TestApp.main(Main.scala)
...
Scalaの環境構築
cs setup で sbt などをインストールする。
$ brew install coursier/formulas/coursier && cs setup
...
Checking if the standard Scala applications are installed
Installed ammonite
Installed cs
Installed coursier
Installed scala
Installed scalac
Installed scala-cli
Installed sbt
Installed sbtn
Installed scalafmt
hello world project を作成して実行してみる。
$ sbt new scala/hello-world.g8
$ cd hello-world-template
$ tree .
.
├── build.sbt
├── project
│ └── build.properties
└── src
└── main
└── scala
└── Main.scala
$ cat build.sbt | grep "scalaVersion"
scalaVersion := "2.13.8"
$ sbt run
[info] Compilation completed in 6.913s.
[info] running Main
Hello, World!
実装
spark-sql を依存に追加する。
$ cat build.sbt
name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.17"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.3.0" % "provided"
簡単なSQLを実行してみる。
import org.apache.spark.sql.SparkSession
object TestApp {
def main(args: Array[String]) {
val spark = SparkSession.builder.appName("Test Application").getOrCreate()
val out = spark.sql("SELECT 1 as a, 2 as b, 3 as c").collect().map(row => row.mkString(","))
println(out.head)
spark.stop()
}
}
package して jar を生成する。
$ sbt package
$ ls target/scala-2.12 | grep ".jar"
simple-project_2.12-1.0.jar
実行
ローカルで spark-submit して実行する。
$ brew install apache-spark
$ spark-submit --class "TestApp" --master "local[4]" target/scala-2.12/simple-project_2.12-1.0.jar
...
1,2,3
...
デプロイ
GitHub Actions で S3 に jar をアップロードする。
GitHub ActionsからOIDCでassumeできるRoleをCDKで作成する - sambaiz-net
$ cat .github/workflows/ci.yml
name: CI
on:
push:
env:
DEPLOY_ROLE_NAME: "arn:aws:iam::<aws_account_id>:role/ScalaSparkCiRemoteDebugStack_DeployRole"
DEPLOY_BUCKET_NAME: "scalasparkciremotedebugstack-<aws_account_id>-deploy-bucket"
AWS_REGION: "ap-northeast-1"
permissions:
id-token: write # This is required for requesting the JWT
contents: read # This is required for actions/checkout
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Setup JDK
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 8
- name: Creates a jar file
run: sbt package
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1
with:
role-to-assume: ${{ env.DEPLOY_ROLE_NAME }}
aws-region: ${{ env.AWS_REGION }}
- name: Deploy the jar file
run: aws s3 cp target/scala-2.12/simple-project_2.12-1.0.jar s3://${{ env.DEPLOY_BUCKET_NAME }}/
リモートデバッグ
server=y,suspend=y でデバッガーが接続するまで実行されないようにして EMR に spark-submit する。
JDWPを有効にしてリモートマシンで動いているJavaアプリケーションをデバッグする - sambaiz-net
driver を master instance で動かすために deploy-mode は client にしている。
AWS CLIでEMRクラスタを立ち上げSparkのアプリケーションを実行する - sambaiz-net
$ aws emr add-steps \
--cluster-id <cluster_id> \
--steps '[{
"Type": "CUSTOM_JAR",
"Name": "spark-submit",
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--master",
"yarn",
"--deploy-mode",
"client",
"--conf",
"spark.driver.extraJavaOptions=-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005",
"s3://<bucket>/simple-project_2.12-1.0.jar"
]
}]'
master instance に Session Manager などでポートフォワーディングして接続すればリモートデバッグできる。 handshake failed する場合はポートが正しいかなどを確認する。
$ aws ssm start-session --target <master_instance_id> --document-name AWS-StartPortForwardingSession --parameters '{"portNumber":["5005"], "localPortNumber":["5005"]}'