Iceberg テーブルを Glue Data Catalog に登録して Athena や Snowflake からクエリする

awsicebergsparksnowflake

Glue Data Catalog に Iceberg テーブルを登録すると、他のテーブルと同様に Athena や EMR などから参照できるほか、Snowflake ではスキーマを渡すことなく ICEBERG TABLE を作成することができ、メタデータの二重管理を避けることができる。また自動で compaction したり、ジョブの失敗時に生まれるメタデータから参照されていないorphan files を掃除したりする機能まである。従来の Hive パーティションでは Glue Data Catalog にパーティション情報を登録し GetPartitions API で取得するため、パーティションが多いとレイテンシやスロットリングが問題になるが、Iceberg はパーティション情報を manifest ファイルとして S3 に保持するため、この問題を回避できる。

Spark で Iceberg テーブルを作成しスキーマや write mode を変更してデータを書き込みメタデータの内容を確認する - sambaiz-net

今回は Athena for Spark でテーブルなどを作成する。table format に Iceberg を選ぶと catalog-impl などが設定されるIcebergSparkSessionExtensions は SQL からプロシージャの CALL などを行えるようにするモジュール。

ちなみに EMR on EKS で動かしたところ Iceberg をサポートしているバージョン emr-7.0.0 でも ClassNotFoundException: org.apache.iceberg.spark.SparkCatalog になったりしたが、最新まで上げると動くことが確認できた。

CDK で EKS クラスタを立ち上げ EMR on EKS に登録し Spark のジョブを動かす - sambaiz-net

Spark でデータベースや Iceberg テーブルを作成・更新すると Data Catalog にも反映される。glue と S3 の権限が必要。

spark.sql("CREATE DATABASE iceberg_test LOCATION 's3://*****/iceberg_test'")
spark.sql("CREATE TABLE iceberg_test.test_table (id bigint, data string) USING iceberg")

なお、CDK (CloudFormation) で openTableFormatInput.icebergInput を渡して EXTERNAL_TABLE を作成することで Glue Data Catalog に Iceberg テーブルを登録することもできるが、スキーマを更新などすると Iceberg テーブルとして扱われなくなって壊れてしまう問題があるのと、パーティションにも対応していないこともあって現状あまり実用的ではない。

const database = new glue.CfnDatabase(this, "Database", {
  catalogId: this.account,
  databaseInput: {
    name: databaseName,
  }
})

new glue.CfnTable(this, "Table", {
  databaseName: databaseName,
  catalogId: this.account,
  tableInput: {
      name: "test_table",
      storageDescriptor: {
          columns: [
            { name: "id", type: "int" },
            { name: "data", type: "string" },
          ],
          location: `s3://*****/iceberg_test`,
      },
      tableType: 'EXTERNAL_TABLE',
  },
  openTableFormatInput: {
      icebergInput: {
          metadataOperation: "CREATE",
          version: "2",
      }
  }
})

Table properties に書き込まれ更新される metadata_location は同時にテーブルが更新されたことを検知しコミットを失敗させる楽観的ロックで参照される。以前は DynamoLockManager が使われていた

ちなみに TBLPROPERTIES に write.spark.accept-any-schema = true を設定して .option(“mergeSchema”,“true”) で書き込むと Schema Merge が行われ、データに合わせてカラムが追加されるが、 この場合も Catalog に反映される。

spark.sql("""
CREATE TABLE iceberg_test.test_table
USING ICEBERG
TBLPROPERTIES ('write.spark.accept-any-schema'='true');
""")

df.writeTo("iceberg_test.test_table") \
  .option("mergeSchema", "true") \
  .append()

これによりスキーマを渡さずに CREATE TABLE することもできるが、カラムがないので PARTITION BY などは後で行う必要がある。

spark.sql("ALTER TABLE iceberg_test.test_table2 ADD PARTITION FIELD aaa")
spark.sql("ALTER TABLE iceberg_test.test_table2 ADD PARTITION FIELD bbb")
spark.sql("DESCRIBE TABLE iceberg_test.test_table2")

新たなカラムが途中に追加されようとすると次のようなカラムが順番通りでないというエラーが発生する。spark.sql.iceberg.check-ordering = false にするとこのチェックを回避することができる。これは入力データとテーブル定義の順番が異なっているために誤った位置のデータを取って書き込んでしまうことを防ぐためのようだが、この設定が追加されたPRで併せてテーブルではなく入力データのスキーマからフィールドのポジションを取るように改修されていて、falseにしても問題がない認識でいるし実際そうした。

