Cloud Composer は Apache Airflow のマネージドサービス。AWS でいう MWAA。
CDK で Amazon Managed Workflow for Apache Airflow (MWAA) の環境を作成しワークフローを実行する - sambaiz-net
料金は Composer 2 と現在プレビューの Composer 3 で異なるが、主に CPU、メモリ、ストレージ量に対してかかる。GKE 上で動くが、既存のクラスタは利用できない。リソースが足りないと Worker Pod が ClashLoopBackoff や OOMになったりする。このクラスタは Composer 2 ではリストに表示されているが、3 では表示されなくなる。
ドキュメントに従って Terraform で立ち上げる。
API を有効にする。
resource "google_project_service" "composer_api" {
provider = google-beta
project = "sambaiztest"
service = "composer.googleapis.com"
disable_on_destroy = false
check_if_service_has_usage_on_destroy = true
}
Service Account を作成して composer.worker role を付与し、Google の Service Account へ権限を与える。PROJECT_NUMBER はダッシュボードから確認できる。
resource "google_service_account" "composer_service_account" {
account_id = "composer-service-account"
display_name = "Service Account for Cloud Composer"
}
resource "google_project_iam_member" "composer_service_account" {
provider = google-beta
project = "sambaiztest"
member = format("serviceAccount:%s", google_service_account.composer_service_account.email)
role = "roles/composer.worker"
}
resource "google_service_account_iam_member" "custom_service_account" {
provider = google-beta
service_account_id = google_service_account.composer_service_account.name
role = "roles/composer.ServiceAgentV2Ext"
member = "serviceAccount:service-PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com"
}
google_composer_environment で環境を作成する。image_version を composer-3 にすると Composer 3 の環境ができるが、アップグレードすることはできない。
resource "google_composer_environment" "composer_env" {
name = "my-composer-environment"
region = "asia-northeast1"
config {
software_config {
image_version = "composer-2.9.11-airflow-2.9.3"
# image_version = "composer-3-airflow-2.9.3"
env_variables = {
ENV = "dev"
}
}
node_config {
service_account = google_service_account.composer_service_account.email
}
environment_size = "ENVIRONMENT_SIZE_SMALL"
}
}
完了まで 17 分ほどかかった。
DAG は指定の bucket に上げると反映される。
$ COMPOSER_ENVIRONMENT_NAME=my-composer-environment
$ COMPOSER_LOCATION=asia-northeast1
$ DAG_BUCKET_PATH=$(gcloud composer environments \
describe $COMPOSER_ENVIRONMENT_NAME \
--location=$COMPOSER_LOCATION \
--format="get(config.dagGcsPrefix)")
$ echo $DAG_BUCKET_PATH
gs://*****/dags
$ gsutil -m rsync -r -d -x "(\.DS_Store|xxxx)" ./dags $DAG_BUCKET_PATH
# or
#
# resource "google_storage_bucket_object" "dag_file" {
# name = "dags/example_dag.py"
# bucket = split("/", google_composer_environment.composer_env.config[0].dag_gcs_prefix)[2]
# source = "${path.module}/dags/example_dag.py"
# }
依存ライブラリの更新は、software_config の pypi_packages を変更したり –update-pypi-packages-from-file に requirements.txt を渡したりすることで行える。 これを Terraform で行いたい場合は local-exec provisioner が使えるが、いずれにせよ再度 terraform apply すると pypi_packages との乖離が発生することになる。
$ poetry show --only main --top-level | awk '{print $1"=="$2}' > requirements.txt
$ gcloud composer environments update $COMPOSER_ENVIRONMENT_NAME \
--location=$COMPOSER_LOCATION \
--update-pypi-packages-from-file="./requirements.txt"
# or
#
# resource "null_resource" "update_composer_environment" {
# triggers = {
# requirements_hash = filemd5("${path.module}/dags/requirements.txt")
# }
#
# provisioner "local-exec" {
# command = <<EOT
# gcloud composer environments update ${google_composer_environment.composer_env.name} \
# --location ${google_composer_environment.composer_env.region} \
# --update-pypi-packages-from-file dags/requirements.txt
# EOT
# }
# }
DAG や Task の失敗は composer.googleapis.com/workflow/(task/)run_count メトリクスの state=“failed” で検知することができる。