Amazon EMR is the service that launches a cluster installed Spark, Hive, and Presto on EC2 or EKS. While Glue, the managed Spark service, can easily run Spark ETL jobs with serverless, EMR has the advantage of excellent cost performance by spot instances etc., and also fine-tuning is available, but now that EMR Serverless has been released, the difference has narrowed a little. Glue also has handy features such as DynamicFrame, which doesn’t need to be specified the schema, and Bookmark, which makes processing being executed from the previous continuation. However, if heavy processing is executed repeatedly, it can be expensive, besides, it causes to reach quotas such as DPU, so it would be better to use it properly.
Generate data with TPC-DS Connector for Glue - sambaiz-net
I use the following code this time.
$ cat test.py
import argparse
import logging
from pyspark.sql import SparkSession
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
output_path = "s3://<bucket>/emrtest/"
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--value', default=10, type=int, help="some value")
args = parser.parse_args()
with SparkSession.builder.appName("testapp").getOrCreate() as spark:
df = spark.createDataFrame([(args.value,)], ['value'])
logger.info("write %d rows", df.count())
df.write.mode('overwrite').parquet(output_path, compression="gzip")
$ aws s3 cp test.py s3://<bucket>/
Enable Glue Data Catalog
If tables in Glue Data Catalog is referred, To refer to the Glue Data Catalog table, apply the following settings.
with SparkSession.builder \
.appName("testapp") \
.config("hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
.enableHiveSupport() \
.getOrCreate() as spark:
df = spark.sql("SELECT * FROM xxxx.yyyyy")
Launch an EMR cluster
aws emr create-cluster launches a cluster. If –step and –auto-terminate are passed, the cluster will be terminated as soon as the process is completed, so it can be disposable.
With –configurations, parameters such as cores and memory available for an executor can be passed. If maximizeResourceAllocation=true is passed, they are determined depending on the instance type.
Settings for running Spark on EMR - sambaiz-net
Other flags are as follows. If the memory is insufficient for the data to be handled, tasks can be failed, or the heartbeat from the driver can time out due to a huge number GC, so it would be better to have some margin.
- –release-label emr-6.3.1: EMR version. This version runs the same Spark 3.1.1 as Glue 3.0. It might be better to use the latest version to avoid a bug that is already fixed.
- –applications Name=Spark: Installed applications
- –instance-type, –instance-count: Instance type and the number. One of these is the Master node and the rest are the Core node.
- –managed-scaling-policy: auto scale settings
- –use-default-roles: Use EMR_EC2_DefaultRole as the instance profile, and EMR_DefaultRole as the EMR role.
- –name: Cluster name. The default is “Development Cluster”
spark-submit is passed:
- –master yarn: Use YARN, the resource manager of Hadoop, as a Cluster Manager.
- –deploy-mode: Whether to execute the Driver Program that generates SparkContext on the client or in the cluster. In EMR, if it is set to “client”, the Driver Program is executed on the Master node, and if it is set to “cluster”, the Driver Program executed on the Core node. While client mode is handy as step logs can be seen in the console, if multiple applications run in the cluster simultaneously, cluster mode is suitable.
$ aws emr create-cluster \
--release-label emr-6.3.1 \
--applications Name=Spark \
--instance-type m6g.xlarge \
--instance-count 3 \
--managed-scaling-policy ComputeLimits='{MinimumCapacityUnits=3,MaximumCapacityUnits=5,UnitType=Instances}'\
--use-default-roles \
--auto-terminate \
--enable-debugging \
--log-uri s3n://<bucket>/emrlogs/ \
--name test \
--configurations '[
{
"Classification": "spark",
"Properties": {
"maximizeResourceAllocation": "true"
}
}
]' \
--steps '[{
"Type": "CUSTOM_JAR",
"Name": "spark-submit",
"ActionOnFailure": "TERMINATE_CLUSTER",
"Jar": "command-runner.jar",
"Args": [
"spark-submit",
"--master",
"yarn",
"--deploy-mode",
"client",
"s3://<bucket>/test.py",
"--value",
"123"
]
}]'
{
"ClusterId": "j-DND71AFKD661",
"ClusterArn": "arn:aws:elasticmapreduce:ap-northeast-1:******:cluster/j-DND71AFKD661"
}
Use spot instances
With instance fleets settings instead of –instance-type and –instance-count, a cluster, which consists of low-cost spot instances and multiple types of instances, can be launched. Spot instances can suddenly shut down depending on the demand, but even if shuffle blocks are lost due to it, there is a feature to recover with recalculation.
- InstanceTypeConfigs
- BidPrice,BidPriceAsPercentageOfOnDemandPrice: The bid price for each spot instance. If neither is given, the on-demand price is the upper limit.
- WeightedCapacity: The amount that one instance fills the capacity. The default is 1.
- LaunchSpecifications
- TimeoutDurationMinutes, TimeoutAction: Behavior when spot instances cannot be used
–managed-scaling-policy’s UnitType is InstanceFleetUnits. MaximumOnDemandCapacityUnits is needed to launch spot instances on scale-out.
--instance-fleets \
InstanceFleetType=MASTER,TargetOnDemandCapacity=1,InstanceTypeConfigs=['{InstanceType=m6g.xlarge}'] \
InstanceFleetType=CORE,TargetOnDemandCapacity=2,InstanceTypeConfigs=['{InstanceType=m6g.xlarge}'] \
InstanceFleetType=TASK,TargetSpotCapacity=1,InstanceTypeConfigs=['{InstanceType=m6g.xlarge}','{InstanceType=m6g.2xlarge,WeightedCapacity=2}'],LaunchSpecifications={SpotSpecification='{TimeoutDurationMinutes=5,TimeoutAction=SWITCH_TO_ON_DEMAND}'} \
--managed-scaling-policy ComputeLimits='{MinimumCapacityUnits=3,MaximumCapacityUnits=5,MaximumCoreCapacityUnits=2,MaximumOnDemandCapacityUnits=2,UnitType=InstanceFleetUnits}'\
Make EMR clusters’ scale-in faster with Task nodes - sambaiz-net
Install the ssm-agent
Install ssm-agent at instance startup with –bootstrap-actions allows to execute commands and perform port forwarding to access Ganglia and debug remotely without registering a Key Pair.
Develop Spark Applications in Scala and perform remote debugging on EMR - sambaiz-net
--bootstrap-actions Path=s3://<bucket>/install-ssm-agent.sh,Name=insatall-ssm-agent \
$ cat install-ssm-agent.sh
#!/bin/sh -e
sudo yum install -y https://s3.amazonaws.com/ec2-downloads-windows/SSMAgent/latest/linux_arm64/amazon-ssm-agent.rpm \
&& sudo yum clean all
# --applications Name=Spark Name=Ganglia \
$ aws ssm start-session --target <instance_id> --document-name AWS-StartPortForwardingSession --parameters '{"portNumber":["80"], "localPortNumber":["8080"]}'
$ open http://localhost:8080/ganglia/
It is necessary to attach AmazonSSMManagedInstanceCore and the following policy to the EC2 Role.
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"ssm:DescribeInstanceProperties",
"ssm:DescribeSessions",
"ec2:describeInstances",
"ssm:GetConnectionStatus"
],
"Resource": "*"
},
{
"Effect": "Allow",
"Action": [
"ssm:StartSession"
],
"Resource": "*"
}
]
}
Run the step
The step starts to run after bootstrapping.
Running status can be checked on the management console or “aws emr list-steps”
$ aws emr list-steps --cluster-id j-DND71AFKD661 | jq ".Steps[0]"
{
"Id": "s-2WT4PRXV2S3B0",
"Name": "spark-submit",
"Config": {
"Jar": "command-runner.jar",
"Properties": {},
"Args": [
"spark-submit",
"--master",
"yarn",
"--deploy-mode",
"client",
"s3://<bucket>/test.py",
"--value",
"123"
]
},
"ActionOnFailure": "TERMINATE_CLUSTER",
"Status": {
"State": "PENDING",
"StateChangeReason": {},
"Timeline": {
"CreationDateTime": "2022-06-21T23:42:52.594000+09:00"
}
}
}
Logs are stored in S3 every 5 minutes. When the job runs with cluster mode, finding the application_id corresponding to the step is troublesome, so it may be good to aggregate logs to New Relic to check them easily.
Aggregate logs of spark running on an EMR cluster with Fluent Bit - sambaiz-net
References
AWS で実践!Analytics Modernization ~ETL 編~ AWS ETL サービス適材適所選択のコツ
Instance Fleetsを使って集計クラスタのランニングコストを1/4にした | CyberAgent Developers Blog