AWS CLIでEMRクラスタを立ち上げSparkのアプリケーションを実行する

awssparkhadoopetl

Amazon EMRはEC2やEKS上にSparkやHive, Prestoのクラスタを構築するサービス。 SparkのマネージドサービスであるGlueと比べると、Glueはサーバーレスで手軽にSparkによるETL処理をを行えるのに対して、EMRはスポットインスタンスなどを利用したコストパフォーマンスの良さと詳細なチューニングを行うことができるという特長があるが、EMR Serverlessがリリースされたのでその差は少し縮まっているように感じる。Glueにはスキーマを指定する必要がないDynamicFrameや前の続きから実行できるBookmarkといった便利な機能もあるが、重い処理を立て続けに実行するとコストが嵩んだりDPUなどのクォータに引っかかることもあるので適宜使い分けていきたい。

AWS GlueのJobのBookmarkを有効にして前回の続きから処理を行う - sambaiz-net

今回は次のアプリケーションを実行する。

$ cat test.py
import argparse
import logging

from pyspark.sql import SparkSession

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

output_path = "s3://<bucket>/emrtest/"

if __name__ == "__main__":
  parser = argparse.ArgumentParser()
  parser.add_argument('--value', default=10, type=int, help="some value")
  args = parser.parse_args()
  with SparkSession.builder.appName("testapp").getOrCreate() as spark:
    df = spark.createDataFrame([(args.value,)], ['value'])
    logger.info("write %d rows", df.count())
    df.write.mode('overwrite').parquet(output_path, compression="gzip")

$ aws s3 cp test.py s3://<bucket>/

Glue Data Catalogの有効化

Glue Data Catalogのテーブルを参照する場合は次の設定を行う。

with SparkSession.builder \
    .appName("testapp") \
    .config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
    .enableHiveSupport() \
    .getOrCreate() as spark:
    df = spark.sql("SELECT * FROM xxxx.yyyyy")

クラスタの起動

aws emr create-cluster でクラスタを立ち上げることができ、その際 –step を渡して –auto-terminate を付けると処理が完了次第クラスタが終了するので使い捨てることができる。

–configurations で executor が使えるコアやメモリなどを設定できる。maximizeResourceAllocation=true にするとインスタンスタイプに合わせた設定になる。

EMRでSparkを動かす際の設定 - sambaiz-net

その他のフラグは次の通り。扱うデータに対してメモリが不足しているとtaskが失敗したり大量のGCによってdriverからのheartbeatがタイムアウトしてしまうことがあるのである程度余裕を持っておく。

  • –release-label emr-6.3.1: EMRのバージョン。このバージョンではGlue3.0と同じSpark 3.1.1が動くが、なるべく新しいバージョンにした方が不具合を回避できて良いかもしれない。
  • –applications Name=Spark: クラスタにインストールするアプリケーション
  • –instance-type, –instance-count: インスタンスタイプと数。この内の1つがMaster nodeで残りはCore nodeとなる。
  • –managed-scaling-policy: オートスケールの設定
  • –use-default-roles: インスタンスプロファイルにEMR_EC2_DefaultRole、EMR RoleにEMR_DefaultRoleを使う
  • –name: クラスタ名。デフォルトは"Development Cluster"

spark-submitには

  • –master yarn: HadoopのリソースマネージャーであるYARNをCluster Managerとして用いる

Hadoop YARN によってアプリケーションにリソースが割り当てられる流れと割り当てられているリソース量の確認 - sambaiz-net

  • –deploy-mode: SparkContextを生成するDriver Programをクライアントで実行するかクラスタ内で実行するか。EMRではclientにするとMaster nodeで実行され、clusterにするとCore nodeで実行される。clientだとコンソール上からstepログを見られて便利だが、複数のアプリケーションを同時に動かす場合はclusterが適している

を渡している。

$ aws emr create-cluster \
  --release-label emr-6.3.1 \
  --applications Name=Spark \
  --instance-type m6g.xlarge \
  --instance-count 3 \
  --managed-scaling-policy ComputeLimits='{MinimumCapacityUnits=3,MaximumCapacityUnits=5,UnitType=Instances}'\
  --use-default-roles \
  --auto-terminate \
  --enable-debugging \
  --log-uri s3n://<bucket>/emrlogs/ \
  --name test \
  --configurations '[
    {
      "Classification": "spark",
      "Properties": {
        "maximizeResourceAllocation": "true"
      }
    }
  ]' \
  --steps '[{
    "Type": "CUSTOM_JAR",
    "Name": "spark-submit",
    "ActionOnFailure": "TERMINATE_CLUSTER",
    "Jar": "command-runner.jar", 
    "Args": [
      "spark-submit",
      "--master",
      "yarn",
      "--deploy-mode",
      "client",
      "s3://<bucket>/test.py",
      "--value",
      "123"
    ]
  }]'

