Spark Streaming Tutorial
Hello and welcome to Lesson 5 of the Apache Spark and Scala training course offered by Simplilearn. This lesson will introduce and explain the concepts of Spark streaming.
After completing this lesson, you will be able to: • Explain a few concepts of Spark streaming • Describe basic and advanced sources • Explain how stateful operations work • Explain window and join operations
5.3 Introduction to Spark Streaming
Let’s first understand what Spark Streaming is. It is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources such as Kafka, Flume, Kinesis, Twitter, and ZeroMQ, or from TCP sockets, and processed using complex algorithms expressed with high-level functions such as map, reduce, join, and window. Once the data is processed, you can push it out to file systems like HDFS and S3, to databases, and to live dashboards. It is also possible to apply Spark’s machine learning and graph processing algorithms to data streams. The image on the screen shows the data ingestion process and its output through Spark Streaming.
5.4 Working of Spark Streaming
Now, we will discuss how Spark Streaming works. It receives live input data streams and divides them into batches. The Spark engine then processes those batches and generates the final stream of results, also in batches. The working of Spark Streaming is shown graphically on the screen.
5.5 Features of Spark Streaming
Let’s now talk about the features of Spark Streaming. Spark Streaming provides a high-level abstraction called a discretized stream, or DStream. A DStream represents a continuous stream of data and is represented internally as a sequence of RDDs. DStreams can be created either from input data streams from sources like Flume, Kafka, and Kinesis, or by applying high-level operations on other DStreams. Spark Streaming also supports machine learning and graph processing algorithms, and the Scala, Java, and Python languages. For high availability in production, Spark Streaming uses ZooKeeper and HDFS. ZooKeeper provides leader election and some state storage. You can launch multiple masters in your cluster connected to the same ZooKeeper instance; one of them is elected as the leader while the others remain in standby mode. If the current leader dies, another master is elected, recovers the state of the old master, and resumes scheduling. This recovery process should take between one and two minutes. The resulting delay affects only the scheduling of new applications; applications that are already running remain unaffected.
5.6 Streaming Word Count
The code shown on the screen is an example of executing Spark Streaming. It first imports the names of the Spark Streaming classes, along with a few implicit conversions from StreamingContext, which add useful methods to other classes. StreamingContext is the main entry point for all streaming functionality. Here, a local StreamingContext is created with two execution threads and a batch interval of one second. Next, the code creates a DStream representing streaming data from a TCP source, specified as a hostname and port; this represents the data stream originating from the data server, and each record in it is a line of text. The lines are then split into words on space characters. flatMap is a one-to-many DStream operation that creates a new DStream by generating multiple new records from each record in the source stream; here, each line is split into multiple words, and the stream of words is represented by the words DStream. The words DStream is then mapped to a DStream of (word, 1) pairs, which is reduced to obtain the frequency of each word in every batch of data. Finally, wordCounts.print() prints a few of the counts generated every second.
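Since the on-screen code is not reproduced in this transcript, here is a sketch of the steps just described, based on the standard Spark Streaming network word count example; the hostname localhost and port 9999 are assumptions.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    // Local StreamingContext with two working threads and a 1-second batch interval
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))

    // DStream representing lines of text from a TCP source (hostname, port)
    val lines = ssc.socketTextStream("localhost", 9999)

    // Split each line into words (flatMap is a one-to-many DStream operation)
    val words = lines.flatMap(_.split(" "))

    // Map each word to a (word, 1) pair, then count each word per batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    // Print a few of the counts generated every second
    wordCounts.print()

    ssc.start()            // start the computation
    ssc.awaitTermination() // wait for the computation to terminate
  }
}
```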
5.7 Micro Batch
Micro batching is a core Spark technique that allows a task or process to treat a stream as a sequence of small batches, or chunks, of data. Incoming events are packed into small batches and then delivered to a batch system for processing.
5.8 DStreams
As discussed, a DStream is the fundamental abstraction provided by Spark Streaming. As noted earlier, DStreams can be created either by applying high-level operations on other DStreams or from input data streams from sources like Flume, Kinesis, and Kafka. Internally, a DStream is characterized by a continuous series of RDDs. The image on the screen shows how each RDD in a DStream contains data from a specific interval.
5.9 DStreams (contd.)
Any operation that you apply on a DStream translates to operations on the underlying RDDs. For example, recall that in the earlier example a stream of lines was converted to words: the flatMap operation was applied on each RDD in the lines DStream to generate the RDDs of the words DStream. This process is also shown in the given diagram. Note that these underlying RDD transformations are computed by the Spark engine; the DStream operations hide most of these details and provide you with a higher-level API for convenience.
5.10 Input DStreams and Receivers
Input DStreams represent the stream of input data received from streaming sources. Except for file streams, each input DStream is associated with a Receiver object, which receives the data from a source and stores it in Spark’s memory for processing. Spark Streaming provides two categories of built-in streaming sources: basic sources and advanced sources. Basic sources are directly available in the StreamingContext API; examples are file systems and socket connections. Advanced sources, such as Twitter, Flume, Kafka, and Kinesis, are available through extra utility classes and require linking against extra dependencies. Note that you can create multiple input DStreams if the streaming application needs to receive multiple data streams in parallel; this creates multiple receivers that receive the streams simultaneously. Each receiver is a long-running task, so it occupies one of the cores allocated to the Spark Streaming application on a worker or executor. Therefore, remember that the application must be allocated enough cores both to run the receivers and to process the received data.
5.11 Input DStreams and Receivers (contd.)
Remember that when you run a Spark Streaming program locally, you should not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used to run tasks locally. If you use an input DStream based on a receiver, that single thread will be used to run the receiver, leaving no thread to process the received data. Therefore, when running locally, always use “local[k]” as the master URL, where k is a number greater than the number of receivers to run. The same logic extends to running on a cluster: the number of cores allocated to the Spark Streaming application must be greater than the number of receivers; otherwise, the system will receive data but not be able to process it.
5.12 Basic Sources
Let us first talk about the first category of built-in streaming sources: basic sources. One basic source is the file stream, for which Spark Streaming monitors a data directory and processes any files created in it. The files in this directory must be in the same data format and must be created by atomically moving or renaming them into the directory; they must not be changed once moved. If files are continuously appended to, the new data will not be read. For simple text files, there is an easier method, shown on the screen. File streams do not require allocating cores for receiving, because they do not run a receiver. Two further kinds of basic sources are streams based on custom actors and queues of RDDs as streams. For streams based on custom actors, you can create DStreams from data streams received through Akka actors by using the given method; note that this actor stream is not available in the Python API. To test a Spark Streaming application, you can also create a DStream from a queue of RDDs using the given method. Each RDD pushed into the queue is treated as a batch of data in the DStream and processed like a stream.
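As a sketch of the two basic-source methods just described, assuming the Spark 1.x StreamingContext API; the directory path is an assumption:

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[2]").setAppName("BasicSources")
val ssc = new StreamingContext(conf, Seconds(1))

// File stream over simple text files: Spark Streaming monitors the directory
// and processes any file atomically moved into it (no receiver, no extra core).
val fileLines = ssc.textFileStream("hdfs://namenode:8020/user/data/incoming")

// Queue of RDDs as a stream, handy for testing: each RDD pushed into the
// queue is treated as one batch of data in the DStream.
val rddQueue = new mutable.Queue[RDD[Int]]()
val testStream = ssc.queueStream(rddQueue)
rddQueue += ssc.sparkContext.makeRDD(1 to 100)
```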
5.13 Advanced Sources
The other category of built-in streaming sources is advanced sources. A few of these are listed on the screen: Kafka, Flume, Kinesis, and Twitter. Note that Spark Streaming 1.4.1 is compatible with Kafka 0.8.2.1 and with Flume 1.4.0. For Twitter, Spark Streaming’s Twitter utilities use Twitter4j 3.0.3 to get the public stream of tweets via Twitter’s Streaming API. The authentication information can be provided using any of the methods supported by the Twitter4J library. You can get either the public stream or a stream filtered by keywords.
5.14 Advanced Sources-Twitter
For example, suppose you want to create a DStream using data from Twitter’s stream of tweets. To do so, you would perform the steps listed on the screen. The first step is linking: add the artifact spark-streaming-twitter_2.10 to the SBT/Maven project dependencies. The second step is programming: import the TwitterUtils class and create a DStream with TwitterUtils.createStream, as shown on the screen. The third step is deploying: generate an uber JAR with all the dependencies, and then deploy the application.
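The programming step can be sketched as follows, assuming a StreamingContext `ssc` already exists and the Twitter4J OAuth credentials (placeholders below) have been obtained by registering the application:

```scala
import org.apache.spark.streaming.twitter.TwitterUtils

// Twitter4J reads OAuth credentials from system properties; the values
// here are placeholders for your registered application's tokens.
System.setProperty("twitter4j.oauth.consumerKey", "...")
System.setProperty("twitter4j.oauth.consumerSecret", "...")
System.setProperty("twitter4j.oauth.accessToken", "...")
System.setProperty("twitter4j.oauth.accessTokenSecret", "...")

// Full public sample stream
val tweets = TwitterUtils.createStream(ssc, None)

// Stream filtered on keywords (keywords are an illustrative assumption)
val filteredTweets = TwitterUtils.createStream(ssc, None, Seq("spark", "bigdata"))
```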
5.15 Transformations on DStreams
The transformations on DStreams are similar to those on RDDs. They allow the data from the input DStream to be modified. A few of the common transformations on DStreams are listed and explained in the given table.
5.16 Transformations on Dstreams (contd.)
A few more common transformations on DStreams are also listed and explained.
5.17 Output Operations on DStreams
Let’s now talk about the output operations on DStreams. These operations allow the data of DStreams to be pushed out to external systems such as a file system or a database. Like RDD actions, they trigger the actual execution of all the DStream transformations, because they allow the transformed data to be consumed by external systems. These operations are listed and explained in the given table.
5.18 Design Patterns for Using ForeachRDD
dstream.foreachRDD is a powerful primitive that allows data to be sent to external systems. However, you must understand how to use it correctly and efficiently, as several common mistakes are easy to make. Writing data to an external system generally requires creating a connection object and using it to send data to the remote system. A common mistake is to unintentionally create the connection object at the Spark driver and then try to use it in a Spark worker to save the records in the RDDs; instead, the connection should be created at the worker. Also note that DStreams are executed lazily by the output operations. Specifically, it is the RDD actions inside the DStream output operations that force the received data to be processed. Therefore, if your application has no output operation, or has an output operation such as dstream.foreachRDD() without any RDD action inside it, nothing will be executed: the system will simply receive the data and discard it. By default, output operations are executed one at a time, in the order they are defined in the application. An example of using dstream.foreachRDD is given on the screen.
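The efficient pattern can be sketched as follows. Here `createNewConnection` and `send` are hypothetical placeholders for whatever connection factory your external system provides; the point is where the connection is created, on the worker and once per partition rather than on the driver or once per record:

```scala
// Assumes `dstream` is a DStream[String] and `createNewConnection` returns
// a client for the external system (hypothetical API).
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // Executed on the worker: one connection per partition
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
```

Creating the connection inside foreachPartition avoids both serializing a driver-side connection (which usually fails) and the overhead of opening one connection per record.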
5.19 DataFrame and SQL Operations
DataFrames and SQL operations can easily be used on streaming data. For this, you need to create an SQLContext using the SparkContext that the StreamingContext is using. In addition, this has to be done in such a way that it can be restarted on driver failures. For this, create a lazily instantiated singleton instance of SQLContext.
5.20 DataFrame and SQL Operations (contd.)
For example, consider the given code, which modifies the earlier word count example to generate word counts using DataFrames and SQL. In this code, each RDD is converted to a DataFrame, registered as a temporary table, and then queried with SQL.
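A sketch of this, based on the Spark 1.x streaming-plus-SQL example; `words` is assumed to be the DStream[String] of words from the earlier example, and `SQLContextSingleton` is the user-defined singleton holder described above:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Lazily instantiated singleton SQLContext, so the context can be
// re-created after a driver restart.
object SQLContextSingleton {
  @transient private var instance: SQLContext = _
  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) instance = new SQLContext(sparkContext)
    instance
  }
}

words.foreachRDD { rdd =>
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  // Convert the RDD[String] to a DataFrame with a single "word" column
  val wordsDataFrame = rdd.toDF("word")

  // Register as a temporary table and query it with SQL
  wordsDataFrame.registerTempTable("words")
  val wordCountsDataFrame =
    sqlContext.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}
```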
5.21 Checkpointing
A streaming application must operate 24/7 and therefore must be resilient to failures. To make failure recovery possible, it needs to checkpoint enough information to a fault-tolerant storage system. There are two kinds of checkpointing. The first is metadata checkpointing: saving the information that defines the streaming computation to fault-tolerant storage such as HDFS. It is used to recover from a failure of the node running the driver of the streaming application. The metadata includes the configuration used to create the streaming application, the DStream operations that define the application, and incomplete batches whose jobs are queued but not yet finished. The second is data checkpointing: saving the generated RDDs to reliable storage. This is necessary for some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on the RDDs of earlier batches, which causes the length of the dependency chain to keep growing over time. To avoid this, the intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage, which cuts off the dependency chains.
5.22 Enabling Checkpointing
Now the question is: when should checkpointing be enabled? You must enable it for applications with either of the following requirements. The first is the use of stateful transformations: if updateStateByKey or reduceByKeyAndWindow (with the inverse function) is used in the application, you must provide a checkpoint directory to allow periodic RDD checkpointing. The second is recovery from failures of the driver running the application: in this case, metadata checkpoints are used to recover the progress information.
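Enabling checkpointing can be sketched as follows, using the standard getOrCreate pattern; the checkpoint directory path is an assumption:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical checkpoint directory on fault-tolerant storage (HDFS)
val checkpointDirectory = "hdfs://namenode:8020/user/checkpoints/app"

// Creates a fresh StreamingContext when no checkpoint data exists
def functionToCreateContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("CheckpointedApp")
  val ssc = new StreamingContext(conf, Seconds(1))
  // ... set up input DStreams and transformations here ...
  ssc.checkpoint(checkpointDirectory) // enable metadata and RDD checkpointing
  ssc
}

// After a driver failure, the context is rebuilt from the checkpoint data;
// otherwise functionToCreateContext() is called to build a new one.
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
context.start()
context.awaitTermination()
```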
5.23 Socket Stream
Code that resides outside the DStream closure is executed in the driver, whereas the rdd.foreach method is executed on each distributed partition of the RDD. So if a socket is created on the driver’s machine but the job attempts to write to it from another machine, it does not work, for obvious reasons. DStream.foreachRDD itself is executed on the driver; when the socket and the computation are performed on the same host, it works. Because an RDD computation is distributed in nature, however, this server-socket approach is hard to implement reliably, as dynamic service discovery is difficult. In such cases, you should look for an alternative that gives centralized access to distributed data; for example, you can use Kafka. Consider the given example, which works because it collects the RDD partitions from the workers and sends them to the driver, which then writes them to the socket.
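The collect-to-driver approach can be sketched as follows; `dstream` is assumed to be a DStream[String], and the host/port of the listening server are assumptions:

```scala
import java.io.PrintStream
import java.net.Socket

dstream.foreachRDD { rdd =>
  // collect() runs on the driver, pulling the distributed partitions
  // to a single machine; suitable only for modest data volumes.
  val data = rdd.collect()

  // The socket is created on the driver, the same host where the
  // collected data now lives, so the write succeeds.
  val socket = new Socket("localhost", 9999)
  val out = new PrintStream(socket.getOutputStream)
  data.foreach(out.println)
  out.close()
  socket.close()
}
```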
5.24 File Stream
To read data files residing on any file system that is compatible with the HDFS API, you can create a DStream as shown on the screen.
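A sketch of such a DStream, assuming the Spark 1.x API; the directory path and the Hadoop key/value/input-format types are illustrative:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Generic file stream over an HDFS-API-compatible directory, with explicit
// key, value, and input format types.
val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
  "hdfs://namenode:8020/user/data/in")

// Shortcut for plain text files
val textLines = ssc.textFileStream("hdfs://namenode:8020/user/data/in")
```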
5.25 Stateful Operations
Stateful operations are operations that work over multiple batches of data. They include the updateStateByKey operation and all window-based operations. Because these operations depend on earlier batches of data, they continuously accumulate metadata over time. To clear this metadata, Spark Streaming supports periodic checkpointing, which saves the intermediate data to HDFS.
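A sketch of a stateful running word count with updateStateByKey; `pairs` is assumed to be the DStream[(String, Int)] of (word, 1) pairs from the earlier example, and a checkpoint directory is assumed to have been set on the StreamingContext:

```scala
// Add the counts from the current batch to the previous running count.
// newValues holds the values seen for a key in this batch;
// runningCount is the state carried over from earlier batches.
def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
  Some(runningCount.getOrElse(0) + newValues.sum)
}

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
runningCounts.print()
```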
5.26 Window Operations
Spark Streaming also supports window operations, which let you apply transformations over a sliding window of data. This is illustrated in the given image. As depicted, every time the window slides over a source DStream, the source RDDs falling within the window are combined and operated upon to produce the RDDs of the windowed DStream. In this case, the operation is applied over the last three time units of data and slides by two time units. This implies that every window operation needs two parameters to be specified: the window length, which is the duration of the window, and the sliding interval, which is the interval at which the window operation is performed. Note that these two parameters must be multiples of the batch interval of the source DStream. As an example, suppose you want to extend the earlier example by generating word counts over the last 30 seconds of data, every 10 seconds. To do this, you apply the reduceByKey operation on the (word, 1) pairs DStream over the last 30 seconds of data, which is done using the reduceByKeyAndWindow operation, as shown on the screen.
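The windowed word count just described can be sketched as follows; `pairs` is assumed to be the DStream[(String, Int)] of (word, 1) pairs:

```scala
import org.apache.spark.streaming.Seconds

// Reduce over the last 30 seconds of data, every 10 seconds.
// Both durations must be multiples of the source batch interval.
val windowedWordCounts =
  pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
windowedWordCounts.print()
```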
5.27 Types of Window Operations
Some of the general window operations are listed and explained through the given table. Note that all these operations take the discussed parameters, window length and slide interval.
5.28 Types of Window Operations (contd.)
A few more window operations are also listed and explained.
5.29 Join Operations-Stream-Stream Joins
Spark Streaming supports two types of join operations. The first is stream-stream joins, which allow streams to be joined with other streams. An example of this is given on the screen. Here, in each batch interval, the RDD generated by stream1 is joined with the RDD generated by stream2. You can also use leftOuterJoin, rightOuterJoin, and fullOuterJoin. In addition, it is often useful to join over windows of the streams; an example is displayed.
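A sketch of both forms; stream1 and stream2 are assumed to be pair DStreams, for example DStream[(String, String)], with matching key types:

```scala
import org.apache.spark.streaming.{Minutes, Seconds}

// Per batch interval, the RDD from stream1 is inner-joined with the
// RDD from stream2 on the key.
val joinedStream = stream1.join(stream2)
// leftOuterJoin, rightOuterJoin, and fullOuterJoin work the same way.

// Joining over windows of the two streams:
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedWindowStream = windowedStream1.join(windowedStream2)
```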
5.30 Join Operations-Stream-Dataset Joins
The other type is stream-dataset joins, which allow a stream to be joined with a dataset. An example of this join is displayed on the screen. It is also possible to change the dataset you join against between batches, because the transform function is evaluated in every batch interval.
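A sketch of a stream-dataset join; `windowedStream` is assumed to be a pair DStream, and `dataset` an RDD with the same key type:

```scala
// transform gives access to the underlying RDD of each batch, so the
// stream can be joined with a static (or periodically replaced) dataset.
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
// Because transform is evaluated every batch interval, `dataset` can be
// pointed at a different RDD between batches if the reference data changes.
```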
5.31 Monitoring Spark Streaming Application
Spark Streaming provides additional features beyond Spark’s standard monitoring capabilities. When a StreamingContext is used, the Spark Web UI displays an additional Streaming tab. This tab shows statistics about the running receivers, such as whether the receivers are active, the number of records received, and receiver errors, as well as details of completed batches, such as batch processing times and queueing delays. This information can be used to monitor the progress of the streaming application. In this context, two metrics are particularly important: processing time and scheduling delay. Processing time is the time taken to process each batch of data, while scheduling delay is the time a batch waits in a queue for the processing of previous batches to finish. If the batch processing time is consistently more than the batch interval, or the scheduling delay keeps increasing, the system is unable to process the batches as fast as they are being generated; in that case, you should try to reduce the batch processing time. You can also monitor the progress of a Spark Streaming program using the StreamingListener interface, which lets you receive receiver statuses and processing times. Note that this is a developer API, so it is likely to be improved upon in the future.
5.32 Performance Tuning-High Level
To get the best performance from a Spark Streaming application on a cluster, you would need to tune it a bit. At a high level, you need to reduce the processing time of every data batch by using cluster resources effectively and set the correct batch size so that the data batches can be processed as soon as they are received.
5.33 Performance Tuning-Detail Level
At a detailed level, you need to consider the following parameters and configurations. First, you can reduce data serialization overheads by tuning the serialization formats. If the number of tasks launched per second is high, say more than 50, the overhead of sending tasks to the slaves can be significant, making sub-second latencies difficult to achieve. You can reduce this overhead through task serialization: using Kryo serialization to serialize tasks can decrease task sizes. The overhead can also be reduced by the execution mode, for example by running Spark in standalone mode or coarse-grained Mesos mode. Second, you can set the right batch interval. Depending on the nature of the streaming computation, the batch interval can significantly affect the data rates the application can sustain on a fixed set of cluster resources. A good approach for finding the right batch size is to test with a conservative batch interval, for example 5 to 10 seconds, and a low data rate. To verify whether the system can keep up with the data rate, check the end-to-end delay experienced by each processed batch. If the delay remains comparable to the batch size, the system is stable; if the delay keeps increasing, the system cannot keep up and is unstable. Once you have an idea of a stable configuration, you can try increasing the data rate or reducing the batch size. Note that a momentary increase in delay due to a temporary increase in data rate is fine, as long as the delay comes back down to a low value. Third, there is memory tuning: the amount of cluster memory required by a Spark Streaming application depends heavily on the type of transformations used.
For instance, if you want to use a window operation over the last ten minutes of data, the cluster must have enough memory to hold ten minutes of data in memory. In contrast, a simple map-filter-store operation needs little memory. Received data is saved with the default storage level, so data that does not fit in memory spills over to disk, which can degrade the streaming application’s performance. Therefore, make sure to provide as much memory as your application needs; as a best practice, observe memory usage on a small scale first and estimate from there. Garbage collection is another aspect of memory tuning: for applications requiring low latency, large pauses caused by JVM garbage collection are undesirable.
5.34 Demo-Capture and Process the Netcat Data
This demo will show the steps to use Spark Streaming to capture and process the Netcat data.
5.35 Capture and Process the Netcat Data
In this demo, you will learn how to use Spark Streaming to capture and process the Netcat data. First, we are going to write a Scala class, NetworkWordCount, to capture and process the Netcat data. This class has a main method in which we write the logic for capturing the incoming Netcat data using the StreamingContext object. We split each line into individual words and produce each word paired with 1 as the output. Here, we use a StreamingContext object with a batch interval of 1 second; this object runs as a listener service on the given IP address and port number. Let’s run the NetworkWordCount program by executing the command shown on the screen. In another terminal, let’s execute the "nc -lk 9977" command to start the "nc" client, which connects with the NetworkWordCount streaming application and sends data to it. In the background terminal, you can see that for each line passed from the "nc" client, such as "Word Count", the NetworkWordCount application generates the output (Word, 1), (Count, 1). Finally, for the incoming data "Apache Spark", it produces (Apache, 1) and (Spark, 1) as output.
5.36 Demo-Capture and Process the Flume Data
This demo will show the steps to use Spark Streaming to capture and process the Flume data.
5.37 Capture and Process the Flume Data
In this demo, you will learn how to use Spark Streaming to capture and process the Flume data. First, we are going to write a Scala class, NewsStreamer, to capture and process the incoming Flume data. This class has a main method in which we write the logic for capturing the incoming Flume data using the StreamingContext object. In this class, a case class represents the structure of the incoming data from Flume, which is a JSON representation of RSS feed data. The class has a method, containFlu, which checks for the names of certain diseases in the summary field of the incoming RSS feed and returns true if those words are found. In the main method, we create the instance of StreamingContext and start a Flume receiver on port number 44444 by using the FlumeUtils.createStream method. After that, we write the logic for counting those disease names in the incoming data and printing them to the console. Finally, we also store the output in HDFS for historical reporting purposes. You can see the sample RSS feed data here, which contains category and summary fields. Now, look at the Flume configuration file, which sends the RSS feed data to the Spark streaming receiver. In this file, you can see that we have used the Avro sink to dispatch the data to the Spark streaming receiver running on localhost, port number 44444. Let’s run the Flume streaming application NewsStreamer by executing the command shown on the screen. In another terminal, let’s run Flume to send the RSS feed data to the Spark NewsStreamer application by running the command shown on the screen. From the output, you can see that the Spark streaming application has found 2 disease names in the incoming data, as shown on the screen. Since we stored the output in HDFS, we can verify it there as well; let’s type the command shown on the screen to check the content of the given file.
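The receiver setup in this demo can be sketched as follows, assuming a StreamingContext `ssc` and the Avro sink in the Flume configuration pointing at localhost:44444:

```scala
import org.apache.spark.streaming.flume.FlumeUtils

// Starts a Flume-compatible Avro receiver inside the Spark application.
// The Flume agent's Avro sink must point at this host and port.
val flumeStream = FlumeUtils.createStream(ssc, "localhost", 44444)

// Each event's body is a byte payload; decode it to a String for processing.
val jsonLines = flumeStream.map(event => new String(event.event.getBody.array()))
jsonLines.print()
```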
5.38 Demo-Capture the Twitter Data
This demo will show the steps to use Spark Streaming to capture Twitter data.
5.39 Capture the Twitter Data
In this demo, you will learn how to use Spark Streaming to capture Twitter data over window operations of 10 and 60 seconds. First, we are going to write a Scala class, TwitterPopularTags, to capture and process the incoming Twitter data. This class has a main method in which we write the logic for capturing the incoming Twitter data using the StreamingContext object. To connect with Twitter, we need to register our application with Twitter and obtain a consumer key, consumer secret, access token, and access token secret. Go to the given URL to register your application and receive these tokens. We are going to use the TwitterUtils Spark class to open a stream with Twitter. After that, we parse the received tweets to extract the hashtags from them. Then, we calculate the popular hashtags (topics) over sliding 10 and 60 second windows of the Twitter stream. Finally, we print the 10 most popular hashtags of the 60 and 10 second windows to the console.
Next, let’s answer a few questions based on your understanding of the concepts covered in this lesson.
5.41 Summary
Let us summarize the topics covered in this lesson: • Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. • A DStream is a high-level abstraction that represents a continuous stream of data and is represented internally as a sequence of RDDs. • Micro batching is a core Spark technique that allows a task or process to treat a stream as a sequence of small batches of data. • Input DStreams represent the stream of input data received from streaming sources. • Spark Streaming provides two categories of built-in streaming sources: basic sources and advanced sources. • Transformations allow the data from the input DStream to be modified. • Output operations allow the data of DStreams to be pushed out to external systems such as a file system or a database.
5.42 Summary (contd.)
• dstream.foreachRDD is a powerful primitive that allows data to be sent to external systems. • DataFrames and SQL operations can easily be used on streaming data. • A streaming application needs to checkpoint sufficient information to a fault-tolerant storage system. • When writing to a socket from DStream.foreachRDD, the socket and the computation must be on the same host. • Stateful operations are operations that work over multiple batches of data. • Window operations let you apply transformations over a sliding window of data. • Spark Streaming supports two types of join operations: stream-stream joins and stream-dataset joins. • The Spark Web UI displays an additional Streaming tab when a StreamingContext is used. • To get the best performance from a Spark Streaming application on a cluster, you need to tune it.
With this, we come to the end of Lesson 5, “Spark Streaming,” of the Apache Spark and Scala course. The next lesson is Spark ML Programming.