Spark ML Programming Tutorial
Hello and welcome to Lesson 6 of the Apache Spark and Scala course offered by Simplilearn. This lesson will introduce and explain the concepts of Spark Machine Learning or ML programming.
After completing this lesson, you will be able to: • Explain the use cases and techniques of Machine Learning or ML • Describe the key concepts of Spark ML • Explain the concept of an ML Dataset, and • Discuss ML algorithm, model selection via cross validation
6.3 Introduction to Machine Learning
Let’s first understand what machine learning is. It is a subfield of Artificial Intelligence that powers many smart applications. It deals with the construction and study of systems that can learn from data. For instance, consider the photo album feature of Facebook. This feature learns from data and can therefore recognize the faces that can be tagged in a picture. Similarly, Apple’s Siri learns from data so that it can interpret the meaning of human speech and then perform the desired action or provide the desired answer. LinkedIn and the Google driverless car work on the same concept. The objective of Machine Learning is therefore to let a computer predict something. An obvious scenario is predicting a future event. It also covers predicting unknown things or events, that is, cases that have not been programmed or entered into the system. In other words, computers act without being explicitly programmed. Machine learning can be seen as a set of building blocks that make computers behave more intelligently. In the words of Arthur Samuel in 1959: “(Machine Learning is a) field of study that gives computers the ability to learn without being explicitly programmed”. Later, in 1997, Tom Mitchell gave another definition that proved more useful for engineering purposes, “A computer program is said to learn from experience E with respect to some task T and some performance measure P, if its performance on T, as measured by P, improves with experience E.”
6.4 Common Terminologies in Machine Learning
Before we move further, you should know the common terminologies used in Machine Learning, such as feature vector, feature space, samples, and labelled data. In a typical machine learning setting, you are given a collection of objects or data points. Each item in this collection is described by a number of features, and the feature vector of an item is the list of its feature values. These features can be of different types. For example, they can be categorical, such as the colors blue, black, and red, or they can be numeric, such as integer-valued or real-valued quantities. Features of a few of the items are displayed on the screen. Samples are defined as the items to process, for example, a row in a database, a picture, or a document. If a feature vector has length d, you can consider each data point as being mapped to a d-dimensional vector space, which is referred to as the feature space. Another term is labelled data, which is data with known classification results.
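To make these terms concrete, here is a minimal sketch in plain Python. The samples, the feature names, and the labels are all made up for illustration; the categorical color feature is one-hot encoded, so each sample lands in a 5-dimensional feature space.

```python
# Hypothetical samples: one categorical feature (color) and two numeric ones.
samples = [
    {"color": "blue", "doors": 4, "weight": 1.2},
    {"color": "red", "doors": 2, "weight": 0.9},
]
COLORS = ["blue", "black", "red"]  # known categories, in a fixed order


def to_feature_vector(sample):
    """Map one sample to a point in a d-dimensional feature space (d = 5 here)."""
    one_hot = [1.0 if sample["color"] == c else 0.0 for c in COLORS]
    return one_hot + [float(sample["doors"]), float(sample["weight"])]


vectors = [to_feature_vector(s) for s in samples]
# Labelled data pairs each feature vector with a known result (labels are made up).
labelled = list(zip([1.0, 0.0], vectors))
```

Here the first sample becomes a vector of three color indicators followed by its two numeric features.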
6.5 Applications of Machine Learning
Machine learning is sometimes conflated with data mining; however, data mining is more focused on exploratory data analysis. In addition, pattern recognition and machine learning are closely related and can be seen as two facets of the same area. A few examples of machine learning applications are listed on the screen.
6.6 Machine Learning in Spark
MLlib is Spark’s scalable machine learning library. It contains general learning algorithms and utilities, which include classification, regression, clustering, collaborative filtering, dimensionality reduction, and the underlying optimization primitives. Two APIs are available. The primary API is the original spark.mllib API, and the Pipelines API, spark.ml, is a higher-level API for constructing Machine Learning workflows.
6.7 Spark ML API
Spark ML standardizes the APIs for machine learning algorithms. This makes it easy to combine various algorithms in a single workflow or pipeline. The key concepts related to the Spark ML API are listed on the screen. The first concept is the ML dataset. Spark ML uses the Spark SQL DataFrame as a dataset. A DataFrame can hold a variety of data types; for example, a dataset can have different columns that store text, feature vectors, true labels, and predictions. A transformer is an algorithm that transforms one DataFrame into another. For instance, an ML model is a transformer that turns a DataFrame with features into a DataFrame with predictions. An estimator is an algorithm that can be fit on a DataFrame to produce a transformer. For instance, a learning algorithm is an estimator that trains on a dataset and produces a model. A pipeline specifies an ML workflow by chaining various transformers and estimators together. Param is the common API to specify parameters for all transformers and estimators. We will talk about these concepts in more detail in the next screens.
Let’s first talk about DataFrames. Machine learning is applicable to various data types, which include text, images, structured data, and vectors. To support these data types under a unified Dataset concept, Spark ML includes the Spark SQL DataFrame. These DataFrames provide support to various basic types, structured types, and ML vector types. You can create a DataFrame from a regular RDD, either implicitly or explicitly.
6.9 Transformers and Estimators
Now, let’s elaborate on transformers and estimators. A transformer is an abstraction that covers both feature transformers and learned models. Technically, a transformer implements the transform() method, which converts one DataFrame into another, generally by appending one or more columns. For instance, a feature transformer can take a dataset, read one of its columns, convert it into a new column, append this new column to the original dataset, and then output the updated dataset. An estimator, on the other hand, abstracts the concept of a learning algorithm, or of any other algorithm that trains or fits on data. Technically, an estimator implements the fit() method, which accepts a DataFrame and produces a transformer. For instance, the LogisticRegression learning algorithm is an estimator; calling its fit() method trains a LogisticRegressionModel, which is a transformer.
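The transform() and fit() contract can be sketched in plain Python. This is only an illustration of the pattern, not Spark's API: the dataset is a list of dictionaries standing in for a DataFrame, and MajorityLabel is a made-up estimator that simply learns the most frequent label.

```python
class Tokenizer:
    """Toy transformer: reads the 'text' column and appends a 'words' column."""

    def transform(self, rows):
        return [dict(row, words=row["text"].lower().split()) for row in rows]


class MajorityLabelModel:
    """Toy learned model; it is itself a transformer."""

    def __init__(self, label):
        self.label = label

    def transform(self, rows):
        return [dict(row, prediction=self.label) for row in rows]


class MajorityLabel:
    """Toy estimator: fit() learns from the data and returns a transformer."""

    def fit(self, rows):
        labels = [row["label"] for row in rows]
        return MajorityLabelModel(max(set(labels), key=labels.count))


training = [
    {"text": "Spark ML", "label": 1.0},
    {"text": "hello world", "label": 0.0},
    {"text": "spark pipelines", "label": 1.0},
]
tokenized = Tokenizer().transform(training)      # appends a column
model = MajorityLabel().fit(training)            # estimator -> transformer
predicted = model.transform([{"text": "new document"}])
```

Note that transform() returns new rows with an extra column and leaves the input rows unchanged, which mirrors how a transformer outputs an updated dataset instead of editing the original in place.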
The next concept of the Spark ML API is the pipeline. In machine learning, it is common to run a sequence of algorithms to process and learn from data. For instance, the processing workflow of a simple text document includes the stages listed on the screen. First, the text of every document is split into words. Then, these words are converted into a numerical feature vector. At the final stage, the feature vectors and labels are used to learn a prediction model. Such workflows are represented in Spark ML through pipelines. A pipeline consists of a sequence of PipelineStages, that is, transformers and estimators, that are run in a specific order. A typical ML workflow is complex, as depicted on the screen.
6.11 Working of a Pipeline
Let’s now understand how a pipeline works. We have already discussed that a pipeline represents a sequence of stages, where every stage is a transformer or an estimator. These stages run in order, and the input dataset is modified as it passes through each stage. For transformer stages, the transform() method is called on the dataset. For estimator stages, the fit() method is called to create a transformer. The resulting transformer becomes part of the fitted pipeline, or PipelineModel. Let’s understand this sequence with a simple text document workflow. This pipeline has three stages: the first two, Tokenizer and HashingTF, are transformers, while the third, LogisticRegression, is an estimator. The row below it shows the flow of data through the pipeline, where the cylinders represent DataFrames. The Pipeline.fit() method is called on the original dataset, which contains raw text documents and labels. The Tokenizer.transform() method splits the raw text documents into words and adds a new column containing the words to the dataset. The HashingTF.transform() method converts the words column into feature vectors and adds a new column containing these vectors to the dataset. Then, because LogisticRegression is an estimator, the pipeline first calls LogisticRegression.fit() to create a LogisticRegressionModel. If the pipeline had more stages, it would call the transform() method of the LogisticRegressionModel on the dataset before passing the dataset to the next stage.
6.12 Working of a Pipeline (contd.)
Now, the pipeline is itself an estimator. Therefore, it produces a PipelineModel once the fit() method is run. The PipelineModel is a transformer and is used at test time. The image on the screen shows this use. In this image, the PipelineModel contains the same number of stages as the original pipeline. However, all estimators have become transformers. When a test dataset is processed with the transform() method of the PipelineModel, the data passes through the stages of the pipeline in order. The transform() method of each stage updates the dataset and passes it to the next stage. Pipelines and PipelineModels make sure that the training and test data pass through identical feature processing steps.
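The fit-then-transform flow of a pipeline can be sketched in plain Python as follows. This is an illustrative toy, not Spark's implementation: rows are dictionaries, and KeywordLearner is a made-up estimator that stands in for a real learning algorithm.

```python
class Tokenizer:
    """Transformer: appends a 'words' column."""
    def transform(self, rows):
        return [dict(r, words=r["text"].lower().split()) for r in rows]


class KeywordModel:
    """Fitted model (a transformer): predicts 1.0 if a learned keyword appears."""
    def __init__(self, keywords):
        self.keywords = keywords

    def transform(self, rows):
        return [dict(r, prediction=float(any(w in self.keywords for w in r["words"])))
                for r in rows]


class KeywordLearner:
    """Estimator: learns the words that occur only in positive documents."""
    def fit(self, rows):
        pos = {w for r in rows if r["label"] == 1.0 for w in r["words"]}
        neg = {w for r in rows if r["label"] == 0.0 for w in r["words"]}
        return KeywordModel(pos - neg)


class PipelineModel:
    """Fitted pipeline: every stage is a transformer."""
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class Pipeline:
    """The pipeline is itself an estimator: fit() returns a PipelineModel."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):     # estimator: fit it, keep the model
                stage = stage.fit(rows)
            fitted.append(stage)
            # This sketch always transforms; the text notes Spark only needs
            # to do so when further stages follow.
            rows = stage.transform(rows)
        return PipelineModel(fitted)


training = [{"text": "spark is fast", "label": 1.0},
            {"text": "the weather is nice", "label": 0.0}]
model = Pipeline([Tokenizer(), KeywordLearner()]).fit(training)
test_out = model.transform([{"text": "Spark rocks"}, {"text": "nice day"}])
```

Because the same fitted stages are applied at test time, the test documents are tokenized exactly the way the training documents were.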
6.13 DAG Pipelines
So far, we have discussed linear pipelines, in which each stage uses the data produced by the previous one. However, it is also possible to create a non-linear pipeline, as long as the data flow graph forms a Directed Acyclic Graph or DAG. Currently, such a graph is specified implicitly, based on the input and output column names of every stage. If the pipeline forms a DAG, its stages need to be specified in topological order.
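One way to picture the topological-order requirement is a small check in plain Python. The stage names and column names below are hypothetical; each stage is described only by the columns it reads and the column it writes, which is how the graph is implied.

```python
# Each hypothetical stage: (name, input columns, output column).
stages = [
    ("tokenizer", ["text"], "words"),
    ("hashing_tf", ["words"], "tf"),
    ("length", ["text"], "length"),
    ("assembler", ["tf", "length"], "features"),  # joins two branches of the DAG
]


def is_topologically_ordered(stages, initial_columns):
    """Return True if every stage only reads columns that already exist."""
    available = set(initial_columns)
    for _name, inputs, output in stages:
        if not set(inputs) <= available:
            return False
        available.add(output)
    return True


ok = is_topologically_ordered(stages, ["text"])
bad = is_topologically_ordered(list(reversed(stages)), ["text"])
```

The first ordering is valid because every stage finds its inputs already present, while the reversed ordering fails at once since the assembler would run before its inputs exist.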
6.14 Runtime Checking
We have discussed that pipelines can operate on datasets with various column types. Therefore, they cannot use compile-time type checking. Pipelines and PipelineModels instead use runtime checking. This check is done before the pipeline actually runs, using the dataset schema.
6.15 Parameter Passing
The last concept of the Spark ML API is Param, which is the uniform API to specify parameters for estimators and transformers. A Param is a named parameter with self-contained documentation. A ParamMap represents a set of (parameter, value) pairs. You can pass parameters to an algorithm in two ways. The first is by setting parameters on an instance. For example, for a LogisticRegression instance lr, you can call a setter such as lr.setMaxIter(10). This will make lr.fit() use at most 10 iterations. This type of API is similar to the API used in MLlib. The second is by passing a ParamMap to transform() or fit(). Any parameters in this ParamMap override the parameters that were previously specified using setter methods. Note that parameters belong to specific instances of transformers and estimators.
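The precedence rule between setters and a parameter map can be sketched in plain Python. ToyEstimator, its default values, and the dictionary used as a parameter map are all assumptions made for illustration; the real Spark classes are not used here.

```python
class ToyEstimator:
    """Stand-in for an estimator; fit() only reports the parameters it would use."""

    def __init__(self):
        self.params = {"maxIter": 100, "regParam": 0.0}  # made-up defaults

    def set_max_iter(self, value):
        self.params["maxIter"] = value
        return self  # returning self allows chained setter calls

    def fit(self, data, param_map=None):
        # Values in param_map take precedence over values set on the instance.
        return {**self.params, **(param_map or {})}


lr = ToyEstimator().set_max_iter(10)
used_default = lr.fit(data=[])
used_override = lr.fit(data=[], param_map={"maxIter": 30, "regParam": 0.1})
```

The override applies only to that one fit() call; the value stored on the instance by the setter stays at 10.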
6.16 General Machine Learning Pipeline-Example
A machine learning pipeline includes various steps, as depicted in the diagram.
6.17 General Machine Learning Pipeline-Example (contd.)
Further, to test a model, you need to perform the steps as listed on the screen. You need to first prepare training data. For this, you can use a case class called LabeledPoint. The RDDs of case classes are converted by Spark SQL into DataFrames. Here, it uses the metadata of the case class for inferring the schema. Next, you need to create an estimator instance of LogisticRegression. You can use setter methods to set parameters. After this, you need to learn a LogisticRegression model and then apply the model to make the prediction on the new incoming data with the use of the Transformer.transform() method.
6.18 Model Selection via Cross-Validation
Model selection is an important task in machine learning. It uses data to find the best model or parameters for a given task. It is also termed tuning. Pipelines facilitate model selection because they make it easy to tune an entire pipeline in one go, rather than tuning each element in the pipeline separately. At present, spark.ml uses the CrossValidator class to support model selection. This class takes an Estimator, a set of ParamMaps, and an evaluator. It splits the dataset into a set of folds. These folds are used as separate training and test datasets. For instance, with three folds, the class will generate three (training, test) dataset pairs. Each of these pairs uses 2/3 of the data for training and 1/3 of the data for testing. The class iterates through the set of ParamMaps. For each ParamMap, it trains the given estimator and evaluates it using the given evaluator. The ParamMap that produces the best evaluation metric across these models is selected as the best one. Finally, the class uses this best ParamMap and the complete dataset to fit the estimator.
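The fold arithmetic and the selection loop can be sketched in plain Python. This is a simplified illustration of k-fold cross-validation, not the CrossValidator class: the data, the threshold "model", and the accuracy metric are all made up, and the final refit on the complete dataset is left out.

```python
def k_fold_pairs(data, k):
    """Yield (training, test) pairs; each fold is the test set exactly once."""
    folds = [data[i::k] for i in range(k)]
    for i in range(k):
        training = [x for j, fold in enumerate(folds) if j != i for x in fold]
        yield training, folds[i]


def cross_validate(data, k, param_maps, fit, evaluate):
    """Return the parameter map with the best (highest) average metric."""
    def average_metric(params):
        scores = [evaluate(fit(train, params), test)
                  for train, test in k_fold_pairs(data, k)]
        return sum(scores) / len(scores)
    return max(param_maps, key=average_metric)


# Made-up task: the label is 1.0 when x >= 5; the "model" is just a threshold.
data = [(x, 1.0 if x >= 5 else 0.0) for x in range(9)]


def fit(training, params):
    return params["threshold"]          # toy estimator: nothing to learn


def evaluate(threshold, test):
    hits = sum((1.0 if x >= threshold else 0.0) == y for x, y in test)
    return hits / len(test)             # accuracy


param_maps = [{"threshold": 2}, {"threshold": 5}, {"threshold": 8}]
pairs = list(k_fold_pairs(data, 3))
best = cross_validate(data, 3, param_maps, fit, evaluate)
```

With nine samples and three folds, every pair trains on six samples and tests on three, which is the 2/3 and 1/3 split described above.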
6.19 Supported Types, Algorithms, and Utilities
The types, algorithms, and utilities included in the spark.mllib, which is the main MLlib API, are listed on the screen. These are categorized under data types, basic statistics, classification and regression, collaborative filtering, clustering, dimensionality reduction, feature extraction and transformation, frequent pattern mining, and optimization. We will discuss these categories in detail in the next screens.
6.20 Data Types
MLlib supports the data types listed on the screen. A local vector has integer-typed, 0-based indices and double-typed values. These values are stored on a single machine. MLlib supports two types of local vectors, dense and sparse. A dense vector is backed by a double array that represents its entry values. A sparse vector, on the other hand, is backed by two parallel arrays, indices and values. A labeled point is a local vector, either dense or sparse, that is associated with a label or response. Labeled points are used in the supervised learning algorithms of MLlib. A double is used to store the label, so that labeled points can be used in both regression and classification. For binary classification, a label should be either 0 or 1. For multiclass classification, labels should be class indices that start from zero. A local matrix has integer-typed row and column indices and double-typed values. Like local vectors, these values are also stored on a single machine. In this case, only dense matrices are supported by MLlib. The entry values are stored in a single double array in column-major order. The code given on the screen shows how to create a dense local vector.
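These storage layouts can be illustrated in plain Python. The helper functions below are hypothetical and only mimic the layouts described above; they are not the MLlib classes.

```python
def sparse_to_dense(size, indices, values):
    """Expand parallel (indices, values) arrays into a dense list of doubles."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


def matrix_entry(num_rows, values, row, col):
    """Look up entry (row, col) of a dense matrix stored in column-major order."""
    return values[col * num_rows + row]


# The vector (1.0, 0.0, 3.0) in both representations.
dense_vector = [1.0, 0.0, 3.0]
sparse_vector = (3, [0, 2], [1.0, 3.0])  # (size, indices, values)

# A 3 x 2 matrix with columns (1, 3, 5) and (2, 4, 6), stored column by column.
matrix_values = [1.0, 3.0, 5.0, 2.0, 4.0, 6.0]
top_right = matrix_entry(3, matrix_values, 0, 1)
```

The sparse form stores only the two non-zero entries, which is why it pays off when most values are zero.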
6.21 Feature Extraction and Basic Statistics
Term Frequency-Inverse Document Frequency or TF-IDF is a simple way to generate feature vectors from text documents such as web pages. It computes two statistics for every term in every document: TF, the number of times the term appears in that document, and IDF, which measures how infrequently the term appears across the entire document corpus. The product TF times IDF signifies the relevance of a term to a specific document. The code given on the screen shows how to compute column summary statistics.
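The TF times IDF product can be worked through in plain Python. The three documents are made up, and the smoothed formula IDF = log((m + 1) / (df + 1)), where m is the number of documents and df the number of documents containing the term, is one common variant that is assumed here.

```python
import math
from collections import Counter

documents = [
    "spark makes big data simple".split(),
    "spark ml builds on spark sql".split(),
    "big data needs big clusters".split(),
]


def tf_idf(documents):
    """Return one {term: weight} dict per document."""
    m = len(documents)
    # Document frequency: in how many documents each term appears.
    df = Counter(term for doc in documents for term in set(doc))
    # Smoothed IDF; the +1 terms avoid division by zero (an assumed variant).
    idf = {term: math.log((m + 1) / (count + 1)) for term, count in df.items()}
    return [{term: tf * idf[term] for term, tf in Counter(doc).items()}
            for doc in documents]


weights = tf_idf(documents)
```

In the second document, the term sql appears once but in no other document, so it outweighs spark, which appears twice there but also occurs in the first document.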
Clustering is an unsupervised learning problem in which the objective is to group subsets of entities on the basis of some notion of similarity. It is generally used for exploratory analysis or as a component of a hierarchical supervised learning pipeline. MLlib supports various clustering models, which include K-means, Gaussian mixture, Power iteration clustering or PIC, Latent Dirichlet allocation or LDA, and Streaming k-means. We will discuss these models in detail in the next screens.
K-means clusters the data points into a predefined number of clusters. It is one of the most commonly used clustering algorithms. The MLlib implementation, which is given on the screen, includes a parallelized variant of the k-means++ method called k-means||. The MLlib implementation includes the listed parameters. k is the number of required clusters, maxIterations is the maximum number of iterations to run, and initializationMode specifies either random initialization or initialization via k-means||. runs is the number of times to run the k-means algorithm, initializationSteps is the number of steps in the k-means|| algorithm, and the last parameter, epsilon, is the distance threshold within which k-means is considered to have converged.
6.24 K-Means (contd.)
The flowchart on the screen shows how the K-means algorithm works.
6.25 Demo-Perform Clustering Using K-Means
This demo will show the steps to perform clustering using K-means in Spark.
6.26 Perform Clustering Using K-Means
In this demo, you will learn how to perform clustering, a form of unsupervised learning, using the K-means technique in Spark. We are going to run this example in spark-shell. Before we write our logic, we need to import the KMeans, KMeansModel, and Vectors classes into the spark-shell. Note the three import statements being shown on the screen. Let’s read the kmeans sample data from HDFS using the sc.textFile() method. We are going to convert this data into vectors of doubles and cache them in memory. Let’s define parameters such as the number of clusters and the number of iterations. Let’s train on the data by calling the train method of the KMeans class and passing the data and the other parameters to it. We compute the Within Set Sum of Squared Errors, or WSSSE, to evaluate the goodness of fit for the training data. You can see that the WSSSE for K-means is 0.12 for the given data.
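The algorithm behind the demo can be sketched in plain Python. This is a minimal version of the standard k-means iteration with fixed initial centers (an assumption; random and k-means|| initialization are not shown), run on made-up points, so its cost value is not the figure reported in the demo.

```python
def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))


def closest(point, centers):
    """Index of the center nearest to the point."""
    return min(range(len(centers)), key=lambda i: squared_distance(point, centers[i]))


def k_means(points, centers, max_iterations=20):
    """Alternate between assigning points and recomputing cluster means."""
    for _ in range(max_iterations):
        clusters = [[] for _ in centers]
        for p in points:
            clusters[closest(p, centers)].append(p)
        new_centers = [
            [sum(dim) / len(cluster) for dim in zip(*cluster)] if cluster else center
            for cluster, center in zip(clusters, centers)
        ]
        if new_centers == centers:   # centers stopped moving: converged
            break
        centers = new_centers
    return centers


def wssse(points, centers):
    """Within Set Sum of Squared Errors: lower means tighter clusters."""
    return sum(squared_distance(p, centers[closest(p, centers)]) for p in points)


points = [[0.0, 0.0], [0.2, 0.0], [0.0, 0.2], [9.0, 9.0], [9.2, 9.0], [9.0, 9.2]]
centers = k_means(points, centers=[points[0], points[3]])
cost = wssse(points, centers)
```

A smaller cost for the same number of clusters indicates a better fit, which is why this value is used to compare runs.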
6.27 Gaussian Mixture
A Gaussian mixture model represents a composite distribution, in which points are drawn from one of k Gaussian sub-distributions. Each of these sub-distributions has its own probability. The MLlib implementation uses the expectation-maximization or EM algorithm to induce the maximum-likelihood model given a set of samples. The parameters of the implementation are listed on the screen. k is the number of required clusters, and convergenceTol is the maximum change in log-likelihood at which convergence is considered achieved. maxIterations is the maximum number of iterations to perform without reaching convergence, while initialModel is an optional starting point for the EM algorithm. If this parameter is omitted, a random starting point is constructed from the data.
6.28 Power Iteration Clustering (PIC)
PIC is an efficient and scalable algorithm for clustering the vertices of a graph, given pairwise similarities as edge properties. It uses power iteration to compute a pseudo-eigenvector of the normalized affinity matrix of the graph and uses it to cluster the vertices. At the backend, MLlib has a PIC implementation that uses GraphX. It takes an RDD of tuples and outputs a model along with the clustering assignments. These tuples are of the form (srcId, dstId, similarity), where similarities need to be non-negative. PIC assumes that the similarity measure is symmetric. In the input data, a pair should appear at most once, irrespective of ordering. If a pair is missing from the input, its similarity is treated as 0. The MLlib implementation includes the listed parameters. k is the number of clusters, maxIterations is the maximum number of power iterations, and initializationMode is the initialization mode, which can be “random”, the default, to use a random vector as vertex properties, or “degree” to use normalized sum similarities.
6.29 Latent Dirichlet Allocation (LDA)
LDA is a topic model that infers topics from a collection of text documents. It can be considered a clustering algorithm, as given on the screen. Topics correspond to cluster centers, and documents correspond to rows or examples in a dataset. Both topics and documents reside in a feature space, where feature vectors are vectors of word counts. Instead of estimating a clustering using a traditional distance, LDA uses a function that is based on a statistical model of how text documents are generated.
6.30 Latent Dirichlet Allocation (LDA) (contd.)
LDA supports different inference algorithms via the setOptimizer function. OnlineLDAOptimizer is generally memory friendly and uses iterative mini-batch sampling for online variational inference, while EMLDAOptimizer uses expectation-maximization on the likelihood function to learn the clustering and yields comprehensive results. After fitting on the documents, LDA provides the topics and the topic distributions for documents. The parameters taken by LDA are listed on the screen. k is the number of topics, that is, cluster centers, and maxIterations is the limit on the number of iterations of EM used for learning. docConcentration is a hyperparameter for the prior over the distributions of documents over topics. At present, it needs to be greater than 1, and larger values give smoother inferred distributions. topicConcentration is a hyperparameter for the prior over the distributions of topics over terms or words. At present, it also needs to be greater than 1, and larger values again give smoother inferred distributions. If checkpointing is being used, checkpointInterval specifies how frequently checkpoints are created. If maxIterations is large, using checkpointing can help reduce the shuffle file sizes on disk. It can also help with failure recovery. You should note that LDA is a new feature and hence some functionality is missing. To be specific, it does not yet support prediction on new documents. In addition, it does not include a Python API.
6.31 Collaborative Filtering
Collaborative filtering is generally used for recommender systems. The objective of these techniques is to fill in the missing entries of a user-item association matrix. At present, MLlib supports model-based collaborative filtering. In this approach, users and products are described through a small set of latent factors. These factors can be used to predict the missing entries. To learn these factors, MLlib uses the alternating least squares or ALS algorithm. The MLlib implementation includes the parameters listed on the screen. numBlocks is the number of blocks used to parallelize computation, and rank is the number of latent factors in the model. iterations is the number of iterations to run, while lambda is the regularization parameter in ALS. implicitPrefs specifies whether to use the explicit feedback ALS variant or the variant adapted for implicit feedback data. alpha is a parameter that applies to the implicit feedback ALS variant and governs the baseline confidence in preference observations.
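The idea of alternating least squares can be sketched in plain Python for the simplest case of one latent factor (rank 1) and explicit feedback. This is a toy under stated assumptions, not MLlib's blocked implementation: the ratings are made up, and the function names and the lam parameter are hypothetical.

```python
import math


def als_rank1(ratings, iterations=10, lam=0.01):
    """Alternating least squares with a single latent factor per user and item."""
    users = {u for u, _, _ in ratings}
    items = {i for _, i, _ in ratings}
    user_f = {u: 1.0 for u in users}
    item_f = {i: 1.0 for i in items}
    for _ in range(iterations):
        # Fix the item factors, solve the regularized least squares per user.
        for u in users:
            obs = [(item_f[i], r) for uu, i, r in ratings if uu == u]
            user_f[u] = sum(f * r for f, r in obs) / (lam + sum(f * f for f, _ in obs))
        # Fix the user factors, solve the same problem per item.
        for i in items:
            obs = [(user_f[u], r) for u, ii, r in ratings if ii == i]
            item_f[i] = sum(f * r for f, r in obs) / (lam + sum(f * f for f, _ in obs))
    return user_f, item_f


def rmse(ratings, user_f, item_f):
    errors = [(r - user_f[u] * item_f[i]) ** 2 for u, i, r in ratings]
    return math.sqrt(sum(errors) / len(errors))


# Made-up explicit ratings: (user, item, rating). User 2 has not rated item "c".
ratings = [(1, "a", 4.0), (1, "b", 2.0), (1, "c", 4.0),
           (2, "a", 2.0), (2, "b", 1.0),
           (3, "a", 4.0), (3, "c", 4.0)]
user_f, item_f = als_rank1(ratings)
train_error = rmse(ratings, user_f, item_f)
predicted = user_f[2] * item_f["c"]  # fills in a missing matrix entry
```

Each half-step has a closed-form solution because one side of the factorization is held fixed, which is what makes the alternation cheap and easy to parallelize.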
MLlib provides support to different regression analysis, binary classification, and multiclass classification methods. The table shown on the screen lists and explains the supported algorithms for every problem type.
6.33 Classification (contd.)
Two examples of classification are shown on the screen.
In linear regression, multiple procedures have been developed for parameter estimation and inference. These procedures differ in the computational simplicity of their algorithms, the presence of a closed-form solution, and their robustness with respect to heavy-tailed distributions. They also differ in the theoretical assumptions needed to validate desirable statistical properties such as consistency and asymptotic efficiency. One such statistical method is linear least squares. It lets you improve the accuracy of the approximate solution in line with the complexity of a specific problem.
6.35 Example of Regression
For regression problems, linear least squares is the most common formulation. Different related regression methods are derived by using different types of regularization. Ordinary least squares, or linear least squares, uses no regularization. Ridge regression uses L2 regularization, while Lasso uses L1 regularization. For all of these models, the average loss or training error is known as the mean squared error. Under the hood, MLlib implements a simple distributed version of stochastic gradient descent or SGD, building on the underlying gradient descent primitive. All the provided algorithms take a regularization parameter as input, along with various parameters associated with SGD. The code given on the screen shows how to implement linear regression.
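The relationship between least squares, L2 regularization, and the mean squared error can be sketched in plain Python. Note the assumption: this uses full-batch gradient descent on a single feature for clarity, not the distributed stochastic variant, and the data points and parameter names are made up.

```python
def train_linear(data, step_size=0.1, iterations=500, reg_param=0.0):
    """Fit y = w * x + b by batch gradient descent on the mean squared error.

    reg_param > 0 adds an L2 (ridge) penalty on w; 0.0 is plain least squares.
    """
    w, b = 0.0, 0.0
    n = len(data)
    for _ in range(iterations):
        grad_w = sum(2 * (w * x + b - y) * x for x, y in data) / n + 2 * reg_param * w
        grad_b = sum(2 * (w * x + b - y) for x, y in data) / n
        w, b = w - step_size * grad_w, b - step_size * grad_b
    return w, b


def mean_squared_error(data, w, b):
    return sum((w * x + b - y) ** 2 for x, y in data) / len(data)


# Made-up points that lie exactly on y = 2x + 1.
data = [(0.0, 1.0), (1.0, 3.0), (2.0, 5.0), (3.0, 7.0)]
w, b = train_linear(data)
mse = mean_squared_error(data, w, b)
w_ridge, _ = train_linear(data, reg_param=0.5)
```

Without regularization the fit recovers the slope and intercept of the line, while the L2 penalty shrinks the slope toward zero, which is the trade-off the regularization parameter controls.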
6.36 Demo-Perform Classification Using Linear Regression
This demo will show the steps to perform classification using linear regression in Spark.
6.37 Perform Classification Using Linear Regression
In this demo, you will learn how to perform supervised learning using linear regression in Spark. First, we are going to write a Scala class “LinearRegression”, which will have a case class called Params for holding variables such as the number of iterations. OptionParser will parse the Params and assign these variables to the Scala Option data type. In the run method, we are going to load the sample data and split it into the training and test data. After that, we are going to train on the data to build a model using the least squares technique. Now, we are going to use LinearRegressionWithSGD to build a simple linear model to predict the label values. Here, we are setting all the required parameters such as the number of iterations, step size, updater, and regularization parameter. Once we have a model created by running the algorithm, we can use it to make predictions for the test data. We compute the mean squared error to evaluate the goodness of fit for the training data. See the sample data that is going to be used for this example.
6.38 Demo-Run Linear Regression
This demo will show the steps to run linear regression in Spark.
6.39 Run Linear Regression
In this demo, you will learn how to run linear regression in Spark. You need to use the command shown on the screen to execute this program. Type the given class name, including the given package name. Note that if the file is a local file, you need to prefix the input file path as shown. Once you execute this command, the Spark MLlib program will be executed. As you can see on the screen, it has used 405 training examples and 96 test examples. The test RMSE, which is an indicator of the effectiveness of this algorithm, is 10.22 as shown on the screen.
6.40 Demo-Perform Recommendation Using Collaborative Filtering
This demo will show the steps to perform recommendation.
6.41 Perform Recommendation Using Collaborative Filtering
In this demo, you will learn how to perform recommendation using collaborative filtering (the Alternating Least Squares algorithm) in Spark. Collaborative Filtering or CF is a family of algorithms that exploits other users and items, along with their ratings and the history of the target user, to recommend an item that the target user has not rated. First, we are going to write a Scala class “MovieLensALS”, which will have a case class called Params for holding variables such as the number of iterations. OptionParser will parse the Params and assign these variables to the Scala Option data type. In the run method, we are going to load the sample data and split it into the training and test data. After that, we are going to train on the data to build a model using the least squares technique. Now, we are going to use Alternating Least Squares to build a recommendation system. Here, we are setting all the required parameters such as rank, number of iterations, lambda, implicitPrefs, user blocks, and product blocks. We compute the Root Mean Squared Error to evaluate the goodness of fit for the training data. Once we have a model created by running the algorithm, we can use it to make predictions for the test data.
6.42 Demo-Run Recommendation System
This demo will show the steps to run the recommendation system in Spark.
6.43 Run Recommendation System
In this demo, you will learn how to run the recommendation system in Spark to recommend movies to users. You need to use the command shown on the screen to execute this program. Type the given class name, including the given package name. Note that if the file is local, you need to prefix the input file path as shown. Once you execute this command, the Spark MLlib program will be executed. As you can see on the screen, it has used 1501 ratings from 30 users on 100 movies. Out of these 1501 ratings, it has taken 1199 for training and the remaining 302 for testing the model. The test RMSE, which is an indicator of the effectiveness of this algorithm, is 1.47 as shown on the screen.
Next, let’s answer a few questions based on your understanding of the concepts covered in this lesson.
Let us summarize the topics covered in this lesson: • Machine Learning is a subfield of Artificial Intelligence that powers many smart applications. • Some terminologies used in ML are feature vector, feature space, samples, and labelled data. • MLlib is Spark’s scalable machine learning library. • Spark ML uses the Spark SQL DataFrame to support ML data types. • A transformer is an abstraction that covers both feature transformers and learned models. • Workflows in Spark ML are represented through pipelines. • A non-linear pipeline can also be created as long as the data flow graph forms a DAG. • Param is the uniform API to specify parameters for estimators and transformers. • Model selection uses data to find the best model or parameters for a task. • MLlib supports data types including local vector, labeled point, and local matrix. • TF-IDF is a simple way to generate feature vectors from text documents such as web pages.
6.46 Summary (contd.)
• Clustering is an unsupervised learning problem in which the objective is to group subsets of entities on the basis of some notion of similarity. • K-means clusters the data points into a predefined number of clusters. • A Gaussian mixture model represents a composite distribution. • PIC is an efficient and scalable algorithm for clustering the vertices of a graph. • LDA infers topics from a collection of text documents. • Collaborative filtering is used to fill in the missing entries of a user-item association matrix. • MLlib supports different regression analysis, binary classification, and multiclass classification methods. • In linear regression, multiple procedures have been developed for parameter estimation and inference.
With this, we come to the end of the lesson 6 “Spark ML Programming” of the Apache Spark and Scala course. The next lesson is Spark GraphX Programming.