Spark で Iceberg テーブルを作成しスキーマや write mode を変更してデータを書き込みメタデータの内容を確認する

icebergsparksnowflake

Apache Iceberg は大規模なデータの処理を効率的に行えるようにするオープンなテーブルフォーマットで TrinoSnowflake など様々なシステムから利用できる。ACID なトランザクション、元データを書き換えることなくテーブルのスキーマを変更できる Schema Evolution、タイムトラベルに加え、Hive の day=xxxx/ のような物理構造に依存せず day(event_ts) や month(event_ts) といったテーブルには含まれない論理的な値によって PARTITION BY する Hidden Partitioning によって、パーティションの粒度を意識することなく WHERE event_ts BETWEEN 2025-01-01 AND 2025-03-01 のようにクエリでき、またクエリを壊すことなく粒度を変更する Partition evolution を行うこともできる。

テーブルの作成

Quickstart の docker-compose.yml を用いると、Spark のデフォルトカタログに Iceberg が設定された Jupyter Notebook と S3 の互換ストレージ MinIO が立ち上がるので簡単に試すことができる。

spark.sparkContext.getConf().getAll()

'''
[('spark.sql.catalog.demo.s3.endpoint', 'http://minio:9000'),
 ('spark.sql.catalog.demo.warehouse', 's3://warehouse/wh/'),
 ('spark.sql.catalog.demo.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO'),
 ('spark.sql.catalog.demo.uri', 'http://rest:8181'),
 ('spark.sql.catalog.demo.type', 'rest'),
 ('spark.sql.catalog.demo', 'org.apache.iceberg.spark.SparkCatalog')
 ('spark.sql.defaultCatalog', 'demo'),
 ...]
'''

データベースやテーブルの操作は SQL で行うことができる。

spark.sql("CREATE DATABASE IF NOT EXISTS testdb")
spark.sql("DROP TABLE IF EXISTS nyc.taxis")
df = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2021-04.parquet")
df.write.saveAsTable("nyc.taxis")
spark.sql("DESCRIBE nyc.taxis").show()

'''
+--------------------+-------------+-------+
|            col_name|    data_type|comment|
+--------------------+-------------+-------+
|            VendorID|       bigint|   NULL|
|tpep_pickup_datetime|timestamp_ntz|   NULL|
|tpep_dropoff_date...|timestamp_ntz|   NULL|
|     passenger_count|       double|   NULL|
|       trip_distance|       double|   NULL|
|          RatecodeID|       double|   NULL|
|  store_and_fwd_flag|       string|   NULL|
|        PULocationID|       bigint|   NULL|
|        DOLocationID|       bigint|   NULL|
|        payment_type|       bigint|   NULL|
|         fare_amount|       double|   NULL|
|               extra|       double|   NULL|
|             mta_tax|       double|   NULL|
|          tip_amount|       double|   NULL|
|        tolls_amount|       double|   NULL|
|improvement_surch...|       double|   NULL|
|        total_amount|       double|   NULL|
|congestion_surcharge|       double|   NULL|
|         airport_fee|       double|   NULL|
+--------------------+-------------+-------+
'''

spark.sql("SELECT COUNT(*) as cnt FROM nyc.taxis").show()

'''
+-------+
|    cnt|
+-------+
|2171187|
+-------+
'''

Metadata layer のファイルの内容

MinIO を見ると次のようなディレクトリ構造になっていることが確認できる。MinIO Client の access_key と secret は WebUI に admin/password で入って発行できる。

$ brew install minio/stable/mc
$ mc alias set myminio http://localhost:9000 (access_key) (secret)
Added `myminio` successfully.

$ mc tree --files myminio                                                                                  
myminio
└─ warehouse
   └─ nyc
      └─ taxis
         ├─ data
         │  └─ 00003-4-3459f892-f51e-4824-bebd-0f9e82b5edd4-0-00001.parquet
         └─ metadata
            ├─ 00000-52269a8f-f9bb-4983-8ec4-b110faef7b54.metadata.json
            ├─ d7106d39-cc3c-4c2f-8326-8c7390582a38-m0.avro
            └─ snap-6618805095317987444-1-d7106d39-cc3c-4c2f-8326-8c7390582a38.avro
 
$ mc cp -r myminio/warehouse/nyc .

Overview の図に沿って metadata layer のファイルの内容を見ていく。

metadata file にはテーブルのスキーマや snapshot ごとの manifest list のパスなどが含まれている。 write.parquet.compression-codec のデフォルトは 1.4.0 で gzip から zstd に変更されたzstd (Zstandard) は Meta によって開発され RFC にもなっている圧縮アルゴリズム。gzip と比較して同程度の圧縮率でより高速に読み書きできるようだ。

