Treat Spark struct as map to expand to multiple rows with explode

Tags: spark, python

When you read data in Spark without specifying a schema, it is inferred automatically from the input, as follows.

# {"aaa":123,"ccc":[123],"eee":{"fff":123},"hhh":null}
df = spark.read.json("s3://hogefuga/testjson/")
df.printSchema()
'''
root
 |-- aaa: long (nullable = true)
 |-- ccc: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- eee: struct (nullable = true)
 |    |-- fff: long (nullable = true)
 |-- hhh: string (nullable = true)
'''

This works well in most cases, but when a field you expect to be a map is inferred as a struct, or a field that contains only nulls is inferred as a string, downstream processing can fail with a function input type mismatch. explode() is a function that expands an array or a map into multiple rows.

df.createOrReplaceTempView("tbl")
print(spark.sql("SELECT v FROM tbl LATERAL VIEW explode(ccc) as v").head()) # Row(v=123)

print(spark.sql("SELECT k, v FROM tbl LATERAL VIEW explode(eee) as k, v").head()) 
# cannot resolve 'explode(tbl.`eee`)' due to data type mismatch: input to function explode should be array or map type, not struct<fff:bigint>

print(spark.sql("SELECT v FROM tbl LATERAL VIEW explode(CAST(eee AS map<string,long>)) as v").head()) 
# cannot resolve 'tbl.`eee`' due to data type mismatch: cannot cast struct<fff:bigint> to map<string,bigint>

You can explicitly pass the schema to the DataFrameReader.

from pyspark.sql.types import StructType, StructField, LongType, ArrayType, MapType, StringType
schema = StructType([
    StructField("aaa", LongType()),
    StructField("ccc", ArrayType(LongType())),
    StructField("eee", MapType(StringType(), LongType())),
    StructField("hhh", LongType()),
])
df2 = spark.read.schema(schema).json("s3://hogefuga/testjson/")
df2.printSchema()
'''
root
 |-- aaa: long (nullable = true)
 |-- ccc: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- eee: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)
 |-- hhh: long (nullable = true)
'''

df2.createOrReplaceTempView("tbl2")
print(spark.sql("SELECT k, v FROM tbl2 LATERAL VIEW explode(eee) as k, v").head()) # Row(k='fff', v=123)

However, if you call createDataFrame() with an explicit schema on the existing RDD, the conversion to a map fails just as CAST did.

spark.createDataFrame(spark.read.json("s3://hogefuga/testjson/").rdd, schema)
# TypeError: field eee: MapType(StringType,LongType,true) can not accept object Row(fff=123) in type <class 'pyspark.sql.types.Row'>

If the data structure is large and specifying the whole schema is tedious, another approach is to convert the struct with Row.asDict() in a UDF (User Defined Function).

from pyspark.sql import functions as F
struct_to_string_long_map = F.udf(lambda row: row.asDict(), MapType(StringType(), LongType()))
spark.udf.register("struct_to_string_long_map", struct_to_string_long_map)
print(spark.sql("SELECT k, v FROM tbl LATERAL VIEW explode(struct_to_string_long_map(eee)) as k, v").head()) # Row(k='fff', v=123)

References

Pitfalls around Spark DataFrame schemas and how to correct them (Spark DataFrameのschemaにまつわる罠とschemaの補正) | つかびーの技術日記

Trying out PySpark UDFs (User Defined Functions) (PySpark の UDF (User Defined Function) を試す) - CUBE SUGAR CONTAINER