CDK で EKS クラスタを立ち上げ EMR on EKS に登録し Spark のジョブを動かす

kubernetesawssparketl

EMR on EKS は EKS 上で Spark を動かす機能。 通常の EMR が Hadoop クラスタの管理も行うのに対して、EMR on EKS はコンテナの起動のみを担う。

AWS CLIでEMRクラスタを立ち上げSparkのアプリケーションを実行する - sambaiz-net

Kubernetes で動かすことで、管理やモニタリングの際に Kubernetes 用のツールや機能を利用でき、既存のクラスタがあれば余っているリソースを有効活用できる。 他のアプリケーションでの Kubernetes の知見を集計や分析で活かしたり、その逆もあるかもしれない。 ただ、Kubernetes および EKS の知識はある程度必要。通常の EMR は YARN の知識がなくとも使い始められるが、こちらはないと動かすのも大変だと思う。

ちなみに、Docker については EMR 6.x (Hadoop 3) から サポートされている

料金はリクエストした vCPU とメモリに対してかかる。

今回は CDK で EKS クラスタの立ち上げから EMR の登録までを行い、Spark のジョブが実行できることを確認する。 全体のコードは GitHub にある。

クラスタの立ち上げ

v1.22 から addManifest() などの際に最新バージョンの kubectl が使われるようにするには 別パッケージの kubectlLayer を渡す必要ができた

import { KubectlV27Layer } from '@aws-cdk/lambda-layer-kubectl-v27';

const mastersRole = new iam.Role(this, 'MastersRole', {
  assumedBy: new iam.CompositePrincipal(
    new iam.ServicePrincipal('eks.amazonaws.com'),
    new iam.AccountRootPrincipal(),
  ),
  managedPolicies: [
    iam.ManagedPolicy.fromAwsManagedPolicyName('AmazonEKSClusterPolicy'),
  ],
})

return new eks.Cluster(this, 'EKSCluster', {
  vpc,
  mastersRole,
  clusterName: clusterName,
  version: eks.KubernetesVersion.V1_27,
  kubectlLayer: new KubectlV27Layer(this, 'KubectlLayer'),
  defaultCapacity: 3,
  defaultCapacityInstance: new ec2.InstanceType('m5.large')
})

この後 CDK ではなく eksctl create iamidentitymapping でクラスタへのアクセス権限を付与する場合は、 Kubernetes の権限に加えて eks:DescribeCluster が必要になるので master role にこれを追加するか、 ユーザーの role を aws-auth に追加して Kubernetes の権限を与える必要がある。

inlinePolicies: {
  'eksctl-iamidentitymapping': new iam.PolicyDocument({
    statements: [
      new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        actions: ['eks:DescribeCluster'],
        resources: ['*'],
      })
    ]
  })
}

その場合は、併せて assume する profile を config に追加する。

$ brew tap weaveworks/tap
$ brew install weaveworks/tap/eksctl
$ eksctl version
0.124.0

$ cat ~/.aws/config
...
[profile eks-master]
region = us-east-1 
role_arn = arn:aws:iam::xxxxx:role/EmrOnEksTestStack-clusterMastersRoleEABFBB9C-10Q5X4BE7QN51
source_profile = <user_profile_name> 

クラスタへのアクセス権限の付与

EMR on EKS の service-linked role に紐づいた User に Kubernetes の Role を bind する。

RBACが有効なGKEでHelmを使う - sambaiz-net

a. eksctl create iamidentitymapping による方法

まずはドキュメント通り eksctl でどのようなリソースが作成されるか確認する。

$ CLUSTER_NAME=emr-on-eks-test
$ NAMESPACE=default

$ eksctl create iamidentitymapping \
    --cluster $CLUSTER_NAME \
    --namespace $NAMESPACE \
    --service-name emr-containers \
    --profile eks-master
2022-12-31 17:40:34 []  created "default:Role.rbac.authorization.k8s.io/emr-containers"
2022-12-31 17:40:34 []  created "default:RoleBinding.rbac.authorization.k8s.io/emr-containers"
2022-12-31 17:40:34 []  adding identity "arn:aws:iam::xxxxx:role/AWSServiceRoleForAmazonEMRContainers" to auth ConfigMap

$ kubectl describe -n kube-system configmap aws-auth
...
mapRoles:
  ----
...
- rolearn: arn:aws:iam::xxxxx:role/AWSServiceRoleForAmazonEMRContainers
  username: emr-containers
...

b. CDK による方法

eksctl のリポジトリを見ると該当リソースのマニフェストがあったので、 これを CDK で表した。

