S3 Tables は re:Invent 2024 で発表された Iceberg テーブルに特化したストレージ。併せて発表されたオブジェクトの最終変更日時などのメタデータをクエリできる S3 Metadata も S3 Tables へ書き込む。
Spark で Iceberg テーブルを作成しスキーマや write mode を変更してデータを書き込みメタデータの内容を確認する - sambaiz-net
料金は東京リージョンで 0.0288 USD/GB (~50TB/月) と、S3 標準の USD 0.025/GB と比べて 10% ほど高くなっており、これに加えてオブジェクト数に対してのモニタリングコストも発生するほか、現状ストレージクラスがないため、ライフサイクルで古いデータを安いティアに変更したり、しばらくアクセスされていないオブジェクトのティアと自動で変更する Intelligent-Tiering を適用することができない。なお、ライフサイクル移行リクエストに $0.01/1000オブジェクト、Intelligent-Tiering のモニタリングに $0.0025/1000オブジェクト程度のコストが発生することは考慮する必要がある。
Table bucket を作成する。Athena など分析系のサービスとのインテグレーションを有効にすると Lake Formation 用の Role と s3tablescatalog という名前のカタログが作られ、それらのサービスからクエリできるようになる。
CDK で Glue Data Catalog 上のテーブルに Lake Formation による行やカラムレベルでのアクセス制限をかける - sambaiz-net
S3 Tables にテーブルを作ると自動でカタログに反映される。
EMR Serverless の最新バージョン emr-7.7.0 にもまだ S3Tables の実装が含まれていなかったので spark.jars で渡すことにした。なお、Studio の作成時に現状デフォルトで選ばれる emr-6.15.0 では AWS SDK のNoSuchFieldError が発生した。また S3 Tables の API を呼ぶためにネットワーク設定が必要。
EMR Studio の Jupyter Notebook から EMR Serverless で Spark の MLlib を動かす - sambaiz-net
$ cat build.gradle
plugins {
id 'java'
}
repositories {
mavenCentral()
}
dependencies {
implementation 'software.amazon.awssdk:s3tables:2.29.26'
implementation 'software.amazon.s3tables:s3-tables-catalog-for-iceberg:0.1.5'
}
task copyDependencies(type: Copy) {
from configurations.runtimeClasspath
into "$buildDir/dependencyJars"
}
$ gradle copyDependencies
$ aws s3 cp --recursive build/dependencyJars s3://(general_purpose_bucket)/jars/
catalog-impl に S3TablesCatalog を、warehouse に Table bucket を指定する。
%%configure -f
{
"conf": {
"spark.jars": "s3a://(general_purpose_bucket)/jars/*",
"spark.sql.defaultCatalog": "s3tables_test",
"spark.sql.catalog.s3tables_test": "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.s3tables_test.catalog-impl": "software.amazon.s3tables.iceberg.S3TablesCatalog",
"spark.sql.catalog.s3tables_test.warehouse": "arn:aws:s3tables:ap-northeast-1:(account_id):bucket/(table_bucket)"
}
}
Namespace と Table を作成してデータを入れる。
spark.sql("CREATE NAMESPACE IF NOT EXISTS test_namespace")
spark.sql("CREATE TABLE IF NOT EXISTS test_namespace.test_table (id bigint, data string) USING iceberg")
df = spark.createDataFrame([
(1, "aaaa"),
(2, "bbbb"),
(3, "cccc")
], ["id", "data"])
df.writeTo("test_namespace.test_table").append()
Lake Formation でユーザーや Role にテーブルにアクセスする権限を与える。Administrator であっても権限がないと Insufficient permissions to execute the query エラーになる。
カタログを選択して Athena からクエリする。