dbt は SQL や Python でデータを加工するツール。基本的な機能を提供する OSS の CLI dbt Core と、Browser-based IDE や CI、アラートなどを含むサービス dbt Cloud がある。今回は dbt Core を使う。
dbt Core とデータプラットフォームのアダプタをインストールする。BigQuery や Spark、Snowflake など公式によってメンテされているもののほか、コミュニティのアダプタも利用できる。
$ python -m venv dbt-env
$ source dbt-env/bin/activate
$ python -m pip install dbt-core dbt-bigquery
$ dbt --version
Core:
- installed: 1.8.8
- latest: 1.8.8 - Up to date!
Plugins:
- bigquery: 1.8.3 - Up to date!
dbt init でプロジェクトを作成する。
$ dbt init testdbt
...
[1] bigquery
Enter a number: 1
[1] oauth
[2] service_account
Desired authentication method option (enter a number): 1
project (GCP project id):
dataset (the name of your dbt dataset): dbttest
threads (1 or more):
job_execution_timeout_seconds [300]:
[1] US
[2] EU
Desired location option (enter a number):
$ tree testdbt
testdbt
├── README.md
├── analyses
├── dbt_project.yml
├── macros
├── models
│ └── example
│ ├── my_first_dbt_model.sql
│ ├── my_second_dbt_model.sql
│ └── schema.yml
├── seeds
├── snapshots
└── tests
入力した値は ~/.dbt/profiles.yml に書き込まれる。 参照先は –profiles-dir や DBT_PROFILES_DIR 環境変数で指定できる。
$ cat ~/.dbt/profiles.yml
testdbt:
outputs:
dev:
dataset: dbttest
job_execution_timeout_seconds:
job_retries:
location:
method: oauth
priority:
project:
threads:
type: bigquery
target: dev
$ cat testdbt/dbt_project.yml
...
profile: 'testdbt'
...
dbt debug で接続できることを確認する。
# gcloud auth application-default login
$ dbt debug
...
17:00:18 Configuration:
17:00:18 profiles.yml file [OK found and valid]
17:00:18 dbt_project.yml file [OK found and valid]
17:00:18 Required dependencies:
17:00:18 - git [OK found]
...
17:00:18 Registered adapter: bigquery=1.8.3
17:00:21 Connection test: [OK connection ok]
17:00:21 All checks passed!
materialized=table でテーブルを作成する SQL を書く。CREATE OR REPLACE TABLE AS SELECT が走るのでデータが上書きされることに注意。
$ cat models/example/my_first_dbt_model.sql
{{ config(materialized='table') }}
with source_data as (
select 1 as id
union all
select null as id
)
select *
from source_data
なお materialized=incremental だと追加で入る。incremental_strategy のデフォルトは merge で、unique_key に合致する場合は update し、そうでない場合は insert するため、テーブルのサイズが大きいとコストが高い。insert_overwrite では対象のパーティションのデータを一度 delete して insert する。copy_partitions: true にすると merge 句ではなく copy table API を呼ぶため挿入のコストがかからない。
{{ config(
materialized="incremental",
incremental_strategy="insert_overwrite",
partition_by={
"field": "created_date",
"data_type": "timestamp",
"granularity": "day",
"copy_partitions": true
},
) }}
SQL は Jinja テンプレートで記述できて、target などで条件分岐したり —vars や dbt_project.yml の vars で渡した変数を var() で参照できる。
/*
$ dbt run --vars '{"test": 3333}'
or
vars:
test: 1111
*/
select 3 as id
union all
{% if target.name == 'prd' %}
select {{ var('test') }} as id
{% else %}
select null as id
{% endif %}
Best practice guides では sqlfluff で lint をかけることが推奨されている。
dbt の Best practice guides におけるモデルのレイヤー分け - sambaiz-net
$ pip install sqlfluff
$ sqlfluff lint --dialect bigquery models/example
== [models/example/my_first_dbt_model.sql] FAIL
L: 10 | P: 9 | CP01 | Keywords must be consistently lower case.
| [capitalisation.keywords]
== [models/example/my_second_dbt_model.sql] FAIL
L: 1 | P: 4 | TMP | Undefined jinja template variable: 'codegen'
dbt show でデータを確認して、
$ dbt show --select "my_first_dbt_model.sql"
...
Previewing node 'my_first_dbt_model':
| id |
| -- |
| 1 |
| |
dbt run を実行すると、
$ dbt run
17:19:34 Running with dbt=1.8.8
17:19:35 Registered adapter: bigquery=1.8.3
17:19:35 Found 2 models, 4 data tests, 479 macros
17:19:35
17:19:38 Concurrency: 1 threads (target='dev')
17:19:38
17:19:38 1 of 2 START sql table model dbttest.my_first_dbt_model ........................ [RUN]
17:19:41 1 of 2 OK created sql table model dbttest.my_first_dbt_model ................... [CREATE TABLE (2.0 rows, 0 processed) in 3.47s]
17:19:41 2 of 2 START sql view model dbttest.my_second_dbt_model ........................ [RUN]
17:19:43 2 of 2 OK created sql view model dbttest.my_second_dbt_model ................... [CREATE VIEW (0 processed) in 1.24s]
17:19:43
17:19:43 Finished running 1 table model, 1 view model in 0 hours 0 minutes and 8.00 seconds (8.00s).
17:19:43
17:19:43 Completed successfully
17:19:43
17:19:43 Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2
テーブルが作られ、データが入っている。
実行されたクエリはログに出ている。
$ cat logs/dbt.log
...
19:48:05.057309 [debug] [Thread-1 (]: On model.testdbt.my_first_dbt_model: /* {"app": "dbt", "dbt_version": "1.8.8", "profile_name": "testdbt", "target_name": "dev", "node_id": "model.testdbt.my_first_dbt_model"} */
-- back compat for old kwarg name
merge into `sambaiztest`.`dbttest`.`my_first_dbt_model` as DBT_INTERNAL_DEST
using (
with source_data as (
select 1 as id
union all
select null as id
)
select *
from source_data
) as DBT_INTERNAL_SOURCE
on (FALSE)
when not matched then insert
(`id`)
values
(`id`)
...
bq query –format=json は INTEGER のカラムを STRING で出力してしまうため、既存のデータと dbt show の diff を取るのが難しくなっている。dbt compile でクエリのみを生成することもでき target/manifest.json に sql ファイルのパスがあるので、これを bq query で実行し jd などで差分を取ると見やすい。
MODEL_NAME=my_first_dbt_model
dbt compile
SQL_FILE=$(jq -r ".nodes | to_entries[] | select(.value.name==\"$MODEL_NAME\") | .value.compiled_path" target/manifest.json)
cat $SQL_FILE | bq query --use_legacy_sql=false --format=json | jq > dbt_materialized.json
TABLE_NAME=$(jq -r ".nodes | to_entries[] | select(.value.name==\"$MODEL_NAME\") | .value.relation_name" target/manifest.json)
echo "SELECT * FROM $TABLE_NAME" | bq query --use_legacy_sql=false --format=json | jq > bq_original.json
jd dbt_materialized.json bq_original.json
dbt test で schema の data_tests のテストが走る。my_first_dbt_model には null の id が含まれているので FAIL になっている。
$ dbt test
17:23:54 Running with dbt=1.8.8
17:23:55 Registered adapter: bigquery=1.8.3
17:23:55 Found 2 models, 4 data tests, 479 macros
17:23:55
17:23:56 Concurrency: 1 threads (target='dev')
17:23:56
17:23:56 1 of 4 START test not_null_my_first_dbt_model_id ............................... [RUN]
17:23:58 1 of 4 FAIL 1 not_null_my_first_dbt_model_id ................................... [FAIL 1 in 1.61s]
17:23:58 2 of 4 START test not_null_my_second_dbt_model_id .............................. [RUN]
17:24:00 2 of 4 PASS not_null_my_second_dbt_model_id .................................... [PASS in 1.91s]
17:24:00 3 of 4 START test unique_my_first_dbt_model_id ................................. [RUN]
17:24:01 3 of 4 PASS unique_my_first_dbt_model_id ....................................... [PASS in 1.67s]
17:24:01 4 of 4 START test unique_my_second_dbt_model_id ................................ [RUN]
17:24:03 4 of 4 PASS unique_my_second_dbt_model_id ...................................... [PASS in 1.81s]
17:24:03
17:24:03 Finished running 4 data tests in 0 hours 0 minutes and 7.95 seconds (7.95s).
17:24:03
17:24:03 Completed with 1 error and 0 warnings:
17:24:03
17:24:03 Failure in test not_null_my_first_dbt_model_id (models/example/schema.yml)
17:24:03 Got 1 result, configured to fail if != 0
17:24:03
17:24:03 compiled code at target/compiled/testdbt/models/example/schema.yml/not_null_my_first_dbt_model_id.sql
17:24:03
17:24:03 Done. PASS=3 WARN=0 ERROR=1 SKIP=0 TOTAL=4
dbt build は run と test が走るため、依存している my_first_dbt_model のテストが通らない my_second_dbt_model は作られない。なお依存を持つ model を dbt show すると本物のテーブルを参照する。
$ cat models/example/my_second_dbt_model.sql
select *
from {{ ref('my_first_dbt_model') }}
where id = 1
dbt docs で schema.yml をもとにしたドキュメントが生成される。 静的サイトなので S3 などでホスティングできる。
$ cat models/example/schema.yml
version: 2
models:
- name: my_first_dbt_model
description: "A starter dbt model"
columns:
- name: id
description: "The primary key for this table"
data_tests:
- unique
- not_null
- name: my_second_dbt_model
description: "A starter dbt model"
columns:
- name: id
description: "The primary key for this table"
data_tests:
- unique
- not_null
$ dbt docs generate
$ dbt docs serve
schema.yml には data_type を記述することもできるが、実際に作られるテーブルのスキーマは model のクエリベースとなり乖離していたとしてもデフォルトでエラーにならない。contractを有効にすると materialized される前にエラーになる。
そうする場合 materialized=incremental では on_schema_change をデフォルトの ignore から append_new_columns または fail に変える必要がある。これはクエリの変更によってカラムが変更された際の挙動を表す。ignore だと新たなカラムを追加してもテーブルのスキーマは更新されないが、append_new_columns にすると追加されるようになる。ただし STRUCT のフィールドのようなネストされたカラムは対象外。いずれも削除や型の変更は反映されない。
また、description についてもデフォルトでは BigQuery などには反映されず、persist_docs を有効にする必要がある。
models:
- name: my_first_dbt_model
description: "A starter dbt model"
config:
contract:
enforced: true
on_schema_change: "append_new_columns"
columns:
- name: id
data_type: STRING
description: "The primary key for this table"
# Compilation Error in model my_first_dbt_model (models/example/my_first_dbt_model.sql)
# This model has an enforced contract that failed.
# Please ensure the name, data_type, and number of columns in your contract match the columns in your model's definition.
# | column_name | definition_type | contract_type | mismatch_reason |
# | ----------- | --------------- | ------------- | ------------------ |
# | id | INT64 | STRING | data type mismatch |
seeds ディレクトリに csv をおいて dbt seed するとその静的データを持つテーブルが作られる。
$ cat seeds/test_data.csv
id
100
200
300
$ dbt seed
/*
select *
from {{ ref('test_data') }}
*/
macros ディレクトリに 独自の macro を 定義すると model から参照したり、data_tests を実行することができる。
$ cat macros/test.sql
{% macro offset_id(id) %}
{% if target.name == 'prd' %}
{{ id }} + 1000
{% else %}
{{ id }}
{% endif %}
{% endmacro %}
/*
select {{ offset_id(1) }} as id
*/
{% test positive_value(model, column_name) %}
select *
from {{ model }}
where {{ column_name }} < 0
{% endtest %}
/*
models:
- name: my_first_dbt_model
columns:
- name: id
data_tests:
- positive_value
*/
macro は副作用を起こすこともでき、pre/post_hook やテーブルを作らない materialized=ephemeral なモデルから呼び出すことで dbt のワークフロー内で任意の処理を行うことができる。execute はクエリが走らない DAG の生成時には false になる変数で、run_query はクエリを実行する macro。これを jinja の do で出力せずに呼び出している。また、dbt run-operation で実行することもでき、codegen も macro として実装されている。
dbt の codegen で sources.yml や staging の model、schema.yml を自動生成する - sambaiz-net
{% macro export_to_gcs(
source_model,
gcs_bucket,
export_format='CSV'
) %}
{% if execute %}
{% set gcs_path = 'exports/' ~ model.name ~ '.csv' %}
{% set gcs_uri = 'gs://' ~ gcs_bucket ~ '/' ~ gcs_path %}
{% set export_query %}
EXPORT DATA
OPTIONS(
uri='{{ gcs_uri }}',
format='{{ export_format }}'
{% if export_format == 'CSV' %}
, header=true,
field_delimiter=','
{% endif %}
)
AS (
{{ source_model }}
)
{% endset %}
{% do run_query(export_query) %}
{% endif %}
{% endmacro %}
また、schema名を生成する際に用いられている generate_schema_name を定義することで config の schema を指定したとき default_schema の prefix を付ける挙動を上書きしたりすることもできる。
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- else -%}
{{ default_schema }}_{{ custom_schema_name | trim }}
{%- endif -%}
{%- endmacro %}
同様に独自の materialization も定義することもできる。
{%- materialization table_function, adapter='bigquery' -%}
{%- set params = config.get('params', '') -%}
{%- set relation = api.Relation.create(identifier=this.identifier, schema=this.schema, database=this.database, type="view") -%}
{%- set funcname = "`" ~ ([relation.database, relation.schema, relation.identifier] | join(".")) ~ "`" -%}
{% call statement('main') -%}
CREATE OR REPLACE TABLE FUNCTION {{ funcname }}({{ params }})
AS (
{{ sql }}
)
{%- endcall %}
{{ return({'relations': [relation]}) }}
{%- endmaterialization -%}
/*
{{
config(
materialized='table_function',
params='aaa INT, bbb INT'
)
}}
SELECT aaa as a, bbb as b
*/
参考
dbtのincremental modelを用いたBigQueryの差分更新 〜 copy_partitionsも試してみた 〜