Advanced HDFS and MapReduce Video Tutorial
5.1 Advanced HDFS and MapReduce
Hello and welcome to Lesson “Advanced HDFS and MapReduce”, of the Big Data and Hadoop Developer course offered by Simplilearn.
After completing this lesson, you will be able to: ●Explain advanced HDFS and related concepts; ●Identify the steps to decommission a DataNode; ●Explain advanced MapReduce concepts; and ●Describe the various joins in MapReduce.
5.3 Advanced HDFS–Introduction
The Hadoop Distributed File System or HDFS is a block-structured, distributed file system designed to run on small, commodity machines so that running jobs perform better compared to single, standalone, dedicated server. HDFS answers the storage needs of Big Data, and makes data accessible to Hadoop services. Some of the settings in advanced HDFS, discussed in this lesson, are HDFS benchmarking, setting up HDFS block size, and decommissioning or removing a DataNode.
5.4 HDFS Benchmarking
HDFS benchmarking is verifying that the HDFS cluster is set up correctly and meets the performance expectations of the administrator. You will use DFSIO to test the Input/Output performance of the HDFS cluster. The image shown displays the command to be used for the write operation. The image shown displays the command for the read operation. This benchmark uses the file written by the write command executed earlier.
5.5 Setting Up HDFS Block Size
HDFS stores files across a cluster by breaking the data into fixed size blocks. The default size of a block is 64 MB. However, a Hadoop administrator can change the size of the block, in this case, to 128 MB. To set up the HDFS block size, open the xml file as shown on the screen. The command used to open the xml file is shown here. Next, type the block size command and save the file. The command specified in the image ensures that the block size of any file uploaded in HDFS from now on will be 128 MB. The parameter dfs.block.size is responsible for setting the size of the block, which the NameNode server has to create. The block size is specified in bytes, 134217728 in this case.
5.6 Decommissioning a DataNode
Decommissioning refers to disconnecting DataNode servers from the cluster's network. There may be multiple situations, such as a hardware upgrade or failure, when you want to remove one or more nodes from an HDFS cluster. This can be done in four steps. First, create a file named “exclude” in the location mentioned here. Second, type the IP address of the node. Next, save the file. Finally, run the command to decommission the IP address of the node. Use the command shown to create a file named “exclude” in the specified location. Type the IP address of the node or nodes to be decommissioned as shown. Save the file, and run the command displayed here to decommission the IP address specified in the file “exclude.”
5.7 Business Scenario
Olivia is the EVP of IT Operations at Nutri Worldwide, Inc. Her team is involved in setting up Hadoop infrastructure for the organization. After setting up the Hadoop infrastructure, Olivia and her team decide to test the effectiveness of HDFS.
5.10 Advanced MapReduce
Hadoop MapReduce uses data types to work with user-given mappers and reducers. The data is read from files into mappers, and emitted by mappers to reducers. Processed data is sent back by the reducers. Data emitted by reducers goes into output files. At every step, data is stored in Java objects. In the Hadoop environment, objects that can be put to or received from files and across the network must obey the Writable interface, which allows Hadoop to read and write data in a serialized form for transmission. Let’s look at Hadoop interfaces in some more detail.
The interfaces in Hadoop are Writable and WritableComparable. As you’ve already seen, a Writable interface allows Hadoop to read and write data in a serialized form for transmission. A Writable interface consists of two methods: read fields and write as shown here. A WritableComparable interface extends the Writable interface, so that the data can be used as a key and not as a value. As shown here, the WritableComparable implements two methods: compareTo and hashCode.
5.12 Data Types in Hadoop
Let’s now examine various data types in Hadoop and their functions. The first data type is Text. The function of this data type is to store String data. The IntWritable data type stores integer data. LongWritable, as the name suggests, stores Long data. Similarly, other data types are FloatWritable for storing Float data, DoubleWritable for storing Double data. There is also BooleanWritable and ByteWritable data types. NullWritable is a placeholder when a value in not needed.
5.13 Data Types in Hadoop (contd.)
The illustration here shows a sample data type you can create on your own. This data type will need you to implement a Writable interface.
5.14 InputFormats in MapReduce
MapReduce can specify how its input is to be read by defining an InputFormat. The table lists some of the classes of InputFormats provided by the Hadoop framework. Let’s look at each of them. •The first class is KeyValueTextInputFormat, which is used to create a single key-value pair per line. •TextInputFormat is used to create a program considering a key as the line number and a value as the line itself. •NLineInputFormat is similar to TextInputFormat except that there are N number of lines that make an input split. •MultiFileInput Format is used to implement an input format that aggregates multiple files into one split. •For the class SequenceFileInputFormat to be implemented, the input file must be a Hadoop sequence file which contains serialized key-value pairs.
5.15 OutputFormats in MapReduce
The classes for MapReduce OutputFormat is discussed here. The first class is the default OutputFormat, TextOutputFormat. It writes records as lines of text. Each key-value pair is separated by a TAB character. This can be customized by using the mapred.textoutputformat.separator property. The corresponding input format is KeyValueTextInputFormat. SequenceFileOutputFormat writes sequence files to save output. This represents a compact and compressed version of normal data blocks. SequenceFileAsBinaryOutputFormat is responsible for writing key-value pairs that are in a raw binary format into a sequential file container. MapFileOutputFormat writes MapFiles as the output. The keys in a MapFile are added in a specific order. The reducer then emits keys in the sorted order. MultipleTextOutputFormat writes data to multiple files whose names are derived from output keys and values. MultipleSequenceFileOutputFormat creates output in multiple files in a compressed form.
5.16 Distributed Cache
Distributed Cache is a Hadoop feature to cache files needed by applications. A distributed cache: •helps boost efficiency when a map or a reduce task needs access to common data. •lets a cluster node read the imported files from its local file system instead of retrieving the files from other cluster nodes. •allows both single files and archives like zip and tar.gz. •copies files only to slave nodes. If there are no slave nodes in the cluster, the distributed cache copies the files to the master node. •allows access to the cached files from mapper or reducer applications to make sure that the current working directory is added into the application path. •allows referencing of the cached files as though they are present in the current working directory.
5.17 Using Distributed Cache–Step 1
Set up the cache by copying the requisite files to the FileSystem.
5.18 Using Distributed Cache–Step 2
Set up the application's JobConf as shown.
5.19 Using Distributed Cache–Step 3
Use the cached files in the mapper or reducer.
5.20 Joins in MapReduce
Joins are relational constructs that can be used to combine relations. In MapReduce, joins are applicable in situations where you have two or more datasets you want to combine. A join is performed either in the map phase or in the reduce phase by taking advantage of the MapReduce Sort-Merge architecture. The various join patterns available in MapReduce are reduce side join, replicated join, composite join, and Cartesian product. •A reduce side join is used for joining two or more large datasets by the same foreign key with any kind of join operation. •A replicated join is a map-side join that works in situations where one of the datasets is small enough to cache. •A composite join is a map-side join used on very large formatted input data sets sorted and partitioned by a foreign key. •A Cartesian product is a map-side join where every single record is paired up with another dataset.
5.21 Reduce Side Join
A reduce side join works in the following ways: The mapper prepares for join operations. It takes each input record from every dataset and emits a foreign key-record pair. The reducer performs a join operation where it collects the values of each input group into temporary lists. The temporary lists are then iterated over, and the records from both sets are joined.
5.22 Reduce Side Join (contd.)
A reduce side join can be used: •when multiple large datasets are being joined by a foreign key, •when flexibility is needed to execute any join operation, •when a large amount of network bandwidth is available, and •when there is no limitation on the size of datasets. An SQL analogy of a reduce side join is given on the screen. In the output of a reduce side join, the number of part files equals the number of reduce tasks.
5.23 Replicated Join
A replicated join is a map-only pattern that works as follows: •It reads all files from the distributed cache and stores them into in-memory lookup tables. •The mapper processes each record and joins it with the data stored in memory. •There is no data shuffled to the reduce phase. •The mapper provides the final output part.
5.24 Replicated Join (contd.)
Replicated joins should be used: •when all datasets, except for the largest one, can fit into the main memory of each map task that is limited by Java Virtual Machine or JVM heap size, and •when there is a need for an inner join or a left outer join, with the large input dataset being the “left” part of the operation. An SQL analogy of a replicated join is given on the screen. In the output of a replicated join, the number of part files equals the number of map tasks.
5.25 Composite Join
A composite join is a map-only pattern working in the following ways: •All datasets are divided into the same number of partitions. •Each partition of the data set is sorted by a foreign key, and all the foreign keys reside in the associated partition of each dataset. •Two values are retrieved from the input tuple associated with each data set based on the foreign key and the output to the file system.
5.26 Composite Join (contd.)
Composite joins should be used: •when all datasets are sufficiently large, and •when there is a need for an inner join or a full outer join. An SQL analogy of a composite join is displayed on the screen. In the output of a composite join, the number of part files equals the number of map tasks.
5.27 Cartesian Product
A Cartesian product is a map-only pattern that works in the following ways: •Datasets are split into multiple partitions. •Each partition is fed to one or more mappers. For example, in the image shown here, split A-1 and A-2 are fed to three mappers each. •A RecordReader reads every record of input split associated with the mapper. •The mapper simply pairs every record of a dataset with every record of all other datasets.
5.28 Cartesian Product (contd.)
A Cartesian product should be used: •when there is a need to analyze relationships between all pairs of individual records, and •when there are no constraints on the execution time. In the output of a Cartesian product, every possible tuple combination from the input records is represented.
The following are a few questions to test your understanding of the concepts discussed.
Let’s summarize the topics covered in this lesson: ●HDFS is a block-structured distributed file system designed to run on small, commodity machines. ●The settings in advanced HDFS are HDFS benchmarking, setting up HDFS block size, and decommissioning a DataNode. ●Decommissioning a DataNode refers to disconnecting the DataNode servers from the Hadoop cluster's network. ●MapReduce can use custom data types, input formats, and output formats in addition to framework-defined types. ●Joins are relational constructs used to combine records.
This concludes ‘Advanced HDFS and MapReduce.’ In the next lesson, we will focus on ‘Pig.’
5.8 HDFS Demo 01
In this screen, you will see a demo on HDFS. Start Hadoop services using the command start-all.sh. Press Enter. Ensure hadoop services are live and running. Let us perform benchmarking of your HDFS. We will initially perform the write operation. The command is shown on the screen. Type the command and press Enter. The result shows the execution time to write files and IO rate and throughput. Press Enter. Let us perform the read operation of the file. The command is shown on the screen. Type the command and press Enter. The command will be executed. The result will show the output time. This is how benchmarking is done. In the next step, we will see how to manually set the block size. We can set the block size manually by editing hdfs-site.xml. The command to access hdfs-site.xml is shown on the screen. We need to set the parameter dfs.block.size to 134217728, that is, 128MB. Type the entire code as shown in the video. Type the code and press Enter. We need to refresh HDFS. To refresh HDFS, we need to stop and then start the hadoop services. To stop the services, the command is stop-all.sh. Type the command and press Enter. Let us upload a dataset for testing. The command is shown on the screen. Type the command and press Enter. Let us look at the block size. Access the GUI. The link to access the GUI is http://192.168.21.184:50070. Click datanew. Next, click big. Observe the block size. Thus, we have successfully set the block size. In the next step, we will see how to decommission DataNodes. We need to create an exclude file. Type the command shown on the screen and press Enter. Enter the IP address of the DataNode and save the file. Let us refresh the cluster. The command is shown on the screen. Thus, we have successfully executed the command.
5.9 Setting HDFS block size in Hadoop 2.7.1 Demo 02
In this screen, you will see a demo on setting HDFS block size in Hadoop 2.7.1. Download an e-book into your machine from the URL shown on the screen. Use the command displayed on the screen to verify if it has been downloaded. Let’s copy into HDFS using the hadoop fs -copyFromLocal command. Set the dfs block size to 30 bytes while copying the file. You will get an error. Now let’s try to copy into HDFS by setting the block size to 64 bytes while copying the file. You will again get an error. Now let’s copy into HDFS using a block size of 1048576 bytes while copying the file. This time the copying is successful, as 1048576 is a multiple of 64, which is the requirement for HDFS files block size. Verify the file using the command given on the screen. Check further file statistics using hadoop fsck filename command. We notice that the file is around 1573151 bytes and is copied into two blocks, as the size of each block is 1048576 bytes. Verify the block size by running the command displayed in the screen. Notice the block size matches what was set, that is 1048576.
5.29 MapReduce program for Writable classes Demo 03
In this demo, you will see how to write a MapReduce program for Writable classes. Create a new project by clicking File Menu, New, and Project. Select Java Project and click the Next button. Type the name for the project and click the Next button to continue. Select Libraries. Click the Add External JARs button to add external JARs. Select all jar files. Click the OK button to add jar files. Click the Add External JAR files button again. Double-click the lib folder. Select all JAR files. Click the OK button. Click the Finish button to continue. Click the No button to continue. Click the Play button to execute the program. You will receive an error stating input is not received. Right-click the App.java file. Click the Properties button. Select the class file and click the Edit button. Click the Arguments tab. Enter the input and output paths. Click the Apply button. Click the OK button. Click the Play button to execute the program. The program is executed perfectly. You can verify the output by visiting the web GUI. Open the home page of the web GUI and click Browse the file system. Click output2. The part-r-00000 contains the output. Click the same to view the output of the program. Thus we have successfully demonstrated the program using Writable class.
5.32 Case Study
Scenario: XY Invest provides investment advice to high net worth individuals and maintains stock market data of various exchanges. It handles stock market data critical for analysis and monitoring. This entails time-intensive processing of huge amounts of data, and vertical scaling is proving to be expensive. In its effort to find an alternate solution for processing, XY identifies that using Hadoop would reduce the company’s cost and effort significantly. It wants to use the advanced features of MapReduce for data processing. Click Analysis to know the company’s next move. Analysis: The company does research on Hadoop and finds that it is a popular solution for Big Data storage and processing. Hadoop has two components, HDFS for storage and MapReduce for processing. 1.A few advanced features of Hadoop: 2.It distributes processing so that jobs can be completed really quickly. 3.It is highly fault tolerant, so even if some tasks fail, they will be automatically retried. 4.It provides many writable interfaces for input and output. 5.It can distribute files needed for processing by using the distributed cache feature. Click Solution for the steps to explore stock market data using MapReduce.
5.33 Case Study - Demo
Solution: Perform the following steps to explore stock market data using MapReduce. 1.Get stock market data for Google. 2.Get the trade data for a customer (buys and sells of Google stock). 3.Store the trade data in HDFS. 4.Create a MapReduce program that uses distributed cache to distribute the stock market data. The MapReduce program also uses various writable interfaces of Hadoop. 5.Process the trade data and analyze the customer’s profit/loss.