This recipe works with Spark 1.2. A separate recipe covers the Spark 1.3 version.

Apache Parquet, as a file format, has garnered significant attention recently. Say you have a table with 100 columns; most of the time you are going to access only 3-10 of them. In a row-oriented format, all the columns have to be scanned whether you need them or not.

Apache Parquet saves data in a column-oriented fashion, so if you need only 3 columns, only the data of those 3 columns gets loaded. Another benefit is that, since all the data in a given column is of the same datatype (obviously), compression is far more effective.

In this recipe, we will learn how to save a table in Parquet format, and then how to load it back.

Let’s use the person table we created in the other recipe.

first_name   last_name   gender
Barack       Obama       M
Bill         Clinton     M
Hillary      Clinton     F
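
If you don’t have the person file from that recipe handy, here is a minimal way to create it right in the spark-shell (a sketch: it assumes the load step below reads a tab-separated directory named "person", and simply writes the three sample rows above):

scala> val rows = Seq("Barack\tObama\tM", "Bill\tClinton\tM", "Hillary\tClinton\tF")
scala> sc.parallelize(rows).saveAsTextFile("person")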

Let’s load it in Spark SQL:

scala> val hc = new org.apache.spark.sql.hive.HiveContext(sc)
scala> import hc._   // brings in implicit conversions such as RDD => SchemaRDD
scala> case class Person(firstName: String, lastName: String, gender: String)
scala> val person = sc.textFile("person").map(_.split("\t")).map(p => Person(p(0), p(1), p(2)))
scala> person.registerTempTable("person")   // implicitly converted to a SchemaRDD
scala> val males = hc.sql("select * from person where gender='M'")
scala> males.collect.foreach(println)
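
To double-check the schema Spark inferred from the Person case class, print it on the SchemaRDD; it should list firstName, lastName, and gender as nullable strings:

scala> males.printSchema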

Now let’s save this person SchemaRDD in Parquet format:

scala> person.saveAsParquetFile("person.parquet")

There is an alternative way to save to Parquet if you already have the data in a Hive table:

hive> create table person_parquet like person stored as parquet;
hive> insert overwrite table person_parquet select * from person;
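
If you would rather not leave the spark-shell, the same two statements can be issued through the HiveContext. This sketch assumes person already exists as a table in the Hive metastore (not just as the temp table registered above):

scala> hc.sql("create table person_parquet like person stored as parquet")
scala> hc.sql("insert overwrite table person_parquet select * from person")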

Now let’s load this Parquet file back. There is no need for a case class this time, as the schema is preserved in the Parquet file.

scala> val personRDD = hc.parquetFile("person.parquet")
scala> personRDD.registerTempTable("pp")
scala> val males = hc.sql("select * from pp where gender='M'")
scala> males.collect.foreach(println)
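
This reload is also a good place to see the columnar format pay off: a query that touches a single column reads only that column’s data from disk. For example, to pull just the first names:

scala> hc.sql("select firstName from pp").collect.foreach(println)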

Sometimes Parquet files produced by other tools, such as Impala, store strings as raw binary. To fix that, set the following configuration right after creating the HiveContext:

scala> hc.setConf("spark.sql.parquet.binaryAsString", "true")