GCP の マネージド Airflow サービスCloud Composer を Terraform で立ち上げてワークフローを動かす

gcpairflowterraform

Cloud Composer は Apache Airflow のマネージドサービス。AWS でいう MWAA。

CDK で Amazon Managed Workflow for Apache Airflow (MWAA) の環境を作成しワークフローを実行する - sambaiz-net

料金は Composer 2 と現在プレビューの Composer 3 で異なるが、主に CPU、メモリ、ストレージ量に対してかかる。GKE 上で動くが、既存のクラスタは利用できない。リソースが足りないと Worker Pod が ClashLoopBackoff や OOMになったりする。このクラスタは Composer 2 ではリストに表示されているが、3 では表示されなくなる。

ドキュメントに従って Terraform で立ち上げる。

API を有効にする。

resource "google_project_service" "composer_api" {
  provider = google-beta
  project = "sambaiztest"
  service = "composer.googleapis.com"
  disable_on_destroy = false
  check_if_service_has_usage_on_destroy = true
}	

Service Account を作成して composer.worker role を付与し、Google の Service Account へ権限を与える。PROJECT_NUMBER はダッシュボードから確認できる。

resource "google_service_account" "composer_service_account" {
  account_id   = "composer-service-account"
  display_name = "Service Account for Cloud Composer"
}

resource "google_project_iam_member" "composer_service_account" {
  provider = google-beta
  project  = "sambaiztest"
  member   = format("serviceAccount:%s", google_service_account.composer_service_account.email)
  role     = "roles/composer.worker"
}

resource "google_service_account_iam_member" "custom_service_account" {
  provider = google-beta
  service_account_id = google_service_account.composer_service_account.name
  role = "roles/composer.ServiceAgentV2Ext"
  member = "serviceAccount:service-PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com"
}

google_composer_environment で環境を作成する。image_version を composer-3 にすると Composer 3 の環境ができるが、アップグレードすることはできない。

resource "google_composer_environment" "composer_env" {
  name   = "my-composer-environment"
  region = "asia-northeast1"
  
  config {
    software_config {
      image_version = "composer-2.9.11-airflow-2.9.3"
      # image_version = "composer-3-airflow-2.9.3"
      
      env_variables = {
        ENV = "dev"
      }
    }

    node_config {
      service_account = google_service_account.composer_service_account.email
    }
    environment_size = "ENVIRONMENT_SIZE_SMALL"
  }
}

完了まで 17 分ほどかかった。

DAG は指定の bucket に上げると反映される。

$ COMPOSER_ENVIRONMENT_NAME=my-composer-environment
$ COMPOSER_LOCATION=asia-northeast1

$ DAG_BUCKET_PATH=$(gcloud composer environments \
	    describe $COMPOSER_ENVIRONMENT_NAME \
	    --location=$COMPOSER_LOCATION \
    	--format="get(config.dagGcsPrefix)")
$ echo $DAG_BUCKET_PATH
gs://*****/dags

$ gsutil -m rsync -r -d -x "(\.DS_Store|xxxx)" ./dags $DAG_BUCKET_PATH

# or
#
# resource "google_storage_bucket_object" "dag_file" {
#  name   = "dags/example_dag.py"
#  bucket = split("/", google_composer_environment.composer_env.config[0].dag_gcs_prefix)[2]
#  source = "${path.module}/dags/example_dag.py"
# }

依存ライブラリの更新は、software_config の pypi_packages を変更したり –update-pypi-packages-from-file に requirements.txt を渡したりすることで行える。 これを Terraform で行いたい場合は local-exec provisioner が使えるが、いずれにせよ再度 terraform apply すると pypi_packages との乖離が発生することになる。

$ poetry show --only main --top-level | awk '{print $1"=="$2}' > requirements.txt

$ gcloud composer environments update $COMPOSER_ENVIRONMENT_NAME \
	  --location=$COMPOSER_LOCATION \
    --update-pypi-packages-from-file="./requirements.txt"

# or
#
# resource "null_resource" "update_composer_environment" {
#   triggers = {
#     requirements_hash = filemd5("${path.module}/dags/requirements.txt")
#   }
#
#   provisioner "local-exec" {
#     command = <<EOT
#       gcloud composer environments update ${google_composer_environment.composer_env.name} \
#         --location ${google_composer_environment.composer_env.region} \
#         --update-pypi-packages-from-file dags/requirements.txt
#     EOT
#   }
# }

DAG や Task の失敗は composer.googleapis.com/workflow/(task/)run_count メトリクスの state=“failed” で検知することができる。

参考

Cloud Composer3を調べてみた

Cloud Composer 3: Truly “serverless”?