Sdílet prostřednictvím


Spark and Zeppelin BI using Azure HDInsight with OpenData Lombardia datasets

Intro

In the previous blog post (Azure Data Lake BI sample using Open Data Lombardia) we've shown how easy is to perform analytics using Microsoft Data Lake and U-SQL. An alternative to that is to use the Apache Spark engine (https://spark.apache.org/) on top of an HDInsight cluster. Spark on HDInsight is at the time of writing still in preview but usable for testing purposes nonetheless. It also comes with Zeppelin installed (see https://zeppelin.incubator.apache.org/ for further details) which is a powerful notebook capable of interrogating and rendering our data.

Spark

Spark is a yarn aware engine built using Scala programming language. Scala itself is a JVM compatible functional and object oriented programming language (see https://www.scala-lang.org/). Functional programming is increasingly gaining traction because CPU speeds are not growing as they did few years ago. The solution is increasing the number of CPU (dual-core, quad-core and so on) forcing the developers to think in terms of threads, processes or even servers (as in the case of Hadoop clusters). Functional programming languages, based on the premise of data immutability, handle concurrency problems very well leading to elegant programs that scale horizontally across cores, processes or even machines. Scala, for instance, has Akka actors (https://akka.io/) for this purpose but many other languages and frameworks are available. For example there are F#, Clojure and Haskell (just to name a few). Spark allows your Scala program to be executed on a Yarn cluster that runs in parallel on multiple machines. Since the data is immutable in functional programs Spark gives you the Resilient Distributed Dataset (aka RDD) which is a cached dataset with the ability to regenerate itself. In other words the RDDs can give you the speed of cached data and the ability to recover from failures (which in Hadoop clusters is common).
Let's see it in action.

HDInsight

Normally provisioning an Hadoop cluster with Scala, Spark and Zeppelin is a long and error prone task. There are precompiled versions but you have to fiddle with dependencies, configuration files, ssh keys and so on. Luckily with Azure this boils down to a few click in the UI :smile:.

Just provision an HDInsight cluster and take care to pick Spark as cluster type:

As usual you have to configure the Data Source and the Credentials:

Note: at the moment of writing you won't find Zeppelin in ubuntu-based HDInsight clusters, only in Windows ones so pick accordingly.

File upload

You can upload your datasets in many ways. For the sake of this demo I'll be using the Azure PowerShell cmdletSet-AzureStorageBlobContent like this:

Alternatively you can use Azure Data Lake storage for your datasets.

Zeppelin

Once the datasets are upload to Zeppelin notebook we can connect to Zeppelin and start processing them. Zeppelin exposes the implicitSparkContext as the sc variable (more on SparkContext here https://spark.apache.org/docs/latest/programming-guide.html).
The SparkContext allows us to interact with the Yarn cluster and issue Spark statements (and of course Scala ones). Let's load our file.

CSV load

This is simply done by this command:

Where wasb is the URI for Windows Azure Storage Blob.

In order to inspect our file we can use the take n function that will return the first n rows in our file. We can for example print them to our Zeppelin output like this:

This is equivalent this more elegant version:

The output will be like this one:

As we can see our file has a nice header on top. We want to filter it so let's assign to a constant first and then let's create a dependent dataset without the header:

From CSV to case class

In Spark you generally map your CSV to a case class. This is useful because it gives you the chance to perform type mapping. As we have seen from the previous image we have:

  1. SensorId
  2. An event time
  3. Some kind of float value
  4. A status constant

The date field is can be handled in many ways. For this example I'm going to map using a SparkSQL function called to_utc_timestamp (see DateFunctionsSuite.scala). This function expects a DataFrame - which we don't currently have. Let's create it:

This code simply defines a case class with the required fields, maps our CSV to the fields and then registers the temporary table in SparkSQL with the name of EventSQL. Notice we did not convert the date time field but we did an in-place conversion for the float field.

Now that we have a temp table (our DataFrame) we can create views on it. We do so in order to convert our DataTime string field to a more usable one:

This two rows create a new table called Events. We can inspect it directly in Zeppelin by telling it we want to perform a SparkSQL query. In order to do so all we have to do is start the paragraph with the %sql placeholder.

Graphs in Zeppelin

Zeppelin can represent your data graphically. It also gives you powerful BI tools. Let's start with something simple:

Neat. Notice we can inject parameters in our query (the $(ParameterName=DefaultValue) part). Zeppelin will show text boxes allowing our users to customize the query without touching the SQL. This is something very similar of what you can achieve in SQL Server Reporting Services.

Of course the SensorID field not descriptive. We need to join the dimension file to give our dataset more meaning.

Dimension file

Creating the dimension table is straightforward. The drill is the same: load the file, split by comma and map the fields to a case class.

This step is a little verbose because we handle the variable column CSV. We first define a function, someOrNull, returning an Option[String]. That means our function can return a String but also return nothing. The Option[T] pattern is an elegant way to express the possibility of not receiving data back from the function. It replaces the conventional null (and the misleading empty string) without exposing your code to nasty NullPointerExceptions.
Notice also we cached the array. Spark being a lazy language will not perform the operation right away but will cache the first execution instead. We will see later in the execution plan how the cached dataset appears.

BI

Now we can answer some interesting questions. For example what was the temperature in Milan - Lambrate during 2015, October 10th?

Notice how Zeppelin allows us to customize the graph right into the notebook. By selecting the keys and values we get the nice timeline shown above.

Stacked bars

Zeppelin has many interesting graph options. We can, for example, show the temperature from two different sensors side-by-side during Christmas:

This graph tells us Lambrate was generally colder than Via Feltre except in the early afternoon.

Let's compare two different cities:

Edolo is noticeably colder. It's to be expected since it's over 750 mt above the sea level. It's interesting how in the evening the two temperatures seem to match though.
Now let's find the location with the biggest thermal excursion in our dataset.

What about the biggest thermal excursion during a day? A simple group by will do the trick.

Let's plot the first 2 cities (ignoring the different day):

From this graph it's clear we have spurious data (unless we can accept a sudden temperature drop of more than 20 degrees celsius) so our graphical representation has shown a problem we could have overlooked just by looking at raw data.

Execution plan

Behind the curtain our SparkSQL query - along with all its prerequisites - is converted to an execution plan. Yarn shows it for us we want it to. For example, the "biggest thermal excursion during a day" query has this execution plan:

Or, in more SQL-like terms:

Notice the first execution plan has a green dot. This represents the Existing RDD in the second plan. The Existing RDD is the cached version of an RDD. Think about it as a materialized view. Of course since we are in a distributed world, the cached RDD is distributed also. And since the RDD is able to rebuild itself upon need, a change in the HDInsight cluster topology - either due to a failure or an external event - does not invalidate the whole plan. Only the nodes without cached RDD will rebuild their copy.

Happy coding,
Francesco Cogno and Simone Greci