dbt で BigQuery のデータを SQL で加工して新しいテーブルを作る

dbtgcp

dbt は SQL や Python でデータを加工するツール。基本的な機能を提供する OSS の CLI dbt Core と、Browser-based IDE や CI、アラートなどを含むサービス dbt Cloud がある。今回は dbt Core を使う。

dbt Core とデータプラットフォームのアダプタインストールする。BigQuery や Spark、Snowflake など公式によってメンテされているもののほか、コミュニティのアダプタも利用できる。

$ python -m venv dbt-env
$ source dbt-env/bin/activate
$ python -m pip install dbt-core dbt-bigquery
$ dbt --version
Core:
  - installed: 1.8.8
  - latest:    1.8.8 - Up to date!

Plugins:
  - bigquery: 1.8.3 - Up to date!

dbt init でプロジェクトを作成する。

$ dbt init testdbt
...
[1] bigquery
Enter a number: 1
[1] oauth
[2] service_account
Desired authentication method option (enter a number): 1
project (GCP project id):
dataset (the name of your dbt dataset): dbttest
threads (1 or more):
job_execution_timeout_seconds [300]:
[1] US
[2] EU
Desired location option (enter a number):

$ tree testdbt
testdbt
├── README.md
├── analyses
├── dbt_project.yml
├── macros
├── models
│   └── example
│       ├── my_first_dbt_model.sql
│       ├── my_second_dbt_model.sql
│       └── schema.yml
├── seeds
├── snapshots
└── tests

入力した値は ~/.dbt/profiles.yml に書き込まれる。 参照先は –profiles-dir や DBT_PROFILES_DIR 環境変数で指定できる。

$ cat ~/.dbt/profiles.yml
testdbt:
  outputs:
    dev:
      dataset: dbttest
      job_execution_timeout_seconds:
      job_retries:
      location:
      method: oauth
      priority:
      project:
      threads:
      type: bigquery
  target: dev

$ cat testdbt/dbt_project.yml
...
profile: 'testdbt'
...

dbt debug で接続できることを確認する。

# gcloud auth application-default login
$ dbt debug
...
17:00:18  Configuration:
17:00:18    profiles.yml file [OK found and valid]
17:00:18    dbt_project.yml file [OK found and valid]
17:00:18  Required dependencies:
17:00:18   - git [OK found]
...
17:00:18  Registered adapter: bigquery=1.8.3
17:00:21    Connection test: [OK connection ok]

17:00:21  All checks passed!

materialized=table でテーブルを作成する SQL を書く。CREATE OR REPLACE TABLE AS SELECT が走るのでデータが上書きされることに注意。

$ cat models/example/my_first_dbt_model.sql

{{ config(materialized='table') }}

with source_data as (

    select 1 as id
    union all
    select null as id

)

select *
from source_data

なお materialized=incremental だと追加で入る。incremental_strategy のデフォルトは merge で、unique_key に合致する場合は update し、そうでない場合は insert するため、テーブルのサイズが大きいとコストが高い。insert_overwrite では対象のパーティションのデータを一度 delete して insert する。copy_partitions: true にすると merge 句ではなく copy table API を呼ぶため挿入のコストがかからない。

{{ config(
    materialized="incremental",
    incremental_strategy="insert_overwrite",
    partition_by={
      "field": "created_date",
      "data_type": "timestamp",
      "granularity": "day",
      "copy_partitions": true
    },
    
) }}

SQL は Jinja テンプレートで記述できて、target などで条件分岐したり —vars や dbt_project.yml の vars で渡した変数var() で参照できる。

/*
$ dbt run --vars '{"test": 3333}'

or

vars:
  test: 1111
*/

select 3 as id
union all
{% if target.name == 'prd' %}
    select {{ var('test') }} as id
{% else %}
    select null as id
{% endif %}

Best practice guides では sqlfluff で lint をかけることが推奨されている

dbt の Best practice guides におけるモデルのレイヤー分け - sambaiz-net

$ pip install sqlfluff
$ sqlfluff lint --dialect bigquery models/example
== [models/example/my_first_dbt_model.sql] FAIL          
L:  10 | P:   9 | CP01 | Keywords must be consistently lower case.
                       | [capitalisation.keywords]
== [models/example/my_second_dbt_model.sql] FAIL         
L:   1 | P:   4 |  TMP | Undefined jinja template variable: 'codegen'

dbt show でデータを確認して、

$ dbt show --select "my_first_dbt_model.sql"
...
Previewing node 'my_first_dbt_model':
| id |
| -- |
|  1 |
|    |

dbt run を実行すると、

