CDK で Amazon Managed Workflow for Apache Airflow (MWAA) の環境を作成しワークフローを実行する

awsairflowetl

Amazon Managed Workflow for Apache Airflow (MWAA)Apache Airflow のマネージドサービス。 サーバーレスの Step Functions とは異なりインスタンス時間のコストがかかるが、 Airflow の豊富な機能や、AWSを含むサードパーティの Providers packages が利用できる。

Docker Compose で Apache Airflow を起動しワークフローを実行する - sambaiz-net

Step Functions は現状ワークフローの途中からの実行をサポートしていないため長いワークフローの再実行が大変だったり、 EmrAddStep の args のような Object の値の配列に input の値を含めたい場合に Lambda を挟んで配列に加工する必要があったりするので、 それらに不便を感じている場合は移行するのも一つの手だと思う。

CDKでStep Functionsによるワークフローを構築する - sambaiz-net

CDK による環境の作成

Quick start にある CloudFormation のテンプレートを参考に CDK のコードを記述していく。

MWAA の環境を作成するにあたって

  • VPC
    • パブリック (インターネットに出る場合) とプライベートのサブネットが各2つ
    • 互いの通信を許可する Security Group
  • DAG のソースや requirements.txt を置くS3 Bucket
  • Execution Role
    • s3
      • blockPublicAccess がないと Unable to check PublicAccessBlock configuration になる
    • cloudwatch: メトリクスとログの送信
    • sqs: タスクのキューイング
    • kms: データの暗号化

が必要になる。

import { 
  Stack, 
  StackProps,
  Tags,
  aws_ec2 as ec2, 
  aws_s3 as s3, 
  aws_iam as iam,
  aws_mwaa as mwaa
} from 'aws-cdk-lib';
import { Construct } from 'constructs';

export class MwaaStack extends Stack {
  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    const vpc = new ec2.Vpc(this, 'MwaaVPC', {
      ipAddresses: ec2.IpAddresses.cidr('10.0.0.0/16'),
      maxAzs: 2,
    })
    Tags.of(vpc).add('Name', `${this.stackName}VPC`)

    const securityGroup = new ec2.SecurityGroup(this, 'MwaaSecurityGroup', {
      vpc: vpc
    })
    securityGroup.addIngressRule(securityGroup, ec2.Port.allTcp())

    const sourceBucket = new s3.Bucket(this, 'MwaaSourceBucket', {
      blockPublicAccess: {
        blockPublicAcls: true,
        blockPublicPolicy: true,
        ignorePublicAcls: true,
        restrictPublicBuckets: true
      }
    })

    const mwaaEnvironmentName = `${this.stackName}-mwaa-env`

