Livy を EMR on EKS にインストールしSparkmagic でローカルの Jupyter Notebook から Spark のジョブを実行する

awskubernetessparketl

Apache Livy は Spark クラスタとやり取りするための REST API を提供し、Sparkmagic はこの API を呼ぶことで Jupyter Notebooks からリモートの Spark クラスタでジョブを実行する。Athena for Apache Spark でも使われていて、インタラクティブにジョブを実行し結果を確認できるのはデバッグやアドホックなクエリを実行したりするのに便利だ。

Athena for Apache Spark の Notebook で DataFrame.toPandas().plot() した際の日本語が文字化けしないようにする - sambaiz-net

しかし Livy の Kubernetes 対応は2019年に PR は出ているものの、Kubernetes に明るいメンテナがおらず merge されない状況が続いている。そのため自分でビルドして使おうかと思っていたが、なんと EMR 7.1 から Livy をサポートするようになった。これを用いてローカルの Jupyter Notebooks から EMR on EKS で Spark のジョブを実行してみる。

必要なパラメータを渡して Helm chart で Livy をインストールする。

const livyExecutionRole = new iam.Role(this, 'LivyExecutionRole', {
  roleName: 'livy-execution-role',
  assumedBy: new iam.WebIdentityPrincipal(
    cluster.openIdConnectProvider.openIdConnectProviderArn,
    {
      StringLike: new cdk.CfnJson(this, 'LivyExecutionRoleStringEquals', {
        value: {
          [`${cluster.clusterOpenIdConnectIssuer}:sub`]: 'system:serviceaccount:livy:emr-containers-sa-livy',
        },
      }),
    }
  ),
  inlinePolicies: {
    UploadS3: new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          effect: iam.Effect.ALLOW,
          actions: [
            's3:PutObject',
            's3:ListBucket',
          ],
          resources: [
            'arn:aws:s3:::<bucket>',
            'arn:aws:s3:::<bucket>/*'
          ],
        }),
      ],
    }),
  },
})

cluster.addHelmChart('LivyChart', {
  chart: 'livy',
  repository: 'oci://059004520145.dkr.ecr.ap-northeast-1.amazonaws.com/livy',
  version: '7.1.0',
  namespace: 'livy',
  createNamespace: true,
  values: {
    image: '059004520145.dkr.ecr.ap-northeast-1.amazonaws.com/livy/emr-7.1.0:latest',
    sparkNamespace: 'emr',
    serviceAccount: {
      executionRoleArn: livyExecutionRole.roleArn,
    },
    service: {
      annotations: {
        'service.beta.kubernetes.io/aws-load-balancer-subnets': 'testpublicsubnet1,testpublicsubnet2'
      },
    },
  },
})

Load Balancer Controller によって service に対応する Internal の NLB が作られるが、今回はローカルからアクセスするためポートフォワーディングする。

EKSクラスタにAWS Load Balancer ControllerをインストールしてALBのIngressを立てる - sambaiz-net

$ kubectl port-forward -n livy svc/emr-containers-livy 8998:8998
containers-livy 8998:8998
Forwarding from 127.0.0.1:8998 -> 8998
Forwarding from [::1]:8998 -> 8998

$ curl http://localhost:8998/sessions
{"from":0,"total":0,"sessions":[]}

LivyのREST APIを呼んでSparkジョブを実行する - sambaiz-net

Sparkmagic の公式のイメージはなさそうだったのでリポジトリにある Dockerfile を参考にビルドする。Spark クラスタで処理を行わせるには %%spark magic を使う方法もあるが、自動で Spark クラスタで処理を行うカーネルも提供されているので必要に応じてインストールする。

FROM jupyter/base-notebook

USER root

RUN apt update && \
    apt install -y gcc && \
    apt clean && \
    rm -rf /var/lib/apt/lists/*

# This is needed because requests-kerberos fails to install on debian due to missing linux headers
RUN conda install requests-kerberos -y

USER $NB_USER

RUN pip install --upgrade pip && \
    pip install --upgrade --ignore-installed setuptools && \
    pip install hdijupyterutils autovizwidget sparkmagic

# Jupyter extensions changed in >7.x.x
# For now (workaround), let's pin to 6 to avoid breaking things
# xref: https://github.com/jupyter-incubator/sparkmagic/issues/825
RUN pip install notebook==6.5.5

RUN mkdir /home/$NB_USER/.sparkmagic
COPY config.json /home/$NB_USER/.sparkmagic/config.json

RUN jupyter nbextension enable --py --sys-prefix widgetsnbextension
RUN jupyter-kernelspec install --user $(pip show sparkmagic | grep Location | cut -d" " -f2)/sparkmagic/kernels/sparkkernel
RUN jupyter-kernelspec install --user $(pip show sparkmagic | grep Location | cut -d" " -f2)/sparkmagic/kernels/pysparkkernel
RUN jupyter-kernelspec install --user $(pip show sparkmagic | grep Location | cut -d" " -f2)/sparkmagic/kernels/sparkrkernel
RUN jupyter serverextension enable --py sparkmagic

USER root
RUN chown $NB_USER /home/$NB_USER/.sparkmagic/config.json

CMD [ "start-notebook.sh", "--NotebookApp.token=''" ]

config.json は次のようにした。

{
  "kernel_scala_credentials" : {
    "url": "http://host.docker.internal:8998"
  },
  
  "session_configs": {
    "driverMemory": "1000M",
    "executorCores": 2
  },

  "session_configs_defaults": {
    "conf": {
      "spark.kubernetes.namespace": "emr",
      "spark.kubernetes.container.image": "public.ecr.aws/emr-on-eks/spark/emr-7.1.0:latest",
      "spark.kubernetes.authenticate.driver.serviceAccountName": "emr-containers-sa-spark-driver-*****", 
      "spark.kubernetes.file.upload.path": "s3://<bucket>",

      "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
      "spark.sql.catalog.spark_catalog.type": "hive"
    }
  },
  
  "livy_session_startup_timeout_seconds": 180,
  "max_results_sql": 2500
}

Notebook を開いて spark.sql() などを実行すると EKS クラスタで session が起動し結果が返る。