Running SQL Queries Using Spark SQL Tutorial
Hello and welcome to Lesson 4 of the Apache Spark and Scala course offered by Simplilearn. This lesson will explain how to run SQL queries using Spark SQL.
After completing this lesson, you will be able to:
• Explain the importance and features of Spark SQL
• Describe the methods to convert RDDs to DataFrames
• Explain a few concepts of Spark SQL
• Describe the concept of Hive integration
4.3 Importance of Spark SQL
Let us first understand the importance of Spark SQL. It is a module of Spark used for structured data processing. The module can act as a distributed SQL query engine and provides a programming abstraction called DataFrames. It allows you to query structured data inside Spark programs by using SQL or the similar DataFrame API, and it can be used from different languages such as Scala, Java, R, and Python. SQL and DataFrames provide a common way to access a variety of data sources, including Avro, Hive, ORC, JSON, Parquet, and JDBC. Data can also be joined across these sources. For ETL and business intelligence tools, Spark offers industry-standard ODBC and JDBC connectivity. The image given on the screen shows the integration of the various components.
4.4 Benefits of Spark SQL
Spark SQL provides Hive compatibility. It reuses the Hive metastore and frontend, which makes it fully compatible with existing Hive queries, UDFs, and data. To benefit from this feature, simply install it alongside Hive. The image on the screen shows the interaction of these components. In addition, Spark SQL mixes SQL queries with Spark programs easily. To make queries fast, it also includes a cost-based optimizer, code generation, and columnar storage. Moreover, the module uses the Spark engine, which allows it to scale to multi-hour queries and thousands of nodes, and provides full mid-query fault tolerance. With this module, you do not need to use a different engine for historical data.
Let us now understand the concept of DataFrames. A DataFrame is a distributed collection of data organized into named columns. You can compare it with a data frame in R or Python, or a table in a relational database, but with much richer optimization under the hood. To construct a DataFrame, you can use various sources such as Hive tables, structured data files, existing RDDs, and external databases. To convert a DataFrame back, you can call the rdd method, which returns the DataFrame content as an RDD of rows. The DataFrame API is available in R, Scala, Python, and Java. Note that SchemaRDD, from prior versions of the Spark SQL API, has been renamed to DataFrame.
The other component of Spark SQL is SQLContext. The SQLContext class, or any of its descendants, acts as the entry point into all of its functionality. You need only a SparkContext to build a basic SQLContext. The code given on the screen shows how to create an SQLContext object. In addition, you can build a HiveContext to get a superset of the basic SQLContext functionality. It provides additional features such as the ability to write queries using the more comprehensive HiveQL parser, access to Hive UDFs, and the ability to read data from Hive tables. All data sources available to an SQLContext remain available, and you do not require an existing Hive setup to use a HiveContext.
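As a sketch of what the on-screen code typically looks like in a Spark 1.x spark-shell session, where sc is the pre-created SparkContext:

```scala
// Build a basic SQLContext from an existing SparkContext (sc).
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Import implicits to enable conversions such as rdd.toDF().
import sqlContext.implicits._

// Alternatively, build a HiveContext for the richer HiveQL parser and
// access to Hive UDFs; no existing Hive installation is required.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
```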
4.7 SQLContext (contd.)
HiveContext is packaged separately only to avoid including all of Hive's dependencies in the default Spark build. If these dependencies are not a concern for your application, then HiveContext is recommended for Spark 1.3. Future releases will focus on bringing SQLContext up to feature parity with HiveContext. You can also use the spark.sql.dialect option to select the specific variant of SQL used for parsing queries. To change this parameter, you can use the SET key=value command in SQL or the setConf method on an SQLContext. The only dialect available on an SQLContext is “sql”, which uses a simple SQL parser. On a HiveContext, however, the default is “hiveql”, which is much more comprehensive and therefore recommended for most use cases. The sql function on an SQLContext allows applications to run SQL queries programmatically and returns the result as a DataFrame. The code given on the screen shows the use of the sql function.
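A minimal sketch of both settings, assuming the sqlContext provided by spark-shell and a hypothetical registered table name:

```scala
// Select the SQL dialect explicitly ("sql" is the SQLContext default;
// on a HiveContext the default is "hiveql").
sqlContext.setConf("spark.sql.dialect", "sql")

// Run a query programmatically; the result comes back as a DataFrame.
// "some_table" is a placeholder for a previously registered table.
val resultDF = sqlContext.sql("SELECT * FROM some_table")
```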
4.8 Creating a DataFrame
Let us now view an example code to create a DataFrame. In this example, a DataFrame is created from the content of a JSON file.
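A sketch of such code, assuming the people.json sample file that ships with the Spark distribution:

```scala
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create a DataFrame from a JSON file (one JSON object per line).
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Display the content of the DataFrame on the console.
df.show()
```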
4.9 Using DataFrame Operations
DataFrames provide a domain-specific language for structured data manipulation in Java, Scala, and Python. The example given on the screen shows structured data processing using DataFrames. In this code, DataFrames provide functions such as printSchema, show, groupBy, filter, and more.
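The operations named above can be sketched as follows, assuming a DataFrame loaded from the people.json sample with name and age columns:

```scala
val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Print the schema in a tree format.
df.printSchema()

// Select only the "name" column.
df.select("name").show()

// Select everybody, incrementing age by 1.
df.select(df("name"), df("age") + 1).show()

// Filter rows where age is greater than 21.
df.filter(df("age") > 21).show()

// Count people grouped by age.
df.groupBy("age").count().show()
```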
4.10 Using DataFrame Operations (contd.)
The further code is displayed.
4.11 Demo-Run Spark SQL with a DataFrame
This demo will show the steps to run Spark SQL with a DataFrame.
4.12 Run Spark SQL with a DataFrame
In this demo, you will learn how to run Spark SQL with a DataFrame. The “spark-shell” available in the bin directory of the Spark installation path enables us to run SQL queries using a DataFrame. First, to run SQL using Spark, we need an instance of the SparkContext object. The schema of a table can be defined using case classes in Spark SQL; here, we define the Customer case class to represent the schema of the customer table. We can read a local or HDFS file using the textFile method and represent it as an RDD. The resulting RDD can be converted into a DataFrame by calling the toDF() method. Note the code displayed on the screen. Once we have a variable representing a DataFrame, we can call the “registerTempTable” method on it to treat it as a database table. After registering the DataFrame as a table, we can run SQL queries on it as if we were querying the table directly. Here, we use the show(), printSchema(), filter(), and groupBy() methods to run various types of queries on the customer table.
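A sketch of the demo, assuming a hypothetical customer.txt file whose lines look like 1,John,25,London (id, name, age, city):

```scala
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

// Case class defining the schema of the customer table.
case class Customer(id: Int, name: String, age: Int, city: String)

// Read the file, map each line to a Customer, and convert to a DataFrame.
val customerDF = sc.textFile("customer.txt")
  .map(_.split(","))
  .map(c => Customer(c(0).trim.toInt, c(1), c(2).trim.toInt, c(3)))
  .toDF()

// Register the DataFrame so it can be queried like a database table.
customerDF.registerTempTable("customer")

// Query it with DataFrame methods or plain SQL.
customerDF.printSchema()
customerDF.filter(customerDF("age") > 25).show()
customerDF.groupBy("city").count().show()
sqlContext.sql("SELECT name, age FROM customer").show()
```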
4.13 Interoperating with RDDs
To convert existing RDDs into DataFrames, Spark SQL supports two methods. The first method uses a reflection-based approach to infer the schema of an RDD containing specific types of objects. It works well when the schema is already known while you are writing your Spark application, and in such cases it leads to more concise code. The second method is a programmatic approach that lets you build a schema and apply it to an existing RDD. This method is more verbose, but it lets you construct DataFrames when you do not know the columns and their types until runtime.
4.14 Using the Reflection-Based Approach
Let us first talk about the reflection-based approach. The Scala interface of Spark SQL automatically converts an RDD containing case classes into a DataFrame. The case class defines the table schema: the names of the case class arguments are read using reflection and become the column names. You can also nest case classes or use them to contain complex types such as sequences and arrays. You can then implicitly convert the resulting RDD to a DataFrame and register it as a table. The table can then be used in subsequent SQL statements.
4.15 Using the Reflection-Based Approach (contd.)
The example given on the screen shows how to infer the schema using the reflection based approach.
4.16 Using the Programmatic Approach
Now, let’s talk about the second method, the programmatic approach. This method is used when you cannot define case classes ahead of time, for instance when the structure of records is encoded in a string or a text dataset. In such cases, you can create a DataFrame using the three steps listed on the screen. First, use the existing RDD to create an RDD of rows. Next, create the schema, represented by a StructType, that matches the structure of the rows just created. Finally, apply the schema to the RDD of rows using the createDataFrame method provided by SQLContext.
4.17 Using the Programmatic Approach (contd.)
The example given on the screen shows how to specify the schema programmatically.
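The three steps can be sketched as follows, assuming the sqlContext provided by spark-shell and the people.txt sample file (lines such as Michael, 29):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// The schema is encoded in a string whose content is known only at runtime.
val schemaString = "name age"

// Step 1: create an RDD of Rows from the existing RDD of raw lines.
val people = sc.textFile("examples/src/main/resources/people.txt")
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Step 2: build a StructType matching the structure of the rows.
val schema = StructType(
  schemaString.split(" ").map(field => StructField(field, StringType, nullable = true)))

// Step 3: apply the schema to the RDD of Rows.
val peopleDF = sqlContext.createDataFrame(rowRDD, schema)
peopleDF.registerTempTable("people")
```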
4.18 Demo-Run Spark SQL Programmatically
This demo will show the steps to run Spark SQL by programmatically specifying the schema of a table.
4.19 Run Spark SQL Programmatically
In this demo, you will learn how to run Spark SQL by programmatically specifying the schema of a table. The “spark-shell” available in the bin directory of the Spark installation path enables us to run SQL queries by programmatically specifying the schema of a table. First, to run SQL using Spark, we need an instance of the SparkContext object. We can read a local or HDFS file using the textFile method and represent it as an RDD. We can use StructType to define the schema of the table, specifying the name and data type of each field. We can then use the “createDataFrame” method of the SQLContext object to create a DataFrame, passing a variable representing the table's row data and the table's schema as parameters. Once we have a variable representing the DataFrame, we can call the “registerTempTable” method on it to treat it as a database table. After registering the DataFrame as a table, we can run SQL queries on it as if we were querying the table directly. Here, we use the SQLContext.sql() method to execute the SQL query, and then transform the output using the map method before printing it to the console.
4.20 Data Sources
The DataFrame interface allows you to operate on a variety of data sources. A DataFrame can be operated on as a normal RDD or registered as a temporary table; registering a DataFrame as a table lets you run SQL queries over its data. The load and save methods are of two types: general methods that load and save data using the default Spark data source, and specific methods that let you name a built-in data source explicitly. The general methods are the simplest and use the default data source, which is parquet unless configured otherwise by spark.sql.sources.default; they are used for all operations. An example is given on the screen. With the specific methods, the data source, along with any extra options to pass to it, is specified manually. Sources are specified by their fully qualified names, such as org.apache.spark.sql.parquet, but for built-in sources you can use their short names such as jdbc, json, and parquet. The example given on the screen shows the syntax that can be used to convert a DataFrame of one format to another.
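Both styles can be sketched as follows, using the sample files shipped with Spark and hypothetical output paths:

```scala
// General methods: use the default source (parquet unless
// spark.sql.sources.default says otherwise).
val usersDF = sqlContext.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

// Specific methods: name the source explicitly (short form or fully
// qualified), here converting JSON data into a Parquet file.
val peopleDF = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name").write.format("parquet").save("names.parquet")
```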
4.21 Save Modes
Optionally, save operations can take a SaveMode, which specifies how to handle existing data. Note that these modes are not atomic and do not use any locking, so it is not safe for multiple writers to write to the same location. In addition, when overwriting, the existing data is deleted before the new data is written. The table given on the screen lists and explains the different save modes used in Scala and Java.
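A short sketch of the modes in use, assuming an existing DataFrame df and a hypothetical output path:

```scala
import org.apache.spark.sql.SaveMode

// ErrorIfExists (the default): throw an exception if data already exists.
// Append: add the contents of the DataFrame to the existing data.
// Overwrite: delete the existing data first, then write the new data.
// Ignore: silently skip the save if data already exists.
df.write.mode(SaveMode.Append).save("output.parquet")

// The mode can also be given as a string.
df.write.mode("overwrite").save("output.parquet")
```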
4.22 Saving to Persistent Tables
You can use the saveAsTable command to save DataFrames as persistent tables when working with a HiveContext. This command materializes the contents of the DataFrame and creates a pointer to the data in the Hive metastore. The resulting persistent tables exist even after you restart your Spark program, provided the connection to the same metastore is maintained. To create a DataFrame for a persistent table, call the table method on an SQLContext with the name of the table. By default, saveAsTable creates a managed table, which means that the location of the data is controlled by the metastore; the data of a managed table is automatically deleted when the table is dropped.
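A minimal sketch, assuming sqlContext is a HiveContext (as in a Hive-enabled spark-shell), an existing DataFrame df, and a hypothetical table name:

```scala
// Persist the DataFrame as a managed table in the Hive metastore.
df.write.saveAsTable("people_table")

// Later, possibly in a new Spark program connected to the same
// metastore, recreate a DataFrame from the persistent table.
val restored = sqlContext.table("people_table")
```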
4.23 Parquet Files
Parquet is a columnar format supported by various data processing systems. Spark SQL supports reading and writing Parquet files, automatically preserving the schema of the original data. The example given on the screen shows the related operations.
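The round trip can be sketched as follows, assuming a DataFrame peopleDF with name and age columns and a hypothetical output path:

```scala
// Write the DataFrame out as a Parquet file; the schema is preserved.
peopleDF.write.parquet("people.parquet")

// Read it back; the result is a DataFrame with the original schema.
val parquetFile = sqlContext.read.parquet("people.parquet")
parquetFile.registerTempTable("parquetTable")

// Parquet files can then be queried like any other registered table.
val teenagers = sqlContext.sql(
  "SELECT name FROM parquetTable WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
```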
4.24 Partition Discovery
A common optimization approach used in systems such as Hive is table partitioning. In a partitioned table, data is usually stored in different directories, with the values of the partitioning columns encoded in each directory path. The Parquet data source can automatically discover and infer this partitioning information. Passing path/to/table to SQLContext.read.load or SQLContext.read.parquet allows Spark SQL to extract the partitioning information from the paths automatically.
4.25 Schema Merging
Parquet supports schema evolution, similar to Avro, Thrift, and ProtocolBuffer. You can start with a basic schema and gradually add more columns as required. In this manner, you may end up with multiple Parquet files with different but mutually compatible schemas. The Parquet data source can automatically detect this case and merge the schemas of these files. An example to use this is given on the screen.
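A sketch of schema merging, writing two files with compatible but different schemas under a hypothetical data/test_table directory:

```scala
import sqlContext.implicits._

// Write a Parquet file with columns "single" and "double".
val df1 = sc.parallelize(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")

// Write another Parquet file with a new column "triple".
val df2 = sc.parallelize(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")

// Read the table with schema merging enabled; the result contains the
// union of the columns plus the partitioning column "key".
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()
```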
4.26 JSON Data
Spark SQL can infer the schema of a JSON dataset and load it as a DataFrame. You can perform this conversion using the SQLContext.read.json() method on either a JSON file or an RDD of strings. An important point is that the JSON file expected here is not a typical one: each line must contain a separate, self-contained, valid JSON object. As a consequence, a regular multi-line JSON file will usually fail to load. An example to use this is given on the screen.
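Both input forms can be sketched as follows, using the people.json sample file and an inline RDD of JSON strings:

```scala
// Infer the schema of a JSON file and load it as a DataFrame.
val people = sqlContext.read.json("examples/src/main/resources/people.json")
people.printSchema()

// The same method also works on an RDD of strings, one complete
// JSON object per string.
val anotherRDD = sc.parallelize(
  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherRDD)
```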
4.27 Hive Table
Spark SQL is also capable of reading and writing data stored in Apache Hive. Hive has a large number of dependencies, which is why the default Spark assembly does not include it. Hive support is enabled by adding the -Phive and -Phive-thriftserver flags to the Spark build. This creates a new assembly jar that includes Hive. This jar must also be present on all worker nodes, as they need access to the Hive serialization and deserialization libraries to access data stored in Hive. To configure Hive, place your hive-site.xml file in the configuration directory. The example to do so is given on the screen.
4.28 DML Operation-Hive Queries
You must create a HiveContext when you are working with Hive. It supports writing queries in HiveQL and finding tables in the metastore. Even if you do not have an existing Hive deployment, you can still create a HiveContext: if not configured by hive-site.xml, it automatically creates metastore_db and a warehouse directory in the current directory. An example of the same is given on the screen.
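A sketch of such an example, using the kv1.txt sample file that ships with Spark:

```scala
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// DDL and DML statements are written in HiveQL.
hiveContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hiveContext.sql(
  "LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries return DataFrames, so the usual operations apply.
hiveContext.sql("SELECT key, value FROM src WHERE key < 10")
  .collect().foreach(println)
```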
4.29 Demo-Run Hive Queries Using Spark SQL
This demo will show the steps to run Hive queries using Spark SQL.
4.30 Run Hive Queries Using Spark SQL
In this demo, you will learn how to run Hive queries using Spark SQL. The “spark-shell” available in the bin directory of the Spark installation path enables us to run SQL queries that can be used to perform batch analytics on Hive. First, to run SQL on Hive using Spark, we need to create an instance of the HiveContext object. We can then run Hive queries using the sql() method of the HiveContext object.
4.31 JDBC to Other Databases
Spark SQL also includes a JDBC data source that can read data from various other databases. The results are returned as a DataFrame, so they can easily be joined with other data sources and processed in Spark SQL. The JDBC data source is also easy to use from Java or Python, since it does not require the user to provide a ClassTag. Note that this is different from the Spark SQL JDBC server, which allows other applications to run queries using Spark SQL. To get started, include the JDBC driver for your particular database on the Spark classpath. For instance, to connect to PostgreSQL from the Spark shell, you need to run the command depicted on the screen.
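A sketch of reading a table over JDBC; the driver jar name, URL, table, and credentials below are placeholders:

```scala
// Launch the shell with the driver on the classpath first, for example:
//   SPARK_CLASSPATH=postgresql-9.4-1201.jar bin/spark-shell
val jdbcDF = sqlContext.read.format("jdbc").options(Map(
  "url" -> "jdbc:postgresql:dbserver",   // placeholder connection URL
  "dbtable" -> "schema.tablename",       // placeholder table name
  "user" -> "username",
  "password" -> "password")).load()
```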
4.32 Supported Hive Features
Spark SQL is compatible with the Hive Metastore, user-defined functions (UDFs), and user-defined serialization formats (SerDes). At present, it is based on Hive 0.12.0 and 0.13.1. The supported Hive query statements are listed on the screen: Select, Order By, Group By, Cluster By, and Sort By. In addition, the supported Hive operators include relational operators, logical operators, arithmetic operators, mathematical functions, complex type constructors, and string functions.
4.33 Supported Hive Features (contd.)
A few other supported Hive features are joins, which include join, left semi join, and cross join. Spark SQL also supports unions, subqueries, sampling, explain, partitioned tables, views, and all Hive DDL functions such as Create Table, Alter Table, and Create Table As Select.
4.34 Supported Hive Data Types
Let’s now talk about the supported Hive data types. These are listed on the screen and include string, binary, date, array, map, boolean, struct, and more.
4.35 Case Classes
Let’s now talk about case classes, which represent the underlying data schema. The APIs used with case classes include SQLContext, createSchemaRDD, and registerTempTable.
4.36 Case Classes (contd.)
The example given on the screen shows how to define a case class in Spark SQL.
Next, let’s answer a few questions based on your understanding of the concepts covered in this lesson.
Let us summarize the topics covered in this lesson:
• Spark SQL is a module of Spark that is used for structured data processing.
• A DataFrame is a distributed collection of data organized into named columns.
• The SQLContext class or any of its descendants acts as the entry point into all Spark SQL functionality.
• To convert existing RDDs into DataFrames, Spark SQL supports two methods: a reflection-based approach and a programmatic approach.
• SaveModes specify how to handle any existing data.
• You can use the saveAsTable command to save DataFrames as persistent tables.
• Parquet is a columnar format supported by various data processing systems.
4.39 Summary (contd.)
• Spark SQL can infer the schema of a JSON dataset and load it as a DataFrame, and can read and write data stored in Apache Hive.
• You must create a HiveContext when you are working with Hive.
• To read data from various other databases, Spark SQL also includes a JDBC data source.
• Spark SQL is compatible with SerDes, UDFs, and the Hive Metastore.
• A few other supported Hive features are joins, unions, sampling, explain, and more.
• The supported data types include string, binary, date, array, map, boolean, struct, and more.
• Case classes represent the underlying data schema.
With this, we come to the end of Lesson 4, “Running SQL Queries Using Spark SQL,” of the Apache Spark and Scala course. The next lesson is Spark Streaming.