const emrRole = cluster.addManifest('EMRContainersRole', {
  apiVersion: 'rbac.authorization.k8s.io/v1',
  kind: 'Role',
  metadata: { name: 'emr-containers', namespace: emrcontainersNamespace },
  rules: [
    { apiGroups: [''], resources: ['namespaces'], verbs: ['get'] },
    { apiGroups: [''], resources: ['serviceaccounts', 'services', 'configmaps', 'events', 'pods', 'pods/log'], verbs: ['get', 'list', 'watch', 'describe', 'create', 'edit', 'delete', 'deletecollection', 'annotate', 'patch', 'label'] },
    { apiGroups: [''], resources: ['secrets'], verbs: ['create', 'patch', 'delete', 'watch'] },
    { apiGroups: ['apps'], resources: ['statefulsets', 'deployments'], verbs: ['get', 'list', 'watch', 'describe', 'create', 'edit', 'delete', 'annotate', 'patch', 'label'] },
    { apiGroups: ['batch'], resources: ['jobs'], verbs: ['get', 'list', 'watch', 'describe', 'create', 'edit', 'delete', 'annotate', 'patch', 'label'] },
    { apiGroups: ['extensions', 'networking.k8s.io'], resources: ['ingresses'], verbs: ['get', 'list', 'watch', 'describe', 'create', 'edit', 'delete', 'annotate', 'patch', 'label'] },
    { apiGroups: ['rbac.authorization.k8s.io'], resources: ['roles', 'rolebindings'], verbs: ['get', 'list', 'watch', 'describe', 'create', 'edit', 'delete', 'deletecollection', 'annotate', 'patch', 'label'] },
    { apiGroups: [''], resources: ['persistentvolumeclaims'], verbs: ['create', 'list', 'delete']},
    { apiGroups: ['scheduling.volcano.sh'], resources: ['podgroups'], verbs: ['get', 'list', 'watch', 'create', 'delete', 'update']}
  ],
});
emrRole.node.addDependency(namespace)

const emrRoleBind = cluster.addManifest('EMRContainersRoleBind', {
  apiVersion: 'rbac.authorization.k8s.io/v1',
  kind: 'RoleBinding',
  metadata: { name: 'emr-containers', namespace: emrcontainersNamespace },
  subjects: [{ kind: 'User', name: 'emr-containers', apiGroup: 'rbac.authorization.k8s.io' }],
  roleRef: { kind: 'Role', name: 'emr-containers', apiGroup: 'rbac.authorization.k8s.io' },
});
emrRoleBind.node.addDependency(emrRole);

const emrContainersRole = iam.Role.fromRoleName(this, 'EMRContainersRole',
  'AWSServiceRoleForAmazonEMRContainers')

cluster.awsAuth.addRoleMapping(emrContainersRole, {
  username: "emr-containers",
  groups: []
})

クラスタの登録

EMR にクラスタを登録する。

new emrcontainers.CfnVirtualCluster(this, 'virtual-cluster', {
  name: 'test_virtual_cluster',
  containerProvider: {
    id: cluster.clusterName,
    type: 'EKS',
    info: {
      eksInfo: {
        namespace: 'default'
      }
    }
  }
})

Execution role の作成

ジョブの実行に用いられる Role を作成する。 ジョブの送信時に作成される EMR on EKS の ServiceAccount が OIDC で assume できるような trust policy を設定している。

createJobExecutionRole(cluster: eks.Cluster, emrcontainersNamespace: string) {
  const roleName = "emr-on-eks-test-job-execution-role"
  const bs36 = basex("0123456789abcdefghijklmnopqrstuvwxyz")
  const base36RoleName = bs36.encode(new TextEncoder().encode(roleName))

  return new iam.Role(this, 'EMRJobExecutionRole', {
    roleName: roleName,
    assumedBy: new iam.WebIdentityPrincipal(
      cluster.openIdConnectProvider.openIdConnectProviderArn,
      {
        "StringLike": new cdk.CfnJson(this, 'JobExecutionRoleStringEquals', {
          value: {
            [`${cluster.clusterOpenIdConnectIssuer}:sub`]: `system:serviceaccount:${emrcontainersNamespace}:emr-containers-sa-*-*-${this.account}-${base36RoleName}`
          }
        })
      }
    ),
    inlinePolicies: {
      'job-execution-policy': new iam.PolicyDocument({
        statements: [
          new iam.PolicyStatement({
            effect: iam.Effect.ALLOW,
            actions: [
              's3:PutObject',
              's3:GetObject',
              's3:ListBucket'
            ],
            resources: ['arn:aws:s3:::*'],
          }),
          new iam.PolicyStatement({
            effect: iam.Effect.ALLOW,
            actions: [
              'logs:PutLogEvents',
              'logs:CreateLogStream',
              'logs:DescribeLogGroups',
              'logs:DescribeLogStreams'
            ],
            resources: ['arn:aws:logs:*:*:*'],
          })
        ]
      })
    }
  })
}

