Why can Athena v2 fail to query map columns in parquet source tables

If you output logs containing map fields as JSON, convert them to Parquet with Glue Studio, and query them with Athena, the queries can succeed or fail depending on how the table is defined.

Columnar format Parquet structure and read optimization - sambaiz-net

type Log struct {
    A map[string]int
}
=> {"A":{"B":10,"C":20}}

The metadata of the resulting Parquet file is as follows. Column A is written as a plain group (a struct), so the information that it is a map is lost.

$ parquet meta test.parquet
File path:  test.parquet
Created by: parquet-glue version 1.8.2
Properties:
  org.apache.spark.sql.parquet.row.metadata: {"type":"struct","fields":[{"name":"A","type":{"type":"struct","fields":[{"name":"B","type":"integer","nullable":true,"metadata":{}},{"name":"C","type":"integer","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]}
Schema:
message glue_schema {
  optional group A {
    optional int32 B;
    optional int32 C;
  }
}


Row group 0:  count: 1  116.00 B records  start: 4  total(compressed): 116 B total(uncompressed):112 B 
--------------------------------------------------------------------------------
     type      encodings count     avg size   nulls   min / max
A.B  INT32     S   _     1         58.00 B    0       "10" / "10"
A.C  INT32     S   _     1         58.00 B    0       "20" / "20"
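
The loss of map information is what you would expect from schema inference on JSON: a JSON object gives no hint that it is a map, so each key becomes a named struct field. The following is a minimal sketch of that idea using only the standard library. `infer_type` is a hypothetical helper for illustration, not the inference Glue actually runs.

```python
import json


def infer_type(value):
    """Infer a Hive-style type name from one parsed JSON value (simplified)."""
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "int"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, dict):
        # A JSON object looks the same whether it was a struct or a map,
        # so every key is treated as a named struct field.
        fields = ",".join(f"{k}:{infer_type(v)}" for k, v in value.items())
        return f"struct<{fields}>"
    raise TypeError(f"unsupported JSON value: {value!r}")


record = json.loads('{"A":{"B":10,"C":20}}')
inferred = infer_type(record["A"])
print(inferred)  # struct<B:int,C:int>
```

The inferred type matches the group with fields B and C shown in the metadata above.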

I expected every query to fail if this column's type is set to map, but when the key type is set to the same type as the value, the query itself succeeds, although it does not return the intended result.

select * from test
-- struct<B:int,C:int> => {b=10, c=20}
-- map<string,int> => HIVE_BAD_DATA: Field b's type INT32 in parquet file s3://***/test.parquet is incompatible with type varchar defined in table schema
-- map<int,int> => {10=20}

Running Presto on EMR

Run Presto on EMR and try to reproduce the error. Use --configurations to set the Glue Data Catalog as the Hive metastore.

Launch an EMR cluster with AWS CLI and run Spark applications - sambaiz-net

$ aws emr create-cluster \
  --release-label emr-6.3.1 \
  --applications Name=Presto \
  --instance-type m6g.xlarge \
  --instance-count 1 \
  --use-default-roles \
  --enable-debugging \
  --log-uri s3n://<bucket>/emrlogs/ \
  --name presto_test \
  --bootstrap-actions Path=s3://<bucket>/install-ssm-agent.sh,Name=install-ssm-agent \
  --configurations '[
      {
        "Classification": "presto-connector-hive",
        "Properties": {
          "hive.metastore": "glue"
        }
      }
    ]'
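
The --configurations value is plain JSON, so it can be generated and checked before being passed to the CLI, which avoids quoting mistakes in the shell. A minimal sketch using only the standard library; `hive_glue_configurations` is a hypothetical helper name, and the classification and property are the ones used in the command above.

```python
import json


def hive_glue_configurations():
    """Build the JSON payload that points Presto's Hive connector at Glue."""
    return json.dumps([
        {
            "Classification": "presto-connector-hive",
            "Properties": {"hive.metastore": "glue"},
        }
    ])


configurations = hive_glue_configurations()
print(configurations)
```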

I connected to the instance and ran the same query, but it failed.

$ aws ssm start-session --target <instance_id>
$ presto-cli
presto> select * from awsdatacatalog.default.test;
Query 20220814_125728_00018_5vv6m failed: The column a of table default.test is declared as type map<int,int>, but the Parquet file (s3://***/test.parquet) declares the column as type optional group a {
  optional int32 b;
  optional int32 c;
}

Looking at the code where the error occurs, the type check was implemented in Presto v0.226, so it appears not to be included in Athena v2, which is based on Presto v0.217.
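
The kind of check that rejects this file can be sketched as follows. This is not Presto's actual code. It only tests the shape the Parquet format uses for maps (a group with a single repeated child group whose first field is named key), and the dict representation of the schema is an assumption made for this example.

```python
def is_parquet_map(group):
    """Return True if the group has the standard Parquet map shape."""
    children = group.get("fields", [])
    if len(children) != 1:
        return False
    entry = children[0]
    if entry.get("repetition") != "repeated":
        return False
    names = [field["name"] for field in entry.get("fields", [])]
    return names[:1] == ["key"]


# The group written by the Glue job: two plain int32 fields.
glue_group = {"name": "A", "repetition": "optional", "fields": [
    {"name": "B", "repetition": "optional", "type": "int32"},
    {"name": "C", "repetition": "optional", "type": "int32"},
]}

# A group following the standard map layout.
map_group = {"name": "A", "repetition": "optional", "fields": [
    {"name": "key_value", "repetition": "repeated", "fields": [
        {"name": "key", "repetition": "required", "type": "binary"},
        {"name": "value", "repetition": "optional", "type": "int32"},
    ]},
]}

print(is_parquet_map(glue_group))  # False
print(is_parquet_map(map_group))   # True
```

An engine that performs a check like this fails fast on the Glue output, while an engine without it reads the file anyway and returns a misleading result.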

Parquet generation with an explicit schema

Either way, the query does not return the expected result, so generate the Parquet file with an explicit schema that matches the table. mode='PERMISSIVE', the default parse mode, sets the values of fields whose types do not match to null, while FAILFAST raises an exception.

from pyspark.sql import SparkSession

with SparkSession.builder.appName("json_to_parquet").getOrCreate() as spark:
  # Reuse the table's schema from the catalog.
  schema = spark.table("default.test").schema # StructType(List(StructField(a,MapType(StringType,IntegerType,true),true)))
  # The table column is lowercase (a) but the JSON key is A, so match the JSON.
  schema[0].name = schema[0].name.upper()
  # FAILFAST raises on type mismatches instead of silently writing nulls.
  df = spark.read.schema(schema).json("s3://<bucket>/test.json", mode='FAILFAST')
  df.write.parquet("s3://<bucket>/", compression="gzip")
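
The difference between the two parse modes can be illustrated without Spark. The sketch below emulates the behavior described above for a single field A of type map<string,int>. `read_record` is a hypothetical helper, and real Spark may null values at a different granularity and also offers other modes.

```python
import json


def matches_map_string_int(value):
    """Check that a parsed JSON value fits map<string,int>."""
    return isinstance(value, dict) and all(
        isinstance(k, str) and isinstance(v, int) and not isinstance(v, bool)
        for k, v in value.items()
    )


def read_record(line, mode="PERMISSIVE"):
    """Parse one JSON line, applying the parse mode to field A."""
    value = json.loads(line).get("A")
    if value is None or matches_map_string_int(value):
        return {"A": value}
    if mode == "FAILFAST":
        raise ValueError(f"field A does not match map<string,int>: {value!r}")
    # PERMISSIVE: keep the row but null out the mismatched field.
    return {"A": None}


good = read_record('{"A":{"B":10,"C":20}}', mode="FAILFAST")
bad = read_record('{"A":{"B":"x"}}', mode="PERMISSIVE")
print(good)  # {'A': {'B': 10, 'C': 20}}
print(bad)   # {'A': None}
```

With PERMISSIVE a schema mistake can go unnoticed as a column full of nulls, which is why FAILFAST is the safer choice while working out the correct schema.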

As a result, the following Parquet file, which contains key and value fields, is written and can be queried as map<string,int>.

$ parquet meta test.parquet 

File path:  test.parquet
Created by: parquet-mr version 1.10.1 (build 65f31597b18a0f2718a129fd2d69af0168952c55)
Properties:
                   org.apache.spark.version: 3.1.1
  org.apache.spark.sql.parquet.row.metadata: {"type":"struct","fields":[{"name":"A","type":{"type":"map","keyType":"string","valueType":"integer","valueContainsNull":true},"nullable":true,"metadata":{}}]}
Schema:
message spark_schema {
  optional group A (MAP) {
    repeated group key_value {
      required binary key (STRING);
      optional int32 value;
    }
  }
}


Row group 0:  count: 1  144.00 B records  start: 4  total: 144 B
--------------------------------------------------------------------------------
                   type      encodings count     avg size   nulls   min / max
A.key_value.key    BINARY    G   _     2         32.00 B    0       "B" / "C"
A.key_value.value  INT32     G   _     2         40.00 B    0       "10" / "20"
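
The two columns in this row group can be understood as the map entries split into a key column and a value column. A rough sketch, ignoring the repetition and definition levels that Parquet stores alongside the values; `map_to_columns` is a hypothetical helper.

```python
def map_to_columns(rows):
    """Split each row's map A into a key column and a value column."""
    keys, values = [], []
    for row in rows:
        for key, value in row["A"].items():
            keys.append(key)
            values.append(value)
    return keys, values


keys, values = map_to_columns([{"A": {"B": 10, "C": 20}}])
print(len(keys), min(keys), max(keys))        # 2 B C
print(len(values), min(values), max(values))  # 2 10 20
```

The counts and min / max values agree with the statistics printed by parquet meta: one row, but two entries in each column.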

By the way, if the map key is an int in the code and the key type in the schema is therefore also set to int, reading the JSON fails with the error "org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Integer", because keys of a JSON object are always strings.

type Log struct {
    A map[int]string
}
=> {"A":{"10":"aaa","20":"bbb"}}
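
The root cause is visible with any JSON parser: object keys arrive as strings regardless of the type used in the producing code. The sketch below shows this with the standard library and converts the keys afterward. Reading the keys as strings and converting them in a later step is one possible workaround, which was not verified against the Glue job here.

```python
import json

record = json.loads('{"A":{"10":"aaa","20":"bbb"}}')

# Every key is a str, even though the Go map was keyed by int.
key_types = {type(key).__name__ for key in record["A"]}
print(key_types)  # {'str'}

# Convert the keys explicitly after parsing.
converted = {int(key): value for key, value in record["A"].items()}
print(converted)  # {10: 'aaa', 20: 'bbb'}
```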