Athena v2でparquetをソースとしmapフィールドを持つテーブルのクエリが成功したり失敗したりする原因

awsserializeetl

次のようなmapのフィールドを含むログをjsonなどで出力し、Glue Studioでparquetに変換してAthenaでクエリを実行すると、ものによってクエリが成功したり失敗したりする。

カラムナフォーマットParquetの構造とReadの最適化 - sambaiz-net

type Log struct {
    A map[string]int
}
=> {"A":{"B":10,"C":20}}

parquetのmetadataは次のようになっておりmapという情報は失われている。

$ parquet meta test.parquet
File path:  test.parquet
Created by: parquet-glue version 1.8.2
Properties:
  org.apache.spark.sql.parquet.row.metadata: {"type":"struct","fields":[{"name":"A","type":{"type":"struct","fields":[{"name":"B","type":"integer","nullable":true,"metadata":{}},{"name":"C","type":"integer","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}
Schema:
message glue_schema {
  optional group A {
    optional int32 B;
    optional int32 C;
  }
}


Row group 0:  count: 1  116.00 B records  start: 4  total(compressed): 116 B total(uncompressed):112 B 
--------------------------------------------------------------------------------
     type      encodings count     avg size   nulls   min / max
A.B  INT32     S   _     1         58.00 B    0       "10" / "10"
A.C  INT32     S   _     1         58.00 B    0       "20" / "20"

これに対してカラムの型をmapにすると必ず失敗すると思いきや、key,valueをいずれも値の型にしたときにクエリ自体は成功してしまう。

select * from test
-- struct<B:int,C:int> => {b=10, c=20}
-- map<string,int> => HIVE_BAD_DATA: Field b's type INT32 in parquet file s3://***/test.parquet is incompatible with type varchar defined in table schema
-- map<int,int> => {10=20}

EMRでのPrestoの実行

EMRでPrestoを動かしこれを再現させてみる。 –configurations で Glue Data Catalog を Hive metastore に指定している。

AWS CLIでEMRクラスタを立ち上げSparkのアプリケーションを実行する - sambaiz-net

$ aws emr create-cluster \
  --release-label emr-6.3.1 \
  --applications Name=Presto \
  --instance-type m6g.xlarge \
  --instance-count 1 \
  --use-default-roles \
  --enable-debugging \
  --log-uri s3n://<bucket>/emrlogs/ \
  --name presto_test \
  --bootstrap-actions Path=s3://<bucket>/install-ssm-agent.sh,Name=insatall-ssm-agent \
  --configurations '[
      {
        "Classification": "presto-connector-hive",
        "Properties": {
          "hive.metastore": "glue"
        }
      }
    ]'

中に入って同じクエリを実行したがこちらは真っ当にクエリが失敗してしまった。

$ aws ssm start-session --target <instance_id>
$ presto-cli
presto> select * from awsdatacatalog.default.test;
Query 20220814_125728_00018_5vv6m failed: The column a of table default.test is declared as type map<int,int>, but the Parquet file (s3://***/test.parquet) declares the column as type optional group a {
  optional int32 b;
  optional int32 c;
}

エラーを出しているコードを見てみると Presto v0.226 で型のチェックが入ったようで、 v0.217 ベースのAthena v2には反映されていないようだ。

スキーマを明示した parquet の生成

いずれにせよ期待する結果は返らないのでテーブルのスキーマを明示的に渡して parquet を生成する。 デフォルトの mode=‘PERMISSIVE’ だと 型が一致しないフィールドの値は null になるが、FAILFAST だとエラーになる。

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, MapType, IntegerType, StringType

with SparkSession.builder.appName("json_to_parquet").getOrCreate() as spark:
  schema = spark.table("default.test").schema # StructType(List(StructField(a,MapType(StringType,IntegerType,true),true)))
  schema[0].name = schema[0].name.upper()
  df = spark.read.schema(schema).json("s3://<bucket>/test.json", mode='FAILFAST')
  df.write.parquet("s3://<bucket>/", compression="gzip")

すると次のような key, value のフィールドを持つ parquet が出力され、map<string,int> としてアクセスできるようになった。

$ parquet meta test.parquet 

File path:  test.parquet
Created by: parquet-mr version 1.10.1 (build 65f31597b18a0f2718a129fd2d69af0168952c55)
Properties:
                   org.apache.spark.version: 3.1.1
  org.apache.spark.sql.parquet.row.metadata: {"type":"struct","fields":[{"name":"A","type":{"type":"map","keyType":"string","valueType":"integer","valueContainsNull":true},"nullable":true,"metadata":{}}]}
Schema:
message spark_schema {
  optional group A (MAP) {
    repeated group key_value {
      required binary key (STRING);
      optional int32 value;
    }
  }
}


Row group 0:  count: 1  144.00 B records  start: 4  total: 144 B
--------------------------------------------------------------------------------
                   type      encodings count     avg size   nulls   min / max
A.key_value.key    BINARY    G   _     2         32.00 B    0       "B" / "C"
A.key_value.value  INT32     G   _     2         40.00 B    0       "10" / "20"

ちなみにコード上の key が int だからといってスキーマの型を map<int,int> にすると json の読み込みが org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer で失敗する。

type Log struct {
    A map[int]string
}
=> {"A":{"10":"aaa","20":"bbb"}}