trust policy は次のコマンドでも追加できる。 もしこの後エラーになった場合は、このコマンドで追加されたものと CDK で作成したものが一致するかを確認するとよい。

$ aws emr-containers update-role-trust-policy \
    --cluster-name $CLUSTER_NAME \
    --namespace $NAMESPACE \
    --role-name $EXECUTION_ROLE_NAME
Successfully updated trust policy of role emr-on-eks-test-job-execution-role

Spark ジョブの実行

start-job-run すると まず EMR の job-submitter の pod が立ち上がり、その後 Spark の driver が起動する。 リソースが足りないと pending する。 driver が S3 の AccessDenied で失敗する場合は execution-role が assume できていない可能性があるので、trust policy や IAM OIDC プロバイダーが登録されているか確認する。

dynamic resource allocation (DAR)を有効にして executor が増減するようにしている。 現状 Spark on Kubernetes が external shuffle service をサポートしていないため、 代わりに shuffle file tracking を有効にして shuffle data を持つ executor が削除されないようにする必要がある。

$ VIRTUAL_CLUSTER_ID=xxxxx
$ EXECUTION_ROLE_ARN=arn:aws:iam::xxxxx:role/emr-on-eks-test-job-execution-role

$ aws emr-containers start-job-run \
  --virtual-cluster-id $VIRTUAL_CLUSTER_ID \
  --name pi-2 \
  --execution-role-arn $EXECUTION_ROLE_ARN \
  --release-label emr-6.13.0-latest \
  --job-driver '{
      "sparkSubmitJobDriver": {
          "entryPoint": "s3://<mybucket>/pi.py",
          "sparkSubmitParameters": "--conf spark.executor.instances=1 --conf spark.executor.memory=2G --conf spark.executor.cores=1 --conf spark.driver.cores=1"
      }
  }' \
  --configuration-overrides '{
    "monitoringConfiguration": {
      "persistentAppUI": "ENABLED",
      "s3MonitoringConfiguration": {
        "logUri": "s3://<mybucket>"
      }
    },
    "applicationConfiguration": [
      {
        "classification": "spark-defaults", 
        "properties": {
          "spark.dynamicAllocation.enabled":"true",
          "spark.dynamicAllocation.shuffleTracking.enabled":"true",
          "spark.dynamicAllocation.minExecutors":"1",
          "spark.dynamicAllocation.maxExecutors":"100",
          "spark.dynamicAllocation.initialExecutors":"1"
         }
      }
    ]
  }'
  
$ kubectl get pod -n emrcontainers
NAME                               READY   STATUS    RESTARTS   AGE
000000032u7ddboo153-p52fj          3/3     Running    0          53s
pythonpi-a6561a8bc76bc424-exec-1   1/2     NotReady   0          25s
pythonpi-a6561a8bc76bc424-exec-2   0/2     Pending    0          17s
spark-000000032u7ddboo153-driver   1/2     NotReady   0          40s

$ aws s3 cp s3://<mybucket>/<cluster_id>/jobs/<job_id>/containers/spark-000000032psj6spu6q7/spark-000000032psj6spu6q7-driver/stdout.gz .
$ gzcat stdout.gz
Pi is roughly 3.140920

トラブルシューティング

Glue Data Catalog を参照する場合は次の設定を追加しないと TABLE_OR_VIEW_NOT_FOUND になる。

--configuration-overrides '{
    "applicationConfiguration": [
        {
            "classification": "spark-defaults",
            "properties": {
                "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
                "spark.sql.catalogImplementation": "hive"
            }
        }
    ]
}'

ノードの増減などの際に job-submitter や driver の pod が止まってしまうと SparkContext was shut down などでジョブが失敗してしまうので spark.kubernetes.driver.annotation.<AnnotationName> で annotation を付けるなどして回避する。 ただし job-submitter に関してはこの方法が現状取れないので nodeSelector で増減しないノードグループのノードを指定するか、何らかの方法で annotation を付ける必要がある。

CDK で EKS クラスタに Karpenter をインストールし柔軟で高速なオートスケールを行う - sambaiz-net

"spark.kubernetes.driver.annotation.karpenter.sh/do-not-disrupt": "true"
"jobsubmitter.node.selector.<label>": "<value>",

execution role を assume するのに必要な情報は環境変数 AWS_ROLE_ARN や AWS_WEB_IDENTITY_TOKEN_FILE にあるが、 AWS SDK for Java は “software.amazon.awssdk” % “sts” が依存にないと次の WARN ログを出力し 優先順位の低い instance profile の Role を使ってしまう。

WARN WebIdentityCredentialsUtils: To use web identity tokens, the 'sts' service module must be on the class path.

参考

EKS Cluster v1.22以降の作り方がCDKで変わった話 #AWS - Qiita

How to Make Robust EMR on EKS pipelines + Airflow | by Jun Seokjun Han | Medium