Spark GraphX Programming Tutorial
Hello and welcome to the last lesson, Lesson 7, of the Apache Spark and Scala course offered by Simplilearn. This lesson will introduce and explain the concepts of Spark GraphX programming.
After completing this lesson, you will be able to: • Explain the key concepts of Spark GraphX programming • Discuss the limitations of the Graph Parallel system • Describe the operations with a graph, and • Discuss the Graph system optimizations
7.003 Introduction to Graph-Parallel System
Today, big graphs exist in various important applications, be it the web, advertising, or social networks. A few such graphs are shown on the screen. These graphs make it possible to perform tasks such as targeting advertising, identifying communities, and deciphering the meaning of documents, by modeling the relations between products, users, and ideas. The size and significance of graph data is growing. In response, various new large-scale distributed graph-parallel frameworks, such as GraphLab, Giraph, and PowerGraph, have been developed. Each framework offers a new programming abstraction that lets you express graph algorithms compactly, together with a runtime engine that can execute these algorithms efficiently on distributed and multicore systems. Additionally, these frameworks abstract away the issues of large-scale distributed system design. Therefore, they simplify the design, implementation, and application of sophisticated new graph algorithms to large-scale real-world graph problems.
7.004 Limitations of Graph-Parallel System
Before we move further, you should know the limitations of the Graph-Parallel system. Although the current frameworks have various common properties, each of them presents a slightly different view of graph computation, tailored to a specific family of graph algorithms and applications or to the framework's original domain. Additionally, each framework depends on a different runtime. Therefore, it is tricky to combine these abstractions. While these frameworks are capable of resolving the graph computation issues, they cannot resolve the data ETL issues. They also cannot address the issues related to interpreting and applying the computation results. Finally, few of these frameworks have built-in support for interactive graph computation.
7.005 Introduction to GraphX
Let’s now talk about GraphX, which is a graph computation system running in the framework of the data-parallel system. It extends the RDD abstraction and introduces a new feature called the Resilient Distributed Graph, or RDG. An RDG associates records with the vertices and edges of a graph and provides a collection of expressive computational primitives. In addition, it simplifies the graph ETL and analysis process substantially by providing new operations for viewing, filtering, and transforming graphs. GraphX combines the benefits of graph-parallel and data-parallel systems, as it efficiently expresses graph computation within the framework of the data-parallel system. GraphX distributes graphs efficiently as tabular data structures by leveraging new ideas in their representation. In a similar way, it provides in-memory computation and fault tolerance by leveraging the improvements of the data flow systems. GraphX also simplifies the graph construction and transformation process by providing powerful new operations. With these primitives, it is possible to implement the abstractions of PowerGraph and Pregel in a few lines. It is also possible to load, transform, and compute interactively on massive graphs.
7.006 Introduction to GraphX (contd.)
The image on the screen shows how GraphX works.
7.007 Importing GraphX
To start working with GraphX, you first need to import it and Spark into your project. The code to do this is given on the screen.
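The on-screen code is not reproduced in this transcript, so the following is only a sketch of what such an import typically looks like, written for spark-shell. The application name and the local master setting are assumptions; inside spark-shell the existing context is simply reused.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
// RDD is needed for the type annotations used in later sketches.
import org.apache.spark.rdd.RDD

// Reuses the running context (for example in spark-shell) or starts a
// local one. "graphx-sketch" and local[*] are illustrative assumptions.
val sc: SparkContext = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))
```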
7.008 The Property Graph
The property graph is defined as a directed multigraph that has properties attached to every vertex and edge. Here, a directed multigraph is a directed graph that can have multiple parallel edges sharing the same source and destination vertices. Every vertex is identified by a unique 64-bit long identifier, known as VertexId. In a similar manner, every edge has a source and a destination vertex identifier. The properties are saved as Scala or Java objects with every vertex and edge. The graph is parameterized over the vertex type (VD) and the edge type (ED), which are the types of the objects associated with every vertex and edge. GraphX reduces the memory footprint by optimizing the representation of vertex and edge types when they are plain old data types and by saving them in specialized arrays. In the code given on the screen, the VertexRDD[VD] class extends and is an optimized version of RDD[(VertexId, VD)], and the EdgeRDD[ED] class extends and is an optimized version of RDD[Edge[ED]]. Both VertexRDD[VD] and EdgeRDD[ED] leverage internal optimizations and offer additional functionality that is built around graph computation.
7.009 The Property Graph (contd.)
An example of the property graph is displayed on the screen.
7.010 Features of the Property Graph
A few more features of the property graph are also listed on the screen. Similar to RDDs, the property graph is fault-tolerant, distributed, and immutable. To change the structure or values of the graph, you need to produce a new graph with the required changes. Note that considerable parts of the original graph, including the unaffected structure, indices, and attributes, are reused in the new graph, which reduces the cost of this inherently functional data structure. The graph is partitioned across the workers using a range of vertex-partitioning heuristics. Similar to RDDs, every graph partition can be re-created on a separate machine if a failure happens. From the logical standpoint, the property graph corresponds to a pair of typed collections (RDDs) that encode the properties of each vertex and edge. As a result, the graph class includes members for accessing the vertices and edges of the graph.
7.011 Creating a Graph
Now, let us understand how to create a graph. The code to create a simple graph of co-workers is given on the screen. A graphical representation of this graph is also given on the screen.
7.012 Demo-Create a Graph Using GraphX
This demo will show the steps to create a graph using the GraphX library of Spark.
7.013 Create a Graph Using GraphX
In this demo, you will learn how to create a graph using the GraphX library of Spark. We need to import the given packages into spark-shell to perform a graph analysis in Spark. At a high level, GraphX extends the Spark RDD by introducing a new Graph abstraction: a directed multigraph with user-defined objects attached to each vertex and edge. A directed multigraph is a directed graph with potentially multiple parallel edges sharing the same source and destination vertices. The ability to support parallel edges simplifies modeling scenarios where there can be multiple relationships (for example, co-worker and friend) between the same vertices. Each vertex is keyed by a unique 64-bit long identifier (VertexId). GraphX does not impose any ordering constraints on the vertex identifiers. Here, we are constructing a graph from a pair of RDDs. The first is built from an array of nodes, where each node has a vertex ID and a set of properties: name and role. Similarly, we create an RDD for edges, where each edge establishes a relationship between two nodes. We need to define a default user in case there are relationships with missing users. Here, this user is David, who has not yet been assigned to a project. We can build the initial graph by passing the vertices, the edges, and the default user.
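The steps of this demo can be sketched roughly as follows. The demo's actual data is not part of this transcript, so the names, roles, and relationships below are hypothetical sample values; only the default user David and the "Work not yet assigned" role come from the narration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Vertex RDD: (vertex ID, (name, role)). Hypothetical sample users.
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
  (1L, ("Alice", "MR Developer")),
  (2L, ("Bob", "Spark Developer")),
  (3L, ("Carol", "Tester"))))

// Edge RDD: Edge(source ID, destination ID, relationship).
// Vertex 4 is deliberately missing from the vertex RDD.
val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(1L, 2L, "co-worker"),
  Edge(3L, 1L, "relative"),
  Edge(2L, 4L, "friend")))

// Default user for vertices that appear only in the edge RDD.
val defaultUser = ("David", "Work not yet assigned")

// Build the property graph from vertices, edges, and the default user.
val graph: Graph[(String, String), String] =
  Graph(users, relationships, defaultUser)

graph.vertices.collect().foreach(println)
```

Vertex 4 appears only in an edge, so it receives the default user's properties.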
7.014 Triplet View
Apart from the property graph’s vertex and edge views, GraphX also includes a triplet view. This view logically joins the properties of the vertices and edges, yielding an RDD[EdgeTriplet[VD, ED]] that contains instances of the EdgeTriplet class. The EdgeTriplet class extends the Edge class by adding the srcAttr and dstAttr members, which contain the source and destination properties respectively. This view is also shown graphically on the screen.
7.015 Graph Operators
Similar to RDDs, property graphs also provide various basic operators. These operators take user-defined functions and produce new graphs with transformed properties and structure. The core operators, which have optimized implementations, are defined in Graph. The convenience operators, which are expressed as compositions of the core operators, are defined in GraphOps. However, thanks to Scala implicits, the GraphOps operators are automatically available as members of Graph. To understand this, consider the given code example, which computes the in-degree of every vertex using an operator defined in GraphOps. The core graph operators are kept separate from GraphOps so that different graph representations can be supported in the future.
7.016 List of Operators
The code shown on the screen summarizes the functionality of the operators defined in Graph and GraphOps. For simplicity, these are all presented as members of Graph. Note that a few function signatures have been simplified and a few of the more advanced functions have been removed. Therefore, you should refer to the API docs for the official list of operations.
7.017 List of Operators (contd.)
The further code is displayed.
7.018 Property Operators
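Similar to the map operator of RDDs, the property graph also contains property operators: mapVertices, mapEdges, and mapTriplets. Each of them yields a new graph whose vertex or edge properties have been modified by the user-defined map function, while the graph structure is left unchanged. The code to define and use them is displayed on the screen. These operators are generally used to initialize the graph for a particular computation or to project away unnecessary properties.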
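As an illustration only, the property operators might be used as in the sketch below. The sample users and relationships are hypothetical and are not the data shown on the screen.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical sample graph: (name, role) vertices, relationship edges.
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
  (1L, ("Alice", "MR Developer")), (2L, ("Bob", "Spark Developer"))))
val relationships: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(1L, 2L, "co-worker")))
val graph = Graph(users, relationships, ("David", "Work not yet assigned"))

// mapVertices: keep only the name, projecting away the role.
val nameGraph: Graph[String, String] =
  graph.mapVertices((id, attr) => attr._1)

// mapEdges: transform each edge property; the structure is unchanged.
val upperGraph: Graph[(String, String), String] =
  graph.mapEdges((e: Edge[String]) => e.attr.toUpperCase)

// mapTriplets: the new edge property may also use both vertex properties.
val labelled: Graph[(String, String), String] = graph.mapTriplets(
  (t: EdgeTriplet[(String, String), String]) =>
    s"${t.srcAttr._1} -${t.attr}-> ${t.dstAttr._1}")
```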
7.019 Structural Operators
At present, GraphX supports only a small set of commonly used structural operators; however, more are expected to be added in the future. The supported ones include the reverse operator and the subgraph operator. The use of these operators is explained through the given code. The reverse operator returns a new graph with all the edge directions reversed. For instance, it can be used when computing the inverse PageRank. It does not change the properties of vertices and edges or the number of edges. Therefore, it can be implemented efficiently without data movement or duplication. On the other hand, the subgraph operator takes vertex and edge predicates and returns the graph that contains only the vertices satisfying the vertex predicate and the edges that satisfy the edge predicate and connect vertices satisfying the vertex predicate. This operator is useful for restricting the graph to the vertices and edges of interest or for eliminating broken links.
Let’s learn more about subgraphs. In the first image shown on the screen, this operator is being used to return the graph that contains only those edges whose relation type is not “relative”. In the second image, it is being used to return the graph that contains only those vertices whose value is Bob.
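A minimal sketch of the two structural operators, under the assumption of a small hypothetical graph (the graph in the on-screen images is not available here):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical sample graph: name vertices, relationship edges.
val people: RDD[(VertexId, String)] = sc.parallelize(Seq(
  (1L, "Alice"), (2L, "Bob"), (3L, "Carol")))
val relations: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(1L, 2L, "co-worker"),
  Edge(3L, 1L, "relative"),
  Edge(2L, 3L, "friend")))
val graph = Graph(people, relations)

// reverse: same vertices and edge properties, every direction flipped.
val reversed = graph.reverse

// subgraph with an edge predicate: drop the "relative" edges.
// All vertices are kept because the vertex predicate defaults to true.
val noRelatives = graph.subgraph(epred = t => t.attr != "relative")

// subgraph with a vertex predicate: keep only the vertex named Bob.
// Edges touching a removed vertex are dropped as well.
val onlyBob = graph.subgraph(vpred = (id, name) => name == "Bob")
```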
7.021 Join Operators
Sometimes, it is required to join data from external collections (RDDs) with graphs. For instance, you might have extra user properties that you want to merge with an existing graph, or you might need to pull vertex properties from one graph into another. In such cases, join operators are useful. The supported ones are the joinVertices operator and the outerJoinVertices operator. The use of these operators is explained through the given code. The joinVertices operator joins the vertices with the input RDD and returns a new graph whose vertex properties are obtained by applying the user-defined map function to the result of the joined vertices. Vertices without a matching value in the RDD retain their original value. The outerJoinVertices operator is more general but otherwise behaves like joinVertices. The difference is that the user-defined map function is applied to all vertices and can change the vertex property type. Because not all vertices may have a matching value in the input RDD, the map function takes an Option type.
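The difference between the two join operators can be sketched as follows. The sample graph and the newRoles RDD are hypothetical and serve only as an illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical sample graph: (name, role) vertices, relationship edges.
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
  (1L, ("Alice", "MR Developer")), (2L, ("Bob", "Spark Developer"))))
val relationships: RDD[Edge[String]] =
  sc.parallelize(Seq(Edge(1L, 2L, "co-worker")))
val graph = Graph(users, relationships)

// Extra data to merge in; only vertex 2 has a matching entry.
val newRoles: RDD[(VertexId, String)] =
  sc.parallelize(Seq((2L, "Team Lead")))

// joinVertices: the map function runs only for matched vertices and
// must return the same property type; unmatched vertices keep theirs.
val updated = graph.joinVertices(newRoles) {
  (id, attr, role) => (attr._1, role)
}

// outerJoinVertices: the map function runs for every vertex, receives
// an Option, and may change the property type (here to (name, outDegree)).
val withOutDegree: Graph[(String, Int), String] =
  graph.outerJoinVertices(graph.outDegrees) {
    (id, attr, degOpt) => (attr._1, degOpt.getOrElse(0))
  }
```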
7.022 Demo-Perform Graph Operations Using GraphX
This demo will show the steps to perform graph operations using GraphX.
7.023 Perform Graph Operations Using GraphX
In this demo, you will learn how to perform a graph analysis using the GraphX library of Spark. Let’s filter the vertices of the earlier created graph to count the number of vertices having the property “MR Developer”. We can use the filter method of the vertex RDD for this. You can see that the number of such vertices is just one. Let’s now count the edges whose source ID is greater than the destination ID. We can use the filter method of the edge RDD for this. In addition to the vertex and edge views of the property graph, GraphX also exposes a triplet view. The triplet view logically joins the vertex and edge properties, yielding an RDD[EdgeTriplet[VD, ED]] containing instances of the EdgeTriplet class. Use the triplets view to create an RDD of facts that describes all the relationships in the graph. Let’s print all these relationships by typing the command shown on the screen.
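The three steps of this demo could look roughly like the sketch below. The graph is rebuilt here from hypothetical sample data so that the snippet stands on its own; the demo's real data is not part of the transcript.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical sample graph: (name, role) vertices, relationship edges.
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
  (1L, ("Alice", "MR Developer")),
  (2L, ("Bob", "Spark Developer")),
  (3L, ("Carol", "Tester"))))
val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(1L, 2L, "co-worker"),
  Edge(3L, 1L, "relative")))
val graph = Graph(users, relationships)

// 1. Count the vertices whose role is "MR Developer".
val mrDevelopers: Long = graph.vertices
  .filter { case (id, (name, role)) => role == "MR Developer" }
  .count()

// 2. Count the edges whose source ID is greater than the destination ID.
val backEdges: Long = graph.edges.filter(e => e.srcId > e.dstId).count()

// 3. Use the triplet view to describe every relationship as a fact.
val facts: RDD[String] = graph.triplets.map(t =>
  s"${t.srcAttr._1} is the ${t.attr} of ${t.dstAttr._1}")
facts.collect().foreach(println)
```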
7.024 Demo-Perform Subgraph Operations
This demo will show the steps to perform the subgraph operations using GraphX.
7.025 Perform Subgraph Operations
In this demo, you will learn how to perform subgraph operations using the GraphX library of Spark. The subgraph operator takes vertex and edge predicates and returns the graph containing only the vertices that satisfy the vertex predicate and the edges that satisfy the edge predicate and connect vertices that satisfy the vertex predicate. The subgraph operator can be used in a number of situations to restrict the graph to the vertices and edges of interest or to eliminate broken links. For example, in the following code we will remove broken links. We can use subgraph to get all the vertices and their relationships by checking, in the vertex predicate, that the role property is not equal to “Work not yet assigned”. To print all these vertices, we can use the command shown on the screen. Here, we are using the collect and foreach methods to println all the vertices.
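A sketch of removing broken links with subgraph, assuming hypothetical sample data in which one edge points at a user who is missing from the vertex RDD:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical sample data; vertex 4 exists only as an edge endpoint.
val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
  (1L, ("Alice", "MR Developer")),
  (2L, ("Bob", "Spark Developer")),
  (3L, ("Carol", "Tester"))))
val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(1L, 2L, "co-worker"),
  Edge(3L, 1L, "relative"),
  Edge(2L, 4L, "friend")))
val defaultUser = ("David", "Work not yet assigned")
val graph = Graph(users, relationships, defaultUser)

// Keep only vertices that are not the default user. The edge 2 -> 4
// is dropped too, because its destination no longer exists.
val validGraph = graph.subgraph(
  vpred = (id, attr) => attr._2 != "Work not yet assigned")

validGraph.vertices.collect().foreach(println)
```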
7.026 Neighborhood Aggregation
An important step in various graph analytics tasks is to aggregate the neighborhood information of every vertex. For instance, you might need to identify the number of followers of every user. Various iterative graph algorithms, such as Shortest Path and PageRank, perform this operation repeatedly. The earlier primary aggregation operator, mapReduceTriplets, takes a user-defined map function that is applied to every triplet and can yield messages destined to neither, either, or both vertices in the triplet. Its use is depicted in the given code. To improve performance, this primary operator has been replaced by the new graph.aggregateMessages operator.
Let’s discuss more about the mapReduceTriplets operator. As discussed, with this operator, the map function is applied to every edge triplet in the graph. The messages thus yielded are destined to the adjacent vertices. With the reduce function, messages that are destined to the same vertex are aggregated. As a result, a VertexRDD is obtained that contains the aggregate message for every vertex. For instance, consider the given code, in which mapReduceTriplets is being used to count the degree of each vertex. The image on the screen also shows the application of this operator.
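Since mapReduceTriplets has been superseded, the same map-and-reduce idea is sketched here with aggregateMessages instead, counting the in-degree of every vertex. The edge list is hypothetical sample data.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical edges; every vertex gets the default property 0.
val edges: RDD[Edge[Int]] = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(3L, 2L, 1), Edge(2L, 4L, 1)))
val graph: Graph[Int, Int] = Graph.fromEdges(edges, defaultValue = 0)

// "Map" step: each edge sends the message 1 to its destination vertex.
// "Reduce" step: messages arriving at the same vertex are summed.
val inDegreeCounts: VertexRDD[Int] = graph.aggregateMessages[Int](
  ctx => ctx.sendToDst(1),
  (a, b) => a + b)

inDegreeCounts.collect().foreach(println)
```

Vertices that receive no message, such as vertices 1 and 3 here, do not appear in the result.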
7.028 Demo-Perform MapReduce Operations
This demo will show the steps to perform MapReduce operations in Graph using GraphX.
7.029 Perform MapReduce Operations
In this demo, you will learn how to perform a map-reduce style graph analysis using the GraphX library of Spark. A key step in many graph analytics tasks is aggregating the information about the neighborhood of each vertex. For example, we might want to know the number of followers each user has or the average age of the followers of each user. Many iterative graph algorithms (for example, PageRank, Shortest Path, and connected components) repeatedly aggregate the properties of neighboring vertices (for example, current PageRank value, shortest path to the source, and smallest reachable vertex ID). For this example, we need to import the GraphGenerators class apart from the GraphX classes. See the import statement shown on the screen. Create a graph with "age" as the vertex property. Here, we use a random graph for simplicity. The core aggregation operation in GraphX is aggregateMessages. This operator applies a user-defined sendMsg function to each edge triplet in the graph and then uses the mergeMsg function to aggregate those messages at their destination vertex. The user-defined sendMsg function takes an EdgeContext, which exposes the source and destination attributes along with the edge attribute, and functions (sendToSrc and sendToDst) to send messages to the source and destination vertices. Think of sendMsg as the map function in map-reduce. The user-defined mergeMsg function takes two messages destined to the same vertex and yields a single message. Think of mergeMsg as the reduce function in map-reduce. The aggregateMessages operator returns a VertexRDD[Msg] containing the aggregate message (of type Msg) destined to each vertex. Vertices that did not receive a message are not included in the returned VertexRDD. Now, we are going to compute the number of older followers of each user and their total age.
Here, as the map method, we are sending a message containing a counter and the age to the destination vertex. In the reduce method, we add up the counters and the ages. Using the mapValues method, we divide the total age by the number of older followers to get the average age of the older followers. To display the results, we can use the command shown on the screen. Here, we are using the collect and foreach methods to print the results.
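The demo described above closely follows the aggregateMessages example in the GraphX programming guide, so it can be sketched as follows. The graph size of 100 vertices and the use of the vertex ID as the age are choices made for this sketch.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Random graph; for simplicity the vertex ID doubles as the "age".
val graph: Graph[Double, Int] =
  GraphGenerators.logNormalGraph(sc, numVertices = 100)
    .mapVertices((id, _) => id.toDouble)

// sendMsg (map): if the follower (source) is older than the user
// (destination), send (1, follower age) to the destination.
// mergeMsg (reduce): add up the counters and the ages.
val olderFollowers: VertexRDD[(Int, Double)] =
  graph.aggregateMessages[(Int, Double)](
    triplet => {
      if (triplet.srcAttr > triplet.dstAttr) {
        triplet.sendToDst((1, triplet.srcAttr))
      }
    },
    (a, b) => (a._1 + b._1, a._2 + b._2))

// Divide the total age by the count to get the average age.
val avgAgeOfOlderFollowers: VertexRDD[Double] =
  olderFollowers.mapValues((id, value) =>
    value match { case (count, totalAge) => totalAge / count })

avgAgeOfOlderFollowers.collect().foreach(println)
```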
7.030 Counting Degree of Vertex
One of the common aggregation tasks is to compute the degree of every vertex, which is defined as the number of edges adjacent to that vertex. When it comes to directed graphs, it is generally required to identify the in-degree, the out-degree, and the total degree of every vertex. The operators to compute these degrees for every vertex are included in the GraphOps class. For instance, consider the given code, which computes the maximum in-degree, out-degree, and total degree.
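A sketch of that computation, using the inDegrees, outDegrees, and degrees members of GraphOps on a small hypothetical edge list:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical edges; vertex 2 has two incoming and one outgoing edge.
val edges: RDD[Edge[Int]] = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(3L, 2L, 1), Edge(2L, 4L, 1)))
val graph: Graph[Int, Int] = Graph.fromEdges(edges, defaultValue = 0)

// Reduce function that keeps the (vertex ID, degree) pair
// with the larger degree.
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
  if (a._2 > b._2) a else b

val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)
```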
7.031 Collecting Neighbors
Sometimes, it is easier to express a computation by collecting the neighboring vertices and their attributes at every vertex. To do so, you can use the collectNeighborIds and collectNeighbors operators. The code to use them is given on the screen. These operators can prove to be very costly because they duplicate information and need substantial communication. If possible, try to express the same computation using the aggregateMessages operator.
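For illustration, collecting neighbors on a small hypothetical graph might look like this sketch:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical sample graph with names as vertex properties.
val people: RDD[(VertexId, String)] = sc.parallelize(Seq(
  (1L, "Alice"), (2L, "Bob"), (3L, "Carol")))
val follows: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(1L, 2L, "follows"), Edge(1L, 3L, "follows")))
val graph = Graph(people, follows)

// IDs of the vertices reachable over outgoing edges.
val outNeighborIds: VertexRDD[Array[VertexId]] =
  graph.collectNeighborIds(EdgeDirection.Out)

// IDs and properties of the vertices at the other end of incoming edges.
val inNeighbors: VertexRDD[Array[(VertexId, String)]] =
  graph.collectNeighbors(EdgeDirection.In)
```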
7.032 Caching and Uncaching
Similar to RDDs, graphs in GraphX must be cached explicitly when they are used multiple times, as they are not persisted in memory by default. Therefore, you should always call the Graph.cache() method on such a graph first. In case of iterative computations, you may also need to uncache to obtain the best performance. By default, cached graphs and RDDs stay in memory until memory pressure forces them to be evicted in LRU order. In iterative computations, intermediate results from previous iterations fill the cache. Although they are evicted eventually, the data that is unnecessarily stored in memory slows down garbage collection. Therefore, it is more efficient to uncache these intermediate results as soon as they are no longer required. This involves materializing (caching and forcing) a graph or RDD in every iteration, uncaching all other datasets, and using only the materialized dataset in further iterations. A graph is composed of several RDDs, and therefore it is tricky to unpersist it correctly. In case of iterative computations, you should use the Pregel API, which unpersists intermediate results correctly.
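The materialize-then-uncache pattern can be sketched by hand as follows. This is a simplified illustration on a hypothetical graph and a fixed three iterations, not a substitute for the Pregel API recommended above.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical edges; every vertex starts with the counter 0.
val edges: RDD[Edge[Int]] = sc.parallelize(Seq(
  Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
var g: Graph[Int, Int] = Graph.fromEdges(edges, defaultValue = 0).cache()

for (i <- 1 to 3) {
  val prev = g
  // Derive and cache the graph for this iteration.
  g = g.mapVertices((id, count) => count + 1).cache()
  // Force materialization so the new graph is actually in the cache.
  g.vertices.count()
  g.edges.count()
  // The previous iteration's graph is no longer needed.
  prev.unpersist(blocking = false)
}
```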
7.033 Graph Builders
GraphX provides various ways to build a graph from a collection of vertices and edges that exists on disk or in an RDD. By default, none of these graph builders repartitions the edges of the graph. Instead, the edges are left in their default partitions. These graph builders are listed on the screen. Graph.groupEdges requires the graph to be repartitioned, because it assumes that identical edges are colocated on the same partition. Therefore, you must call Graph.partitionBy before calling groupEdges. The next graph builder, Graph.apply, lets you create a graph from RDDs of vertices and edges. Duplicate vertices are picked arbitrarily, and vertices that are found in the edge RDD but not in the vertex RDD are assigned the default attribute. The Graph.fromEdges builder lets you create a graph from only an RDD of edges. It automatically creates any vertices mentioned by the edges and assigns them the default value. With the Graph.fromEdgeTuples graph builder, you can create a graph from only an RDD of edge tuples. It assigns the value 1 to the edges and automatically creates any vertices mentioned by the edges, assigning them the default value. This graph builder also supports deduplicating the edges. For this, you need to pass Some of a PartitionStrategy as the uniqueEdges parameter. A partition strategy is required to colocate identical edges on the same partition so that they can be deduplicated.
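The edge-only builders and the partitionBy-then-groupEdges rule can be sketched as follows. The edge tuples are hypothetical and include one duplicate edge on purpose.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical edge tuples; the edge 1 -> 2 appears twice.
val rawEdges: RDD[(VertexId, VertexId)] = sc.parallelize(Seq(
  (1L, 2L), (1L, 2L), (2L, 3L)))

// fromEdgeTuples without deduplication: parallel edges are kept.
val withDuplicates: Graph[Int, Int] =
  Graph.fromEdgeTuples(rawEdges, defaultValue = 0)

// Passing Some(PartitionStrategy) as uniqueEdges merges duplicates.
val deduplicated: Graph[Int, Int] = Graph.fromEdgeTuples(
  rawEdges, defaultValue = 0,
  uniqueEdges = Some(PartitionStrategy.RandomVertexCut))

// fromEdges: vertices are created automatically with the default value.
val edges: RDD[Edge[String]] = rawEdges.map {
  case (src, dst) => Edge(src, dst, "link")
}
val fromEdges: Graph[String, String] =
  Graph.fromEdges(edges, defaultValue = "unknown")

// groupEdges needs identical edges on the same partition,
// so partitionBy is called first.
val grouped = withDuplicates
  .partitionBy(PartitionStrategy.EdgePartition2D)
  .groupEdges((a, b) => a + b)
```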
7.034 Vertex and Edge RDDs
Another concept related to GraphX is vertex and edge RDDs. The VertexRDD[A] extends RDD[(VertexId, A)] and adds the additional constraint that every VertexId appears just once. In addition, it represents a set of vertices, where each vertex has an attribute of type A. Internally, this is accomplished by saving the attributes of vertices in a reusable hash-map data structure. As a result, two VertexRDDs can be joined in constant time with no hash evaluations if they are derived from the same base VertexRDD. Similarly, the EdgeRDD[ED] extends RDD[Edge[ED]]. It organizes the edges into blocks that are partitioned using one of the partitioning strategies defined in PartitionStrategy. The attributes of edges and the adjacency structure are saved separately, which enables maximum reuse when the attribute values change. The use of the three additional functions exposed by the EdgeRDD is explained through the given code. Generally, operations on edge RDDs are performed through the graph operators or rely on the operations defined in the base RDD class.
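The shared-index behavior of VertexRDD is illustrated in the GraphX programming guide with an example along these lines; it is reproduced here as a sketch, with comments added.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// A VertexRDD with IDs 0..99, each carrying the value 1.
val setA: VertexRDD[Int] =
  VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1)))

// A plain RDD with two entries per ID (200 entries in total).
val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L)
  .flatMap(id => List((id, 1.0), (id, 2.0)))

// Aggregate rddB using setA's index; duplicates are summed,
// leaving one entry per vertex ID.
val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _)

// setA and setB share the same index, so this join is efficient.
val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b)
```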
7.035 Graph System Optimizations
GraphX uses the vertex-cut approach for distributed graph partitioning. Instead of splitting the graphs along edges, it partitions them along vertices. Doing so helps in the reduction of storage overhead and communication. From the logical standpoint, this corresponds to assigning edges to machines and letting the vertices span multiple machines. The exact method of assigning edges depends on the PartitionStrategy. You can choose a strategy by using the Graph.partitionBy operator, which repartitions the graph. The default partitioning strategy is to use the initial partitioning of the edges as provided on graph construction. However, you can easily switch to 2D-partitioning and other heuristics. Once the edges have been partitioned, the key challenge to effective graph-parallel computation is to join the vertex attributes with the edges efficiently. Because real-world graphs typically have more edges than vertices, GraphX moves the vertex attributes to the edges. In addition, because not all partitions contain edges adjacent to all vertices, GraphX internally maintains a routing table that identifies where to broadcast vertices when implementing the join needed for operations like triplets and aggregateMessages.
7.036 Built-in Algorithms
For simplifying analytics tasks, GraphX also contains a few graph algorithms. These are included in the org.apache.spark.graphx.lib package and are accessible directly as methods on Graph through GraphOps. These algorithms are PageRank, connected components, and triangle counting. PageRank measures the importance of each vertex in a graph, assuming that each edge from a to b represents an endorsement of b’s importance by a. For instance, on Twitter, if a person is followed by many people, he or she will be ranked highly. GraphX comes with static and dynamic implementations of PageRank as methods on the PageRank object. While the dynamic ones run until the ranks converge, the static ones run for a fixed number of iterations. GraphOps allows these algorithms to be called directly as methods on a graph. The code to use it is given on the screen. The next algorithm, connected components, works by labeling every connected component of the graph with the ID of its lowest-numbered vertex. For instance, in case of social networks, these components can approximate clusters. GraphX contains an implementation of this algorithm in the ConnectedComponents object. An example code to use it is given on the screen. In the triangle counting algorithm, a vertex is considered part of a triangle when it has two adjacent vertices with an edge between them. It is implemented in the TriangleCount object, which computes the number of triangles passing through every vertex and thereby provides a measure of clustering.
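A sketch of calling the three algorithms as methods on a graph. The tiny edge list (one triangle plus one separate edge) is hypothetical, and the tolerance of 0.0001 is an arbitrary choice for this illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("graphx-sketch").setMaster("local[*]"))

// Hypothetical graph: a triangle 1-2-3 and a separate edge 4 -> 5.
// Edges are listed with srcId < dstId and the graph is partitioned,
// which older GraphX versions require for triangle counting.
val rawEdges: RDD[(VertexId, VertexId)] = sc.parallelize(Seq(
  (1L, 2L), (2L, 3L), (1L, 3L), (4L, 5L)))
val graph: Graph[Int, Int] = Graph.fromEdgeTuples(rawEdges, 1)
  .partitionBy(PartitionStrategy.RandomVertexCut)

// Dynamic PageRank: runs until the ranks change by less than 0.0001.
val ranks: VertexRDD[Double] = graph.pageRank(0.0001).vertices

// Connected components: each vertex is labelled with the lowest
// vertex ID in its component.
val components: VertexRDD[VertexId] = graph.connectedComponents().vertices

// Triangle counting: number of triangles passing through each vertex.
val triangles: VertexRDD[Int] = graph.triangleCount().vertices
```

In this graph, vertex 3 is endorsed by both vertex 1 and vertex 2, so it ends up with a higher rank than vertex 1, which has no incoming edges.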
Next, let’s answer a few questions based on your understanding of the concepts covered in this lesson.
Let us summarize the topics covered in this lesson: • Graphs make it possible to perform tasks such as targeting advertising, identifying communities, and deciphering the meaning of documents. • There are several limitations of the Graph-Parallel system, such as runtime dependency and data ETL issues. • GraphX is a graph computation system running in the framework of the data-parallel system. • To start working with GraphX, you first need to import it and Spark into your project. • The property graph is defined as a directed multigraph that has properties attached to every vertex and edge. • GraphX also includes a triplet view. • GraphX operators take user-defined functions and produce new graphs with transformed properties and structure. These include property operators, structural operators, and join operators. • An important step in various graph analytics tasks is to aggregate the neighborhood information of every vertex. • One of the common aggregation tasks is to compute the degree of every vertex.
7.039 Summary (contd.)
• Sometimes, it is easier to express a computation by collecting the neighboring vertices and their attributes at every vertex. • Graphs in GraphX must be cached explicitly when they are used multiple times. • To build a graph from a collection of vertices and edges that exists on disk or in an RDD, GraphX provides graph builders. • The VertexRDD[A] adds the additional constraint that every VertexId appears just once. • The EdgeRDD[ED] organizes the edges into blocks that are partitioned using one of the partitioning strategies. • GraphX uses the vertex-cut approach for distributed graph partitioning. • For simplifying analytics tasks, GraphX also contains a few graph algorithms.
With this, we come to the end of Lesson 7, “Spark GraphX Programming”, of the Apache Spark and Scala course. This was the last lesson of the course.