When you read data in Spark without specifying a schema, the schema is inferred automatically from the input, as follows.
# {"aaa":123,"ccc":[123],"eee":{"fff":123},"hhh":null}
df = spark.read.json("s3://hogefuga/testjson/")
df.printSchema()
'''
root
 |-- aaa: long (nullable = true)
 |-- ccc: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- eee: struct (nullable = true)
 |    |-- fff: long (nullable = true)
 |-- hhh: string (nullable = true)
'''
This works well in most cases. However, if a field that is meant to be a map is inferred as a struct, or a field is inferred as a string because it contains only null, later processing may fail because the type does not match what a function expects. For example, explode() is a function that expands an array or a map into multiple rows, so it rejects a struct.
df.createOrReplaceTempView("tbl")
print(spark.sql("SELECT v FROM tbl LATERAL VIEW explode(ccc) as v").head()) # Row(v=123)
print(spark.sql("SELECT k, v FROM tbl LATERAL VIEW explode(eee) as k, v").head())
# cannot resolve 'explode(tbl.`eee`)' due to data type mismatch: input to function explode should be array or map type, not struct<fff:bigint>
print(spark.sql("SELECT v FROM tbl LATERAL VIEW explode(CAST(eee AS map<string,long>)) as v").head())
# cannot resolve 'tbl.`eee`' due to data type mismatch: cannot cast struct<fff:bigint> to map<string,bigint>
You can explicitly pass the schema to the DataFrameReader.
from pyspark.sql.types import StructType, StructField, LongType, ArrayType, MapType, StringType
schema = StructType([
    StructField("aaa", LongType()),
    StructField("ccc", ArrayType(LongType())),
    StructField("eee", MapType(StringType(), LongType())),  # map instead of the inferred struct
    StructField("hhh", LongType()),  # long instead of the inferred string
])
df2 = spark.read.schema(schema).json("s3://hogefuga/testjson/")
df2.printSchema()
'''
root
 |-- aaa: long (nullable = true)
 |-- ccc: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- eee: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)
 |-- hhh: long (nullable = true)
'''
df2.createOrReplaceTempView("tbl2")
print(spark.sql("SELECT k, v FROM tbl2 LATERAL VIEW explode(eee) as k, v").head()) # Row(k='fff', v=123)
However, if you call createDataFrame() on an existing RDD with an explicit schema, the conversion to a map fails, just as CAST does.
spark.createDataFrame(spark.read.json("s3://hogefuga/testjson/").rdd, schema)
# TypeError: field eee: MapType(StringType,LongType,true) can not accept object Row(fff=123) in type <class 'pyspark.sql.types.Row'>
If specifying the entire schema is troublesome because the data structure is large, you can convert only the struct by calling Row.asDict() in a UDF (User Defined Function). However, performance suffers because Spark has to launch Python workers and exchange data with the JVM to execute the UDF.
from pyspark.sql import functions as F
# Return None for a null struct instead of failing on None.asDict()
struct_to_string_long_map = F.udf(lambda row: row.asDict() if row is not None else None, MapType(StringType(), LongType()))
spark.udf.register("struct_to_string_long_map", struct_to_string_long_map)
print(spark.sql("SELECT k, v FROM tbl LATERAL VIEW explode(struct_to_string_long_map(eee)) as k, v").head()) # Row(k='fff', v=123)
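If the UDF overhead is a concern, another technique (not the one used in this post) is to round-trip the struct through JSON with the built-in functions to_json() and from_json(), which stay inside the JVM. The sketch below only builds the SQL string; `explode_struct_as_map_sql` is a hypothetical helper, and it assumes a Spark version whose from_json() accepts a map type as a DDL string such as 'map<string,bigint>'.

```python
def explode_struct_as_map_sql(table: str, column: str,
                              map_type: str = "map<string,bigint>") -> str:
    """Build a query that converts a struct column to a map via JSON and explodes it."""
    return (
        f"SELECT k, v FROM {table} "
        f"LATERAL VIEW explode(from_json(to_json({column}), '{map_type}')) AS k, v"
    )


query = explode_struct_as_map_sql("tbl", "eee")
print(query)
```

With the session and the `tbl` view from above, the query would be run as `spark.sql(query).head()`. I have not measured it against the UDF. Also be aware that to_json() may omit fields whose value is null, so such keys can be missing from the resulting map.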