CDK で立ち上げた EKS クラスタを EMR on EKS に登録し Spark のジョブを動かす

kubernetesawssparketl

EMR on EKS は EKS 上で Spark を動かす機能。 通常の EMR が Hadoop クラスタの管理も行うのに対して、EMR on EKS はコンテナの起動のみを担う。

AWS CLIでEMRクラスタを立ち上げSparkのアプリケーションを実行する - sambaiz-net

Kubernetes で動かすことで、管理やモニタリングの際に Kubernetes 用のツールや機能を利用でき、既存のクラスタがあれば余っているリソースを有効活用できる。 他のアプリケーションでの Kubernetes の知見を集計や分析で活かしたり、その逆もあるかもしれない。 ただ、Kubernetes および EKS の知識はある程度必要。通常の EMR は YARN の知識がなくとも使い始められるが、こちらはないと動かすのも大変だと思う。

ちなみに、Docker については EMR 6.x (Hadoop 3) から サポートされている

料金はリクエストした vCPU とメモリに対してかかる。

クラスタの立ち上げ

CDK で EKS クラスタを立ち上げる。

const vpc = new ec2.Vpc(this, 'vpc', {
  cidr: '10.0.0.0/16',
  maxAzs: 2,
  subnetConfiguration: [
    {
      cidrMask: 18,
      name: 'public',
      subnetType: ec2.SubnetType.PUBLIC,
    },
    {
      cidrMask: 18,
      name: 'private',
      subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS,
    },
  ]
})
cdk.Tags.of(vpc).add("Name", this.stackName)

const mastersRole = new iam.Role(this, 'masters-role', {
  assumedBy: new iam.CompositePrincipal(
    new iam.ServicePrincipal('eks.amazonaws.com'),
    new iam.AccountRootPrincipal(),
  ),
  managedPolicies: [
    iam.ManagedPolicy.fromAwsManagedPolicyName('AmazonEKSClusterPolicy'),
  ],
  inlinePolicies: {
    'eksctl-iamidentitymapping': new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          effect: iam.Effect.ALLOW,
          actions: ['eks:DescribeCluster'],
          resources: ['*'],
        })
      ]
    })
  }
})

return new eks.Cluster(this, 'cluster', {
  vpc,
  mastersRole,
  clusterName: 'emr-on-eks-test',
  version: eks.KubernetesVersion.V1_23,
  defaultCapacity: 3,
  defaultCapacityInstance: new ec2.InstanceType('m5.large')
})

この後実行する eksctl create iamidentitymapping で Kubernetes の権限に加えて eks:DescribeCluster が必要になるので master role にこれを追加し、assume する profile を config に追加した。 代わりにユーザーの role を aws-auth に追加して Kubernetes の権限を与えてもよい。

$ brew tap weaveworks/tap
$ brew install weaveworks/tap/eksctl
$ eksctl version
0.124.0

$ cat ~/.aws/config
...
[profile eks-master]
region = ap-northeast-1
role_arn = arn:aws:iam::xxxxx:role/EmrOnEksTestStack-clusterMastersRoleEABFBB9C-10Q5X4BE7QN51
source_profile = <user_profile_name> 

クラスタへのアクセス権限の付与

EMR on EKS の service-linked role に紐づいた User に Kubernetes role を bind する。

RBACが有効なGKEでHelmを使う - sambaiz-net

マニュアルで行うこともできるが、次のコマンドを実行するとリソースや設定が自動で追加される。

$ CLUSTER_NAME=emr-on-eks-test
$ NAMESPACE=default

$ eksctl create iamidentitymapping \
    --cluster $CLUSTER_NAME \
    --namespace $NAMESPACE \
    --service-name emr-containers \
    --profile eks-master
2022-12-31 17:40:34 []  created "default:Role.rbac.authorization.k8s.io/emr-containers"
2022-12-31 17:40:34 []  created "default:RoleBinding.rbac.authorization.k8s.io/emr-containers"
2022-12-31 17:40:34 []  adding identity "arn:aws:iam::xxxxx:role/AWSServiceRoleForAmazonEMRContainers" to auth ConfigMap

bind されていることが確認できる。

$ kubectl describe -n kube-system configmap aws-auth
...
mapRoles:
  ----
...
- rolearn: arn:aws:iam::xxxxx:role/AWSServiceRoleForAmazonEMRContainers
  username: emr-containers
...

$ kubectl get rolebinding -n $NAMESPACE emr-containers

  NAME             ROLE                  AGE
  emr-containers   Role/emr-containers   14m

クラスタの登録

EMR にクラスタを登録する。

new emrcontainers.CfnVirtualCluster(this, 'virtual-cluster', {
  name: 'test_virtual_cluster',
  containerProvider: {
    id: cluster.clusterName,
    type: 'EKS',
    info: {
      eksInfo: {
        namespace: 'default'
      }
    }
  }
})

Execution role の作成

Job の実行に必要な Policy を付与した Role を作成する。

