Apache Storm Trident Tutorial

Storm Trident

Welcome to the sixth chapter of the Apache Storm tutorial (part of the Apache Storm course). This lesson will introduce you to Trident, a high-level interface to Storm.

 

Now, let us begin with exploring the objectives of this lesson.

Objectives

By the end of this lesson, you will be able to:

 
  • Explain what Trident is

  • Explain the Trident data model

  • Identify various operations in Trident

  • Explain stateful processing using Trident

  • Explain pipelining in Trident

  • Explain Trident advantages

 

Let us start with understanding what Trident is.

Trident Introduction

Trident is a high-level abstraction on top of Storm.

 

It provides

  • Abstractions for joins, grouping, and aggregation functions.

  • Batching of tuples so that multiple tuples can be grouped into batches.

  • Transactional processing in Storm so that you can achieve better fault tolerance.

  • State manipulation in Storm so that you can have stateful processing.

 

Trident libraries are included as part of the standard Storm installation.

 

Next, let us look at the Trident data model.

Trident Data Model

The core data model in Trident is the stream which is an unbounded sequence of tuples. The tuples in the stream are grouped as a series of batches. Each batch has a finite number of tuples. A stream is partitioned across the nodes in the cluster. Trident operations on the stream are applied in parallel across each partition.

 

This diagram shows an input sequence of tuples (T1, T2, and so on) coming into a Trident-enabled Storm cluster. The tuples are partitioned across two nodes.

 

For example, the tuples T1, T2, and T3 go to partition 1 on node 1, whereas the tuples T4, T5, and T6 go to partition 2 on node 2. Here, the tuples are grouped into batches.

 

Each batch has three tuples, which is the batch size in this example. Batch 1, batch 3, and batch 5 go to partition 1 of node 1, whereas batch 2, batch 4, and batch 6 go to partition 2 of node 2.

 

Please note that in Storm the input is unbounded, so the cluster keeps receiving tuples and Trident keeps grouping them into batches.

 

Now, we will look at the stateful processing in Trident.

Stateful Processing using Trident

Since Storm processes an unbounded stream of tuples, no state is maintained during processing by default.

 

Stateless processing

 

Once a tuple is processed, Storm doesn’t store any information about the tuple. So when we receive a tuple, we don’t know whether it has already been processed or not. This is the default processing in Storm.

 

Stateful processing

 

Once a tuple is processed, Storm stores information about the tuple. So we can find out if a tuple was previously processed. Trident provides a topology that automatically maintains this state information. The state information is stored in memory or in a database.

 

Next, we will look at Operations in Trident.

Operations in Trident

Trident consists of five types of operations:

 
  1. Partition local operations

  2. Repartitioning operations

  3. Aggregation operations

  4. Operations on grouped streams

  5. Merges and Joins

 

Here, we will look at each of these operation types in detail, one by one.

 

Let us start with partition local operations.

 

1. Partition Local Operations

 

Partition local operations are performed on the batches of tuples within a single partition. There is no data transfer between nodes for these operations. There are five types of partition local operations:

 
  • Functions: A function is applied to each tuple

  • Filters: A filter is applied to each tuple to decide whether to include the tuple in the output or not (example: LogType == “ERROR”; see the sketch after this list)

  • Partition aggregate: Outputs the aggregate of each partition

  • Partition persist: Updates a source of state with the tuples of each partition, providing stateful processing in Storm

  • Projection: Outputs a subset of fields from the input tuple
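As an illustration of the filter operation above, here is a minimal sketch of a Trident filter built on the storm.trident BaseFilter class; the class name and the "logType" field name are illustrative assumptions, not part of the original example.

public static class ErrorFilter extends BaseFilter {
    @Override
    public boolean isKeep(TridentTuple tuple) {
        // keep only tuples whose logType field equals "ERROR"
        return "ERROR".equals(tuple.getStringByField("logType"));
    }
}

Such a filter could then be applied with, for example, logStream.each(new Fields("logType"), new ErrorFilter()).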

 

Partition Aggregate

 

Partition Aggregate applies an aggregate function on each partition of a batch of tuples. There are three types of partition aggregates:

 
  • CombinerAggregator: It runs an init function on each input tuple, and the outputs are processed by a combine function until only one value is left. The output tuple contains a single field and value (see the sketch after this list).

  • ReducerAggregator: It runs an init function once and then a reduce function for each input tuple until only one value remains.

  • Aggregator: This is a general aggregator with three methods. An init method is called at the beginning of the batch, an aggregate method is called for each input tuple, and a complete method is called at the end of the batch.
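For reference, here is a minimal sketch of a CombinerAggregator. It mirrors the behavior of the built-in Count aggregator; the class name MyCount is an illustrative assumption.

public static class MyCount implements CombinerAggregator<Long> {
    @Override
    public Long init(TridentTuple tuple) { return 1L; }                // contribute 1 per input tuple
    @Override
    public Long combine(Long val1, Long val2) { return val1 + val2; }  // merge partial counts
    @Override
    public Long zero() { return 0L; }                                   // value for an empty partition
}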

 

2. Repartitioning Operations

 

Repartitioning operations run a function to change how tuples are partitioned. Repartitioning in a cluster may lead to the transfer of tuples over the network. The following functions are available for repartitioning (a usage sketch follows the list):

 
  • Shuffle: Randomly redistribute tuples across all partitions. A round robin algorithm is used to evenly distribute the tuples

  • Broadcast: Every tuple is replicated to all the partitions

  • PartitionBy: Tuples are partitioned by a set of specified keys

  • Global: All tuples are sent to the same partition. Useful for calculating global sums across all batches

  • batchGlobal: All tuples in the batch are sent to the same partition

  • Partition: Tuples are partitioned based on a custom partitioner
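A short sketch of how these repartitioning methods are invoked on a stream is shown below. The logStream variable, the "logType" field, and the MyCustomPartitioner class are illustrative assumptions.

logStream.shuffle();                             // round-robin redistribution across partitions
logStream.broadcast();                           // replicate every tuple to all partitions
logStream.partitionBy(new Fields("logType"));    // partition by the logType key
logStream.global();                              // send all tuples to a single partition
logStream.batchGlobal();                         // send all tuples of a batch to the same partition
logStream.partition(new MyCustomPartitioner());  // MyCustomPartitioner is a hypothetical custom partitioner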

 

3. Aggregation Operations

 

Trident provides two types of aggregations:

 

Aggregate

 

This method does the aggregate processing for all the tuples within a batch.

Example: LogStream.aggregate(new Count(), new Fields("count"))

 

If batch 1 has five tuples and batch 2 has four tuples, this will give two tuples with counts of 5 and 4, respectively.

 

PersistentAggregate

 

This method does the aggregation over all the tuples of all the batches in the stream.

 

Example: LogStream.persistentAggregate(statespec, new Count(), new Fields("count"))

 

If batch 1 has five tuples and batch 2 has four tuples, this will give a single tuple with the count as 9.

The statespec argument provides a handle for storing the state across batches.
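A minimal sketch of persistentAggregate using the in-memory state factory that ships with Trident (MemoryMapState) is shown below; the logStream variable and field names are illustrative assumptions.

TridentState counts = logStream
    .groupBy(new Fields("logType"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

In production, the MemoryMapState factory would typically be replaced with a state factory backed by an external database.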

 

4. Operations on Grouped Streams

 

You can use the groupBy operation on streams. This operation first partitions the stream based on the group by fields and then groups the tuples in each partition by the group by fields.
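A short sketch of groupBy followed by a per-group aggregation is shown below; the stream variable and field names are illustrative assumptions.

logStream.groupBy(new Fields("logType"))
         .aggregate(new Fields("logType"), new Count(), new Fields("count"));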

 

5. Merges and Joins

 

Trident provides APIs for combining multiple streams using merges and join operations.

 

Merge

 

This method combines the specified streams of the topology into one stream. The output fields will be the fields of the first stream.

 

Example: topology.merge(Logstream1, Logstream2, Logstream3)

 

If Logstream1 has five tuples, Logstream2 has four tuples, and Logstream3 has ten tuples, then the merge will give a stream with 19 tuples.

 

Join

 

This method combines two streams using a join operation based on the specified fields. The operation is done on batches of tuples.

 

Example: If the customer stream has the fields (“custId”, “location”) and the siteVisits stream has the fields (“user”, “URL”), then the following join gives all the tuples where custId matches user: topology.join(customerStream, new Fields("custId"), visitStream, new Fields("user"), new Fields("user", "location", "URL"))

Trident State

Trident provides the following mechanisms for managing state and transactions.

 
  1. Trident state can be internal to the topology or external, stored in a database.

  2. Trident manages state in a fault-tolerant way; even in the case of retries or failures, each message is processed exactly once by the Trident topology.

 

Trident provides the following semantics to achieve exactly-once processing and takes the appropriate steps to update the state consistently:

 
  1. Tuples are processed in small batches.

  2. Each batch of tuples is given a unique ID known as the transaction ID; if the batch is replayed, it is given the same transaction ID.

  3. State updates are ordered, which means that the state update for batch 2 is not applied until batch 1 has completed.

 

Next, we will look at how Trident topology works.

Trident Topology

Trident provides the TridentTopology class, which is used to build Trident topologies. This class provides many methods for Trident operations.

 

Some of the methods and their signature are given below.

 

The newStream method creates a new stream of tuples identified by the specified transaction ID; the tuples are produced by the specified spout. Instead of IBatchSpout, other spout types can be specified.

 

Stream newStream(String txId, IBatchSpout spout)

 

The merge method merges multiple input streams into one stream.

 

Stream merge(Fields outputFields, Stream... streams)

 

The join method performs a join operation on the two streams based on the join fields specified. The output stream will contain the fields specified in outFields.

 

Stream join(Stream s1, Fields joinFields1, Stream s2, Fields joinFields2, Fields outFields)

 

Let us move on to understanding the concept of Trident Spouts.

Trident Spouts

Many different types of spouts can be used with Trident topology.

 

A few useful spouts are given below.

 

IBatchSpout: This is a non-transactional spout that emits a batch of tuples at a time. This provides limited fault-tolerance with only once processing guarantee.

 

TridentSpout: This spout supports transaction semantics. It can provide at least once and exactly once processing using various functions.

 

PartitionedTridentSpout: A transactional spout that reads from a partitioned data source. This ensures at least once processing.

 

OpaquePartitionedTridentSpout: An opaque transactional spout that reads from a partitioned data source. Unlike PartitionedTridentSpout, this spout does not mandate that tuples belong to the same batch when resubmitted. This ensures fault-tolerant processing even when source nodes fail, guaranteeing exactly-once processing.

 

Next, let us look at the fault tolerance levels.

Fault-tolerance Levels

There are three types of fault-tolerance levels in Trident, provided by various spouts.

 

Only once processing: This is the least fault tolerance level where transaction IDs are tracked to ensure that a tuple is processed only once.

 

At least once processing: This is partly fault-tolerant, as transaction ID information is stored in memory or in an external database to ensure that each tuple is processed at least once.

 

Exactly once processing: This is highly fault-tolerant as transaction ID information is stored in an external database and also allows replay of tuples in different batches. This ensures exactly once processing even when a tuple changes batches due to source node failure.

 

Next, let us look at the concept of pipelining.

Pipelining

Trident can process multiple batches at a time using pipelining. Batches are processed in parallel, and state updates are ordered by batch ID. The maximum number of batches that will be processed in parallel can be specified with the “topology.max.spout.pending” property for the topology. Pipelining results in higher throughput and lower latency.

 

For example, if topology.max.spout.pending is set to 50, Trident will process multiple batches in parallel until 50 batches are pending status updates. If 40 batches are pending status updates, the next batch will be taken up for processing; if 50 batches are pending, the next batch will not be taken up.
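A minimal sketch of setting this property in code is shown below; the value 50 matches the example above.

Config conf = new Config();
conf.setMaxSpoutPending(50);   // equivalent to setting topology.max.spout.pending to 50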

 

Next, let us look at exactly once processing.

Exactly Once processing

Trident automatically handles state so that the tuples are processed exactly once.

 

Tuples are grouped into small batches and are processed in batches. Each batch is given a unique batch ID called the transaction ID. If a batch is retried, it gets the same transaction ID.

 

State updates are ordered among the batches: if there are three batches 1, 2, and 3, then the state update for batch 2 will happen only after the state update for batch 1 is successful, and the state update for batch 3 will happen only after the state update for batch 2 is successful.

 

This ensures that the batch updates happen exactly once. The state information can be stored in an external database so that even in case of node failures, the transactions can persist.

 

Next, let us look at a spout definition example.

Spout Definition Example

A spout can be defined as follows for data ingestion using Trident.

 

This spout reads from the file and batches the input lines.

Let us take an example of log processing where we output the count of each log type occurring in the log file.

 

The program fragment shows the Spout definition using Trident spout. GetLineSpout is defined as a class that implements the IBatchSpout interface. It has a constructor that takes the batch size as the input parameter and sets the batch size for the spout. It has an open method that is called only once for the spout instance. This function opens the input log file and sets the file handler. It has a getOutputFields method that sets the output fields as a single field returning the entire line.

 

public static class GetLineSpout implements IBatchSpout {
    FileReader input = null;
    BufferedReader binput = null;
    int batchSize;

    public GetLineSpout(int batchSize) {
        this.batchSize = batchSize;
    }

    @Override
    public void open(Map conf, TopologyContext context) {
        try {
            input = new FileReader("/tmp/logfile");
            binput = new BufferedReader(input);
        } catch (FileNotFoundException e) {
            throw new RuntimeException(e); // minimal exception handling
        }
    }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        // code presented in next screen
    }

    @Override
    public void ack(long batchId) { } // required by IBatchSpout; nothing to do here

    @Override
    public void close() { }

    @Override
    public Map getComponentConfiguration() { return null; }

    @Override
    public Fields getOutputFields() {
        return new Fields("line");
    }
}

 

The emitBatch method of this class, which outputs the tuples to the topology stream, will be explained later in the lesson.

 

The code fragment for this method is shown here. Please note that file handling code in Java has to handle exceptions; only minimal exception handling is shown here. The emitBatch method gets the batch ID for this batch of tuples and the output collector as parameters. It loops batchSize times so that it emits one batch worth of tuples. In each iteration of the loop, it calls the getNextLine method to get the next line from the file and then calls the emit method of the output collector to output the line as a tuple. The getNextLine method is a custom method and is not a part of the interface.

 

public void emitBatch(long batchId, TridentCollector collector) {
    for (int i = 0; i < batchSize; i++) {
        collector.emit(getNextLine()); // emit batchSize number of lines
    }
}

private Values getNextLine() {
    String str = null;
    try {
        while ((str = binput.readLine()) == null) { // read the next line
            Utils.sleep(1000); // if the file has ended, wait for more input to the file
        }
    } catch (IOException e) {
        throw new RuntimeException(e); // minimal exception handling
    }
    return new Values(str);
}

 

This method reads the next line from the log file. It returns this as a tuple using the Values function.

 

Moving on, we will look at the Trident operation example.

Trident Operation Example

We will use a function operation to filter the log lines and output only the log type.

 

The code fragment shows the class GetLogType that extends BaseFunction class.

 

This class has a single method execute that takes the input tuple as a Trident tuple and the output collector as parameters. It splits the line using space delimiter to convert it into words and then compares each word to ERROR or INFO or WARNING. If any of them matches, then the matched word is output as the tuple using the emit method on the output collector.

 

public static class GetLogType extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String line = tuple.getString(0);
        for (String word : line.split(" ")) {
            if (word.equals("ERROR") || word.equals("INFO") || word.equals("WARNING")) {
                collector.emit(new Values(word));
            }
        }
    }
}

 

Now, we will look at an example to understand how to store the output.

Storing the Output Example

We will have another function operation to store the log type counts to an output file.

 

The code fragment shows the class printTuple that extends BaseFunction class for storing the log type counts.

Please note that only minimal exception handling is shown here.

 

This class has a single method execute that takes a Trident tuple and an output collector as arguments. The input contains two fields. The first field is a log type word and the second one is a count. It opens the output file for writing and writes the log type and the count to the file.

 

public static class printTuple extends BaseFunction {
    PrintWriter fOut;

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String logType = tuple.getString(0);
        Long cnt = tuple.getLong(1);
        if (fOut == null) {
            try {
                fOut = new PrintWriter(new FileWriter("/tmp/Tridentoutput.txt"));
            } catch (IOException e) {
                throw new RuntimeException(e); // minimal exception handling
            }
        }
        fOut.println(logType + ":" + cnt);
        fOut.flush();
    }
}

 

Next, let us look at a topology connecting a spout and a bolt.

Topology – Connecting Spout and Bolt

The method that connects the spouts and bolts to create the Trident topology is shown here.

 

After using GetLogType to filter only log types, we use the group by and aggregate to do the aggregation by the log type. Finally, printTuple class is used to store the output to a file.

 

The code fragment shows the buildTopology method that connects the spouts and bolts to create the Trident topology. The method first creates the GetLineSpout object with a batch size of 10, which means that the spout will put ten tuples in each batch.

 

Next, it creates an object of the TridentTopology class. In this topology, a new stream is created with the newStream method, using GetLineSpout as the spout and a parallelism hint of 1. The GetLogType function is connected to the output of this spout with the each method of the stream.

 

Then, the groupBy Trident operation is called to group the results by each log type. The aggregate operation is applied to the group by output with the count function. This is a grouped aggregate, so the count of tuples is done for each log type to produce a stream with log type and count as fields.

 

Finally, printTuple is applied to each aggregated tuple to store the count of log types to a file. The Trident topology is then built and returned.

 

public static StormTopology buildTopology() {
    GetLineSpout spout = new GetLineSpout(10); // batch size of 10 specified
    TridentTopology topology = new TridentTopology();
    Stream logTypeCounts = topology.newStream("spout1", spout).parallelismHint(1)
            .each(new Fields("line"), new GetLogType(), new Fields("word"))
            .groupBy(new Fields("word"))
            .aggregate(new Fields("word"), new Count(), new Fields("count"));
    logTypeCounts.each(new Fields("word", "count"), new printTuple(), new Fields("word2", "count2"));
    return topology.build();
}

 

Next, let us look at the Topology main function.

Topology – Main Function

The code fragment for the main function that connects the spouts and bolts to create the topology is shown below.

 

The main function gets the topology name as the first parameter. It creates a configuration object first. The debug flag is set in the configuration so that we can see the detailed messages in the log.

 

The maximum number of pending batches (max spout pending) is set to 1, as we want only one batch to be processed at a time. Next, the number of workers is also set to 1.

 

Finally, the StormSubmitter is called with the topology name, the configuration object, and the topology returned by the buildTopology method.

 

public static void main(String[] args) throws Exception {
    if (args.length > 0) {
        Config conf = new Config();
        conf.setDebug(true);
        conf.setMaxSpoutPending(1);
        conf.setNumWorkers(1);
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology());
    }
}

 

Now, we will understand the concept of the wrapper class.

Wrapper class

The wrapper class includes all the classes and functions we explained in the previous screens.

 

The code fragment for the wrapper class is shown here.

The code fragment shows the wrapper class LogProcessTopology. Comments stand in for the definitions of the classes and functions shown in the previous screens.

This wrapper class is what gets compiled and submitted to Storm for execution.

 

// Import required libraries
public class LogProcessTopology {
    // GetLineSpout class definition
    // GetLogType class definition
    // printTuple class definition
    // buildTopology method definition
    // main function definition
}

 

Next, let us look at Trident advantages.

Trident Advantages

Trident adds a lot of high-level functionality to Storm. Here are some advantages of using Trident.

 
  • Trident is smart about how to run the topology for maximizing the performance

  • It processes tuples in batches improving the throughput

  • It provides operators such as group by and aggregate which reduces the amount of programming you need to do in Storm

  • It can process computations within a partition first before sending tuples over the network, improving the throughput and reducing network traffic

  • It provides exactly once processing in Storm adding a higher level of fault tolerance

 

We have come to the end of this lesson.

Summary

Let us summarize the topics covered in this lesson.

 
  • Trident is an abstraction on top of Storm

  • Trident groups tuples into batches and does operations on batches

  • It provides aggregation and other operations on top of Storm

  • Trident provides exactly once processing of tuples and stateful processing in Storm

  • Trident spouts and operations can be connected using the Trident topology

Conclusion

This concludes the lesson on Trident. This also concludes the developer course on Storm.
