EMR Studio の Jupyter Notebook から EMR Serverless で Spark の MLlib を動かす

awspythonspark

EMR Studio はEMR クラスタで実行される Jupyter Notebook ベースの IDE。クラスタや S3 の料金はかかるがこれ自体は無料。EMR Serverless で動かすことで常時クラスタを立てておくことなく手軽に使うことができる。

似た用途のサービスとして Athena for Apache Spark があって、通常の Athena の延長でより手軽に使える一方、MLlib に対応していないなどの制約がある。また、EMR Studio の方が機能は豊富で、外部 IdP での認証Git 連携といったことができる。

Athena for Apache Spark の Notebook で DataFrame.toPandas().plot() した際の日本語が文字化けしないようにする - sambaiz-net

Studio を作成すると、

併せて EMR Serverless とそれに紐づいた Workspace (Notebook) が作られる。この Cluster ID は Application ID に対応し、クラスタを探しても見つからないのが分かりづらい。

EMR Serverless でインタラクティブワークロードを動かす場合、60 分以上アイドル状態になっているカーネルは自動的に Notebooks から終了される。この時間は変更できない。また、これとは別に Application 自体の自動停止の設定もある。Application が停止しても Workspace は消えない。

Studio に VPC の設定をすると EMR on EC2 や on EKS に変更できる。クラスタには Spark、Livy、Jupyter Enterprise Gateway がイントールされている必要がある。なお、EMR Serverless のネットワーク設定は別にあり、Studio の設定を変更しても反映されないことに注意。デフォルトの No network connectivity でも S3 にはアクセスできるようだがインターネットに出ることはできなかった。

Livy を EMR on EKS にインストールしSparkmagic でローカルの Jupyter Notebook から Spark のジョブを実行する - sambaiz-net

MLlib を実行してみる。デフォルトで Interactive runtime role には EMR Studio の S3 の権限しか付いていないので必要に応じて glue や S3 の権限を与える。

Spark の MLlib で k-means法によるクラスタリングを行う - sambaiz-net

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans

spark = SparkSession.builder.appName("test").getOrCreate()

data = [
    (0, 1.0, 2.0),
    (1, 1.5, 1.8),
    (2, 5.0, 8.0),
    (3, 6.0, 9.0),
    (4, 9.0, 3.0),
    (5, 9.5, 2.5)
]

columns = ["id", "x", "y"]
df = spark.createDataFrame(data, columns)

assembler = VectorAssembler(inputCols=["x", "y"], outputCol="features")
vectorized_data = assembler.transform(df)

kmeans = KMeans(k=3, seed=1, featuresCol="features", predictionCol="prediction")
model = kmeans.fit(vectorized_data)
predictions = model.transform(vectorized_data)

predictions.show()
'''
+---+---+---+---------+----------+
| id|  x|  y| features|prediction|
+---+---+---+---------+----------+
|  0|1.0|2.0|[1.0,2.0]|         1|
|  1|1.5|1.8|[1.5,1.8]|         1|
|  2|5.0|8.0|[5.0,8.0]|         0|
|  3|6.0|9.0|[6.0,9.0]|         0|
|  4|9.0|3.0|[9.0,3.0]|         2|
|  5|9.5|2.5|[9.5,2.5]|         2|
+---+---+---+---------+----------+
'''