Apache Storm Trident Tutorial
This concludes the lesson on Trident. This also concludes the SimpliLearn developer course on Storm.
• Trident is an abstraction on top of Storm
• Trident groups tuples into batches and does operations on batches
• It provides aggregation and other operations on top of Storm
• Trident provides exactly-once processing of tuples and stateful processing in Storm
• Trident spouts and operations can be connected using the Trident topology
A few questions will be presented on the following screens. Select the correct option and click submit to see the feedback.
6.19 Wrapper Class
The wrapper class includes all the classes and functions explained on the previous screens. The code fragment shown here is for the wrapper class LogProcessTopology. Comments stand in for the code blocks; each comment represents the definition of one of the classes or functions shown on the previous screens. This wrapper class is what gets compiled and submitted to Storm for execution. Next, let us look at Trident advantages.
6.20 Trident Advantages
Trident adds a lot of high-level functionality to Storm. Here are some advantages of using Trident:
• Trident is smart about how it runs the topology to maximize performance
• It processes tuples in batches, improving throughput
• It provides operators such as group by and aggregate, which reduce the amount of programming you need to do in Storm
• It can process computations within a partition first before sending tuples over the network, improving throughput and reducing network traffic
• It provides exactly-once processing in Storm, adding a higher level of fault tolerance
We have come to the end of this lesson. Now, let’s do a small quiz to test your knowledge.
6.18 Topology – Main Function
The code fragment for the main function that connects the spouts and bolts to create the topology is shown here. The main function gets the topology name as its first parameter. It first creates a configuration object. The debug flag is set in the configuration so that we can see detailed messages in the log. The number of spouts that will run in parallel for pipelining is set to 1, as we want only one spout to run at a time. Next, the number of workers is also set to 1. Finally, the StormSubmitter is called with the topology name, the configuration object, and the topology from the buildTopology method. Now, we will understand the concept of the wrapper class.
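The steps above can be sketched roughly as follows. This is a hypothetical reconstruction, not the course's exact code: the class name LogProcessTopology and the assumption that the pipelining setting is topology.max.spout.pending (set via setMaxSpoutPending) are mine; buildTopology is shown only as a placeholder here.

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;

public class LogProcessTopology {
    public static void main(String[] args) throws Exception {
        String topologyName = args[0];      // topology name is the first parameter
        Config conf = new Config();
        conf.setDebug(true);                // detailed messages in the log
        conf.setMaxSpoutPending(1);         // assumption: "one spout at a time" maps to max pending batches = 1
        conf.setNumWorkers(1);              // a single worker process
        StormSubmitter.submitTopology(topologyName, conf, buildTopology());
    }

    static StormTopology buildTopology() {
        // Placeholder: the real method connects the spout and bolts
        // as described on the "Topology – Connecting Spout and Bolt" screen.
        throw new UnsupportedOperationException("sketch only");
    }
}
```

The topology would typically be launched with the topology name as the command-line argument, e.g. `storm jar logprocess.jar LogProcessTopology logcount`.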
6.17 Topology – Connecting Spout and Bolt
The method that connects the spout and bolts to create the Trident topology is shown here. After using GetLogType to filter out only the log types, we use groupBy and aggregate to do the aggregation by log type. Finally, the printTuple class is used to store the output to a file. The code fragment shows the buildTopology method that connects the spout and bolts to create the Trident topology. The method first creates the GetLineSpout object with a batch size of 10, which means that the spout will put ten tuples in each batch. Next, it creates an object of the TridentTopology class. In this topology, a new stream is created with the newStream method. This new stream is created with GetLineSpout as the spout and a parallelism of 1. The bolt GetLogType is connected to the output of this spout with the each method of the stream. Then, the groupBy Trident operation is called to group the results by log type. The aggregate operation is applied to the groupBy output with the count function. This is a grouped aggregate, so the count of tuples is done for each log type, producing a stream with log type and count as fields. Finally, the printTuple method is called on each aggregated tuple to store the counts of the log types to a file. The Trident topology is then built and returned. Next, let us look at the topology main function.
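A minimal sketch of such a buildTopology method is shown below, assuming the GetLineSpout, GetLogType, and printTuple classes described on the surrounding screens; the stream name "logSpout" and field names "line", "logtype", and "count" are assumptions.

```java
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.tuple.Fields;

public class TopologySketch {
    // Hypothetical reconstruction of the buildTopology method.
    public static StormTopology buildTopology() {
        GetLineSpout spout = new GetLineSpout(10);      // batch size of 10 tuples
        TridentTopology topology = new TridentTopology();
        topology.newStream("logSpout", spout)           // new stream from the spout
                .parallelismHint(1)                     // parallelism of 1
                .each(new Fields("line"),               // filter each line to a log type
                      new GetLogType(), new Fields("logtype"))
                .groupBy(new Fields("logtype"))         // group tuples by log type
                .aggregate(new Fields("logtype"),       // grouped aggregate: count per log type
                           new Count(), new Fields("count"))
                .each(new Fields("logtype", "count"),   // write each (logtype, count) pair out
                      new printTuple(), new Fields());
        return topology.build();                        // build and return the topology
    }
}
```

Because the aggregate follows a groupBy, Trident's built-in Count aggregator produces one count per log type rather than a single global count.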
6.16 Storing the Output Example
We will have another function operation to store the log type counts to an output file. The code fragment shows the class printTuple that extends BaseFunction class for storing the log type counts. Please note that Exception handling is not shown here. This class has a single method execute that takes a Trident tuple and an output collector as arguments. The input contains two fields. The first field is a log type word and the second one is a count. It opens the output file for writing and writes the log type and the count to the file. Next, let us look at a topology connecting a spout and a bolt.
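A sketch of such an output function is shown here, with minimal exception handling added since the lesson omits it; the output file path is an assumption, and appending (rather than overwriting) is a design choice so that successive batch updates are retained.

```java
import java.io.FileWriter;
import java.io.IOException;

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;

public class printTuple extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String logType = tuple.getString(0);   // first field: the log type word
        long count = tuple.getLong(1);         // second field: the count
        // Append the pair to the output file; the path is an assumption.
        try (FileWriter out = new FileWriter("/tmp/logtype_counts.txt", true)) {
            out.write(logType + " " + count + "\n");
        } catch (IOException e) {
            throw new RuntimeException(e);     // lesson omits exception handling
        }
    }
}
```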
6.1 Storm Trident
Hello and welcome to the final lesson of the Apache Storm course offered by SimpliLearn. This lesson will introduce you to the Trident interface to Storm. Now, let us begin by exploring the objectives of this lesson.
By the end of this lesson, you will be able to explain what Trident is, describe the Trident data model, identify the various operations in Trident, explain stateful processing using Trident, discuss pipelining in Trident, and explain the advantages of Trident. Let us start with understanding what Trident is.
6.3 Trident Introduction
Trident is a high-level abstraction on top of Storm. It provides abstraction for joins, groups and aggregation functions. It provides batching of tuples so that multiple tuples can be grouped together into batches. It provides transactional processing in Storm so that you can achieve better fault tolerance. It also provides state manipulation in Storm so that you can have stateful processing. Trident libraries are included as a part of standard Storm installation. Next, let us look at Trident data model.
6.4 Trident Data Model
The core data model in Trident is the stream, which is an unbounded sequence of tuples. The tuples in the stream are grouped together as a series of batches. Each batch has a finite number of tuples. A stream is partitioned across the nodes in the cluster, and Trident operations on the stream are applied in parallel across each partition. The diagram shows an input sequence of tuples (T1, T2, and so on) coming into the Trident Storm cluster. The tuples are partitioned across two nodes. For example, the tuples T1, T2, and T3 go to partition 1 on node 1, whereas the tuples T4, T5, and T6 go to partition 2 of node 2. Here, the tuples are grouped into batches. Each batch has three tuples, which is the batch size. Batch 1, batch 3, and batch 5 go to partition 1 of node 1, whereas batch 2, batch 4, and batch 6 go to partition 2 of node 2. Please note that in Storm, the input is unbounded, so the cluster will keep receiving tuples and Trident will keep grouping them into batches. Now, we will look at stateful processing in Trident.
6.5 Stateful Processing using Trident
Since a Storm stream is an unbounded sequence of tuples, no state is maintained during processing by default.
Stateless processing: Once a tuple is processed, Storm doesn’t store any information about it. So when a tuple is received, we don’t know whether it has already been processed. This is the default processing in Storm.
Stateful processing: Once a tuple is processed, Storm stores information about it. So we can find out whether a tuple was previously processed. Trident provides a topology that automatically maintains this state information. State information is stored in memory or in a database. Next, we will look at the operations in Trident.
6.6 Operations in Trident
Trident consists of five types of operations:
• Partition local operations
• Repartitioning operations
• Aggregation operations
• Operations on grouped streams
• Merges and joins
Here, we will look at each of these types of operations in detail, one by one. Let us start with partition local operations.
6.7 Trident State
Trident provides a mechanism for managing state and transactions:
• Trident state can be internal to the topology or stored externally in a database.
• Trident manages state in a fault-tolerant way: even in case of retries or failures, each message is processed only once in a Trident topology.
• Trident provides the following semantics for achieving exactly-once processing and takes the appropriate steps to update the state in a consistent way:
I. Tuples are processed in small batches.
II. Each batch of tuples is given a unique ID, known as the transaction ID; if the batch is replayed, it is given the same transaction ID.
III. State updates are ordered, which means that the state updates for batch 2 are not applied until batch 1 is complete.
Next, we will look at how a Trident topology works.
6.8 Trident Topology
Trident provides the TridentTopology class, which is created using the Trident topology builder. It provides many methods for Trident operations; some of these methods and their signatures are listed in the given table. The newStream method creates a new stream of tuples that will have the specified transaction ID; the tuples are produced by the specified spout. Instead of IBatchSpout, different spouts can be specified; this is discussed later in the lesson. The merge method merges multiple input streams into one stream. The join method performs a join operation on two streams based on the specified join fields. The output stream contains the fields from join fields 1 as well as the specified output fields. Let us move on to understanding the concept of Trident spouts.
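A hedged usage sketch of these three methods follows; the stream names ("txA", "txB") and field names ("key", "val1", "val2") are assumptions for illustration, and any IBatchSpout implementation could be passed in.

```java
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;

public class TopologyMethodsSketch {
    static Stream example(TridentTopology topology,
                          IBatchSpout spoutA, IBatchSpout spoutB) {
        // newStream: a stream of tuples with the given transaction ID,
        // produced by the given spout
        Stream s1 = topology.newStream("txA", spoutA);
        Stream s2 = topology.newStream("txB", spoutB);

        // merge: combine multiple input streams into one stream
        Stream merged = topology.merge(s1, s2);

        // join: match tuples from the two streams on the "key" field;
        // the output stream carries the join field plus the remaining fields
        return topology.join(s1, new Fields("key"),
                             s2, new Fields("key"),
                             new Fields("key", "val1", "val2"));
    }
}
```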
6.9 Trident Spouts
Many different types of spouts can be used with a Trident topology. A few useful spouts are listed in the given table.
IBatchSpout: A non-transactional spout that emits a batch of tuples at a time. This provides limited fault tolerance with an only-once processing guarantee.
TridentSpout: This spout supports transaction semantics. It can provide at-least-once and exactly-once processing using various functions.
PartitionedTridentSpout: A transactional spout that reads from a partitioned data source. This ensures at-least-once processing.
OpaquePartitionedTridentSpout: An opaque transactional spout that reads from a partitioned data source. Unlike PartitionedTridentSpout, this spout does not mandate that tuples belong to the same batch when resubmitted. This ensures fault-tolerant processing even when source nodes fail, guaranteeing exactly-once processing.
Next, let us look at the fault-tolerance levels.
6.11 Fault-tolerance Levels
There are three fault-tolerance levels in Trident, provided by the various spouts:
Only-once processing: This is the least fault-tolerant level, where transaction IDs are tracked to ensure that a tuple is processed only once.
At-least-once processing: This is partly fault-tolerant, as transaction ID information is stored in memory or in an external database to ensure that each tuple is processed at least once.
Exactly-once processing: This is highly fault-tolerant, as transaction ID information is stored in an external database, and replay of tuples in different batches is allowed. This ensures exactly-once processing even when a tuple changes batches due to a source node failure.
Next, let us look at the concept of pipelining.
6.12 Pipelining
Trident can process multiple batches at a time using pipelining. Batches are processed in parallel, and state updates are ordered by batch ID. The maximum number of batches that will be processed in parallel can be specified with the "topology.max.spout.pending" property for the topology. Pipelining results in higher throughput and lower latency. For example, if topology.max.spout.pending is set to 50, Trident will process multiple batches in parallel until 50 batches are pending status update. If 40 batches are pending status update, the next batch will be taken up for processing. If 50 batches are pending status update, the next batch will not be taken up. Next, let us look at exactly-once processing.
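In code, this property is typically set on the topology configuration with the setMaxSpoutPending convenience method; the sketch below is a configuration fragment only, and the limit of 50 simply mirrors the example above.

```java
import org.apache.storm.Config;

public class PipeliningConfig {
    static Config configure() {
        Config conf = new Config();
        // Equivalent to setting topology.max.spout.pending = 50:
        // at most 50 batches may be pending status update at once.
        conf.setMaxSpoutPending(50);
        return conf;
    }
}
```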
6.13 Exactly Once Processing
Trident automatically handles state so that tuples are processed exactly once. Tuples are grouped into small batches and are processed in batches. Each batch is given a unique batch ID called the transaction ID. If a batch is retried, it gets the exact same transaction ID. State updates are ordered among the batches: if there are three batches 1, 2, and 3, then the state update for batch 2 will happen only after the state update for batch 1 is successful, and the state update for batch 3 will happen only after the state update for batch 2 is successful. This ensures that the batch updates happen exactly once. The state information can be stored in an external database so that the transactions persist even in case of node failures. Next, let us look at a spout definition example.
6.14 Spout Definition Example
A spout can be defined as follows for data ingestion using Trident. This spout reads from a file and batches the input lines. Let us take an example of log processing where we output the count of each log type occurring in the log file. The program fragment shows the spout definition using a Trident spout. GetLineSpout is defined as a class that implements the IBatchSpout interface. It has a constructor that takes the batch size as an input parameter and sets the batch size for the spout. It has an open method that is called only once for the spout instance; this method opens the input log file and sets the file handle. It has a getOutputFields method that sets the output fields as a single field containing the entire line.
The emitBatch method is used to output the tuples to the topology stream. The code fragment for this method is shown here. Please note that any file-handling code has to handle exceptions in Java; the exception-handling block is not shown here due to space constraints. The emitBatch method gets the batch ID for this batch of tuples and the output collector as parameters. It has a loop over the batch size so that it can emit that many tuples. In each iteration of the loop, it calls the getNextLine method to get the next line from the file and then calls the emit function of the output collector to output the line as a tuple. The getNextLine method is a custom method and is not part of the interface. It reads the next line from the log file, which is emitted as a tuple using the Values function. Moving on, we will look at the Trident operation example.
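The description above can be sketched as the following IBatchSpout implementation. This is a hedged reconstruction, not the course's exact code: the log file path is an assumption, and basic exception handling (which the lesson omits) is added so the sketch is complete.

```java
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class GetLineSpout implements IBatchSpout {
    private final int batchSize;
    private transient BufferedReader reader;

    public GetLineSpout(int batchSize) {
        this.batchSize = batchSize;            // number of tuples per batch
    }

    @Override
    public void open(Map conf, TopologyContext context) {
        try {                                  // input log file path is an assumption
            reader = new BufferedReader(new FileReader("/var/log/app.log"));
        } catch (IOException e) {
            throw new RuntimeException(e);     // lesson omits exception handling
        }
    }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        for (int i = 0; i < batchSize; i++) {  // emit up to batchSize tuples
            String line = getNextLine();
            if (line == null) break;           // stop at end of file
            collector.emit(new Values(line));  // one line per tuple
        }
    }

    // Custom helper, not part of the IBatchSpout interface.
    private String getNextLine() {
        try {
            return reader.readLine();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override public void ack(long batchId) {}
    @Override public void close() {}
    @Override public Map<String, Object> getComponentConfiguration() { return null; }
    @Override public Fields getOutputFields() { return new Fields("line"); }
}
```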
6.15 Trident Operation Example
We will have a function operation to filter the log lines and output only the log type. The code fragment shows the class GetLogType, which extends the BaseFunction class. This class has a single method, execute, that takes the input Trident tuple and the output collector as parameters. It splits the line using a space delimiter to convert it into words and then compares each word to ERROR, INFO, or WARNING. If one of them matches, the matched word is output as a tuple using the emit method on the output collector. Now, we will look at an example to understand how to store the output.
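The core matching logic can be sketched in plain Java, independent of Storm; the class and method names below are illustrative only. Inside GetLogType.execute, the matched word would then be emitted via collector.emit(new Values(logType)).

```java
public class LogTypeMatcher {
    // Returns the first word in the line that is a known log type
    // (ERROR, INFO, or WARNING), or null if the line contains none.
    static String matchLogType(String line) {
        for (String word : line.split(" ")) {
            if (word.equals("ERROR") || word.equals("INFO") || word.equals("WARNING")) {
                return word;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(matchLogType("2017-03-01 10:22:04 ERROR disk full"));  // prints ERROR
    }
}
```

Returning null for non-matching lines mirrors the filter behavior described above: lines without a log type simply produce no output tuple.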