K-means clustering with Spark MLlib

Spark includes MLlib, a library for machine learning. This article tries clustering with its KMeans estimator.

K-means is a clustering method that first assigns each data point at random to one of a predetermined number of clusters, computes the center of each cluster, and then reassigns each point to the cluster whose center is closest. The last two steps repeat until the assignments converge. MLlib's KMeans initializes the centers with k-means|| by default, a parallel variant of k-means++ that picks well-spread starting centers and so usually converges faster than purely random initialization. Its default distance measure is Euclidean.
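
The iteration described above can be sketched in a few lines of plain Python. This is only an illustration of the basic algorithm with a random starting assignment, not how MLlib implements it (MLlib runs distributed and uses a smarter seeding step). The function name `kmeans` and its parameters are made up for this example.

```python
import math
import random


def kmeans(points, k, max_iter=100, seed=1):
    """Cluster `points` (a list of equal-length tuples) into k groups."""
    rng = random.Random(seed)
    # Step 1: give every point a random cluster label, while making sure
    # each cluster starts with at least one member.
    labels = [i % k for i in range(len(points))]
    rng.shuffle(labels)

    centers = []
    for _ in range(max_iter):
        # Step 2: compute the center (mean) of each cluster.
        centers = []
        for c in range(k):
            members = [p for p, lab in zip(points, labels) if lab == c]
            if not members:
                # A cluster that lost all its points is re-seeded
                # with a randomly chosen point.
                members = [rng.choice(points)]
            centers.append(tuple(sum(col) / len(members) for col in zip(*members)))

        # Step 3: move each point to the cluster with the nearest center
        # (Euclidean distance).
        new_labels = [
            min(range(k), key=lambda c: math.dist(p, centers[c])) for p in points
        ]

        # Step 4: stop once no assignment changes.
        if new_labels == labels:
            break
        labels = new_labels

    return labels, centers


# Two well-separated groups of toy points.
points = [(0.0, 0.0), (0.1, 0.2), (0.2, 0.1), (5.0, 5.0), (5.1, 5.2), (5.2, 5.1)]
labels, centers = kmeans(points, k=2)
```

On this toy input the first three points end up in one cluster and the last three in the other, with centers near (0.1, 0.1) and (5.1, 5.1). Which group gets label 0 depends on the random start.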

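Load the Iris dataset into a Spark DataFrame.

from pyspark.sql import SparkSession
from sklearn.datasets import load_iris

# Reuse the active session (e.g. in a notebook or the pyspark shell) or create one.
spark = SparkSession.builder.getOrCreate()

iris = load_iris(as_frame=True).data
df = spark.createDataFrame(iris)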
print(df.head())
# Row(sepal length (cm)=5.1, sepal width (cm)=3.5, petal length (cm)=1.4, petal width (cm)=0.2)

VectorAssembler combines multiple columns into a single vector column, which is the input format that MLlib estimators expect.

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=df.columns, outputCol="features")
data = assembler.transform(df)
print(data.head())
# Row(sepal length (cm)=5.1, sepal width (cm)=3.5, petal length (cm)=1.4, petal width (cm)=0.2, features=DenseVector([5.1, 3.5, 1.4, 0.2]))

fit() returns a fitted model, which is a Transformer, and its transform() adds the cluster index of each row as the predictionCol column.

from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=3, seed=1, featuresCol="features", predictionCol="prediction")
model = kmeans.fit(data)
predictions = model.transform(data)

print(predictions.groupBy("prediction").count().collect())
# [Row(prediction=1, count=50), Row(prediction=0, count=38), Row(prediction=2, count=62)]

Reduce the input vectors to two dimensions with PCA and plot them, using a different color for each cluster.

import matplotlib.pyplot as plt
from pyspark.ml.feature import PCA

pca = PCA(k=2, inputCol="features", outputCol="pca_features")
transformed = pca.fit(predictions).transform(predictions).select("pca_features", "prediction")
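# Use a separate name so the Spark DataFrame `df` is not overwritten.
pdf = transformed.toPandas()
xs = pdf['pca_features'].apply(lambda v: v[0])  # first principal component
ys = pdf['pca_features'].apply(lambda v: v[1])  # second principal component
plt.scatter(xs, ys, c=pdf['prediction'])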

for label in plt.gca().xaxis.get_ticklabels():
    label.set_visible(False)
for label in plt.gca().yaxis.get_ticklabels():
    label.set_visible(False)

plt.savefig('plot.png')
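
For intuition about the PCA step used for the plot above, here is a small NumPy sketch of projecting data onto its top two principal components. It is not Spark's implementation: the function name `pca_project` and the random data are made up for this example, and the sketch centers the data before projecting, which Spark's PCA may handle differently. Such differences (and the sign of each component) only shift or flip the plot; they do not change which points lie close together.

```python
import numpy as np


def pca_project(X, k=2):
    """Project the rows of X onto the top-k principal components."""
    X = np.asarray(X, dtype=float)
    # Center each column so the components describe variance around the mean.
    Xc = X - X.mean(axis=0)
    # The rows of Vt are the principal directions, ordered from the
    # largest singular value (most variance) to the smallest.
    _, _, Vt = np.linalg.svd(Xc, full_matrices=False)
    return Xc @ Vt[:k].T


rng = np.random.default_rng(0)
# Hypothetical data: 100 points in 4 dimensions (the same width as the
# Iris features), with a different spread along each dimension.
X = rng.normal(size=(100, 4)) * [3.0, 1.0, 0.5, 0.1]
Z = pca_project(X, k=2)
print(Z.shape)  # (100, 2)
```

The first output column carries the most variance and the second the next most, which is why two columns are often enough for a readable scatter plot.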

Upload the saved image to S3. Here <bucket> stands for the name of your bucket.

import boto3

s3 = boto3.client('s3')
s3.upload_file('plot.png', <bucket>, 'path/plot.png')