Using RDD for Creating Applications in Spark Tutorial

Introduction

Welcome to the third chapter of the Apache Spark and Scala tutorial (part of the Apache Spark and Scala course). This lesson explains how to use RDDs to create applications in Spark.
Let us explore the objectives of this lesson in the next section.

Objectives

After completing this lesson, you will be able to:

  • Explain the features of RDDs

  • Explain how to create RDDs

  • Describe RDD operations and methods

  • Discuss how to run a Spark project with SBT

  • Explain RDD functions, and

  • Describe how to write different code in Scala

We will begin with an introduction to the RDD API in the next section.

The RDD API

An RDD acts as the workhorse of Spark: it can be thought of as a handle for a collection of individual data partitions. RDDs are more than that, though. In a cluster installation, different data partitions can reside on different nodes. RDDs, acting as handles, provide the capability to access all partitions, and they allow you to perform computations and transformations on the contained data.

If an entire RDD or a part of it is lost, it can be reconstructed using lineage information, that is, the sequence of transformations used to produce the current RDD.

Every RDD derives directly or indirectly from the class RDD, an abstract class whose methods perform operations on the data within the associated partitions. Using an RDD therefore means using a concrete implementation of RDD.

Spark has become very popular for processing big data, partly because it places no restrictions on what data can be stored within RDD partitions.

In addition, the RDD API contains many useful operations, but some convenience functions are missing because the Spark creators had to keep the core API generic enough to handle arbitrary data types.

Basically, the core RDD API treats every data item as a single value. Often, however, you will want to work with key-value pairs, which Spark supports through the PairRDDFunctions extension.

In the next section of the tutorial, we will discuss features of RDD.

Features of RDDs

RDDs are simply distributed collections of elements. All work in Spark is expressed in terms of RDDs: creating new RDDs, transforming existing RDDs, and calling operations on RDDs to compute results.

Under the hood, Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them. The key features of RDDs are listed below.

Immutable

RDDs are immutable, which means that once they are created, they never change. This makes them safe to parallelize and allows data spread across different cluster nodes to be cached.

Lazy Evaluated

RDDs are also lazily evaluated. When you define an RDD, it does not contain any data; the computation to create its data runs only when the data is referenced. This defers evaluation, separates execution from evaluation, and, because the transformations are recorded, allows the data to be recreated on failure.

Cacheable

In addition, RDDs are cacheable, which improves execution engine performance.

Type Inferred

Type inference is handled by the compiler, which determines the type from the values and operations involved.

All transformations are free from side effects, so the type can be determined from the operation: each transformation has a specific return type. As a result, you can worry less about how the data is represented across many transformations.

In the next couple of sections of the tutorial, we will discuss creating RDDs.


Creating RDDs

To create RDDs, you can either parallelize an existing collection or reference an external dataset. To create a parallelized collection, call the parallelize method of SparkContext on an existing collection in your driver program. Consider the example below, which creates a parallelized collection holding the numbers 1 to 5.
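A minimal sketch of this classic example, assuming a SparkContext named sc is already available (as it is in the Spark shell); the variable name distData follows the convention used later in this lesson:

    val data = Array(1, 2, 3, 4, 5)
    // Distribute the local collection across the cluster as an RDD
    val distData = sc.parallelize(data)
    // distData can now be operated on in parallel, for example summed:
    distData.reduce((a, b) => a + b)   // returns 15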

Alternatively, you can build RDDs by referencing a dataset in any Hadoop-supported storage source, including HDFS, HBase, your local or shared file system, or any data source offering a Hadoop InputFormat.

Creating RDDs—Referencing an External Dataset

Let’s talk about creating RDDs by referencing an external dataset in detail. Spark mainly supports text files, SequenceFiles, and other Hadoop InputFormats. The image below shows how RDDs undergo transformations and interact with the external world.
[Image: creating RDDs in Spark]

In the subsequent sections of this RDD tutorial, we will discuss referencing an external dataset.

Referencing an External Dataset—Text Files

To create text file RDDs, use the textFile method of SparkContext. The textFile method takes a URI for the file, either a local path on the machine or a URI such as hdfs://, and reads it as a collection of lines.

An example to use the method is given below.
[Image: referencing an external dataset (textFile example)]
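A minimal sketch of what such an example typically looks like, assuming a SparkContext named sc and a file named data.txt in the working directory (the file name is illustrative):

    // Read the file as an RDD of lines
    val distFile = sc.textFile("data.txt")
    // Add up the sizes of all the lines using map and reduce
    val totalLength = distFile.map(line => line.length).reduce((a, b) => a + b)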

Once text file RDDs are created, dataset operations can act on them. For instance, the sizes of all the lines can be added up using map and reduce, as the example above shows.

Note that you can also use the SparkContext.wholeTextFiles method, which lets you read a directory containing many small text files and returns each file as a (filename, content) pair. This is in contrast to the textFile method, which returns one record per line in each file.
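A brief sketch, with an illustrative directory path:

    // Each element is a (filename, content) pair
    val files = sc.wholeTextFiles("hdfs://namenode/user/data/small-files")
    files.keys.collect().foreach(println)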

Referencing an External Dataset—Text Files (contd.)

You must remember a few points about reading files.

If you use a path on the local file system, the file must also be accessible at the same path on the worker nodes; either copy the file to all workers or use a network-mounted shared file system.

All of Spark's file-based input methods, including textFile, support running on directories, compressed files, and wildcards.

The textFile method also takes an optional second argument that controls the number of partitions of the file. By default, Spark creates one partition for each block of the file, but you can ask for more partitions by passing a larger value.

Note, however, that you cannot have fewer partitions than blocks.
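For instance, a brief sketch (the file name is illustrative):

    // Ask for at least 8 partitions when reading the file
    val logLines = sc.textFile("data.txt", 8)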

Referencing an External Dataset—Sequence Files

For SequenceFiles, use SparkContext's sequenceFile[K, V] method, where K and V are the types of the keys and values in the file. These should be subclasses of Hadoop's Writable interface, such as IntWritable and Text.

You can also specify native types for a few common Writables; for example, sequenceFile[Int, String] will automatically read IntWritables and Texts.
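A minimal sketch, assuming a SequenceFile of (IntWritable, Text) records at an illustrative HDFS path:

    import org.apache.hadoop.io.{IntWritable, Text}

    // Read a SequenceFile of (IntWritable, Text) records
    val seqData = sc.sequenceFile("hdfs://namenode/data/records.seq",
      classOf[IntWritable], classOf[Text])

    // Convert the Hadoop Writables to plain Scala values
    val asScala = seqData.map { case (k, v) => (k.get, v.toString) }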

Referencing an External Dataset—Other Hadoop Input Formats

For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method, which takes an arbitrary JobConf, input format class, key class, and value class. Set these the same way you would for a Hadoop job with your input source.

In addition, the SparkContext.newAPIHadoopRDD method can be used for InputFormats based on the new MapReduce API.

Note that RDD.saveAsObjectFile and SparkContext.objectFile support saving an RDD in a simple format consisting of serialized Java objects. This is not as efficient as specialized formats, but it offers an easy way to save any RDD.

We will discuss creating RDDs in the next section.

Creating RDDs—Important Points

Once created, the distributed dataset (distData in the earlier example) can be operated on in parallel.

For parallelized collections, the number of partitions to cut the dataset into is an important parameter. Spark runs one task for each partition; typically, you want 2 to 4 partitions for each CPU in your cluster.

Normally, Spark tries to set the number of partitions automatically based on your cluster, but you can also set it manually.

To do so, pass it as a second parameter to parallelize, as shown in the example below.
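A brief sketch, reusing sc and the data array from the earlier parallelize example:

    // Cut the dataset into 10 partitions explicitly
    val distData = sc.parallelize(data, 10)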

[Image: creating RDDs, important points]
The first image above shows that data processing in Spark can be performed by reading data from disk and loading it as an RDD in memory. After the first transformation, the data can be stored back on the hard disk, loaded again as an RDD for a further transformation, and the final result stored on the hard disk.

The second image shows the case where the output of each transformation is stored only on the hard disk.

We will discuss RDD operations in the next section of this tutorial.

RDD Operations

RDDs support two types of operations: transformations and actions. Transformations create a new dataset from an existing one. For example, map passes each dataset element through a function and returns a new RDD with the results.

Actions, in contrast, return a value to the driver program after running a computation on the dataset. For example, reduce aggregates all the elements of the RDD using a function and returns the final result to the driver program.
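A minimal sketch of a transformation followed by an action, assuming sc and an illustrative text file data.txt as before:

    val lines = sc.textFile("data.txt")
    val lineLengths = lines.map(line => line.length)          // transformation: nothing is computed yet
    val totalLength = lineLengths.reduce((a, b) => a + b)     // action: triggers the computation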

The table below lists some more transformations and actions.

[Table: RDD operations in Spark]
In the next section, we will discuss transformations in RDD operations.

RDD Operations—Transformations

All Spark transformations are lazy, which means that they do not compute their results immediately. The results are computed only when an action requires a value to be returned to the driver program.

This design lets Spark run more efficiently. For example, a dataset created through map can be used in a reduce, and only the result of the reduce is returned to the driver rather than the larger mapped dataset.

By default, each transformed RDD may be recomputed every time you run an action on it. To persist an RDD in memory, use the cache or persist method; Spark then keeps the elements around on the cluster for much faster access the next time you query them.
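A brief sketch of caching, continuing the previous example:

    lineLengths.cache()                     // keep this RDD in memory once it is computed
    lineLengths.reduce((a, b) => a + b)     // the first action computes and caches the data
    lineLengths.count()                     // later actions reuse the cached data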

In the next section, we will discuss features of RDD persistence.

Features of RDD Persistence

RDD persistence is one of the most important Spark capabilities. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset or on datasets derived from it. This makes future actions much faster, which is why caching is so useful for iterative algorithms and interactive use.

The cache is fault-tolerant. This means that any lost RDD partition will be automatically recomputed using the original transformations.

In addition, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), replicate it across nodes, or store it off-heap in Tachyon. To set a level, pass a StorageLevel object to the persist() method.
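A minimal sketch, assuming an existing RDD named lineLengths as in the earlier examples:

    import org.apache.spark.storage.StorageLevel

    // Persist in memory as serialized Java objects to save space
    lineLengths.persist(StorageLevel.MEMORY_ONLY_SER)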

In the next section, we will discuss storage levels of RDD persistence.

Storage Levels Of RDD Persistence

As discussed, the persist() method lets you specify the desired storage level, so you can assign any storage level other than the default one (MEMORY_ONLY).

The table given below lists and explains all the storage levels of RDD persistence.

[Table: storage levels of RDD persistence]
We will discuss Choosing The Correct RDD Persistence Storage Level in the next section.

Choosing The Correct RDD Persistence Storage Level

Storage levels provide trade-offs between memory usage and CPU efficiency, so choose the one that fits your needs. If your RDDs fit comfortably in the default storage level (MEMORY_ONLY), leave them that way: it is the most CPU-efficient option and lets operations on the RDDs run as fast as possible.

If they do not fit, try MEMORY_ONLY_SER and select a fast serialization library; this makes the objects much more space-efficient while keeping them reasonably fast to access.

Do not spill to disk unless the functions that computed your datasets are expensive or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.

If fast fault recovery is required, use the replicated storage levels. Although all storage levels provide full fault tolerance by recomputing lost data, the replicated ones let you keep running tasks on the RDD without waiting for a lost partition to be recomputed.

In environments with large amounts of memory or multiple applications, you can use the experimental OFF_HEAP storage level. Its advantages include letting multiple executors share the same pool of memory in Tachyon and significantly reducing garbage collection costs.

In addition, the cached data is not lost if individual executors crash.

In the next section, we will discuss invoking the spark shell.

Invoking the Spark Shell

The Spark shell provides an easy way to learn the API, and it is a powerful tool for analyzing data interactively. It is available in either Scala or Python.

To run it, first go to the Spark home directory and then run the command applicable to Scala or Python, as shown below.
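A brief sketch of what this looks like; the shell commands appear as comments, and the final line runs inside the Scala shell, where a SparkContext named sc is already created:

    // From the Spark home directory:
    //   ./bin/spark-shell    (Scala)
    //   ./bin/pyspark        (Python)
    // Inside the Scala shell, the SparkContext is available as sc:
    sc.version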

[Image: invoking the Spark shell]
In the next section, we will discuss importing Spark classes.

Importing Spark Classes

Once the shell is invoked, you need to import some Spark classes into your program by executing the code given below. Note that different code applies to Scala, Java, and Python.
[Image: importing Spark classes]
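A minimal Scala sketch of the usual imports (the Java and Python variants shown in the original screenshot are not reproduced here):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf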

We will discuss creating the SparkContext in the next section.

Creating the SparkContext

Next, you need to create a SparkContext instance to interact with Spark and distribute jobs. The SparkContext class provides a connection to a Spark cluster and is therefore the entry point for interacting with Spark. To create it, execute the code below that applies to Scala, Java, or Python; it sets the application name and the Spark master details.

[Image: creating the SparkContext]
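A minimal Scala sketch; the application name is illustrative, and the local master is used only for demonstration (a real deployment would point at a cluster):

    val conf = new SparkConf()
      .setAppName("RDD Tutorial")      // application name shown in the Spark UI
      .setMaster("local[*]")           // run locally using all available cores
    val sc = new SparkContext(conf)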
In the next section, we will discuss loading a file in shell.

Loading a File in Shell

Let's now create a new RDD from the text of the README file in the Spark source directory. To do this, execute the code given below.

[Image: loading a file in the shell]
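A brief sketch in the Scala shell, assuming the shell was started from the Spark home directory so that README.md is in the working directory:

    val textFile = sc.textFile("README.md")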
We will discuss performing basic operations on files in Spark shell RDDs in the next section.

Performing Some Basic Operations on Files in Spark Shell RDDs

Let's now perform a few actions on the file-based RDD in the Spark shell. The first action gets the number of items (lines) in the RDD; the next gets its first element. Consider the example below.
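A minimal sketch, continuing with the textFile RDD created above:

    textFile.count()   // number of lines in the README file
    textFile.first()   // the first line of the file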

In the next section, we will discuss packaging and running a Spark project with SBT.

Packaging a Spark Project with SBT

Maven is the official build tool recommended for packaging Spark and is the build of reference. However, SBT is supported for day-to-day development because it provides much faster iterative compilation, and it is used by more advanced developers. The SBT build is derived from the Maven POM files.

To package a Spark project with SBT, you first need to create the build. An example of such a command is given below.
[Image: packaging a Spark project with SBT]
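A hedged sketch of what this command typically looks like when building Spark from the top of its source tree; the profiles shown are illustrative and depend on your Hadoop version and Spark release:

    build/sbt -Pyarn -Phadoop-2.3 assembly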
This uses the same Maven profiles and variables to control the build.

Next, you need to run the tests with SBT. Note that some tests require Spark to be packaged first, so always run build/sbt assembly the first time.

The example given below shows the same.
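A hedged sketch of a typical first run, assuming the standard Spark source tree layout:

    build/sbt assembly
    build/sbt test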

Running a Spark Project With SBT

Let's now see how to run a Spark project with SBT. To run only a specific test suite, use the first command given below; to run the test suites of a specific subproject, use the second.

[Image: running a Spark project with SBT]
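A hedged sketch of these two commands; the suite and subproject names are illustrative, and newer SBT versions spell the task testOnly instead of test-only:

    build/sbt "test-only org.apache.spark.rdd.RDDSuite"
    build/sbt core/test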
As a general tip, cache an RDD within the same context and reuse it across jobs; this way you cache once and use the data many times. You can also use external caching solutions such as Tachyon.

Shared Variables—Broadcast

Let us now talk about broadcast variables, a type of shared variable that lets you keep a read-only variable cached on each machine rather than shipping a copy of it with every task.

They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.

Spark distributes broadcast variables using efficient broadcast algorithms to reduce communication cost.

Spark actions are executed through a set of stages, separated by distributed "shuffle" operations. Broadcast variables let Spark ship the common data needed by tasks within each stage; the data broadcast this way is cached in serialized form and deserialized before each task runs.

A broadcast variable is created from a variable v by calling the SparkContext.broadcast(v) method; the result is a wrapper around v.

The broadcast variable should then be used instead of v in any functions that run on the cluster, so that v is not shipped to the nodes more than once. In addition, the object v should not be modified after it is broadcast, to ensure that all nodes see the same broadcast value.

The example given below shows the creation of broadcast variables.
[Image: shared variables in Spark (broadcast)]
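A minimal sketch of the standard broadcast example, assuming sc is available:

    // Create a broadcast variable wrapping a small lookup array
    val broadcastVar = sc.broadcast(Array(1, 2, 3))
    // Access the wrapped value on the driver or inside tasks
    broadcastVar.value   // Array(1, 2, 3)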

Shared Variables—Accumulators

Another type of shared variable is the accumulator, a variable that can only be "added" to through an associative operation and can therefore be efficiently supported in parallel. Accumulators can be used to implement counters or sums.

Spark natively supports accumulators of numeric types, and you can add support for new types. If accumulators are created with a name, they are displayed in Spark's UI, which is useful for understanding the progress of running stages.

To create an accumulator, use the SparkContext.accumulator(v) method. Tasks running on the cluster can then add to it using the add method or the += operator; however, only the driver program can read an accumulator's value, using its value method.

The code below shows an accumulator being used to add up the elements of an array.

[Image: shared variables in Spark (accumulators)]
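A minimal sketch of the standard accumulator example, using the SparkContext.accumulator API described above:

    // Create a numeric accumulator starting at 0
    val accum = sc.accumulator(0, "My Accumulator")
    // Add each element of the array to the accumulator on the cluster
    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
    // Only the driver can read the value
    accum.value   // 10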
We will discuss writing a Scala application in the next section.

Writing a Scala Application

Now that you have learned the basic concepts of RDDs and caching levels, let's look at a simple Scala application.

In this example, the text file README.md is loaded and cached in memory; then the number of lines containing the characters 'a' and 'b' is counted.
[Image: writing a Scala application]
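A minimal sketch of such an application, modeled on the standard Spark quick-start example; the file path and application name are illustrative:

    import org.apache.spark.{SparkConf, SparkContext}

    object SimpleApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Simple Application")
        val sc = new SparkContext(conf)

        // Load the README and cache it in memory
        val logData = sc.textFile("README.md").cache()

        // Count the lines containing 'a' and the lines containing 'b'
        val numAs = logData.filter(line => line.contains("a")).count()
        val numBs = logData.filter(line => line.contains("b")).count()
        println(s"Lines with a: $numAs, lines with b: $numBs")

        sc.stop()
      }
    }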

Scala RDD Extensions

We have already seen that Spark has PairRDDFunctions as one of its extensions. There are three more extensions: DoubleRDDFunctions, OrderedRDDFunctions, and SequenceFileRDDFunctions.

The DoubleRDDFunctions extension contains methods for aggregating numeric values; they become available when the data items of an RDD can be implicitly converted to the Scala data type Double.

The PairRDDFunctions extension includes methods that become available when the data items have a two-component tuple structure; Spark interprets the first item as the key and the second as the associated value.

The OrderedRDDFunctions extension includes methods that become available when the data items are two-component tuples and the key is implicitly sortable.

The SequenceFileRDDFunctions extension includes methods that let you create Hadoop SequenceFiles from RDDs. The data items must be two-component key-value tuples, as required by PairRDDFunctions.

In the subsequent sections, we will discuss the various methods available on RDDs.

DoubleRDD Methods

Let’s now learn about DoubleRDD functions or methods. These are listed and explained in the given table.
[Table: DoubleRDD methods]

PairRDD Methods—Join

Consider the simple join operator: it is an inner join, so only keys that are present in both pair RDDs appear in the output. When there are multiple values for the same key, the resulting RDD has an entry for every possible pair of values. PairRDD functions also include reduceByKey(), which aggregates data separately for each key.

An example to use it is given below.
[Image: PairRDD methods (reduceByKey and join)]
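A minimal sketch of both operations, assuming sc and an illustrative input file data.txt:

    // Count how many times each line of text occurs in a file with reduceByKey
    val lines = sc.textFile("data.txt")
    val counts = lines.map(line => (line, 1)).reduceByKey((a, b) => a + b)

    // Merge two pair RDDs on their keys with join (an inner join)
    val left  = sc.parallelize(Seq(("a", 1), ("b", 2)))
    val right = sc.parallelize(Seq(("a", "x"), ("a", "y")))
    left.join(right).collect()   // e.g. Array(("a", (1, "x")), ("a", (1, "y")))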

This code uses the reduceByKey operation on key-value pairs to count how many times each line of text occurs in a file. PairRDD also has the join() method that can merge two RDDs by grouping elements with the same key.

An example to use the same is given above.

PairRDD Methods—Others

The table below lists and explains other PairRDD methods.
[Table: other PairRDD methods]

Java PairRDD Methods

Let’s now learn about Java PairRDD functions or methods. These are listed and explained in the given table.
[Table: Java PairRDD methods]

Java PairRDD Methods (contd.)

A few more Java PairRDD methods are listed and explained too.
[Table: Java PairRDD methods (continued)]

General RDD Methods

The table shows and explains general RDD methods.
[Table: general RDD methods]

General RDD Methods (contd.)

A few more general RDD methods are also listed.
[Table: general RDD methods (continued)]

Java RDD Methods

Let’s now talk about the Java RDD methods. These are shown in the table below.
[Table: Java RDD methods]

Java RDD Methods (contd.)

A few more Java RDD methods are also listed and explained.
[Table: Java RDD methods (continued)]

Common Java RDD Methods

Now, we will discuss common Java RDD methods. These are listed and explained in the table displayed below.
[Table: common Java RDD methods]

Spark Java Function Classes

The table below lists the function classes used by the Java API. Every class has a single abstract method, call(), which must be implemented.
[Table: Spark Java function classes]

Method for Combining JavaPairRDD Functions

To combine values in a JavaPairRDD, you can use the method that first aggregates the elements of each partition and then combines the results of all the partitions, using a neutral "zero value" and combine functions.

Using this method, you can return a result type, U, that is different from the type of this RDD, T. You therefore need two operations: one for merging a T into a U and one for merging two U's. Both functions are allowed to modify and return their first argument instead of creating a new U, which avoids extra memory allocation.
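This description corresponds to the aggregate method; here is a minimal Scala sketch of the equivalent call, computing a sum and a count in one pass (assuming sc as before):

    val rdd = sc.parallelize(1 to 4)
    // Zero value (0, 0); the first function merges an element (T) into (sum, count) (U),
    // and the second function merges two (sum, count) pairs (two U's)
    val (sum, count) = rdd.aggregate((0, 0))(
      (acc, x) => (acc._1 + x, acc._2 + 1),
      (a, b)   => (a._1 + b._1, a._2 + b._2)
    )
    // sum = 10, count = 4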

Transformations in RDD

Common transformations on RDDs include sample, map, filter, and groupByKey.

sample returns an RDD containing a random subset of the input RDD.

map passes each element through a function.

filter creates a new RDD containing only the elements for which a given predicate function returns true. Consider the example below.
[Image: transformations in RDD (filter)]
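A brief sketch of filter, assuming sc:

    val numbers = sc.parallelize(1 to 10)
    // Keep only the even numbers
    val evens = numbers.filter(n => n % 2 == 0)
    evens.collect()   // Array(2, 4, 6, 8, 10)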

groupByKey, when called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. An example is shown below.
[Image: transformations in RDD (groupByKey)]
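A brief sketch of groupByKey:

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    // Groups the values for each key: ("a", [1, 3]) and ("b", [2])
    val grouped = pairs.groupByKey()
    grouped.collect().foreach(println)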

Other Methods

Other methods used in RDD are listed and explained in the table below.
[Table: other methods used in RDD]


Actions in RDD

Let’s now talk about actions in RDD. These are given in the table shown below with their syntax.
[Table: actions in RDD]

We will discuss key-value pair RDDs in Scala in the next section.

Key-Value Pair RDD in Scala

Some special operations are available only on RDDs of key-value pairs. These operations are automatically available on RDDs containing Tuple2 objects through the PairRDDFunctions class, which automatically wraps around an RDD of tuples.

The most common examples are shuffle operations, such as grouping or aggregating the elements by a key.

In the example below, the reduceByKey operation on key-value pairs is used to count the number of times each line of text occurs in a file.
[Image: key-value pair RDD in Scala]
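A minimal sketch of that example, with an illustrative file name:

    val lines = sc.textFile("data.txt")
    val pairs = lines.map(line => (line, 1))
    val counts = pairs.reduceByKey((a, b) => a + b)
    // Optionally sort the result by key, for example alphabetically
    val sorted = counts.sortByKey()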

We will discuss key-value pair RDDs in Java in the next section.

Key-Value Pair RDD in Java

In Java, key-value pairs are represented using the scala.Tuple2 class from the Scala standard library. You simply call new Tuple2(a, b) to create a tuple and access its fields later with tuple._1() and tuple._2().

RDDs of key-value pairs, in turn, are represented by the JavaPairRDD class. You can build JavaPairRDDs from JavaRDDs using special versions of the map operations, such as mapToPair and flatMapToPair.

Consider the example below, which uses the reduceByKey operation on key-value pairs to count the number of times each line of text occurs in a file.
[Image: key-value pair RDD in Java]

We will discuss MapReduce and RDD Operations in the next section.

Using MapReduce and Pair RDD Operations

Consider the example below, which uses MapReduce-style pair RDD operations.

The highlighted code outputs the RDD to any Hadoop-supported file system using a Hadoop OutputFormat class that supports the RDD's key and value types, K and V.

The second variant outputs the RDD to any Hadoop-supported file system using an OutputFormat from the new Hadoop API.

[Image: using MapReduce and pair RDD operations in Spark]
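The description matches the saveAsHadoopFile and saveAsNewAPIHadoopFile methods of pair RDDs; a hedged Scala sketch with illustrative paths and standard Hadoop Writables:

    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapred.SequenceFileOutputFormat
    import org.apache.hadoop.mapreduce.lib.output.{SequenceFileOutputFormat => NewSequenceFileOutputFormat}

    // A pair RDD of Hadoop Writables (no shuffle is involved here)
    val writables = sc.textFile("data.txt")
      .map(line => (new Text(line), new IntWritable(line.length)))

    // Old Hadoop API: OutputFormat from org.apache.hadoop.mapred
    writables.saveAsHadoopFile("hdfs://namenode/output/old-api",
      classOf[Text], classOf[IntWritable],
      classOf[SequenceFileOutputFormat[Text, IntWritable]])

    // New Hadoop API: OutputFormat from org.apache.hadoop.mapreduce
    writables.saveAsNewAPIHadoopFile("hdfs://namenode/output/new-api",
      classOf[Text], classOf[IntWritable],
      classOf[NewSequenceFileOutputFormat[Text, IntWritable]])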
In the subsequent sections, we will discuss reading and writing text and sequence files.

Reading Text File from HDFS

To perform batch analytics, Spark reads files from HDFS.

In the example below, a text file is read from HDFS using the hadoopFile method of SparkContext and printed as a list.
[Image: reading a text file from HDFS]
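A hedged sketch of reading a text file from HDFS with hadoopFile (the path is illustrative); the simpler textFile method would work equally well:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat

    val hdfsFile = sc.hadoopFile[LongWritable, Text, TextInputFormat](
      "hdfs://namenode/user/data/input.txt")

    // Keep only the line text (the key is the byte offset) and print it as a list
    hdfsFile.map { case (_, text) => text.toString }
      .collect()
      .foreach(println)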

Reading Sequence File from HDFS

SequenceFile is a Hadoop file format that stores data as serialized key-value pairs. You can read sequence files stored in HDFS and transform the stored data.

In the example below, a sequence file is read using a customized Writable class and printed as a list.
[Image: reading a sequence file from HDFS]
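The custom Writable class from the original example is not reproduced here; a hedged sketch using standard Hadoop Writables instead (the path is illustrative):

    import org.apache.hadoop.io.{IntWritable, Text}

    val seqFile = sc.sequenceFile("hdfs://namenode/user/data/records.seq",
      classOf[Text], classOf[IntWritable])

    // Convert the Writables to plain Scala values before collecting
    seqFile.map { case (k, v) => (k.toString, v.get) }
      .collect()
      .foreach(println)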

Writing Text Data to HDFS

Once the transformations are done and the business logic has been applied to the RDD, you can store the result back into HDFS for persistence.

In the example below, a text file is written to HDFS using the saveAsTextFile method.
[Image: writing text data to HDFS]
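A minimal sketch, with illustrative paths:

    // An illustrative transformation before saving
    val results = sc.textFile("hdfs://namenode/user/data/input.txt")
      .filter(line => line.nonEmpty)

    // Writes one part file per partition under the output directory
    results.saveAsTextFile("hdfs://namenode/user/data/output")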

Writing Sequence File to HDFS

In the example below, the customized Writable class behind salesRecordWritableRDD is used to write the data as a sequence file to HDFS.
[Image: writing a sequence file to HDFS]
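The custom Writable class is not reproduced here; a hedged sketch with plain Scala pairs shows the same saveAsSequenceFile call (the path is illustrative):

    // A pair RDD whose keys and values are convertible to Hadoop Writables
    val records = sc.parallelize(Seq(("alice", 42), ("bob", 7)))

    // Writes the pairs to HDFS as a Hadoop SequenceFile
    records.saveAsSequenceFile("hdfs://namenode/user/data/records-seq")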

We will discuss using groupBy in the next section.

Using GroupBy

In this example, Scala is used to perform a groupBy operation in Spark.
[Image: using groupBy operations (part 1)]
The rest of the code is shown below.
[Image: using groupBy operations (part 2)]
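A brief sketch of a typical groupBy in Scala, assuming sc:

    val numbers = sc.parallelize(1 to 10)

    // Group the numbers by whether they are even or odd
    val grouped = numbers.groupBy(n => if (n % 2 == 0) "even" else "odd")
    grouped.collect().foreach { case (key, values) =>
      println(s"$key -> ${values.mkString(", ")}")
    }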

Summary

Let us summarize the topics covered in this lesson:

  • There are two ways to create RDDs: parallelize an existing collection and reference an external dataset.

  • Spark supports text files, SequenceFiles, and other Hadoop InputFormats.

  • RDDs support two types of operations: transformations and actions.

  • RDDs are immutable, lazily evaluated, cacheable, and type inferred.

  • Choose the applicable storage level, as there are trade-offs between memory usage and CPU efficiency.

  • To invoke the Spark shell, go to the home directory and run the applicable code.

  • A few actions that can be performed on files in Spark shell RDDs are getting the count from a file and getting the first element from a file.

  • To build a Spark project with SBT, create the build and test it.

  • Broadcast variables let you keep a read-only variable cached on every machine, while accumulators can only be added to through an associative operation.

  • An RDD acts as a handle for a collection of individual data partitions.

  • In Spark, there are four extensions to the RDD API: DoubleRDDFunctions, PairRDDFunctions, OrderedRDDFunctions, and SequenceFileRDDFunctions.

  • The method for combining JavaPairRDD functions aggregates the elements of each partition and then combines the results across partitions.

  • RDD transformations include sample, map, filter, and groupByKey.

  • Some special operations are only available on RDDs of key-value pairs.

  • In Java, key-value pairs are represented using the scala.Tuple2 class.

Conclusion

With this, we come to the end of the third chapter “Using RDD for Creating Applications in Spark” of the Apache Spark and Scala course. The next chapter is Running SQL Queries using Spark SQL.