    const executionRole = new iam.Role(this, 'MwaaExecutionRole', {
      roleName: `${this.stackName}-exeuction-role`,
      assumedBy: new iam.CompositePrincipal(
        new iam.ServicePrincipal('airflow-env.amazonaws.com'),
        new iam.ServicePrincipal('airflow.amazonaws.com'),
      ),
      path: '/service-role/',
      inlinePolicies: {
        'execution-policy': new iam.PolicyDocument({
          statements: [
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: ['airflow:PublishMetrics'],
              resources: [`arn:aws:airflow:${this.region}:${this.account}:environment/${mwaaEnvironmentName}`]
            }),
            new iam.PolicyStatement({
              effect: iam.Effect.DENY,
              actions: ['s3:ListAllMyBuckets'],
              resources: [
                sourceBucket.bucketArn,
                `${sourceBucket.bucketArn}/*`
              ]
            }),
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: [
                's3:GetObject*',
                's3:GetBucket*',
                's3:List*'
              ],
              resources: [
                sourceBucket.bucketArn,
                `${sourceBucket.bucketArn}/*`
              ]
            }),
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: [
                'logs:DescribeLogGroups',
                's3:GetAccountPublicAccessBlock',
                'cloudwatch:PutMetricData'
              ],
              resources: ['*']
            }),
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: [
                'logs:CreateLogStream',
                'logs:CreateLogGroup',
                'logs:PutLogEvents',
                'logs:GetLogEvents',
                'logs:GetLogRecord',
                'logs:GetLogGroupFields',
                'logs:GetQueryResults',
                'logs:DescribeLogGroups',
              ], 
              resources: [`arn:aws:logs:${this.region}:${this.account}:log-group:airflow-${mwaaEnvironmentName}-*`],
            }),
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: [
                'sqs:ChangeMessageVisibility',
                'sqs:DeleteMessage',
                'sqs:GetQueueAttributes',
                'sqs:GetQueueUrl',
                'sqs:ReceiveMessage',
                'sqs:SendMessage'
              ],
              resources: [`arn:aws:sqs:${this.region}:*:airflow-celery-*`]
            }),
            new iam.PolicyStatement({
              effect: iam.Effect.ALLOW,
              actions: [
                'kms:Decrypt',
                'kms:DescribeKey',
                'kms:GenerateDataKey*',
                'kms:Encrypt'
              ],
              notResources: [`arn:aws:kms:*:${this.account}:key/*"`],
              conditions: {
                StringLike: {
                  'kms:ViaService': [
                    `sqs.${this.region}.amazonaws.com`
                  ],
                },
              },
            }),
          ]
        })
      }
    })
  }
}

environmentClass によってスケジューラー/ワーカーのリソース量が異なり、Small のワーカーはデフォルトで 5 つのタスクを並列に実行する。

そのままでは 実行時に渡す config で params を上書きできなかったので airflowConfigurationOptions で core.dag_run_conf_overrides_params を true にしている。 config の値は環境変数 AIRFLOW__{SECTION}__{KEY} で取得できる

Environment の作成には数十分かかる。

const mwaaEnvironment = new mwaa.CfnEnvironment(this, 'MwaaEnvironment', {
  name: mwaaEnvironmentName,
  airflowVersion: '2.2.2',
  airflowConfigurationOptions: {
    'core.dag_run_conf_overrides_params': 'true',
    'app.env': 'prd', // os.getenv("AIRFLOW__APP__ENV")
  },
   
  sourceBucketArn: sourceBucket.bucketArn,
  executionRoleArn: executionRole.roleArn,
  dagS3Path: 'dags',

  networkConfiguration: {
    securityGroupIds: [ securityGroup.securityGroupId ],
    subnetIds: vpc.privateSubnets.map(s => s.subnetId),
  },
  webserverAccessMode: "PUBLIC_ONLY",
  
  environmentClass: 'mw1.small',
  maxWorkers: 1,

  loggingConfiguration: {
    dagProcessingLogs: {
      enabled: true,
      logLevel: 'INFO',
    },
    schedulerLogs: {
      enabled: true,
      logLevel: 'INFO',
    },
    taskLogs: {
      enabled: true,
      logLevel: 'INFO',
    },
    workerLogs: {
      enabled: true,
      logLevel: 'INFO',
    },
    webserverLogs: {
      enabled: true,
      logLevel: 'INFO',
    },
  },
});

Airflow UI へのアクセス

コンソール上からもアクセスできるが、 CLI で login token を取得してアクセスすることもできる。

$ ENV_NAME=MwaaStack-mwaa-env
$ aws mwaa create-web-login-token --name $ENV_NAME
{
    "WebServerHostname": "*****.c9.ap-northeast-1.airflow.amazonaws.com",
    "WebToken": "*****"
}

$ open $(aws mwaa create-web-login-token --name $ENV_NAME | jq -r '"https://" + .WebServerHostname + "/aws_mwaa/aws-console-sso?login=true#" + .WebToken')

ちなみに Airflow CLI を呼ぶ場合は cli token を取得して認証する。

