Creating new tables in BigQuery by processing data with SQL using dbt

dbtgcp

dbt is a tool for transforming data using SQL or Python. There is dbt Core, an OSS CLI that provides basic functionality, and dbt Cloud, a service that includes a browser-based IDE, CI, alerts, and more. This article is for dbt Core.

Install dbt Core and the adapters for data platforms. In addition to officially maintained ones like BigQuery, Spark, and Snowflake, community adapters are also available.

$ python -m venv dbt-env
$ source dbt-env/bin/activate
$ python -m pip install dbt-core dbt-bigquery
$ dbt --version
Core:
  - installed: 1.8.8
  - latest:    1.8.8 - Up to date!

Plugins:
  - bigquery: 1.8.3 - Up to date!

dbt init is used to create a project.

$ dbt init testdbt
...
[1] bigquery
Enter a number: 1
[1] oauth
[2] service_account
Desired authentication method option (enter a number): 1
project (GCP project id):
dataset (the name of your dbt dataset): dbttest
threads (1 or more):
job_execution_timeout_seconds [300]:
[1] US
[2] EU
Desired location option (enter a number):

$ tree testdbt
testdbt
├── README.md
├── analyses
├── dbt_project.yml
├── macros
├── models
│   └── example
│       ├── my_first_dbt_model.sql
│       ├── my_second_dbt_model.sql
│       └── schema.yml
├── seeds
├── snapshots
└── tests

The entered values are written to ~/.dbt/profiles.yml. The reference destination can be specified using the –profiles-dir or DBT_PROFILES_DIR environment variable.

$ cat ~/.dbt/profiles.yml
testdbt:
  outputs:
    dev:
      dataset: dbttest
      job_execution_timeout_seconds:
      job_retries:
      location:
      method: oauth
      priority:
      project:
      threads:
      type: bigquery
  target: dev

$ cat testdbt/dbt_project.yml
...
profile: 'testdbt'
...

Use dbt debug to verify the connection.

# gcloud auth application-default login
$ dbt debug
...
17:00:18  Configuration:
17:00:18    profiles.yml file [OK found and valid]
17:00:18    dbt_project.yml file [OK found and valid]
17:00:18  Required dependencies:
17:00:18   - git [OK found]
...
17:00:18  Registered adapter: bigquery=1.8.3
17:00:21    Connection test: [OK connection ok]

17:00:21  All checks passed!

Write SQL to create tables with materialized=table. Note that CREATE OR REPLACE TABLE AS SELECT will be executed and data will be overwritten.

$ cat models/example/my_first_dbt_model.sql

{{ config(materialized='table') }}

with source_data as (

    select 1 as id
    union all
    select null as id

)

select *
from source_data

In case materialized=incremental, data is added incrementally. The default incremental_strategy is merge, which updates if there’s a match on the unique_key and inserts if not, making it costly for large tables. insert_overwrite deletes the data in the target partition before inserting. Setting copy_partitions: true calls the copy table API instead of using a merge statement, eliminating the cost of insertion.

{{ config(
    materialized="incremental",
    incremental_strategy="insert_overwrite",
    partition_by={
      "field": "created_date",
      "data_type": "timestamp",
      "granularity": "day",
      "copy_partitions": true
    },
    
) }}

SQL can be written in Jinja templates, expressing branches by target etc. and referencing variables passed through –vars or vars in dbt_project.yml using var().

/*
$ dbt run --vars '{"test": 3333}'

or

vars:
  test: 1111
*/

select 3 as id
union all
{% if target.name == 'prd' %}
    select {{ var('test') }} as id
{% else %}
    select null as id
{% endif %}

To maintain style, lint by sqlfluff is recommended in Best practice guides.

Model Layering in dbt’s Best Practice Guides - sambaiz-net

$ pip install sqlfluff
$ sqlfluff lint --dialect bigquery models/example
== [models/example/my_first_dbt_model.sql] FAIL          
L:  10 | P:   9 | CP01 | Keywords must be consistently lower case.
                       | [capitalisation.keywords]
== [models/example/my_second_dbt_model.sql] FAIL         
L:   1 | P:   4 |  TMP | Undefined jinja template variable: 'codegen'

You can check data with dbt show.

$ dbt show --select "my_first_dbt_model.sql"
...
Previewing node 'my_first_dbt_model':
| id |
| -- |
|  1 |
|    |

Once you execute dbt run,

$ dbt run
17:19:34  Running with dbt=1.8.8
17:19:35  Registered adapter: bigquery=1.8.3
17:19:35  Found 2 models, 4 data tests, 479 macros
17:19:35
17:19:38  Concurrency: 1 threads (target='dev')
17:19:38
17:19:38  1 of 2 START sql table model dbttest.my_first_dbt_model ........................ [RUN]
17:19:41  1 of 2 OK created sql table model dbttest.my_first_dbt_model ................... [CREATE TABLE (2.0 rows, 0 processed) in 3.47s]
17:19:41  2 of 2 START sql view model dbttest.my_second_dbt_model ........................ [RUN]
17:19:43  2 of 2 OK created sql view model dbttest.my_second_dbt_model ................... [CREATE VIEW (0 processed) in 1.24s]
17:19:43
17:19:43  Finished running 1 table model, 1 view model in 0 hours 0 minutes and 8.00 seconds (8.00s).
17:19:43
17:19:43  Completed successfully
17:19:43
17:19:43  Done. PASS=2 WARN=0 ERROR=0 SKIP=0 TOTAL=2

