Snowflake から BigQuery に入っているデータを読む際、直接アクセスすることはできないためSnowflake のテーブルに入れ直すか GCS にエクスポートなどする必要がある。ただ、前者に関しては単一ソースでなくなることによるデータやスキーマの乖離の心配があり、いずれの場合も BigQuery と併用する場合データの保存コストが二重でかかってしまう。後者の場合は BigQuery ストレージに保存するのではなく外部テーブルとして運用する方法もあるが書き込むのに Spark のような外部エンジンが必要になる。
Spark で Iceberg テーブルを作成しスキーマや write mode を変更してデータを書き込みメタデータの内容を確認する - sambaiz-net
BigQuery tables for Apache Iceberg はBigQuery から読み書きできる Iceberg テーブル。つまり通常のテーブルと同様に dbt の bigquery adapter で CREATE TABLE AS などして書き込むことができる。
CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME(
...
)
WITH CONNECTION `(project_id).us.****`
OPTIONS (
file_format = 'PARQUET',
table_format = 'ICEBERG',
storage_uri = 'gs://****');
connection の ServiceAccount に objectCreator の権限を与える必要がある。
resource "google_bigquery_connection" "iceberg_table" {
project = "(project_id)"
connection_id = "iceberg_table"
location = "US"
cloud_resource {}
}
resource "google_storage_bucket_iam_member" "iceberg_connection_gcs_creator" {
bucket = "(bucket_name)"
role = "roles/storage.objectCreator"
member = "serviceAccount:${google_bigquery_connection.iceberg_table.cloud_resource[0].service_account_id}"
}
ただ、プレビューということもありまだ dbt の PR が通っておらず CREATE TABLE 時に Iceberg 用のパラメータを渡すことができない。そこで bigquery_table_options macro を上書きしてパラメータを渡すようにする。ちなみに bigquery_options でないのは incremental で merge する際に作られる _tmp テーブルの作成に失敗しないよう temporary = True のとき別のディレクトリを指定するため。なおパーティションはサポートしておらずクラスタリングしかできないので insert_overwrite はできない。
dbt で BigQuery のデータを SQL で加工して新しいテーブルを作る - sambaiz-net
{% macro bigquery_table_options(config, node, temporary) %}
{% set opts = adapter.get_table_options(config, node, temporary) %}
{%- if config.get('table_format') == 'iceberg' -%}
{%- set direcotry = 'iceberg/' if not temporary else 'iceberg_tmp/' -%}
{%- do opts.update({
'file_format': "'parquet'",
'table_format': "'iceberg'",
'storage_uri': "'gs://(bucket_name)/" ~ direcotry ~ model.schema ~ "/" ~ model.name ~ "'"
}) -%}
{%- endif -%}
{% set connection -%}
{%- if config.get('table_format') == 'iceberg' -%}
WITH CONNECTION `{{ target.project }}.us.iceberg_table`
{%- endif -%}
{%- endset %}
{%- do return(connection ~ "\n" ~ bigquery_options(opts)) -%}
{%- endmacro -%}
これで Iceberg テーブルを作成できるようになった。初めからある v0.metadata.json には format-version などが含まれておらず、Snowflake で ICEBERG TABLE を作成する際にエラーになるので、EXPORT TABLE METADATA する。
```sql
{{ config(
materialized='incremental',
table_format='iceberg',
storage_uri='gs://*****',
post_hook="EXPORT TABLE METADATA FROM {{ this }}",
) }}
select ...
そして Snowflake に GCS の EXTERNAL VOLUME と OBJECT_STORE の CATALOG INTEGRATION を作成し、ServiceAccount に権限を与えて ICEBERG TABLE を作れば読めるようになる。ちなみに storage.buckets.get が必要なので objectCreator では権限が足りない。
CREATE EXTERNAL VOLUME GCS_EXTERNAL_VOLUME
STORAGE_LOCATIONS =
(
(
NAME = 'asia-northeast1'
STORAGE_PROVIDER = 'GCS'
STORAGE_BASE_URL = 'gcs://******/'
)
);
DESC EXTERNAL VOLUME GCS_EXTERNAL_VOLUME;
-- {"STORAGE_GCP_SERVICE_ACCOUNT":"****.iam.gserviceaccount.com",...}
SELECT SYSTEM$VERIFY_EXTERNAL_VOLUME('GCS_EXTERNAL_VOLUME');
-- {"success":true,...}
CREATE CATALOG INTEGRATION OBJECT_STORE_CATALOG_INTEGRATION
CATALOG_SOURCE = OBJECT_STORE
TABLE_FORMAT = ICEBERG
ENABLED = TRUE;
CREATE ICEBERG TABLE test
EXTERNAL_VOLUME = GCS_EXTERNAL_VOLUME
CATALOG = OBJECT_STORE_CATALOG_INTEGRATION
METADATA_FILE_PATH = '****/metadata/v****.metadata.json';
ただ、Glue のときと異なり metadata ファイルのパスを指定する必要があるため AUTO REFRESH は行われない。
Iceberg テーブルを Glue Data Catalog に登録して Athena や Snowflake からクエリする - sambaiz-net
次のようにして最新の metadata ファイルが取得できるので、これで ALTER TABLE REFRESH すれば追従できる。これを PROCEDURE から行うこともできる。
CREATE STORAGE INTEGRATION GCS_DATA_INTEGRATION
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://******/');
CREATE STAGE GCS_DATA_STAGE
STORAGE_INTEGRATION = GCS_DATA_INTEGRATION
URL = 'gcs://******/';
SELECT METADATA$FILENAME AS FILE_PATH
FROM @GCS_DATA_STAGE
WHERE METADATA$FILENAME LIKE '%metadata.json'
ORDER BY METADATA$FILE_LAST_MODIFIED DESC
LIMIT 1;
ALTER ICEBERG TABLE test REFRESH '*****.metadata.json';