現状 AWS で Iceberg を扱うにあたりテーブルをどう管理していくか悩ましい状態にある。というのも CDK (CloudFormation) で Iceberg テーブルを更新すると Iceberg テーブルとして扱われなくなる問題があるからだ。Athena Notebooks から Spark で作成するのが一番手軽だが、もう少しレビューやデプロイの観点で運用に乗りやすそうな方法を探していたところ、dbt 公式の Athena アダプタである dbt-athena に辿り着いた。
Iceberg テーブルを Glue Data Catalog に登録して Athena や Snowflake からクエリする - sambaiz-net
dbt で BigQuery のデータを SQL で加工して新しいテーブルを作る - sambaiz-net
$ python -m pip install dbt-core dbt-athena
$ dbt --version
Core:
- installed: 1.10.4
- latest: 1.10.4 - Up to date!
Plugins:
- athena: 1.9.4 - Up to date!
$ cat ~/.dbt/profiles.yml
athenatest:
outputs:
dev:
database: awsdatacatalog
region_name: ap-northeast-1
s3_data_dir: s3://****/data
s3_staging_dir: s3://****/staging
schema: dbtathenatest
threads: 1
type: athena
target: dev
dbt は通常レコードの挿入まで行うが、0 行 append することでテーブルの作成/更新のみを行うことができる。on-schema-change のデフォルトは ignore なので設定しないと追加が反映されない。Trino での構造体は STRUCT でなく ROW。DataCatalog には STRUCT として登録される。
{{ config(
materialized = 'incremental',
incremental_strategy = 'append',
table_type = 'iceberg',
format = 'parquet',
partitioned_by = ['id'],
on_schema_change = 'append_new_columns'
) }}
select
CAST(NULL AS INTEGER) AS id,
CAST(NULL AS ROW(a INTEGER, b VARCHAR)) AS value
where false
location はデフォルトで {data_dir}/{schema}/{model}/{uuid} になる。
# create a table
$ dbt run --debug
....
create table "awsdatacatalog"."dbtathenatest"."my_first_dbt_model"
with (
table_type='iceberg',
is_external=false,location='****/data/dbtathenatest/my_first_dbt_model/985181f4-f9d6-4486-9063-697ca8f510c5',
partitioning=ARRAY['id'],
format='parquet'
)
as
...
# add a column
$ dbt run --debug
...
In "awsdatacatalog"."dbtathenatest"."my_first_dbt_model":
Schema changed: True
Source columns not in target: [AthenaColumn(column='b', dtype='string', char_size=None, numeric_precision=None, numeric_scale=None, table_type=<TableType.ICEBERG: 'iceberg_table'>)]
Target columns not in source: []
New column types: []
On model.dbtathenatest.my_first_dbt_model: alter table `dbtathenatest`.`my_first_dbt_model`
add columns (b string)
...
しかし Athena 由来の制約があって、特定の tblproperties しか設定できず、パーティションの変更も反映されない。
{{ config(
...
tblproperties = {
'write.update.mode': 'merge-on-read'
}
) }}
// An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: Table properties [write.update.mode] are not supported.
ちなみに BigQuery のときと同じく incremental_strategy として insert_overwrite は設定できない。
dbt で BigQuery tables for Apache Iceberg を作成し Snowflake から読む - sambaiz-net
Invalid incremental strategy provided: insert_overwrite
Incremental models on Iceberg tables only work with 'append', 'microbatch' or 'merge' (v3 only) strategy.
Microbatch model 'my_first_dbt_model' must provide an 'event_time' (string) config that indicates the name of the event time column.