$ cat nyc/taxis/metadata/00000-52269a8f-f9bb-4983-8ec4-b110faef7b54.metadata.json | jq
{
  "format-version": 2,
  "table-uuid": "ac2af1c6-df08-45e7-a3ea-c4d8b111f1ff",
  "location": "s3://warehouse/nyc/taxis",
  "last-sequence-number": 1,
  "last-updated-ms": 1737784106099,
  "last-column-id": 19,
  "current-schema-id": 0,
  "schemas": [
    {
      "type": "struct",
      "schema-id": 0,
      "fields": [
        {
          "id": 1,
          "name": "VendorID",
          "required": false,
          "type": "long"
        },
        {
          "id": 2,
          "name": "tpep_pickup_datetime",
          "required": false,
          "type": "timestamp"
        },
        ...
      ]
    }
  ],
  "default-spec-id": 0,
  "partition-specs": [
    {
      "spec-id": 0,
      "fields": []
    }
  ],
  "last-partition-id": 999,
  "default-sort-order-id": 0,
  "sort-orders": [
    {
      "order-id": 0,
      "fields": []
    }
  ],
  "properties": {
    "owner": "root",
    "created-at": "2025-01-25T05:48:19.449822093Z",
    "write.format.default": "parquet",
    "write.parquet.compression-codec": "zstd"
  },
  "current-snapshot-id": 6618805095317987444,
  "refs": {
    "main": {
      "snapshot-id": 6618805095317987444,
      "type": "branch"
    }
  },
  "snapshots": [
    {
      "sequence-number": 1,
      "snapshot-id": 6618805095317987444,
      "timestamp-ms": 1737784106099,
      "summary": {
        "operation": "append",
        "spark.app.id": "local-1737784086110",
        "added-data-files": "1",
        "added-records": "2171187",
        "added-files-size": "33920189",
        "changed-partition-count": "1",
        "total-records": "2171187",
        "total-files-size": "33920189",
        "total-data-files": "1",
        "total-delete-files": "0",
        "total-position-deletes": "0",
        "total-equality-deletes": "0"
      },
      "manifest-list": "s3://warehouse/nyc/taxis/metadata/snap-6618805095317987444-1-d7106d39-cc3c-4c2f-8326-8c7390582a38.avro",
      "schema-id": 0
    }
  ],
  "statistics": [],
  "partition-statistics": [],
  "snapshot-log": [
    {
      "timestamp-ms": 1737784106099,
      "snapshot-id": 6618805095317987444
    }
  ],
  "metadata-log": []
}

manifest list には snapshot に対応する manifest のパスが、

$ fastavro -p nyc/taxis/metadata/snap-6618805095317987444-1-d7106d39-cc3c-4c2f-8326-8c7390582a38.avro
{
    "manifest_path": "s3://warehouse/nyc/taxis/metadata/d7106d39-cc3c-4c2f-8326-8c7390582a38-m0.avro",
    "manifest_length": 8185,
    "partition_spec_id": 0,
    "content": 0,
    "sequence_number": 1,
    "min_sequence_number": 1,
    "added_snapshot_id": 6618805095317987444,
    "added_files_count": 1,
    "existing_files_count": 0,
    "deleted_files_count": 0,
    "added_rows_count": 2171187,
    "existing_rows_count": 0,
    "deleted_rows_count": 0,
    "partitions": []
}

manifest file には data file のパスや値のレンジなどの情報が含まれる。ALTER TABLE … WRITE ORDERED BY で sort-orders を設定すると値がソートして書き込まれるようになりプルーニングの効率が上がる。ちなみに data file のサイズは write.target-file-size-bytes で調整できてデフォルトは 512 MB。テーブルが小さすぎる場合や大きすぎる場合は変更することでパフォーマンスが向上する可能性がある。

$ fastavro -p nyc/taxis/metadata/d7106d39-cc3c-4c2f-8326-8c7390582a38-m0.avro
{
    "status": 1,
    "snapshot_id": 6618805095317987444,
    "sequence_number": null,
    "file_sequence_number": null,
    "data_file": {
        "content": 0,
        "file_path": "s3://warehouse/nyc/taxis/data/00003-4-3459f892-f51e-4824-bebd-0f9e82b5edd4-0-00001.parquet",
        "file_format": "PARQUET",
        "partition": {},
        "record_count": 2171187,
        "file_size_in_bytes": 33920189,
        "column_sizes": [
            {
                "key": 1,
                "value": 275145
            },
            {
                "key": 2,
                "value": 8306768
            },
            ...
        ],
        "value_counts": [
            {
                "key": 1,
                "value": 2171187
            },
            {
                "key": 2,
                "value": 2171187
            },
            ...
        ],
        "null_value_counts": [
            {
                "key": 1,
                "value": 0
            },
            {
                "key": 2,
                "value": 0
            },
            ...
        ],
        "nan_value_counts": [
            {
                "key": 4,
                "value": 0
            },
            {
                "key": 5,
                "value": 0
            },
            ...
        ],
        "lower_bounds": [
            {
                "key": 1,
                "value": "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
            },
            {
                "key": 2,
                "value": "\u00c0\u00ed[Rc_\u0004\u0000"
            },
            ...
        ],
        "upper_bounds": [
            {
                "key": 1,
                "value": "\u0006\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
            },
            {
                "key": 2,
                "value": "\u00c0\u00b4\u000e\u001es\u00c7\u0005\u0000"
            },
            ...
        ],
        "key_metadata": null,
        "split_offsets": [
            4
        ],
        "equality_ids": null,
        "sort_order_id": 0
    }
}

