Apache Storm Advanced Concepts Tutorial

4.1 Storm Advanced Concepts

Hello and welcome to the fourth lesson of the Apache Storm course offered by SimpliLearn. This lesson will introduce you to the advanced concepts in Storm. Let us start with exploring the objectives of this lesson.

4.2 Objectives

By the end of this lesson, you will be able to describe the types of spouts and the structure of spouts and bolts. You will also be able to explain the eight types of grouping in Storm, reliable processing in Storm, and the Storm topology life cycle. Finally, you will be able to discuss real-time event processing in Storm. Let us get started with understanding the different types of spouts.

4.3 Types of Spouts

There are two types of spouts: reliable spouts and unreliable spouts. For reliable spouts, Storm tracks each tuple through the topology and ensures that it is fully processed. If processing fails, the spout can resubmit the tuple to the topology. This is useful when the data comes from a messaging system such as Kafka: if processing fails for a particular message, you can request Kafka to resend it. Reliable processing is known to add about 20% overhead to Storm processing. For unreliable spouts, Storm does not track tuple processing; if processing fails, the tuples may be lost. This is much faster because there is no tracking overhead. Let us take a look at the structure of a spout.

4.4 Structure of Spout

Spout implementations: A spout should implement the IRichSpout interface. As part of this interface, the following methods need to be defined:
• open is called when the spout is first created.
• nextTuple is called periodically by the Storm worker process to add a new tuple to the output stream. It inserts a tuple into the output stream by calling the emit method; if emit is not called, no tuple is inserted.
• ack is called when a tuple is successfully processed.
• fail is called when tuple processing fails.
• declareOutputFields is called to specify the structure of the output tuple from the spout. A spout can output multiple streams by declaring multiple streams in this method.
Next, let us explore the structure of a bolt.
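As an illustration, a minimal spout skeleton might look like the following. This is a sketch rather than the lesson's code: it assumes the org.apache.storm package names of recent Storm releases, and it extends BaseRichSpout, a convenience base class that implements IRichSpout. The class name and emitted field are hypothetical.

```java
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Minimal spout skeleton showing the IRichSpout methods described above.
public class LineSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private long msgId = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Called once when the spout is created; keep the collector for later emits.
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Called periodically by the worker process.
        // Passing a message ID as the second argument makes this a reliable emit.
        collector.emit(new Values("a line of input"), msgId++);
    }

    @Override
    public void ack(Object msgId) {
        // The tuple with this message ID was fully processed.
    }

    @Override
    public void fail(Object msgId) {
        // Processing failed; a reliable spout could re-emit the tuple here.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // One output stream with a single field named "line".
        declarer.declare(new Fields("line"));
    }
}
```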

4.5 Structure of Bolt

Bolt implementations: A bolt should implement the IRichBolt interface. As part of this interface, the following methods should be defined:
• prepare is called when the bolt is first created.
• execute is called for each tuple that the bolt is subscribed to. It can call the emit method to output tuples for further processing; if emit is not called, no tuple is output from the bolt.
• declareOutputFields is called to specify the structure of the output tuple. Multiple streams can be output by declaring multiple streams in this method.
Moving on, let us identify the different types of groupings provided by Storm.
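A matching bolt skeleton might look like this sketch (again assuming the org.apache.storm package names; BaseRichBolt is a convenience base class that implements IRichBolt, and the class name and field are hypothetical):

```java
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Minimal bolt skeleton showing the IRichBolt methods described above.
public class UppercaseBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        // Called once when the bolt is created; keep the collector for emits.
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Called for each tuple the bolt is subscribed to.
        collector.emit(tuple, new Values(tuple.getString(0).toUpperCase()));
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // One output stream with a single field named "upper".
        declarer.declare(new Fields("upper"));
    }
}
```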

4.6 Stream Groupings

The tuples in a stream are grouped before reaching a bolt. Storm provides eight groupings:
1. Shuffle grouping
2. Fields grouping
3. Partial key grouping
4. All grouping
5. Global grouping
6. None grouping
7. Direct grouping
8. Local or shuffle grouping
Next, let us look at reliable processing in Storm.
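To make the list concrete, here is a sketch of how a grouping is chosen when wiring a topology. The spout and bolt class names are hypothetical; the grouping methods are those of Storm's TopologyBuilder and BoltDeclarer API.

```java
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("lines", new LineSpout(), 1);

// Shuffle grouping: tuples are distributed randomly and evenly across tasks.
builder.setBolt("split", new SplitBolt(), 2).shuffleGrouping("lines");

// Fields grouping: tuples with the same value of "word" go to the same task.
builder.setBolt("count", new CountBolt(), 2)
       .fieldsGrouping("split", new Fields("word"));

// Global grouping: the entire stream goes to a single task.
builder.setBolt("report", new ReportBolt(), 1).globalGrouping("count");

// The remaining options on BoltDeclarer:
//   allGrouping            - replicate every tuple to all tasks
//   noneGrouping           - no preference (currently behaves like shuffle)
//   directGrouping         - the producer picks the consumer task explicitly
//   partialKeyGrouping     - like fields grouping, but load-balanced across
//                            two downstream tasks
//   localOrShuffleGrouping - prefer tasks in the same worker process
```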

4.7 Reliable Processing in Storm

Reliability can be achieved in Storm by:
• Specifying the path from spout to bolts through a process called anchoring.
• Indicating successful or unsuccessful processing using the ack and fail methods.
The diagram here shows the topology from the example we discussed in the previous lesson. It shows the spout producing tuples that are processed by the Split bolt. The tuples from the Split bolt are processed by the Count bolt. For successful processing, a tuple from the spout should be successfully processed by the Split bolt as well as the Count bolt. Next, you will learn about the ack and fail methods.

4.8 Ack and Fail

Reliability is achieved by using the ack and fail methods. Spouts and bolts use the ack and fail methods to indicate successful and unsuccessful processing, respectively. The emit method indicates the tuples that are produced by spouts and bolts. When all the tasks in the path of a tuple send acks, the tuple is treated as successful; otherwise, it is treated as failed. Storm creates an acker task to handle the acknowledgements. The diagram shows the acker task collecting ack and fail messages from the Split and Count bolts and then sending the ack or fail message to the spout. The acker task tracks the ack and fail messages from each bolt in the path of a tuple and sends an ack to the spout only if all the bolts send an ack message for that tuple. Now, let us look at ack timeout.
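Internally, the acker task tracks a whole tuple tree with very little memory by using an XOR trick: each tuple ID is XORed into a running value when the tuple is anchored and again when it is acked, and since x ^ x == 0, the value returns to zero exactly when every anchored tuple has been acked. The following is a toy, pure-Java illustration of that bookkeeping (not Storm's actual code; the IDs are made up):

```java
// Toy illustration of the acker's XOR bookkeeping for one tuple tree.
public class AckerDemo {
    public static void main(String[] args) {
        long root = 0x5EED1L, child1 = 0xC1L, child2 = 0xC2L; // fixed tuple IDs
        long ackVal = 0;

        ackVal ^= root;                    // spout emits the root tuple
        ackVal ^= root ^ child1 ^ child2;  // Split acks root, anchors two children
        System.out.println(ackVal == 0);   // prints false: children still pending

        ackVal ^= child1;                  // Count acks the first child
        ackVal ^= child2;                  // Count acks the second child
        System.out.println(ackVal == 0);   // prints true: tree fully processed
    }
}
```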

4.9 Ack Timeout

If an ack is not received within a specified timeout period, the tuple processing is considered failed. The configuration parameter TOPOLOGY_MESSAGE_TIMEOUT_SECS specifies the timeout in seconds; the default is 30 seconds. The diagram shows the acker task receiving the timeout in addition to ack and fail, and passing it to the spout. Moving on, let us discuss the concept of anchoring.
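The timeout can be set per topology. A sketch, assuming Storm's Config class:

```java
Config conf = new Config();
// Equivalent to setting TOPOLOGY_MESSAGE_TIMEOUT_SECS: tuples not fully
// acked within 60 seconds are treated as failed and reported to the
// spout's fail method.
conf.setMessageTimeoutSecs(60);
```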

4.10 Anchoring

Specifying the link from the input path to the output path for a tuple is called anchoring. Anchoring is done by passing the input tuple as the first parameter of the bolt's emit method. For example, if the bolt has the execute method:
execute(Tuple inputTuple)
then the emit method should look like:
emit(inputTuple, other parameters)
Next, let us look at the topology lifecycle.
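In context, an anchored emit inside a bolt might look like this sketch (the collector and field access are illustrative, assuming a BaseRichBolt-style bolt):

```java
@Override
public void execute(Tuple inputTuple) {
    String word = inputTuple.getString(0);
    // Anchoring: inputTuple is the first argument to emit, linking the
    // new tuple to its parent for reliability tracking.
    collector.emit(inputTuple, new Values(word));
    // Signal successful processing of the input tuple.
    collector.ack(inputTuple);
}
```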

4.11 Topology Lifecycle

A topology represents the combination of spouts and bolts. The lifecycle of a topology is as follows:
1. The topology is submitted to the Storm Nimbus.
2. Nimbus copies the code to the local file system.
3. Nimbus creates the topology information in Zookeeper.
4. Each Supervisor gets the new topology information from Zookeeper.
5. The Supervisor copies any required code to its local file system.
6. The Supervisor starts the worker processes for the topology.
7. The worker processes start the spout and bolt tasks.
8. These tasks run until the topology is killed.
9. Nimbus monitors the workers through Zookeeper.
Now, let us move on to understand data ingestion in Storm.

4.12 Data Ingestion in Storm

Data ingestion in Storm is done using spouts. A spout can open and read a file for real-time event data, or read data from the Kafka messaging system using special Kafka spouts. We will discuss Kafka spouts in the next lesson. Here, we will use the sample program we ran in the Storm installation lesson to read real-time log events from a file. Now, let us understand data ingestion in Storm better using an example.

4.13 Data Ingestion in Storm Example

Submit the example topology with the commands below:
cd /tmp
wget simplilearncdn/logfile
wget simplilearncdn/LogProcessTopology.jar
storm jar LogProcessTopology.jar storm.starter.LogProcessTopology test1
Continuing with this example, let us now put the following messages into the log file, which is the input for Storm:
cat >> /tmp/logfile
ERROR : first message
WARNING : second message
ERROR : third message
INFO : Fourth message
Next, let us check the output of data ingestion in Storm in another telnet session.

4.14 Data Ingestion in Storm Check Output

Open a telnet session to the VM and type the command below after login:
tail -f /tmp/stormoutput.txt
Keep the two windows side by side so that you can see the result on the second screen as you type messages on the first screen. Continuing with the data ingestion example, let us look at the screenshots of real-time data ingestion.

4.15 Screen Shots for Real Time Data Ingestion

The screen on the left shows the data entry for Storm, and the screen on the right shows the output from Storm. As you type messages on the left screen and press Enter, the count of the messages is shown on the right screen. After entering a few messages, you can stop the data entry by pressing Ctrl-D on the left screen and end the data output on the right screen by pressing Ctrl-C. Finally, stop the topology using the following command:
storm kill test1
Next, let us look at the actual program code for the data ingestion.

4.16 Spout Definition

The program fragment shows the spout definition for this example. The spout is defined as a class named GetLineSpout that extends the base class BaseRichSpout. You can see the definition overriding three methods: open, nextTuple, and declareOutputFields. In the open method, an output collector is created. In the declareOutputFields method, the structure of the output tuple is defined; in this example, the output from the spout is the entire line itself. The nextTuple method is used to create the output stream.

Now, let us look at the nextTuple method of the spout. The nextTuple method opens the file from which the spout will read the messages. It reads each line from the file and outputs it as a tuple using the emit function. It also maintains a message ID counter and increments it for each input line. This message ID is passed as the second argument to the emit function, so that the tuple can be tracked for reliable processing. Please note that the exception handling code has been excluded here; any file handling in Java needs to include exception handling. You will now learn about the code for the bolts.
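The lesson's exact code is not reproduced here, but a nextTuple method consistent with the description might look like this sketch (the file path comes from the example; the reader and collector fields are assumed to be set up in the open method):

```java
@Override
public void nextTuple() {
    try {
        if (reader == null) {
            // Open the input file on the first call.
            reader = new BufferedReader(new FileReader("/tmp/logfile"));
        }
        String line = reader.readLine();
        if (line != null) {
            // Emit the whole line as the tuple; the incrementing message ID
            // (second argument) lets Storm track it for reliable processing.
            collector.emit(new Values(line), msgId++);
        }
    } catch (IOException e) {
        // The lesson omits exception handling; a real spout should handle
        // or log I/O errors here.
    }
}
```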

4.17 Bolt Definition

You will have two bolts: GetLogType to filter the log types and WordCount to count them. The code fragment to get the log type is shown here. The bolt is defined as a class GetLogType that extends the base class BaseBasicBolt. It has the execute and declareOutputFields methods. The declareOutputFields method declares one field for the output tuple from the bolt.

Moving on, let us look at the execute method for this bolt. The execute method gets the tuple as the input argument and a collector object for the output. The input tuple is parsed using a tokenizer, which divides the input line into words with white space as the default delimiter. The method loops over each word in the line and calls the collector's emit method to output the words that match INFO, ERROR, or WARNING.

Next, let us look at the code for the second bolt, which counts the number of words. The class is defined with the name WordCount and extends the base class BaseBasicBolt. It defines a map to track the counts for each log type and a PrintWriter to store the output to a file. It has the execute and declareOutputFields methods. The declareOutputFields method differs from that of the previous bolt: it declares an extra field, count, that is output for each log type.

Here, let us look at the execute method for this bolt. The execute method gets the tuple as the input argument and a collector object for the output. The input comes from the GetLogType bolt and hence contains a single word: INFO, ERROR, or WARNING. The method gets the existing count for the word from the map, increments it, and stores it back into the map. It also opens the output file for writing and writes the log type and the count to the file. Finally, it emits the log type along with the count. Next, you will look at how to connect a spout and a bolt.
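The core logic of the two bolts can be illustrated in plain Java, with the Storm collector and PrintWriter omitted. This sketch tokenizes each input line, keeps only the log-type words (as GetLogType does), and increments a per-type counter (as WordCount does), using the sample log messages from the earlier example:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;

// Plain-Java illustration (no Storm dependencies) of the two bolts' logic.
public class LogCountDemo {
    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "ERROR : first message",
                "WARNING : second message",
                "ERROR : third message",
                "INFO : Fourth message");
        List<String> logTypes = Arrays.asList("INFO", "ERROR", "WARNING");
        Map<String, Integer> counts = new HashMap<>();

        for (String line : lines) {
            // GetLogType: split on white space, keep only matching words.
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                String word = tokenizer.nextToken();
                if (logTypes.contains(word)) {
                    // WordCount: fetch the existing count, increment, store back.
                    counts.put(word, counts.getOrDefault(word, 0) + 1);
                }
            }
        }
        System.out.println(counts.get("ERROR"));   // prints 2
        System.out.println(counts.get("WARNING")); // prints 1
        System.out.println(counts.get("INFO"));    // prints 1
    }
}
```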

4.18 Topology–Connecting Spout and Bolt

The main function that connects the spout and the bolts to create the topology is shown here. The main function first gets a handle to the topology builder object. Then, it calls the setSpout method to set GetLineSpout as the spout. The second parameter, 1, tells Storm to run only one spout instance in parallel; since we are reading from a single file, we want only one instance to read it.

Next, it calls the setBolt method to set the GetLogType bolt, giving it the name "split". Here, the second parameter is 2, indicating two bolt instances running in parallel. The bolt is linked to the spout using shuffle grouping, so the two instances will get the lines from the spout in random order.

Next, the main function calls the setBolt method again to set the WordCount bolt, giving it the name "count". Again, the second parameter is 2, indicating two bolt instances running in parallel. The bolt is linked to the "split" bolt using fields grouping, with the field word indicated as the field to group on. The output of the GetLogType bolt will be grouped so that all the tuples with the same log type go to the same bolt instance; this ensures that the map defined in the WordCount bolt works properly.

Finally, the main function gets a configuration object, sets the number of worker processes to 1, and submits the topology with the given configuration to Storm. Next, let us look at the wrapper class.
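A main function matching this description might look like the following sketch (the class names come from the example; Storm's TopologyBuilder, Config, and StormSubmitter APIs are assumed):

```java
public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    // One spout instance, since a single file is being read.
    builder.setSpout("spout", new GetLineSpout(), 1);
    // Two parallel instances; lines arrive in random order.
    builder.setBolt("split", new GetLogType(), 2).shuffleGrouping("spout");
    // Fields grouping on "word": the same log type always reaches the
    // same instance, so its in-memory count map stays consistent.
    builder.setBolt("count", new WordCount(), 2)
           .fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setNumWorkers(1); // one worker process, as in the lesson
    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
}
```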

4.19 Wrapper Class

Finally, the wrapper class is shown here. The file can be downloaded from simplilearncdn/storm/LogProcessTopology.java. It contains the import statements and the wrapper class LogProcessTopology. Note that in Java, the class name should match the file name. The class contains the spout definition, the definitions for the two bolts, and the main function. We have come to the end of this lesson. Now, let us do a small quiz to test your knowledge.

4.20 Quiz

A few questions will be presented on the following screens. Select the correct option and click submit to see the feedback.

4.21 Summary

Here are the key takeaways:
• There are reliable and unreliable spouts in Storm.
• Storm spouts implement the IRichSpout interface, whereas bolts implement the IRichBolt interface.
• Storm provides eight different groupings for the tuples.
• Shuffle grouping is random, whereas fields grouping groups tuples by field values.
• Storm can track a tuple from the start to successful processing by using the ack and fail methods.
• Specifying the tuple path from input to output is called anchoring.
• Real-time data can be ingested into Storm using spouts.
• A topology is used to connect spouts to bolts.

4.22 Conclusion

This concludes the lesson: Advanced Storm concepts. In the next lesson, we will learn about the Storm interfaces.
