Sparkでschemaを指定せずjsonなどを読み込むと次のように入力データから自動で決定される。
Athena v2でparquetをソースとしmapフィールドを持つテーブルのクエリが成功したり失敗したりする原因 - sambaiz-net
# {"aaa":123,"ccc":[123],"eee":{"fff":123},"hhh":null}
df = spark.read.json("s3://hogefuga/testjson/")
df.printSchema()
'''
root
|-- aaa: long (nullable = true)
|-- ccc: array (nullable = true)
| |-- element: long (containsNull = true)
|-- eee: struct (nullable = true)
| |-- fff: long (nullable = true)
|-- hhh: string (nullable = true)
'''
これは大抵の場合うまくはたらくが、mapを想定したフィールドがstruct判定されたり、フィールドにnullしか含まれてないためにstring判定されたりすると、関数の入力に合わず処理に失敗することがある。explode()はarrayやmapを複数行に展開する関数。
df.createOrReplaceTempView("tbl")
print(spark.sql("SELECT v FROM tbl LATERAL VIEW explode(ccc) as v").head()) # Row(v=123)
print(spark.sql("SELECT k, v FROM tbl LATERAL VIEW explode(eee) as k, v").head())
# cannot resolve 'explode(tbl.`eee`)' due to data type mismatch: input to function explode should be array or map type, not struct<fff:bigint>
print(spark.sql("SELECT v FROM tbl LATERAL VIEW explode(CAST(eee AS map<string,long>)) as v").head())
# cannot resolve 'tbl.`eee`' due to data type mismatch: cannot cast struct<fff:bigint> to map<string,bigint>
そこで明示的にDataFrameReaderにschemaを渡してやる。
from pyspark.sql.types import StructType, StructField, LongType, ArrayType, MapType, StringType, LongType
schema = StructType([
StructField("aaa", LongType()),
StructField("ccc", ArrayType(LongType())),
StructField("eee", MapType(StringType(), LongType())),
StructField("hhh", LongType()),
])
df2 = spark.read.schema(schema).json("s3://hogefuga/testjson/")
df2.printSchema()
'''
root
|-- aaa: long (nullable = true)
|-- ccc: array (nullable = true)
| |-- element: long (containsNull = true)
|-- eee: map (nullable = true)
| |-- key: string
| |-- value: long (valueContainsNull = true)
|-- hhh: long (nullable = true)
'''
df2.createOrReplaceTempView("tbl2")
print(spark.sql("SELECT k, v FROM tbl2 LATERAL VIEW explode(eee) as k, v").head()) # Row(k='fff', v=123)
なお、既存のrddからschemaを指定してcreateDataFrame()するとmapへの変換に失敗する。cast()も同様。
spark.createDataFrame(spark.read.json("s3://hogefuga/testjson/").rdd, schema)
# TypeError: field eee: MapType(StringType,LongType,true) can not accept object Row(fff=123) in type <class 'pyspark.sql.types.Row'>
データ構造が大きく全てのschemaを指定するのが面倒な場合は、UDF (User Defined Function)でRow.asDict()して変換する方法もある。 ただし UDF の実行に Python Worker の立ち上げと JVM とのデータのやり取りが必要になるのでパフォーマンスは良くない。
ScalaでSparkのアプリケーションを開発してGitHub ActionsでデプロイしEMRでリモートデバッグする - sambaiz-net
from pyspark.sql import functions as F
struct_to_string_long_map = F.udf(lambda row: row.asDict()), MapType(StringType(), LongType()))
spark.udf.register("struct_to_string_long_map", struct_to_string_long_map)
print(spark.sql("SELECT k, v FROM tbl LATERAL VIEW explode(struct_to_string_long_map(eee)) as k, v").head()) # Row(k='fff', v=123)
参考
Spark DataFrameのschemaにまつわる罠とschemaの補正 | つかびーの技術日記
PySpark の UDF (User Defined Function) を試す - CUBE SUGAR CONTAINER
AWS Glue ETL パフォーマンス・チューニング① 基礎知識編 AWS Black Belt Online Seminar