Until now, JdbcRDD has been the standard way to connect Spark to a relational data source. From Spark 1.4 onwards, there is a built-in data source that connects to a JDBC source using DataFrames.

Dataframe

Spark introduced DataFrames in version 1.3 and enriched the DataFrame API in 1.4. RDDs are the unit of compute and storage in Spark, but they carry no information about the structure of the data, i.e. the schema. DataFrames combine RDDs with a schema, and this small addition makes them very powerful. You can read more about DataFrames here.
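
To make the RDD versus DataFrame distinction concrete, here is a minimal sketch. It assumes a Spark 1.4 spark-shell session where `sc` and `sqlContext` already exist; the sample rows and the names `rdd` and `df` are made up for illustration.

```scala
// Assumes a Spark 1.4 spark-shell session, where `sc` and `sqlContext` are predefined.
import sqlContext.implicits._

// An RDD of tuples: Spark holds the values but knows nothing about column names.
val rdd = sc.parallelize(Seq(("Barack", 53), ("Hillary", 68)))

// The same data as a DataFrame: each column now has a name and a type.
val df = rdd.toDF("first_name", "age")

// Prints the schema that the DataFrame carries alongside the data.
df.printSchema()
```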

Please make sure that the JDBC driver jar is visible on the client node and on all slave nodes on which executors will run.

Let us create a 'person' table in MySQL (or a database of your choice) using the following DDL:
CREATE TABLE `person` (
`person_id` int(11) NOT NULL AUTO_INCREMENT,
`first_name` varchar(30) DEFAULT NULL,
`last_name` varchar(30) DEFAULT NULL,
`gender` char(1) DEFAULT NULL,
`age` tinyint(4) DEFAULT NULL,
PRIMARY KEY (`person_id`)
)

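Now let's insert some data to play with. The column list is given explicitly so that MySQL generates person_id through AUTO_INCREMENT:

INSERT INTO person (first_name, last_name, gender, age) VALUES ('Barack','Obama','M',53);
INSERT INTO person (first_name, last_name, gender, age) VALUES ('Bill','Clinton','M',71);
INSERT INTO person (first_name, last_name, gender, age) VALUES ('Hillary','Clinton','F',68);
INSERT INTO person (first_name, last_name, gender, age) VALUES ('Bill','Gates','M',69);
INSERT INTO person (first_name, last_name, gender, age) VALUES ('Michelle','Obama','F',51);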

Download mysql jar from http://dev.mysql.com/downloads/connector/j/.

Make MySQL driver available to Spark shell and launch it

$ spark-shell --driver-class-path /path-to-mysql-jar/mysql-connector-java-5.1.34-bin.jar

Construct Jdbc URL

scala> val url="jdbc:mysql://localhost:3306/hadoopdb"

Create connection properties object with username and password

scala> val prop = new java.util.Properties
scala> prop.setProperty("user","hduser")
scala> prop.setProperty("password","********")

Load DataFrame with JDBC data-source (url, table name, properties)

scala> val people = sqlContext.read.jdbc(url,"person",prop)
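
The same table can also be loaded through the generic data source API rather than the `jdbc` shortcut. The sketch below is one possible form, not the only one. It assumes the spark-shell session above and that the credentials are passed in the JDBC URL (a Connector/J URL feature) instead of a Properties object; the name `peopleViaOptions` is made up for illustration.

```scala
// Assumes a spark-shell session with `sqlContext` available and the MySQL driver on the classpath.
val peopleViaOptions = sqlContext.read
  .format("jdbc")
  .options(Map(
    // Credentials are embedded in the URL here, as an assumption for this sketch.
    "url"     -> "jdbc:mysql://localhost:3306/hadoopdb?user=hduser&password=********",
    "dbtable" -> "person",
    // Driver class shipped in MySQL Connector/J 5.1.
    "driver"  -> "com.mysql.jdbc.Driver"
  ))
  .load()
```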

Show the results in a nice tabular format

scala> people.show

This has loaded the whole table. What if we only want to load males? Use the overload that takes (url, table name, predicates, properties):

scala> val males = sqlContext.read.jdbc(url,"person",Array("gender='M'"),prop)
scala> males.show
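
Each entry in the predicates array is used as the WHERE condition of one partition, so passing several predicates splits the load into several partitions. The sketch below builds non-overlapping age ranges. `agePredicates` and the chosen bounds are hypothetical, not part of Spark. Rows with a NULL age match none of these ranges and would not be loaded.

```scala
// Hypothetical helper (not part of Spark): builds one WHERE-clause fragment per age range.
// Ranges are half-open, [lo, hi), so no row can match two predicates.
def agePredicates(bounds: Seq[Int]): Array[String] =
  bounds.sliding(2).map { case Seq(lo, hi) => s"age >= $lo AND age < $hi" }.toArray

// Three ranges: [0, 40), [40, 60), [60, 128)
val predicates = agePredicates(Seq(0, 40, 60, 128))

// In the spark-shell session from this post, each predicate would become one partition:
// val byAge = sqlContext.read.jdbc(url, "person", predicates, prop)
```

The predicates must not overlap, otherwise the same row is loaded more than once.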

Show only first names

scala> val first_names = people.select("first_name")
scala> first_names.show

Show only people below age 60

scala> val below60 = people.filter(people("age") < 60)
scala> below60.show
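
The same filter can be written as a SQL expression string, or as a full SQL query against a temporary table. This is a sketch assuming the `people` DataFrame and `sqlContext` from the session above; the names `below60Expr`, `below60Sql` and the temporary table name `people` are chosen for illustration.

```scala
// Assumes the `people` DataFrame loaded earlier in the spark-shell session.
// Variant 1: the condition as a SQL expression string.
val below60Expr = people.filter("age < 60")

// Variant 2: register the DataFrame as a temporary table and query it with SQL.
people.registerTempTable("people")
val below60Sql = sqlContext.sql("SELECT first_name, last_name, age FROM people WHERE age < 60")
below60Sql.show
```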

Group people by gender

scala> val grouped = people.groupBy("gender")

Find number of males and females

scala> val gender_count = grouped.count
scala> gender_count.show

Find average age of males and females

scala> val avg_age = grouped.avg("age")
scala> avg_age.show
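
The count and the average can also be computed in one pass with `agg`. This is a sketch assuming the `people` DataFrame from the session above; the aliases `num_people` and `avg_age` and the name `stats` are made up for illustration.

```scala
// Assumes the `people` DataFrame loaded earlier in the spark-shell session.
import org.apache.spark.sql.functions.{avg, count}

// One grouped aggregation producing both results, with explicit column aliases.
val stats = people.groupBy("gender").agg(
  count("person_id").as("num_people"),
  avg("age").as("avg_age")
)
stats.show
```

Explicit aliases also give the result columns plain names, which helps if the result is later written back to a database table.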

Now save the gender count data to a new table

scala> gender_count.write.jdbc(url,"gender_count",prop)
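
By default the write fails if the target table already exists, so running the command a second time raises an error. A minimal sketch of setting a save mode explicitly, assuming `gender_count`, `url` and `prop` from the session above:

```scala
// Assumes `gender_count`, `url` and `prop` from the spark-shell session above.
// "overwrite" replaces the existing table; "append" would add rows to it instead.
gender_count.write.mode("overwrite").jdbc(url, "gender_count", prop)
```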

Please contact us if you need any help.
