Until recently, JdbcRDD was the standard way to connect Spark to a relational data source. From Spark 1.4 onwards, there is a built-in data source for connecting to a JDBC source using DataFrames.
Spark introduced DataFrames in version 1.3 and enriched the DataFrame API in 1.4. RDDs are Spark's unit of compute and storage, but they carry no information about the structure of the data, i.e. its schema. DataFrames combine RDDs with a schema, and this small addition makes them very powerful. You can read more about DataFrames here.
Please make sure that the JDBC driver jar is visible on the client node and on all worker nodes where executors will run.
Let us create a 'person' table in MySQL (or a database of your choice) using the following DDL:

CREATE TABLE `person` (
  `person_id` int(11) NOT NULL AUTO_INCREMENT,
  `first_name` varchar(30) DEFAULT NULL,
  `last_name` varchar(30) DEFAULT NULL,
  `gender` char(1) DEFAULT NULL,
  `age` tinyint(4) DEFAULT NULL,
  PRIMARY KEY (`person_id`)
);
Now let's insert some data to play with. Since `person_id` is auto-increment, we list the remaining columns explicitly:

INSERT INTO person (first_name, last_name, gender, age) VALUES ('Barack','Obama','M',53);
INSERT INTO person (first_name, last_name, gender, age) VALUES ('Bill','Clinton','M',71);
INSERT INTO person (first_name, last_name, gender, age) VALUES ('Hillary','Clinton','F',68);
INSERT INTO person (first_name, last_name, gender, age) VALUES ('Bill','Gates','M',69);
INSERT INTO person (first_name, last_name, gender, age) VALUES ('Michelle','Obama','F',51);
Download the MySQL Connector/J jar from http://dev.mysql.com/downloads/connector/j/.
Make the MySQL driver available to the Spark shell and launch it:

$ spark-shell --driver-class-path /path-to-mysql-jar/mysql-connector-java-5.1.34-bin.jar
Construct the JDBC URL
scala> val url="jdbc:mysql://localhost:3306/hadoopdb"
Create a connection properties object with the username and password

scala> val prop = new java.util.Properties
scala> prop.setProperty("user","hduser")
scala> prop.setProperty("password","********")
Load a DataFrame from the JDBC data source (url, table name, properties)
scala> val people = sqlContext.read.jdbc(url,"person",prop)
Show the results in a nice tabular format

scala> people.show
This has loaded the whole table. What if I would like to load only males? Pass an array of predicates (url, table name, predicates, properties):
scala> val males = sqlContext.read.jdbc(url,"person",Array("gender='M'"),prop)
scala> males.show
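Each predicate in the array, incidentally, becomes one partition of the resulting DataFrame. For larger tables, `read.jdbc` also has an overload that partitions the read on a numeric column; a sketch, where the bounds and partition count are illustrative values for our small table:

```scala
// Partitioned read: Spark splits person_id into 4 ranges and reads each
// range in a separate task. Bounds only steer partitioning; no rows are
// filtered out. The bounds and partition count here are assumptions.
scala> val partitioned = sqlContext.read.jdbc(
     |   url,
     |   "person",
     |   "person_id",  // numeric column to partition on
     |   1L,           // lower bound
     |   1000L,        // upper bound
     |   4,            // number of partitions
     |   prop)
scala> partitioned.show
```

This matters once the table is large enough that a single-connection read becomes a bottleneck.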
Show only first names
scala> val first_names = people.select("first_name")
scala> first_names.show
Show only people below age 60
scala> val below60 = people.filter(people("age") < 60)
scala> below60.show
Group people by gender
scala> val grouped = people.groupBy("gender")
Find number of males and females
scala> val gender_count = grouped.count
scala> gender_count.show
Find average age of males and females
scala> val avg_age = grouped.avg("age")
scala> avg_age.show
Now save this average-age data to a new table
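Writing back goes through the same JDBC data source, via `write.jdbc` on the DataFrame. A sketch, where the target table name `person_avg_age` is an assumption (the table is created for you if it does not exist):

```scala
// Save the aggregated DataFrame to a new table over the same JDBC URL.
// "person_avg_age" is a hypothetical table name chosen for this example.
scala> avg_age.write.jdbc(url, "person_avg_age", prop)
```

Running the save twice will fail because the table already exists; in that case drop the table first or choose a different table name.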
Please contact us if you need any help.