EMR and Glue are managed services that run Spark applications on AWS. Glue makes it easy to run ETL jobs serverlessly, while EMR allows fine-tuning of resources and parameters. The flip side is that if the settings are not appropriate, the resources cannot be fully utilized, and tasks can fail with OOM even though memory is left over.
In the CLI, settings can be passed as a JSON string or a file with --configurations.
Launch an EMR cluster with AWS CLI and run Spark applications - sambaiz-net
--configurations '[
{
"Classification": "spark-defaults",
"Properties": {
...
}
}
]'
Refer to Glue Data Catalog
To refer to the Glue Data Catalog as the Hive metastore, set the following.
[
{
"Classification": "spark-hive-site",
"Properties": {
"hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
}
}
]
maximizeResourceAllocation
EMR has a unique parameter called maximizeResourceAllocation; if it is set to true, the following spark-defaults parameters are set automatically according to the instance type.
- spark.default.parallelism
- spark.driver.memory
- spark.executor.memory
- spark.executor.cores
- spark.executor.instances
[
{
"Classification": "spark",
"Properties": {
"maximizeResourceAllocation": "true"
}
}
]
Dynamic Resource Allocation
Dynamic Resource Allocation is a Spark feature that allocates resources to applications dynamically according to the workload. It is enabled by default on EMR.
[
{
"Classification": "spark-defaults",
"Properties": {
"spark.dynamicAllocation.enabled": "true"
}
}
]
Manual setting
However, there seems to be room for improvement with these settings; best practices suggest disabling them and setting the following values manually instead.
spark
- spark.executor.cores = 5: Increasing it reduces parallelism, while decreasing it increases the number of executors and generates a large amount of I/O.
- spark.executor.memory = (RAM per node / number of executors per node) * 0.9, where number of executors per node = (vCPU - 1) / spark.executor.cores: one vCPU is reserved for the Hadoop daemons, and the remaining 10% of memory goes to memoryOverhead.
- spark.executor.memoryOverhead = spark.executor.memory * (0.1 / 0.9)
- spark.executor.instances = (number of executors per node * number of core nodes) - 1: one slot is reserved for the driver. In client mode the driver runs on the master node, so there is no need to subtract it.
- spark.driver.memory = spark.executor.memory
- spark.driver.cores = spark.executor.cores
- spark.default.parallelism = spark.executor.instances * spark.executor.cores * 2
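The formulas above can be sketched as a small calculation. The instance type and node count here are assumptions for illustration (10 core nodes of r5.4xlarge: 16 vCPU, 128 GiB RAM each), not values from any particular cluster.

```python
# Manual-setting formulas, sketched for a hypothetical cluster of
# 10 r5.4xlarge core nodes (16 vCPU, 128 GiB RAM each) -- assumed values.
VCPU_PER_NODE = 16
RAM_GIB_PER_NODE = 128
CORE_NODES = 10
EXECUTOR_CORES = 5  # the recommended fixed value

# One vCPU per node is reserved for the Hadoop daemons.
executors_per_node = (VCPU_PER_NODE - 1) // EXECUTOR_CORES

# 90% of each executor's share of RAM goes to the heap,
# the remaining 10% to memoryOverhead.
executor_memory_gib = int(RAM_GIB_PER_NODE / executors_per_node * 0.9)
memory_overhead_gib = int(executor_memory_gib * (0.1 / 0.9))

# Subtract one executor slot for the driver (cluster deploy mode).
executor_instances = executors_per_node * CORE_NODES - 1

default_parallelism = executor_instances * EXECUTOR_CORES * 2

print(executors_per_node, executor_memory_gib, memory_overhead_gib,
      executor_instances, default_parallelism)
```

For this assumed cluster the sketch yields 3 executors per node, a 38 GiB heap with about 4 GiB of overhead per executor, 29 executor instances, and a default parallelism of 290.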
spark-defaults
- spark.network.timeout = 800s
- spark.executor.heartbeatInterval = 60s: should be significantly smaller than spark.network.timeout.
- spark.yarn.scheduler.reporterThread.maxFailures = 5: the maximum number of executor failures before the application fails.
- spark.memory.fraction = 0.8: the ratio of heap space used for execution and storage. Making it smaller causes spilling (moving data from buffer to disk) and cache eviction to occur more frequently.
- spark.memory.storageFraction = 0.3
- spark.rdd.compress = true: compression saves space at the cost of CPU time.
- spark.shuffle.compress = true
- spark.shuffle.spill.compress = true
- spark.serializer = org.apache.spark.serializer.KryoSerializer: faster than the default JavaSerializer.
- spark.jars.packages (e.g. com.google.cloud.spark:spark-3.2-bigquery:0.31.1): puts libraries on the classpath of the driver and executors, but they are downloaded every time a step is executed, so be aware of NAT gateway costs if the instances run in private subnets.
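Putting the values above together, the resulting configuration might look like the following. The memory, instance, and parallelism numbers are illustrative, assuming a hypothetical cluster of 10 r5.4xlarge core nodes; compute your own from the formulas above.

```json
[
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "false"
    }
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.dynamicAllocation.enabled": "false",
      "spark.executor.cores": "5",
      "spark.executor.memory": "38g",
      "spark.executor.memoryOverhead": "4g",
      "spark.executor.instances": "29",
      "spark.driver.memory": "38g",
      "spark.driver.cores": "5",
      "spark.default.parallelism": "290",
      "spark.network.timeout": "800s",
      "spark.executor.heartbeatInterval": "60s",
      "spark.yarn.scheduler.reporterThread.maxFailures": "5",
      "spark.memory.fraction": "0.8",
      "spark.memory.storageFraction": "0.3",
      "spark.rdd.compress": "true",
      "spark.shuffle.compress": "true",
      "spark.shuffle.spill.compress": "true",
      "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
    }
  }
]
```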
GC
Use the high-performance G1GC algorithm, and lower the threshold at which GC starts in order to prevent Full GC. Also, output GC logs.
Exploring the cause of OOM that occurred in Java from GC logs and heap dumps - sambaiz-net
{
"Classification": "spark-defaults",
"Properties": {
"spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'",
"spark.driver.extraJavaOptions": "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p'"
}
}
yarn
Disable YARN's memory checks to prevent containers from being killed by rare virtual (and physical) out-of-memory errors.
- yarn.nodemanager.vmem-check-enabled = false
- yarn.nodemanager.pmem-check-enabled = false
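Following the pattern of the other snippets, these flags would go in the yarn-site classification:

```json
[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.nodemanager.vmem-check-enabled": "false",
      "yarn.nodemanager.pmem-check-enabled": "false"
    }
  }
]
```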
References
Explain process of spilling in Hadoop’s map reduce… - Cloudera Community - 237245