Amazon Managed Workflow for Apache Airflow (MWAA) は Apache Airflow のマネージドサービス。 サーバーレスの Step Functions とは異なりインスタンス時間のコストがかかるが、 Airflow の豊富な機能や、AWSを含むサードパーティの Providers packages が利用できる。
Docker Compose で Apache Airflow を起動しワークフローを実行する - sambaiz-net
Step Functions は現状ワークフローの途中からの実行をサポートしていないため長いワークフローの再実行が大変だったり、 EmrAddStep の args のような Object の値の配列に input の値を含めたい場合に Lambda を挟んで配列に加工する必要があったりするので、 それらに不便を感じている場合は移行するのも一つの手だと思う。
CDKでStep Functionsによるワークフローを構築する - sambaiz-net
CDK による環境の作成
Quick start にある CloudFormation のテンプレートを参考に CDK のコードを記述していく。
MWAA の環境を作成するにあたって
- VPC
- パブリック (インターネットに出る場合) とプライベートのサブネットが各2つ
- 互いの通信を許可する Security Group
- DAG のソースや requirements.txt を置くS3 Bucket
- Execution Role
- s3
- blockPublicAccess がないと Unable to check PublicAccessBlock configuration になる
- cloudwatch: メトリクスとログの送信
- sqs: タスクのキューイング
- kms: データの暗号化
- s3
が必要になる。
import {
Stack,
StackProps,
Tags,
aws_ec2 as ec2,
aws_s3 as s3,
aws_iam as iam,
aws_mwaa as mwaa
} from 'aws-cdk-lib';
import { Construct } from 'constructs';
export class MwaaStack extends Stack {
constructor(scope: Construct, id: string, props?: StackProps) {
super(scope, id, props);
const vpc = new ec2.Vpc(this, 'MwaaVPC', {
ipAddresses: ec2.IpAddresses.cidr('10.0.0.0/16'),
maxAzs: 2,
})
Tags.of(vpc).add('Name', `${this.stackName}VPC`)
const securityGroup = new ec2.SecurityGroup(this, 'MwaaSecurityGroup', {
vpc: vpc
})
securityGroup.addIngressRule(securityGroup, ec2.Port.allTcp())
const sourceBucket = new s3.Bucket(this, 'MwaaSourceBucket', {
blockPublicAccess: {
blockPublicAcls: true,
blockPublicPolicy: true,
ignorePublicAcls: true,
restrictPublicBuckets: true
}
})
const mwaaEnvironmentName = `${this.stackName}-mwaa-env`
const executionRole = new iam.Role(this, 'MwaaExecutionRole', {
roleName: `${this.stackName}-exeuction-role`,
assumedBy: new iam.CompositePrincipal(
new iam.ServicePrincipal('airflow-env.amazonaws.com'),
new iam.ServicePrincipal('airflow.amazonaws.com'),
),
path: '/service-role/',
inlinePolicies: {
'execution-policy': new iam.PolicyDocument({
statements: [
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['airflow:PublishMetrics'],
resources: [`arn:aws:airflow:${this.region}:${this.account}:environment/${mwaaEnvironmentName}`]
}),
new iam.PolicyStatement({
effect: iam.Effect.DENY,
actions: ['s3:ListAllMyBuckets'],
resources: [
sourceBucket.bucketArn,
`${sourceBucket.bucketArn}/*`
]
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
's3:GetObject*',
's3:GetBucket*',
's3:List*'
],
resources: [
sourceBucket.bucketArn,
`${sourceBucket.bucketArn}/*`
]
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'logs:DescribeLogGroups',
's3:GetAccountPublicAccessBlock',
'cloudwatch:PutMetricData'
],
resources: ['*']
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'logs:CreateLogStream',
'logs:CreateLogGroup',
'logs:PutLogEvents',
'logs:GetLogEvents',
'logs:GetLogRecord',
'logs:GetLogGroupFields',
'logs:GetQueryResults',
'logs:DescribeLogGroups',
],
resources: [`arn:aws:logs:${this.region}:${this.account}:log-group:airflow-${mwaaEnvironmentName}-*`],
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'sqs:ChangeMessageVisibility',
'sqs:DeleteMessage',
'sqs:GetQueueAttributes',
'sqs:GetQueueUrl',
'sqs:ReceiveMessage',
'sqs:SendMessage'
],
resources: [`arn:aws:sqs:${this.region}:*:airflow-celery-*`]
}),
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'kms:Decrypt',
'kms:DescribeKey',
'kms:GenerateDataKey*',
'kms:Encrypt'
],
notResources: [`arn:aws:kms:*:${this.account}:key/*"`],
conditions: {
StringLike: {
'kms:ViaService': [
`sqs.${this.region}.amazonaws.com`
],
},
},
}),
]
})
}
})
}
}
environmentClass によってスケジューラー/ワーカーのリソース量が異なり、Small のワーカーはデフォルトで 5 つのタスクを並列に実行する。
そのままでは 実行時に渡す config で params を上書きできなかったので airflowConfigurationOptions で core.dag_run_conf_overrides_params を true にしている。 config の値は環境変数 AIRFLOW__{SECTION}__{KEY} で取得できる。
Environment の作成には数十分かかる。
const mwaaEnvironment = new mwaa.CfnEnvironment(this, 'MwaaEnvironment', {
name: mwaaEnvironmentName,
airflowVersion: '2.2.2',
airflowConfigurationOptions: {
'core.dag_run_conf_overrides_params': 'true',
'app.env': 'prd', // os.getenv("AIRFLOW__APP__ENV")
},
sourceBucketArn: sourceBucket.bucketArn,
executionRoleArn: executionRole.roleArn,
dagS3Path: 'dags',
networkConfiguration: {
securityGroupIds: [ securityGroup.securityGroupId ],
subnetIds: vpc.privateSubnets.map(s => s.subnetId),
},
webserverAccessMode: "PUBLIC_ONLY",
environmentClass: 'mw1.small',
maxWorkers: 1,
loggingConfiguration: {
dagProcessingLogs: {
enabled: true,
logLevel: 'INFO',
},
schedulerLogs: {
enabled: true,
logLevel: 'INFO',
},
taskLogs: {
enabled: true,
logLevel: 'INFO',
},
workerLogs: {
enabled: true,
logLevel: 'INFO',
},
webserverLogs: {
enabled: true,
logLevel: 'INFO',
},
},
});
Airflow UI へのアクセス
コンソール上からもアクセスできるが、 CLI で login token を取得してアクセスすることもできる。
$ ENV_NAME=MwaaStack-mwaa-env
$ aws mwaa create-web-login-token --name $ENV_NAME
{
"WebServerHostname": "*****.c9.ap-northeast-1.airflow.amazonaws.com",
"WebToken": "*****"
}
$ open $(aws mwaa create-web-login-token --name $ENV_NAME | jq -r '"https://" + .WebServerHostname + "/aws_mwaa/aws-console-sso?login=true#" + .WebToken')
ちなみに Airflow CLI を呼ぶ場合は cli token を取得して認証する。
$ aws mwaa create-cli-token --name $ENV_NAME
{
"CliToken": "*****",
"WebServerHostname": "*****.c8.ap-northeast-1.airflow.amazonaws.com"
}
$ CLI_JSON=$(aws mwaa create-cli-token --name $ENV_NAME) \
&& CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \
&& WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \
&& curl -X POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \
-H "Authorization: Bearer $CLI_TOKEN" \
-H "Content-Type: text/plain" \
-d "version" | jq -r '.stdout' | base64 -d
2.2.2
ワークフローの実行
EmrAddStepsOperator で EMR クラスタに step を追加し EmrStepSensor で完了まで待つ、Step Functions の elasticmapreduce:addStep.sync 相当の処理を行う次のコードを dagS3Path にアップロードする。
AWS CLIでEMRクラスタを立ち上げSparkのアプリケーションを実行する - sambaiz-net
BaseSensorOperator の mode のデフォルト値は poke で待っている間ワーカーの slot を埋めてしまうため、reschedule にしている。
Operator と Sensor を TaskGroup にまとめると UI 上は見やすくなるが、Task は他と同様に個別にスケジューリングされるため、リトライは工夫する必要がある。
Airflow の Callback で複数の Task からなる処理のリトライを行う - sambaiz-net
Airflow v2.2.2 環境 にインストールされている apache-airflow-providers-amazon は v2.4.0 と少し古いためモジュール名が異なっている。 また、AwsLambdaInvokeFunctionOperator など存在しない Operator も多いので、 必要ならhookはあるので自分で実装するか最新バージョンからコードを持ってくる必要がある。
$ cat test.py
from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.utils.task_group import TaskGroup
with DAG(
'test',
default_args={},
start_date=datetime(2022, 11, 17),
params={
"cluster_id": "j-3C4OVZL6MAM6H"
}
) as dag:
clusterId = '{{ params.cluster_id }}'
with TaskGroup("app1") as app1:
add_app1_step = EmrAddStepsOperator(
task_id="add_app1_step",
job_flow_id=clusterId,
steps=[
{
"Name": "calculate_pi",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
},
}
]
)
wait_app1_step = EmrStepSensor(
task_id="wait_app1_step",
job_flow_id=clusterId,
step_id=f"{{{{ ti.xcom_pull(task_ids='{add_app1_step.task_id}', key='return_value')[0] }}}}",
mode='reschedule'
)
add_app1_step >> wait_app1_step
with TaskGroup("app2") as app2:
add_app2_step = EmrAddStepsOperator(
task_id="add_app2_step",
job_flow_id=clusterId,
steps=[
{
"Name": "calculate_pi",
"ActionOnFailure": "CONTINUE",
"HadoopJarStep": {
"Jar": "command-runner.jar",
"Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
},
}
]
)
wait_app2_step = EmrStepSensor(
task_id="wait_app2_step",
job_flow_id=clusterId,
step_id=f"{{{{ ti.xcom_pull(task_ids='{add_app2_step.task_id}', key='return_value')[0] }}}}",
mode='reschedule'
)
add_app2_step >> wait_app2_step
app1 >> app2
$ aws s3 cp text.py s3://<source_bucket_name>/dags/
これを実行すると失敗し AccessDeniedException がログに出るので 必要な Policy を Execution Role に追加する。 ログに何も出ない場合は、Role を修正し Environment を作り直すことで出るようになった。
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'elasticmapreduce:AddJobFlowSteps',
'elasticmapreduce:DescribeStep',
],
resources: [ '*' ]
})