スキーマの更新

カラム名を変更してクエリできることを確認する。

spark.sql("ALTER TABLE nyc.taxis RENAME COLUMN fare_amount TO fare")

spark.sql("SELECT fare as cnt FROM nyc.taxis LIMIT 5").show()

'''
+----+
| cnt|
+----+
|25.5|
| 5.0|
|11.5|
|44.2|
| 9.0|
+----+
'''

データファイルの方は変わっておらず metadata が追加されている。

$ mc tree --files myminio
myminio
└─ warehouse
   └─ nyc
      └─ taxis
         ├─ data
         │  └─ 00003-4-3459f892-f51e-4824-bebd-0f9e82b5edd4-0-00001.parquet
         └─ metadata
            ├─ 00000-52269a8f-f9bb-4983-8ec4-b110faef7b54.metadata.json
            ├─ 00001-384bf525-2a5b-43cb-bc76-2c70c63880df.metadata.json
            ├─ d7106d39-cc3c-4c2f-8326-8c7390582a38-m0.avro
            └─ snap-6618805095317987444-1-d7106d39-cc3c-4c2f-8326-8c7390582a38.avro
    
  
$ brew install parquet-cli
$ parquet meta nyc/taxis/data/00003-4-3459f892-f51e-4824-bebd-0f9e82b5edd4-0-00001.parquet 
...
fare_amount            DOUBLE    Z _ R     2171187   1.07 B     0       "-415.0" / "395854.44"
...

新しい metadata にはカラム名が変更された schema-id=1 のスキーマが含まれ current-schema-id が 1 になっている。

$ cat nyc/taxis/metadata/00001-384bf525-2a5b-43cb-bc76-2c70c63880df.metadata.json | jq
{
  "format-version": 2,
  "table-uuid": "ac2af1c6-df08-45e7-a3ea-c4d8b111f1ff",
  "location": "s3://warehouse/nyc/taxis",
  "last-sequence-number": 1,
  "last-updated-ms": 1737788467669,
  "last-column-id": 19,
  "current-schema-id": 1,
  "schemas": [
    {
      "type": "struct",
      "schema-id": 0,
      "fields": [
        ...
        {
          "id": 11,
          "name": "fare_amount",
          "required": false,
          "type": "double"
        },
        ...
      ]
    },
    {
      "type": "struct",
      "schema-id": 1,
      "fields": [
        ...
        {
          "id": 11,
          "name": "fare",
          "required": false,
          "type": "double"
        },
        ...
      ]
    }
  ],
  ...
}

データの更新

続いてデータを更新する。

spark.sql("DELETE FROM nyc.taxis WHERE trip_distance > 2.0")

今度は metadata に加えて snapshot が作られ、更新後の全データが入っている新しいファイルが作られている。

$ mc tree --files myminio  
myminio
└─ warehouse
   └─ nyc
      └─ taxis
         ├─ data
         │  ├─ 00000-15-bfd17dbe-1da9-4267-951b-2c89252a36fd-0-00001.parquet
         │  └─ 00003-4-3459f892-f51e-4824-bebd-0f9e82b5edd4-0-00001.parquet
         └─ metadata
            ├─ 00000-52269a8f-f9bb-4983-8ec4-b110faef7b54.metadata.json
            ├─ 00001-384bf525-2a5b-43cb-bc76-2c70c63880df.metadata.json
            ├─ 00002-091f69c9-60c8-4f59-a22d-e6b78acc73eb.metadata.json
            ├─ 6fc7451d-b371-4e3f-ae9b-3e47cd8bb741-m0.avro
            ├─ 6fc7451d-b371-4e3f-ae9b-3e47cd8bb741-m1.avro
            ├─ d7106d39-cc3c-4c2f-8326-8c7390582a38-m0.avro
            ├─ snap-6618805095317987444-1-d7106d39-cc3c-4c2f-8326-8c7390582a38.avro
            └─ snap-8453071282482784430-1-6fc7451d-b371-4e3f-ae9b-3e47cd8bb741.avro