Exception in thread "main" java.lang.IllegalArgumentException: Cannot write incompatible dataset to table with schema:
table {
  1: id: optional string
  2: age: optional int
  3: name: optional string
}
Provided schema:
table {
  1: id: optional string
  3: name: optional string
  2: age: optional int
}
Problems:
* name is out of order, before age
	at org.apache.iceberg.types.TypeUtil.checkSchemaCompatibility(TypeUtil.java:489)
...

データを入れて、

df = spark.createDataFrame([
    (1, "aaaa"),
    (2, "bbbb"),
    (3, "cccc")
], ["id", "data"])

df.writeTo("iceberg_test.test_table").append()

Athena からアクセスできることを確認する。

Snowflake では S3 の EXTERNAL VOLUME と Glue の CATALOG INTEGRATION を作成し ICEBERG TABLE に渡す。STORAGE INTEGRATION のときと同様にアクセスユーザーや External ID を取得して Role に設定する必要がある。

Snowflake に S3 のデータをコピーしてクエリする - sambaiz-net

CREATE OR REPLACE EXTERNAL VOLUME ICEBERG_EXTERNAL_VOLUME
   STORAGE_LOCATIONS =
      (
         (
            NAME = 'iceberg_test'
            STORAGE_PROVIDER = 'S3'
            STORAGE_BASE_URL = 's3://*****/iceberg_test/'
            STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::*****:role/*****'
         )
      );
  
DESC EXTERNAL VOLUME ICEBERG_EXTERNAL_VOLUME;

CREATE CATALOG INTEGRATION GLUE_CATALOG_INTEGRATION
  CATALOG_SOURCE = GLUE
  CATALOG_NAMESPACE = 'iceberg_test'
  TABLE_FORMAT = ICEBERG
  GLUE_AWS_ROLE_ARN = 'arn:aws:iam::*****:role/*****'
  GLUE_CATALOG_ID = '(aws account id)'
  GLUE_REGION = 'ap-northeast-1'
  ENABLED = TRUE
  REFRESH_INTERVAL_SECONDS = 60;

DESC CATALOG INTEGRATION GLUE_CATALOG_INTEGRATION;

CREATE DATABASE TESTDB;
USE TESTDB;

CREATE SCHEMA TEST_SCHEMA;

CREATE ICEBERG TABLE TEST_SCHEMA.TEST_TABLE
  EXTERNAL_VOLUME = ICEBERG_EXTERNAL_VOLUME
  CATALOG = GLUE_CATALOG_INTEGRATION
  CATALOG_TABLE_NAME = 'test_table'
  CATALOG_NAMESPACE = 'iceberg_test'
  AUTO_REFRESH = TRUE;

なお、Terraform には preview の snowflake_external_volume があるのみで他のリソースはまだない

キーペア認証で Terraform を実行したり Snowflake CLI や gosnowflake でクエリを実行する - sambaiz-net

これで Snowflake からもアクセスできる。

最新のデータが取れない場合 AUTO_REFRESH が無効だったり失敗していたりキャッシュが効いている可能性があるので確認する。

SELECT SYSTEM$AUTO_REFRESH_STATUS(TEST_TABLE);

ALTER SESSION SET USE_CACHED_RESULT=FALSE;

ちなみに、Glue のような外部カタログから Snowflake のカタログを参照するようにテーブルを変換することもできる。これにより定期的にクエリを実行してデータを更新する DYNAMIC ICEBERG TABLE のソースにすることができる。

ALTER ICEBERG TABLE TEST_TABLE CONVERT TO MANAGED
  BASE_LOCATION = 'snowflake/test_table';

CREATE DYNAMIC ICEBERG TABLE TEST_TABLE_ID (id bigint)
  TARGET_LAG = '20 minutes'
  WAREHOUSE = ***
  EXTERNAL_VOLUME = ICEBERG_EXTERNAL_VOLUME
  CATALOG = 'SNOWFLAKE'
  BASE_LOCATION = 'snowflake/test_table_id'
  AS
    SELECT id FROM test_table;

ただ、変換すると外部ボリュームの (STORAGE_BASE_URL)/BASE_LOCATION に変換後のテーブルが書き込まれ Snowflake がコンパクションなどのメンテナンスを行うようになってしまう。つまりそれ以後別のテーブルになってしまうため、Glue のテーブルに追従するには定期的にテーブルを REPLACE して CONVERT する必要がある。

参考

AWS上でIcebergテーブルを作成する方法についての検討メモ #iceberg - Qiita