From ebca54ee2b24415b190ba062cfa018c612b5c58d Mon Sep 17 00:00:00 2001 From: Haiping Lu Date: Wed, 30 Mar 2022 00:05:18 +0100 Subject: [PATCH] add lab 8 and lab 9 --- Lab 8 - Scalable k-means clustering.md | 316 +++++++++++++++ ...ionality reduction and Spark data types.md | 383 ++++++++++++++++++ 2 files changed, 699 insertions(+) create mode 100644 Lab 8 - Scalable k-means clustering.md create mode 100644 Lab 9 - Scalable PCA for dimensionality reduction and Spark data types.md diff --git a/Lab 8 - Scalable k-means clustering.md b/Lab 8 - Scalable k-means clustering.md new file mode 100644 index 0000000..3b45f78 --- /dev/null +++ b/Lab 8 - Scalable k-means clustering.md @@ -0,0 +1,316 @@ +# Lab 8: $k$-means clustering + +[COM6012 Scalable Machine Learning **2022**](https://github.com/haipinglu/ScalableML) by [Haiping Lu](https://haipinglu.github.io/) at The University of Sheffield + +**Accompanying lectures**: [YouTube video lectures recorded in Year 2020/21.](https://www.youtube.com/watch?v=eLlwMhfbqAo&list=PLuRoUKdWifzxZfwTMvWlrnvQmPWtHZ32U) + +## Study schedule + +- [Task 1](#1-k-means-clustering): To finish in the lab session on 31st March. **Essential** +- [Task 2](#2-exercises): To finish by the following Tuesday 26th April (due to 3-week vocation). ***Exercise*** +- [Task 3](#3-additional-ideas-to-explore-optional): To explore further. *Optional* + +### Suggested reading + +- Chapters *Clustering* and *RFM Analysis* of [PySpark tutorial](https://runawayhorse001.github.io/LearningApacheSpark/pyspark.pdf) +- [Clustering in Spark](https://spark.apache.org/docs/3.2.1/ml-clustering.html) +- [PySpark API on clustering](https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.ml.clustering.KMeans.html) +- [PySpark code on clustering](https://github.com/apache/spark/blob/master/python/pyspark/ml/clustering.py) +- [$k$-means clustering on Wiki](https://en.wikipedia.org/wiki/K-means_clustering) +- [$k$-means++ on Wiki](https://en.wikipedia.org/wiki/K-means%2B%2B) +- [$k$-means|| paper](http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf) + +## 1. $k$-means clustering + +[$k$-means](http://en.wikipedia.org/wiki/K-means_clustering) is one of the most commonly used clustering algorithms that clusters the data points into a predefined number of clusters. The Spark MLlib implementation includes a parallelized variant of the [$k$-means++](https://en.wikipedia.org/wiki/K-means%2B%2B) method called [$k$-means||](http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf). + +`KMeans` is implemented as an `Estimator` and generates a [`KMeansModel`](https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.ml.clustering.KMeansModel.html) as the base model. + +[API](https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.ml.clustering.KMeans.html): `class pyspark.ml.clustering.KMeans(featuresCol='features', predictionCol='prediction', k=2, initMode='k-means||', initSteps=2, tol=0.0001, maxIter=20, seed=None, distanceMeasure='euclidean', weightCol=None)` + +The following parameters are available: + +- *k*: the number of desired clusters. +- *maxIter*: the maximum number of iterations +- *initMode*: specifies either random initialization or initialization via k-means|| +- *initSteps*: determines the number of steps in the k-means|| algorithm (default=2, advanced) +- *tol*: determines the distance threshold within which we consider k-means to have converged. 
+- *seed*: setting the **random seed** (so that multiple runs have the same results) +- *distanceMeasure*: either Euclidean (default) or cosine distance measure +- *weightCol*: optional weighting of data points + +Let us request for 2 cores using a regular queue. We activate the environment as usual and then install `matplotlib` (if you have not done so). + + ```sh + qrshx -pe smp 2 + source myspark.sh # myspark.sh should be under the root directory + conda install -y matplotlib + cd com6012/ScalableML # our main working directory + pyspark --master local[2] # start pyspark with 2 cores requested above. + ``` + +We will do some plotting in this lab. To plot and save figures on HPC, we need to do the following before using pyplot: + +```python +import matplotlib +matplotlib.use('Agg') # Must be before importing matplotlib.pyplot or pylab! +``` + +Now import modules needed in this lab: + +```python +from pyspark.ml.clustering import KMeans +from pyspark.ml.clustering import KMeansModel +from pyspark.ml.evaluation import ClusteringEvaluator +from pyspark.ml.linalg import Vectors +import matplotlib.pyplot as plt +``` + +### Clustering of simple synthetic data + +Here, we study $k$-means clustering on a simple example with four well-separated data points as the following. + +```python +data = [(Vectors.dense([0.0, 0.0]),), (Vectors.dense([1.0, 1.0]),), + (Vectors.dense([9.0, 8.0]),), (Vectors.dense([8.0, 9.0]),)] +df = spark.createDataFrame(data, ["features"]) +kmeans = KMeans(k=2, seed=1) # Two clusters with seed = 1 +model = kmeans.fit(df) +``` + +We examine the cluster centers (centroids) and use the trained model to "predict" the cluster index for a data point. + +```python +centers = model.clusterCenters() +len(centers) +# 2 +for center in centers: + print(center) +# [0.5 0.5] +# [8.5 8.5] +model.predict(df.head().features) +# 0 +``` + +We can use the trained model to cluster any data points in the same space, where the cluster index is considered as the `prediction`. + +```python +transformed = model.transform(df) +transformed.show() +# +---------+----------+ +# | features|prediction| +# +---------+----------+ +# |[0.0,0.0]| 0| +# |[1.0,1.0]| 0| +# |[9.0,8.0]| 1| +# |[8.0,9.0]| 1| +# +---------+----------+ +``` + +We can examine the training summary for the trained model. + +```python +model.hasSummary +# True +summary = model.summary +summary +# +summary.k +# 2 +summary.clusterSizes +# [2, 2]] +summary.trainingCost #sum of squared distances of points to their nearest center +# 2.0 +``` + +You can check out the [KMeansSummary API](https://spark.apache.org/docs/3.2.1/api/java/org/apache/spark/ml/clustering/KMeansSummary.html) for details of the summary information, e.g., we can find out that the training cost is the sum of squared distances to the nearest centroid for all points in the training dataset. + +### Save and load an algorithm/model + +We can save an algorithm/model in a temporary location (see [API on save](https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.ml.PipelineModel.html?highlight=pipelinemodel%20save#pyspark.ml.PipelineModel.save)) and then load it later. 
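Before saving, it can be handy to double-check which settings the estimator currently holds. Here is a minimal sketch, assuming the `kmeans` estimator created for the synthetic data above; the values in the comments are just what the constructor arguments and the defaults imply.

```python
# Inspect the current parameter values of the (unfitted) KMeans estimator.
kmeans.getK()                # 2 (set explicitly above)
kmeans.getInitMode()         # 'k-means||' (default)
kmeans.getMaxIter()          # 20 (default)
kmeans.getDistanceMeasure()  # 'euclidean' (default)

# explainParams() prints every parameter with its documentation and current value.
print(kmeans.explainParams())
```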
+ +Save and load the $k$-means algorithm (settings): + +```python +import tempfile + +temp_path = tempfile.mkdtemp() +kmeans_path = temp_path + "/kmeans" +kmeans.save(kmeans_path) +kmeans2 = KMeans.load(kmeans_path) +kmeans2.getK() +# 2 +``` + +Save and load the learned $k$-means model (note that only the learned model is saved, not including the summary): + +```python +model_path = temp_path + "/kmeans_model" +model.save(model_path) +model2 = KMeansModel.load(model_path) +model2.hasSummary +# False +model2.clusterCenters() +# [array([0.5, 0.5]), array([8.5, 8.5])] +``` + +### Iris clustering + +Clustering of the [Iris flower data set](https://en.wikipedia.org/wiki/Iris_flower_data_set) is a classical example [discussed the Wikipedia page of $k$-means clustering](https://en.wikipedia.org/wiki/K-means_clustering#Discussion). This data set was introduced by [Ronald Fisher](https://en.wikipedia.org/wiki/Ronald_Fisher), "the father of modern statistics and experimental design" (and thus machine learning) and also "the greatest biologist since Darwin". The code below is based on Chapter *Clustering* of [PySpark tutorial](https://runawayhorse001.github.io/LearningApacheSpark/pyspark.pdf), with some changes introduced. + +#### Load and inspect the data + +```python +df = spark.read.load("Data/iris.csv", format="csv", inferSchema="true", header="true").cache() +df.show(5,True) +# +------------+-----------+------------+-----------+-------+ +# |sepal_length|sepal_width|petal_length|petal_width|species| +# +------------+-----------+------------+-----------+-------+ +# | 5.1| 3.5| 1.4| 0.2| setosa| +# | 4.9| 3.0| 1.4| 0.2| setosa| +# | 4.7| 3.2| 1.3| 0.2| setosa| +# | 4.6| 3.1| 1.5| 0.2| setosa| +# | 5.0| 3.6| 1.4| 0.2| setosa| +# +------------+-----------+------------+-----------+-------+ +# only showing top 5 rows +df.printSchema() +# root +# |-- sepal_length: double (nullable = true) +# |-- sepal_width: double (nullable = true) +# |-- petal_length: double (nullable = true) +# |-- petal_width: double (nullable = true) +# |-- species: string (nullable = true) +``` + +We can use `.describe().show()` to inspect the (statistics of) data: + +```python +df.describe().show() +# +-------+------------------+-------------------+------------------+------------------+---------+ +# |summary| sepal_length| sepal_width| petal_length| petal_width| species| +# +-------+------------------+-------------------+------------------+------------------+---------+ +# | count| 150| 150| 150| 150| 150| +# | mean| 5.843333333333335| 3.0540000000000007|3.7586666666666693|1.1986666666666672| null| +# | stddev|0.8280661279778637|0.43359431136217375| 1.764420419952262|0.7631607417008414| null| +# | min| 4.3| 2.0| 1.0| 0.1| setosa| +# | max| 7.9| 4.4| 6.9| 2.5|virginica| +# +-------+------------------+-------------------+------------------+------------------+---------+ +``` + +#### Convert the data to dense vector (features) + +Use a `transData` function similar to that in Lab 2 to convert the attributes into feature vectors. 
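An alternative sketch uses the `VectorAssembler` transformer from `pyspark.ml.feature` to build the same `features` column directly from the named columns; the `transData` helper below achieves the equivalent via the RDD API.

```python
from pyspark.ml.feature import VectorAssembler

# Assemble the four numeric columns of the iris DataFrame into one vector column.
assembler = VectorAssembler(
    inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"],
    outputCol="features")
dfAssembled = assembler.transform(df).select("features")
dfAssembled.show(5, False)
```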
+ +```python +def transData(data): + return data.rdd.map(lambda r: [Vectors.dense(r[:-1])]).toDF(['features']) + +dfFeatureVec= transData(df).cache() +dfFeatureVec.show(5, False) +# +-----------------+ +# |features | +# +-----------------+ +# |[5.1,3.5,1.4,0.2]| +# |[4.9,3.0,1.4,0.2]| +# |[4.7,3.2,1.3,0.2]| +# |[4.6,3.1,1.5,0.2]| +# |[5.0,3.6,1.4,0.2]| +# +-----------------+ +# only showing top 5 rows +``` + +#### Determine $k$ via silhouette analysis + +We can perform a [Silhouette Analysis](https://en.wikipedia.org/wiki/Silhouette_(clustering)) to determine $k$ by running multiple $k$-means with different $k$ and evaluate the clustering results. See [the ClusteringEvaluator API](https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.ml.evaluation.ClusteringEvaluator.html), where `silhouette` is the default metric. You can also refer to this [scikit-learn notebook on the same topic](https://scikit-learn.org/stable/auto_examples/cluster/plot_kmeans_silhouette_analysis.html). Other ways of determining the best $k$ can be found on [a dedicated wiki page](https://en.wikipedia.org/wiki/Determining_the_number_of_clusters_in_a_data_set). + +```python +import numpy as np + +numK=10 +silhouettes = np.zeros(numK) +costs= np.zeros(numK) +for k in range(2,numK): # k = 2:9 + kmeans = KMeans().setK(k).setSeed(11) + model = kmeans.fit(dfFeatureVec) + predictions = model.transform(dfFeatureVec) + costs[k]=model.summary.trainingCost + evaluator = ClusteringEvaluator() # to compute the silhouette score + silhouettes[k] = evaluator.evaluate(predictions) +``` + +We can take a look at the clustering results (the `prediction` below is the cluster index/label). + +```python +predictions.show(15) +# +-----------------+----------+ +# | features|prediction| +# +-----------------+----------+ +# |[5.1,3.5,1.4,0.2]| 1| +# |[4.9,3.0,1.4,0.2]| 1| +# |[4.7,3.2,1.3,0.2]| 1| +# |[4.6,3.1,1.5,0.2]| 1| +# |[5.0,3.6,1.4,0.2]| 1| +# |[5.4,3.9,1.7,0.4]| 5| +# |[4.6,3.4,1.4,0.3]| 1| +# |[5.0,3.4,1.5,0.2]| 1| +# |[4.4,2.9,1.4,0.2]| 1| +# |[4.9,3.1,1.5,0.1]| 1| +# |[5.4,3.7,1.5,0.2]| 5| +# |[4.8,3.4,1.6,0.2]| 1| +# |[4.8,3.0,1.4,0.1]| 1| +# |[4.3,3.0,1.1,0.1]| 1| +# |[5.8,4.0,1.2,0.2]| 5| +# +-----------------+----------+ +# only showing top 15 rows +``` + +Plot the cost (sum of squared distances of points to their nearest centroid, the smaller the better) against $k$. + +```python +fig, ax = plt.subplots(1,1, figsize =(8,6)) +ax.plot(range(2,numK),costs[2:numK],marker="o") +ax.set_xlabel('$k$') +ax.set_ylabel('Cost') +plt.grid() +plt.savefig("Output/Lab8_cost.png") +``` + +We can see that this cost measure is biased towards a large $k$. Let us plot the silhouette metric (the larger the better) against $k$. + +```python +fig, ax = plt.subplots(1,1, figsize =(8,6)) +ax.plot(range(2,numK),silhouettes[2:numK],marker="o") +ax.set_xlabel('$k$') +ax.set_ylabel('Silhouette') +plt.grid() +plt.savefig("Output/Lab8_silhouette.png") +``` + +We can see that the silhouette measure is biased towards a small $k$. By the silhouette metric, we should choose $k=2$ but we know the ground truth $k$ is 3 (read the [data description](https://archive.ics.uci.edu/ml/datasets/iris) or count unique species). Therefore, this metric is not giving the ideal results in this case (either). [Determining the optimal number of clusters](https://en.wikipedia.org/wiki/Determining_the_number_of_clusters_in_a_data_set) is an open problem. + +## 2. 
Exercises

### Further study on iris clustering

Carry out some further studies on the iris clustering problem above.

1. Choose $k=3$ and evaluate the clustering results against the ground truth (class labels) using the [Normalized Mutual Information (NMI) available in scikit-learn](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.normalized_mutual_info_score.html). You need to install `scikit-learn` in the `myspark` environment via `conda install -y scikit-learn`. This allows us to study the clustering quality when we know the true number of clusters.
2. Use multiple (e.g., 10 or 20) random seeds to generate different clustering results and plot the respective NMI values (with respect to the ground truth, with $k=3$ as in the question above) to observe the effect of initialisation.

## 3. Additional ideas to explore (*optional*)

### RFM Customer Value Analysis

- Follow Chapter *RFM Analysis* of the [PySpark tutorial](https://runawayhorse001.github.io/LearningApacheSpark/pyspark.pdf) to perform an [RFM Customer Value Analysis](https://en.wikipedia.org/wiki/RFM_(customer_value)).
- The data can be downloaded from the [Online Retail Data Set](https://archive.ics.uci.edu/ml/datasets/online+retail) at UCI.
- Note the **data cleaning** step that checks for and removes rows containing null values via `.dropna()`. You may need to do the same when you are dealing with real data.
- The **data manipulation** steps are also useful to learn.

### Network intrusion detection

- The original task is a classification task. We can ignore the class labels and perform clustering on the data.
- Write a standalone program (and submit it as a batch job to HPC) to do $k$-means clustering on the [KDDCUP1999 data](https://archive.ics.uci.edu/ml/datasets/KDD+Cup+1999+Data) with 4M points. You may start with the smaller 10% subset.

### Color Quantization using K-Means

- Follow the scikit-learn example [Color Quantization using K-Means](https://scikit-learn.org/stable/auto_examples/cluster/plot_color_quantization.html#sphx-glr-auto-examples-cluster-plot-color-quantization-py) to perform the same using PySpark on your high-resolution photos.

diff --git a/Lab 9 - Scalable PCA for dimensionality reduction and Spark data types.md b/Lab 9 - Scalable PCA for dimensionality reduction and Spark data types.md
new file mode 100644
index 0000000..e1e8222
--- /dev/null
+++ b/Lab 9 - Scalable PCA for dimensionality reduction and Spark data types.md
@@ -0,0 +1,383 @@

# Lab 9: PCA for dimensionality reduction and Spark data types

[COM6012 Scalable Machine Learning **2022**](https://github.com/haipinglu/ScalableML) by [Haiping Lu](https://haipinglu.github.io/) at The University of Sheffield

**Accompanying lectures**: [YouTube video lectures recorded in Year 2020/21.](https://www.youtube.com/watch?v=jz48ORUxqB4&list=PLuRoUKdWifzylmRAERh5ipVhmQY_IoS4Y)

## Study schedule

- [Task 1](#1-data-types-in-rdd-based-api): To finish in the lab session on 28th April. **Essential**
- [Task 2](#2-pca): To finish in the lab session on 28th April. **Essential**
- [Task 3](#3-exercises): To finish by the following Tuesday 3rd May. ***Exercise***
- [Task 4](#4-additional-ideas-to-explore-optional): To explore further.
*Optional* + +### Suggested reading + +- [Extracting, transforming and selecting features](https://spark.apache.org/docs/3.2.1/ml-features.html) +- [PCA in Spark DataFrame API `pyspark.ml`](https://spark.apache.org/docs/3.2.1/ml-features.html#pca) +- [SVD in Spark RDD API `pyspark.mllib`](https://spark.apache.org/docs/3.2.1/mllib-dimensionality-reduction.html#singular-value-decomposition-svd) +- [StandardScaler in Spark](https://spark.apache.org/docs/3.2.1/ml-features.html#standardscaler) to standardise/normalise data to unit standard deviation and/or zero mean. +- [Data Types - RDD-based API](https://spark.apache.org/docs/3.2.1/mllib-data-types.html) +- [PCA on Wiki](https://en.wikipedia.org/wiki/Principal_component_analysis) +- [Understanding Dimension Reduction with Principal Component Analysis (PCA)](https://blog.paperspace.com/dimension-reduction-with-principal-component-analysis/) +- [Principal Component Analysis explained on Kaggle](https://www.kaggle.com/nirajvermafcb/principal-component-analysis-explained) with data available [here](https://www.kaggle.com/liujiaqi/hr-comma-sepcsv), and background info [here](https://rstudio-pubs-static.s3.amazonaws.com/345463_37f54d1c948b4cdfa181541841e0db8a.html) + +## 1. Data Types in RDD-based API + +To deal with data efficiently, Spark considers different [data types](https://spark.apache.org/docs/3.2.1/mllib-data-types.html). In particular, MLlib supports local vectors and matrices stored on a single machine, as well as distributed matrices backed by one or more RDDs. Local vectors and local matrices are simple data models that serve as public interfaces. The underlying linear algebra operations are provided by [Breeze](http://www.scalanlp.org/). A training example used in supervised learning is called a “labeled point” in MLlib. + +### [Local vector](https://spark.apache.org/docs/3.2.1/mllib-data-types.html#local-vector): Dense vs Sparse + +> A local vector has integer-typed and 0-based indices and double-typed values, stored on a single machine. MLlib supports two types of local vectors: dense and sparse. A dense vector is backed by a double array representing its entry values, while a sparse vector is backed by two parallel arrays: indices and values. For example, a vector (1.0, 0.0, 3.0) can be represented in dense format as [1.0, 0.0, 3.0] or in sparse format as (3, [0, 2], [1.0, 3.0]), where 3 is the size of the vector. + +Check out the [Vector in RDD API](https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.mllib.linalg.Vectors.html?highlight=mllib%20linalg%20vectors#pyspark.mllib.linalg.Vectors) or [Vector in DataFrame API](https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.ml.linalg.Vector.html?highlight=ml%20linalg%20vector#pyspark.ml.linalg.Vector) (see method `.Sparse()`) and [SparseVector in RDD API ](https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.mllib.linalg.SparseVector.html?highlight=sparsevector#pyspark.mllib.linalg.SparseVector) or [SparseVector in DataFrame API ](https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.ml.linalg.SparseVector.html?highlight=sparsevector#pyspark.ml.linalg.SparseVector). The official example is below + +```python +import numpy as np +from pyspark.mllib.linalg import Vectors + +dv1 = np.array([1.0, 0.0, 3.0]) # Use a NumPy array as a dense vector. +dv2 = [1.0, 0.0, 3.0] # Use a Python list as a dense vector. +sv1 = Vectors.sparse(3, [0, 2], [1.0, 3.0]) # Create a SparseVector. 
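# For comparison, the same vector built with the MLlib factory method
# (dv1 and dv2 above are a plain NumPy array and a Python list):
dv3 = Vectors.dense([1.0, 0.0, 3.0])
# The sparse vector stores only the non-zero entries, which can be
# inspected via sv1.indices and sv1.values.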
+``` + +Note the vector created by `Vectors.sparse()` is of type `SparseVector()` + +```python +sv1 +# SparseVector(3, {0: 1.0, 2: 3.0}) +``` + +To view the sparse vector in a dense format + +```python +sv1.toArray() +# array([1., 0., 3.]) +``` + +### [Labeled point](https://spark.apache.org/docs/3.2.1/mllib-data-types.html#labeled-point) + +> A labeled point is a local vector, either dense or sparse, associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms. We use a double to store a label, so we can use labeled points in both regression and classification. For binary classification, a label should be either 0 (negative) or 1 (positive). For multiclass classification, labels should be class indices starting from zero: 0, 1, 2, .... + +See [LabeledPoint API in MLlib](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.mllib.regression.LabeledPoint.html?highlight=labeledpoint#pyspark.mllib.regression.LabeledPoint). Now, we create a labeled point with a positive label and a dense feature vector, as well as a labeled point with a negative label and a sparse feature vector. + +```python +from pyspark.mllib.linalg import SparseVector +from pyspark.mllib.regression import LabeledPoint + +pos = LabeledPoint(1.0, [1.0, 0.0, 3.0]) +neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0])) + +neg +# LabeledPoint(0.0, (3,[0,2],[1.0,3.0])) +neg.label +# 0.0 +neg.features +# SparseVector(3, {0: 1.0, 2: 3.0}) +``` + +Now view the features as dense vector (rather than sparse vector) + +```python +neg.features.toArray() +# array([1., 0., 3.]) +``` + +### [Local matrix](https://spark.apache.org/docs/3.2.1/mllib-data-types.html#local-matrix) + +> A local matrix has integer-typed row and column indices and double-typed values, stored on a single machine. MLlib supports dense matrices, whose entry values are stored in a single double array in column-major order, and sparse matrices, whose non-zero entry values are stored in the Compressed Sparse Column (CSC) format in column-major order. For example, we create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) and a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)) in the following: + +```python +from pyspark.mllib.linalg import Matrix, Matrices + +dm2 = Matrices.dense(3, 2, [1, 3, 5, 2, 4, 6]) +sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8]) +print(dm2) +# DenseMatrix([[1., 2.], +# [3., 4.], +# [5., 6.]]) +print(sm) +# 3 X 2 CSCMatrix +# (0,0) 9.0 +# (2,1) 6.0 +# (1,1) 8.0 +``` + +See [Scala API for Matrices.sparse](https://spark.apache.org/docs/3.2.1/api/scala/org/apache/spark/mllib/linalg/Matrices$.html) and from its [source code](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala), we can see it creates a CSC [SparseMatrix](https://spark.apache.org/docs/3.2.1/api/scala/org/apache/spark/mllib/linalg/SparseMatrix.html). + +Here the [compressed sparse column (CSC or CCS) format](https://en.wikipedia.org/wiki/Sparse_matrix#Compressed_sparse_column_(CSC_or_CCS)) is used for sparse matrix representation. You can learn it from this [simple explanation](https://stackoverflow.com/questions/44825193/how-to-create-a-sparse-cscmatrix-using-spark?answertab=votes#tab-top). To learn more about CSC, you may refer to a [top video](https://www.youtube.com/watch?v=fy_dSZb-Xx8) and a [top post with animation](https://matteding.github.io/2019/04/25/sparse-matrices/#compressed-sparse-matrices). 
+> values are read first by column, a row index is stored for each value, and column pointers are stored. For example, CSC is (val, row_ind, col_ptr), where val is an array of the (top-to-bottom, then left-to-right) non-zero values of the matrix; row_ind is the row indices corresponding to the values; and, col_ptr is the list of val indexes where each column starts. + + +```python +dsm=sm.toDense() +print(dsm) +# DenseMatrix([[9., 0.], +# [0., 8.], +# [0., 6.]]) +``` + +### [Distributed matrix](https://spark.apache.org/docs/3.2.1/mllib-data-types.html#distributed-matrix) + +> A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs. It is very important to choose the right format to store large and distributed matrices. Converting a distributed matrix to a different format may require a global shuffle, which is quite expensive. Four types of distributed matrices have been implemented so far. + +#### RowMatrix + +> The basic type is called RowMatrix. A RowMatrix is a row-oriented distributed matrix without meaningful row indices, e.g., a collection of feature vectors. It is backed by an RDD of its rows, where each row is a local vector. We assume that the number of columns is not huge for a RowMatrix so that a single local vector can be reasonably communicated to the driver and can also be stored / operated on using a single node. +> Since each row is represented by a local vector, the number of columns is limited by the integer range but it should be much smaller in practice. + +Now we create an RDD of vectors `rows`, from which we create a RowMatrix `mat`. + +```python +from pyspark.mllib.linalg.distributed import RowMatrix + +rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]) +mat = RowMatrix(rows) + +m = mat.numRows() # Get its size: m=4, n=3 +n = mat.numCols() + +rowsRDD = mat.rows # Get the rows as an RDD of vectors again. +``` + +We can view the RowMatrix in a dense matrix format + +```python +rowsRDD.collect() +# [DenseVector([1.0, 2.0, 3.0]), DenseVector([4.0, 5.0, 6.0]), DenseVector([7.0, 8.0, 9.0]), DenseVector([10.0, 11.0, 12.0])] +``` + +## 2. PCA + +[Principal component analysis](http://en.wikipedia.org/wiki/Principal_component_analysis) (PCA) is a statistical procedure that uses an orthogonal transformation to convert a set of observations of possibly correlated variables (entities each of which takes on various numerical values) into a set of values of linearly uncorrelated variables called **principal components (PCs)**. A PCA class trains a model to project vectors to a low-dimensional space using PCA and this is probably the most commonly used **dimensionality reduction** method. + +### PCA in DataFrame-based API `pyspark.ml` + +Check out the [API](https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.ml.feature.PCA.html?highlight=pyspark%20ml%20feature%20pca#pyspark.ml.feature.PCA). Check [`pyspark.ml.feature.PCAModel`](https://spark.apache.org/docs/3.2.1/api/python/reference/api/pyspark.ml.feature.PCAModel.html?highlight=pyspark%20ml%20feature%20pcamodel#pyspark.ml.feature.PCAModel) too to see what is available for the fitted model. Let us project three 5-dimensional feature vectors into 2-dimensional principal components. 
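In a full workflow, PCA typically sits as one stage of a `Pipeline`, e.g. after a `VectorAssembler`; a minimal sketch with hypothetical column names `x1` to `x5` is given here, while the toy example below builds the `features` column directly.

```python
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, PCA

# Hypothetical numeric columns x1..x5, assembled and then reduced to 2 dimensions.
assembler = VectorAssembler(inputCols=["x1", "x2", "x3", "x4", "x5"], outputCol="features")
pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
pipeline = Pipeline(stages=[assembler, pca])
# pipelineModel = pipeline.fit(some_df)  # some_df: a DataFrame holding the columns above
```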
+ +```python +from pyspark.ml.feature import PCA +from pyspark.ml.linalg import Vectors + +data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),), + (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),), + (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)] +df = spark.createDataFrame(data, ["features"]) +df.show() +# +--------------------+ +# | features| +# +--------------------+ +# | (5,[1,3],[1.0,7.0])| +# |[2.0,0.0,3.0,4.0,...| +# |[4.0,0.0,0.0,6.0,...| +# +--------------------+ + +pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures") +model = pca.fit(df) + +result = model.transform(df).select("pcaFeatures") +result.show(truncate=False) +# +----------------------------------------+ +# |pcaFeatures | +# +----------------------------------------+ +# |[1.6485728230883807,-4.013282700516296] | +# |[-4.645104331781534,-1.1167972663619026]| +# |[-6.428880535676489,-5.337951427775355] | +# +----------------------------------------+ +``` + +Check the explained variance in percentage + +```python +model.explainedVariance +# DenseVector([0.7944, 0.2056]) +``` + +Take a look at the principal components Matrix. Each column is one principal component. + +```python + print(model.pc) +# DenseMatrix([[-0.44859172, -0.28423808], +# [ 0.13301986, -0.05621156], +# [-0.12523156, 0.76362648], +# [ 0.21650757, -0.56529588], +# [-0.84765129, -0.11560341]]) +``` + +### PCA in RDD-based API `pyspark.mllib` + +#### Eigendecomposition for PCA + +`pyspark.mllib` supports PCA for **tall-and-skinny** (big $n$, small $d$) matrices stored in row-oriented format and any Vectors. We demonstrate how to compute principal components on a [RowMatrix](http://spark.apache.org/docs/3.2.1/mllib-data-types.html#rowmatrix) and use them to project the vectors into a low-dimensional space in the cell below. + +```python +from pyspark.mllib.linalg import Vectors +from pyspark.mllib.linalg.distributed import RowMatrix + +rows = sc.parallelize([ + Vectors.sparse(5, {1: 1.0, 3: 7.0}), + Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0), + Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0) +]) +rows.collect() +# [SparseVector(5, {1: 1.0, 3: 7.0}), DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), DenseVector([4.0, 0.0, 0.0, 6.0, 7.0])] + +mat = RowMatrix(rows) +``` + +Compute the top 2 principal components, which are stored in a local dense matrix (the same as above). + +```python +pc = mat.computePrincipalComponents(2) +print(pc) +# DenseMatrix([[-0.44859172, -0.28423808], +# [ 0.13301986, -0.05621156], +# [-0.12523156, 0.76362648], +# [ 0.21650757, -0.56529588], +# [-0.84765129, -0.11560341]]) +``` + +Project the rows to the linear space spanned by the top 2 principal components (the same as above) + +```python +projected = mat.multiply(pc) +projected.rows.collect() +# [DenseVector([1.6486, -4.0133]), DenseVector([-4.6451, -1.1168]), DenseVector([-6.4289, -5.338])] +``` + +Now we convert to dense rows to see the matrix + +```python +from pyspark.mllib.linalg import DenseVector + +denseRows = rows.map(lambda vector: DenseVector(vector.toArray())) +denseRows.collect() +# [DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), DenseVector([4.0, 0.0, 0.0, 6.0, 7.0])] +``` + +#### SVD for PCA - more *scalable* way to do PCA + +Read [SVD in RDD-based API `pyspark.mllib`](https://spark.apache.org/docs/3.2.1/mllib-dimensionality-reduction.html#singular-value-decomposition-svd). As covered in the lecture, we will need SVD for PCA on large-scale data. 
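As a quick reminder of why SVD gives us PCA (standard linear algebra, independent of Spark): if $X$ is the $n \times d$ **centered** data matrix with thin SVD $X = U \Sigma V^\top$, then

$$X^\top X = V \Sigma^2 V^\top,$$

so the right singular vectors (the columns of $V$) are the principal components, and the eigenvalues of $X^\top X$ are the squared singular values $\sigma_i^2$ (divide by $n-1$ to get the covariance eigenvalues). That constant factor cancels when computing the *proportion* of variance explained, which is why squaring the singular values is all we need.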
Here, we use it on the same small toy example to examine its relationship with the eigendecomposition-based PCA above.

We compute the top 2 singular values and the corresponding singular vectors.

```python
svd = mat.computeSVD(2, computeU=True)
U = svd.U  # The U factor is a RowMatrix.
s = svd.s  # The singular values are stored in a local dense vector.
V = svd.V  # The V factor is a local dense matrix.
```

If we are doing it right, the **right** singular vectors should be the same as the eigenvectors.

```python
print(V)
# DenseMatrix([[-0.31278534, 0.31167136],
#              [-0.02980145, -0.17133211],
#              [-0.12207248, 0.15256471],
#              [-0.71847899, -0.68096285],
#              [-0.60841059, 0.62170723]])
```

But it is **not the same**! Why? Remember that we need to do **centering**! We can do so using the [StandardScaler](https://spark.apache.org/docs/3.2.1/mllib-feature-extraction.html#standardscaler) (check out the API) to center the data, i.e., remove the mean.

```python
from pyspark.mllib.feature import StandardScaler

standardizer = StandardScaler(True, False)  # withMean=True, withStd=False: center only, no scaling
model = standardizer.fit(rows)
centeredRows = model.transform(rows)
centeredRows.collect()
# [DenseVector([-2.0, 0.6667, -1.0, 1.3333, -4.0]), DenseVector([0.0, -0.3333, 2.0, -1.6667, 1.0]), DenseVector([2.0, -0.3333, -1.0, 0.3333, 3.0])]
centeredmat = RowMatrix(centeredRows)
```

Compute the top 2 singular values and the corresponding singular vectors of the centered data.

```python
svd = centeredmat.computeSVD(2, computeU=True)
U = svd.U  # The U factor is a RowMatrix.
s = svd.s  # The singular values are stored in a local dense vector.
V = svd.V  # The V factor is a local dense matrix.
```

Check the **PC** matrix obtained this time (it now matches the result of the PCA methods above).

```python
print(V)
# DenseMatrix([[-0.44859172, -0.28423808],
#              [ 0.13301986, -0.05621156],
#              [-0.12523156, 0.76362648],
#              [ 0.21650757, -0.56529588],
#              [-0.84765129, -0.11560341]])
```

Let us examine the relationship between the singular values and the eigenvalues.

```python
print(s)
# [6.001041088520536,3.0530049438580336]
```

We get the eigenvalues by taking squares of the singular values:

```python
evs = s * s
print(evs)
# [36.012494146111734,9.320839187221594]
```

Now we compute the percentage of variance captured and compare it with the above to verify (see/search `model.explainedVariance`).

```python
evs/sum(evs)
# DenseVector([0.7944, 0.2056])
```

## 3. Exercises

### PCA on iris

Study the [Iris flower data set](https://en.wikipedia.org/wiki/Iris_flower_data_set) `iris.csv` under `Data` with PCA.

1. Follow [Understanding Dimension Reduction with Principal Component Analysis (PCA)](https://blog.paperspace.com/dimension-reduction-with-principal-component-analysis/) to do the same analysis using the DataFrame-based PCA `pca.fit()` from `pyspark.ml`.
2. Follow this lab to verify that using the other two RDD-based PCA APIs, `computePrincipalComponents` and `computeSVD`, will give the same PCA features.

## 4. Additional ideas to explore (*optional*)

### [HR analytics](https://rstudio-pubs-static.s3.amazonaws.com/345463_37f54d1c948b4cdfa181541841e0db8a.html)

A company is trying to figure out why its best and most experienced employees are leaving prematurely, based on this [dataset](https://www.kaggle.com/liujiaqi/hr-comma-sepcsv).
Follow the example [Principal Component Analysis explained on Kaggle](https://www.kaggle.com/nirajvermafcb/principal-component-analysis-explained) to perform such an analysis in PySpark, using as many PySpark APIs as possible.

### Word meaning extraction

Use PySpark to perform the steps in IBM's notebook on [Spark-based machine learning for word meanings](https://github.com/IBMDataScience/word2vec/blob/master/Spark-based%20machine%20learning%20for%20word%20meanings.ipynb), which makes use of PCA, $k$-means, and Word2Vec to learn word meanings.

### Bag of words analysis

Choose a [Bag of Words Data Set](https://archive.ics.uci.edu/ml/datasets/Bag+of+Words). Let us take the **NIPS full papers** data as an example.

The format of this data is:

```
    Number of documents
    Number of words in the vocabulary
    Total number of words in the collection
    docID wordID count
    docID wordID count
    ...
    docID wordID count
```

Our data matrix will be of size $docs \times words$, with each entry holding a word count. To begin, we need to read this data in. Possible steps would include:

1. Extract the number of documents and the size of the vocabulary, and strip off the first 3 lines.
2. Combine the words per document.
3. Create sparse vectors (for better space efficiency).

Start from a small dataset to test your work, and then check **whether** your work scales up to the big **NYTIMES** bag-of-words data. Keep everything as parallel as possible.

### Large image datasets

Find some large-scale image datasets to examine the principal components and explore low-dimensional representations; a possible starting point is sketched below.
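A possible starting point, as a rough sketch only: flatten each image into a row of pixel values, center the rows, and reuse the `RowMatrix` SVD route from Section 2. The image-loading step is left schematic; `load_images_as_rows` is a hypothetical helper you would write for your chosen dataset.

```python
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.feature import StandardScaler

def load_images_as_rows():
    # Hypothetical placeholder: return an iterable of 1-D arrays/lists,
    # one flattened image per element (e.g. read with PIL and flatten).
    raise NotImplementedError

rows = sc.parallelize([Vectors.dense(x) for x in load_images_as_rows()])

# Center the pixel values (withMean=True, withStd=False), as in Section 2.
scaler = StandardScaler(True, False).fit(rows)
centered = scaler.transform(rows)

# Top 10 principal components via SVD on the distributed row matrix.
mat = RowMatrix(centered)
svd = mat.computeSVD(10, computeU=True)
projected = mat.multiply(svd.V)  # low-dimensional representation of every image
```

For genuinely large collections, the images should be read and flattened inside Spark (e.g. from an RDD of file paths) rather than on the driver; the sketch above keeps that part local for simplicity.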