new iam.Role(this, 'job-exectuion-role', {
  roleName: "emr-on-eks-test-job-execution-role",
  assumedBy: new iam.AccountRootPrincipal(), // update later
  inlinePolicies: {
    'job-execution-policy': new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          effect: iam.Effect.ALLOW,
          actions: [
            's3:PutObject',
            's3:GetObject',
            's3:ListBucket',
          ],
          resources: ['arn:aws:s3:::*'],
        }),
        new iam.PolicyStatement({
          effect: iam.Effect.ALLOW,
          actions: [
            'logs:PutLogEvents',
            'logs:CreateLogStream',
            'logs:DescribeLogGroups',
            'logs:DescribeLogStreams'
          ],
          resources: ['arn:aws:logs:*:*:*'],
        })
      ]
    })
  }
})

次のコマンドを実行すると ジョブの送信時に作成される EMR on EKS の service account に対する trust policy が追加される。

$ EXECUTION_ROLE_NAME=emr-on-eks-test-job-execution-role

$ aws emr-containers update-role-trust-policy \
    --cluster-name $CLUSTER_NAME \
    --namespace $NAMESPACE \
    --role-name $EXECUTION_ROLE_NAME
Successfully updated trust policy of role emr-on-eks-test-job-execution-role

{
    "Effect": "Allow",
    "Principal": {
        "Federated": "arn:aws:iam::xxxxx:oidc-provider/oidc.eks.ap-northeast-1.amazonaws.com/id/aaaaa"
    },
    "Action": "sts:AssumeRoleWithWebIdentity",
    "Condition": {
        "StringLike": {
            "oidc.eks.ap-northeast-1.amazonaws.com/id/aaaaa:sub": "system:serviceaccount:default:emr-containers-sa-*-*-xxxxx-<BASE36_ENCODED_ROLE_NAME>"
        }
    }
}

クラスタ の issuer url は次のコマンドで取得できる。

$ aws eks describe-cluster --name $CLUSTER_NAME --query "cluster.identity.oidc.issuer" --output text 
https://oidc.eks.ap-northeast-1.amazonaws.com/id/aaaaa

IAM OIDC プロバイダーに登録する。

GitHub ActionsからOIDCでassumeできるRoleをCDKで作成する - sambaiz-net

$ eksctl utils associate-iam-oidc-provider --cluster $CLUSTER_NAME --approve
2023-01-02 13:39:28 []  will create IAM Open ID Connect provider for cluster "emr-on-eks-test" in "ap-northeast-1"
2023-01-02 13:39:29 []  created IAM Open ID Connect provider for cluster "emr-on-eks-test" in "ap-northeast-1"

$ aws iam list-open-id-connect-providers 
{
    "OpenIDConnectProviderList": [
        {
            "Arn": "arn:aws:iam::xxxxx:oidc-provider/oidc.eks.ap-northeast-1.amazonaws.com/id/aaaaa"
        },
        ....
    ]
}

Spark ジョブの実行

start-job-run すると pod が立ち上がる。リソースが足りないと pending する。 driver が S3 の AccessDenied で失敗する場合は execution-role が assume できていない可能性があるので、trust policy や IAM OIDC プロバイダーが登録されているか確認する。

$ VIRTUAL_CLUSTER_ID=xxxxx
$ EXECUTION_ROLE_ARN=arn:aws:iam::xxxxx:role/emr-on-eks-test-job-execution-role

$ aws emr-containers start-job-run \
  --virtual-cluster-id $VIRTUAL_CLUSTER_ID \
  --name pi-2 \
  --execution-role-arn $EXECUTION_ROLE_ARN \
  --release-label emr-6.7.0-latest \
  --job-driver '{
      "sparkSubmitJobDriver": {
          "entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/pi.py",
          "sparkSubmitParameters": "--conf spark.executor.instances=1 --conf spark.executor.memory=2G --conf spark.executor.cores=1 --conf spark.driver.cores=1"
      }
  }'

{
    "id": "000000031bdlkdo0q1k",
    "name": "pi-2",
    "arn": "arn:aws:emr-containers:ap-northeast-1:xxxxx:/virtualclusters/xxxxx/jobruns/000000031bdlkdo0q1k",
    "virtualClusterId": "xxxxx"
}

$ kubectl get pod
NAME                               READY   STATUS              RESTARTS   AGE
000000031bdlkdo0q1k-sn72c          2/2     Running             0          21s
pythonpi-c4ae29856dad6d33-exec-1   0/1     ContainerCreating   0          1s
spark-000000031bdlkdo0q1k-driver   2/2     Running             0          12s

$ kubectl describe pod spark-000000031bdlkdo0q1k-driver
...
spark-kubernetes-driver:
  Image:         059004520145.dkr.ecr.ap-northeast-1.amazonaws.com/spark/emr-6.7.0:latest
  Args:
      driver
      --properties-file
      /usr/lib/spark/conf/spark.properties
      --class
      org.apache.spark.deploy.PythonRunner
      local:///usr/lib/spark/examples/src/main/python/pi.py
  ...