$ dbt run
17:19:34  Running with dbt=1.8.8
17:19:35  Registered adapter: bigquery=1.8.3
17:19:35  Found 2 models, 4 data tests, 479 macros
17:19:35
17:19:38  Concurrency: 1 threads (target='dev')
17:19:38
17:19:38  1 of 2 START sql table model dbttest.my_first_dbt_model ........................ [RUN]
17:19:41  1 of 2 OK created sql table model dbttest.my_first_dbt_model ................... [CREATE TABLE (2.0 rows, 0 processed) in 3.47s]
17:19:41  2 of 2 START sql view model dbttest.my_second_dbt_model ........................ [RUN]
17:19:43  2 of 2 OK created sql view model dbttest.my_second_dbt_model ................... [CREATE VIEW (0 processed) in 1.24s]
17:19:43
17:19:43  Finished running 1 table model, 1 view model in 0 hours 0 minutes and 8.00 seconds (8.00s).
17:19:43
17:19:43  Completed successfully
17:19:43
17:19:43  Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2

テーブルが作られ、データが入っている。

実行されたクエリはログに出ている。

$ cat logs/dbt.log
...
19:48:05.057309 [debug] [Thread-1 (]: On model.testdbt.my_first_dbt_model: /* {"app": "dbt", "dbt_version": "1.8.8", "profile_name": "testdbt", "target_name": "dev", "node_id": "model.testdbt.my_first_dbt_model"} */
-- back compat for old kwarg name

    merge into `sambaiztest`.`dbttest`.`my_first_dbt_model` as DBT_INTERNAL_DEST
        using (

with source_data as (

    select 1 as id
    union all
    select null as id

)

select *
from source_data
        ) as DBT_INTERNAL_SOURCE
        on (FALSE)

    when not matched then insert
        (`id`)
    values
        (`id`)
...

bq query –format=json は INTEGER のカラムを STRING で出力してしまうため、既存のデータと dbt show の diff を取るのが難しくなっている。dbt compile でクエリのみを生成することもでき target/manifest.json に sql ファイルのパスがあるので、これを bq query で実行し jd などで差分を取ると見やすい。

MODEL_NAME=my_first_dbt_model

dbt compile

SQL_FILE=$(jq -r ".nodes | to_entries[] | select(.value.name==\"$MODEL_NAME\") | .value.compiled_path" target/manifest.json)
cat $SQL_FILE | bq query --use_legacy_sql=false --format=json | jq > dbt_materialized.json

TABLE_NAME=$(jq -r ".nodes | to_entries[] | select(.value.name==\"$MODEL_NAME\") | .value.relation_name" target/manifest.json)
echo "SELECT * FROM $TABLE_NAME" | bq query --use_legacy_sql=false --format=json | jq > bq_original.json

jd dbt_materialized.json bq_original.json

dbt test で schema の data_tests のテストが走る。my_first_dbt_model には null の id が含まれているので FAIL になっている。

$ dbt test
17:23:54  Running with dbt=1.8.8
17:23:55  Registered adapter: bigquery=1.8.3
17:23:55  Found 2 models, 4 data tests, 479 macros
17:23:55
17:23:56  Concurrency: 1 threads (target='dev')
17:23:56
17:23:56  1 of 4 START test not_null_my_first_dbt_model_id ............................... [RUN]
17:23:58  1 of 4 FAIL 1 not_null_my_first_dbt_model_id ................................... [FAIL 1 in 1.61s]
17:23:58  2 of 4 START test not_null_my_second_dbt_model_id .............................. [RUN]
17:24:00  2 of 4 PASS not_null_my_second_dbt_model_id .................................... [PASS in 1.91s]
17:24:00  3 of 4 START test unique_my_first_dbt_model_id ................................. [RUN]
17:24:01  3 of 4 PASS unique_my_first_dbt_model_id ....................................... [PASS in 1.67s]
17:24:01  4 of 4 START test unique_my_second_dbt_model_id ................................ [RUN]
17:24:03  4 of 4 PASS unique_my_second_dbt_model_id ...................................... [PASS in 1.81s]
17:24:03
17:24:03  Finished running 4 data tests in 0 hours 0 minutes and 7.95 seconds (7.95s).
17:24:03
17:24:03  Completed with 1 error and 0 warnings:
17:24:03
17:24:03  Failure in test not_null_my_first_dbt_model_id (models/example/schema.yml)
17:24:03    Got 1 result, configured to fail if != 0
17:24:03
17:24:03    compiled code at target/compiled/testdbt/models/example/schema.yml/not_null_my_first_dbt_model_id.sql
17:24:03
17:24:03  Done. PASS=3 WARN=0 ERROR=1 SKIP=0 TOTAL=4

dbt build は run と test が走るため、依存している my_first_dbt_model のテストが通らない my_second_dbt_model は作られない。なお依存を持つ model を dbt show すると本物のテーブルを参照する。

$ cat models/example/my_second_dbt_model.sql

select *
from {{ ref('my_first_dbt_model') }}
where id = 1

dbt docs で schema.yml をもとにしたドキュメントが生成される。 静的サイトなので S3 などでホスティングできる

$ cat models/example/schema.yml
version: 2

models:
  - name: my_first_dbt_model
    description: "A starter dbt model"
    columns:
      - name: id
        description: "The primary key for this table"
        data_tests:
          - unique
          - not_null

  - name: my_second_dbt_model
    description: "A starter dbt model"
    columns:
      - name: id
        description: "The primary key for this table"
        data_tests:
          - unique
          - not_null

$ dbt docs generate
$ dbt docs serve

schema.yml には data_type を記述することもできるが、実際に作られるテーブルのスキーマは model のクエリベースとなり乖離していたとしてもデフォルトでエラーにならない。contractを有効にすると materialized される前にエラーになる。

そうする場合 materialized=incremental では on_schema_change をデフォルトの ignore から append_new_columns または fail に変える必要がある。これはクエリの変更によってカラムが変更された際の挙動を表す。ignore だと新たなカラムを追加してもテーブルのスキーマは更新されないが、append_new_columns にすると追加されるようになる。ただし STRUCT のフィールドのようなネストされたカラムは対象外。いずれも削除や型の変更は反映されない。

また、description についてもデフォルトでは BigQuery などには反映されず、persist_docs を有効にする必要がある。

models:
  - name: my_first_dbt_model
    description: "A starter dbt model"

    config:
      contract:
        enforced: true
      on_schema_change: "append_new_columns"

    columns:
      - name: id
        data_type: STRING
        description: "The primary key for this table"

# Compilation Error in model my_first_dbt_model (models/example/my_first_dbt_model.sql)
# This model has an enforced contract that failed.
# Please ensure the name, data_type, and number of columns in your contract match the columns in your model's definition.
# | column_name | definition_type | contract_type | mismatch_reason    |
# | ----------- | --------------- | ------------- | ------------------ |
# | id          | INT64           | STRING        | data type mismatch |

seeds ディレクトリに csv をおいて dbt seed するとその静的データを持つテーブルが作られる。

$ cat seeds/test_data.csv 
id
100
200
300

$ dbt seed

/*
select *
from {{ ref('test_data') }}
*/

macros ディレクトリに 独自の macro を 定義すると model から参照したり、data_tests を実行することができる。

$ cat macros/test.sql 
{% macro offset_id(id) %}
    {% if target.name == 'prd' %}
        {{ id }} + 1000
    {% else %}
        {{ id }}
    {% endif %}
{% endmacro %}

/*
select {{ offset_id(1) }} as id
*/

{% test positive_value(model, column_name) %}
    select *
    from {{ model }}
    where {{ column_name }} < 0
{% endtest %}

/*
models:
  - name: my_first_dbt_model
    columns:
      - name: id
        data_tests:
          - positive_value
*/

macro は副作用を起こすこともでき、pre/post_hook やテーブルを作らない materialized=ephemeral なモデルから呼び出すことで dbt のワークフロー内で任意の処理を行うことができる。execute はクエリが走らない DAG の生成時には false になる変数で、run_query はクエリを実行する macro。これを jinja の do で出力せずに呼び出している。また、dbt run-operation で実行することもでき、codegen も macro として実装されている。

dbt の codegen で sources.yml や staging の model、schema.yml を自動生成する - sambaiz-net

{% macro export_to_gcs(
    source_model,
    gcs_bucket,
    export_format='CSV'
) %}

{% if execute %}
  {% set gcs_path = 'exports/' ~ model.name ~ '.csv' %}
  {% set gcs_uri = 'gs://' ~ gcs_bucket ~ '/' ~ gcs_path %}
  
  {% set export_query %}
  EXPORT DATA
  OPTIONS(
    uri='{{ gcs_uri }}',
    format='{{ export_format }}'
    {% if export_format == 'CSV' %}
    , header=true,
    field_delimiter=','
    {% endif %}
  )
  AS (
    {{ source_model }}
  )
  {% endset %}
  
  {% do run_query(export_query) %}
{% endif %}

{% endmacro %}

また、schema名を生成する際に用いられている generate_schema_name を定義することで config の schema を指定したとき default_schema の prefix を付ける挙動を上書きしたりすることもできる。

{% macro generate_schema_name(custom_schema_name, node) -%}

    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}

        {{ default_schema }}

    {%- else -%}

        {{ default_schema }}_{{ custom_schema_name | trim }}

    {%- endif -%}

{%- endmacro %}

同様に独自の materialization も定義することもできる。

{%- materialization table_function, adapter='bigquery' -%}
  {%- set params = config.get('params', '') -%}
  {%- set relation = api.Relation.create(identifier=this.identifier, schema=this.schema, database=this.database, type="view") -%}
  {%- set funcname = "`" ~ ([relation.database, relation.schema, relation.identifier] | join(".")) ~ "`" -%}

  {% call statement('main') -%}
  CREATE OR REPLACE TABLE FUNCTION {{ funcname }}({{ params }})
  AS (
    {{ sql }}
  )
  {%- endcall %}

  {{ return({'relations': [relation]}) }}
{%- endmaterialization -%}

/*
{{
  config(
    materialized='table_function',
    params='aaa INT, bbb INT'
  )
}}

SELECT aaa as a, bbb as b
*/

参考

dbtのincremental modelを用いたBigQueryの差分更新 〜 copy_partitionsも試してみた 〜

[BigQuery] dbtでtable functionを管理する #SQL - Qiita