EMR on EKS は EKS 上で Spark アプリケーションを動かすための API やパフォーマンス最適化されたランタイム、History Serverなどを提供するマネージドサービス。 通常の EMR が Hadoop クラスタの管理も行うのに対して、EMR on EKS はコンテナの起動のみを担う。
AWS CLIでEMRクラスタを立ち上げSparkのアプリケーションを実行する - sambaiz-net
Kubernetes で動かすことで、管理やモニタリングの際に Kubernetes 用のツールや機能を利用でき、既存のクラスタがあれば余っているリソースを有効活用できる。 他のアプリケーションでの Kubernetes の知見を集計や分析で活かしたり、その逆もあるかもしれない。 ただ、Kubernetes および EKS の知識はある程度必要。通常の EMR は YARN の知識がなくとも使い始められるが、こちらはないと動かすのも大変だと思う。
ちなみに、Docker については EMR 6.x (Hadoop 3) から サポートされている。
料金はリクエストした vCPU とメモリに対してかかる。
今回は CDK で EKS クラスタの立ち上げから EMR の登録までを行い、Spark のジョブが実行できることを確認する。 全体のコードは GitHub にある。
クラスタの立ち上げ
v1.22 から addManifest() などの際に最新バージョンの kubectl が使われるようにするには 別パッケージの kubectlLayer を渡す必要ができた。
import { KubectlV27Layer } from '@aws-cdk/lambda-layer-kubectl-v27';
const mastersRole = new iam.Role(this, 'MastersRole', {
assumedBy: new iam.CompositePrincipal(
new iam.ServicePrincipal('eks.amazonaws.com'),
new iam.AccountRootPrincipal(),
),
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName('AmazonEKSClusterPolicy'),
],
})
return new eks.Cluster(this, 'EKSCluster', {
vpc,
mastersRole,
clusterName: clusterName,
version: eks.KubernetesVersion.V1_27,
kubectlLayer: new KubectlV27Layer(this, 'KubectlLayer'),
defaultCapacity: 3,
defaultCapacityInstance: new ec2.InstanceType('m5.large')
})
この後 CDK ではなく eksctl create iamidentitymapping でクラスタへのアクセス権限を付与する場合は、 Kubernetes の権限に加えて eks:DescribeCluster が必要になるので master role にこれを追加するか、 ユーザーの role を aws-auth に追加して Kubernetes の権限を与える必要がある。
inlinePolicies: {
'eksctl-iamidentitymapping': new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['eks:DescribeCluster'],
resources: ['*'],
})
]
})
}
その場合は、併せて assume する profile を config に追加する。
$ brew tap weaveworks/tap
$ brew install weaveworks/tap/eksctl
$ eksctl version
0.124.0
$ cat ~/.aws/config
...
[profile eks-master]
region = us-east-1
role_arn = arn:aws:iam::xxxxx:role/EmrOnEksTestStack-clusterMastersRoleEABFBB9C-10Q5X4BE7QN51
source_profile = <user_profile_name>
クラスタへのアクセス権限の付与
EMR on EKS の service-linked role に紐づいた User に Kubernetes の Role を bind する。
RBACが有効なGKEでHelmを使う - sambaiz-net
a. eksctl create iamidentitymapping による方法
まずはドキュメント通り eksctl でどのようなリソースが作成されるか確認する。
$ CLUSTER_NAME=emr-on-eks-test
$ NAMESPACE=default
$ eksctl create iamidentitymapping \
--cluster $CLUSTER_NAME \
--namespace $NAMESPACE \
--service-name emr-containers \
--profile eks-master
2022-12-31 17:40:34 [ℹ] created "default:Role.rbac.authorization.k8s.io/emr-containers"
2022-12-31 17:40:34 [ℹ] created "default:RoleBinding.rbac.authorization.k8s.io/emr-containers"
2022-12-31 17:40:34 [ℹ] adding identity "arn:aws:iam::xxxxx:role/AWSServiceRoleForAmazonEMRContainers" to auth ConfigMap
$ kubectl describe -n kube-system configmap aws-auth
...
mapRoles:
----
...
- rolearn: arn:aws:iam::xxxxx:role/AWSServiceRoleForAmazonEMRContainers
username: emr-containers
...
b. CDK による方法
eksctl のリポジトリを見ると該当リソースのマニフェストがあったので、 これを CDK で表した。
const emrRole = cluster.addManifest('EMRContainersRole', {
apiVersion: 'rbac.authorization.k8s.io/v1',
kind: 'Role',
metadata: { name: 'emr-containers', namespace: emrcontainersNamespace },
rules: [
{ apiGroups: [''], resources: ['namespaces'], verbs: ['get'] },
{ apiGroups: [''], resources: ['serviceaccounts', 'services', 'configmaps', 'events', 'pods', 'pods/log'], verbs: ['get', 'list', 'watch', 'describe', 'create', 'edit', 'delete', 'deletecollection', 'annotate', 'patch', 'label'] },
{ apiGroups: [''], resources: ['secrets'], verbs: ['create', 'patch', 'delete', 'watch'] },
{ apiGroups: ['apps'], resources: ['statefulsets', 'deployments'], verbs: ['get', 'list', 'watch', 'describe', 'create', 'edit', 'delete', 'annotate', 'patch', 'label'] },
{ apiGroups: ['batch'], resources: ['jobs'], verbs: ['get', 'list', 'watch', 'describe', 'create', 'edit', 'delete', 'annotate', 'patch', 'label'] },
{ apiGroups: ['extensions', 'networking.k8s.io'], resources: ['ingresses'], verbs: ['get', 'list', 'watch', 'describe', 'create', 'edit', 'delete', 'annotate', 'patch', 'label'] },
{ apiGroups: ['rbac.authorization.k8s.io'], resources: ['roles', 'rolebindings'], verbs: ['get', 'list', 'watch', 'describe', 'create', 'edit', 'delete', 'deletecollection', 'annotate', 'patch', 'label'] },
{ apiGroups: [''], resources: ['persistentvolumeclaims'], verbs: ['create', 'list', 'delete']},
{ apiGroups: ['scheduling.volcano.sh'], resources: ['podgroups'], verbs: ['get', 'list', 'watch', 'create', 'delete', 'update']}
],
});
emrRole.node.addDependency(namespace)
const emrRoleBind = cluster.addManifest('EMRContainersRoleBind', {
apiVersion: 'rbac.authorization.k8s.io/v1',
kind: 'RoleBinding',
metadata: { name: 'emr-containers', namespace: emrcontainersNamespace },
subjects: [{ kind: 'User', name: 'emr-containers', apiGroup: 'rbac.authorization.k8s.io' }],
roleRef: { kind: 'Role', name: 'emr-containers', apiGroup: 'rbac.authorization.k8s.io' },
});
emrRoleBind.node.addDependency(emrRole);
const emrContainersRole = iam.Role.fromRoleName(this, 'EMRContainersRole',
'AWSServiceRoleForAmazonEMRContainers')
cluster.awsAuth.addRoleMapping(emrContainersRole, {
username: "emr-containers",
groups: []
})
クラスタの登録
EMR にクラスタを登録する。
new emrcontainers.CfnVirtualCluster(this, 'virtual-cluster', {
name: 'test_virtual_cluster',
containerProvider: {
id: cluster.clusterName,
type: 'EKS',
info: {
eksInfo: {
namespace: 'default'
}
}
}
})
Execution role の作成
ジョブの実行に用いられる Role を作成する。 ジョブの送信時に作成される EMR on EKS の ServiceAccount が OIDC で assume できるような trust policy を設定している。
createJobExecutionRole(cluster: eks.Cluster, emrcontainersNamespace: string) {
const roleName = "emr-on-eks-test-job-execution-role"
const bs36 = basex("0123456789abcdefghijklmnopqrstuvwxyz")
const base36RoleName = bs36.encode(new TextEncoder().encode(roleName))
return new iam.Role(this, 'EMRJobExecutionRole', {
roleName: roleName,
assumedBy: new iam.WebIdentityPrincipal(
cluster.openIdConnectProvider.openIdConnectProviderArn,
{
"StringLike": new cdk.CfnJson(this, 'JobExecutionRoleStringEquals', {
value: {
[`${cluster.clusterOpenIdConnectIssuer}:sub`]: `system:serviceaccount:${emrcontainersNamespace}:emr-containers-sa-*-*-${this.account}-${base36RoleName}`
}
})
}
),
inlinePolicies: {
'job-execution-policy': new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
's3:PutObject',
's3:GetObject',
's3:ListBucket'
],
resources: ['arn:aws:s3:::*'],
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'logs:PutLogEvents',
'logs:CreateLogStream',
'logs:DescribeLogGroups',
'logs:DescribeLogStreams'
],
resources: ['arn:aws:logs:*:*:*'],
})
]
})
}
})
}
trust policy は次のコマンドでも追加できる。 もしこの後エラーになった場合は、このコマンドで追加されたものと CDK で作成したものが一致するかを確認するとよい。
$ aws emr-containers update-role-trust-policy \
--cluster-name $CLUSTER_NAME \
--namespace $NAMESPACE \
--role-name $EXECUTION_ROLE_NAME
Successfully updated trust policy of role emr-on-eks-test-job-execution-role
Spark ジョブの実行
start-job-run すると まず EMR の job-submitter の pod が立ち上がり、その後 Spark の driver が起動する。 リソースが足りないと pending する。 driver が S3 の AccessDenied で失敗する場合は execution-role が assume できていない可能性があるので、trust policy や IAM OIDC プロバイダーが登録されているか確認する。
dynamic resource allocation (DAR)を有効にして executor が増減するようにしている。 現状 Spark on Kubernetes が external shuffle service をサポートしていないため、 代わりに shuffle file tracking を有効にして shuffle data を持つ executor が削除されないようにする必要がある。
$ VIRTUAL_CLUSTER_ID=xxxxx
$ EXECUTION_ROLE_ARN=arn:aws:iam::xxxxx:role/emr-on-eks-test-job-execution-role
$ aws emr-containers start-job-run \
--virtual-cluster-id $VIRTUAL_CLUSTER_ID \
--name pi-2 \
--execution-role-arn $EXECUTION_ROLE_ARN \
--release-label emr-6.13.0-latest \
--job-driver '{
"sparkSubmitJobDriver": {
"entryPoint": "s3://<mybucket>/pi.py",
"sparkSubmitParameters": "--conf spark.executor.instances=1 --conf spark.executor.memory=2G --conf spark.executor.cores=1 --conf spark.driver.cores=1"
}
}' \
--configuration-overrides '{
"monitoringConfiguration": {
"persistentAppUI": "ENABLED",
"s3MonitoringConfiguration": {
"logUri": "s3://<mybucket>"
}
},
"applicationConfiguration": [
{
"classification": "spark-defaults",
"properties": {
"spark.dynamicAllocation.enabled":"true",
"spark.dynamicAllocation.shuffleTracking.enabled":"true",
"spark.dynamicAllocation.minExecutors":"1",
"spark.dynamicAllocation.maxExecutors":"100",
"spark.dynamicAllocation.initialExecutors":"1"
}
}
]
}'
$ kubectl get pod -n emrcontainers
NAME READY STATUS RESTARTS AGE
000000032u7ddboo153-p52fj 3/3 Running 0 53s
pythonpi-a6561a8bc76bc424-exec-1 1/2 NotReady 0 25s
pythonpi-a6561a8bc76bc424-exec-2 0/2 Pending 0 17s
spark-000000032u7ddboo153-driver 1/2 NotReady 0 40s
$ aws s3 cp s3://<mybucket>/<cluster_id>/jobs/<job_id>/containers/spark-000000032psj6spu6q7/spark-000000032psj6spu6q7-driver/stdout.gz .
$ gzcat stdout.gz
Pi is roughly 3.140920
classification: spark-log4j で log4j2 の設定を渡すと /etc/spark/conf/log4j2.properties を patch できる。
{
"classification": "spark-log4j",
"properties": {
"rootLogger.level": "warn",
"appender.myapp.type": "RollingFile",
"appender.myapp.name": "MyAppLogFile",
"appender.myapp.filePattern": "/var/log/spark/user/myapp/myapp-%d{MM-dd-yy-HH-mm-ss}-%i.log.gz",
"appender.myapp.policies.type": "Policies",
"appender.myapp.policies.time.type": "TimeBasedTriggeringPolicy",
"appender.myapp.policies.time.interval": "1",
"logger.myapp.name": "net.sambaiz",
"logger.myapp": "debug, MyAppLogFile",
"logger.myapp.additivity": "false"
}
}
rootLogger には stderr の appender が設定されており、stdout/stderr が log4j2.properties の spark.yarn.app.container.log.dir に 出力され、monitoringConfiguration で設定した先に fluentd sidecar で送られるようになっている。
$ ls /var/log/spark/user/spark-******-driver/
stderr stderr-s3-container-log-in-tail.pos stdout stdout-s3-container-log-in-tail.pos
$ cat /etc/fluent/fluent.conf
...
<source>
@type tail
path "#{ENV['K8S_SPARK_LOG_URL_STDERR']}"
...
</source>
...
<match chicago-spark-s3-container-stdout-logs>
@type s3
s3_bucket <mybucket>
s3_region <region>
path "emr-logs/****/containers/#{ENV['SPARK_APPLICATION_ID']}/#{ENV['SPARK_CONTAINER_ID']}"
s3_object_key_format %{path}/stdout.gz
...
</match>
...
したがって stdout に出力することで Spark のログと分離して送ることができる。 また、additivity: false で stdout/stderr の appender を持たない logger で出力すると送られない。
package net.sambaiz.myapp
import org.apache.log4j.Logger
object MyApp {
def main(input: Array[String]) {
val logger = Logger.getLogger(getClass().getName())
println("aaaa")
logger.warn("bbbb")
}
}
spark.kubernetes.(driver|executor).podTemplateFile=s3://bucket/driver.yml のようにして Pod template を渡すことができ、これにより独自のログを送る fluentbit sidecar を追加したり、priorityClassName などを設定したりすることができる。
K8s の Pod の Prirority とそれによる Preemption - sambaiz-net
apiVersion: v1
kind: Pod
spec:
priorityClassName: low-priority
containers:
- name: spark-kubernetes-driver
トラブルシューティング
Glue Data Catalog 参照時の TABLE_OR_VIEW_NOT_FOUND
Glue Data Catalog を参照する場合は次の設定を追加しないと TABLE_OR_VIEW_NOT_FOUND になる。
--configuration-overrides '{
"applicationConfiguration": [
{
"classification": "spark-defaults",
"properties": {
"spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
"spark.sql.catalogImplementation": "hive"
}
}
]
}'
Node の DiskPressure
シャッフル時にメモリに乗り切らないと spill によるストレージへの書き込みが発生するが、この書き込み先はデフォルトで emptyDir の volume となっているので、Node のディスク容量を圧迫してしまうことがある。
Kubernetes のノードのリソース不足による terminationGracePeriodSeconds を待たない evict - sambaiz-net
volumes:
- emptyDir: {}
name: spark-local-dir-1
次のような設定を行うことで
{
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName": "OnDemand",
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass": "gp2",
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit": "50Gi",
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path": "/data",
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly": "false"
}
動的に PVC を作成し EBS などをマウントすることができる。
volumes:
- name: spark-local-dir-1
persistentVolumeClaim:
claimName: testjob-xxxxxx-exec-1-pvc-0
対応する CSI driver をインストールしておく。
const role = new iam.Role(this, 'EBSCSIRole', {
assumedBy: new iam.WebIdentityPrincipal(
cluster.openIdConnectProvider.openIdConnectProviderArn,
{
StringEquals: new CfnJson(this, 'EBSCSIRoleStringEquals', {
value: {
[`${cluster.clusterOpenIdConnectIssuer}:aud`]:
'sts.amazonaws.com',
[`${cluster.clusterOpenIdConnectIssuer}:sub`]:
'system:serviceaccount:kube-system:ebs-csi-controller-sa',
},
}),
}
),
managedPolicies: [
iam.ManagedPolicy.fromAwsManagedPolicyName('service-role/AmazonEBSCSIDriverPolicy')
]
})
new eks.CfnAddon(this, 'EBSCSIAddon', {
addonName: 'aws-ebs-csi-driver',
clusterName: cluster.clusterName,
serviceAccountRoleArn:role.roleArn,
})
SparkContext was shut down によるジョブの失敗
ノードの増減などの際に job-submitter や driver の pod が止まってしまうと SparkContext was shut down などでジョブが失敗してしまうので spark.kubernetes.driver.annotation.<AnnotationName> で annotation を付けるなどして回避する。 ただし job-submitter に関してはこの方法が現状取れないので nodeSelector で増減しないノードグループのノードを指定するか、何らかの方法で annotation を付ける必要がある。
CDK で EKS クラスタに Karpenter をインストールし柔軟で高速なオートスケールを行う - sambaiz-net
"spark.kubernetes.driver.annotation.karpenter.sh/do-not-disrupt": "true"
"jobsubmitter.node.selector.<label>": "<value>",
AWS SDK 使用時の PermissionDenied
execution role を assume するのに必要な情報は環境変数 AWS_ROLE_ARN や AWS_WEB_IDENTITY_TOKEN_FILE にあるが、 AWS SDK for Java は “software.amazon.awssdk” % “sts” が依存にないと次の WARN ログを出力し 優先順位の低い instance profile の Role を使ってしまう。
WARN WebIdentityCredentialsUtils: To use web identity tokens, the 'sts' service module must be on the class path.
参考
EKS Cluster v1.22以降の作り方がCDKで変わった話 #AWS - Qiita
How to Make Robust EMR on EKS pipelines + Airflow | by Jun Seokjun Han | Medium