Welcome to the fourth chapter of the Apache Storm tutorial (part of the Apache Storm course). This chapter will introduce you to the advanced concepts in Storm.
Let us start by exploring the objectives of this chapter.
By the end of this chapter, you will be able to:
Describe the types of spouts and the structure of spouts and bolts.
Explain eight different types of grouping in Storm.
Explain reliable processing in Storm.
Explain Storm topology life cycle.
Discuss the real-time event processing in Storm.
Let’s get started with understanding the different types of spouts.
There are two types of spouts: reliable and unreliable.
Reliable spouts
For reliable spouts, Storm tracks tuple processing in the topology and ensures that each tuple is fully processed. If processing fails, the spout can re-emit the tuple to the topology. This is useful when you are getting data from other messaging systems such as Kafka.
If processing fails for a particular message, you can request Kafka to resend the message. Reliable processing is known to add about 20% overhead to Storm processing.
Unreliable spouts
For unreliable spouts, Storm does not track tuple processing. If any processing fails, the tuples may be lost. Unreliable processing is much faster because there is no tracking overhead.
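The difference shows up in how a spout emits tuples. Here is a minimal sketch, assuming a SpoutOutputCollector named _collector, a variable line holding the data to emit, and a message ID counter msgId (as in the example later in this chapter):
// Reliable emit: a message ID is attached, so Storm tracks the tuple tree
_collector.emit(new Values(line), msgId);
// Unreliable emit: no message ID is attached, so the tuple is not tracked and may be lost
_collector.emit(new Values(line));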
Let us take a look at the structure of a spout.
A spout should implement the IRichSpout interface.
As part of this interface, the following methods need to be defined:
open: Called when the spout is first created.
nextTuple: Called periodically by the worker; it adds a new tuple to the output stream using the emit method. If emit is not called, no tuple is emitted.
ack: Called when a tuple is successfully processed.
fail: Called when tuple processing fails.
declareOutputFields: Specifies the structure of the output tuple. Multiple output streams can be produced by declaring multiple streams.
Next, let us explore the structure of a bolt.
A bolt should implement the IRichBolt interface.
As part of this interface, the following methods should be defined:
prepare: Called when the bolt is first created.
execute: Called for each tuple that the bolt is subscribed to. It can call the emit method to output tuples for further processing; if emit is not called, no tuple is emitted.
declareOutputFields: Specifies the structure of the output tuple. Multiple output streams can be produced by declaring multiple streams.
Moving on, let us identify the different types of groupings provided by Storm.
The tuples in a stream are grouped before reaching a bolt.
There are eight groupings provided by Storm as follows:
Shuffle grouping
Fields grouping
Partial key grouping
All grouping
Global grouping
None grouping
Direct grouping
Local or shuffle grouping
Now, let us look at how each type of grouping functions. We will begin with discussing shuffle grouping.
Shuffle Grouping
In this grouping, tuples are randomly distributed to bolts so that each task gets an equal number of tuples.
Fields Grouping
In this grouping, tuples are first partitioned by the fields specified. Tuples in the same partition are always sent to the same bolt task.
Partial Key Grouping
This grouping is a combination of fields grouping and shuffle grouping. Tuples are grouped by field values, but the load is also balanced across the bolt tasks, which helps when the field values are skewed.
All Grouping
In this grouping, tuples are replicated across all the tasks of the bolt. There will be multiple copies of the tuples.
Global Grouping
In this grouping, all the tuples go to the same bolt instance. The task with the lowest task ID gets all the tuples.
None Grouping
In this grouping, the user does not care which bolt processes which tuple. Currently, this is implemented the same way as shuffle grouping. Tuples are randomly distributed to bolts so that each task gets an equal number of tuples.
Direct Grouping
In this grouping, the producer of the tuple specifies which bolt instance the tuple should be sent to. This is done with the emitDirect method.
Local or Shuffle Grouping
In this grouping, preference is given to bolt tasks in the same worker process. If there are no local tasks, the tuples are distributed in the same way as shuffle grouping.
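The grouping is chosen when a bolt subscribes to a stream. The sketch below shows how some of these groupings are declared, assuming the org.apache.storm package names (older releases used backtype.storm) and the GetLineSpout, GetLogType, and WordCount classes defined later in this chapter; the component names and parallelism values are illustrative only.
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class GroupingExamples {
    public static TopologyBuilder build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new GetLineSpout(), 1);
        // Shuffle grouping: tuples are distributed randomly and evenly across tasks
        builder.setBolt("split", new GetLogType(), 2).shuffleGrouping("spout");
        // Fields grouping: tuples with the same "word" value always go to the same task
        builder.setBolt("count", new WordCount(), 2).fieldsGrouping("split", new Fields("word"));
        // All grouping: every task of this bolt receives a copy of every tuple
        builder.setBolt("audit", new GetLogType(), 2).allGrouping("spout");
        // Global grouping: the whole stream goes to the task with the lowest ID
        builder.setBolt("summary", new WordCount(), 1).globalGrouping("count");
        // Local or shuffle grouping: prefer tasks in the same worker process
        builder.setBolt("local", new GetLogType(), 2).localOrShuffleGrouping("spout");
        return builder;
    }
}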
Reliability can be achieved in Storm by:
Specifying the path from spout to bolts through a process called anchoring.
Specifying successful or unsuccessful processing using ack and fail methods.
The diagram here shows the topology from the example we discussed in the previous lesson. It shows the spout producing tuples that are processed by the Split bolt. The tuples from the split bolt are processed by the Count bolt.
For successful processing, a tuple from the spout should be successfully processed by the Split bolt as well as the Count bolt.
Next, you will learn about the ack and fail methods.
Reliability is achieved by using the ack and fail methods.
Spouts and bolts use the ack and fail methods to indicate successful and unsuccessful processing, respectively. The emit method declares the tuples produced by spouts and bolts. When all the tasks in the path of a tuple send acks, the tuple is treated as successfully processed; otherwise, it is treated as failed. Storm creates an acker task to handle the acknowledgements.
The diagram shows the acker task collecting ack and fail messages from the Split and Count bolts and then sending the ack or fail message to the spout. The acker task tracks the ack and fail messages from each bolt in the path of a tuple and sends an ack to the spout only if all the bolts send an ack message for that tuple.
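On the spout side, the ack and fail callbacks are where a reliable spout reacts to these notifications. Here is a minimal sketch; the pending map used to cache in-flight messages is hypothetical and not part of the lesson's example code.
// Inside a spout class; _collector is the SpoutOutputCollector saved in open()
Map<Long, Values> pending = new HashMap<Long, Values>(); // hypothetical cache of in-flight messages

@Override
public void ack(Object msgId) {
    // The tuple tree was fully processed: forget the cached message
    pending.remove(msgId);
}

@Override
public void fail(Object msgId) {
    // Processing failed or timed out: replay the cached message with the same ID
    _collector.emit(pending.get(msgId), msgId);
}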
Now, let us look at ack timeout.
If an ack is not received within a specified timeout period, the tuple processing is considered failed. The configuration parameter TOPOLOGY_MESSAGE_TIMEOUT_SECS specifies the timeout in seconds; the default is 30 seconds.
The diagram shows the acker task receiving the timeout apart from ack and fail, and passing the same to the spout.
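The timeout can be changed through the topology configuration. A minimal sketch using the configuration parameter named above (the 60-second value is just an example):
Config conf = new Config();
// Fail tuples whose tree is not fully acked within 60 seconds (the default is 30)
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);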
Moving on, let us discuss the concept of Anchoring.
Specifying the link from an input tuple to the output tuples it produces is called anchoring. Anchoring is done by passing the input tuple to the emit method of a bolt. The input tuple should be the first parameter of the emit method.
For example, if the bolt has the execute method execute(Tuple inputTuple), the emit call should look like emit(inputTuple, other parameters).
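Here is a sketch of explicit anchoring in a bolt that uses an OutputCollector. This is the BaseRichBolt style; the bolts in this chapter's example extend BaseBasicBolt, which anchors and acks automatically.
@Override
public void execute(Tuple inputTuple) {
    // _collector is the OutputCollector saved in prepare()
    String line = inputTuple.getString(0);
    // Anchored emit: the output tuple is linked to inputTuple, so a downstream
    // failure causes the spout's fail() method to be called for this tuple
    _collector.emit(inputTuple, new Values(line.toUpperCase()));
    // Acknowledge the input tuple once it has been handled
    _collector.ack(inputTuple);
}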
Next, let us look at topology lifecycle.
Topology represents the combination of spouts and bolts. The lifecycle of a topology is as follows:
The topology is submitted to the Storm Nimbus
Nimbus copies the code to its local file system
Nimbus creates the topology information in ZooKeeper
The Supervisor gets the new topology information from ZooKeeper
The Supervisor copies any required code to its local file system
The Supervisor starts the worker processes for the topology
The worker processes start the spout and bolt tasks
These tasks run until the topology is killed
Nimbus monitors the workers through ZooKeeper
Now, let us move on to understand data ingestion in Storm.
Data ingestion in Storm is done using spouts. A spout can open and read a file for real-time event data. It can also read data from the Kafka messaging system using special Kafka spouts.
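For reference, here is a sketch of attaching a Kafka spout, assuming the storm-kafka-client module (org.apache.storm.kafka.spout package), a Kafka broker at localhost:9092, and a hypothetical topic named "logs". It is not part of the file-based example used below.
KafkaSpoutConfig<String, String> kafkaConf =
        KafkaSpoutConfig.builder("localhost:9092", "logs").build();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout<String, String>(kafkaConf), 1);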
We will use the sample program we ran in the Storm installation lesson to read real-time log events data from a file.
Now, we will understand data ingestion in Storm better using an example.
Submit the example topology with the below-mentioned commands:
cd /tmp
wget simplilearncdn/logfile
wget simplilearncdn/LogProcessTopology.jar
storm jar LogProcessTopology.jar storm.starter.LogProcessTopology test1
Continuing with this example, let us now put the following messages into the log file, which is the input for Storm:
cat >> /tmp/logfile
ERROR : first message
WARNING : second message
ERROR : third message
INFO : Fourth message
Next, let us check the output of data ingestion in Storm in another telnet session.
Open a telnet session to the VM and, after logging in, type the command mentioned below:
tail -f /tmp/stormoutput.txt
Keep the two windows side by side so that you can see the result on the second window as you type messages on the first window.
The image on the left shows the data entry for Storm, and the image on the right shows the output from Storm. As you type messages on the left screen and press Enter, the count of the messages is shown on the right screen. After entering a few messages, you can stop the data entry by pressing Ctrl-D on the left screen. You can end the data output on the second screen by pressing Ctrl-C.
Finally, stop the topology using the following command: storm kill test1
Next, let us look at the actual program code for the data ingestion.
The program fragment shows the spout definition for this example. The spout is defined as a class with the name GetLineSpout. It extends the base class BaseRichSpout.
You can see the definition overriding three methods:
open
nextTuple
declareOutputFields
In the open method, an output collector is created. In the declareOutputFields method, the structure of the output tuple is defined.
In this example, the output from the spout is the entire line itself. The method nextTuple is used to create the output stream; you will see the code for this method later in this section.
public static class GetLineSpout extends BaseRichSpout {
    SpoutOutputCollector _collector;
    FileReader input = null;
    BufferedReader binput = null;
    Long msgId = 0L;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void nextTuple() {
        // code presented in next screen
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
Now, let us look at the nextTuple method of the spout.
The code fragment shows the nextTuple method of the spout. The nextTuple method opens the file from which the spout will read the messages. It reads each line from the file and outputs it as a tuple using the emit function. It also maintains a message ID counter and increments it for each input line. The message ID is passed as the second argument to the emit function; it is used for tracking the tuple so that reliable processing can be done.
@Override
public void nextTuple() {
    if (input == null) {
        input = new FileReader("/tmp/logfile");
        binput = new BufferedReader(input);
    }
    while (true) {
        String str = null;
        while ((str = binput.readLine()) == null) { // read each line
            Utils.sleep(1000); // if the file has ended, wait for more input to the file
        }
        msgId++; // increment the message ID
        _collector.emit(new Values(str), (Object) msgId);
    }
}
Please note that the exception handling code has been excluded from this fragment; any file handling in Java needs to include exception handling. A possible version with exception handling is sketched below.
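One way the file handling above might be wrapped with exception handling; this is only a sketch, and wrapping the checked exceptions in a RuntimeException is just one option.
// FileNotFoundException and IOException come from java.io
if (input == null) {
    try {
        input = new FileReader("/tmp/logfile");
        binput = new BufferedReader(input);
    } catch (FileNotFoundException e) {
        throw new RuntimeException("Cannot open /tmp/logfile", e);
    }
}
String str = null;
try {
    str = binput.readLine();
} catch (IOException e) {
    throw new RuntimeException("Error reading /tmp/logfile", e);
}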
In the next section, you will learn about the bolts.
You will have two bolts:
GetLogType to filter the log types
WordCount to count the log types
The code fragment to get log type is shown here.
public static class GetLogType extends BaseBasicBolt {
    // Define execute function
    // Define output fields
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
The bolt is defined as a class GetLogType that extends the base class BaseBasicBolt. It has the execute and declareOutputFields methods. Later in the lesson, you will learn about the execute method. The declareOutputFields method declares one field for the output tuple from the bolt.
Moving on, let us look at the execute function for this bolt.
The code fragment shows the execute method of the GetLogType bolt.
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    String line = tuple.getString(0); // get the entire line
    StringTokenizer tokenizer = new StringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
        String mystring = tokenizer.nextToken();
        if (mystring.equals("ERROR") || mystring.equals("INFO") ||
            mystring.equals("WARNING")) {
            collector.emit(new Values(mystring));
        }
    }
}
The code shows the execute method getting the tuple as the input argument and a collector object for the output. The input tuple is parsed using a StringTokenizer, which divides the input line into words with whitespace as the default delimiter. The method loops over each word in the line and calls the collector's emit method to output the words that match INFO, ERROR, or WARNING.
Next, let us look at the code for the second bolt.
The code fragment shown is for the count bolt that counts the number of words.
The class is defined with the name WordCount, and it extends the base class BaseBasicBolt. It defines a map to track the counts for each log type. It also defines a PrintWriter to write the output to a file. It has the execute and declareOutputFields methods.
public static class WordCount extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>(); // for tracking counts
    PrintWriter fOut;
    // function execute
    // function declare output fields
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
Further in the lesson, you will look at the execute method. The declareOutputFields method is different from the method for the previous bolt: it declares an extra field, count, that is output for each log type.
Here, you will look at the execute method for this bolt.
The code fragment shows the execute method of the WordCount bolt.
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    String word = tuple.getString(0);
    Integer count = counts.get(word);
    if (count == null)
        count = 0;
    if (fOut == null) {
        fOut = new PrintWriter(new FileWriter("/tmp/stormoutput.txt"));
    }
    count++;
    counts.put(word, count);
    collector.emit(new Values(word, count));
    fOut.printf("%s:%d\n", word, count);
    fOut.flush();
}
The code shows the execute method getting the tuple as the input argument and a collector object for the output. The input comes from the GetLogType bolt and hence contains a single word, INFO, ERROR, or WARNING, as the input. The method gets the existing count for the word from the map, increments the count, and stores it back into the map.
It also opens the output file for writing and writes the log type and the count to the file. It also emits the log type along with the count.
Next, you will look at how to connect a spout and a bolt.
The main function that connects the spout and bolt to create the topology is as shown:
The main function first gets a handle for the topology builder object. Then, it calls the setSpout method to set the GetLineSpout as the spout.
The second parameter, 1, to setSpout indicates that only one spout instance should run in parallel. Since we are reading from a single file, we want only one instance to read the file.
Next, it calls the setBolt method to set the GetLogType bolt. This bolt is given the name “split.” Here, the second parameter is 2, indicating that two bolt instances run in parallel. The bolt is linked to the spout using shuffle grouping, so the two instances will get the lines from the spout in random order.
Next, the main function calls the setBolt method again to set the WordCount bolt. This bolt is given the name “count.” Here again, the second parameter is 2, indicating that two bolt instances run in parallel. The bolt is linked to the “split” bolt using fields grouping, with the field word indicated as the field to group the data by.
The output of the GetLogType bolt will be grouped so that all the tuples with the same log type will go to the same bolt instance.
This will ensure that the map defined in the WordCount bolt works properly. Next, the main function gets a configuration object and sets the number of worker processes to 1.
Finally, the main function submits the topology with the given configuration to Storm.
public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new GetLineSpout(), 1);
    builder.setBolt("split", new GetLogType(), 2).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 2).fieldsGrouping("split", new Fields("word"));
    Config conf = new Config();
    conf.setNumWorkers(1);
    if (args != null && args.length > 0) {
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    }
}
Next, let us look at the wrapper class.
Finally, the wrapper class is as shown:
// Import required libraries
public class LogProcessTopology {
// spout definition
// bolt 1 definition
// bolt 2 definition
// main function definition
}
The file can be downloaded from simplilearncdn/storm/LogProcessTopology.java
The file contains the import libraries and then the wrapper class LogProcessTopology.
Note that in Java, the class name should match the file name.
The class contains the spout definition, definitions for the two bolts and the main function definition. We have come to the end of this lesson.
Here are the key takeaways:
There are reliable and unreliable spouts in Storm.
Storm spouts implement the IRichSpout interface whereas the bolts implement the IRichBolt interface. Storm provides eight different groupings for the tuples.
Shuffle grouping is random, whereas fields grouping groups tuples by field values.
Storm can track a tuple from the start to successful processing by using the ack and fail methods.
Specifying the tuple path from input to output is called anchoring.
Real-time data can be ingested into Storm using the spouts.
A topology is used to connect spouts to bolts.
This concludes the chapter: Advanced Storm concepts. In the next lesson, we will learn about the Storm interfaces.