Walk through Iceberg metadata contents by creating tables, modifying schema and write mode, and writing data in Spark


Apache Iceberg is an open table format that enables efficient processing of large amounts of data and can be used by various engines such as Spark, Trino, and Snowflake. In addition to ACID transactions and Time Travel, it supports Schema Evolution, which lets you change the table schema without rewriting the existing data. Hidden Partitioning does not rely on a physical directory layout such as Hive’s day=xxxx/. Instead, you can PARTITION BY values derived from a column, such as day(event_ts) or month(event_ts), without adding them to the table as columns, so a query such as “WHERE event_ts BETWEEN '2025-01-01' AND '2025-03-01'” benefits from partition pruning without its author being aware of the partition granularity. Partition Evolution then lets you change that granularity later without breaking existing queries.
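As a rough illustration of hidden partitioning, the sketch below mimics the day and month transforms in plain Python. Each maps a timestamp to an integer counted from the Unix epoch, so a range filter on event_ts can be turned into a range of partition values. The function names are hypothetical and this is not Iceberg's implementation, only the idea behind it.

```python
from datetime import datetime

EPOCH = datetime(1970, 1, 1)


def day_transform(ts: datetime) -> int:
    """Whole days between the Unix epoch and ts, in the spirit of day(event_ts)."""
    return (ts - EPOCH).days


def month_transform(ts: datetime) -> int:
    """Whole months between 1970-01 and ts, in the spirit of month(event_ts)."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


def partition_range(transform, lower: datetime, upper: datetime) -> range:
    """Partition values that can hold rows with lower <= event_ts <= upper."""
    return range(transform(lower), transform(upper) + 1)


lower, upper = datetime(2025, 1, 1), datetime(2025, 3, 1)
print(list(partition_range(month_transform, lower, upper)))  # [660, 661, 662]
print(len(partition_range(day_transform, lower, upper)))     # 60
```

The same predicate works whether the table is partitioned by day or by month, which is why the query text does not need to change when the granularity does.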

Creating tables

The docker-compose.yml from the Quickstart makes it easy to try Iceberg: it launches a Jupyter Notebook in which Iceberg is configured as Spark’s default catalog, along with MinIO, an S3-compatible object store.

spark.sparkContext.getConf().getAll()

'''
[('spark.sql.catalog.demo.s3.endpoint', 'http://minio:9000'),
 ('spark.sql.catalog.demo.warehouse', 's3://warehouse/wh/'),
 ('spark.sql.catalog.demo.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO'),
 ('spark.sql.catalog.demo.uri', 'http://rest:8181'),
 ('spark.sql.catalog.demo.type', 'rest'),
 ('spark.sql.catalog.demo', 'org.apache.iceberg.spark.SparkCatalog'),
 ('spark.sql.defaultCatalog', 'demo'),
 ...]
'''

Database and table operations can be performed using SQL.

spark.sql("CREATE DATABASE IF NOT EXISTS nyc")
spark.sql("DROP TABLE IF EXISTS nyc.taxis")
df = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2021-04.parquet")
df.write.saveAsTable("nyc.taxis")
spark.sql("DESCRIBE nyc.taxis").show()

'''
+--------------------+-------------+-------+
|            col_name|    data_type|comment|
+--------------------+-------------+-------+
|            VendorID|       bigint|   NULL|
|tpep_pickup_datetime|timestamp_ntz|   NULL|
|tpep_dropoff_date...|timestamp_ntz|   NULL|
|     passenger_count|       double|   NULL|
|       trip_distance|       double|   NULL|
|          RatecodeID|       double|   NULL|
|  store_and_fwd_flag|       string|   NULL|
|        PULocationID|       bigint|   NULL|
|        DOLocationID|       bigint|   NULL|
|        payment_type|       bigint|   NULL|
|         fare_amount|       double|   NULL|
|               extra|       double|   NULL|
|             mta_tax|       double|   NULL|
|          tip_amount|       double|   NULL|
|        tolls_amount|       double|   NULL|
|improvement_surch...|       double|   NULL|
|        total_amount|       double|   NULL|
|congestion_surcharge|       double|   NULL|
|         airport_fee|       double|   NULL|
+--------------------+-------------+-------+
'''

spark.sql("SELECT COUNT(*) as cnt FROM nyc.taxis").show()

'''
+-------+
|    cnt|
+-------+
|2171187|
+-------+
'''

Metadata layer file contents

Looking at MinIO, we can confirm it has the following directory structure. The access_key and secret for the MinIO Client can be obtained by logging into the WebUI with admin/password.

$ brew install minio/stable/mc
$ mc alias set myminio http://localhost:9000 (access_key) (secret)
Added `myminio` successfully.

$ mc tree --files myminio                                                                                  
myminio
└─ warehouse
   └─ nyc
      └─ taxis
         ├─ data
         │  └─ 00003-4-3459f892-f51e-4824-bebd-0f9e82b5edd4-0-00001.parquet
         └─ metadata
            ├─ 00000-52269a8f-f9bb-4983-8ec4-b110faef7b54.metadata.json
            ├─ d7106d39-cc3c-4c2f-8326-8c7390582a38-m0.avro
            └─ snap-6618805095317987444-1-d7106d39-cc3c-4c2f-8326-8c7390582a38.avro
 
$ mc cp -r myminio/warehouse/nyc .

Let’s walk through the contents of the metadata layer files, following the Overview diagram.

The metadata file contains the table schemas and, for each snapshot, the path to its manifest list. Its properties also show write.parquet.compression-codec set to zstd; the default was changed from gzip to zstd in Iceberg 1.4.0. zstd (Zstandard) is a compression algorithm developed at Facebook (now Meta) and published as RFC 8878. It generally compresses and decompresses faster than gzip at a comparable compression ratio.

$ cat nyc/taxis/metadata/00000-52269a8f-f9bb-4983-8ec4-b110faef7b54.metadata.json | jq
{
  "format-version": 2,
  "table-uuid": "ac2af1c6-df08-45e7-a3ea-c4d8b111f1ff",
  "location": "s3://warehouse/nyc/taxis",
  "last-sequence-number": 1,
  "last-updated-ms": 1737784106099,
  "last-column-id": 19,
  "current-schema-id": 0,
  "schemas": [
    {
      "type": "struct",
      "schema-id": 0,
      "fields": [
        {
          "id": 1,
          "name": "VendorID",
          "required": false,
          "type": "long"
        },
        {
          "id": 2,
          "name": "tpep_pickup_datetime",
          "required": false,
          "type": "timestamp"
        },
        ...
      ]
    }
  ],
  "default-spec-id": 0,
  "partition-specs": [
    {
      "spec-id": 0,
      "fields": []
    }
  ],
  "last-partition-id": 999,
  "default-sort-order-id": 0,
  "sort-orders": [
    {
      "order-id": 0,
      "fields": []
    }
  ],
  "properties": {
    "owner": "root",
    "created-at": "2025-01-25T05:48:19.449822093Z",
    "write.format.default": "parquet",
    "write.parquet.compression-codec": "zstd"
  },
  "current-snapshot-id": 6618805095317987444,
  "refs": {
    "main": {
      "snapshot-id": 6618805095317987444,
      "type": "branch"
    }
  },
  "snapshots": [
    {
      "sequence-number": 1,
      "snapshot-id": 6618805095317987444,
      "timestamp-ms": 1737784106099,
      "summary": {
        "operation": "append",
        "spark.app.id": "local-1737784086110",
        "added-data-files": "1",
        "added-records": "2171187",
        "added-files-size": "33920189",
        "changed-partition-count": "1",
        "total-records": "2171187",
        "total-files-size": "33920189",
        "total-data-files": "1",
        "total-delete-files": "0",
        "total-position-deletes": "0",
        "total-equality-deletes": "0"
      },
      "manifest-list": "s3://warehouse/nyc/taxis/metadata/snap-6618805095317987444-1-d7106d39-cc3c-4c2f-8326-8c7390582a38.avro",
      "schema-id": 0
    }
  ],
  "statistics": [],
  "partition-statistics": [],
  "snapshot-log": [
    {
      "timestamp-ms": 1737784106099,
      "snapshot-id": 6618805095317987444
    }
  ],
  "metadata-log": []
}
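To make the relationship between these keys concrete, here is a small sketch that follows the pointers in a metadata file: current-schema-id selects a schema, and current-snapshot-id selects the snapshot whose manifest-list is read next. The JSON is a trimmed copy of the file above, and the helper names are hypothetical.

```python
import json

# Trimmed copy of the metadata file shown above; only the keys used here are kept.
METADATA_JSON = """
{
  "current-schema-id": 0,
  "schemas": [
    {"schema-id": 0, "fields": [
      {"id": 1, "name": "VendorID", "required": false, "type": "long"},
      {"id": 2, "name": "tpep_pickup_datetime", "required": false, "type": "timestamp"}
    ]}
  ],
  "current-snapshot-id": 6618805095317987444,
  "snapshots": [
    {"snapshot-id": 6618805095317987444,
     "summary": {"operation": "append", "added-records": "2171187"},
     "manifest-list": "s3://warehouse/nyc/taxis/metadata/snap-6618805095317987444-1-d7106d39-cc3c-4c2f-8326-8c7390582a38.avro"}
  ]
}
"""


def current_schema(meta: dict) -> dict:
    """Return the schema whose schema-id matches current-schema-id."""
    return next(s for s in meta["schemas"] if s["schema-id"] == meta["current-schema-id"])


def current_snapshot(meta: dict) -> dict:
    """Return the snapshot whose snapshot-id matches current-snapshot-id."""
    return next(s for s in meta["snapshots"] if s["snapshot-id"] == meta["current-snapshot-id"])


meta = json.loads(METADATA_JSON)
schema = current_schema(meta)
snapshot = current_snapshot(meta)

print([field["name"] for field in schema["fields"]])
print(snapshot["summary"]["operation"], snapshot["summary"]["added-records"])
print(snapshot["manifest-list"])  # the next file a reader would open
```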

The manifest list contains the paths of the manifest files that make up the snapshot,

$ fastavro -p nyc/taxis/metadata/snap-6618805095317987444-1-d7106d39-cc3c-4c2f-8326-8c7390582a38.avro
{
    "manifest_path": "s3://warehouse/nyc/taxis/metadata/d7106d39-cc3c-4c2f-8326-8c7390582a38-m0.avro",
    "manifest_length": 8185,
    "partition_spec_id": 0,
    "content": 0,
    "sequence_number": 1,
    "min_sequence_number": 1,
    "added_snapshot_id": 6618805095317987444,
    "added_files_count": 1,
    "existing_files_count": 0,
    "deleted_files_count": 0,
    "added_rows_count": 2171187,
    "existing_rows_count": 0,
    "deleted_rows_count": 0,
    "partitions": []
}

and each manifest file contains information such as data file paths and per-column value ranges (lower_bounds and upper_bounds). Setting a sort order with ALTER TABLE … WRITE ORDERED BY makes writers sort rows before writing, which narrows the value range of each file and improves pruning efficiency. The target size of data files can be adjusted with write.target-file-size-bytes, which defaults to 512 MB. If the resulting files are too small or too large for the table, changing this value may improve performance.

$ fastavro -p nyc/taxis/metadata/d7106d39-cc3c-4c2f-8326-8c7390582a38-m0.avro
{
    "status": 1,
    "snapshot_id": 6618805095317987444,
    "sequence_number": null,
    "file_sequence_number": null,
    "data_file": {
        "content": 0,
        "file_path": "s3://warehouse/nyc/taxis/data/00003-4-3459f892-f51e-4824-bebd-0f9e82b5edd4-0-00001.parquet",
        "file_format": "PARQUET",
        "partition": {},
        "record_count": 2171187,
        "file_size_in_bytes": 33920189,
        "column_sizes": [
            {
                "key": 1,
                "value": 275145
            },
            {
                "key": 2,
                "value": 8306768
            },
            ...
        ],
        "value_counts": [
            {
                "key": 1,
                "value": 2171187
            },
            {
                "key": 2,
                "value": 2171187
            },
            ...
        ],
        "null_value_counts": [
            {
                "key": 1,
                "value": 0
            },
            {
                "key": 2,
                "value": 0
            },
            ...
        ],
        "nan_value_counts": [
            {
                "key": 4,
                "value": 0
            },
            {
                "key": 5,
                "value": 0
            },
            ...
        ],
        "lower_bounds": [
            {
                "key": 1,
                "value": "\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
            },
            {
                "key": 2,
                "value": "\u00c0\u00ed[Rc_\u0004\u0000"
            },
            ...
        ],
        "upper_bounds": [
            {
                "key": 1,
                "value": "\u0006\u0000\u0000\u0000\u0000\u0000\u0000\u0000"
            },
            {
                "key": 2,
                "value": "\u00c0\u00b4\u000e\u001es\u00c7\u0005\u0000"
            },
            ...
        ],
        "key_metadata": null,
        "split_offsets": [
            4
        ],
        "equality_ids": null,
        "sort_order_id": 0
    }
}
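The lower_bounds and upper_bounds values above are raw bytes. The sketch below decodes them, assuming that the printed JSON renders each byte as one character (Latin-1) and relying on Iceberg's binary encoding of bounds: a long is an 8-byte little-endian integer, and a timestamp is a long holding microseconds since the epoch. The helper names are hypothetical, and may_contain only illustrates the idea of min/max pruning.

```python
import struct
from datetime import datetime, timedelta


def to_bytes(printed: str) -> bytes:
    """Undo the JSON rendering; assumes one character per byte (Latin-1)."""
    return printed.encode("latin-1")


def decode_long(raw: bytes) -> int:
    """Long bounds are stored as 8-byte little-endian integers."""
    return struct.unpack("<q", raw)[0]


def decode_timestamp(raw: bytes) -> datetime:
    """Timestamp bounds are microseconds since the epoch, stored as a long."""
    return datetime(1970, 1, 1) + timedelta(microseconds=decode_long(raw))


def may_contain(value, lower, upper) -> bool:
    """A file can be skipped when the value lies outside [lower, upper]."""
    return lower <= value <= upper


# Field id 1 (VendorID) and field id 2 (tpep_pickup_datetime) from the manifest above.
vendor_min = decode_long(to_bytes("\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000"))
vendor_max = decode_long(to_bytes("\u0006\u0000\u0000\u0000\u0000\u0000\u0000\u0000"))
pickup_min = decode_timestamp(to_bytes("\u00c0\u00ed[Rc_\u0004\u0000"))
pickup_max = decode_timestamp(to_bytes("\u00c0\u00b4\u000e\u001es\u00c7\u0005\u0000"))

print(vendor_min, vendor_max)   # 1 6
print(pickup_min)               # 2009-01-01 03:21:35
print(pickup_max)               # 2021-07-19 05:22:03
print(may_contain(7, vendor_min, vendor_max))  # False: a filter VendorID = 7 can skip this file
```

The wide timestamp range in a file that nominally holds April 2021 data shows why outliers and unsorted writes weaken pruning.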

Updating schema

Let’s check that we can still query after changing the column name.

spark.sql("ALTER TABLE nyc.taxis RENAME COLUMN fare_amount TO fare")

spark.sql("SELECT fare as cnt FROM nyc.taxis LIMIT 5").show()

'''
+----+
| cnt|
+----+
|25.5|
| 5.0|
|11.5|
|44.2|
| 9.0|
+----+
'''

The data file remains unchanged and still stores the column under its old name, fare_amount; only a new metadata file has been added.

$ mc tree --files myminio
myminio
└─ warehouse
   └─ nyc
      └─ taxis
         ├─ data
         │  └─ 00003-4-3459f892-f51e-4824-bebd-0f9e82b5edd4-0-00001.parquet
         └─ metadata
            ├─ 00000-52269a8f-f9bb-4983-8ec4-b110faef7b54.metadata.json
            ├─ 00001-384bf525-2a5b-43cb-bc76-2c70c63880df.metadata.json
            ├─ d7106d39-cc3c-4c2f-8326-8c7390582a38-m0.avro
            └─ snap-6618805095317987444-1-d7106d39-cc3c-4c2f-8326-8c7390582a38.avro
    
  
$ brew install parquet-cli
$ parquet meta nyc/taxis/data/00003-4-3459f892-f51e-4824-bebd-0f9e82b5edd4-0-00001.parquet 
...
fare_amount            DOUBLE    Z _ R     2171187   1.07 B     0       "-415.0" / "395854.44"
...

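The new metadata file includes a schema with schema-id 1 that contains the renamed column, and current-schema-id has been updated to 1. Both schemas keep field id 11 for the column, so the new name still maps to the same data.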

$ cat nyc/taxis/metadata/00001-384bf525-2a5b-43cb-bc76-2c70c63880df.metadata.json | jq
{
  "format-version": 2,
  "table-uuid": "ac2af1c6-df08-45e7-a3ea-c4d8b111f1ff",
  "location": "s3://warehouse/nyc/taxis",
  "last-sequence-number": 1,
  "last-updated-ms": 1737788467669,
  "last-column-id": 19,
  "current-schema-id": 1,
  "schemas": [
    {
      "type": "struct",
      "schema-id": 0,
      "fields": [
        ...
        {
          "id": 11,
          "name": "fare_amount",
          "required": false,
          "type": "double"
        },
        ...
      ]
    },
    {
      "type": "struct",
      "schema-id": 1,
      "fields": [
        ...
        {
          "id": 11,
          "name": "fare",
          "required": false,
          "type": "double"
        },
        ...
      ]
    }
  ],
  ...
}
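The following sketch illustrates why the rename needs no data rewrite: a reader resolves a column name to its field id through the current schema and then looks the id up in the data file, whatever name the file uses. The dictionaries and the sample row are simplified, made-up stand-ins for the schema and the Parquet file.

```python
# Names stored in the data file, keyed by Iceberg field id (written under schema-id 0).
file_columns = {1: "VendorID", 11: "fare_amount"}

# Current table schema (schema-id 1): column name -> field id.
table_schema = {"VendorID": 1, "fare": 11}

# An illustrative row as it is physically stored in the file.
row_in_file = {"VendorID": 1, "fare_amount": 25.5}


def read_column(name: str, row: dict) -> object:
    """Resolve a table column to the file column through its field id."""
    field_id = table_schema[name]
    return row[file_columns[field_id]]


print(read_column("fare", row_in_file))  # 25.5
```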

Writing data

Next, let’s change the data by deleting some rows.

spark.sql("DELETE FROM nyc.taxis WHERE trip_distance > 2.0")

This time, in addition to a new metadata file, a new snapshot (manifest list and manifests) has been created, and a new data file containing all the rows that remain after the delete has been generated.

$ mc tree --files myminio  
myminio
└─ warehouse
   └─ nyc
      └─ taxis
         ├─ data
         │  ├─ 00000-15-bfd17dbe-1da9-4267-951b-2c89252a36fd-0-00001.parquet
         │  └─ 00003-4-3459f892-f51e-4824-bebd-0f9e82b5edd4-0-00001.parquet
         └─ metadata
            ├─ 00000-52269a8f-f9bb-4983-8ec4-b110faef7b54.metadata.json
            ├─ 00001-384bf525-2a5b-43cb-bc76-2c70c63880df.metadata.json
            ├─ 00002-091f69c9-60c8-4f59-a22d-e6b78acc73eb.metadata.json
            ├─ 6fc7451d-b371-4e3f-ae9b-3e47cd8bb741-m0.avro
            ├─ 6fc7451d-b371-4e3f-ae9b-3e47cd8bb741-m1.avro
            ├─ d7106d39-cc3c-4c2f-8326-8c7390582a38-m0.avro
            ├─ snap-6618805095317987444-1-d7106d39-cc3c-4c2f-8326-8c7390582a38.avro
            └─ snap-8453071282482784430-1-6fc7451d-b371-4e3f-ae9b-3e47cd8bb741.avro

$ parquet meta nyc/taxis/data/00000-15-bfd17dbe-1da9-4267-951b-2c89252a36fd-0-00001.parquet
...
fare                   DOUBLE    Z _ R     1239939   0.80 B     0       "-415.0" / "700.0"
...

This behavior depends on the table’s write.(update|delete|merge).mode properties. The default for all three is copy-on-write, which rewrites every affected data file with the changes applied. Change write.update.mode to merge-on-read and try an INSERT.

spark.sql("ALTER TABLE nyc.taxis SET TBLPROPERTIES ('write.update.mode' = 'merge-on-read')")

spark.sql("""
INSERT INTO nyc.taxis VALUES 
(1, 
 CAST('2021-04-01 00:42:37' AS TIMESTAMP), 
 CAST('2021-04-01 00:46:23' AS TIMESTAMP), 
 1.0, 0.9, 1.0, 'N', 75, 236, 2, 
 5.0, 3.0, 0.5, 0.0, 0.0, 0.3, 8.8, 2.5, 0.0)
""")

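The inserted row is written as a separate small data file instead of being merged into the existing one. Writing only the differences keeps writes cheap, but reads slow down as the number of files grows. Note that an INSERT appends a new data file in either mode; with merge-on-read, row-level changes such as UPDATE are additionally recorded in delete files rather than by rewriting data files.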

$ mc tree --files myminio                                                                  
myminio
└─ warehouse
   └─ nyc
      └─ taxis
         ├─ data
         │  ├─ 00000-15-bfd17dbe-1da9-4267-951b-2c89252a36fd-0-00001.parquet
         │  ├─ 00000-18-710d719a-8b80-456e-aa0c-d392d0aa8c54-0-00001.parquet
         │  └─ 00003-4-3459f892-f51e-4824-bebd-0f9e82b5edd4-0-00001.parquet
         └─ metadata
            ├─ 00000-52269a8f-f9bb-4983-8ec4-b110faef7b54.metadata.json
            ├─ 00001-384bf525-2a5b-43cb-bc76-2c70c63880df.metadata.json
            ├─ 00002-091f69c9-60c8-4f59-a22d-e6b78acc73eb.metadata.json
            ├─ 00003-81cf943a-c4ea-4fab-8071-be9f741d3f46.metadata.json
            ├─ 00004-dee60357-85bf-47e9-8191-a19fecd3cfd4.metadata.json
            ├─ 00005-b898bb75-0909-402d-a0a9-6cd87b94d1a2.metadata.json
            ├─ 6fc7451d-b371-4e3f-ae9b-3e47cd8bb741-m0.avro
            ├─ 6fc7451d-b371-4e3f-ae9b-3e47cd8bb741-m1.avro
            ├─ 95d686a2-f8f6-4308-982f-0844ad914429-m0.avro
            ├─ d7106d39-cc3c-4c2f-8326-8c7390582a38-m0.avro
            ├─ snap-6550392446076941573-1-95d686a2-f8f6-4308-982f-0844ad914429.avro
            ├─ snap-6618805095317987444-1-d7106d39-cc3c-4c2f-8326-8c7390582a38.avro
            └─ snap-8453071282482784430-1-6fc7451d-b371-4e3f-ae9b-3e47cd8bb741.avro

$ parquet meta nyc/taxis/data/00000-18-710d719a-8b80-456e-aa0c-d392d0aa8c54-0-00001.parquet
...
Row group 0:  count: 1  766.00 B records  start: 4  total(compressed): 766 B total(uncompressed):595 B
...
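The difference between copy-on-write and merge-on-read can be sketched with plain lists standing in for data files. This is a toy model with hypothetical function names, not how Iceberg is implemented: copy-on-write produces a rewritten file, while merge-on-read keeps the file and records deleted row positions that are applied when reading.

```python
def copy_on_write_delete(data_file: list, predicate) -> list:
    """Rewrite the whole file without the deleted rows."""
    return [row for row in data_file if not predicate(row)]


def merge_on_read_delete(data_file: list, predicate) -> set:
    """Leave the data file alone and record the positions of deleted rows."""
    return {pos for pos, row in enumerate(data_file) if predicate(row)}


def read_with_deletes(data_file: list, deleted_positions: set) -> list:
    """Apply the recorded deletes while reading."""
    return [row for pos, row in enumerate(data_file) if pos not in deleted_positions]


# Illustrative trip_distance values standing in for rows.
trips = [0.9, 3.4, 1.2, 7.8]
is_long_trip = lambda distance: distance > 2.0

rewritten = copy_on_write_delete(trips, is_long_trip)   # more work at write time
deletes = merge_on_read_delete(trips, is_long_trip)     # small write, extra work per read

print(rewritten)                          # [0.9, 1.2]
print(sorted(deletes))                    # [1, 3]
print(read_with_deletes(trips, deletes))  # [0.9, 1.2]
```

Both paths return the same rows; they differ in whether the cost is paid once when writing or on every read.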

Since read performance keeps degrading as small files accumulate, you can compact them by calling the rewrite_data_files procedure. For metadata files, you can enable write.metadata.delete-after-commit.enabled so that old metadata files are removed automatically.

spark.sql("""
CALL system.rewrite_data_files(
  table => 'nyc.taxis'
)""")
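Compaction of this kind is, at its core, a bin-packing problem: small files are grouped so that each group can be rewritten into one file close to the target size. The sketch below is a simple greedy grouping with made-up file sizes in MB; it is only meant to convey the idea and does not reproduce the planning logic of rewrite_data_files.

```python
def plan_bins(file_sizes: list, target_size: int) -> list:
    """Group files in order, starting a new group when the target would be exceeded."""
    bins, current, current_size = [], [], 0
    for size in file_sizes:
        if current and current_size + size > target_size:
            bins.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        bins.append(current)
    return bins


# Hypothetical file sizes in MB, packed toward a 512 MB target.
groups = plan_bins([300, 200, 100, 400, 50], target_size=512)
print(groups)  # [[300, 200], [100, 400], [50]]
```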

Time travel

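You can check when each snapshot was created by querying the (table name).snapshots metadata table, and you can time travel to a past state with TIMESTAMP AS OF. Note that the query against the older snapshot refers to the column by its old name, fare_amount.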

spark.sql("SELECT * FROM nyc.taxis.snapshots").show()

'''
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2025-01-25 05:48:...|6618805095317987444|               NULL|   append|s3://warehouse/ny...|{spark.app.id -> ...|
|2025-01-25 07:54:...|8453071282482784430|6618805095317987444|overwrite|s3://warehouse/ny...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
'''

spark.sql("SELECT count(fare_amount) FROM nyc.taxis TIMESTAMP AS OF '2025-01-25 06:00:00' LIMIT 5").show() # 2171187

spark.sql("SELECT count(fare) FROM nyc.taxis TIMESTAMP AS OF '2025-01-25 08:00:00' LIMIT 5").show() # 1239939
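Conceptually, TIMESTAMP AS OF selects the latest snapshot committed at or before the given time. The sketch below shows that lookup over a snapshot log like the one in the metadata file. The first entry is taken from the metadata above; the timestamp of the second snapshot is truncated in the output, so it is rounded down to the minute here as a placeholder. Timestamps are assumed to be UTC, and the helper names are hypothetical.

```python
from datetime import datetime, timezone


def to_ms(dt: datetime) -> int:
    """Milliseconds since the epoch for a naive datetime interpreted as UTC."""
    return int(dt.replace(tzinfo=timezone.utc).timestamp() * 1000)


snapshot_log = [
    {"timestamp-ms": 1737784106099, "snapshot-id": 6618805095317987444},
    # Placeholder time: the seconds are truncated in the snapshots output above.
    {"timestamp-ms": to_ms(datetime(2025, 1, 25, 7, 54)), "snapshot-id": 8453071282482784430},
]


def snapshot_as_of(log: list, as_of: datetime) -> int:
    """Return the id of the latest snapshot committed at or before as_of."""
    as_of_ms = to_ms(as_of)
    candidates = [entry for entry in log if entry["timestamp-ms"] <= as_of_ms]
    if not candidates:
        raise ValueError("no snapshot exists at or before the requested time")
    return max(candidates, key=lambda entry: entry["timestamp-ms"])["snapshot-id"]


print(snapshot_as_of(snapshot_log, datetime(2025, 1, 25, 6, 0)))  # 6618805095317987444
print(snapshot_as_of(snapshot_log, datetime(2025, 1, 25, 8, 0)))  # 8453071282482784430
```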
