Spark SQLのJOIN時に余分なパーティションが読まれる例とDynamic Partition Pruning (DPP)

sparkaws

クエリの例

Spark 3.1.1 ベースの Glue 3.0 で、TPC-DS (scale=1000) の sales テーブルと sales のIDでパーティションを切った store_sales テーブルを JOIN する次のクエリを実行すると 10 DPS で 1分4秒 かかった。

GlueのTPC-DS Connectorでデータを生成する - sambaiz-net

spark.sql("""
    SELECT * 
    FROM (SELECT s_store_sk FROM tcpds_parquet.store WHERE s_number_employees > 290) store
    JOIN tcpds_parquet.store_sales_partitioned
    ON (store_sales_partitioned.ss_store_sk = store.s_store_sk)
""").count()

一方、store テーブルのIDを文字列にした次のクエリは 2分3秒 と倍の時間がかかってしまう。 なお、このような文字列と数値の比較を行うと 1 = CAST(1 AS INT) のように自動でキャストされるのでエラーにならない。

spark.sql("""
    SELECT * 
    FROM (SELECT CAST(s_store_sk AS STRING) as s_store_sk FROM tcpds_parquet.store WHERE s_number_employees > 290) store
    JOIN tcpds_parquet.store_sales_partitioned
    ON (store_sales_partitioned.ss_store_sk = store.s_store_sk)
""").count()

不要なデータが読まれているかの確認

History ServerでSQLの plan を確認すると、いずれのクエリでもサイズの小さな sales テーブルは Broadcast されているが、 store_sales テーブルについては前者が必要なパーティションのみを読んでいるのに対して後者は全て読んでしまっている。

SparkのWeb UIでJobのStageとExecutorによるTask分散、SQLのplanを確認する - sambaiz-net

Apache SparkのRDD, DataFrame, DataSetとAction, Transformation - sambaiz-net

また、S3 の Server access logging を有効にして出力されたログに Athena で次のクエリを実行することでも後者は余分なパーティションを参照していることがわかる。

SELECT 
    method, 
    ss_store_sk, 
    s_store_sk IS NOT NULL as output_contains_store, 
    call 
FROM (
    SELECT 
        method, 
        CAST(ss_store_sk AS INT) as ss_store_sk, 
        count(1) as call
    FROM (
        SELECT 
            REGEXP_EXTRACT(text, 'REST\.(\w*)\.OBJECT', 1) as method,
            REGEXP_EXTRACT(text, '/tcpds_parquet/store_sales-partitioned/ss_store_sk%3D(\d*)', 1) as ss_store_sk
        FROM "s3-access-log"
    )
    WHERE ss_store_sk IS NOT NULL
    GROUP BY method, ss_store_sk
) log 
LEFT JOIN (SELECT s_store_sk FROM tcpds_parquet.store WHERE s_number_employees > 290) store
ON (log.ss_store_sk = store.s_store_sk)

Dynamic Partition Pruning (DPP)

前者のクエリの plan を見ると Spark 3.0 で追加された Dynamic Partition Pruning(DPP)が働いていることが確認できる。 これは Broadcast されたデータを見て余分なパーティションをフィルタしてくれる機能。 デフォルトで有効になっており無効にすると全てのパーティションが読まれるようになる。

with SparkSession.builder.config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false").getOrCreate() as spark:
    spark.sql("""
        SELECT * 
        FROM (SELECT s_store_sk FROM tcpds_parquet.store WHERE s_number_employees > 290) store
        JOIN tcpds_parquet.store_sales_partitioned
        ON (store_sales_partitioned.ss_store_sk = store.s_store_sk)
    """).count()

ただし次のように読むべきパーティションの条件が与えられている場合は通常の Partition Pruning が行われる。

with SparkSession.builder.config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false").getOrCreate() as spark:
    spark.sql("""
        SELECT * 
        FROM tcpds_parquet.store_sales_partitioned
        WHERE ss_store_sk > 500
    """).count()

なまじ型が異なっていても動いてしまうため、クエリを見るだけでは DPP が働かないことに気づくのは難しいが、 集計に時間がかかっている場合は plan を見てみると思いのほか高速化に役立つことがある。

参考

Apache Spark 3.0 DPP. Introduction of Apache Spark 3.X… | by Aparup Chatterjee | Medium

Dynamic Partition Pruning in Apache Spark – Databricks