EMRでSparkを動かす際の設定

awshadoopsparketl

AWSでSparkのアプリケーションを動かすマネージドサービスにはEMRとGlueがあって、 Glueがサーバーレスで手軽にETLジョブを動かすことができる一方、EMRはリソースやパラメータを細かくチューニングすることができる。 裏を返せば設定が適切でないとリソースをフル活用できず、特にメモリについては余っているにもかかわらずOOMで失敗してしまうことがある。

CLIでは –configurations で設定を記述したファイルのパスやjson文字列を渡すことができる。

AWS CLIでEMRクラスタを立ち上げSparkのアプリケーションを実行する - sambaiz-net

--configurations '[
  {
    "Classification": "spark-defaults",
    "Properties": {
      ...
    }
  }
]'

Glue Data Catalog の参照

Hive metastore として Glue Data Catalog を参照するための設定。

[
  {
    "Classification": "spark-hive-site",
    "Properties": {
      "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
    }
  }
]

maximizeResourceAllocation

EMRには maximizeResourceAllocation という固有の設定項目があって、これをtrueにすると spark-defaults の次のパラメータがインスタンスタイプに合わせて設定される。

  • spark.default.parallelism
  • spark.driver.memory
  • spark.executor.memory
  • spark.executor.cores
  • spark.executor.instances
[
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "true"
    }
  }
]

Dynamic Resource Allocation

Sparkの動的にアプリケーションへリソースを割り当てる機能。 EMRではデフォルトで有効になっている。

[
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.dynamicAllocation.enabled": "true"
    }
  }
]

マニュアルでの設定

しかしながらこれらによる設定は改善の余地があるらしく、ベストプラクティスでは無効にして次の値にすることが推奨されている。

spark

  • spark.executor.cores = 5: 多くすると並列性が下がり、少なくするとexecutorが増えることで大量のI/Oが発生する

  • spark.executor.memory = (RAM / ((vCPU - 1) / executor.cores = number of executors)) * 0.9: vCPUから1を引いているのはHadoop daemon分

    • spark.executor.memoryOverhead = spark.executor.memory * (0.1/0.9)
  • spark.executor.instances = number of executors - 1: 1つはdriver分。client modeの場合はmaster nodeで動くので引く必要がない。

  • spark.driver.memory = spark.executor.memory

  • spark.driver.cores = spark.executors.cores

  • spark.default.parallelism = executor.instances * executors.cores * 2

spark-defaults

  • spark.network.timeout = 800s

  • spark.executor.heartbeatInterval = 60s: spark.network.timeoutよりも大幅に小さくする必要がある

  • spark.yarn.scheduler.reporterThread.maxFailures = 5: executorの最大失敗回数

  • spark.memory.fraction = 0.8: 実行とストレージに使われるヒープ領域の割合。小さくするとbufferからdiskへの移動であるspillingやキャッシュの削除が頻繁に発生する

  • spark.memory.storageFraction = 0.3

  • spark.rdd.compress = true: 圧縮するとCPU時間と引き換えに領域を節約できる

  • spark.shuffle.compress = true

  • spark.shuffle.spill.compress = true

  • spark.serializer = org.apache.spark.serializer.KryoSerializer: デフォルトのJavaSerializerより高速

GC

性能の良いG1GCアルゴリズムを用いGC開始の閾値を下げることでなるべくFull GCが起きないようにしてGCログを出す。

Javaで発生したOOMの原因をGCログとヒープダンプから探る - sambaiz-net

{
  "Classification": "spark-defaults",
  "Properties": {
    "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
    "spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'"
  }
}

yarn-site

稀に発生するらしい virtual out-of-memory errors を防ぐ。

  • yarn.nodemanager.vmem-check-enabled = false
  • yarn.nodemanager.pmem-check-enabled = false

参考

Explain process of spilling in Hadoop’s map reduce… - Cloudera Community - 237245