astronomer-cosmos で dbt を Airflow の TaskGroup として実行する

dbtairflowgcp

astronomer-cosmos は dbt を Airflow の TaskGroup として動すためのライブラリ。Airflow のスケジューリングや再実行機能を用いた、データ加工の前後で行うタスクを含む包括的なワークフローを構築することができる。

dbt で BigQuery のデータを SQL で加工して新しいテーブルを作る - sambaiz-net

Docker Compose で Apache Airflow を起動しワークフローを実行する - sambaiz-net

今回は Cloud Composer で動かす

GCP の マネージド Airflow サービスCloud Composer を Terraform で立ち上げてワークフローを動かす - sambaiz-net

dbt の project ディレクトリと profile ファイルを bucket に上げ、astronomer-cosmos をインストールする。

$ gsutil cp ~/.dbt/profiles.yml $DAG_BUCKET_PATH/
$ gsutil cp -r ./dags/* $DAG_BUCKET_PATH/

$ cat requirements.txt
astronomer-cosmos~=1.7.1

$ gcloud composer environments update $COMPOSER_ENVIRONMENT_NAME \
		--location=$COMPOSER_LOCATION \
    --update-pypi-packages-from-file="./requirements.txt"

ProfileConfig と ProjectConfig から DbtTaskGroup ができる。 operator_args の install_deps: True を渡すと各 Task の実行前に dbt deps が走るが、無駄が多いしタイムアウトも頻発したのでおすすめしない。

from airflow import DAG
from cosmos import DbtTaskGroup, ProfileConfig, ProjectConfig
from datetime import datetime
from airflow.operators.bash import BashOperator

dbt_project_path = "/home/airflow/gcs/dags/testdbt"
dbt_profiles_path = "/home/airflow/gcs/dags/profiles.yml"

with DAG(
    dag_id="cosmos_test",
    schedule_interval="@daily",
    start_date=datetime(2024, 12, 3),
    catchup=False,
) as dag:   
  t1 = BashOperator(
    task_id='task1',
    bash_command='date',
  )

  dbt_task_group = DbtTaskGroup(
      group_id="dbt",
      profile_config=ProfileConfig(
      profile_name="testdbt",
      target_name="dev",
      profiles_yml_filepath=dbt_profiles_path,
    ),
    project_config=ProjectConfig(
      dbt_project_path=dbt_project_path,
    ),
  )
  
  t2 = BashOperator(
    task_id='task2',
    bash_command='date',
  )
  
  t1 >> dbt_task_group
  dbt_task_group >> t2

BigQuery の権限を追加して DAG を実行する。

resource "google_project_iam_member" "composer_service_account_roles" {
  for_each = toset([
    "roles/composer.worker",
    "roles/bigquery.jobUser",
    "roles/bigquery.dataEditor"
  ])
  
  project = "sambaiztest"
  role    = each.key
  member  = format("serviceAccount:%s", google_service_account.composer_service_account.email)
}

各 model の run と test が Airflow の Task として表現されるので個別に再実行することもできる。

render_config で –select することができるので、次のようにして特定のタグが付いている model のみを対象にできる。

DbtTaskGroup(
  ...
  render_config=RenderConfig(
      select=["tag:hourly"],
  ),
)

# {{ config(
#    materialized='table',
#    tags=['hourly']
# ) }}

デフォルトの LoadMode である automatic は、事前に作っておいた manifest.json がない場合 dbt ls を実行するか Airflow の Variables にキャッシュされた結果を用いて dbt nodes のグラフを構築する。このとき project_config の dbt_vars で dbt ls に {{data_interval_start}} などを渡そうとしてもまだレンダリングされていない文字列が渡ってしまい dbt の方で jinja テンプレートの変数として解決を試みて失敗してしまう。そのため次のようにして Run ごとに動的に model を選択する試みはうまくいかなかった。

{{ config(
    enabled=(var('start') >= '2025-01-01'),
    ...
) }}

Task は作られてしまうが次のように条件分岐することで実質実行されない状態にすることはできる。

{{ config(
    materialized='incremental',
) }}

{% if var('start') >= '2025-01-01' %}
SELECT 1 as id
{% else %}
SELECT * FROM UNNEST([])
{% endif %}