Launch an EKS cluster and register it to EMR on EKS with CDK to run Spark jobs

kubernetes, aws, spark, etl

EMR on EKS is a managed service that provides APIs for running Spark applications on EKS, a performance-optimized runtime, the Spark History Server, and so on. While normal EMR also manages the Hadoop cluster itself, EMR on EKS is only responsible for launching containers.

Launch an EMR cluster with AWS CLI and run Spark applications - sambaiz-net

By running on Kubernetes, you can use Kubernetes tools and features for management and monitoring, and make use of spare resources if you already have a cluster. It also lets you apply Kubernetes knowledge gained from other applications to aggregation and analysis workloads, and vice versa. However, some knowledge of Kubernetes and EKS is required: you can start using normal EMR without knowing YARN, but I think EMR on EKS will be difficult to even operate with no Kubernetes knowledge.

By the way, Docker has been supported since EMR 6.x (Hadoop 3).

Pricing is based on the amount of vCPU and memory requested.
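
As a rough illustration of how that adds up (a sketch only: the rates below are placeholders, not the actual prices, so check the pricing page for your region), the uplift charged for a job is the sum over driver and executor pods of vCPU-hours and GB-hours:

// Sketch: estimate the EMR on EKS uplift for a job.
// VCPU_HOUR_RATE / GB_HOUR_RATE are placeholders, not real prices.
const VCPU_HOUR_RATE = 0.01   // USD per vCPU-hour (placeholder)
const GB_HOUR_RATE = 0.001    // USD per GB-hour (placeholder)

const estimateUpliftUSD = (vCpus: number, memoryGiB: number, hours: number): number =>
  vCpus * hours * VCPU_HOUR_RATE + memoryGiB * hours * GB_HOUR_RATE

// e.g. 1 driver (1 vCPU / 2 GiB) + 10 executors (1 vCPU / 2 GiB each) running for 2 hours
console.log(estimateUpliftUSD(11, 22, 2))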

In this article, I launch an EKS cluster, register it to EMR on EKS with CDK, and confirm that Spark jobs can run on it. The entire code is on GitHub.

Launch a cluster

Since Kubernetes v1.22, you need to pass a kubectlLayer from an external package so that addManifest() and similar methods use an up-to-date kubectl.

import { aws_ec2 as ec2, aws_eks as eks, aws_iam as iam } from 'aws-cdk-lib';
import { KubectlV27Layer } from '@aws-cdk/lambda-layer-kubectl-v27';

const mastersRole = new iam.Role(this, 'MastersRole', {
  assumedBy: new iam.CompositePrincipal(
    new iam.ServicePrincipal('eks.amazonaws.com'),
    new iam.AccountRootPrincipal(),
  ),
  managedPolicies: [
    iam.ManagedPolicy.fromAwsManagedPolicyName('AmazonEKSClusterPolicy'),
  ],
})

return new eks.Cluster(this, 'EKSCluster', {
  vpc,
  mastersRole,
  clusterName: clusterName,
  version: eks.KubernetesVersion.V1_27,
  kubectlLayer: new KubectlV27Layer(this, 'KubectlLayer'),
  defaultCapacity: 3,
  defaultCapacityInstance: new ec2.InstanceType('m5.large')
})

If you later want to grant EMR access to the cluster with "eksctl create iamidentitymapping" instead of CDK, eks:DescribeCluster is required in addition to the Kubernetes permissions, so either add it to the masters role as follows, or map the user's own role into aws-auth to give it Kubernetes permissions.

inlinePolicies: {
  'eksctl-iamidentitymapping': new iam.PolicyDocument({
    statements: [
      new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        actions: ['eks:DescribeCluster'],
        resources: ['*'],
      })
    ]
  })
}
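
For the latter option, a minimal CDK sketch of mapping the user's own role into aws-auth could look like this (the role ARN is a placeholder for whatever role your CLI user assumes):

// Sketch: give an existing role Kubernetes (system:masters) permissions via aws-auth.
// The ARN is a placeholder; use the role your CLI user actually assumes.
const userRole = iam.Role.fromRoleArn(this, 'CliUserRole', 'arn:aws:iam::xxxxx:role/<user_role>')
cluster.awsAuth.addMastersRole(userRole)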

If you go through the masters role, also add a profile that assumes it to your AWS config.

$ brew tap weaveworks/tap
$ brew install weaveworks/tap/eksctl
$ eksctl version
0.124.0

$ cat ~/.aws/config
...
[profile eks-master]
region = us-east-1
role_arn = arn:aws:iam::xxxxx:role/EmrOnEksTestStack-clusterMastersRoleEABFBB9C-10Q5X4BE7QN51
source_profile = <user_profile_name> 

Grant the cluster access

Bind a Kubernetes Role to a User mapped to the service-linked role for EMR on EKS.

Using Helm on GKE with RBAC enabled - sambaiz-net

a. “eksctl create iamidentitymapping” way

First, I checked what resources are created by eksctl, following the documentation.

$ CLUSTER_NAME=emr-on-eks-test
$ NAMESPACE=default

$ eksctl create iamidentitymapping \
    --cluster $CLUSTER_NAME \
    --namespace $NAMESPACE \
    --service-name emr-containers \
    --profile eks-master
2022-12-31 17:40:34 []  created "default:Role.rbac.authorization.k8s.io/emr-containers"
2022-12-31 17:40:34 []  created "default:RoleBinding.rbac.authorization.k8s.io/emr-containers"
2022-12-31 17:40:34 []  adding identity "arn:aws:iam::xxxxx:role/AWSServiceRoleForAmazonEMRContainers" to auth ConfigMap

$ kubectl describe -n kube-system configmap aws-auth
...
mapRoles:
  ----
...
- rolearn: arn:aws:iam::xxxxx:role/AWSServiceRoleForAmazonEMRContainers
  username: emr-containers
...

b. CDK way

I expressed the manifests found in the eksctl repository with CDK.

const emrRole = cluster.addManifest('EMRContainersRole', {
  apiVersion: 'rbac.authorization.k8s.io/v1',
  kind: 'Role',
  metadata: { name: 'emr-containers', namespace: emrcontainersNamespace },
  rules: [
    { apiGroups: [''], resources: ['namespaces'], verbs: ['get'] },
    { apiGroups: [''], resources: ['serviceaccounts', 'services', 'configmaps', 'events', 'pods', 'pods/log'], verbs: ['get', 'list', 'watch', 'describe', 'create', 'edit', 'delete', 'deletecollection', 'annotate', 'patch', 'label'] },
    { apiGroups: [''], resources: ['secrets'], verbs: ['create', 'patch', 'delete', 'watch'] },
    { apiGroups: ['apps'], resources: ['statefulsets', 'deployments'], verbs: ['get', 'list', 'watch', 'describe', 'create', 'edit', 'delete', 'annotate', 'patch', 'label'] },
    { apiGroups: ['batch'], resources: ['jobs'], verbs: ['get', 'list', 'watch', 'describe', 'create', 'edit', 'delete', 'annotate', 'patch', 'label'] },
    { apiGroups: ['extensions', 'networking.k8s.io'], resources: ['ingresses'], verbs: ['get', 'list', 'watch', 'describe', 'create', 'edit', 'delete', 'annotate', 'patch', 'label'] },
    { apiGroups: ['rbac.authorization.k8s.io'], resources: ['roles', 'rolebindings'], verbs: ['get', 'list', 'watch', 'describe', 'create', 'edit', 'delete', 'deletecollection', 'annotate', 'patch', 'label'] },
    { apiGroups: [''], resources: ['persistentvolumeclaims'], verbs: ['create', 'list', 'delete']},
    { apiGroups: ['scheduling.volcano.sh'], resources: ['podgroups'], verbs: ['get', 'list', 'watch', 'create', 'delete', 'update']}
  ],
});
emrRole.node.addDependency(namespace)

const emrRoleBind = cluster.addManifest('EMRContainersRoleBind', {
  apiVersion: 'rbac.authorization.k8s.io/v1',
  kind: 'RoleBinding',
  metadata: { name: 'emr-containers', namespace: emrcontainersNamespace },
  subjects: [{ kind: 'User', name: 'emr-containers', apiGroup: 'rbac.authorization.k8s.io' }],
  roleRef: { kind: 'Role', name: 'emr-containers', apiGroup: 'rbac.authorization.k8s.io' },
});
emrRoleBind.node.addDependency(emrRole);

const emrContainersRole = iam.Role.fromRoleName(this, 'EMRContainersRole',
  'AWSServiceRoleForAmazonEMRContainers')

cluster.awsAuth.addRoleMapping(emrContainersRole, {
  username: "emr-containers",
  groups: []
})

Register the cluster

Register the cluster to EMR.

new emrcontainers.CfnVirtualCluster(this, 'virtual-cluster', {
  name: 'test_virtual_cluster',
  containerProvider: {
    id: cluster.clusterName,
    type: 'EKS',
    info: {
      eksInfo: {
        namespace: 'default'
      }
    }
  }
})
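
The generated virtual cluster ID is needed for start-job-run later, so it may be convenient to assign the construct to a variable and export the ID, for example:

// Assuming the CfnVirtualCluster above is assigned to `virtualCluster`,
// export its ID so it can be passed to start-job-run.
new cdk.CfnOutput(this, 'VirtualClusterId', { value: virtualCluster.attrId })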

Create an execution role

Create a role used for job execution. Set a trust policy so that the ServiceAccount for EMR on EKS, which is created when a job is submitted, can assume it via OIDC.

createJobExecutionRole(cluster: eks.Cluster, emrcontainersNamespace: string) {
  const roleName = "emr-on-eks-test-job-execution-role"
  // base36-encode the role name with the base-x package; EMR appends this encoded
  // name to the generated ServiceAccount name (emr-containers-sa-*-*-<account>-<base36RoleName>)
  const bs36 = basex("0123456789abcdefghijklmnopqrstuvwxyz")
  const base36RoleName = bs36.encode(new TextEncoder().encode(roleName))

  return new iam.Role(this, 'EMRJobExecutionRole', {
    roleName: roleName,
    assumedBy: new iam.WebIdentityPrincipal(
      cluster.openIdConnectProvider.openIdConnectProviderArn,
      {
        "StringLike": new cdk.CfnJson(this, 'JobExecutionRoleStringEquals', {
          value: {
            [`${cluster.clusterOpenIdConnectIssuer}:sub`]: `system:serviceaccount:${emrcontainersNamespace}:emr-containers-sa-*-*-${this.account}-${base36RoleName}`
          }
        })
      }
    ),
    inlinePolicies: {
      'job-execution-policy': new iam.PolicyDocument({
        statements: [
          new iam.PolicyStatement({
            effect: iam.Effect.ALLOW,
            actions: [
              's3:PutObject',
              's3:GetObject',
              's3:ListBucket'
            ],
            resources: ['arn:aws:s3:::*'],
          }),
          new iam.PolicyStatement({
            effect: iam.Effect.ALLOW,
            actions: [
              'logs:PutLogEvents',
              'logs:CreateLogStream',
              'logs:DescribeLogGroups',
              'logs:DescribeLogStreams'
            ],
            resources: ['arn:aws:logs:*:*:*'],
          })
        ]
      })
    }
  })
}

The following command can add the trust policy as well. If you get an error later, it is worth checking that what this command adds matches what was created with CDK.

$ EXECUTION_ROLE_NAME=emr-on-eks-test-job-execution-role

$ aws emr-containers update-role-trust-policy \
    --cluster-name $CLUSTER_NAME \
    --namespace $NAMESPACE \
    --role-name $EXECUTION_ROLE_NAME
Successfully updated trust policy of role emr-on-eks-test-job-execution-role

Run Spark jobs

start-job-run launches the pods. If resources are insufficient, they remain Pending. If the driver fails with an S3 AccessDenied error, the execution role may have failed to be assumed, so check that the trust policy and the IAM OIDC provider are registered.

Enable dynamic resource allocation (DRA) to increase or decrease the number of executors based on demand. Spark on Kubernetes doesn't support the external shuffle service yet, so you need to enable shuffle file tracking instead to prevent executors that store shuffle data from being removed.

$ VIRTUAL_CLUSTER_ID=xxxxx
$ EXECUTION_ROLE_ARN=arn:aws:iam::xxxxx:role/emr-on-eks-test-job-execution-role

$ aws emr-containers start-job-run \
  --virtual-cluster-id $VIRTUAL_CLUSTER_ID \
  --name pi-2 \
  --execution-role-arn $EXECUTION_ROLE_ARN \
  --release-label emr-6.13.0-latest \
  --job-driver '{
      "sparkSubmitJobDriver": {
          "entryPoint": "s3://<mybucket>/pi.py",
          "sparkSubmitParameters": "--conf spark.executor.instances=1 --conf spark.executor.memory=2G --conf spark.executor.cores=1 --conf spark.driver.cores=1"
      }
  }' \
  --configuration-overrides '{
    "monitoringConfiguration": {
      "persistentAppUI": "ENABLED",
      "s3MonitoringConfiguration": {
        "logUri": "s3://<mybucket>"
      }
    },
    "applicationConfiguration": [
      {
        "classification": "spark-defaults", 
        "properties": {
          "spark.dynamicAllocation.enabled":"true",
          "spark.dynamicAllocation.shuffleTracking.enabled":"true",
          "spark.dynamicAllocation.minExecutors":"1",
          "spark.dynamicAllocation.maxExecutors":"100",
          "spark.dynamicAllocation.initialExecutors":"1"
        }
      }
    ]
  }'
  
$ kubectl get pod -n emrcontainers
NAME                               READY   STATUS    RESTARTS   AGE
000000032u7ddboo153-p52fj          3/3     Running    0          53s
pythonpi-a6561a8bc76bc424-exec-1   1/2     NotReady   0          25s
pythonpi-a6561a8bc76bc424-exec-2   0/2     Pending    0          17s
spark-000000032u7ddboo153-driver   1/2     NotReady   0          40s

$ aws s3 cp s3://<mybucket>/<cluster_id>/jobs/<job_id>/containers/spark-000000032psj6spu6q7/spark-000000032psj6spu6q7-driver/stdout.gz .
$ gzcat stdout.gz
Pi is roughly 3.140920
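
For reference, the same job can also be submitted programmatically; here is a minimal sketch assuming the @aws-sdk/client-emr-containers package (AWS SDK for JavaScript v3), with the same placeholder values as the CLI example:

import { EMRContainersClient, StartJobRunCommand } from '@aws-sdk/client-emr-containers'

// Sketch: submit the pi.py job through the StartJobRun API instead of the CLI.
async function submitPiJob(virtualClusterId: string, executionRoleArn: string) {
  const client = new EMRContainersClient({ region: 'us-east-1' })
  const res = await client.send(new StartJobRunCommand({
    virtualClusterId,
    name: 'pi-2',
    executionRoleArn,
    releaseLabel: 'emr-6.13.0-latest',
    jobDriver: {
      sparkSubmitJobDriver: {
        entryPoint: 's3://<mybucket>/pi.py',
        sparkSubmitParameters: '--conf spark.executor.instances=1 --conf spark.executor.memory=2G',
      },
    },
  }))
  return res.id // job run ID, which appears in the driver pod name (spark-<id>-driver)
}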

If you pass log4j2 settings with the spark-log4j classification, you can patch /etc/spark/conf/log4j2.properties.

{
    "classification": "spark-log4j",
    "properties": {
        "rootLogger.level": "warn",
        
        "appender.myapp.type": "RollingFile",
        "appender.myapp.name": "MyAppLogFile",
        "appender.myapp.filePattern": "/var/log/spark/user/myapp/myapp-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz",
        "appender.myapp.policies.type": "Policies",
        "appender.myapp.policies.time.type": "TimeBasedTriggeringPolicy",
        "appender.myapp.policies.time.interval": "1",

        "logger.myapp.name": "net.sambaiz",
        "logger.myapp": "debug, MyAppLogFile",
        "logger.myapp.additivity": "false"
    }
}

The rootLogger has a stderr appender; stdout/stderr are written under the spark.yarn.app.container.log.dir referenced in log4j2.properties, and a fluentd sidecar sends them to the destination configured in monitoringConfiguration.

$ ls /var/log/spark/user/spark-******-driver/
stderr	stderr-s3-container-log-in-tail.pos  stdout  stdout-s3-container-log-in-tail.pos

$ cat /etc/fluent/fluent.conf
...
<source>
  @type tail
  path "#{ENV['K8S_SPARK_LOG_URL_STDERR']}"
  ...
</source>
...
<match chicago-spark-s3-container-stdout-logs>
  @type s3
  s3_bucket <mybucket>
  s3_region <region>
  path "emr-logs/****/containers/#{ENV['SPARK_APPLICATION_ID']}/#{ENV['SPARK_CONTAINER_ID']}"
  s3_object_key_format %{path}/stdout.gz
  ...
</match>
...

Therefore, if you output logs to stdout, they are sent separately from the Spark logs. Conversely, if you use a logger that has no stdout/stderr appender and whose additivity is false, its logs won't be sent.

package net.sambaiz.myapp
import org.apache.log4j.Logger

object MyApp {
  def main(args: Array[String]): Unit = {
    val logger = Logger.getLogger(getClass().getName())
    println("aaaa")
    logger.warn("bbbb")
  }
}

You can pass a pod template with "spark.kubernetes.(driver|executor).podTemplateFile=s3://bucket/template.yml" to add a fluentbit sidecar for your own logs, or specify priorityClassName and so on.

Priority of K8s pods and preemption - sambaiz-net

apiVersion: v1
kind: Pod
spec:
  priorityClassName: low-priority
  containers:
  - name: spark-kubernetes-driver

Troubleshooting

TABLE_OR_VIEW_NOT_FOUND when referring to the Glue Data Catalog

When referring to the Glue Data Catalog, a TABLE_OR_VIEW_NOT_FOUND error occurs unless the following settings are added.

--configuration-overrides '{
    "applicationConfiguration": [
        {
            "classification": "spark-defaults",
            "properties": {
                "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
                "spark.sql.catalogImplementation": "hive"
            }
        }
    ]
}'

DiskPressure on the node

If shuffle data does not fit in memory, it spills and is written to storage. By default, the volume it is written to is an emptyDir, which can cause DiskPressure on the node.

Pods evicted without waiting for terminationGracePeriodSeconds due to lack resources of Kubernetes nodes - sambaiz-net

volumes:
- emptyDir: {}
  name: spark-local-dir-1

By configuring the following:

{
  "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName": "OnDemand",
  "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass": "gp2",
  "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit": "50Gi",
  "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path": "/data",
  "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly": "false"
}

you can dynamically create PVCs and mount EBS volumes, for example.

volumes:
- name: spark-local-dir-1
  persistentVolumeClaim:
    claimName: testjob-xxxxxx-exec-1-pvc-0

Install the corresponding CSI driver.

const role = new iam.Role(this, 'EBSCSIRole', {
  assumedBy: new iam.WebIdentityPrincipal(
    cluster.openIdConnectProvider.openIdConnectProviderArn,
    {
      StringEquals: new CfnJson(this, 'EBSCSIRoleStringEquals', {
        value: {
            [`${cluster.clusterOpenIdConnectIssuer}:aud`]: 
              'sts.amazonaws.com',
            [`${cluster.clusterOpenIdConnectIssuer}:sub`]:
              'system:serviceaccount:kube-system:ebs-csi-controller-sa',
        },
      }),
    }
  ),
  managedPolicies: [
    iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AmazonEBSCSIDriverPolicy')
  ]
})

new eks.CfnAddon(this, 'EBSCSIAddon', {
  addonName: 'aws-ebs-csi-driver',
  clusterName: cluster.clusterName,
  serviceAccountRoleArn:role.roleArn,
})

Job failure due to "SparkContext was shut down"

If the job-submitter or driver pod is stopped due to node scaling, the SparkContext shuts down and the job fails, so you should avoid this by adding spark.kubernetes.driver.annotation.<AnnotationName> and the like. However, this method is currently not available for the job-submitter, so you need to pin it to a node group that doesn't scale using a nodeSelector, or add the annotation in some other way.

Install Karpenter on an EKS cluster with CDK to auto-scale flexibility and quickly - sambaiz-net

"spark.kubernetes.driver.annotation.karpenter.sh/do-not-disrupt": "true"
"jobsubmitter.node.selector.<label>": "<value>",

PermissionDenied when using AWS SDK

The information needed to assume the execution role is provided in the environment variables AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE, but if "software.amazon.awssdk" % "sts" isn't included in the dependencies, the AWS SDK for Java outputs the following WARN log and falls back to the instance profile's role, which comes later in the credential provider chain.

WARN WebIdentityCredentialsUtils: To use web identity tokens, the 'sts' service module must be on the class path.

References

How the way to create an EKS Cluster changed in CDK from v1.22 #AWS - Qiita

How to Make Robust EMR on EKS pipelines + Airflow | by Jun Seokjun Han | Medium