Apache Spark 1.3 introduced a new concept called DataFrames, which was developed further in version 1.4. DataFrames also complement Project Tungsten, a new initiative to improve CPU performance in Spark.
Motivation behind DataFrames
Spark has so far had a disproportionate focus on HDFS as storage. This is not unexpected: HDFS was the only Big Data storage for a long time, and even now it leads the pack of storage solutions. HDFS has also been popular as unstructured data storage.
This question comes up often: if HDFS is unstructured storage, why do we expect some structure to be there, even for MapReduce jobs? When we replace the word "unstructured" with "schema-on-read", things become clearer. In traditional structured storage such as relational databases, the schema is enforced at write time. In HDFS, no schema is enforced during writing. That does not mean the data has no structure; knowledge of that structure is needed when we load the data and make sense of it.
The good part is that there is no concrete definition of structure. Structure here means the inherent structure the data has, as opposed to externally forced structure like normal forms.
Coming back to Spark, an RDD is an immutable distributed collection of data. The collection can hold any data type, such as String, Integer, or tuples. For most production data, it ends up being an RDD of tuples. Operations on tuples are not complicated, but they are messy. Anyone who has worked with tuples and tried to wrap their head around _1s and _2s can relate.
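To make the _1/_2 problem concrete, here is a minimal sketch in plain Scala (no Spark needed). The employee numbers and salaries are made-up sample values, and the `Salary` case class is a hypothetical name used only for illustration.

```scala
object TupleVsCaseClass {
  // Tuple style: (empNo, salary). What _1 and _2 mean lives only in the reader's head.
  val tupleRows: Seq[(Int, Int)] = Seq((10001, 60117), (10002, 65828), (10003, 40006))
  val highEarnersByTuple: Seq[Int] = tupleRows.filter(_._2 > 60000).map(_._1)

  // Named-field style: the same data and the same logic, but the code documents itself.
  case class Salary(empNo: Int, salary: Int)
  val salaryRows: Seq[Salary] = tupleRows.map { case (empNo, salary) => Salary(empNo, salary) }
  val highEarnersByName: Seq[Int] = salaryRows.filter(_.salary > 60000).map(_.empNo)
}
```

Both versions compute the same result; the difference is purely in how readable the filter and map steps are once the records grow beyond two or three fields.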
Another trend that has helped the creation of DataFrames is the rise of data formats like Avro and Parquet. These formats, which have the schema embedded in them, are getting very popular.
SchemaRDD in Spark SQL along with DSL has tried to address this problem in a piecemeal way. Let us refresh our memory of SchemaRDD.
An RDD by itself does not have any information about the schema of the data it contains. This is where SchemaRDD comes in. A SchemaRDD is an RDD whose elements are Row objects, and it also contains the schema of those elements. So in short:
SchemaRDD = RDD[Row] + Schema
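The same equation still describes how a DataFrame can be built by hand in Spark 1.3, where SchemaRDD was renamed to DataFrame. The sketch below assumes a spark-shell session in which `sc` already exists; the rows and column names are made up for illustration.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Assumption: running inside spark-shell, so `sc` (the SparkContext) is available.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// The "RDD[Row]" part: plain rows with no column names attached.
val rowRDD = sc.parallelize(Seq(Row("d001", "Marketing"), Row("d002", "Finance")))

// The "Schema" part: names and types for each position in the Row.
val schema = StructType(Array(
  StructField("dept_no", StringType, nullable = false),
  StructField("dept_name", StringType, nullable = true)))

// Putting the two together gives a DataFrame.
val deptsDF = sqlContext.createDataFrame(rowRDD, schema)
deptsDF.printSchema()
```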
Spark SQL has two options to deal with complex operations like joins.
- Write SQL code
- Use domain specific language (DSL) syntax
The DSL has so far been an experimental feature, originally introduced primarily to help with testing.
DataFrames – new approach:
DataFrames take on this challenge with a fresh pair of eyes. By definition, a DataFrame is a distributed collection of data organized into named columns. It brings along two features:
- Inbuilt support for a variety of data formats like Parquet, Hive and JSON
- A more robust and feature rich DSL
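As a rough sketch of the first point, loading JSON or Parquet into a DataFrame takes one call each. The paths below are hypothetical placeholders, and the snippet assumes a spark-shell session where `sc` exists.

```scala
// Assumption: running inside spark-shell, so `sc` is available.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Hypothetical paths; replace with real locations.
// These are the Spark 1.3 calls. From 1.4 onward the same loads are usually
// written as sqlContext.read.json(...) and sqlContext.read.parquet(...).
val people = sqlContext.jsonFile("/path/to/people.json")
val events = sqlContext.parquetFile("/path/to/events.parquet")

// The schema is picked up from the data itself (inferred for JSON, embedded in Parquet).
people.printSchema()
events.printSchema()
```

Hive tables are reached through a HiveContext and its `table` method, which is what the walkthrough below uses.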
Time to get the hands dirty:
You can download InfoObjects Sandbox v 1.3 from here. The sandbox has the vintage employee database loaded in the Hive warehouse (/user/hive/warehouse). You can refer to this to get more information about this database.
Let’s get started.
Load the Spark shell and initialize a HiveContext (please make sure hive-site.xml is copied into the Spark conf directory):
$ spark-shell
scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
Create DataFrame for “salaries” table in Hive warehouse:
scala> val salaries = hc.table("salaries")
Create an alias for salaries so that it can be used with the DSL:
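The snippet for this step did not survive in the text. A likely form, mirroring the `emps.as('emps)` call used for the employees table further down, is sketched here; treat it as an assumption rather than the original code. The name `salariesAliased` is hypothetical.

```scala
// Assumption: `salaries` is the DataFrame created from the Hive table above.
// as(...) returns a new DataFrame carrying the alias; it does not modify `salaries` in place.
val salariesAliased = salaries.as('salaries)
```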
Find employees earning more than 60K:
scala> val highEarners = salaries.filter('salary > 60000)
Check the first value to see if the salary is actually higher than 60K:
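The snippet for this check is missing from the text. One way to do it, assuming `highEarners` is the filtered DataFrame from the previous step, would be:

```scala
// first() returns the first Row of the filtered DataFrame.
highEarners.first()

// show() prints the leading rows as a table, which is easier to scan than a single Row.
highEarners.show()
```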
Well, some of you who live in the Bay Area may object that 60K is a good starter salary but hardly makes someone a high earner. So let’s give everyone a 10K raise as an annual performance review.
scala> val newSalary = highEarners.select('salary + 10000)
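The select above yields a single column with a generated name. A small variation, sketched here, keeps the employee identifier and gives the raised salary a readable name. It assumes the salaries table has an `emp_no` column; the names `raised` and `new_salary` are hypothetical.

```scala
// Assumption: `highEarners` comes from the filter step, and the table has an emp_no column.
val raised = highEarners.select(
  highEarners("emp_no"),
  (highEarners("salary") + 10000).as("new_salary"))

// Print a handful of rows to eyeball the result.
raised.show(5)
```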
Now let us load the “employees” Hive table:
scala> val emps = hc.table("employees")
scala> emps.as('emps)
Let us group employees by gender:
scala> val genders = emps.groupBy("gender")
Let’s print the counts:
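The snippet for this step is missing from the text. Assuming `genders` is the GroupedData created above, a sketch of it would be:

```scala
// count() on GroupedData returns a DataFrame with one row per gender and its row count;
// show() prints that DataFrame as a table.
genders.count().show()
```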
Please do not confuse this count with the count of an RDD; here count is an operation on GroupedData.
In database parlance, this count is an aggregation over each group.
There are a few more methods which you can use on GroupedData:
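The list that followed did not survive in the text. In Spark 1.3, GroupedData offers agg, avg, count, max, mean, min and sum. Here is a sketch of a few of them on the salaries table; grouping by `emp_no` is an assumption about that table's columns, and `byEmployee` is a hypothetical name.

```scala
import org.apache.spark.sql.functions.{max, min}

// Assumption: the salaries table has an emp_no column with several salary rows per employee.
val byEmployee = salaries.groupBy("emp_no")

// Average salary per employee.
byEmployee.avg("salary").show(5)

// Several aggregates in one pass, using column expressions.
byEmployee.agg(min(salaries("salary")), max(salaries("salary"))).show(5)
```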
The holy grail – Joins:
Let us take two tables, departments and dept_manager, and join the two.
scala> val depts = hc.table("departments")
scala> val dept_manager = hc.table("dept_manager")
scala> val depts_with_manager = depts.join(dept_manager, depts.col("dept_no") === dept_manager.col("dept_no"), "left_outer")
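To see what the join produced, it helps to look at the schema and a few rows. This sketch assumes `depts_with_manager` is the DataFrame from the join above.

```scala
// Columns from both tables appear in the result, so dept_no shows up twice
// (once from departments, once from dept_manager).
depts_with_manager.printSchema()

// With a left outer join, departments without a matching manager row are kept,
// and the dept_manager columns are null for them.
depts_with_manager.show(5)
```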