クエリの例
Spark 3.1.1 ベースの Glue 3.0 で、TPC-DS (scale=1000) の sales テーブルと sales のIDでパーティションを切った store_sales テーブルを JOIN する次のクエリを実行すると 10 DPS で 1分4秒 かかった。
GlueのTPC-DS Connectorでデータを生成する - sambaiz-net
spark.sql("""
SELECT *
FROM (SELECT s_store_sk FROM tcpds_parquet.store WHERE s_number_employees > 290) store
JOIN tcpds_parquet.store_sales_partitioned
ON (store_sales_partitioned.ss_store_sk = store.s_store_sk)
""").count()
一方、store テーブルのIDを文字列にした次のクエリは 2分3秒 と倍の時間がかかってしまう。 なお、このような文字列と数値の比較を行うと 1 = CAST(1 AS INT) のように自動でキャストされるのでエラーにならない。
spark.sql("""
SELECT *
FROM (SELECT CAST(s_store_sk AS STRING) as s_store_sk FROM tcpds_parquet.store WHERE s_number_employees > 290) store
JOIN tcpds_parquet.store_sales_partitioned
ON (store_sales_partitioned.ss_store_sk = store.s_store_sk)
""").count()
不要なデータが読まれているかの確認
History ServerでSQLの plan を確認すると、いずれのクエリでもサイズの小さな sales テーブルは Broadcast されているが、 store_sales テーブルについては前者が必要なパーティションのみを読んでいるのに対して後者は全て読んでしまっている。
SparkのWeb UIでJobのStageとExecutorによるTask分散、SQLのplanを確認する - sambaiz-net
Apache SparkのRDD, DataFrame, DataSetとAction, Transformation - sambaiz-net
また、S3 の Server access logging を有効にして出力されたログに Athena で次のクエリを実行することでも後者は余分なパーティションを参照していることがわかる。
SELECT
method,
ss_store_sk,
s_store_sk IS NOT NULL as output_contains_store,
call
FROM (
SELECT
method,
CAST(ss_store_sk AS INT) as ss_store_sk,
count(1) as call
FROM (
SELECT
REGEXP_EXTRACT(text, 'REST\.(\w*)\.OBJECT', 1) as method,
REGEXP_EXTRACT(text, '/tcpds_parquet/store_sales-partitioned/ss_store_sk%3D(\d*)', 1) as ss_store_sk
FROM "s3-access-log"
)
WHERE ss_store_sk IS NOT NULL
GROUP BY method, ss_store_sk
) log
LEFT JOIN (SELECT s_store_sk FROM tcpds_parquet.store WHERE s_number_employees > 290) store
ON (log.ss_store_sk = store.s_store_sk)
Dynamic Partition Pruning (DPP)
前者のクエリの plan を見ると Spark 3.0 で追加された Dynamic Partition Pruning(DPP)が働いていることが確認できる。 これは Broadcast されたデータを見て余分なパーティションをフィルタしてくれる機能。 デフォルトで有効になっており無効にすると全てのパーティションが読まれるようになる。
with SparkSession.builder.config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false").getOrCreate() as spark:
spark.sql("""
SELECT *
FROM (SELECT s_store_sk FROM tcpds_parquet.store WHERE s_number_employees > 290) store
JOIN tcpds_parquet.store_sales_partitioned
ON (store_sales_partitioned.ss_store_sk = store.s_store_sk)
""").count()
ただし次のように読むべきパーティションの条件が与えられている場合は通常の Partition Pruning が行われる。
with SparkSession.builder.config("spark.sql.optimizer.dynamicPartitionPruning.enabled", "false").getOrCreate() as spark:
spark.sql("""
SELECT *
FROM tcpds_parquet.store_sales_partitioned
WHERE ss_store_sk > 500
""").count()
なまじ型が異なっていても動いてしまうため、クエリを見るだけでは DPP が働かないことに気づくのは難しいが、 集計に時間がかかっている場合は plan を見てみると思いのほか高速化に役立つことがある。
参考
Apache Spark 3.0 DPP. Introduction of Apache Spark 3.X… | by Aparup Chatterjee | Medium