Apache Storm Interfaces Tutorial
5.1 Storm Interfaces
Hello and welcome to the fifth lesson of the Apache Storm course offered by SimpliLearn. This lesson will provide you with information on the various interfaces to Storm. Now, let us start with exploring the objectives of this lesson.
By the end of this lesson, you will be able to identify the different interfaces to Storm, explain the Java interface to Storm, describe how to create spouts, bolts and their connections, explore the real-time data processing platform, describe the Kafka interface to Storm, describe writing a Kafka spout and explore the Storm interface to Cassandra. Let us begin with discussing the Storm interfaces.
5.3 Storm Interfaces
Storm provides Java as a basic interface and also other interfaces such as Python and Ruby. It provides interfaces that can be used from any language. You can have interfaces for spouts, bolts or both. Messaging interfaces to Kafka, Kestrel and Twitter are available in Storm. You can have bolts writing to Cassandra or HBase. We will discuss the Java, Kafka and Cassandra interfaces in this lesson. Next, we will look at the Java interface to Storm.
5.4 Java Interface to Storm
Storm provides Java interfaces and classes for interfacing with the Storm cluster. There are three types of classes provided. Spout classes are used to create spouts. Bolt classes are used to create bolts and topology classes are used to connect spouts and bolts to create the Storm topology. Now, we will look at Spout interface.
5.6 Spout Interface
The spout interface provides the means to create streams from external data. There are two main constructs provided for Java: the IRichSpout interface and the BaseRichSpout class. IRichSpout is the main interface to implement spouts in Storm. BaseRichSpout and other language or message spouts extend the IRichSpout. BaseRichSpout is a class implementing IRichSpout. Next, let us look at the IRichSpout methods.
5.7 IRichSpout Methods
IRichSpout provides the interface for Storm spouts. This table presents some important methods that are a part of the IRichSpout interface. Remember that the term interface in Java means that it is only a template, and any class that implements the interface has to provide the methods for the interface. This table shows four methods with the signature for each method. The open method is called when the spout is initialized. It is called only once for the spout. It gets the configuration for the spout and also the context of the spout. The collector is used to emit or output the tuples from this spout. The nextTuple method is called by Storm to request the spout to emit one or more tuples to the output collector. If the spout has no tuples to emit, then the method should return immediately. It normally calls the emit method on the output collector of the spout to add the tuples to the stream. The ack method is called when Storm has determined that the tuple associated with the msgId has been fully processed. This method can be used to take messages off an incoming message queue. The fail method is called when Storm has determined that the tuple associated with the msgId is not fully processed. This can happen due to failure of any of the bolts processing the tuple or due to timeouts. This method can be used to indicate replay of the message in the message queue. Next, let us look at BaseRichSpout methods.
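As a quick reference, the four methods from the table can be sketched as they appear in the Storm Java API. The package names below assume Storm 1.x (earlier releases use the backtype.storm packages), and the interface name here is illustrative — it lists only the four methods discussed, not the full IRichSpout contract:

```java
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;

// Illustrative listing of the four IRichSpout methods discussed above.
public interface SpoutMethodsSketch {
    // Called once when the spout is initialized; conf holds the spout
    // configuration, and the collector is used to emit tuples.
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

    // Called repeatedly by Storm; should emit tuples to the collector,
    // or return immediately if there is nothing to emit.
    void nextTuple();

    // Called when the tuple emitted with this msgId is fully processed.
    void ack(Object msgId);

    // Called when the tuple emitted with this msgId failed or timed out.
    void fail(Object msgId);
}
```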
5.8 BaseRichSpout Methods
BaseRichSpout implements the IRichSpout interface and also the component interface of Storm. Apart from the IRichSpout methods, this class also contains some important methods. These methods are shown in the table. The declareOutputFields method declares the output fields for the tuples output by this spout. The declarer format is explained later in the lesson. The getComponentConfiguration method can be called to get the current configuration of the spout. Now, we will explore the OutputFieldsDeclarer interface.
5.9 OutputFieldsDeclarer Interface
The OutputFieldsDeclarer interface is used to specify the output format of spouts and bolts. This has four main output types: • declare(Fields fields): This is the simplest definition. It specifies the list of fields that form the output tuple, and tuples are output to the default output stream. • declare(boolean direct, Fields fields): This is similar to the previous case except that the direct indicator is used to specify if this is a direct grouping. We discussed direct grouping in the previous lesson. • declareStream(String streamId, Fields fields): This is used to declare multiple streams of tuples. For example, you may want to send all the error messages to a stream that displays them on a screen and all information messages to a log file. • declareStream(String streamId, boolean direct, Fields fields): This is used to declare multiple streams of tuples where each stream can have a direct indicator to specify direct grouping. Moving on, let us look at a spout definition example.
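The declarer forms above can be illustrated with a sketch of a declareOutputFields body. The stream ids "errors" and "info" and the field names are illustrative assumptions, not from the original slide:

```java
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;

// Illustrative declareOutputFields body of a spout or bolt.
public class DeclarerExamples {
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Simplest form: three fields on the default stream.
        declarer.declare(new Fields("ticker", "value", "status"));

        // Named streams, e.g. errors to a display bolt, info to a log bolt.
        declarer.declareStream("errors", new Fields("message"));
        declarer.declareStream("info", false, new Fields("message"));
    }
}
```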
5.10 Spout Definition Example
The program fragment shows the spout definition using the BaseRichSpout class. The spout is defined as a class with the name GetStockSpout. It extends the base class BaseRichSpout. You can see the definition overriding three methods: open, nextTuple and declareOutputFields. In the open method, an output collector is created. The file containing stock entries is also opened here. In the declareOutputFields method, the structure of the output tuple is defined. In this example, output from the spout is the stock market data that contains three fields: ticker, value and status. The method nextTuple is used to create the output stream. Next, let us look at the nextTuple method of the spout. The program fragment shows the nextTuple method of the spout. The nextTuple method reads each line from the file and outputs it as a tuple using the emit function. The line is divided into fields using the split function with a comma delimiter. It also maintains a message ID counter and increments it for each input line. This message ID is passed as the second argument to the emit function. This is for tracking the tuple so that reliable processing can be done. Note that we have excluded the exception handling code in this example. Any file handling in Java needs to include exception handling. Next, let us look at the bolt interface.
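Putting the description above together, a minimal sketch of the GetStockSpout class could look like the following. The file name stocks.csv is an assumption, and the exception handling is collapsed into a RuntimeException for brevity:

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Sketch of the spout described above (Storm 1.x package names).
public class GetStockSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private BufferedReader reader;
    private long msgId = 0;

    @Override
    public void open(Map conf, TopologyContext context,
                     SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            // Open the file containing the stock entries.
            reader = new BufferedReader(new FileReader("stocks.csv"));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void nextTuple() {
        try {
            String line = reader.readLine();
            if (line == null) return;      // nothing left to emit
            String[] f = line.split(",");  // ticker,value,status
            // The msgId second argument enables reliable processing.
            collector.emit(new Values(f[0], f[1], f[2]), msgId++);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("ticker", "value", "status"));
    }
}
```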
5.11 Bolt Interface
The bolt interface provides the classes and interfaces for processing input streams from spouts and other bolts. There are two main interfaces and two main classes provided for Java. The two interfaces provided are IRichBolt and IBasicBolt, and the classes provided are BaseRichBolt and BaseBasicBolt. IRichBolt is the main interface to implement bolts in Storm. BaseRichBolt and other language or database bolts extend the IRichBolt. BaseRichBolt is a class implementing IRichBolt. IBasicBolt is a bolt interface similar to IRichBolt but with automatic tracking and acking of tuples. BaseBasicBolt is a class implementing IBasicBolt and is used to track tuples. Now, let us look at the IRichBolt methods.
5.12 IRichBolt Methods
IRichBolt interface provides the interface for bolts in Storm. Some of the important methods for IRichBolt interfaces along with the method signature are provided in the table shown. The prepare method is called when the bolt is initialized and is similar to the open method in spout. It is called only once for the bolt. It gets the configuration for the bolt and also the context of the bolt. The collector is used to emit or output the tuples from this bolt. The execute method is called by Storm for each input tuple. This method can process the input tuple and emit one or more tuples to the output collector. If the bolt has no tuples to emit, then the method should return immediately. It normally calls the emit method on the output collector of the bolt to add the tuples to the stream. The emit method should include the input tuple as a parameter to indicate successful processing of the input tuple. Now, we will explore the BaseRichBolt methods.
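A minimal bolt illustrating the prepare and execute pattern described above might look like this sketch — a simple pass-through bolt that forwards the first field of each tuple. With IRichBolt-style bolts, the bolt itself must anchor and ack tuples; the class and field names are illustrative:

```java
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Sketch of a pass-through IRichBolt-style bolt (Storm 1.x names).
public class PassThroughBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;   // called once; keep the collector
    }

    @Override
    public void execute(Tuple input) {
        // Emit anchored to the input tuple for reliable processing ...
        collector.emit(input, new Values(input.getString(0)));
        // ... then ack the input to mark it as successfully processed.
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("value"));
    }
}
```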
5.13 BaseRichBolt Methods
BaseRichBolt implements the IRichBolt interface and also the component interface of Storm. Apart from the IRichBolt methods, this class contains a couple of important methods as described in this table. The declareOutputFields method is used to specify the output fields for the tuples output by this bolt. The declarer format was explained earlier in the lesson. The getComponentConfiguration method can be called to get the current configuration of the bolt. Let us now discuss the IBasicBolt methods.
5.14 IBasicBolt Methods
The IBasicBolt interface provides the interface for reliable processing in Storm. Some of the methods for this interface along with signatures are described in this table. The prepare method is called when the bolt is initialized and is similar to the open method in a spout. It is called only once for the bolt. It gets the configuration for the bolt and also the context of the bolt. This method can be used for opening any files for output or to open any connections to the database for storing data. The execute method is called by Storm for each input tuple. This method can process the input tuple and emit one or more tuples to the output collector. If the bolt has no tuples to emit, then the method should return immediately. It normally calls the emit method on the output collector of the bolt to add the tuples to the stream. Storm automatically manages the acking for the input tuple and sends an ack for the input tuple at the end of the method. You can throw a FailedException if you want to indicate that the processing has failed. This is where the interface differs from the IRichBolt interface. Moving on, we will learn about the BaseBasicBolt methods. BaseBasicBolt implements the IBasicBolt interface and also the component interface of Storm. Apart from the IBasicBolt methods, this class contains a couple of important methods as described in the table shown. The declareOutputFields method is used to specify the output fields for the tuples output by this bolt. The declarer format was explained earlier in the lesson. The getComponentConfiguration method can be called to get the current configuration of the bolt. Next, let us look at a bolt interface example.
5.15 Bolt Interface Example 1
We will show how to use the bolt classes with an example. We will take the tuple data from the spout that provides stock market data with the fields ticker symbol, value of the stock and status of the stock. The code fragment shows the bolt definition GetGoodStocks that extends the bolt class BaseBasicBolt. It has the execute and declareOutputFields methods. We will look at the execute method later in the lesson. The declareOutputFields method creates two fields for the output tuple from the bolt. The fields are the ticker symbol and the value of the stock. Next, let us look at the execute method of the bolt.
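Based on the description above, the GetGoodStocks bolt could be sketched as follows. The filter on a "GOOD" status value is an assumption about what the execute method checks; only the class name, the BaseBasicBolt parent and the two output fields come from the lesson:

```java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Sketch of the GetGoodStocks bolt described above.
public class GetGoodStocks extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Assumed filter: pass through only stocks whose status is "GOOD".
        if ("GOOD".equals(input.getStringByField("status"))) {
            collector.emit(new Values(input.getStringByField("ticker"),
                                      input.getStringByField("value")));
        }
        // BaseBasicBolt acks the input tuple automatically on return.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("ticker", "value"));
    }
}
```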
5.16 Bolt Interface Example 2
The StormSubmitter class is used to submit a topology to the Storm cluster when you use the command storm jar to submit a topology class to the cluster. The given table lists some of the methods of this class including their signatures. Note that these are static methods, so they can be called without creating an object of the StormSubmitter class. The submitTopology method is used to submit a topology to run on the cluster. It takes three parameters. The first parameter, name, is the name given to the topology and will be shown when you use the command storm list. The second, stormConf, is used to specify configuration parameters such as the number of workers and the debug flag. The third is the topology object created by the TopologyBuilder. The submitTopologyWithProgressBar method is similar to the above function. It takes the same parameters as the above function but shows a progress bar upon submit. The progress bar is just a bar showing the progress of the upload of the jar to the Storm cluster. Next, let us look at an example of TopologyBuilder.
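The submit call can be sketched as below. The setSpout and setBolt calls are elided here; the topology name "test2" and the single-worker configuration follow the lesson's example:

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class SubmitExample {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // ... setSpout and setBolt calls go here ...

        Config conf = new Config();   // the stormConf parameter
        conf.setNumWorkers(1);        // number of worker processes
        conf.setDebug(false);         // debug flag

        // Plain submit; submitTopologyWithProgressBar takes the same
        // arguments and additionally shows the jar upload progress.
        StormSubmitter.submitTopology("test2", conf, builder.createTopology());
    }
}
```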
5.18 TopologyBuilder Methods
5.22 Apache Kafka Recap
We talked about Apache Kafka in the first lesson. Let us quickly do a recap of Kafka as we want to discuss the Storm interface to Kafka. Kafka is a high-performance real-time messaging system. It is open source and a part of the Apache projects. Kafka is a distributed and partitioned messaging system. It is highly fault-tolerant and can process millions of messages per second and send them to many receivers. Apache Kafka can stream messages, which is useful for streaming them into Storm. Now, we will discuss the Kafka data model.
5.21 Topology Builder Example
The code fragment for the main function that connects the spout and bolts to create the topology is shown here. The main function first gets a handle for the TopologyBuilder object. Then, it calls the setSpout method to set the GetStockSpout as the spout. This component is given the name “StockSpout”. The third parameter to setSpout, 1, indicates that only one spout instance runs in parallel. Since we are reading from a single file, we want only one instance to read the file. Next, it calls the setBolt method to set the GetGoodStocks bolt. This bolt is given the name “StockFilter”. Here, the third parameter is 2, indicating two bolt instances to run in parallel. The output of this method is stored as a BoltDeclarer object. Next, the bolt is linked to the spout using shuffle grouping. So, the two bolt instances will get the stock tuples from the spout in random order. Next, the main function gets a configuration object and sets the number of worker processes to 1. Finally, the main function submits the topology with the given configuration to Storm. It uses “test2” as the name of the topology. The createTopology method is called to create the topology, and the handle is passed to StormSubmitter. Next, let us do a quick recap of Apache Kafka.
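The main function described above can be sketched as follows, assuming the GetStockSpout and GetGoodStocks classes from earlier in the lesson are on the classpath:

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

// Sketch of the topology wiring described above (Storm 1.x names).
public class StockTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // One spout instance, since a single file is being read.
        builder.setSpout("StockSpout", new GetStockSpout(), 1);

        // Two bolt instances, fed from the spout in random (shuffle) order.
        builder.setBolt("StockFilter", new GetGoodStocks(), 2)
               .shuffleGrouping("StockSpout");

        Config conf = new Config();
        conf.setNumWorkers(1);

        StormSubmitter.submitTopology("test2", conf, builder.createTopology());
    }
}
```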
5.23 Kafka Data Model
Kafka data model consists of messages and topics. Below are some of the terms used in Kafka. • Messages: Any piece of information such as lines in a log file, a row of stock market data, an error message from a system • Topics: Messages are grouped into categories called topics. For example, LogMessage, StockMessage etc. • Producers: Processes that publish messages into a topic in Kafka • Consumers: Processes that get the messages from a topic in Kafka • Brokers: Processes within Kafka that process the messages • Kafka Cluster: A set of brokers that are processing the messages The diagram shows a Kafka cluster with three broker processes. There are two producers sending messages to the cluster and two consumers that are receiving the messages from the cluster. Now, we will quickly do a recap of Apache Cassandra.
5.24 Apache Cassandra Recap
We also talked about Apache Cassandra as a NoSQL database in the first lesson. Let us recap Cassandra as we want to discuss Storm storing data into Cassandra. Cassandra is an Apache open source database with the following characteristics: • Highly fault tolerant – no single point of failure • Highly available – ring architecture • Real time read and write • Super-fast writes • Simple SQL-like interface • Key-value database • Does not provide a group by clause. Any grouping of data has to be done externally and stored as separate tables. This diagram shows a Cassandra cluster consisting of six nodes arranged in a ring model. Next, let us look at the real-time data analysis platform.
5.25 Real Time Data Analysis Platform
Kafka, Storm and Cassandra together form a high-performance real-time big data analytics platform. In this platform, Kafka receives each line of data as a message and forwards it to Storm. Storm parallelizes the data and starts multiple bolts for inserting data into Cassandra. Each bolt inserts data into one or more tables in Cassandra. Millions of records can be processed per second with this architecture. Storm provides a special Kafka spout to get data from Kafka. The Cassandra Java interface can be used to insert records into Cassandra from Storm bolts. This diagram shows this platform with Kafka getting the messages from an external system and sending them to the Kafka spout within Storm. The Storm spout parallelizes the messages into multiple bolts, and the bolts insert the data into Cassandra. This provides a real-time data analytics platform. Now, we will explore the Kafka interface to Storm.
5.26 Kafka Interface to Storm
Kafka can produce messages for processing in Storm. A Kafka spout is used to create tuples from messages. The Kafka interface to Storm consists of the KafkaSpout, SpoutConfig and KafkaBolt classes: KafkaSpout is a class that implements IRichSpout and produces tuples from Kafka messages. It can receive messages from a Kafka producer and send the messages to bolts in Storm. SpoutConfig is a class that specifies the configuration for a Kafka spout. KafkaBolt is a class that extends BaseRichBolt and sends tuple data to Kafka as messages. We will not discuss much about the Kafka bolt as it is not commonly used. This diagram shows a Storm cluster consisting of a Kafka spout and three bolts. The Kafka spout receives the messages from an external data source through the Kafka cluster. The bolts receive the data from the Kafka spout and send the processed data to external outputs. Next, let us look at the Kafka spout.
5.27 Kafka Spout
Kafka Spout provides the class that interfaces with Kafka to get the messages from Kafka and convert them into tuples for Storm. The features of this spout are: • It gets messages from a topic in Kafka • It uses the BrokerHosts interface and its ZkHosts implementation to connect to the ZooKeeper ensemble for Kafka • The SpoutConfig class, an extension of the KafkaConfig class, is used to configure the topic for connecting to the Kafka cluster • You can connect the Kafka spout in your topology like any other spout Moving on, we will look at Kafka spout configuration.
5.28 Kafka Spout Configuration
Kafka Spout needs the configuration set for connecting to the Kafka cluster. It uses the SpoutConfig for connection parameters for the Kafka cluster. SpoutConfig is the class for specifying the spout configuration. It takes the following arguments for constructing the SpoutConfig object: • A list of type BrokerHosts. This is the list of ZooKeeper hosts for the Kafka connection. This list is constructed using the ZkHosts class. • A topic name of type String. This is the name of the topic in Kafka that holds the messages. As discussed earlier, messages are grouped into topics in Kafka. • A directory name of type String. This is the root directory in ZooKeeper for this spout. • An identifier of type String. This is a unique identifier for this spout. This diagram shows a code fragment for setting up the spout configuration. It first gets the list of ZooKeeper hosts by constructing a ZkHosts object. The host name and port are passed as a parameter. We are using the ZooKeeper at localhost and the default ZooKeeper port of 2181. Next, a new SpoutConfig object is created with the ZooKeeper list as the first argument, “test” as the topic name, “/test” as the ZooKeeper root directory and “mystorm1” as the identifier for this spout. Further, we will learn about Kafka spout schemes.
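The configuration described above can be sketched as follows. The package names assume the storm-kafka module of Storm 1.x (older releases use the storm.kafka package), and the host, topic, root directory and id values follow the lesson's example:

```java
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;

public class KafkaSpoutConfigExample {
    // Builds the SpoutConfig described above: ZooKeeper at localhost:2181,
    // topic "test", ZooKeeper root "/test", spout id "mystorm1".
    public static SpoutConfig build() {
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        return new SpoutConfig(hosts, "test", "/test", "mystorm1");
    }
}
```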
5.29 Kafka Spout Schemes
Kafka spout can process the input messages into Storm tuples using various schemes. The scheme can be set in the spout configuration object. These schemes are as shown below: • RawMultiScheme: This is the default. It takes the input byte array and outputs a tuple with the byte array. The name of the output field is “bytes”. • SchemeAsMultiScheme: This takes the specified scheme as the scheme for conversion. It can use the StringScheme, which converts the input messages to strings. • KeyValueSchemeAsMultiScheme: This takes a specified key-value scheme which contains a list of keys and values for conversion. This diagram shows the code fragment building upon the code shown earlier. It has an extra line where the scheme in the SpoutConfig object is set to the StringScheme. Next, let us look at using the Kafka spout in Storm.
5.30 Using Kafka Spout in Storm
Kafka Spout can be used in the same way as any other spout. You don’t need to write any code for the spout itself as the messages will come from Kafka. The code fragment shows creating a Kafka spout by connecting to the Kafka cluster at localhost. The Kafka topic used is the topic “test”. A spout configuration object is created with the topic “test”. The scheme is set to the StringScheme. Then, a KafkaSpout object is created with this SpoutConfig. Next, a TopologyBuilder object is created as a part of the Storm Java interface. The KafkaSpout is set as the spout for this topology by using the setSpout method. The parallelism is specified as 1 but can be increased while processing multiple Kafka streams. Now, we will discuss the Storm interface to Cassandra.
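The steps above can be sketched in one place — configuration, scheme and topology wiring. Package names assume the storm-kafka module of Storm 1.x; the component name "KafkaSpout" is illustrative:

```java
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaSpoutTopologyExample {
    public static TopologyBuilder build() {
        // Connect to the Kafka cluster through ZooKeeper at localhost.
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        SpoutConfig cfg = new SpoutConfig(hosts, "test", "/test", "mystorm1");

        // Convert Kafka messages into string tuples instead of raw bytes.
        cfg.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        // Parallelism 1; increase it when consuming multiple Kafka streams.
        builder.setSpout("KafkaSpout", new KafkaSpout(cfg), 1);
        return builder;
    }
}
```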
5.31 Storm Interface to Cassandra
Storm bolts can be used to connect to a Cassandra database and store data. Use the DataStax driver for Cassandra (http://downloads.datastax.com/java-driver/cassandra-java-driver-2.1.4.tar.gz) for connecting to Cassandra. Write the code in the Storm bolt to take a tuple and write it to the database. The database connection should be created in the prepare method of the bolt. In the execute method, build the statement for the database and save the data. We will now learn how to insert or update Cassandra data using Java.
5.32 Insert or Update Cassandra
Below are the steps to store data in Cassandra using the Java interface: • Get a handler for the Cassandra cluster • Get a session handler by connecting to the cluster • Prepare the insert statement for the data as ‘insert into table (column list) values (value list)’ • Send the insert statement to Cassandra using the session handler • Cassandra is a key-value store. Cassandra will update the data if the key values already exist. Otherwise, Cassandra will insert the new row into the database • Close the session Now, let us learn how to set up a Cassandra session in Storm.
5.33 Setting Up Cassandra Session
We can create a separate class for handling a session for Cassandra. In this class, we can have: a connect method to get a handler for the cluster and get a session handler by connecting to the cluster, and a close method to close the connection. The code fragment shows a SessionClient class being created to handle the connection to Cassandra. This class has a connect method that takes the address of the Cassandra node as a parameter. It first gets the cluster handler by using the builder method of the Cluster object. It then uses the connect method of the cluster object to initiate a database session to Cassandra. This class also contains a close method to close the connection to the cluster. Since the session has to be set up only once, the prepare method in the Storm bolt is used to set up a session for Cassandra. The code fragment shows the definition of a bolt called CassandraCount that extends the BaseBasicBolt. This method first creates a SessionClient object to handle the connection to Cassandra. The connect method is then called with the IP address of the Cassandra node to connect to Cassandra. Next, let us learn how to insert or update data into Cassandra from a bolt.
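The SessionClient class described above can be sketched with the DataStax Java driver (2.x API). The method and field layout is an assumption based on the description:

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

// Sketch of the SessionClient class described above.
public class SessionClient {
    private Cluster cluster;
    private Session session;

    // Get a cluster handler for the given node and open a session.
    public void connect(String node) {
        cluster = Cluster.builder().addContactPoint(node).build();
        session = cluster.connect();
    }

    public Session getSession() {
        return session;
    }

    // Close the session and the connection to the cluster.
    public void close() {
        session.close();
        cluster.close();
    }
}
```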
5.34 Insert or Update Data into Cassandra from Bolt
Let us take an example of inserting log type counts into the table testdb.logtype in Cassandra. It has type as the key and cnt as the value. A method storeData is created in the class SessionClient to insert data into Cassandra. This method takes the log type and the corresponding count as parameters. It builds the insert command with the given values and executes the statement on the session that was connected earlier. The code fragment shows a SessionClient object with the method storeData. This method takes two parameters: logtype of type string and count of type integer. It builds the insert statement with the parameter values. For example, if the logtype value was “ERROR” and the count was 5, then it will build the insert statement as “insert into testdb.logtype(type, cnt) values ('ERROR', 5);”. This statement will be executed on the Cassandra connection. Please note that Cassandra treats insert statements as updates if the key is already present in the database. So in this example, if there was no record for the key “ERROR” in the table testdb.logtype, then the row (“ERROR”, 5) will be inserted into the database. If there was a previous row in the database with the values (“ERROR”, 3), then that row will be updated to (“ERROR”, 5). Next, let us look at the steps to insert or update data into Cassandra from a bolt. In the execute method of the bolt, the storeData method can be called to insert or update each log type and count value in the database. The code fragment shows the execute method of the CassandraCount bolt. It gets the logtype from the input tuple and increments a count for the logtype using a map. It stores the new count into the map and then calls the storeData method of the Cassandra client object to insert or update the count in Cassandra. Now, we will look at Kafka to Storm to Cassandra.
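The statement building and per-logtype counting described above can be isolated as plain Java, so the logic can be exercised without a cluster. The class name is an assumption; in the real bolt the returned statement would be passed to session.execute. Note that CQL string literals use single quotes:

```java
import java.util.HashMap;
import java.util.Map;

// Statement building and counting from the bolt described above,
// factored out as plain Java for illustration.
public class LogTypeStatements {
    private final Map<String, Integer> counts = new HashMap<>();

    // Build the upsert statement for testdb.logtype(type, cnt).
    public static String buildInsert(String logtype, int count) {
        return String.format(
            "insert into testdb.logtype(type, cnt) values ('%s', %d);",
            logtype, count);
    }

    // Mirror of the execute logic: bump the count for this logtype and
    // return the statement the bolt would run on the Cassandra session.
    public String countAndBuild(String logtype) {
        int n = counts.getOrDefault(logtype, 0) + 1;
        counts.put(logtype, n);
        return buildInsert(logtype, n);
    }
}
```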
5.35 Kafka – Storm – Cassandra
In the real-time data platform, the KafkaSpout is used to get data from Kafka, and then the bolts save data into Cassandra using the Cassandra Java interface. The program structure is as shown. The code fragment shows the Kafka spout we explained earlier along with the Storm topology builder that was also shown earlier. A topology builder object is created first. Next, a Kafka spout is created with the localhost ZooKeeper connection and the Kafka topic “test”. Next, this Kafka spout is added to the topology. Next, a bolt for splitting the lines is added to the topology. Next, the CassandraCount bolt that was explained earlier is added to the topology. Finally, the StormSubmitter is used to submit this topology to the Storm cluster. We have come to the end of this lesson. Now, let’s do a small quiz to test your knowledge.
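The overall program structure described above could be sketched as follows. LineSplitBolt stands in for the line-splitting bolt (its implementation is not shown in the lesson), CassandraCount is the bolt from the previous pages, and the topology name and fields grouping on the logtype field are assumptions — grouping by field keeps all tuples for one log type on the same bolt instance so its counts stay consistent:

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

// Sketch of the Kafka-Storm-Cassandra topology described above.
public class KafkaCassandraTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // Kafka spout: ZooKeeper at localhost, topic "test", string tuples.
        BrokerHosts hosts = new ZkHosts("localhost:2181");
        SpoutConfig cfg = new SpoutConfig(hosts, "test", "/test", "mystorm1");
        cfg.scheme = new SchemeAsMultiScheme(new StringScheme());
        builder.setSpout("KafkaSpout", new KafkaSpout(cfg), 1);

        // Bolt that splits each line into fields (implementation not shown).
        builder.setBolt("LineSplit", new LineSplitBolt(), 2)
               .shuffleGrouping("KafkaSpout");

        // Counting bolt that writes per-logtype counts into Cassandra.
        builder.setBolt("CassandraCount", new CassandraCount(), 2)
               .fieldsGrouping("LineSplit", new Fields("logtype"));

        Config conf = new Config();
        conf.setNumWorkers(1);
        StormSubmitter.submitTopology("kafkacassandra", conf,
                                      builder.createTopology());
    }
}
```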
A few questions will be presented on the following screens. Select the correct option and click submit to see the feedback.
• Storm provides Java and other language interfaces • Storm Java interface consists of spouts, bolts and topology classes • Java spout interface consists of IRichSpout interface and BaseRichSpout class • Java bolt interface consists of IRichBolt interface and BaseRichBolt class • Java topology interface consists of TopologyBuilder and StormSubmitter classes • Real time data analysis platform can be built using Kafka, Storm and Cassandra • Kafka Spout is used to get data into Storm through Kafka cluster • Cassandra Java interface is used to store data into Cassandra through Storm bolt
This concludes the lesson: Interfaces to Storm. In the next lesson, we will learn about the Storm Trident.