$ aws mwaa create-cli-token --name $ENV_NAME
{
    "CliToken": "*****",
    "WebServerHostname": "*****.c8.ap-northeast-1.airflow.amazonaws.com"
}

$ CLI_JSON=$(aws mwaa create-cli-token --name $ENV_NAME) \
    && CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \
    && WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \
    && curl -X POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \
    -H "Authorization: Bearer $CLI_TOKEN" \
    -H "Content-Type: text/plain" \
    -d "version" | jq -r '.stdout' | base64 -d
2.2.2

ワークフローの実行

EmrAddStepsOperator で EMR クラスタに step を追加し EmrStepSensor で完了まで待つ、Step Functions の elasticmapreduce:addStep.sync 相当の処理を行う次のコードを dagS3Path にアップロードする。

AWS CLIでEMRクラスタを立ち上げSparkのアプリケーションを実行する - sambaiz-net

BaseSensorOperator の mode のデフォルト値は poke で待っている間ワーカーの slot を埋めてしまうため、reschedule にしている。

Operator と Sensor を TaskGroup にまとめると UI 上は見やすくなるが、Task は他と同様に個別にスケジューリングされるため、リトライは工夫する必要がある。

Airflow の Callback で複数の Task からなる処理のリトライを行う - sambaiz-net

Airflow v2.2.2 環境 にインストールされている apache-airflow-providers-amazon は v2.4.0 と少し古いためモジュール名が異なっている。 また、AwsLambdaInvokeFunctionOperator など存在しない Operator も多いので、 必要ならhookはあるので自分で実装するか最新バージョンからコードを持ってくる必要がある。

$ cat test.py
from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.utils.task_group import TaskGroup

with DAG(
  'test',
  default_args={},
  start_date=datetime(2022, 11, 17),
  params={
     "cluster_id": "j-3C4OVZL6MAM6H"
  }
) as dag:
  clusterId = '{{ params.cluster_id }}'

  with TaskGroup("app1") as app1:
    add_app1_step = EmrAddStepsOperator(
      task_id="add_app1_step",
      job_flow_id=clusterId,
      steps=[
        {
          "Name": "calculate_pi",
          "ActionOnFailure": "CONTINUE",
          "HadoopJarStep": {
              "Jar": "command-runner.jar",
              "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
          },
        }
      ]
    )

    wait_app1_step = EmrStepSensor(
      task_id="wait_app1_step",
      job_flow_id=clusterId,
      step_id=f"{{{{ ti.xcom_pull(task_ids='{add_app1_step.task_id}', key='return_value')[0] }}}}",
      mode='reschedule'
    )

    add_app1_step >> wait_app1_step
  
  with TaskGroup("app2") as app2:
    add_app2_step = EmrAddStepsOperator(
      task_id="add_app2_step",
      job_flow_id=clusterId,
      steps=[
        {
          "Name": "calculate_pi",
          "ActionOnFailure": "CONTINUE",
          "HadoopJarStep": {
              "Jar": "command-runner.jar",
              "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
          },
        }
      ]
    )

    wait_app2_step = EmrStepSensor(
      task_id="wait_app2_step",
      job_flow_id=clusterId,
      step_id=f"{{{{ ti.xcom_pull(task_ids='{add_app2_step.task_id}', key='return_value')[0] }}}}",
      mode='reschedule'
    )

    add_app2_step >> wait_app2_step

  app1 >> app2

$ aws s3 cp text.py s3://<source_bucket_name>/dags/

これを実行すると失敗し AccessDeniedException がログに出るので 必要な Policy を Execution Role に追加する。 ログに何も出ない場合は、Role を修正し Environment を作り直すことで出るようになった。

new iam.PolicyStatement({
  effect: iam.Effect.ALLOW,
  actions: [
    'elasticmapreduce:AddJobFlowSteps',
    'elasticmapreduce:DescribeStep',
  ],
  resources: [ '*' ]
})