AWS GlueのJobのBookmarkを有効にして前回の続きから処理を行う

awspythonspark

GlueのJobのBookmarkは どこまで処理したかを記録し、次回はその続きから実行できるようにする機能。 1.0以前は対応していなかったParquetやORCも今は対応している。

AWS GlueでCSVを加工しParquetに変換してパーティションを切りAthenaで参照する - sambaiz-net

Job bookamarkをEnableにして、DynamicFrameのメソッドを呼ぶ際にtranscation_ctxを渡し、job.commit()するとBookmarkされる。

Bookmarkの設定

例えば、S3のjsonをソースとするテーブルをカウントして出力する次のjobは、Bookmarkを有効にすると既にカウントしたものが読まれなくなるため、 トータルの件数ではなく前回との増分が出力される。 S3の結果整合性による、参照できるようになるまでのラグも考慮されていて、 単純に保存時間のみによって対象を選ぶのではなく、リストを持って対象の時間の少し前から見るようになっている。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
source = glueContext.create_dynamic_frame.from_catalog(
    database="test", table_name="test", transformation_ctx="source")
source.toDF().createOrReplaceTempView("t")
df = spark.sql("SELECT COUNT(1) as cnt FROM t")
df.write.mode("append").csv(
    "s3://*****/test-table-summary", compression="gzip")
job.commit()

ただ、前回との差分がなくDynamicFrameの中身が空の場合、DataFrameのスキーマも空となってSQLでフィールドを参照するとcannnot resolve *** given input columnsになる問題がある。 これを回避するために if df.count() == 0 でチェックしたりしている。