Sparkでstructをmapとして扱いexplodeで複数行に展開できるようにする

sparkpythonetl

Sparkでschemaを指定せずjsonなどを読み込むと次のように入力データから自動で決定される。

Athena v2でparquetをソースとしmapフィールドを持つテーブルのクエリが成功したり失敗したりする原因 - sambaiz-net

# {"aaa":123,"ccc":[123],"eee":{"fff":123},"hhh":null}
df = spark.read.json("s3://hogefuga/testjson/")
df.printSchema()
'''
root
 |-- aaa: long (nullable = true)
 |-- ccc: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- eee: struct (nullable = true)
 |    |-- fff: long (nullable = true)
 |-- hhh: string (nullable = true)
'''

これは大抵の場合うまくはたらくが、mapを想定したフィールドがstruct判定されたり、フィールドにnullしか含まれてないためにstring判定されたりすると、関数の入力に合わず処理に失敗することがある。explode()はarrayやmapを複数行に展開する関数。

df.createOrReplaceTempView("tbl")
print(spark.sql("SELECT v FROM tbl LATERAL VIEW explode(ccc) as v").head()) # Row(v=123)

print(spark.sql("SELECT k, v FROM tbl LATERAL VIEW explode(eee) as k, v").head()) 
# cannot resolve 'explode(tbl.`eee`)' due to data type mismatch: input to function explode should be array or map type, not struct<fff:bigint>

print(spark.sql("SELECT v FROM tbl LATERAL VIEW explode(CAST(eee AS map<string,long>)) as v").head()) 
# cannot resolve 'tbl.`eee`' due to data type mismatch: cannot cast struct<fff:bigint> to map<string,bigint>

そこで明示的にDataFrameReaderにschemaを渡してやる。

from pyspark.sql.types import StructType, StructField, LongType, ArrayType, MapType, StringType, LongType
schema = StructType([
    StructField("aaa", LongType()),
    StructField("ccc", ArrayType(LongType())),
    StructField("eee", MapType(StringType(), LongType())),
    StructField("hhh", LongType()),
])
df2 = spark.read.schema(schema).json("s3://hogefuga/testjson/")
df2.printSchema()
'''
root
 |-- aaa: long (nullable = true)
 |-- ccc: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- eee: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)
 |-- hhh: long (nullable = true)
 '''

df2.createOrReplaceTempView("tbl2")
print(spark.sql("SELECT k, v FROM tbl2 LATERAL VIEW explode(eee) as k, v").head()) # Row(k='fff', v=123)

なお、既存のrddからschemaを指定してcreateDataFrame()するとmapへの変換に失敗する。cast()も同様。

spark.createDataFrame(spark.read.json("s3://hogefuga/testjson/").rdd, schema)
# TypeError: field eee: MapType(StringType,LongType,true) can not accept object Row(fff=123) in type <class 'pyspark.sql.types.Row'>

データ構造が大きく全てのschemaを指定するのが面倒な場合は、UDF (User Defined Function)Row.asDict()して変換する方法もある。 ただし UDF の実行に Python Worker の立ち上げと JVM とのデータのやり取りが必要になるのでパフォーマンスは良くない。

ScalaでSparkのアプリケーションを開発してGitHub ActionsでデプロイしEMRでリモートデバッグする - sambaiz-net

from pyspark.sql import functions as F
struct_to_string_long_map = F.udf(lambda row: row.asDict()), MapType(StringType(), LongType()))
spark.udf.register("struct_to_string_long_map", struct_to_string_long_map)
print(spark.sql("SELECT k, v FROM tbl LATERAL VIEW explode(struct_to_string_long_map(eee)) as k, v").head()) # Row(k='fff', v=123)

参考

Spark DataFrameのschemaにまつわる罠とschemaの補正 | つかびーの技術日記

PySpark の UDF (User Defined Function) を試す - CUBE SUGAR CONTAINER

AWS Glue ETL パフォーマンス・チューニング① 基礎知識編 AWS Black Belt Online Seminar