Create an Amazon Managed Workflows for Apache Airflow (MWAA) environment with CDK and run a workflow
Amazon Managed Workflows for Apache Airflow (MWAA) is a managed Apache Airflow service. Unlike Step Functions, which is serverless, MWAA is billed per instance hour, but you get Airflow's rich feature set and third-party provider packages, including the ones for AWS.
Run Apache Airflow with Docker Compose and execute a workflow - sambaiz-net
Step Functions currently doesn't support starting an execution from the middle of a workflow, so retrying a very long workflow can be hard. Also, if you want to include an input value in an array field such as the Args of EmrAddStep, you need to build the array with Lambda in advance. If these limitations bother you, migrating to MWAA may be an option.
Build a workflow with Step Functions using CDK - sambaiz-net
Create an environment with CDK
Write the CDK code with reference to the CloudFormation template in the Quick start.
To create an MWAA environment, the following resources are required.
- VPC
  - 2 public (if routing over the Internet) and 2 private subnets
  - A security group that allows its members to communicate with each other
- S3 bucket where the DAG sources and requirements.txt are placed
- Execution role
  - s3
    - if blockPublicAccess is missing, the “Unable to check PublicAccessBlock configuration” error occurs
  - cloudwatch: sending metrics and logs
  - sqs: queueing tasks
  - kms: encrypting data
import {
  Stack,
  StackProps,
  Tags,
  aws_ec2 as ec2,
  aws_s3 as s3,
  aws_iam as iam,
  aws_mwaa as mwaa
} from 'aws-cdk-lib';
import { Construct } from 'constructs';

export class MwaaStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    // VPC with public and private subnets across 2 AZs
    const vpc = new ec2.Vpc(this, 'MwaaVPC', {
      ipAddresses: ec2.IpAddresses.cidr('10.0.0.0/16'),
      maxAzs: 2,
    })
    Tags.of(vpc).add('Name', `${this.stackName}VPC`)

    // Self-referencing security group so MWAA components can talk to each other
    const securityGroup = new ec2.SecurityGroup(this, 'MwaaSecurityGroup', {
      vpc: vpc
    })
    securityGroup.addIngressRule(securityGroup, ec2.Port.allTcp())

    // Bucket for DAG sources; public access must be blocked
    const sourceBucket = new s3.Bucket(this, 'MwaaSourceBucket', {
      blockPublicAccess: {
        blockPublicAcls: true,
        blockPublicPolicy: true,
        ignorePublicAcls: true,
        restrictPublicBuckets: true
      }
    })

    const mwaaEnvironmentName = `${this.stackName}-mwaa-env`

    const executionRole = new iam.Role(this, 'MwaaExecutionRole', {
      roleName: `${this.stackName}-execution-role`,
      assumedBy: new iam.CompositePrincipal(
        new iam.ServicePrincipal('airflow-env.amazonaws.com'),
        new iam.ServicePrincipal('airflow.amazonaws.com'),
      ),
      path: '/service-role/',
      inlinePolicies: {
        'execution-policy': new iam.PolicyDocument({
          statements: [
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: ['airflow:PublishMetrics'],
              resources: [`arn:aws:airflow:${this.region}:${this.account}:environment/${mwaaEnvironmentName}`]
            }),
            new iam.PolicyStatement({
              effect: iam.Effect.DENY,
              actions: ['s3:ListAllMyBuckets'],
              resources: [
                sourceBucket.bucketArn,
                `${sourceBucket.bucketArn}/*`
              ]
            }),
            // s3: read DAG sources and requirements.txt
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: [
                's3:GetObject*',
                's3:GetBucket*',
                's3:List*'
              ],
              resources: [
                sourceBucket.bucketArn,
                `${sourceBucket.bucketArn}/*`
              ]
            }),
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: [
                'logs:DescribeLogGroups',
                's3:GetAccountPublicAccessBlock',
                'cloudwatch:PutMetricData'
              ],
              resources: ['*']
            }),
            // cloudwatch: send logs
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: [
                'logs:CreateLogStream',
                'logs:CreateLogGroup',
                'logs:PutLogEvents',
                'logs:GetLogEvents',
                'logs:GetLogRecord',
                'logs:GetLogGroupFields',
                'logs:GetQueryResults',
                'logs:DescribeLogGroups',
              ],
              resources: [`arn:aws:logs:${this.region}:${this.account}:log-group:airflow-${mwaaEnvironmentName}-*`],
            }),
            // sqs: queue tasks
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: [
                'sqs:ChangeMessageVisibility',
                'sqs:DeleteMessage',
                'sqs:GetQueueAttributes',
                'sqs:GetQueueUrl',
                'sqs:ReceiveMessage',
                'sqs:SendMessage'
              ],
              resources: [`arn:aws:sqs:${this.region}:*:airflow-celery-*`]
            }),
            // kms: encrypt data (only via SQS, and not with keys in this account)
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: [
                'kms:Decrypt',
                'kms:DescribeKey',
                'kms:GenerateDataKey*',
                'kms:Encrypt'
              ],
              notResources: [`arn:aws:kms:*:${this.account}:key/*`],
              conditions: {
                StringLike: {
                  'kms:ViaService': [
                    `sqs.${this.region}.amazonaws.com`
                  ],
                },
              },
            }),
          ]
        })
      }
    })
  }
}
The resources allocated to a scheduler/worker depend on environmentClass, and a small worker runs 5 tasks concurrently by default.
By default, params can't be overridden by the conf passed when a DAG is triggered, so set core.dag_run_conf_overrides_params to true in airflowConfigurationOptions. Configuration options can be read from AIRFLOW__{SECTION}__{KEY} environment variables.
Creating an environment takes tens of minutes.
// Place this inside the MwaaStack constructor, after executionRole.
const mwaaEnvironment = new mwaa.CfnEnvironment(this, 'MwaaEnvironment', {
  name: mwaaEnvironmentName,
  airflowVersion: '2.2.2',
  airflowConfigurationOptions: {
    'core.dag_run_conf_overrides_params': 'true',
    'app.env': 'prd', // os.getenv("AIRFLOW__APP__ENV")
  },
  sourceBucketArn: sourceBucket.bucketArn,
  executionRoleArn: executionRole.roleArn,
  dagS3Path: 'dags',
  networkConfiguration: {
    securityGroupIds: [ securityGroup.securityGroupId ],
    subnetIds: vpc.privateSubnets.map(s => s.subnetId),
  },
  webserverAccessMode: "PUBLIC_ONLY",
  environmentClass: 'mw1.small',
  maxWorkers: 1,
  loggingConfiguration: {
    dagProcessingLogs: {
      enabled: true,
      logLevel: 'INFO',
    },
    schedulerLogs: {
      enabled: true,
      logLevel: 'INFO',
    },
    taskLogs: {
      enabled: true,
      logLevel: 'INFO',
    },
    workerLogs: {
      enabled: true,
      logLevel: 'INFO',
    },
    webserverLogs: {
      enabled: true,
      logLevel: 'INFO',
    },
  },
});
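The mapping from a configuration option to its environment variable can be sketched in Python as follows. This is a minimal illustration, not part of MWAA or Airflow: the helper names `airflow_env_var` and `get_airflow_option` are hypothetical, and the environment variable is set by hand here only to simulate what a task would see inside the environment.

```python
import os


def airflow_env_var(option: str) -> str:
    """Map 'section.key' to the AIRFLOW__{SECTION}__{KEY} variable name."""
    section, key = option.split(".", 1)
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


def get_airflow_option(option: str, default=None):
    """Read an option from the environment, falling back to a default."""
    return os.environ.get(airflow_env_var(option), default)


# Simulate the 'app.env': 'prd' option set in airflowConfigurationOptions.
os.environ["AIRFLOW__APP__ENV"] = "prd"

print(airflow_env_var("core.dag_run_conf_overrides_params"))
print(get_airflow_option("app.env"))
```

Inside a DAG file, only the lookup part would be needed, since the variables are already present on the workers.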
Access the Airflow UI
The UI can be opened from the console, or by obtaining a login token with the CLI.
$ ENV_NAME=MwaaStack-mwaa-env
$ aws mwaa create-web-login-token --name $ENV_NAME
{
"WebServerHostname": "*****.c9.ap-northeast-1.airflow.amazonaws.com",
"WebToken": "*****"
}
$ open $(aws mwaa create-web-login-token --name $ENV_NAME | jq -r '"https://" + .WebServerHostname + "/aws_mwaa/aws-console-sso?login=true#" + .WebToken')
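The jq expression above simply concatenates the two response fields into a URL. The same step can be sketched in Python, assuming the JSON printed by the CLI has already been captured. The function name `build_login_url` and the sample values are hypothetical placeholders, not real hostnames or tokens.

```python
import json


def build_login_url(token_response: dict) -> str:
    """Build the SSO login URL from a create-web-login-token response."""
    host = token_response["WebServerHostname"]
    token = token_response["WebToken"]
    return f"https://{host}/aws_mwaa/aws-console-sso?login=true#{token}"


# Placeholder response with the same keys as the CLI output above.
sample = json.loads(
    '{"WebServerHostname": "example.airflow.amazonaws.com", "WebToken": "dummy-token"}'
)
print(build_login_url(sample))
```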
To call the Airflow CLI, obtain a CLI token and send it as a bearer token.
$ aws mwaa create-cli-token --name $ENV_NAME
{
"CliToken": "*****",
"WebServerHostname": "*****.c8.ap-northeast-1.airflow.amazonaws.com"
}
$ CLI_JSON=$(aws mwaa create-cli-token --name $ENV_NAME) \
&& CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \
&& WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \
&& curl -X POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \
-H "Authorization: Bearer $CLI_TOKEN" \
-H "Content-Type: text/plain" \
-d "version" | jq -r '.stdout' | base64 -d
2.2.2
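The last part of the pipeline (`jq -r '.stdout' | base64 -d`) decodes a base64-encoded field of the JSON response. A minimal Python sketch of that decoding step follows. The function name `decode_cli_response` is hypothetical, the sample body is constructed by hand rather than taken from a real environment, and treating `stderr` the same way as `stdout` is an assumption.

```python
import base64
import json


def decode_cli_response(body: str) -> dict:
    """Decode the base64-encoded output fields of an /aws_mwaa/cli response."""
    payload = json.loads(body)
    return {
        # A missing field decodes to an empty string.
        name: base64.b64decode(payload.get(name, "")).decode("utf-8")
        for name in ("stdout", "stderr")
    }


# Hand-built sample mimicking the response to the "version" command.
sample_body = json.dumps(
    {"stdout": base64.b64encode(b"2.2.2\n").decode("ascii"), "stderr": ""}
)
print(decode_cli_response(sample_body)["stdout"].strip())
```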
Run a workflow
Upload the following code to dagS3Path. It adds a step with EmrAddStepsOperator and waits for it to complete with EmrStepSensor. Together these are equivalent to elasticmapreduce:addStep.sync in Step Functions.
Launch an EMR cluster with AWS CLI and run Spark applications - sambaiz-net
The default mode of BaseSensorOperator is “poke”, which occupies a worker slot while waiting, whereas “reschedule” releases the slot between checks.
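Switching a sensor to “reschedule” is a matter of passing keyword arguments. The sketch below keeps them in a plain dict so that it runs without Airflow installed. `mode`, `poke_interval` and `timeout` are BaseSensorOperator parameters, while the dict name and the chosen values are only illustrative assumptions.

```python
# Keyword arguments shared by sensors that should not hold a worker slot.
RESCHEDULE_SENSOR_ARGS = {
    "mode": "reschedule",   # free the worker slot between checks
    "poke_interval": 60,    # seconds between checks (illustrative value)
    "timeout": 60 * 60,     # give up after one hour (illustrative value)
}

# Usage inside a DAG file (requires Airflow, so shown as a comment):
# wait_app1_step = EmrStepSensor(
#     task_id="wait_app1_step",
#     job_flow_id=clusterId,
#     step_id=step_id,
#     **RESCHEDULE_SENSOR_ARGS,
# )
print(RESCHEDULE_SENSOR_ARGS["mode"])
```

With a small worker limited to 5 concurrent tasks, long waits in “poke” mode can use up the slots quickly, which is the main reason to consider this setting.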
Putting an Operator and a Sensor into a TaskGroup makes the DAG easier to read, but the tasks are still scheduled individually like any others, so retrying them as a unit takes some extra work.
Retry processing consisting of multiple Tasks with Callbacks in Airflow - sambaiz-net
The apache-airflow-providers-amazon version installed in an Airflow v2.2.2 environment is v2.4.0, which is a little old, so module names differ from the latest version. Also, some operators such as AwsLambdaInvokeFunctionOperator don't exist yet, so to use them you need to implement them yourself with a hook or copy the code from the latest version.
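One way to keep a DAG file working across provider versions is to try several module paths in order. The helper below is a hypothetical sketch using only the standard library. The newer module path `airflow.providers.amazon.aws.operators.emr` is an assumption about later provider releases, and the runnable demonstration uses the `json` module as a stand-in because Airflow may not be installed where this is tried.

```python
from importlib import import_module


def import_first(candidates):
    """Return the first importable attribute from 'module.path:Name' candidates."""
    errors = []
    for candidate in candidates:
        module_path, name = candidate.split(":")
        try:
            return getattr(import_module(module_path), name)
        except (ImportError, AttributeError) as e:
            errors.append(f"{candidate}: {e}")
    raise ImportError("no candidate could be imported: " + "; ".join(errors))


EMR_ADD_STEPS_CANDIDATES = [
    # Newer provider layout (assumption)
    "airflow.providers.amazon.aws.operators.emr:EmrAddStepsOperator",
    # Provider v2.4.0 layout, as used in this post
    "airflow.providers.amazon.aws.operators.emr_add_steps:EmrAddStepsOperator",
]
# In a DAG file: EmrAddStepsOperator = import_first(EMR_ADD_STEPS_CANDIDATES)

# Standard-library demonstration: the first candidate fails, the second succeeds.
decoder = import_first(["no_such_module_xyz:Missing", "json:JSONDecoder"])
print(decoder.__name__)
```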
$ cat test.py
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.utils.task_group import TaskGroup

with DAG(
    'test',
    default_args={},
    start_date=datetime(2022, 11, 17),
    params={
        "cluster_id": "j-3C4OVZL6MAM6H"
    }
) as dag:
    # Rendered at run time; can be overridden by the conf passed on trigger
    clusterId = '{{ params.cluster_id }}'

    with TaskGroup("app1") as app1:
        add_app1_step = EmrAddStepsOperator(
            task_id="add_app1_step",
            job_flow_id=clusterId,
            steps=[
                {
                    "Name": "calculate_pi",
                    "ActionOnFailure": "CONTINUE",
                    "HadoopJarStep": {
                        "Jar": "command-runner.jar",
                        "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
                    },
                }
            ]
        )
        # The operator returns the list of step IDs via XCom; wait for the first one
        wait_app1_step = EmrStepSensor(
            task_id="wait_app1_step",
            job_flow_id=clusterId,
            step_id=f"{{{{ ti.xcom_pull(task_ids='{add_app1_step.task_id}', key='return_value')[0] }}}}",
        )
        add_app1_step >> wait_app1_step

    with TaskGroup("app2") as app2:
        add_app2_step = EmrAddStepsOperator(
            task_id="add_app2_step",
            job_flow_id=clusterId,
            steps=[
                {
                    "Name": "calculate_pi",
                    "ActionOnFailure": "CONTINUE",
                    "HadoopJarStep": {
                        "Jar": "command-runner.jar",
                        "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
                    },
                }
            ]
        )
        wait_app2_step = EmrStepSensor(
            task_id="wait_app2_step",
            job_flow_id=clusterId,
            step_id=f"{{{{ ti.xcom_pull(task_ids='{add_app2_step.task_id}', key='return_value')[0] }}}}",
        )
        add_app2_step >> wait_app2_step

    app1 >> app2
$ aws s3 cp test.py s3://<source_bucket_name>/dags/
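The step_id arguments in the DAG mix two templating layers: a Python f-string that is evaluated when the DAG file is parsed, and a Jinja template that Airflow renders at run time. The standalone sketch below shows what the f-string produces. The function name `xcom_return_value_template` is hypothetical, and the task ID passed in assumes the TaskGroup prefixes its group ID to the task ID, which is why the DAG reads `add_app1_step.task_id` instead of hard-coding the name.

```python
def xcom_return_value_template(task_id: str, index: int = 0) -> str:
    """Build a Jinja template that pulls one element of a task's return value.

    In an f-string, '{{' and '}}' are escapes for literal braces, so four
    braces are needed to emit the two that Jinja expects.
    """
    return f"{{{{ ti.xcom_pull(task_ids='{task_id}', key='return_value')[{index}] }}}}"


print(xcom_return_value_template("app1.add_app1_step"))
# prints: {{ ti.xcom_pull(task_ids='app1.add_app1_step', key='return_value')[0] }}
```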
If you run this DAG, it fails with AccessDeniedException in the log, so attach the required policies to the execution role. At first no logs appeared at all. After I fixed the role and re-created the environment, I could see them.
new iam.PolicyStatement({
  effect: iam.Effect.ALLOW,
  actions: [
    'elasticmapreduce:AddJobFlowSteps',
    'elasticmapreduce:DescribeStep',
  ],
  resources: [ '*' ]
})