Spark 101 - Integration with data sources

Published 11/12/2015 6:42:00 AM
Filed under Scala

In the previous post I showed you how to build basic Spark programs. Building a basic
Spark application is the hello world scenario of Spark. Normally you'd use something
like a database to feed your algorithm in Spark and output the results of the
algorithm somewhere else.

But what can you connect to Spark and where do you leave the outputs of your program?

In this post I will show you some of the options you have to integrate your
Spark program with data sources. Hopefully you will slowly start to see where
Spark fits in your solution.

Please note I'm using Scala for the sample code in this post.
So things my differ a bit in other languages or may not work at all *grin*.

Loading data for batch jobs

So the first and probably most well-known and well understood way to work with data
is by loading a bunch of rows from a database and processing them as a batch.

Something like SELECT * FROM customers is understood very well by most people.
In Spark you can do run batch oriented programs in a number of ways.

First there's the Spark SQL classes that you can use. One of the things you can
do with Spark SQL is to connect to a database through JDBC. You can connect to any
database from Spark SQL as long as there's a JDBC driver for it. For example,
you can use Microsoft SQL server or MySQL as a data source for your program.

To use the Spark SQL components you need to add a new dependency to your SBT file.

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"

After importing the new dependency you get access to a class called
SQLContext which provides access to a bunch of different data sources all of
them oriented towards working with batches of data.

It currently supports things like Parquet, JSON, Apache Hive and JDBC databases.
In this post I will keep it simple and connect to a MySQL database so be sure to
check out the documentation on how to connect to the other data source types.

Our sample program is going to load customers from a table and extract people
called Mike from that table. In order to do that you first need to create the
SparkContext and SQLContext.

val config = new SparkConf()
  .setAppName("sample-app-sql-jdbc")
  .setMaster("local[2]")

val sc = new SparkContext(config)
val sqlContext = new SQLContext(sc)

The only thing you need to put into the SQL context is the SparkContext. This context
is used for coordination purposes.

After you created the SQL context you can start to load data. This looks like this:

val config: SparkConf = new SparkConf()
.setAppName("sample-app-sql-jdbc")
.setMaster("local[2]")

val sc = new SparkContext(config)
val sqlContext = new SQLContext(sc)

val ds = sqlContext.read.format("jdbc").options(
  Map(
    "url" -> "jdbc:mysql://localhost:3306/mydb",
    "dbtable" -> "customers",
    "partitionColumn" -> "country",
    "driver" -> "com.mysql.jdbc.Driver"
  )
).load()

The SQLContext is asked to read data in the JDBC format. As input for this method
you need to provide a map containing a JDBC URL and driver name. This is used to
set up the connection with the database. You also need to provide the name of the table
you want to read from. Finally you specify the name of the partition column.

The partition column needs to be numeric and is used to split the data into manageable
chunks that are spread over the available Spark nodes in the cluster.

Notice that you cannot write SQL queries to get the data. The idea is that all data
in the table is loaded into Spark and processed. So if you need to filter out rows
before they go into Spark you better remove them from the table you're going to process.
Better yet, create a dedicated table for Spark jobs if you want certain rows of data
to be included in a job.

Note I'm using an external JDBC driver, in order to use it you need to modify
a script file in the bin directory of every Spark node called compute_classpath.sh
and include the path to the JAR file containing the JDBC driver. Without this change
your Spark application doesn't work!

After you loaded the data into a data source you can start to work with it.
For example use a filter to filter out rows from the source table.

val numberOfMikesInTheWorld = ds.select("id","name","address","zipcode","city")
  .map(row => row.getAs[String]("name"))
  .filter(name => name.startsWith("Mike"))
  .count()

The first line selects a number of columns from the datasource to work with.
After that I can start to map the rows from the datasource to return just the
name of the customer. Finally I use a filter statement to filter out people
that aren't called mike.

So once you have a data source that loads data from JDBC you can work with it
like with any other RDD (Resilient Distributed Datasource).

JSON and Parquet work just like this. In the case of JSON and Parquet however you
need to specify a path to a file. You can load the files from disk, but I'd suggest
you don't do this. Loading files from disk means that the file has to be available
to every possible Spark node, since you don't know where the Job gets scheduled.

If you do need to read JSON files, load them from HDFS so that
you have a central point to get access to them.