$ parquet meta nyc/taxis/data/00000-15-bfd17dbe-1da9-4267-951b-2c89252a36fd-0-00001.parquet
...
fare                   DOUBLE    Z _ R     1239939   0.80 B     0       "-415.0" / "700.0"
...

この挙動はテーブルの write.(update|delete|merge).mode に依存していて、いずれもデフォルトは現在のデータをコピーして更新する copy-on-write になっている。これを merge-on-read に変更して INSERT してみる。

spark.sql("ALTER TABLE nyc.taxis SET TBLPROPERTIES ('write.update.mode' = 'merge-on-read')")

spark.sql("""
INSERT INTO nyc.taxis VALUES 
(1, 
 CAST('2021-04-01 00:42:37' AS TIMESTAMP), 
 CAST('2021-04-01 00:46:23' AS TIMESTAMP), 
 1.0, 0.9, 1.0, 'N', 75, 236, 2, 
 5.0, 3.0, 0.5, 0.0, 0.0, 0.3, 8.8, 2.5, 0.0)
""")

すると差分データが別ファイルとして書き込まれた。これにより書き込みのパフォーマンスは上がるが、読み込み時はファイルの数が増えるため下がってしまう。

$ mc tree --files myminio                                                                  
myminio
└─ warehouse
   └─ nyc
      └─ taxis
         ├─ data
         │  ├─ 00000-15-bfd17dbe-1da9-4267-951b-2c89252a36fd-0-00001.parquet
         │  ├─ 00000-18-710d719a-8b80-456e-aa0c-d392d0aa8c54-0-00001.parquet
         │  └─ 00003-4-3459f892-f51e-4824-bebd-0f9e82b5edd4-0-00001.parquet
         └─ metadata
            ├─ 00000-52269a8f-f9bb-4983-8ec4-b110faef7b54.metadata.json
            ├─ 00001-384bf525-2a5b-43cb-bc76-2c70c63880df.metadata.json
            ├─ 00002-091f69c9-60c8-4f59-a22d-e6b78acc73eb.metadata.json
            ├─ 00003-81cf943a-c4ea-4fab-8071-be9f741d3f46.metadata.json
            ├─ 00004-dee60357-85bf-47e9-8191-a19fecd3cfd4.metadata.json
            ├─ 00005-b898bb75-0909-402d-a0a9-6cd87b94d1a2.metadata.json
            ├─ 6fc7451d-b371-4e3f-ae9b-3e47cd8bb741-m0.avro
            ├─ 6fc7451d-b371-4e3f-ae9b-3e47cd8bb741-m1.avro
            ├─ 95d686a2-f8f6-4308-982f-0844ad914429-m0.avro
            ├─ d7106d39-cc3c-4c2f-8326-8c7390582a38-m0.avro
            ├─ snap-6550392446076941573-1-95d686a2-f8f6-4308-982f-0844ad914429.avro
            ├─ snap-6618805095317987444-1-d7106d39-cc3c-4c2f-8326-8c7390582a38.avro
            └─ snap-8453071282482784430-1-6fc7451d-b371-4e3f-ae9b-3e47cd8bb741.avro

$ parquet meta nyc/taxis/data/00000-18-710d719a-8b80-456e-aa0c-d392d0aa8c54-0-00001.parquet
...
Row group 0:  count: 1  766.00 B records  start: 4  total(compressed): 766 B total(uncompressed):595 B
...

そのままだと悪化する一方なので rewrite_data_files procedure を呼ぶとファイルをまとめて compact にすることができる。また metadata についても write.metadata.delete-after-commit.enabled を有効にすることで古いものを消すことができる。

spark.sql("""
CALL system.rewrite_data_files(
  table => 'nyc.taxis'
)""")

タイムトラベル

(テーブル名).snapshots から snapshot が作られた日時を確認でき、TIMESTAMP AS OF でタイムトラベルできる。

spark.sql("SELECT * FROM nyc.taxis.snapshots").show()

'''
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-01-25 05:48:...|6618805095317987444|               NULL|   append|s3://warehouse/ny...|{spark.app.id -> ...|
|2025-01-25 07:54:...|8453071282482784430|6618805095317987444|overwrite|s3://warehouse/ny...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
'''

spark.sql("SELECT count(fare_amount) FROM nyc.taxis TIMESTAMP AS OF '2025-01-25 06:00:00' LIMIT 5").show() # 2171187

spark.sql("SELECT count(fare) FROM nyc.taxis TIMESTAMP AS OF '2025-01-25 08:00:00' LIMIT 5").show() # 1239939

参考

near real time で更新される Apache Iceberg の table のメンテナンス

AWS 規範ガイダンス - AWS での Apache Iceberg の使用