Create BigQuery Tables for Apache Iceberg using dbt and Read them from Snowflake

snowflake, gcp, terraform, iceberg

Snowflake cannot read data stored in BigQuery directly, so it either has to be reloaded into Snowflake tables or exported to GCS. The former raises concerns about data and schema discrepancies because the single source of truth is lost, and holding the data in both BigQuery and Snowflake doubles storage costs. For the latter, instead of keeping the data in BigQuery storage it can be operated as an external table, but writing to it then requires an external engine such as Spark.

Walk through Iceberg metadata contents by creating tables, modifying schema and write mode, and writing data in Spark - sambaiz-net

BigQuery tables for Apache Iceberg are Iceberg tables that can be read from and written to using BigQuery. This means you can write to them using the BigQuery adapter for dbt with CREATE TABLE AS statements, just like regular tables.

CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME(
  ...
)
WITH CONNECTION `(project_id).us.****`
OPTIONS (
  file_format = 'PARQUET',
  table_format = 'ICEBERG',
  storage_uri = 'gs://****'
);

You need to grant the objectCreator role on the bucket to the connection’s service account.

resource "google_bigquery_connection" "iceberg_table" {
  project       = "(project_id)"
  connection_id = "iceberg_table"
  location      = "US"
  cloud_resource {}
}

resource "google_storage_bucket_iam_member" "iceberg_connection_gcs_creator" {
  bucket = "(bucket_name)"
  role   = "roles/storage.objectCreator"
  member = "serviceAccount:${google_bigquery_connection.iceberg_table.cloud_resource[0].service_account_id}"
}

However, since this feature is still in preview, the dbt PR hasn’t been merged yet, so Iceberg-specific parameters cannot be passed when creating tables. Therefore, I overrode the bigquery_table_options macro to pass them. I overrode this macro rather than bigquery_options so that a different directory can be specified when temporary = True, which avoids failures when _tmp tables are created during incremental merges. Also, since only clustering is supported and not partitioning, the insert_overwrite strategy cannot be used.

Creating new tables in BigQuery by processing data with SQL using dbt - sambaiz-net

{% macro bigquery_table_options(config, node, temporary) %}
  {% set opts = adapter.get_table_options(config, node, temporary) %}
  
  {%- if config.get('table_format') == 'iceberg' -%}
    {%- set directory = 'iceberg/' if not temporary else 'iceberg_tmp/' -%}
    {%- do opts.update({
        'file_format': "'parquet'",
        'table_format': "'iceberg'",
        'storage_uri': "'gs://(bucket_name)/" ~ directory ~ model.schema ~ "/" ~ model.name ~ "'"
    }) -%}
  {%- endif -%}

  {% set connection -%}
    {%- if config.get('table_format') == 'iceberg' -%}
      WITH CONNECTION `{{ target.project }}.us.iceberg_table`
    {%- endif -%}
  {%- endset %}

  {%- do return(connection ~ "\n" ~ bigquery_options(opts)) -%}
{%- endmacro -%}

Now we can create Iceberg tables. However, the initial v0.metadata.json doesn’t contain format-version and other required fields, which causes an error when creating an ICEBERG TABLE in Snowflake, so EXPORT TABLE METADATA needs to be run.

{{ config(
    materialized='incremental',
    table_format='iceberg',
    storage_uri='gs://*****',
    post_hook="EXPORT TABLE METADATA FROM {{ this }}",
) }}

select ...
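For reference, the post_hook above renders into a plain BigQuery statement, so the metadata export can also be run manually, e.g. from the console (the table name below is a placeholder):

EXPORT TABLE METADATA FROM `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME;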

Then create an EXTERNAL VOLUME and a CATALOG INTEGRATION for GCS in Snowflake, grant the required permissions to the service account Snowflake generates for the volume, and create an ICEBERG TABLE to read the data. Note that the storage.buckets.get permission is required, so the objectCreator role alone is not sufficient.

CREATE EXTERNAL VOLUME GCS_EXTERNAL_VOLUME
  STORAGE_LOCATIONS =
    (
      (
        NAME = 'asia-northeast1'
        STORAGE_PROVIDER = 'GCS'
        STORAGE_BASE_URL = 'gcs://******/'
      )
    );
 
DESC EXTERNAL VOLUME GCS_EXTERNAL_VOLUME;
-- {"STORAGE_GCP_SERVICE_ACCOUNT":"****.iam.gserviceaccount.com",...}

SELECT SYSTEM$VERIFY_EXTERNAL_VOLUME('GCS_EXTERNAL_VOLUME');
-- {"success":true,...}

CREATE CATALOG INTEGRATION OBJECT_STORE_CATALOG_INTEGRATION
  CATALOG_SOURCE = OBJECT_STORE
  TABLE_FORMAT = ICEBERG
  ENABLED = TRUE;

CREATE ICEBERG TABLE test
   EXTERNAL_VOLUME = GCS_EXTERNAL_VOLUME
   CATALOG = OBJECT_STORE_CATALOG_INTEGRATION
   METADATA_FILE_PATH = '****/metadata/v****.metadata.json';
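Once created, the table can be queried from Snowflake like any other table, for example:

SELECT * FROM test LIMIT 10;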

However, unlike with Glue, AUTO REFRESH does not occur because the metadata file path needs to be specified.

Register Iceberg Tables in Glue Data Catalog to query from Athena and Snowflake - sambaiz-net

You can retrieve the latest metadata file as shown below and keep the table up to date with ALTER ICEBERG TABLE ... REFRESH. This can also be done from a PROCEDURE, as sketched at the end of this post.

CREATE STORAGE INTEGRATION GCS_DATA_INTEGRATION
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://******/');
  
CREATE STAGE GCS_DATA_STAGE
  STORAGE_INTEGRATION = GCS_DATA_INTEGRATION
  URL = 'gcs://******/';

SELECT METADATA$FILENAME AS FILE_PATH
FROM @GCS_DATA_STAGE
WHERE METADATA$FILENAME LIKE '%metadata.json'
ORDER BY METADATA$FILE_LAST_MODIFIED DESC
LIMIT 1;

ALTER ICEBERG TABLE test REFRESH '*****.metadata.json';
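As a sketch of the PROCEDURE approach mentioned above, the lookup of the latest metadata file and the REFRESH can be wrapped in a Snowflake Scripting procedure and, for example, scheduled with a TASK. The procedure, task, and warehouse names below are placeholders, and this assumes the stage URL matches the external volume’s STORAGE_BASE_URL so that METADATA$FILENAME can be passed to REFRESH as is.

CREATE OR REPLACE PROCEDURE REFRESH_ICEBERG_TEST()
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
  latest_metadata VARCHAR;
BEGIN
  -- Pick the most recently modified metadata file on the stage
  SELECT METADATA$FILENAME INTO :latest_metadata
  FROM @GCS_DATA_STAGE
  WHERE METADATA$FILENAME LIKE '%metadata.json'
  ORDER BY METADATA$FILE_LAST_MODIFIED DESC
  LIMIT 1;

  -- Point the Iceberg table at that metadata file
  EXECUTE IMMEDIATE 'ALTER ICEBERG TABLE test REFRESH ''' || latest_metadata || '''';
  RETURN latest_metadata;
END;
$$;

-- Run it periodically with a task (placeholder warehouse name)
CREATE TASK REFRESH_ICEBERG_TEST_TASK
  WAREHOUSE = (warehouse_name)
  SCHEDULE = '60 MINUTE'
AS
  CALL REFRESH_ICEBERG_TEST();

ALTER TASK REFRESH_ICEBERG_TEST_TASK RESUME;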