Using JdbcRDD with Spark is slightly confusing, so I thought I would put together a simple use case to explain the functionality. Most probably you’ll use it with spark-submit, but I have shown it here in spark-shell to make it easier to follow.

Database Preparation

We are going to load a person table in a MySQL database. Here’s the DDL:

CREATE TABLE `person` (
  `person_id` int(11) NOT NULL AUTO_INCREMENT,
  `first_name` varchar(30) DEFAULT NULL,
  `last_name` varchar(30) DEFAULT NULL,
  `gender` char(1) DEFAULT NULL,
  PRIMARY KEY (`person_id`)
);

The following is an example of the data that is loaded:

first_name   last_name   gender
Barack       Obama       M
Bill         Clinton     M
Hillary      Clinton     F

I am a fan of oversimplification when it comes to examples, as you can see.
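
If you want to load the same rows yourself, a couple of INSERT statements will do it (this assumes the table lives in the hadoopdb database that the JDBC URL below points at):

INSERT INTO person (first_name, last_name, gender) VALUES
  ('Barack', 'Obama', 'M'),
  ('Bill', 'Clinton', 'M'),
  ('Hillary', 'Clinton', 'F');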

JDBC Driver

We have to make the MySQL JDBC driver available to spark-shell. I am using:

mysql-connector-java-5.1.29-bin.jar

You can download it from http://dev.mysql.com/downloads/connector/j/ if it’s not already available.
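
If you end up running this through spark-submit as part of a packaged application instead, you can pull the same driver in as a build dependency rather than passing the jar by hand. A minimal sbt sketch, assuming the 5.1.29 release used above:

// build.sbt -- pulls the MySQL JDBC driver from Maven Central
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.29"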

Getting Your Hands Dirty on spark-shell

Launch spark-shell with the MySQL driver on its classpath:

$ spark-shell --jars /path-to-mysql-jar/mysql-connector-java-5.1.29-bin.jar

Create variables for the username, password, and JDBC URL:

scala> val url = "jdbc:mysql://localhost:3306/hadoopdb"
scala> val username = "hduser"
scala> val password = "******"

Let’s get imports out of the way:

scala> import org.apache.spark.rdd.JdbcRDD
scala> import java.sql.{Connection, DriverManager, ResultSet}

Create the JdbcRDD:

scala> Class.forName("com.mysql.jdbc.Driver").newInstance
scala> val myRDD = new JdbcRDD(sc,
         () => DriverManager.getConnection(url, username, password),
         "select first_name, last_name, gender from person limit ?, ?",
         1, 5, 2,
         r => r.getString("last_name") + ", " + r.getString("first_name"))

The two ?’s are bind variables for the PreparedStatement inside JdbcRDD. JdbcRDD splits the range [lowerBound, upperBound] into numPartitions sub-ranges and binds each sub-range’s start and end values into the two ?’s. With MySQL’s limit ?, ? syntax, the first ? therefore acts as the offset, i.e., which row to start with, and the second as the limit, i.e., how many rows to read.
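
Because the two bound values really describe a range, the more typical JdbcRDD query binds them into a WHERE clause on a numeric key, so each partition reads a disjoint slice of the table. Here is a sketch of that variant, assuming person_id values run from 1 to 5 (byKeyRDD is just an illustrative name):

// hypothetical variant: partition on the primary key instead of LIMIT
scala> val byKeyRDD = new JdbcRDD(sc,
         () => DriverManager.getConnection(url, username, password),
         "select first_name, last_name, gender from person where person_id >= ? and person_id <= ?",
         1, 5, 2,
         r => r.getString("last_name") + ", " + r.getString("first_name"))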

The signature of JdbcRDD goes like this:

new JdbcRDD(
  sc: SparkContext,
  getConnection: () => Connection,
  sql: String,
  lowerBound: Long,
  upperBound: Long,
  numPartitions: Int,
  mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray)

Now trigger the query and print the results:

scala> myRDD.foreach(println)
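
Keep in mind that on a real cluster foreach(println) prints on the executors, not in your shell. Since this result set is tiny, you can safely pull it to the driver first:

scala> myRDD.collect().foreach(println)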

Write it to HDFS:

scala> myRDD.saveAsTextFile("person")
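
To verify the write, you can read the directory back and count the rows (the path resolves against your default filesystem, HDFS in this case):

scala> sc.textFile("person").count()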

InfoObjects Spark Sandbox comes fully-loaded with Hadoop, Spark and more than 10 other ecosystem technologies. Please email bigdata@infoobjects.com to request download access to the Sandbox.
