Register Iceberg Tables in Glue Data Catalog to query from Athena and Snowflake
awsicebergsparksnowflakeIf you register Iceberg tables in the Glue Data Catalog, you can not only reference them from Athena and EMR etc. like other tables, but you can also create an ICEBERG TABLE in Snowflake without specifying the schema, avoiding duplicate metadata management. It even has a feature for automatic compaction and deleting orphan files that are created when a job fails and not referenced from metadata. With traditional Hive partitions, partition information is registered in the Glue Data Catalog and retrieved via the GetPartitions API, which can cause latency and throttling issues when there are many partitions. Iceberg stores partition information in manifest files on S3, avoiding this problem.
This article I create tables and other resources using Athena for Spark. When you select Iceberg as the table format, settings like catalog-impl are configured. IcebergSparkSessionExtensions is a module to CALL procedures from SQL.
By the way, when I ran it on EMR on EKS, I got ClassNotFoundException: org.apache.iceberg.spark.SparkCatalog even though Iceberg is available with the version, emr-7.0.0, I could confirm that it works with the latest version.
Launch an EKS cluster and register it to EMR on EKS with CDK to run Spark jobs - sambaiz-net
Creating or updating databases and Iceberg tables from Spark, it is reflected in the Glue Data Catalog as well. You need glue and S3 policies.
spark.sql("CREATE DATABASE iceberg_test LOCATION 's3://*****/iceberg_test'")
spark.sql("CREATE TABLE iceberg_test.test_table (id bigint, data string) USING iceberg")
Besides, you can also register Iceberg tables in the Glue Data Catalog in CDK (CloudFormation) by creating an EXTERNAL_TABLE with openTableFormatInput.icebergInput, but there are issues where it is broken as an Iceberg table when you update the schema, and it also doesn’t support partitioning, so it’s not very practical at the moment.
const database = new glue.CfnDatabase(this, "Database", {
catalogId: this.account,
databaseInput: {
name: databaseName,
}
})
new glue.CfnTable(this, "Table", {
databaseName: databaseName,
catalogId: this.account,
tableInput: {
name: "test_table",
storageDescriptor: {
columns: [
{ name: "id", type: "int" },
{ name: "data", type: "string" },
],
location: `s3://*****/iceberg_test`,
},
tableType: 'EXTERNAL_TABLE',
},
openTableFormatInput: {
icebergInput: {
metadataOperation: "CREATE",
version: "2",
}
}
})
metadata_location written and updated as the a table property is referred for optimistic locking, which detects concurrent updates to the table and fails the commits. Previously, DynamoLockManager was used.
By the way, if you set write.spark.accept-any-schema = true in TBLPROPERTIES and write with .option(“mergeSchema”,“true”), Schema Merge will be performed and columns will be added according to the data, and in this case too, it will be reflected in the catalog.
spark.sql("""
CREATE TABLE iceberg_test.test_table
USING ICEBERG
TBLPROPERTIES ('write.spark.accept-any-schema'='true');
""")
df.writeTo("iceberg_test.test_table") \
.option("mergeSchema", "true") \
.append()
By this, you can CREATE TABLE without schema, but the columns haven’t existed yet, so you have to ADD PARTITION FIELD later.
spark.sql("ALTER TABLE iceberg_test.test_table2 ADD PARTITION FIELD aaa")
spark.sql("ALTER TABLE iceberg_test.test_table2 ADD PARTITION FIELD bbb")
spark.sql("DESCRIBE TABLE iceberg_test.test_table2")
When trying to add new columns in the middle, you’ll get an error indicating that the columns are not in order. You can bypass this check by setting spark.sql.iceberg.check-ordering = false. This check is designed to prevent writing data to incorrect positions when the input data and table definition orders differ. However, in the PR that added this setting, it was also modified to take field positions from the input data schema rather than the table, so I think it is no problem to set it to false and actually I did it.
Exception in thread "main" java.lang.IllegalArgumentException: Cannot write incompatible dataset to table with schema:
table {
1: id: optional string
2: age: optional int
3: name: optional string
}
Provided schema:
table {
1: id: optional string
3: name: optional string
2: age: optional int
}
Problems:
* name is out of order, before age
at org.apache.iceberg.types.TypeUtil.checkSchemaCompatibility(TypeUtil.java:489)
...
Inserting the data,
df = spark.createDataFrame([
(1, "aaaa"),
(2, "bbbb"),
(3, "cccc")
], ["id", "data"])
df.writeTo("iceberg_test.test_table").append()
check that we can access the table from Athena.
In Snowflake, create an EXTERNAL VOLUME for S3 and a CATALOG INTEGRATION for Glue, then pass them to the ICEBERG TABLE. Similar to STORAGE INTEGRATION, you need to obtain and configure the access user and External ID for the role.
Copy and Query S3 Data in Snowflake - sambaiz-net
CREATE OR REPLACE EXTERNAL VOLUME ICEBERG_EXTERNAL_VOLUME
STORAGE_LOCATIONS =
(
(
NAME = 'iceberg_test'
STORAGE_PROVIDER = 'S3'
STORAGE_BASE_URL = 's3://*****/iceberg_test/'
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::*****:role/*****'
)
);
DESC EXTERNAL VOLUME ICEBERG_EXTERNAL_VOLUME;
CREATE CATALOG INTEGRATION GLUE_CATALOG_INTEGRATION
CATALOG_SOURCE = GLUE
CATALOG_NAMESPACE = 'iceberg_test'
TABLE_FORMAT = ICEBERG
GLUE_AWS_ROLE_ARN = 'arn:aws:iam::*****:role/*****'
GLUE_CATALOG_ID = '(aws account id)'
GLUE_REGION = 'ap-northeast-1'
ENABLED = TRUE
REFRESH_INTERVAL_SECONDS = 60;
DESC CATALOG INTEGRATION GLUE_CATALOG_INTEGRATION;
CREATE DATABASE TESTDB;
USE TESTDB;
CREATE SCHEMA TEST_SCHEMA;
CREATE ICEBERG TABLE TEST_SCHEMA.TEST_TABLE
EXTERNAL_VOLUME = ICEBERG_EXTERNAL_VOLUME
CATALOG = GLUE_CATALOG_INTEGRATION
CATALOG_TABLE_NAME = 'test_table'
CATALOG_NAMESPACE = 'iceberg_test'
AUTO_REFRESH = TRUE;
Terraform only has a snowflake_external_volume for preview now, and other resources are not yet available.
Now you can access it from Snowflake as well.
If you can’t get the latest data, check whether AUTO_REFRESH is disabled or failed, or whether caching is enabled.
SELECT SYSTEM$AUTO_REFRESH_STATUS(TEST_TABLE);
ALTER SESSION SET USE_CACHED_RESULT=FALSE;
By the way, you can convert tables to reference Snowflake’s catalog instead of external catalogs like Glue. By this, it can be used as a source for DYNAMIC ICEBERG TABLE which updates data through periodic query execution.
ALTER ICEBERG TABLE TEST_TABLE CONVERT TO MANAGED
BASE_LOCATION = 'snowflake/test_table';
CREATE DYNAMIC ICEBERG TABLE TEST_TABLE_ID (id bigint)
TARGET_LAG = '20 minutes'
WAREHOUSE = ***
EXTERNAL_VOLUME = ICEBERG_EXTERNAL_VOLUME
CATALOG = 'SNOWFLAKE'
BASE_LOCATION = 'snowflake/test_table_id'
AS
SELECT id FROM test_table;
After conversion, the table is written to the external volume’s (STORAGE_BASE_URL)/BASE_LOCATION and Snowflake will perform maintenance operations like compaction. This means it becomes a different table thereafter, so to keep up with the Glue table, you need to periodically REPLACE and CONVERT the table.