DuckDB の Hive パーティションプルーニングの仕組みをソースコードから理解する

database

DuckDB で Hive フォーマットのパーティション付きテーブルを読み込む際にどのようにプルーニングが行われるか確認する。

DuckDB の Go クライアントで Google Sheets のデータを SQL で取得する - sambaiz-net

$ duckdb --version
v1.4.4 (Andium) 6ddac802ff

hive_partitioning=true を指定すると指定したパーティションのファイルのみが読まれる。

$ duckdb
D COPY (SELECT i, i % 12 + 1 AS month FROM range(1000000) t(i))
  TO 'data' (FORMAT PARQUET, PARTITION_BY (month));
D EXPLAIN ANALYZE
  SELECT * FROM read_parquet('data/**/*.parquet', hive_partitioning=true)
  WHERE month = 1;

┌────────────────────────────────────────────────┐
│┌──────────────────────────────────────────────┐│
││              Total Time: 0.0034s             ││
│└──────────────────────────────────────────────┘│
└────────────────────────────────────────────────┘
...
┌───────────────────────────┐
│         TABLE_SCAN        │
│    ────────────────────   │
Function:         │
│        READ_PARQUET       │
│                           │
│       File Filters:       │
│        (month = 1)        │
│                           │
│    Scanning Files: 1/12│    Total Files Read: 1└───────────────────────────┘

DuckDB のクエリ実行では、まず Parser が SQL 文字列をパースし、Binder がカタログを引いてテーブルやカラムの参照先と型を確定する。そこからスキャンやフィルタといった操作からなる Logical Plan が作られ、Optimizer がフィルタをプッシュダウンしてスキャン時に行うようにするなどの効率化を行い、Physical Plan に変換されて実行される。プルーニングは Optimizer で行われるが、Join Filter Pushdown のように実行時に動的に適用されるものもある。

Binder は Parser が生成した AST を受け取りテーブル関数の引数を解析する。この際、HivePartitioning::Parse() がファイルパスから key=value のパーティション情報を抽出する。Optimizer のフィルタプッシュダウンではテーブル関数の pushdown_complex_filter コールバックが呼ばれ、その中で HivePartitioning::ApplyFiltersToFileList() がファイルごとにパスからパーティション値を取り出してフィルタ式 WHERE month = 1 のカラムを置換し 2 = 1 で false になればそのファイルを除外する。

JOIN を含むクエリでは、JOIN 条件にパーティションカラムが含まれていれば片方のテーブルのカラムのみでフィルタした場合でも両テーブルにプルーニングが効く。 これは FilterCombiner が値の等しいカラムをグルーピングして同様に扱うため。ON t1.month = t2.monthWHERE t1.month = 1 から、両テーブルに month = 1 が適用される。

D SELECT *
  FROM read_parquet('data1/**/*.parquet', hive_partitioning=true) t1
  JOIN read_parquet('data2/**/*.parquet', hive_partitioning=true) t2 ON t1.month = t2.month
  WHERE t1.month = 1;

サブクエリは Optimizer によって JOIN に変換され、実行時にサブクエリの結果から min/max を集計して範囲外のファイルをスキップする Join Filter Pushdown が適用される。なお BigQuery ではサブクエリのような動的な式では全パーティションがスキャンされる

D SELECT *
  FROM read_parquet('data/**/*.parquet', hive_partitioning=true) t1
  WHERE t1.month = (SELECT month FROM other_table WHERE key = 1);