Welcome to the third chapter of the Apache Spark and Scala tutorial (part of the Apache Spark and Scala course). This lesson will explain how to use RDD for creating applications in Spark.
Let us explore the objectives of this lesson in the next section.
After completing this lesson, you will be able to:
Explain the features of RDDs
Explain how to create RDDs
Describe RDD operations and methods
Discuss how to run a Spark project with SBT
Explain RDD functions, and
Describe how to write code in Scala
We will begin with an introduction to the RDD API in the next section.
An RDD acts as the workhorse of Spark: it can be thought of as a handle for a collection of individual data partitions. In cluster installations, different data partitions can reside on different nodes. RDDs, acting as handles, provide the capability to access all partitions. They also allow you to perform computations and transformations on the contained data.
In case the entire or a part of RDD is lost, they can be reconstructed by using lineage information. Lineage means the sequence of transformations that are used to produce the current RDD.
An RDD derives directly or indirectly from the class RDD. This abstract class contains various methods that perform operations on the data within the associated partitions. Using an RDD means that you are actually using a concrete implementation of RDD.
Spark has recently become very popular for processing big data because it places no restrictions on what data can be stored within partitions.
In addition, the RDD API contains various useful operations; however, some convenience functions are missing because the Spark creators needed to keep the core API common enough to handle arbitrary data types.
Basically, the RDD API treats every data item as a single value. However, you often want to work with key-value pairs, which Spark supports through the PairRDDFunctions extension.
In the next section of the tutorial, we will discuss features of RDD.
RDDs are simply distributed collections of elements. All Spark work is expressed in terms of RDDs: creating new RDDs, transforming existing RDDs, and calling operations on RDDs to compute results.
Under the hood, Spark automatically distributes the data contained in RDDs across your cluster and parallelizes the operations you perform on them. The key features of RDDs are listed below.
Immutable
RDDs are immutable, which means that once they are created, they never change. Immutability makes it possible to parallelize work and to cache data spread across different cluster nodes.
Lazy Evaluated
RDDs are also lazily evaluated. When you define an RDD, it does not contain any data; the computation to create the data is performed only when the data is referenced. This defers evaluation, separating execution from evaluation. Lazy transformations also allow data to be recreated on failure.
Cacheable
In addition, RDDs are cacheable, which improves execution engine performance.
Type Inferred
Type inference is a compiler feature that determines the type from the value.
All transformations are free from side effects, so the type can be determined from the operation. Each transformation has a specific return type, which means you need to worry less about the representation of many transformations.
In the next couple of sections of the tutorial, we will discuss creating RDDs.
To create RDDs, you can either parallelize an existing collection or reference an external dataset. To create a parallelized collection, call the parallelize method of SparkContext on a collection that exists in your driver program. Consider the example below, which creates a parallelized collection from the numbers 1 to 5.
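A minimal sketch of such a parallelized collection (variable names are illustrative):

```scala
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)   // distData is now an RDD that can be operated on in parallel
```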
In addition, you can build RDDs by referencing any Hadoop-supported storage source, including HDFS, HBase, a shared file system, or any data source offering a Hadoop InputFormat.
Let’s talk about creating RDDs by referencing an external dataset in detail. Spark mainly supports text files, SequenceFiles, and other Hadoop InputFormats. The image below shows how RDDs undergo transformations and interact with the external world.
In the next few sections of this RDD tutorial, we will discuss referencing an external dataset.
To create text file RDDs, use the textFile method of SparkContext. The textFile method takes a URI for the file, either a local path or a URI such as hdfs://, and reads the file as a collection of lines.
An example to use the method is given below.
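A minimal sketch, with the file path as a placeholder:

```scala
// "data.txt" is a placeholder; a local path or an hdfs:// URI both work
val lines = sc.textFile("data.txt")
```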
Once the text file RDDs are created, dataset operations can act on them. For instance, the lengths of all the lines can be summed using map and reduce, as shown below.
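A sketch of that map/reduce sequence, continuing the previous snippet:

```scala
val lineLengths = lines.map(s => s.length)             // length of each line
val totalLength = lineLengths.reduce((a, b) => a + b)  // sum of all line lengths
```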
Note that you can also use the SparkContext.wholeTextFiles method, which reads a directory containing multiple small text files and returns each file as a (filename, content) pair. This is in contrast to the textFile method, which returns one record per line in each file.
You must remember a few points about reading files.
When you use a local path that exists on the file system, the related file needs to be available at the same location on worker nodes. You can use a network-mounted shared file system or copy the file to all workers.
All file-based input methods of Spark, including textFile, support running on directories, compressed files, and wildcards as well.
To control the number of partitions of the file, the textFile method also takes an optional second argument. By default, Spark creates one partition for each block of the file, but you can ask for more partitions by passing a larger value.
Note, however, that you cannot have fewer partitions than blocks.
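A hedged sketch of passing that optional second argument (the path and partition count are illustrative):

```scala
// Ask Spark for 10 partitions instead of the default of one partition per block
val linesWith10Partitions = sc.textFile("data.txt", 10)
```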
For SequenceFiles, use SparkContext's sequenceFile[K, V] method, where K and V are the types of the key and value in the file. Note that K and V should be subclasses of Hadoop's Writable interface, such as Text and IntWritable.
You can also specify native types for a few common Writables; for example, sequenceFile[Int, String] will automatically read IntWritables and Texts.
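A minimal sketch of reading a SequenceFile, with a hypothetical HDFS path:

```scala
import org.apache.hadoop.io.{IntWritable, Text}

// K and V must be Writable subclasses such as IntWritable and Text
val seqRdd = sc.sequenceFile("hdfs:///data/sample-seq", classOf[IntWritable], classOf[Text])
// For a few common Writables, native types also work, e.g. sc.sequenceFile[Int, String](path)
```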
For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method. This method takes an arbitrary JobConf along with the input format class, key class, and value class. Set these in the same way you would for a Hadoop job with your input source.
In addition, the SparkContext.newAPIHadoopRDD method can be used for InputFormats based on the new MapReduce API (org.apache.hadoop.mapreduce).
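A hedged sketch of hadoopRDD with a plain text input format (classes and path are illustrative):

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

// Configure the JobConf as you would for a Hadoop job, then pass the
// input format class, key class, and value class to hadoopRDD
val jobConf = new JobConf()
FileInputFormat.setInputPaths(jobConf, "hdfs:///data/input")
val hadoopRdd = sc.hadoopRDD(jobConf, classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
// SparkContext.newAPIHadoopRDD works analogously for the new (org.apache.hadoop.mapreduce) API
```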
Note that SparkContext.objectFile and RDD.saveAsObjectFile support saving an RDD in a simple format consisting of serialized Java objects. While these methods are not as efficient as specialized formats, they offer an easy way to save any RDD.
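A brief sketch, with a hypothetical output path and reusing the distData RDD from the earlier example:

```scala
distData.saveAsObjectFile("hdfs:///tmp/numbers-obj")       // save as serialized Java objects
val restored = sc.objectFile[Int]("hdfs:///tmp/numbers-obj")  // load it back as an RDD[Int]
```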
We will discuss creating RDDs in the next section.
Once created, the distributed dataset (distData in the earlier example) can be operated on in parallel.
For parallelized collections, the number of partitions into which the dataset is cut is an important parameter. Spark runs one task for each partition of the dataset. Typically, you want 2-4 partitions for each CPU in your cluster.
Generally, Spark attempts to set the number of partitions automatically based on your cluster, but you can also set it manually.
To do so, pass it as a second parameter to parallelize, as shown below.
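A minimal sketch, reusing the data collection from the earlier example:

```scala
// Request 10 partitions explicitly instead of relying on the automatic setting
val distData10 = sc.parallelize(data, 10)
```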
The first diagram shows that data processing in Spark can be performed by reading data from disk and loading it as an RDD in memory. After the first transformation, the data can be stored on the hard disk, loaded again as an RDD to perform another transformation, and the final result then stored on the hard disk.
The second diagram shows the case where the output of each transformation is stored only on the hard disk.
We will discuss RDD operations in the next section of this tutorial.
RDDs support two types of operations: transformations and actions. Transformations create a new dataset from an existing one. For example, map passes each dataset element through a function and returns a new RDD with the results.
Actions, on the other hand, return a value to the driver program after running a computation on the dataset. For example, reduce aggregates all the elements of the RDD using a function and returns the final result to the driver program.
The table below lists some more transformations and actions.
In the next section, we will discuss transformations in RDD operations.
All Spark transformations are lazy: they do not compute their results immediately. The results are computed only when an action requires a result to be returned to the driver program.
This design enables Spark to run more efficiently. For example, Spark can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.
By default, each transformed RDD may be recomputed each time you run an action on it. To persist an RDD in memory, use the cache or persist method. Spark will then keep the elements around on the cluster for much faster access the next time you query it, as shown below.
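A minimal sketch of laziness plus persistence (the file path is a placeholder):

```scala
val lineLengths = sc.textFile("data.txt").map(s => s.length)  // lazy: nothing is computed yet
lineLengths.persist()                                          // or lineLengths.cache()
lineLengths.reduce((a, b) => a + b)   // the first action computes the data and caches it
lineLengths.count()                   // reuses the cached data
```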
In the next section, we will discuss features of RDD persistence.
RDD persistence is one of the most important capabilities of Spark. When you persist an RDD, each node stores any partitions of it that it computes in memory and reuses them in other actions on that dataset or on datasets derived from it. This makes future actions much faster, which is why caching is valuable for iterative algorithms and interactive use.
The cache is fault-tolerant: any lost RDD partition is automatically recomputed using the transformations that originally created it.
Each persisted RDD can be stored using a different storage level. For example, you can persist the dataset on disk, persist it in memory but as serialized Java objects to save space, replicate it across nodes, or store it off-heap in Tachyon. To set these levels, pass a StorageLevel object to the persist() method, as shown below.
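A short sketch of passing a storage level (the path is a placeholder):

```scala
import org.apache.spark.storage.StorageLevel

val rdd = sc.textFile("data.txt")
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)   // serialized in memory, spilling to disk when needed
```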
In the next section, we will discuss storage levels of RDD persistence.
As discussed, the persist() method allows you to specify the desired storage level. Use it to assign any storage level other than the default.
The table given below lists and explains all the storage levels of RDD persistence.
We will discuss choosing the correct RDD persistence storage level in the next section.
Storage levels provide trade-offs between memory usage and CPU efficiency, so choose the one that fits your needs. If your RDDs fit comfortably in the default storage level, leave them that way: the default provides the best CPU efficiency and lets operations on the RDDs run as fast as possible.
If not, try MEMORY_ONLY_SER and select a fast serialization library. This makes the objects much more space-efficient while remaining reasonably fast to access.
Do not spill to disk unless the functions that computed your datasets are expensive or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.
If fast fault recovery is required, use the replicated storage levels. Although all storage levels provide fault tolerance by recomputing lost data, the replicated ones let you continue running tasks on the RDD without waiting for a lost partition to be recomputed.
In environments with large amounts of memory or multiple applications, the experimental OFF_HEAP storage level has several advantages, such as letting multiple executors share the same pool of memory in Tachyon and reducing garbage-collection costs.
In addition, cached data is not lost if individual executors crash.
In the next section, we will discuss invoking the spark shell.
The Spark shell provides an easy way to learn the API. In addition, it is a powerful tool that allows analyzing data interactively. It is available in Scala or Python.
To start it, go to the Spark home directory and run the command applicable to Scala or Python, as shown below.
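A minimal sketch of the commands, assuming you are in the Spark home directory:

```
./bin/spark-shell    # Scala shell
./bin/pyspark        # Python shell
```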
In the next section, we will discuss importing spark classes.
Once the shell is invoked, you need to import some Spark classes into your program by executing the code given below. Note the different code applicable to Scala, Java, and Python.
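A Scala sketch of the typical imports:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
```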
We will discuss creating the SparkContext in the next section.
Next, you need to create a SparkContext instance to interact with Spark and distribute jobs. The SparkContext class provides a connection to a Spark cluster and is therefore the entry point for interacting with Spark. To create it, execute the code given below for Scala, Java, or Python; it sets the application name and the Spark master details.
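A Scala sketch (the application name and master URL are placeholders):

```scala
val conf = new SparkConf().setAppName("RDD Tutorial").setMaster("local[*]")
val sc = new SparkContext(conf)
```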
In the next section, we will discuss loading a file in shell.
Let’s now create a new RDD from the text of the README file in the Spark source directory. For this, you can execute the code given below.
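A minimal sketch, assuming the shell is run from the Spark source directory:

```scala
val textFile = sc.textFile("README.md")
```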
We will discuss performing basic operations on files in spark shell RDDs in the next section.
Let’s now perform a few actions on files in the Spark shell RDDs. The first action gets the count of lines in a file, and the next gets the first element of the file. Consider the example below.
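A short sketch of both actions, continuing the previous snippet:

```scala
textFile.count()   // number of lines in the file
textFile.first()   // first line of the file
```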
In the next section, we will discuss packaging and running a spark project with SBT.
For packaging Spark itself, Maven is the official recommendation and is the build of reference. However, SBT is supported for day-to-day development because it provides much faster iterative compilation, and it is used by more advanced developers. The SBT build is derived from the Maven POM files.
To package a Spark project with SBT, you need to create the build first.
An example of such code is given below.
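A hedged sketch of such a build invocation; the profile flags are illustrative and depend on your Hadoop/YARN setup:

```
build/sbt -Pyarn -Phadoop-2.3 assembly
```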
The same Maven profiles and variables can be set to control the SBT build.
Next, you need to test with SBT. Note that some tests require Spark to be packaged first, so you should always run build/sbt assembly the first time.
The example given below shows the same.
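A sketch of a build-then-test sequence (any profile flags you used for the build should be repeated here):

```
build/sbt assembly   # package Spark first
build/sbt test       # then run the tests
```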
Let’s now view how to run a Spark project with SBT. To run only a specific test suite, you can use the first command given below; to run the test suites of a specific subproject, use the second.
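A hedged sketch; the suite name is just an illustrative example:

```
build/sbt "test-only org.apache.spark.repl.ReplSuite"   # run a single test suite
build/sbt core/test                                     # run all test suites of the core subproject
```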
You should cache an RDD using the same context and reuse it across jobs; this way, you cache it once and use it many times. You can also use external caching solutions such as Tachyon.
Let us now talk about shared broadcast variables, which allow you to keep a read-only variable cached on every machine rather than shipping a copy of it with each task.
They can be used to give every node a copy of a large input dataset in an efficient manner.
Spark distributes broadcast variables using efficient broadcast algorithms, which reduces communication cost.
Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Broadcast variables are useful for data that is common to tasks across stages. The broadcast data is cached in serialized form and deserialized before each task is run.
They are created by calling the SparkContext.broadcast(v) method from the variable v. They provide a wrapper around v.
In addition, the broadcast variable should be used in place of v in any functions run on the cluster, so that v is not shipped to the nodes more than once. The object v should not be modified after it is broadcast, to ensure that all nodes receive the same broadcast value.
The example given below shows the creation of broadcast variables.
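A minimal sketch:

```scala
val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value   // Array(1, 2, 3)
```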
Another type of shared variable is the accumulator, which can only be “added” to through an associative operation and can therefore be efficiently supported in parallel. Accumulators can be used to implement counters or sums.
Spark natively supports accumulators of numeric types, and you can add support for new types. If accumulators are created with a name, they are displayed in the Spark UI, which is useful for understanding the progress of running stages.
To create an accumulator, use the SparkContext.accumulator(v) method. Tasks running on the cluster can then add to it using the add method or the += operator. Note that only the driver program can read an accumulator's value, using its value method.
The code given below shows an accumulator being used to add up the elements of an array.
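A short sketch (the accumulator name is illustrative):

```scala
val accum = sc.accumulator(0, "My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
accum.value   // 10 (readable only in the driver program)
```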
We will discuss writing a Scala application in the next section.
Now that you have learned the basic concepts of RDDs and caching levels, let's view a simple Scala application.
In this example, the text file README.md is loaded and cached in memory; then the number of lines containing the characters ‘a’ and ‘b’ is counted.
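A minimal standalone sketch of such an application (the object name and file path are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile("README.md").cache()   // load and cache the file in memory
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
  }
}
```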
We discussed that Spark has PairRDDFunctions as its extensions. There are three more extensions, DoubleRDDFunctions, OrderedRDDFunctions, and SequenceFileRDDFunctions.
The DoubleRDDFunctions extension contains methods for aggregating numeric values; these become available when the data items of an RDD can be implicitly converted to the Scala data type Double.
The PairRDDFunctions extension includes methods that become available when the data items have a two-component tuple structure. Spark interprets the first item as the key and the second one as the associated value.
The OrderedRDDFunctions extension includes methods that become available when the data items are two-component tuples where the key is implicitly sortable.
The SequenceFileRDDFunctions extension includes methods that let you create Hadoop SequenceFiles from RDDs. The data items must be two-component key-value tuples, as required by PairRDDFunctions.
In the next few sections, we will discuss the various methods involved in RDDs.
Let’s now learn about DoubleRDD functions or methods. These are listed and explained in the given table.
The simple join operator is an inner join: only keys that are present in both pair RDDs appear in the output. When there are multiple values for the same key, the output RDD has an entry for every possible pair of values. Among the PairRDD functions, you can use reduceByKey(), which aggregates data separately for each key.
An example to use it is given below.
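A minimal sketch (the file path is a placeholder):

```scala
val lines = sc.textFile("data.txt")
val pairs = lines.map(s => (s, 1))               // each line becomes a (line, 1) pair
val counts = pairs.reduceByKey((a, b) => a + b)  // counts how many times each line occurs
```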
This code uses the reduceByKey operation on key-value pairs to count how many times each line of text occurs in a file. PairRDD also has the join() method that can merge two RDDs by grouping elements with the same key.
An example of using it is given below.
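A sketch using hypothetical data to illustrate join(): elements with the same key are merged.

```scala
val ages   = sc.parallelize(Seq(("alice", 30), ("bob", 25)))
val cities = sc.parallelize(Seq(("alice", "NYC"), ("bob", "SF")))
val joined = ages.join(cities)   // ("alice", (30, "NYC")), ("bob", (25, "SF"))
```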
The table below lists and explains other PairRDD methods.
Let’s now learn about Java PairRDD functions or methods. These are listed and explained in the given table.
A few more Java PairRDD methods are listed and explained too.
The table shows and explains general RDD methods.
A few more general RDD methods are also listed.
Let’s now talk about the Java RDD methods. These are shown in the table below.
A few more Java RDD methods are also listed and explained.
Now, we will discuss common Java RDD methods. These are listed and explained in the table displayed below.
The table below lists the function classes used by the Java API. Each class has a single abstract method, call(), which must be implemented.
To combine elements of a JavaPairRDD, you can use the aggregate method, which aggregates the elements of each partition and then the results of all the partitions, using a neutral "zero value" and combine functions.
This method can return a result type, U, that is different from the type of the RDD, T. Therefore, you need two operations: one for merging a T into a U and one for merging two U's. Both operations may modify and return their first argument instead of creating a new U, to avoid memory allocation.
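A Scala sketch of the equivalent aggregate call on an RDD, computing a (sum, count) pair (type U) from an RDD of Int (type T):

```scala
val nums = sc.parallelize(1 to 100)
val (sum, count) = nums.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),   // merge a T into a U within a partition
  (a, b)   => (a._1 + b._1, a._2 + b._2)  // merge two U's across partitions
)
```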
Common transformations on RDDs include sample, map, filter, and groupByKey.
sample returns a random subset of the input RDD.
map passes each element through a function.
filter creates a new RDD by passing in a function used to select the elements that are kept.
groupByKey, when called on a dataset of key-value pairs, returns a dataset of (key, iterable of values) pairs. Examples of all four are shown below.
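A minimal sketch of all four transformations on illustrative data:

```scala
val nums    = sc.parallelize(1 to 10)
val sampled = nums.sample(false, 0.5)                  // random subset, without replacement
val doubled = nums.map(x => x * 2)                     // pass each element through a function
val evens   = nums.filter(x => x % 2 == 0)             // keep only the elements that match
val grouped = nums.map(x => (x % 2, x)).groupByKey()   // (key, Iterable[values]) pairs
```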
Other methods used in RDD are listed and explained in the table below.
Let’s now talk about actions in RDD. These are given in the table shown below with their syntax.
We will discuss key-value pairs in Scala in the next section.
Some special operations are only available on RDDs of key-value pairs. These operations become available automatically on RDDs containing Tuple2 objects, through the PairRDDFunctions class, which wraps around an RDD of tuples.
A common example is shuffle operations like aggregating or grouping the elements by a key.
A typical example is the reduceByKey operation on key-value pairs being used to count the number of times each line of text occurs in a file, as shown earlier.
We will discuss key-value pair in Java in the next section.
In Java, these pairs are represented using the scala.Tuple2 class from the Scala standard library. You can simply call new Tuple2(a, b) to create a tuple and access its fields later with tuple._1() and tuple._2().
RDDs of key-value pairs, in turn, are represented by the JavaPairRDD class. You can build JavaPairRDDs from JavaRDDs using special versions of the map operations, such as mapToPair and flatMapToPair.
Consider the given example, which uses the reduceByKey operation on key-value pairs to count the number of times each line of text occurs in a file.
We will discuss MapReduce and RDD Operations in the next section.
Consider the given example to use MapReduce and Pair RDD operations.
The highlighted code outputs the RDD to any Hadoop-supported file system, using a Hadoop OutputFormat class that supports the key and value types K and V.
This code outputs the RDD to any Hadoop-supported file system using an OutputFormat from the new Hadoop API.
In the next few sections, we will discuss reading and writing text and sequence files.
To perform batch analytics, Spark reads files from HDFS.
In the example below, a text file is read from HDFS using the hadoopFile method of SparkContext and printed as a list.
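A hedged sketch (the HDFS path is a placeholder); each record from TextInputFormat is a (byte offset, line) pair:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val hdfsLines = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/input.txt")
hdfsLines.map(_._2.toString).collect().toList.foreach(println)   // print the lines as a list
```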
SequenceFile is a file format used in Hadoop for storing data as serialized key-value pairs. You can read sequence files stored in HDFS and perform transformations on the stored data.
In the given example, a sequence file is read using a customized Writable class and printed as a list.
Once a transformation is done or the business logic has been applied to the RDD, you can store the result back into HDFS for persistence.
For example, here a text file is being written into HDFS using the saveAsTextFile method.
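A one-line sketch, assuming counts is an RDD produced by an earlier transformation (for example, the reduceByKey result above) and the output path is a placeholder:

```scala
counts.saveAsTextFile("hdfs:///output/line-counts")
```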
In the example given below, the customized Writable-based RDD salesRecordWritableRDD is used to write the data as a sequence file in HDFS.
We will discuss using a groupby in the next section.
In this example, a groupBy operation is performed in Spark using Scala. A sketch of such code is shown below.
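A minimal sketch of a groupBy on illustrative data:

```scala
val nums     = sc.parallelize(1 to 10)
val byParity = nums.groupBy(x => if (x % 2 == 0) "even" else "odd")
byParity.collect().foreach(println)   // (even, ...), (odd, ...)
```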
Let us summarize the topics covered in this lesson:
There are two ways to create RDDs: parallelize an existing collection and reference an external dataset.
Spark supports text files, SequenceFiles, and other Hadoop InputFormats.
RDDs support two types of operations: transformations and actions.
Key features of RDDs include immutability, persistence, lazy evaluation, and more.
Choose the applicable storage level, as there are trade-offs between memory usage and CPU efficiency.
To invoke the Spark shell, go to the home directory and run the applicable code.
A few actions that can be performed on files in Spark shell RDDs are getting the count from a file and getting the first element from a file.
To build a Spark project with SBT, create the build and test it.
Broadcast variables allow a read-only variable to be kept cached on every machine, while accumulators can only be added to through an associative operation.
An RDD acts as a handle for a collection of individual data partitions.
In Spark, there are four extensions to the RDD API: DoubleRDDFunctions, PairRDDFunctions, OrderedRDDFunctions, and SequenceFileRDDFunctions.
The aggregate method aggregates the elements of each partition and then the results of all the partitions.
Transformations on RDDs include sample, map, filter, and groupByKey.
Some special operations are only available on RDDs of key-value pairs.
In Java, these are represented using the scala.Tuple2 class.
With this, we come to the end of the third chapter “Using RDD for Creating Applications in Spark” of the Apache Spark and Scala course. The next chapter is Running SQL Queries using Spark SQL.