dbt-athena で Iceberg テーブルの作成とスキーマの更新のみを行う

icebergaws

現状 AWS で Iceberg を扱うにあたりテーブルをどう管理していくか悩ましい状態にある。というのも CDK (CloudFormation) で Iceberg テーブルを更新すると Iceberg テーブルとして扱われなくなる問題があるからだ。Athena Notebooks から Spark で作成するのが一番手軽だが、もう少しレビューやデプロイの観点で運用に乗りやすそうな方法を探していたところ、dbt 公式の Athena アダプタである dbt-athena に辿り着いた。

Iceberg テーブルを Glue Data Catalog に登録して Athena や Snowflake からクエリする - sambaiz-net

dbt で BigQuery のデータを SQL で加工して新しいテーブルを作る - sambaiz-net

$ python -m pip install dbt-core dbt-athena
$ dbt --version
Core:
  - installed: 1.10.4
  - latest:    1.10.4 - Up to date!

Plugins:
  - athena: 1.9.4 - Up to date!
  
$ cat ~/.dbt/profiles.yml 
athenatest:
  outputs:
    dev:
      database: awsdatacatalog
      region_name: ap-northeast-1
      s3_data_dir: s3://****/data
      s3_staging_dir: s3://****/staging
      schema: dbtathenatest
      threads: 1
      type: athena
  target: dev

dbt は通常レコードの挿入まで行うが、0 行 append することでテーブルの作成/更新のみを行うことができる。on-schema-change のデフォルトは ignore なので設定しないと追加が反映されない。Trino での構造体は STRUCT でなく ROW。DataCatalog には STRUCT として登録される。

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'append',
    table_type = 'iceberg',
    format = 'parquet',
    partitioned_by = ['id'],
    on_schema_change = 'append_new_columns'
) }}

select
    CAST(NULL AS INTEGER) AS id,
    CAST(NULL AS ROW(a INTEGER, b VARCHAR)) AS value
where false

location はデフォルト{data_dir}/{schema}/{model}/{uuid} になる

# create a table
$ dbt run --debug
....
create table "awsdatacatalog"."dbtathenatest"."my_first_dbt_model"
    with (
      table_type='iceberg',
      is_external=false,location='****/data/dbtathenatest/my_first_dbt_model/985181f4-f9d6-4486-9063-697ca8f510c5',
      partitioning=ARRAY['id'],
      format='parquet'
    )
    as
    ...

# add a column
$ dbt run --debug
...
In "awsdatacatalog"."dbtathenatest"."my_first_dbt_model":
        Schema changed: True
        Source columns not in target: [AthenaColumn(column='b', dtype='string', char_size=None, numeric_precision=None, numeric_scale=None, table_type=<TableType.ICEBERG: 'iceberg_table'>)]
        Target columns not in source: []
        New column types: []

On model.dbtathenatest.my_first_dbt_model: alter table `dbtathenatest`.`my_first_dbt_model`
          add columns (b string)
...

しかし Athena 由来の制約があって、特定の tblproperties しか設定できず、パーティションの変更も反映されない

{{ config(
    ...
    tblproperties = {
        'write.update.mode': 'merge-on-read'
    }
) }}

// An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: Table properties [write.update.mode] are not supported.

ちなみに BigQuery のときと同じく incremental_strategy として insert_overwrite は設定できない

dbt で BigQuery tables for Apache Iceberg を作成し Snowflake から読む - sambaiz-net

Invalid incremental strategy provided: insert_overwrite
        Incremental models on Iceberg tables only work with 'append', 'microbatch' or 'merge' (v3 only) strategy.
Microbatch model 'my_first_dbt_model' must provide an 'event_time' (string) config that indicates the name of the event time column.