{
    "ClusterId": "j-DND71AFKD661",
    "ClusterArn": "arn:aws:elasticmapreduce:ap-northeast-1:******:cluster/j-DND71AFKD661"
}

スポットインスタンスの利用

また、–instance-type や –instance-count の代わりにinstance fleetsの設定を行うことでコストが低いスポットインスタンスや複数のインスタンスタイプを混ぜたクラスタを構築することができる。 スポットインスタンスはオンデマンドの需要によって突然シャットダウンすることがあるが、もしそれによってシャッフルブロックが失われても再計算して回復する仕組みがあるので特に問題は起きない。

  • InstanceTypeConfigs
    • BidPrice,BidPriceAsPercentageOfOnDemandPrice: スポットインスタンスへの入札額。いずれも渡さないとオンデマンドの価格が上限となる。
    • WeightedCapacity: 1インスタンスがキャパシティを埋める量。デフォルトは1。
  • LaunchSpecifications
    • TimeoutDurationMinutes, TimeoutAction: スポットインスタンスが使えなかった場合の挙動

–managed-scaling-policy の UnitType は InstanceFleetUnits となる。

--instance-fleets \
    InstanceFleetType=MASTER,TargetOnDemandCapacity=1,InstanceTypeConfigs=['{InstanceType=m6g.xlarge}'] \
    InstanceFleetType=CORE,TargetOnDemandCapacity=1,TargetSpotCapacity=3,InstanceTypeConfigs=['{InstanceType=m6g.xlarge}','{InstanceType=m6g.2xlarge,WeightedCapacity=2}'],LaunchSpecifications={SpotSpecification='{TimeoutDurationMinutes=5,TimeoutAction=SWITCH_TO_ON_DEMAND}'} \
--managed-scaling-policy ComputeLimits='{MinimumCapacityUnits=3,MaximumCapacityUnits=5,UnitType=InstanceFleetUnits}'\

ssm-agentのインストール

–bootstrap-actions でインスタンス起動時に ssm-agent をインストールすれば Key Pair を登録することなく中に入ってコマンドを実行したり、 ポートフォワーディングしてGangliaにアクセスしたりリモートデバッグすることができる。

AWS CopilotでECS on Fargate上にコンテナをデプロイしECS Execによるコマンドの実行やSession Managerによるポートフォワーディングを行う - sambaiz-net

ScalaでSparkのアプリケーションを開発してEMRでリモートデバッグする - sambaiz-net

--bootstrap-actions Path=s3://<bucket>/install-ssm-agent.sh,Name=insatall-ssm-agent \

$ cat install-ssm-agent.sh
#!/bin/sh -e
sudo yum install -y https://s3.amazonaws.com/ec2-downloads-windows/SSMAgent/latest/linux_arm64/amazon-ssm-agent.rpm \
    && sudo yum clean all

# --applications Name=Spark Name=Ganglia \
$ aws ssm start-session --target <instance_id> --document-name AWS-StartPortForwardingSession --parameters '{"portNumber":["80"], "localPortNumber":["8080"]}'
$ open http://localhost:8080/ganglia/

併せてEC2のRoleに AmazonSSMManagedInstanceCore と次のpolicyをアタッチする必要がある。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "ssm:DescribeInstanceProperties",
        "ssm:DescribeSessions",
        "ec2:describeInstances",
        "ssm:GetConnectionStatus"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "ssm:StartSession"
      ],
      "Resource": "*"
    }
  ]
}

stepの実行

bootstrapが完了するとstepの実行が始まる。

実行状況はコンソールか aws emr list-steps から確認できる。

$ aws emr list-steps --cluster-id j-DND71AFKD661 | jq ".Steps[0]"
{
  "Id": "s-2WT4PRXV2S3B0",
  "Name": "spark-submit",
  "Config": {
    "Jar": "command-runner.jar",
    "Properties": {},
    "Args": [
      "spark-submit",
      "--master",
      "yarn",
      "--deploy-mode",
      "client",
      "s3://<bucket>/test.py",
      "--value",
      "123"
    ]
  },
  "ActionOnFailure": "TERMINATE_CLUSTER",
  "Status": {
    "State": "PENDING",
    "StateChangeReason": {},
    "Timeline": {
      "CreationDateTime": "2022-06-21T23:42:52.594000+09:00"
    }
  }
}

ログは 5分ごとにS3に保存されるが、 Cluster mode の場合 step に対応する application_id を探す必要があるので New Relic などに集約すると良さそうだ。

EMRクラスタで動かしたSparkのログをFluent BitでNew Relicに集約する - sambaiz-net

参考

AWS で実践!Analytics Modernization ~ETL 編~ AWS ETL サービス適材適所選択のコツ

Instance Fleetsを使って集計クラスタのランニングコストを1/4にした | CyberAgent Developers Blog