Make EMR clusters' scale-in faster with Task nodes

aws spark etl hadoop

An EMR cluster consists of a Master (primary) node, Core nodes, and Task nodes.

How Hadoop YARN allocates resources to applications and check how much resources are allocated - sambaiz-net

Both core nodes and task nodes provide resources for running tasks, but only core nodes act as HDFS DataNodes. Core nodes therefore have to be decommissioned before they are terminated to prevent data loss, and because replication bandwidth is limited to prevent spikes, scaling in takes time.

What is HDFS(Hadoop Distributed File System) - sambaiz-net

$ sudo -u hdfs hdfs dfsadmin -report
...
Live datanodes (11):

Name: 172.31.32.80:9866 (ip-172-31-32-80.ap-northeast-1.compute.internal)
Hostname: ip-172-31-32-80.ap-northeast-1.compute.internal
Decommission Status : Decommission in progress
Configured Capacity: 62245027840 (57.97 GB)
DFS Used: 11104256 (10.59 MB)
Non DFS Used: 208084992 (198.45 MB)
DFS Remaining: 62025838592 (57.77 GB)
DFS Used%: 0.02%
DFS Remaining%: 99.65%
Configured Cache Capacity: 0 (0 B)
Cache Used: 0 (0 B)
Cache Remaining: 0 (0 B)
Cache Used%: 100.00%
Cache Remaining%: 0.00%
Xceivers: 0
Last contact: Sun Mar 19 05:49:14 UTC 2023
Last Block Report: Sun Mar 19 05:27:53 UTC 2023
Num of Blocks: 106
...
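When several nodes are being decommissioned at once, it can help to reduce this report to a count per status. The following is a minimal sketch and not part of the original code: the function name summarize_decommission_status is made up for illustration, and it assumes the report keeps the "Decommission Status : ..." line format shown above.

```shell
# Hypothetical helper: counts DataNodes per "Decommission Status" value.
# Reads `hdfs dfsadmin -report` output from stdin.
summarize_decommission_status() {
  awk -F' : ' '
    /^Decommission Status/ { count[$2]++ }
    END { for (status in count) printf "%s: %d\n", status, count[status] }
  ' | sort
}

# Usage on the primary node (needs a running cluster):
#   sudo -u hdfs hdfs dfsadmin -report | summarize_decommission_status
```

Running it repeatedly while the cluster scales in shows how many DataNodes are still in "Decommission in progress".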

If your cluster consists of only Core nodes, most of the HDFS capacity may not be used.

$ sudo -u hdfs hdfs dfs -du -h /
0        0        /user
12.6 M   12.6 M   /var

$ sudo -u hdfs hdfs dfs -du -h /var/log
7.6 M  7.6 M  /var/log/hadoop-yarn
5.0 M  5.0 M  /var/log/spark

I compared how long it takes to scale in after core nodes have been scaled out with how long it takes after task nodes have been scaled out. The whole code is on GitHub. The load that triggers the scale-out is 100 SparkPi steps.

for i in $(seq 1 100); do
  aws emr add-steps --cluster-id "$CLUSTER_ID" --steps "Type=Spark,Name=Spark app ${i},ActionOnFailure=CONTINUE,Args=[--class,org.apache.spark.examples.SparkPi,/usr/lib/spark/examples/jars/spark-examples.jar,100]"
done

When core nodes are scaled

First, scale out Core nodes from 2 to 10.

{
  instances: {
    masterInstanceFleet: {
      targetOnDemandCapacity: 1,
      instanceTypeConfigs: [
        {
          instanceType: 'm6g.xlarge',
        },
      ],
    },
    coreInstanceFleet: {
      targetOnDemandCapacity: 2,
      instanceTypeConfigs: [
        {
          instanceType: 'm6g.xlarge',
        },
      ],
    },
  },
  managedScalingPolicy: {
    computeLimits: {
      maximumCapacityUnits: 10,
      minimumCapacityUnits: 2,
      unitType: 'InstanceFleetUnits',
    },
  }, 
  ...
}

It takes about 14 minutes to scale in after all steps are completed.
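The article does not say how this time was measured. One possible way, sketched here under assumptions, is to poll the number of running instances in the fleet once all steps have completed and print the elapsed seconds when it drops back to the minimum. The function names wait_until_at_most and running_core_nodes are hypothetical, the 30-second interval is arbitrary, and CLUSTER_ID is assumed to be set as in the loop above.

```shell
# Hypothetical helper: polls a command until the number it prints is at most
# TARGET, then prints the elapsed seconds.
# Usage: wait_until_at_most TARGET INTERVAL_SECONDS COMMAND [ARGS...]
wait_until_at_most() {
  wait_target=$1
  wait_interval=$2
  shift 2
  wait_start=$(date +%s)
  until [ "$("$@")" -le "$wait_target" ]; do
    sleep "$wait_interval"
  done
  echo $(( $(date +%s) - wait_start ))
}

# Counts running instances in the core fleet of $CLUSTER_ID (assumed to be set).
running_core_nodes() {
  aws emr list-instances --cluster-id "$CLUSTER_ID" \
    --instance-fleet-type CORE --instance-states RUNNING \
    --query 'length(Instances)' --output text
}

# Example: start this once all steps have completed and wait until the core
# fleet is back to 2 nodes, checking every 30 seconds.
#   wait_until_at_most 2 30 running_core_nodes
```

For a task fleet, the same sketch should work with TASK in place of CORE and the corresponding target count.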

When task nodes are scaled

Next, add Task nodes and prevent Core nodes from scaling out. Unlike Core nodes, Task nodes cause almost no problems even if they shut down suddenly, so you can reduce the cost with Spot Instances, but this time I run them on On-Demand Instances to make the result easy to reproduce.

{
  instances: {
    masterInstanceFleet: {
      targetOnDemandCapacity: 1,
      instanceTypeConfigs: [
        {
          instanceType: 'm6g.xlarge',
        },
      ],
    },
    coreInstanceFleet: {
      targetOnDemandCapacity: 1,
      instanceTypeConfigs: [
        {
          instanceType: 'm6g.xlarge',
        },
      ],
    },
    taskInstanceFleets: [{
      name: 'Task nodes',
      targetOnDemandCapacity: 1,
      instanceTypeConfigs: [
        {
          instanceType: 'm6g.xlarge',
        },
      ],
    }],
  },
  managedScalingPolicy: {
    computeLimits: {
      maximumCapacityUnits: 10,
      maximumCoreCapacityUnits: 1,
      minimumCapacityUnits: 2,
      unitType: 'InstanceFleetUnits',
    },
  },
  ...
}

It takes about 6 minutes to scale in.

Since EMR 6.x, YARN’s node label feature is disabled by default, so ApplicationMasters can also run on Task nodes. I therefore drastically reduced the resources and number of Core nodes, but CPU resources then became insufficient, and task execution slowed down significantly as a result. It is better to monitor resource usage and choose values with some margin. In addition, the default storage size depends on the instance type, so if you choose a small instance type, you should also set the EBS size explicitly; otherwise tasks will fail immediately with "no space left on device".