Running dbt as an Airflow TaskGroup with astronomer-cosmos

dbtairflowgcp

astronomer-cosmos is a library for running dbt as an Airflow TaskGroup. It enables you to build comprehensive workflows that include tasks before and after data processing, utilizing Airflow’s scheduling and retry capabilities.

Creating new tables in BigQuery by processing data with SQL using dbt - sambaiz-net

Run Apache Airflow with Docker Compose and execute a workflow - sambaiz-net

In this article, I run it on Cloud Composer.

Setting up GCP’s managed Airflow service Cloud Composer using Terraform and running workflows - sambaiz-net

Upload the dbt project directory and profile file to the bucket, and install astronomer-cosmos.

$ gsutil cp ~/.dbt/profiles.yml $DAG_BUCKET_PATH/
$ gsutil cp -r ./dags/* $DAG_BUCKET_PATH/

$ cat requirements.txt
astronomer-cosmos~=1.7.1

$ gcloud composer environments update $COMPOSER_ENVIRONMENT_NAME \
		--location=$COMPOSER_LOCATION \
    --update-pypi-packages-from-file="./requirements.txt"

DbtTaskGroup can be created from ProfileConfig and ProjectConfig. If you pass install_deps: True in operator_args, dbt deps will run before each Task, but this is wasteful and timeouts occur frequently, so I don’t recommend it.

from airflow import DAG
from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig
from datetime import datetime
from airflow.operators.bash import BashOperator

dbt_project_path = "/home/airflow/gcs/dags/testdbt"
dbt_profiles_path = "/home/airflow/gcs/dags/profiles.yml"

with DAG(
    dag_id="cosmos_test",
    schedule_interval="@daily",
    start_date=datetime(2024, 12, 3),
    catchup=False,
) as dag:   
  t1 = BashOperator(
    task_id='task1',
    bash_command='date',
  )

  dbt_task_group = DbtTaskGroup(
      group_id="dbt",
      profile_config=ProfileConfig(
      profile_name="testdbt",
      target_name="dev",
      profiles_yml_filepath=dbt_profiles_path,
    ),
    project_config=ProjectConfig(
      dbt_project_path=dbt_project_path,
    ),
  )
  
  t2 = BashOperator(
    task_id='task2',
    bash_command='date',
  )
  
  t1 >> dbt_task_group
  dbt_task_group >> t2

Add BigQuery permissions and invoke the dag.

resource "google_project_iam_member" "composer_service_account_roles" {
  for_each = toset([
    "roles/composer.worker",
    "roles/bigquery.jobUser",
    "roles/bigquery.dataEditor"
  ])
  
  project = "sambaiztest"
  role    = each.key
  member  = format("serviceAccount:%s", google_service_account.composer_service_account.email)
}

Each model’s run and test is represented as an Airflow Task, so they can be re-executed individually.

You can pass –select option in render_config to target only models with a specific tag, as follows.

DbtTaskGroup(
  ...
  render_config=RenderConfig(
    select=["tag:hourly"],
  ),
)

# {{ config(
#    materialized='table',
#    tags=['hourly']
# ) }}

The default LoadMode automatic either executes dbt ls or uses the result cached in Airflow Variables to build the dbt nodes graph if there is no pre-created manifest.json. When trying to pass variables like {{data_interval_start}} to dbt ls through project_config’s dbt_vars, the unrendered string gets passed, causing dbt to attempt to resolve it as a jinja template variable and fail. Therefore, attempts to dynamically select models per Run in the following way were unsuccessful.

config(
    enabled=(var('start') >= '2025-01-01'),
    ...
)

Tasks will still be created, but they can effectively be made not to execute by using conditional branching as follows.

{{ config(
    materialized='incremental',
) }}

{% if var('start') >= '2025-01-01' %}
SELECT 1 as id
{% else %}
SELECT * FROM UNNEST([])
{% endif %}