The RDD, Spark’s unit of compute, can perform the complex operations that are traditionally done with complex queries in databases. It is sometimes hard to find the exact steps for these operations, so this blog walks through them using simple examples.

There are many sample datasets I could create, but to pay respect to an age-old tradition, I will use the employee-department data model. Anyone who joined the industry in the 1990s knows this was “the” example everyone had to work with before anything else.

Employee Database Structure

Image: Creative Commons BY-SA 3.0 license (http://creativecommons.org/licenses/by-sa/3.0/legalcode); not affiliated with InfoObjects.

Loading Database in MySQL

You can skip this step if you have InfoObjects Big Data Sandbox.

Download the MySQL employees sample database dump files from here.

shell > tar -xjf employees_db-full-1.0.6.tar.bz2
shell > cd employees_db/
shell > mysql -t < employees.sql

Loading data from MySQL to HDFS

You can skip this step if you have InfoObjects Big Data Sandbox.

Since this is a simple dump of data into HDFS, let’s use Sqoop:

shell > sqoop import --connect jdbc:mysql://127.0.0.1/employees --username hduser --password vipassana --table departments --warehouse-dir /user/hive/warehouse --fields-terminated-by , --hive-import
shell > sqoop import --connect jdbc:mysql://127.0.0.1/employees --username hduser --password vipassana --table dept_emp --warehouse-dir /user/hive/warehouse --fields-terminated-by , --hive-import
shell > sqoop import --connect jdbc:mysql://127.0.0.1/employees --username hduser --password vipassana --table dept_manager --warehouse-dir /user/hive/warehouse --fields-terminated-by , --hive-import
shell > sqoop import --connect jdbc:mysql://127.0.0.1/employees --username hduser --password vipassana --table employees --warehouse-dir /user/hive/warehouse --fields-terminated-by , --hive-import
shell > sqoop import --connect jdbc:mysql://127.0.0.1/employees --username hduser --password vipassana --table salaries --warehouse-dir /user/hive/warehouse --fields-terminated-by , --hive-import
shell > sqoop import --connect jdbc:mysql://127.0.0.1/employees --username hduser --password vipassana --table titles --warehouse-dir /user/hive/warehouse --fields-terminated-by , --hive-import
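
To confirm the imports landed where Spark will look for them, you can list one of the warehouse directories (the path matches the --warehouse-dir used above):

shell > hadoop fs -ls /user/hive/warehouse/departments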

Loading RDDs

Loading RDDs with initial data:

scala > val depts = sc.textFile("/user/hive/warehouse/departments")
scala > val dept_emp = sc.textFile("/user/hive/warehouse/dept_emp")
scala > val dept_manager = sc.textFile("/user/hive/warehouse/dept_manager")
scala > val employees = sc.textFile("/user/hive/warehouse/employees")
scala > val salaries = sc.textFile("/user/hive/warehouse/salaries")
scala > val titles = sc.textFile("/user/hive/warehouse/titles")

It goes without saying that you should run a count action on each RDD to make sure it loaded correctly.
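
For example, a quick sanity check on the RDDs loaded above:

scala > depts.count
scala > dept_emp.count
scala > dept_manager.count
scala > employees.count
scala > salaries.count
scala > titles.count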

Load Spark SQL

scala > val hc = new org.apache.spark.sql.hive.HiveContext(sc)
scala > import hc._

Creating case classes:

scala > case class Department(dept_no:String,dept_name:String)

Loading RDDs:

scala > val departments = sc.textFile("/user/hive/warehouse/departments").map(_.split(",")).map( d => Department(d(0),d(1)))

Registering Tables:

scala > departments.registerTempTable("departments")

Running some basic SQL (to kick the tires):

scala > hc.sql("select dept_no, dept_name from departments").collect.foreach(println)
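
The same pattern extends to the other tables, which brings us back to the complex queries mentioned at the start. As a sketch, assuming the standard employees schema and the comma-separated field order produced by the Sqoop imports above (verify against your own files), you could register employees and salaries and join them in SQL:

scala > case class Employee(emp_no:String, birth_date:String, first_name:String, last_name:String, gender:String, hire_date:String)
scala > case class Salary(emp_no:String, salary:Int, from_date:String, to_date:String)
scala > val emps = sc.textFile("/user/hive/warehouse/employees").map(_.split(",")).map(e => Employee(e(0),e(1),e(2),e(3),e(4),e(5)))
scala > val sals = sc.textFile("/user/hive/warehouse/salaries").map(_.split(",")).map(s => Salary(s(0),s(1).toInt,s(2),s(3)))
scala > emps.registerTempTable("employees")
scala > sals.registerTempTable("salaries")
scala > hc.sql("select e.gender, avg(s.salary) from employees e join salaries s on e.emp_no = s.emp_no group by e.gender").collect.foreach(println)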