Task nodes で EMR クラスタのスケールインを高速化する

awssparketlhadoop

EMR クラスタは YARN の ResourceManager などが動く Master (primary) node と、 Core nodes および Task nodes から構成される

Hadoop YARN によってアプリケーションにリソースが割り当てられる流れと割り当てられているリソース量の確認 - sambaiz-net

Core nodes と Task nodes のリソースはいずれもタスクの実行に用いられるが、 Core nodes が HDFS の DataNode となるのに対して Task nodes はならない。 そのため、 Core nodes はデータが失われないように Decommission されてから Terminate される必要があるが、 スパイク防止のためレプリケーションの帯域が制限されることから、スケールインするのに時間がかかってしまう

HDFS(Hadoop Distributed File System)とは - sambaiz-net

$ sudo -u hdfs hdfs dfsadmin -report
...
Live datanodes (11):

Name: 172.31.32.80:9866 (ip-172-31-32-80.ap-northeast-1.compute.internal)
Hostname: ip-172-31-32-80.ap-northeast-1.compute.internal
Decommission Status : Decommission in progress
Configured Capacity: 62245027840 (57.97 GB)
DFS Used: 11104256 (10.59 MB)
Non DFS Used: 208084992 (198.45 MB)
DFS Remaining: 62025838592 (57.77 GB)
DFS Used%: 0.02%
DFS Remaining%: 99.65%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 0
Last contact: Sun Mar 19 05:49:14 UTC 2023
Last Block Report: Sun Mar 19 05:27:53 UTC 2023
Num of Blocks: 106
...

ただ、永続化をS3で行っている場合、HDFSの容量はそれほど必要ないことが多い。

$ sudo -u hdfs hdfs dfs -du -h /
0        0        /user
12.6 M   12.6 M   /var

$ sudo -u hdfs hdfs dfs -du -h /var/log
7.6 M  7.6 M  /var/log/hadoop-yarn
5.0 M  5.0 M  /var/log/spark

Core nodes をスケールアウトさせた場合と、Task nodes をスケールさせた場合でスケールインするのにかかる時間を比較してみる。 全体のコードは GitHub にある。

for i in `seq 1 100`; do
  aws emr add-steps --cluster-id $CLUSTER_ID --steps Type=Spark,Name="Spark app ${i}",ActionOnFailure=CONTINUE,Args=[--class,org.apache.spark.examples.SparkPi,/usr/lib/spark/examples/jars/spark-examples.jar,100]
done

Core nodes をスケールアウトさせた場合

Core nodes を 2 台から 10 台までスケールアウトさせる。

{
  instances: {
    masterInstanceFleet: {
      targetOnDemandCapacity: 1,
      instanceTypeConfigs: [
        {
          instanceType: 'm6g.xlarge',
        },
      ],
    },
    coreInstanceFleet: {
      targetOnDemandCapacity: 2,
      instanceTypeConfigs: [
        {
          instanceType: 'm6g.xlarge',
        },
      ],
    },
  },
  managedScalingPolicy: {
    computeLimits: {
      maximumCapacityUnits: 10,
      minimumCapacityUnits: 2,
      unitType: 'InstanceFleetUnits',
    },
  }, 
  ...
}

全ての Step の実行が完了してから最低台数までスケールインされるまで 14 分程度かかった。

Task nodes をスケールアウトさせた場合

次に Task nodes を追加して Core nodes はスケールアウトされないようにする。 Core nodes と異なり、突然シャットダウンしても問題が起きづらいためスポットインスタンスにすることでコストを抑えることができるが、 今回は再現しやすいようオンデマンドインスタンスで動かす。

{
  instances: {
    masterInstanceFleet: {
      targetOnDemandCapacity: 1,
      instanceTypeConfigs: [
        {
          instanceType: 'm6g.xlarge',
        },
      ],
    },
    coreInstanceFleet: {
      targetOnDemandCapacity: 1,
      instanceTypeConfigs: [
        {
          instanceType: 'm6g.xlarge',
        },
      ],
    },
    taskInstanceFleets: [{
      name: 'Task nodes',
      targetSpotCapacity: 1,
      instanceTypeConfigs: [
        {
          instanceType: 'm6g.xlarge',
        },
      ],
    }],
  },
  managedScalingPolicy: {
    computeLimits: {
      maximumCapacityUnits: 10,
      maximumCoreCapacityUnits: 1,
      minimumCapacityUnits: 2,
      unitType: 'InstanceFleetUnits',
    },
  }, 
  ...
}

今度は6分ほどでスケールインされた。

EMR 6.x から YARN のノードラベル機能がデフォルトでオフになり、Task nodes でも ApplicationMaster が実行されるようになったため Core nodes のリソースと数を大幅に減らしたところ、CPU使用率が貼りついて task の実行が大幅に遅くなることがあったので、モニタリングしてある程度は余裕を持った設定にすると良いと思う。 また、インスタンスタイプによってデフォルトのストレージサイズが異なるので、小さいインスタンスにしたときは明示的に EBS の設定を行わないと no space left on device で task が即失敗するようになることがある。