Setting up GCP's managed Airflow service Cloud Composer using Terraform and running workflows
Cloud Composer is a managed service for Apache Airflow. It is the GCP equivalent of AWS's MWAA.
Pricing differs between Composer 2 and Composer 3, the latter of which is currently in preview, but in both it is mainly based on CPU, memory, and storage usage. While the service runs on GKE, existing clusters cannot be used. If resources are insufficient, worker Pods fall into CrashLoopBackOff or get OOM-killed. The underlying cluster is visible while using Composer 2, but is no longer shown from Composer 3 onward.
Follow the documentation to set it up with Terraform.
Enable the API.
resource "google_project_service" "composer_api" {
provider = google-beta
project = "sambaiztest"
service = "composer.googleapis.com"
disable_on_destroy = false
check_if_service_has_usage_on_destroy = true
}
Create a Service Account and assign it the composer.worker role, then grant Google's Cloud Composer Service Agent the composer.ServiceAgentV2Ext role on it. The PROJECT_NUMBER can be found in the dashboard.
resource "google_service_account" "composer_service_account" {
account_id = "composer-service-account"
display_name = "Service Account for Cloud Composer"
}
resource "google_project_iam_member" "composer_service_account" {
provider = google-beta
project = "sambaiztest"
member = format("serviceAccount:%s", google_service_account.composer_service_account.email)
role = "roles/composer.worker"
}
resource "google_service_account_iam_member" "custom_service_account" {
provider = google-beta
service_account_id = google_service_account.composer_service_account.name
role = "roles/composer.ServiceAgentV2Ext"
member = "serviceAccount:service-PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com"
}
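The PROJECT_NUMBER doesn't have to be copied by hand: it can also be looked up with the google_project data source and interpolated into the member. A minimal sketch, assuming the same project:

data "google_project" "project" {
  project_id = "sambaiztest"
}

locals {
  # Cloud Composer Service Agent, built from the looked-up project number
  composer_service_agent = "serviceAccount:service-${data.google_project.project.number}@cloudcomposer-accounts.iam.gserviceaccount.com"
}

local.composer_service_agent can then be passed as the member of the google_service_account_iam_member above.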
Create an environment with google_composer_environment. Setting image_version to composer-3 creates a Composer 3 environment, but an existing environment cannot be upgraded to it.
resource "google_composer_environment" "composer_env" {
name = "my-composer-environment"
region = "asia-northeast1"
config {
software_config {
image_version = "composer-2.9.11-airflow-2.9.3"
# image_version = "composer-3-airflow-2.9.3"
env_variables = {
ENV = "dev"
}
}
node_config {
service_account = google_service_account.composer_service_account.email
}
environment_size = "ENVIRONMENT_SIZE_SMALL"
}
}
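If workers run into the CrashLoopBackOff/OOM issue mentioned earlier, their resources can also be set explicitly with workloads_config instead of relying only on the environment_size preset. A minimal sketch for Composer 2; the values are illustrative:

# Goes inside the config block of google_composer_environment
workloads_config {
  worker {
    cpu        = 1
    memory_gb  = 4
    storage_gb = 10
    min_count  = 1
    max_count  = 3 # workers autoscale within this range
  }
}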
It took about 17 minutes to complete.
DAGs are picked up once they are uploaded to the environment's dedicated bucket.
$ COMPOSER_ENVIRONMENT_NAME=my-composer-environment
$ COMPOSER_LOCATION=asia-northeast1
$ DAG_BUCKET_PATH=$(gcloud composer environments \
    describe $COMPOSER_ENVIRONMENT_NAME \
    --location=$COMPOSER_LOCATION \
    --format="get(config.dagGcsPrefix)")
$ echo $DAG_BUCKET_PATH
gs://*****/dags
$ gsutil -m rsync -r -d -x "(\.DS_Store|xxxx)" ./dags $DAG_BUCKET_PATH
# or
#
# resource "google_storage_bucket_object" "dag_file" {
#   name   = "dags/example_dag.py"
#   bucket = split("/", google_composer_environment.composer_env.config[0].dag_gcs_prefix)[2]
#   source = "${path.module}/dags/example_dag.py"
# }
Dependency updates can be performed either by modifying pypi_packages in software_config or by passing a requirements.txt to --update-pypi-packages-from-file. If you want to do the latter with Terraform, you can use the local-exec provisioner, but in either case, running terraform apply again will surface a discrepancy with pypi_packages (a sketch of the pypi_packages approach follows the commands below).
$ poetry show --only main --top-level | awk '{print $1"=="$2}' > requirements.txt
$ gcloud composer environments update $COMPOSER_ENVIRONMENT_NAME \
    --location=$COMPOSER_LOCATION \
    --update-pypi-packages-from-file="./requirements.txt"
# or
#
# resource "null_resource" "update_composer_environment" {
#   triggers = {
#     requirements_hash = filemd5("${path.module}/dags/requirements.txt")
#   }
#
#   provisioner "local-exec" {
#     command = <<-EOT
#       gcloud composer environments update ${google_composer_environment.composer_env.name} \
#         --location ${google_composer_environment.composer_env.region} \
#         --update-pypi-packages-from-file dags/requirements.txt
#     EOT
#   }
# }
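Declaring the packages in pypi_packages instead keeps Terraform as the single source of truth and avoids that discrepancy. A minimal sketch; the packages and pinned versions are illustrative:

# Inside software_config of google_composer_environment
software_config {
  image_version = "composer-2.9.11-airflow-2.9.3"
  pypi_packages = {
    # key is the package name, value is the version specifier ("" installs the latest)
    pandas   = "==2.1.4"
    requests = ">=2.31.0"
  }
}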
DAG or task failures can be detected with the composer.googleapis.com/workflow/run_count (or workflow/task/run_count) metric where state="failed".
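For example, an alert policy on failed task runs could be defined as in the following sketch; the threshold, alignment, and notification channel are assumptions:

resource "google_monitoring_alert_policy" "composer_task_failed" {
  display_name = "Cloud Composer task failures"
  combiner     = "OR"

  conditions {
    display_name = "failed task runs"
    condition_threshold {
      # Sum failed task runs over 5 minutes and alert if any occurred
      filter          = "resource.type = \"cloud_composer_environment\" AND metric.type = \"composer.googleapis.com/workflow/task/run_count\" AND metric.labels.state = \"failed\""
      comparison      = "COMPARISON_GT"
      threshold_value = 0
      duration        = "0s"
      aggregations {
        alignment_period   = "300s"
        per_series_aligner = "ALIGN_SUM"
      }
    }
  }

  # notification_channels = [google_monitoring_notification_channel.email.id] # hypothetical channel
}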