Loading data for streaming Spark applications

While batch jobs are the easiest to build, streaming is probably the use case
that you are going to need the most often.

When you are working with 1 terabyte of data it's not very wise to reload that data
every time you want to process just one new item in that set.

So instead of doing you could go for a method where you send the one single item
to Spark and let Spark process that single item in the context of the whole set.

This works great for a lot of scenarios like process mining, updating a streaming
linear regression model, etc. In fact I think that apart from these kind of scenarios
you can apply streaming also for things that are not related to machine learning at all.

For example, anomaly detection is also a use case where this works. You don't need
old events for detecting anomalies in a new event.

To use streaming Spark you're going to need two things, an external source that
pushes data into Spark and the Spark streaming context.

In order to get the Spark streaming context you need to add the spark-streaming
dependency to your SBT file:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "1.5.1",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.5.1"
)

On top of the regular spark-streaming dependency I added the Kafka streaming
implementation as well. Kafka is a messaging framework that is really fast and
because of that it is very suitable to use in combination with Spark.

After you have this dependency in place you can initialize a
new streaming context like this:

val config = new SparkConf()
  .setAppName("streaming-sample-app")
  .setMaster("local[2]")

val sc = new SparkContext(config)
val ssc = new StreamingContext(sc, Seconds(1))

The first parameter for the StreamingContext constructor is the SparkContext
to use. The second parameter determines the duration for the batch.

What happens in Spark streaming is that the program listens for events on a stream
converting those events into a batch. In the sample it listens for events for 1 second
and turns the set of events received in that time window into a single batch to be
processed.

Converting a stream into a batch is necessary, because Spark at its core still is a
batch oriented program. By using the streaming API you effectively decrease the amount
of data that goes into a batch.

A word of advice If you need true streaming processing you have two options.
Either you go with Spark and hope that you end up with batches of just one item.
Or you switch to something like Apache Storm which is build for this sort of thing.
I'd go with the second option, but that's just me :-)

Now that you have a streaming context you can create a receiver for incoming events.
Spark supports things like file streams and event TCP streams, but I'm not going to use
that. Instead I'm going to connect Spark to Apache Kafka.

Apache Kafka is a blazingly fast messaging system that supports clustering
just like Apache Spark. Which means its highly scalable too. Just the sort of
thing you need for building semi-streaming big data appliances.

val kafkaStream = KafkaUtils.createStream(ssc,
  "local01,local02,local03", "my-spark-app",
  Map("events" -> 1))

val counter = kafkaStream.map({ case(key,message) => Person.fromMessage(message) })
  .filter(person => person.name.startsWith("Mike"))
  .count()

In the sample code you first create a new Kafka stream instance to read from.
I specified three Kafka server hosts to connect to. This is something you can do
when you want to make sure that the connection between Kafka and Spark stays available
to the application. Kafka requires the application to identify itself as a consumer group.
In this case this is the "my-spark-app" consumer group. When you connect multiple
applications with the same consumer group a message gets delivered to just one instance
of the consumer group. Additionally you need to specify the topic you want to read from.

Notice that I added the value one in the map for the topics. This is the number of threads
or partitions that the application is reading from. This doesn't influence the amount
of partitions Spark is going to use to process the incoming events.

Balancing the amount of Kafka and Spark partitions is not a topic in this post, but
I suggest you run a load test to determine the best combination for your solution.

Once you have a stream in the app, you can work with in the same way as you would
with normal RDDs, mapping, filtering and sorting data.

So what do you do with the results?

I've written a lot about getting data into Spark, but what do you do once you have
the results of your computation?

Spark allows you to not only load data from SQL or Streams,
but it also allows you to store data in SQL or Streams. Depending on
your use case you use either one of these.

For example, if you need to alert another system based on your computations you
are better off using something like Kafka to deliver the alert to that system.

If you want to query the results from some other part of your solution you can
store the results of your computation in something like Cassandra.

You will need to write a separate REST service on top of the output data source
to allow clients to query the results. Spark itself doesn't offer a way to do this
and frankly you shouldn't do that sort of thing with Spark. Spark is only a computational
engine to use in big data scenarios.

Where to find more information

If you want to know more about connecting SQL data sources and streams to Spark
I suggest you take a look at the following websites: