Spark 101 - Writing your first Spark app

Published 11/10/2015 6:44:00 AM
Filed under Apache Spark

Recently I've started exploring Apache Spark as a way to process large amounts
of data coming from things like sensors. The kind of things that happen in
a typical IoT solution.

One of the things I noticed is that the hello world scenario for Spark looks
cool on the website, but in practice there are quite a few things you need
to be aware of if you want to use Spark in a real world scenario.

In this post I will show you how you can set up a basic Spark application.
This is the first in a series of posts exploring Spark and explaining
how to process data coming from IoT devices.

A very short introduction to Spark

Before going ahead to show you some code, let me first explain what Spark is.
Spark in essence is a distributed computation engine. You can create a cluster
of Spark nodes to run programs that have the typical map/filter/reduce kind of
setup.
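To make the map/filter/reduce shape concrete, here is a minimal sketch on a plain Scala list, without Spark. The sensor readings and the names MapFilterReduce and highestValidReading are made up for illustration. A Spark program applies the same three steps, only to data that is spread over a cluster.

```scala
object MapFilterReduce {
  // Hypothetical sample data: (sensor id, measured value).
  val readings: List[(String, Int)] =
    List(("a", 18), ("b", 21), ("c", 99), ("d", 20))

  // Returns the highest reading below 50.
  // Note: reduce throws on an empty list, so at least one reading must pass the filter.
  def highestValidReading(values: List[(String, Int)]): Int =
    values
      .filter { case (_, value) => value < 50 } // filter: drop implausible values
      .map { case (_, value) => value }         // map: keep only the measurement
      .reduce((a, b) => math.max(a, b))         // reduce: combine into one result

  def main(args: Array[String]): Unit =
    println(highestValidReading(readings)) // prints 21
}
```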

There are of course alternatives like Hadoop. The big difference between Hadoop
and Spark is that Hadoop MapReduce is rather slow, because it writes the results
of each step to disk. Spark on the other hand has a slew of optimizations that
let it keep most of the data in memory. Also, the programming model of Spark is
a lot less complex as you will see in the samples later on.

So what is Spark good for? Most people will tell you it's good at processing
big data. They are right, but there's more to it. Put the buzzword aside and you
end up with something that can split workloads across multiple machines.

Basically, if your workload cannot be processed in a reasonable amount of time
on one machine you have the option of using Spark. Spark lets you divide the workload
across multiple machines, making the execution time much shorter.

Building applications for Spark

How does this work? Let me show you through a sample.
To build an application for Spark you can use R, Scala, Java, C# or Python.
Some languages have a better Spark API than others. Scala has the most complete API
at this point, so I will show the code in Scala throughout my post series.

The first step you need to take when you want to write an application for Spark
is to set up the build file for your project. For this, create a new folder and drop
a new file with the name build.sbt in there with the following content:

scalaVersion := "2.10.6"

version := "0.0.1"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1"

To make a Spark application you need just one dependency, the spark-core library.

With this library in place let's write a bit of code. Create a new folder
called src/main/scala/example and create a file called Program.scala in that
folder.

In the Program.scala we're going to write the Spark program.

object Program {
  def main(args: Array[String]) = {

  }
}

Normally you would let Program extend from scala.App, but not in the case
of Spark. This has to do with the way Spark tries to run your application and
the way Scala generates code when you base your main class on scala.App.

You can build the class and run it, but instead let's add the basics to
work with the Spark API.

import org.apache.spark.{SparkConf, SparkContext}

object Program {
  def main(args: Array[String]) = {
    val conf = new SparkConf()
      .setAppName("my-app")
      .setMaster("local")

    val sc = new SparkContext(conf)
  }
}

I've imported the SparkContext and SparkConf classes from the org.apache.spark
package to get access to the main components provided by Spark.

The SparkContext is the place to be when you want to access data
from things like Cassandra, Hadoop, etc.

One of the things you can do with the Spark context is ask it to create a
resilient distributed dataset (RDD). The RDD is the other main component
you're going to use from Spark. RDDs provide access to the data that your
Spark program is going to process.

An RDD can contain any object you like. In a later post I will show you
how to work with Cassandra, but for now let's create a basic RDD containing some
sample data.

import org.apache.spark.{SparkConf, SparkContext}

// The items stored in the RDD. Defined outside of Program so that
// Spark can serialize them without dragging anything else along.
case class Person(name: String, age: Int)

object Program {
  def main(args: Array[String]) = {
    val conf = new SparkConf()
      .setAppName("my-app")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)

    val dataSource = sc.makeRDD(List(
      Person("Mike", 28),
      Person("Adam", 31),
      Person("John", 30)))

    val output = dataSource.filter(person => person.age > 30)
  }
}

In this sample we create a new RDD from a set of people and select everyone
older than 30.

When you run this application, you get nothing. It runs really fast.
Not only because you have just three people, but also because it does nothing
with those people.

RDDs are lazy. When you call an operation like filter() or map() on an RDD, you
get back a new RDD that remembers the operation, but nothing is executed yet.
Only when you call a method like collect() or count() are the operations
actually applied to the items in the RDD.

RDDs work this way because their contents are split across the nodes
in the Spark cluster. When you call a method like collect() you schedule
a job on the Spark cluster. The driver, the program that holds the SparkContext,
splits up the work depending on the operations you applied to the RDD, has the
nodes execute that work and gathers the results.
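The difference between building up operations and actually running them can be sketched like this. The Reading class, the LazyDemo object and the highReadings method are names I made up for this example, and it assumes the spark-core dependency from the build file shown earlier.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical record type for this sketch.
case class Reading(sensor: String, value: Int)

object LazyDemo {
  def highReadings(sc: SparkContext): Array[Reading] = {
    val readings = sc.makeRDD(List(
      Reading("a", 18), Reading("b", 35), Reading("c", 21)))

    // filter() only describes the work: you get a new RDD, no job runs yet.
    val high = readings.filter(reading => reading.value > 20)

    // collect() schedules the job and returns the matching items
    // to the calling program as a plain Array.
    high.collect()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("lazy-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)

    try {
      highReadings(sc).foreach(println)
    } finally {
      sc.stop() // always release the context, even when the job fails
    }
  }
}
```

Be careful with collect() on large RDDs: everything it returns has to fit in the memory of the program that calls it.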

Although RDDs are lazy and work is split up, you won't notice it very much in
your program. There are however two things that you shouldn't do.

Don't create two Spark contexts. There should be just one. Create it
in the main method and pass it around if you must, but don't attempt to create
multiple instances. It breaks.

Also, don't use one RDD inside an operation on another RDD. Code like the
following sample is bad:

// Does not work: nodes.map is called inside nodes.flatMap.
val matrix = nodes
  .flatMap(node1 => nodes.map(node2 => (node1,node2)))
  .map({ case (n1,n2) => (n1,n2,distance(n1,n2)) })

This code tries to perform an operation on an RDD inside the operation that
is performed on another RDD. If you do this, you will end up with an exception.
Why can't you do this? The function you pass to flatMap runs on the worker
nodes, and operations on an RDD can only be started from the driver program,
so Spark raises an exception instead of giving you weird results.

The proper way to build every combination of two nodes is the cartesian
operation. The zip operation is not a replacement here, because it only pairs
up the elements that sit at the same position in both RDDs:

val matrix = nodes.cartesian(nodes)
  .map({ case (n1,n2) => (n1,n2,distance(n1,n2)) })
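If you want to check what kind of pairs you get without starting Spark, the same two ways of pairing can be tried on a plain Scala list. This is only a sketch with made-up names (PairingDemo, elementWise, allPairs). On an RDD, zip pairs elements position by position and cartesian produces all combinations.

```scala
object PairingDemo {
  val nodes: List[Int] = List(1, 2, 3)

  // Element-wise pairing: the first element with the first, and so on.
  def elementWise(xs: List[Int]): List[(Int, Int)] = xs.zip(xs)

  // Every element combined with every element (Cartesian product).
  def allPairs(xs: List[Int]): List[(Int, Int)] =
    for (a <- xs; b <- xs) yield (a, b)

  def main(args: Array[String]): Unit = {
    println(elementWise(nodes))   // List((1,1), (2,2), (3,3))
    println(allPairs(nodes).size) // 9
  }
}
```

A distance matrix needs all nine pairs, not just the three on the diagonal.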

Other than these two rules you can build the application in any way you like.
Most probably it will involve chaining a lot of map, flatMap etc. calls.

Testing your application

When you have an application built on top of Spark you will probably wonder,
does it work? Do I get the results I want?

You can write unit-tests for your Spark program using your favorite
test framework. I happen to like scalatest, so I will show you how you can
test the program I showed before using scalatest.

For this I need to modify the application a little bit, so that the
logic is separated from the main entrypoint.

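import org.apache.spark.SparkContext

// Uses the Person class from the earlier sample.
class CalculatePeopleAboveThirty(sc: SparkContext) {
  def calculate() = {
    val dataSource = sc.makeRDD(List(
      Person("Mike", 28),
      Person("Adam", 31),
      Person("John", 30)))

    // Returns the filtered RDD; nothing is executed until the caller asks for results.
    dataSource.filter(person => person.age > 30)
  }
}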

The above class contains the same logic as before, but now I can inject
the spark context into the class and I get the results back so that I can
validate them.

To work with this class I need to modify the Program class as well.

import org.apache.spark.{SparkConf, SparkContext}

object Program {
  def main(args: Array[String]) = {
    val conf = new SparkConf()
      .setAppName("my-app")
      .setMaster("local[2]")

    val sc = new SparkContext(conf)

    val algorithm = new CalculatePeopleAboveThirty(sc)
    val output = algorithm.calculate()
  }
}

The program now only sets up the configuration and spark context and
calls a separate algorithm to perform the calculation.

Now that we have the separate class we can write a unit-test to validate
that class independent of our Spark cluster.

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}

class PeopleProgramSpec extends WordSpec
with Matchers with BeforeAndAfterAll {
  val conf = new SparkConf()
    .setAppName("people-test")
    .setMaster("local[2]")

  val sc = new SparkContext(conf)

  "People Algorithm" when {
    val algorithm = new CalculatePeopleAboveThirty(sc)

    "asked to get people above 30" should {
      "filter out people that are 30 or younger" in {
        val output = algorithm.calculate()

        // Only Adam (31) is older than 30.
        output.count() should equal(1)
      }
    }
  }

  // Runs once after all tests in this spec have finished.
  override def afterAll(): Unit = {
    sc.stop()
  }
}

This test spins up a Spark context configured to run locally. You can do this
by setting the master to "local", followed by the number of worker threads to
use between square brackets.

After you set up the SparkContext instance you can start to test the code.
Invoke the algorithm and grab the results. Remember, at this point you don't
actually have any data.

By calling count() you load the data, execute the filter and return the
number of records that are remaining after the filter.

One important thing to remember: Close the SparkContext instance after you're
done testing your code. This cleans up everything that Spark left behind.

Running the application for real

Now that you have built and tested your code you can run it in Spark by invoking the
spark-submit command, which looks like this:

spark-submit --class Program --master local[2] <path-to-jar>

The spark-submit command expects the class it needs to run, the master it
needs to run that class on, and finally the path to the jar containing
the compiled sources.

Your JAR file is uploaded to Spark and the class is submitted as a job to Spark.
A cool detail here is that you can observe the job by going to
http://localhost:4040/ in your favorite browser.

If you plan on using external libraries in your Spark program you need to know
about one more thing. The easiest way to make external libraries available to
Spark is to package them together with your main jar.

For this you have to include the assembly plugin in the SBT build.
Check out the documentation of the plugin to find out how.

After you have added this plugin, change the SBT file by adding the
provided scope to the Spark dependencies. That way Spark itself is not packaged
into your jar, since the cluster already has it.
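As a sketch, the build.sbt from the start of this post would then look something like this. Only the dependency line changes. The setup of the assembly plugin itself is not shown here, since that depends on the plugin version you use.

```scala
// build.sbt
scalaVersion := "2.10.6"

version := "0.0.1"

// "provided": compile against Spark, but leave it out of the assembled jar,
// because Spark supplies these classes itself when the job runs.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.1" % "provided"
```

External libraries that Spark does not ship keep their normal scope, so they do end up in the assembled jar.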

Finally run the sbt assembly command in the terminal to get the assembled jar file.
This assembled jar can then be submitted using the spark-submit command.

Don't feel like using the assembly plugin? Don't worry, you can also make use
of the --packages option and the --repositories option of spark-submit. The
first option lets you specify Maven coordinates for external dependencies. The
second option lets you specify any extra repository that you want to get the
dependencies from.

Be aware though, the command line option is brittle because a typo is easily made
and you need to maintain another piece of code that could get out of sync.

Where to go from here?

The sample in this post is obviously very simple. Spark features more than
just the basic RDDs and the SparkContext. I suggest you check out the
documentation if you want to know all the gory details.

Also, if you're interested in doing more, make sure that you check out MLlib
and GraphX to learn more about machine learning and graph computations using Spark.

In the next post I will show you how to integrate Spark applications with other
applications in order to build an analysis pipeline.

Until then, have fun exploring Spark!