Spark の MLlib で k-means法によるクラスタリングを行う

sparkpythonmachinelearning

Spark には MLlib という機械学習のライブラリがあり、 今回はその中の Kmeans によるクラスタリングを行う。 k-means法は各データのクラスタを事前に決めた数からランダムに決めて、クラスタごとに中心を取ってから、各データのクラスタを最も近い中心のクラスタに変更する、というのを収束するまで繰り返すという手法。 Kmeans には収束が早い k-means++法が実装されており、distanceMeasure はデフォルトで euclidean となっている。

データは Iris データセットを用いる。

from sklearn.datasets import load_iris

iris = load_iris(as_frame=True).data
df = spark.createDataFrame(iris)
print(df.head())
# Row(sepal length (cm)=5.1, sepal width (cm)=3.5, petal length (cm)=1.4, petal width (cm)=0.2)

VectorAssembler で複数のカラムの値を含むベクトルを生成する。

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=df.columns, outputCol="features")
data = assembler.transform(df)
print(data.head())
# Row(sepal length (cm)=5.1, sepal width (cm)=3.5, petal length (cm)=1.4, petal width (cm)=0.2, features=DenseVector([5.1, 3.5, 1.4, 0.2]))

fit() すると Transformer が返るので transform() すると predictionCol にクラスタが出力される。

from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=3, seed=1, featuresCol="features", predictionCol="prediction")
model = kmeans.fit(data)
predictions = model.transform(data)

print(predictions.groupBy("prediction").count().collect())
# [Row(prediction=1, count=50), Row(prediction=0, count=38), Row(prediction=2, count=62)]

入力ベクトルを PCA で次元削減しクラスタごとに色分けしてプロットしてみる。

import matplotlib.pyplot as plt
from pyspark.ml.feature import PCA

pca = PCA(k=2, inputCol="features", outputCol="pca_features")
transformed = pca.fit(predictions).transform(predictions).select("pca_features", "prediction")
df = transformed.toPandas()
plt.scatter(df['pca_features'].apply(lambda x: x[0]), df['pca_features'].apply(lambda x: x[1]), c=df['prediction'])

for label in plt.gca().xaxis.get_ticklabels():
    label.set_visible(False)
for label in plt.gca().yaxis.get_ticklabels():
    label.set_visible(False)

plt.savefig('plot.png')

import boto3

s3 = boto3.client('s3')
s3.upload_file('plot.png', <bucket>, 'path/plot.png')