Apache SparkのRDD, DataFrame, DataSetとAction, Transformation

sparketl

Sparkとは

ハイパフォーマンスな汎用分散処理システム。 HDFSやS3といった分散ストレージとHadoop YARNといったクラスタマネージャと共に使われる。

HDFS(Hadoop Distributed File System)とは - sambaiz-net

Hadoop YARN によってアプリケーションにリソースが割り当てられる流れと割り当てられているリソース量の確認 - sambaiz-net

中間データをメモリに置いておくことでHadoopのMapReduceよりも高速に処理することができる。 APIはJava, Scala, Python, Rのものがあって、 Pythonは手軽に書ける一方、パフォーマンスはJVMとのやりとりが発生するため落ちる。

ScalaでSparkのアプリケーションを開発してGitHub ActionsでデプロイしEMRでリモートデバッグする - sambaiz-net

RDDとDataFrameとDataSet

RDD(Resilient Distributed Dataset)はSpark Coreの低レベルな インタフェースで、 型を持つイミュータブルな分散コレクション。

DataFrameはSpark SQLのテーブル状のデータ形式で、 Tungstenというバイトレベルの最適化や Catalystというクエリのオプティマイザが効くので自分でRDDのAPIを呼ぶよりパフォーマンスが良い。

DataSetは型を持つDataFrameで、Spark 2.0からはDataFrameもDataset[Row]のエイリアスになった。

ActionとTransformation

外部への出力といった副作用を持つActionに対して RDDを返すだけの処理をTransformationという。 Transformationは都度実行されるのではなく Actionが実行される際に、依存関係を表すDAG(Directed Acyclic Graph) をもとに必要なものが遅延評価される。DAGはWeb UIで可視化できる。

SparkのWeb UIでJobのStageとExecutorによるTask分散、SQLのPhysical planを確認する - sambaiz-net

また、ネットワークやノードの障害によって計算結果が失われても依存関係をたどり、 並列に計算することで高速に復旧できるようになっている。

narrow dependencyとwide dependency

TransformationはRDDのパーティションを各Executorが並列に処理する。 mapやfilterなどの、見るパーティションが単一か他と被らない依存関係をnarrow dependencyといって、 そのExecutorだけで処理が完結するためパイプラインでまとめて実行でき速い。 一方、groupByKeyなどの一つのパーティションが複数のパーティションの処理で必要とされる依存関係をwide dependencyといって、 そのパーティションをExecutorが持っていない場合、コストが高いシャッフルが発生する。 シャッフルを回避するためにspark.sql.autoBroadcastJoinThresholdより小さいDataFrameをjoinする際は全てのExecutorにそれを送るBroadcast Hash Joinが行われる。 シャッフルを必要としない一連の実行単位をstageという。

参考

High Performance Spark - O’Reilly Media

RDD vs DataFrames and Datasets: A Tale of Three Apache Spark APIs

Stage — Physical Unit Of Execution · Mastering Apache Spark