Accumulators are Spark’s answer to MapReduce counters, but they can do much more than that. Let’s start with a simple example that uses an accumulator for a simple count.

scala> val ac = sc.accumulator(0)
scala> val words = sc.textFile("hdfs://localhost:9000/user/hduser/words").flatMap(_.split("\\W+"))
scala> words.foreach(w => ac += 1 )
scala> ac.value
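Here ac.value reads the accumulated total back on the driver. Tasks running on the executors can only add to an accumulator; only the driver can read its value.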

That was a silly example, so let’s make it sillier. Let’s count the total number of letters across the data.

scala> val ac = sc.accumulator(0)
scala> val words = sc.textFile("hdfs://localhost:9000/user/hduser/words").flatMap(_.split("\\W+"))
scala> words.foreach(w => ac += w.length )
scala> ac.value

Following the tradition, let’s put this data in a people directory

Barack,Obama,53
George,Bush,68
Bill,Clinton,68

and then calculate the total age

scala> val ac = sc.accumulator(0)
scala> val people = sc.textFile("people").map( v => v.split(","))
scala> people.foreach( p => ac += p(2).toInt)
scala> ac.value
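With the three records above, ac.value should come out to 53 + 68 + 68 = 189.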

Here you may notice that we are doing ac += p(2).toInt. It may look like the equivalent of ac = ac + p(2).toInt, but if you try that you will get a compile error because ac is a val. In reality, += is just another method in Scala, so nothing is being reassigned.
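To see why, here is a minimal sketch using a hypothetical Counter class (not part of Spark): because Counter defines a method literally named +=, writing c += 5 on a val compiles fine, since it invokes that method instead of reassigning c. Spark’s accumulator works the same way.

scala> // a hypothetical Counter class, just for illustration (not part of Spark)
scala> class Counter(var total: Int) { def +=(n: Int): Unit = { total = total + n } }
scala> val c = new Counter(0)   // c is a val, yet the next line compiles
scala> c += 5                   // desugars to c.+=(5); c itself is never reassigned
scala> c.total                  // 5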

What we have done so far is a sum and a count, both associative operations, which is exactly what Spark needs since it merges the per-partition updates of an accumulator. One common use case for accumulators is counting bad records.

Let’s create another dataset of people with some bad records and put it in a directory called baddata

Barack,Obama,53
George,Bush,68
Hillary,Clinton,F
Bill,Clinton,68
Tom,Cruise,M

Now our job is to count the bad records using an accumulator named badRecords

scala> val badRecords = sc.accumulator(0)
scala> val baddata = sc.textFile("baddata").map(v => v.split(","))
scala> baddata.foreach( r => { try { r(2).toInt } catch { case e: NumberFormatException => badRecords += 1 } })
scala> badRecords.value
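With the dataset above, badRecords.value should be 2: the Hillary and Tom records fail to parse because F and M are not integers.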

Unlike broadcast variables, accumulators do not need anything special to be disposed of. The garbage collector picks them up the moment they go out of scope, like any other Java object.
