Apache Storm Advanced Concepts Tutorial

Welcome to the fourth chapter of the Apache Storm tutorial (part of the Apache Storm course). This chapter will introduce you to the advanced concepts in Storm.

Let us start by exploring the objectives of this chapter.

Objectives

By the end of this chapter, you will be able to:

  • Describe the types of spouts and the structure of spouts and bolts.

  • Explain eight different types of grouping in Storm.

  • Explain reliable processing in Storm.

  • Explain Storm topology life cycle.

  • Discuss the real-time event processing in Storm.

Let’s get started with understanding the different types of spouts.

Types of Spouts

There are two types of spouts: reliable spouts and unreliable spouts.

 Reliable spouts

For reliable spouts, Storm tracks the tuple processing in the topology and ensures that the tuple is fully processed. If the processing fails, the spout can resubmit the tuple to the topology. This is useful if you are getting the data from another messaging system such as Kafka.

 If the processing fails for a particular message, you can request Kafka to resend the message. Reliable processing is known to add about 20% overhead to Storm processing.

 Unreliable spouts

 For unreliable spouts, Storm does not track the tuple processing. If any processing fails, the tuples may be lost. This is much faster as there is no tracking overhead.
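
The difference shows up in how the spout emits a tuple. As a minimal sketch (collector, msgId, and line are illustrative names for a SpoutOutputCollector, a message ID counter, and an input line, not part of a specific example):

// Reliable: a message ID is passed, so Storm tracks the tuple and later calls ack or fail
collector.emit(new Values(line), msgId);

// Unreliable: no message ID, so the tuple is not tracked and may be lost on failure
collector.emit(new Values(line));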

 Let us take a look at the structure of a spout.


Structure of Spout

A spout should implement the IRichSpout interface.

As a part of this interface, the following methods need to be defined:

open: Called when the spout is first created.

nextTuple: Adds a new tuple to the output stream using the emit method. If the emit method is not called, no tuple is emitted. This method is called periodically by the worker.

ack: Called when a tuple is successfully processed.

fail: Called when tuple processing fails.

declareOutputFields: Specifies the structure of the output tuple. Multiple output streams can be produced by declaring multiple streams.
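
A minimal spout outline (just a sketch with imports omitted, not the full example used later in this chapter) shows how these methods fit together when extending the convenience base class BaseRichSpout, which implements IRichSpout:

public class MinimalSpout extends BaseRichSpout {

    SpoutOutputCollector collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector; // called once when the spout is created
    }

    @Override
    public void nextTuple() {
        collector.emit(new Values("event")); // called periodically; emit adds a tuple to the output stream
    }

    @Override
    public void ack(Object msgId) {
        // the tuple with this message ID was fully processed
    }

    @Override
    public void fail(Object msgId) {
        // the tuple with this message ID failed; a reliable spout can re-emit it here
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word")); // structure of the output tuple
    }
}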

Next, let us explore the structure of a bolt.

Structure of Bolt

A bolt should implement the IRichBolt interface.

As a part of this interface, the following methods should be defined:

prepare: Called when the bolt is first created.

execute: Called for each tuple that the bolt is subscribed to. It can call the emit method to output tuples for further processing. If the emit method is not called, no tuple is output.

declareOutputFields: Specifies the structure of the output tuple. Multiple output streams can be produced by declaring multiple streams.
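
A matching bolt outline (again only a sketch, with imports omitted) using the convenience base class BaseRichBolt, which implements IRichBolt, looks like this. Note that the example later in this chapter uses BaseBasicBolt instead, which handles acknowledgement automatically:

public class MinimalBolt extends BaseRichBolt {

    OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector; // called once when the bolt is created
    }

    @Override
    public void execute(Tuple tuple) {
        collector.emit(tuple, new Values(tuple.getString(0))); // output a tuple for further processing
        collector.ack(tuple); // acknowledge the input tuple
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word")); // structure of the output tuple
    }
}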

Moving on, let us identify the different types of groupings provided by Storm.

Stream Groupings

The tuples in a stream are grouped before reaching a bolt; the grouping determines which task of the bolt receives each tuple.

 There are eight groupings provided by Storm as follows:

  1. Shuffle grouping

  2. Fields grouping

  3. Partial key grouping

  4. All grouping

  5. Global grouping

  6. None grouping

  7. Direct grouping

  8. Local or shuffle grouping

Now, let us look at how each type of grouping functions. We will begin with discussing shuffle grouping.

Shuffle Grouping

In this grouping, tuples are randomly distributed to bolts so that each task gets an equal number of tuples.


Fields Grouping

In this grouping, tuples are first partitioned by the specified fields. Tuples in the same partition are always sent to the same task of the bolt.


Partial Key Grouping

This grouping is a combination of fields grouping and shuffle grouping. Tuples are grouped by field values, but each key is balanced across two downstream tasks, which spreads the load on the bolts when some keys are much more frequent than others.


All Grouping

In this grouping, tuples are replicated across all the tasks of the bolt. There will be multiple copies of the tuples.


Global Grouping

In this grouping, all the tuples go to the same bolt instance. The task with the lowest task ID gets all the tuples.


None Grouping

In this grouping, the user does not care which bolt processes which tuple. Currently, this is implemented the same way as shuffle grouping. Tuples are randomly distributed to bolts so that each task gets an equal number of tuples.


Direct Grouping

In this grouping, the producer of the tuple specifies which bolt instance the tuple should be sent to. This is done with the emitDirect method.


Local or Shuffle Grouping

In this grouping, tuples are sent preferentially to bolt tasks in the same worker process. If there are no local tasks, the tuples are distributed in the same way as in shuffle grouping.

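
Each grouping corresponds to a method that is called when a bolt subscribes to a stream on the TopologyBuilder. A sketch is shown below; the component names and classes (MySpout, SplitBolt, CountBolt) are placeholders, not part of the example in this chapter:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new MySpout(), 1);

builder.setBolt("split", new SplitBolt(), 2).shuffleGrouping("spout"); // random, even distribution
builder.setBolt("count", new CountBolt(), 2).fieldsGrouping("split", new Fields("word")); // partition by field value

// The other groupings are declared the same way:
//   .partialKeyGrouping("split", new Fields("word")) // fields grouping with load balancing
//   .allGrouping("spout")                            // replicate the tuple to every task
//   .globalGrouping("split")                         // send all tuples to the task with the lowest ID
//   .noneGrouping("spout")                           // currently the same as shuffle grouping
//   .directGrouping("split")                         // producer picks the task using emitDirect
//   .localOrShuffleGrouping("spout")                 // prefer tasks in the same worker process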

Reliable Processing in Storm

Reliability can be achieved in Storm by:

  • Specifying the path from spout to bolts through a process called anchoring.

  • Specifying successful or unsuccessful processing using ack and fail methods.

The diagram here shows the topology from the example we discussed in the previous lesson. It shows the spout producing tuples that are processed by the Split bolt. The tuples from the split bolt are processed by the Count bolt.


For successful processing, a tuple from the spout should be successfully processed by the Split bolt as well as the Count bolt.

Next, you will learn about the ack and fail methods.

Ack and Fail

Reliability is achieved by using the ack and fail methods.

Spouts and bolts use the ack and fail methods to indicate successful and unsuccessful processing, respectively. The emit method indicates the tuples produced by spouts and bolts. When all the tasks in the path of a tuple send acks, the tuple is treated as successfully processed; otherwise, it is treated as failed. Storm creates an acker task to handle the acknowledgements.

The diagram shows the acker task collecting ack and fail messages from the Split and Count bolts and then sending the ack or fail message to the spout. The acker task tracks the ack and fail messages from each bolt in the path of a tuple and sends an ack to the spout only if all the bolts send an ack message for that tuple.
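
As a rough sketch of how a reliable spout can react to these callbacks (the pending map and the re-emit on failure are illustrative assumptions, not part of the example program in this chapter; _collector is a SpoutOutputCollector field as in the spout shown later):

Map<Object, Values> pending = new HashMap<Object, Values>(); // tuples awaiting acknowledgement

@Override
public void ack(Object msgId) {
    pending.remove(msgId); // fully processed: forget the tuple
}

@Override
public void fail(Object msgId) {
    Values tuple = pending.get(msgId);
    if (tuple != null) {
        _collector.emit(tuple, msgId); // failed or timed out: replay the tuple with the same message ID
    }
}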


Now, let us look at ack timeout.

Ack Timeout

If an ack is not received within the specified timeout period, the tuple processing is considered failed. The configuration parameter TOPOLOGY_MESSAGE_TIMEOUT_SECS specifies the timeout in seconds. The default timeout is 30 seconds.

The diagram shows the acker task receiving the timeout apart from ack and fail, and passing the same to the spout.
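
The timeout can also be set on the topology configuration before submitting the topology. A minimal sketch (60 seconds is an arbitrary value chosen for illustration):

Config conf = new Config();
conf.setMessageTimeoutSecs(60); // sets Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS for this topology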


Moving on, let us discuss the concept of Anchoring.

Anchoring

Specifying the link from the input path to the output path for a tuple is called anchoring. Anchoring is done by specifying the input tuple in the emit method of a bolt. The input tuple should be the first parameter of the emit method.

For example, if the bolt has the execute method execute(Tuple inputTuple), the emit method should look like emit(inputTuple, other parameters).
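
In a bolt based on IRichBolt, anchoring looks like the sketch below (assuming an OutputCollector field named collector saved in prepare). Bolts that extend BaseBasicBolt, as in the example later in this chapter, are anchored and acknowledged automatically by the BasicOutputCollector:

@Override
public void execute(Tuple inputTuple) {
    // Anchored emit: the output tuple is linked to the input tuple, so a downstream failure
    // is reported back through the tuple tree to the spout
    collector.emit(inputTuple, new Values(inputTuple.getString(0)));
    collector.ack(inputTuple); // acknowledge the input tuple after successful processing
}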

Next, let us look at topology lifecycle.

Topology Lifecycle

Topology represents the combination of spouts and bolts. The lifecycle of a topology is as follows:

  1. The topology is submitted to the Storm Nimbus

  2. Nimbus copies the code to the local file system

  3. Nimbus creates the topology information in ZooKeeper

  4. The supervisor gets the new topology information from ZooKeeper

  5. The supervisor copies any required code to its local file system

  6. The supervisor starts the worker processes for the topology

  7. The worker processes start the spout and bolt tasks

  8. These tasks run until the topology is killed

  9. Nimbus monitors the workers through ZooKeeper

Now, let us move on to understand data ingestion in Storm.

Data Ingestion in Storm

Data ingestion in Storm is done using spouts. A spout can open and read a file for real-time event data. It can also read data from the Kafka messaging system using special Kafka spouts.

We will use the sample program we ran in the Storm installation lesson to read real-time log events data from a file.

Now, let us understand data ingestion in Storm better with an example.

Data Ingestion in Storm Example

Submit the example topology with the below-mentioned commands:

cd /tmp

wget simplilearncdn/logfile

wget simplilearncdn/LogProcessTopology.jar

storm jar LogProcessTopology.jar storm.starter.LogProcessTopology test1 

Continuing with this example, let us now put the following messages into the log file, which is the input for Storm:

cat >> /tmp/logfile

ERROR : first message

WARNING : second message

ERROR : third message

INFO : Fourth message 

Next, let us check the output of data ingestion in Storm in another telnet session.

Data Ingestion in Storm Check Output

Open a telnet session to the VM and, after login, type the command mentioned below:

tail -f /tmp/stormoutput.txt

Keep the two windows side by side so that you can see the result on the second window as you type messages on the first window.

Screenshots for Real-Time Data Ingestion

The screenshots show the data entry for Storm on the left and the output from Storm on the right. As you type messages on the left screen and press Enter, the count of the messages is shown on the right screen. After checking a few messages, you can stop the data entry by pressing Ctrl-D on the left screen. You can end the data output on the second screen by pressing Ctrl-C.


Finally, stop the topology using the following command: storm kill test1

 Next, let us look at the actual program code for the data ingestion.

Spout Definition

The program fragment shows the spout definition for this example. The spout is defined as a class with the name GetLineSpout. It extends the base class BaseRichSpout.

You can see the definition overriding three methods:

  1. Open

  2. nextTuple

  3. declareOutputFields.

In the open method, an output collector is created. In the declareOutputFields method, the structure of the output tuple is defined.

In this example, the output from the spout is the entire line itself. The method nextTuple is used to create the output stream. Further, you will see the code for this method.  

public static class GetLineSpout extends BaseRichSpout {

    SpoutOutputCollector _collector;
    FileReader input = null;
    BufferedReader binput = null;
    Long msgId = 0L;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void nextTuple() {
        // code presented in next screen
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

Now, let us look at the nextTuple method of the spout. 

The code fragment shows the nextTuple method of the spout. The nextTuple method opens the file from which the spout will read the messages. It reads each line from the file and outputs it as a tuple using the emit function. It also maintains a message ID counter and increments it for each input line. This message ID is passed as the second argument to the emit function so that the tuple can be tracked for reliable processing.

@Override
public void nextTuple() {
    if (input == null) {
        input = new FileReader("/tmp/logfile");
        binput = new BufferedReader(input);
    }
    while (true) {
        String str = null;
        while ((str = binput.readLine()) == null) { // read each line
            Utils.sleep(1000); // if the file has ended, wait for more input to the file
        }
        msgId++; // increment the message ID
        _collector.emit(new Values(str), (Object) msgId); // message ID enables tracking for reliable processing
    }
}

Please note that the exception handling code has been excluded from this fragment. Any file handling in Java needs to include exception handling.

In the next section, you will now learn about bolts.

Bolt Definition

You will have two bolts:

  • GetLogType to filter the log types

  • WordCount to count the log types

The code fragment to get log type is shown here.

public static class GetLogType extends BaseBasicBolt {

    // Define execute function
    // Define output fields

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

The bolt is defined as a class GetLogType that extends the base class BaseBasicBolt. It has the execute and declareOutputFields methods. Later in the lesson, you will learn about the execute method. The declareOutputFields method creates one field for the output tuple from the bolt.

Moving on, let us look at the execute function for this bolt.

The code fragment shows the execute method of the GetLogType bolt.

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    String line = tuple.getString(0); // get the entire line
    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
        String mystring = tokenizer.nextToken();
        if (mystring.equals("ERROR") || mystring.equals("INFO") || mystring.equals("WARNING")) {
            collector.emit(new Values(mystring));
        }
    }
}

The code shows the execute method getting the tuple as the input argument and a collector object for the output. The input tuple is parsed using a StringTokenizer, which divides the input line into words with whitespace as the default delimiter. The method loops over each word in the line and calls the collector’s emit method to output the words that match INFO, ERROR, or WARNING.

 Next, let us look at the code for the second bolt.

 The code fragment shown is for the count bolt that counts the number of words.

The class is defined with the name WordCount, and it extends the base class BaseBasicBolt. It defines a map to track the counts for each log type. It also defines a PrintWriter to store the output in a file. It has the execute and declareOutputFields methods.

public static class WordCount extends BaseBasicBolt {

    Map<String, Integer> counts = new HashMap<String, Integer>(); // for tracking counts
    PrintWriter fOut;

    // function execute
    // function declare output fields

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

Further in the lesson, you will look at the execute method. The declareOutputFields method is different from the method for the previous bolt: it has an extra field, count, that is output for each log type.

Here, you will look at the execute method for this bolt.  

The code fragment shows the execute method of the WordCount bolt.  

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    String word = tuple.getString(0);
    Integer count = counts.get(word);
    if (count == null)
        count = 0;
    if (fOut == null) {
        fOut = new PrintWriter(new FileWriter("/tmp/stormoutput.txt"));
    }
    count++;
    counts.put(word, count);
    collector.emit(new Values(word, count));
    fOut.printf("%s:%d\n", word, count);
    fOut.flush();
}

The code shows the execute method getting the tuple as the input argument and getting a collector object for the output. The input is from the GetLogType bolt and hence contains a single word INFO or ERROR or WARNING as the input. The method gets the existing count for the word from the map. It increments the count and stores it back into the map.

It also opens the output file for writing and writes the log type and the count to the file. It also emits the log type along with the count.

Next, you will look at how to connect a spout and a bolt.


Topology–Connecting Spout and Bolt

The main function that connects the spout and bolt to create the topology is as shown:

The main function first gets a handle for the topology builder object. Then, it calls the setSpout method to set the GetLineSpout as the spout.

The second parameter 1 to setSpout indicates to run only one spout in parallel. Since we are reading from a single file, we want only one instance to read the file.

Next, it calls the setBolt method to set the GetLogType bolt. This bolt is given the name “split.” Here, the second parameter is 2 indicating two bolt instances to run in parallel. The bolt is linked to the spout using shuffle grouping. So, the two instances will get the lines from the spout in random order.

Next, the main function calls the setBolt method again to set the WordCount bolt. This bolt is given the name “count.” Here, the second parameter is 2 indicating two bolt instances to run in parallel. The bolt is linked to the “split” bolt using fields grouping. The field word is indicated as the field to group the data.  

The output of the GetLogType bolt will be grouped so that all the tuples with the same log type go to the same bolt instance.

This will ensure that the map defined in the WordCount bolt works properly. Next, the main function gets a configuration object and sets the number of worker processes to 1.  

Finally, the main function submits the topology with the given configuration to Storm.  

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new GetLineSpout(), 1);
    builder.setBolt("split", new GetLogType(), 2).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 2).fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setNumWorkers(1);

    if (args != null && args.length > 0) {
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
}

Next, let us look at the wrapper class.

Wrapper Class

Finally, the wrapper class is as shown:  

// Import required libraries

public class LogProcessTopology {

    // spout definition
    // bolt 1 definition
    // bolt 2 definition
    // main function definition
}

The file can be downloaded from simplilearncdn/storm/LogProcessTopology.java

The file contains the import libraries and then the wrapper class LogProcessTopology.  

Note that in Java, the class name should match the file name.  
The class contains the spout definition, definitions for the two bolts and the main function definition. We have come to the end of this lesson.

Summary

Here are the key takeaways:

  • There are reliable and unreliable spouts in Storm.

  • Storm spouts implement the IRichSpout interface, whereas bolts implement the IRichBolt interface.

  • Storm provides eight different groupings for tuples.

  • Shuffle grouping distributes tuples randomly, whereas fields grouping partitions tuples by field values.

  • Storm can track a tuple from the start to successful processing by using the ack and fail methods.

  • Specifying the tuple path from input to output is called anchoring.

  • Real-time data can be ingested into Storm using the spouts.

  • A topology is used to connect spouts to bolts.

Conclusion

This concludes the chapter: Advanced Storm concepts. In the next lesson, we will learn about the Storm interfaces.
