Enable Job Bookmark of AWS Glue to process from the records following ones executed previously

awspythonspark

Job Bookmark of AWS Glue is a feature that saves what records are processed, and prevent it from being executed next time. Parquet and ORC, which were not supported before 1.0, are now supported.

AWS GlueでCSVを加工しParquetに変換してパーティションを切りAthenaで参照する - sambaiz-net

Bookmark is available if Job Bookmark is enabled, call DynamicFrame methods with a transaction_ctx, and call job.commit().

For example, following job that counts a table and outputs it doesn’t count records previously counted if Bookmark is available, so it outputs not total but difference count from the previous time. The lag before it can be referenced due to S3 eventual consistency is also taken into account. Instead of simply selecting the target based on the last execution time alone, Glue looks at data from a little before the last execution time.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
source = glueContext.create_dynamic_frame.from_catalog(
    database="test", table_name="test", transformation_ctx="source")
source.toDF().createOrReplaceTempView("t")
df = spark.sql("SELECT COUNT(1) as cnt FROM t")
df.write.mode("append").csv(
    "s3://*****/test-table-summary", compression="gzip")
job.commit()

If there is no differences from the previous time so that the contents of DynamicFrame are empty, the schema of DataFrame is also empty. Therefore if the field is referenced in SQL, “cannnot resolve *** given input columns” error is occured. To resolve this problem, a check like “if df.count() == 0” is needed.

Even if the data is partitioned by date and specified the date in the query, since it cannot be narrowed down in Bookmark, the following logs are output a lot and all partitions are searched.

INFO [Thread-5] hadoop.PartitionFilesListerUsingBookmark (FileSystemBookmark.scala:apply(356)): After initial job bookmarks filter, processing 0.00% of 592 files in partition DynamicFramePartition([email protected],s3://hogefuga/log/2021/09/02/11/,1630504916000).

As a result, it takes time from adding Executors to starting the Task on the timeline.

Spark Web UI: Monitor Job Stages, Tasks distribution and SQL plan - sambaiz-net

Passing the following parameter to from_catalog() will allow you to narrow down the target partition. additional_options is narrowed down when getPartition(), so it is efficient when the number of partitions is huge but the partition index is needed on the table, and it cannot be indexed if the partition key type is date. pushDownPredicate is narrowed down after getPartition(). These can also be used together.

pushDownPredicate="day>=10 and customer_id like '10%'",
additional_options={"catalogPartitionPredicate":"year='2021' and month='06'"}