tables are created and the data are inserted.

The executed queries are shown in the logs.

$ cat logs/dbt.log
...
19:48:05.057309 [debug] [Thread-1 (]: On model.testdbt.my_first_dbt_model: /* {"app": "dbt", "dbt_version": "1.8.8", "profile_name": "testdbt", "target_name": "dev", "node_id": "model.testdbt.my_first_dbt_model"} */
-- back compat for old kwarg name

    merge into `sambaiztest`.`dbttest`.`my_first_dbt_model` as DBT_INTERNAL_DEST
        using (

with source_data as (

    select 1 as id
    union all
    select null as id

)

select *
from source_data
        ) as DBT_INTERNAL_SOURCE
        on (FALSE)

    when not matched then insert
        (`id`)
    values
        (`id`)
...

bq query –format=json outputs INTEGER columns as STRING, making it difficult to get the diff between existing data and dbt show. You can generate just the query with dbt compile, and the path to the sql file is in target/manifest.json, so you can run this with bq query and get the diff with jd etc. to make it easier to see.

MODEL_NAME=my_first_dbt_model

dbt compile

SQL_FILE=$(jq -r ".nodes | to_entries[] | select(.value.name==\"$MODEL_NAME\") | .value.compiled_path" target/manifest.json)
cat $SQL_FILE | bq query --use_legacy_sql=false --format=json | jq > dbt_materialized.json

TABLE_NAME=$(jq -r ".nodes | to_entries[] | select(.value.name==\"$MODEL_NAME\") | .value.relation_name" target/manifest.json)
echo "SELECT * FROM $TABLE_NAME" | bq query --use_legacy_sql=false --format=json | jq > bq_original.json

jd dbt_materialized.json bq_original.json

dbt test runs the data_tests defined in the schema. my_first_dbt_model FAILS because it has a null id.

$ dbt test
17:23:54  Running with dbt=1.8.8
17:23:55  Registered adapter: bigquery=1.8.3
17:23:55  Found 2 models, 4 data tests, 479 macros
17:23:55
17:23:56  Concurrency: 1 threads (target='dev')
17:23:56
17:23:56  1 of 4 START test not_null_my_first_dbt_model_id ............................... [RUN]
17:23:58  1 of 4 FAIL 1 not_null_my_first_dbt_model_id ................................... [FAIL 1 in 1.61s]
17:23:58  2 of 4 START test not_null_my_second_dbt_model_id .............................. [RUN]
17:24:00  2 of 4 PASS not_null_my_second_dbt_model_id .................................... [PASS in 1.91s]
17:24:00  3 of 4 START test unique_my_first_dbt_model_id ................................. [RUN]
17:24:01  3 of 4 PASS unique_my_first_dbt_model_id ....................................... [PASS in 1.67s]
17:24:01  4 of 4 START test unique_my_second_dbt_model_id ................................ [RUN]
17:24:03  4 of 4 PASS unique_my_second_dbt_model_id ...................................... [PASS in 1.81s]
17:24:03
17:24:03  Finished running 4 data tests in 0 hours 0 minutes and 7.95 seconds (7.95s).
17:24:03
17:24:03  Completed with 1 error and 0 warnings:
17:24:03
17:24:03  Failure in test not_null_my_first_dbt_model_id (models/example/schema.yml)
17:24:03    Got 1 result, configured to fail if != 0
17:24:03
17:24:03    compiled code at target/compiled/testdbt/models/example/schema.yml/not_null_my_first_dbt_model_id.sql
17:24:03
17:24:03  Done. PASS=3 WARN=0 ERROR=1 SKIP=0 TOTAL=4

dbt build runs both run and test, so my_second_dbt_model is not created due to dependent my_first_dbt_model tests failure. Besides, if you run dbt show on a model that has dependencies, it will refer to the actual table.

$ cat models/example/my_second_dbt_model.sql

select *
from {{ ref('my_first_dbt_model') }}
where id = 1

dbt docs generates documentation based on schema.yml. Since it is a static site, so you can host it on S3 etc.

$ cat models/example/schema.yml
version: 2

models:
  - name: my_first_dbt_model
    description: "A starter dbt model"
    columns:
      - name: id
        description: "The primary key for this table"
        data_tests:
          - unique
          - not_null

  - name: my_second_dbt_model
    description: "A starter dbt model"
    columns:
      - name: id
        description: "The primary key for this table"
        data_tests:
          - unique
          - not_null

$ dbt docs generate
$ dbt docs serve

You can also specify data_type in schema.yml, but the actual created table schema is based on the model query, and by default no error will occur even if there is a discrepancy. If you enable contract, an error will occur before materializing.

If you do this, when using materialized=incremental, you need to change on_schema_change from the default ignore to append_new_columns or fail. This indicates the behavior when columns are changed due to changes in the query. If you set it to ignore, the table schema will not be updated even if a new column is added, but if you set it to append_new_columns, it will be added. However, nested columns such as STRUCT fields are not supported. In either case, deletions and type changes will not be reflected.

Also, the description is not reflected in BigQuery by default, so you need to enable persist_docs.

models:
  - name: my_first_dbt_model
    description: "A starter dbt model"

    config:
      contract:
        enforced: true
      on_schema_change: "append_new_columns"

    columns:
      - name: id
        data_type: STRING
        description: "The primary key for this table"

# Compilation Error in model my_first_dbt_model (models/example/my_first_dbt_model.sql)
# This model has an enforced contract that failed.
# Please ensure the name, data_type, and number of columns in your contract match the columns in your model's definition.
# | column_name | definition_type | contract_type | mismatch_reason    |
# | ----------- | --------------- | ------------- | ------------------ |
# | id          | INT64           | STRING        | data type mismatch |

If you put csv files in the seeds directory and run dbt seed, tables with the static data will be created.

$ cat seeds/test_data.csv 
id
100
200
300

$ dbt seed

/*
select *
from {{ ref('test_data') }}
*/

if you define your own macro in the macros directory, you can refer to it from models and run data_tests.

$ cat macros/test.sql 
{% macro offset_id(id) %}
    {% if target.name == 'prd' %}
        {{ id }} + 1000
    {% else %}
        {{ id }}
    {% endif %}
{% endmacro %}

/*
select {{ offset_id(1) }} as id
*/

{% test positive_value(model, column_name) %}
    select *
    from {{ model }}
    where {{ column_name }} < 0
{% endtest %}

/*
models:
  - name: my_first_dbt_model
    columns:
      - name: id
        data_tests:
          - positive_value
*/

Macros can also cause side effects, and by calling from pre/post_hook or materialized=ephemeral that don’t create tables, you can perform any processing within the dbt workflow. The execute variable is false during DAG generation when queries don’t run, and run_query is a macro that executes queries. You can call this using jinja’s do without output. Besides, you can run it with dbt run-operation, and codegen is also implemented as macro.

Automatically generate sources.yml, staging models, and schema.yml using dbt codegen - sambaiz-net

{% macro export_to_gcs(
    source_model,
    gcs_bucket,
    export_format='CSV'
) %}

{% if execute %}
  {% set gcs_path = 'exports/' ~ model.name ~ '.csv' %}
  {% set gcs_uri = 'gs://' ~ gcs_bucket ~ '/' ~ gcs_path %}
  
  {% set export_query %}
  EXPORT DATA
  OPTIONS(
    uri='{{ gcs_uri }}',
    format='{{ export_format }}'
    {% if export_format == 'CSV' %}
    , header=true,
    field_delimiter=','
    {% endif %}
  )
  AS (
    {{ source_model }}
  )
  {% endset %}
  
  {% do run_query(export_query) %}
{% endif %}

{% endmacro %}

Such macros can be called from ephemeral models like this:

{{ config(materialized='ephemeral') }}

{%- set query %}
select * from {{ ref('my_model') }}
{%- endset %}

{{ export_to_gcs(
    source_model=query,
    gcs_bucket='my-bucket'
) }}

You can also override the behavior of adding the default_schema prefix when you specify a schema in the config by defining generate_schema_name, which is used to generate the schema name.

{% macro generate_schema_name(custom_schema_name, node) -%}

    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}

        {{ default_schema }}

    {%- else -%}

        {{ default_schema }}_{{ custom_schema_name | trim }}

    {%- endif -%}

{%- endmacro %}

Similarly, you can also define your own materialization.

{%- materialization table_function, adapter='bigquery' -%}
  {%- set params = config.get('params', '') -%}
  {%- set relation = api.Relation.create(identifier=this.identifier, schema=this.schema, database=this.database, type="view") -%}
  {%- set funcname = "`" ~ ([relation.database, relation.schema, relation.identifier] | join(".")) ~ "`" -%}

  {% call statement('main') -%}
  CREATE OR REPLACE TABLE FUNCTION {{ funcname }}({{ params }})
  AS (
    {{ sql }}
  )
  {%- endcall %}

  {{ return({'relations': [relation]}) }}
{%- endmaterialization -%}

/*
{{
  config(
    materialized='table_function',
    params='aaa INT, bbb INT'
  )
}}

SELECT aaa as a, bbb as b
*/

Reference

dbtのincremental modelを用いたBigQueryの差分更新 〜 copy_partitionsも試してみた 〜

[BigQuery] dbtでtable functionを管理する #SQL - Qiita