AWS GlueのJobのBookmarkを有効にして前回の続きから処理を行う

awspythonsparketl

GlueのJobのBookmarkは どこまで処理したかを記録し、次回はその続きから実行できるようにする機能。 1.0以前は対応していなかったParquetやORCも今は対応している。

AWS GlueでCSVを加工しParquetに変換してパーティションを切りAthenaで参照する - sambaiz-net

Job bookamarkをEnableにして、DynamicFrameのメソッドを呼ぶ際に transcation_ctx を渡し、job.commit() するとBookmarkされる。

例えば、S3のjsonをソースとするテーブルをカウントして出力する次のjobは、Bookmarkを有効にすると既にカウントしたものが読まれなくなるため、 トータルの件数ではなく前回との増分が出力される。 S3の結果整合性による、参照できるようになるまでのラグも考慮されていて、 単純に保存時間のみによって対象を選ぶのではなく、リストを持って対象の時間の少し前から見るようになっている。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
source = glueContext.create_dynamic_frame.from_catalog(
    database="test", table_name="test", transformation_ctx="source")
source.toDF().createOrReplaceTempView("t")
df = spark.sql("SELECT COUNT(1) as cnt FROM t")
df.write.mode("append").csv(
    "s3://*****/test-table-summary", compression="gzip")
job.commit()

前回との差分がなくDynamicFrameの中身が空の場合、DataFrameのスキーマも空となってSQLでフィールドを参照すると cannnot resolve *** given input columns になる問題があって、これを回避するために if df.count() == 0 でチェックしたりする必要がある。

日付によるパーティションを切ってクエリで日付を指定しても、 Bookmarkの判定では絞られないので次のようなログが大量に出て全てのパーティションが探索される。

INFO [Thread-5] hadoop.PartitionFilesListerUsingBookmark (FileSystemBookmark.scala:apply(356)): After initial job bookmarks filter, processing 0.00% of 592 files in partition DynamicFramePartition(com.amazonaws.services.glue.DynamicRecord@218398ba,s3://hogefuga/log/2021/09/02/11/,1630504916000).

これによりタイムライン上ではExecutorが起動してからTaskの開始までに時間がかかっているように見える。

SparkのWeb UIでJobのStageとExecutorによるTask分散、SQLのplanを確認する - sambaiz-net

from_catalog() に次のパラメータを渡すと対象のパーティションを絞ることができるadditional_options は getPartition() する際に絞られるのでパーティションの数が膨大な場合に効率が良いが、テーブルにパーティションインデックスが貼られている必要があって、パーティションのキーをdateにしていたりすると貼ることができない。 pushDownPredicate の方は getPartition() 後に絞られる。これらは併用することもできる。

pushDownPredicate="day>=10 and customer_id like '10%'",
additional_options={"catalogPartitionPredicate":"year='2021' and month='06'"}