Understanding and Using HDInsight Spark Streaming

There are plenty of blogs and materials out there about Spark Streaming. Most of them focus on the internals and explain in detail how Spark Streaming works. I don't think those are best suited for developers or data scientists who simply want to use Spark Streaming. As I worked to enable Spark Streaming for HDInsight, I found that customers are often unsure how to use Spark Streaming properly in a production system. I hope this blog helps them better understand Spark Streaming from a user's point of view.

1. What is Spark Streaming?

Spark Streaming is the real-time data analytics framework provided by Spark. It is a micro-batch system. In a nutshell, you can think of it as an additional layer on top of Spark that reads data from an external data source and periodically pushes a batch of data to Spark for real-time transformation.

2. How does Spark Streaming work?

The key concept in Spark Streaming is a receiver. A receiver is the interface to a specific external system. Before we move on, you need to understand a few basic concepts of Spark:

Application: A user application submitted to Spark. It consists of a driver and many executors.

Driver: A program that manages all executors for a Spark application, declares data transformation and actions on RDDs.

Executor: A process running on the worker node for a Spark application, which stays up for the duration of the application.

Job: A parallel computation consisting of multiple tasks, spawned in response to a Spark action (e.g. save, collect, etc.).

Stage: A smaller set of tasks within a job that depend on one another.

Task: A unit of work that is sent to one executor. Note that one executor can execute multiple tasks at the same time.

When you submit a Spark Streaming application, a driver is started, which launches many executors on the worker nodes through the sparkmaster. The first job submitted to the application is the receiver job (usually with Job ID 1), which lasts for the duration of the streaming application. Each task in the receiver job is one instance of the receiver.

This is how data enters the Spark system through a receiver:

Data Source -> Receiver -> Block Generator -> Block Manager

The receiver reads data from the data source and hands the received data off to a component called the Block Generator. The Block Generator keeps the data in memory. Every block interval (spark.streaming.blockInterval, which defaults to 200ms), it sends the block to the Spark Block Manager as a new partition of an RDD. The Block Manager is a component running on every Spark node that manages data blocks both locally and remotely. Every batch interval (specified when creating the StreamingContext), the RDD is closed and a Spark job (a data processing job) is submitted to process the newly generated RDD.
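To make the relationship between the two intervals concrete, here is a minimal sketch in plain Scala with no Spark dependency. The object and method names are hypothetical, and it assumes a single receiver that receives data continuously, so that one block is produced in every block interval. It is an estimate of how many partitions each batch RDD gets, not a description of Spark's internal code.

```scala
// Hypothetical helper, not part of Spark: estimates how many blocks
// (RDD partitions) one receiver contributes to each batch.
object BlockMath {
  // Both intervals are in milliseconds. Integer division drops a partial block.
  def blocksPerBatch(batchIntervalMs: Long, blockIntervalMs: Long): Long = {
    require(blockIntervalMs > 0, "block interval must be positive")
    batchIntervalMs / blockIntervalMs
  }
}

// A 10 second batch interval with the default 200 ms block interval:
// BlockMath.blocksPerBatch(10000L, 200L) == 50
```

So with the defaults used in this post, each data processing job works on an RDD with roughly 50 partitions per receiver.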

You can see the receiver job and all the subsequent data processing jobs in the Spark UI. In my example run, Job 1 is the receiver job, which has only 1 task (1 instance of the receiver). Jobs 2, 3, 4, 5 and 6 are data processing jobs, about 10 seconds apart, because the batch interval is set to 10 seconds.

3. How to use Spark Streaming with Azure Event Hubs in HDInsight?

A streaming solution consists of not only the Spark Streaming framework, but also the external data source and the persistent store for processed data. For HDInsight, the recommended data source is Azure Event Hubs. Event Hubs is a pub/sub message broker service in the cloud, comparable to Apache Kafka. For the persistent data store, you can use HDFS (in HDInsight this is WASB, basically Azure Storage), HBase, or a SQL database. There are plenty of online examples and tutorials on writing to these stores, so our focus here is Event Hubs.

There is a fantastic tutorial in the HDInsight documentation that shows you how to write an end-to-end Spark Streaming application using Event Hubs:

https://azure.microsoft.com/en-us/documentation/articles/hdinsight-apache-spark-csharp-apache-zeppelin-eventhub-streaming/

This is how you create a DStream from an Event Hub:

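import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.eventhubs.EventHubsUtils

// Connection and checkpoint settings for the Event Hub
val ehParams = Map[String, String](
  "eventhubs.policyname" -> "<name of policy with listen permissions>",
  "eventhubs.policykey" -> "<key of policy with listen permissions>",
  "eventhubs.namespace" -> "<service bus namespace>",
  "eventhubs.name" -> "<event hub in the service bus namespace>",
  "eventhubs.partition.count" -> "1",
  "eventhubs.consumergroup" -> "$default",
  "eventhubs.checkpoint.dir" -> "/<check point directory in your storage account>",
  "eventhubs.checkpoint.interval" -> "10"
)

// sc is the existing SparkContext (the Zeppelin notebook provides it); the batch interval is 10 seconds
val ssc = new StreamingContext(sc, Seconds(10))

// One receiver per Event Hubs partition, combined into a single DStream
val stream = EventHubsUtils.createUnionStream(ssc, ehParams)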

This tutorial uses the EventHubsReceiver to receive data from Event Hubs. Event Hubs supports the AMQP protocol, which is how the EventHubsReceiver talks to the Event Hubs server. The EventHubsReceiver uses EventHubsClient, which in turn uses the Apache Qpid project as its AMQP library. Note that the Storm EventHubSpout uses the same EventHubsClient library.

There is a common mistake in running this tutorial. If you don't assign enough cores to the Spark Streaming application (or to Zeppelin if you use a Zeppelin notebook), the application will appear stuck and will not produce any results. This is because, by design, each EventHubsReceiver instance handles only one Event Hubs partition. Each receiver instance requires one CPU core to run, and you need to leave some CPU cores to process the received data. So if you set the partition count to N in the Event Hubs parameters, you need to make sure you assign 2 x N CPU cores to the streaming application.

What this means for an HDInsight Spark cluster is that, with the default configuration, a 4-node cluster can handle at most 8 Event Hubs partitions (each node has 4 cores, and 16 cores can handle at most 8 partitions).
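The core budgeting rule above can be written down as a small calculation. This is a plain Scala sketch with hypothetical names. It encodes only the 2 x N rule of thumb from this post and assumes all cores in the cluster are available to the streaming application.

```scala
// Hypothetical helper that encodes the 2 x N rule of thumb:
// one core per receiver plus one core per partition for processing.
object CoreBudget {
  // Cores to assign to the application for a given partition count.
  def coresNeeded(partitions: Int): Int = 2 * partitions

  // Largest partition count a cluster can handle under the same rule.
  def maxPartitions(nodes: Int, coresPerNode: Int): Int =
    (nodes * coresPerNode) / 2
}

// The default 4-node cluster with 4 cores per node:
// CoreBudget.maxPartitions(4, 4) == 8
// CoreBudget.coresNeeded(8) == 16
```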

The "eventhubs.checkpoint.dir" parameter specifies the checkpoint folder that stores the message offset. This parameter needs to be an absolute path. You'll see exceptions if you provide a relative path like "mycheckpoint". You need to use "/mycheckpoint" instead.

The EventHubsReceiver project is published in the spark-packages repository here:

https://spark-packages.org/package/hdinsight/spark-eventhubs

Spark-packages is a community-maintained repository for third-party Spark integration code. The actual source code is shared on GitHub:

https://github.com/hdinsight/spark-eventhubs

The EventHubsClient source code is shared on GitHub:

https://github.com/hdinsight/eventhubs-client

4. Achieving at-least-once semantics

In general there are two types of receivers: the regular receiver and the reliable receiver. A reliable receiver aims to provide at-least-once semantics, meaning each message is guaranteed to be processed at least once. A regular (or unreliable) receiver does not guarantee at-least-once semantics, but it can achieve higher throughput. So if you don't need at-least-once semantics, use the regular receiver.

To the best of my knowledge, the only way to achieve exactly-once semantics in Spark Streaming is to use Kafka as the data source, and to use the direct stream (DirectKafkaInputDStream.scala) without a receiver.

In HDInsight, we provide both EventHubsReceiver and ReliableEventHubsReceiver so you can choose what you need. The difference in implementation lies in when the checkpoint is taken. We checkpoint the message offsets in HDFS (in HDInsight this is WASB). Since the Block Generator keeps received data in memory for a block interval, there is a risk of losing data when the receiver crashes. ReliableEventHubsReceiver checkpoints only after a block is committed to the Block Manager, thereby avoiding data loss. ReliableEventHubsReceiver also needs to work with a Spark Streaming feature called WAL (Write Ahead Log). With WAL enabled, the Block Generator writes a block of data to the WAL as well as to the Block Manager, avoiding data loss in the later stages of the pipeline. If the driver crashes and restarts, it can reconstruct the RDD partitions from the WAL.
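Why the timing of the checkpoint matters can be shown with a toy simulation in plain Scala. Everything here is hypothetical and simplified: it is not the receiver code. A block is modeled as a fixed number of messages rather than a time interval, and the regular receiver is assumed to record the offset as soon as a message is received, which stands in for the real interval-based checkpointing.

```scala
// Toy model of one receiver crash. Returns the offsets that end up stored.
object CheckpointOrdering {
  def run(total: Int, blockSize: Int, crashAfter: Int,
          checkpointAfterCommit: Boolean): Vector[Int] = {
    var stored = Vector.empty[Int]   // stands in for the Block Manager
    var buffer = Vector.empty[Int]   // in-memory block being built
    var checkpoint = -1              // last offset recorded as done

    // Commit the current block; the reliable variant checkpoints here.
    def commit(): Unit = if (buffer.nonEmpty) {
      stored = stored ++ buffer
      if (checkpointAfterCommit) checkpoint = buffer.last
      buffer = Vector.empty
    }

    // Receive one message; the regular variant checkpoints here.
    def receive(offset: Int): Unit = {
      buffer = buffer :+ offset
      if (!checkpointAfterCommit) checkpoint = offset
      if (buffer.size == blockSize) commit()
    }

    (0 until crashAfter).foreach(receive)
    buffer = Vector.empty                          // crash: in-memory block is lost
    ((checkpoint + 1) until total).foreach(receive) // restart from the checkpoint
    commit()                                       // flush the final partial block
    stored
  }
}

// 10 messages, blocks of 3, crash after 5 messages were received:
// CheckpointOrdering.run(10, 3, 5, checkpointAfterCommit = false)
//   == Vector(0, 1, 2, 5, 6, 7, 8, 9)   // offsets 3 and 4 are lost
// CheckpointOrdering.run(10, 3, 5, checkpointAfterCommit = true)
//   == Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
```

The model does not cover a crash between the commit and the checkpoint, which is the case where a reliable receiver replays messages and you see duplicates. That is why the guarantee is at-least-once rather than exactly-once.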

So how do you choose the ReliableEventHubsReceiver? You don't need to modify any application code: you still call EventHubsUtils.createUnionStream() to create the DStream. Simply enabling WAL (setting the configuration spark.streaming.receiver.writeAheadLog.enable to true) switches the application to ReliableEventHubsReceiver.
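For a standalone application, one way to set this configuration is on the SparkConf before the StreamingContext is created. The sketch below is a minimal example under that assumption. The object name and the application name are hypothetical, while SparkConf.set and the configuration key are the ones named above.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper that builds a SparkConf with the receiver WAL enabled.
object WalConfig {
  def build(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      // Turns on the write ahead log for received data.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
}
```

You would then pass the result to the StreamingContext constructor, for example new StreamingContext(WalConfig.build("MyStreamingApp"), Seconds(10)). Spark writes the log under the streaming checkpoint directory, so the application also needs to call ssc.checkpoint() with a directory on fault-tolerant storage.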

5. High Availability (HA)

The tutorial we mentioned earlier uses a Zeppelin notebook to write a Spark Streaming application. This is quick and convenient, but it is not production ready. Zeppelin runs the driver on the head node, so the application fails when the head node fails over. A production streaming application needs to run 24x7, regardless of arbitrary node crashes in the cluster.

To achieve High Availability in an HDInsight Spark cluster, you need to 1) enable driver checkpointing and 2) RDP to the cluster and submit the Spark Streaming application in cluster mode with supervise:

spark-submit.cmd --deploy-mode cluster --supervise …

This is how HA works in different scenarios:

1) Head node failover

First, we need sparkmaster HA. This means the active sparkmaster saves metadata to Zookeeper, and when a head node failover happens, the standby sparkmaster reads the metadata from Zookeeper and continues to operate. We have enabled sparkmaster HA by default in HDInsight, so no action is needed from the user.

Second, we need to submit the application in cluster mode. By default, the Spark Streaming driver runs on the head node. During a head node failover, the driver is lost and the application is killed. Spark provides a cluster mode in which the driver is shipped to a worker node to run.

2) Worker node failure

If a worker node dies, the tasks assigned to the executors on that worker are re-assigned elsewhere. There is no problem here. However, if the worker node that runs the driver dies, we need to relaunch the driver. Spark provides a supervise feature in cluster mode, in which the sparkmaster periodically pings the driver and restarts it on a different node if it has crashed.

Furthermore, the restarted driver should be able to pick up from the previous state of the old driver. For this to work we need to enable driver checkpointing. With checkpointing, the driver's metadata is stored in HDFS periodically. During startup, the driver tries to read data from the checkpoint folder, and creates the StreamingContext from the checkpointed data if it is found.
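In Spark, this startup behavior is available through StreamingContext.getOrCreate. The following is a minimal sketch of how an application might be structured around it, not the code of the example project. The object name, the checkpoint path and the count().print() output are hypothetical placeholders, and the Event Hubs parameters are left empty, so they must be filled in as shown in section 3 before the application can actually connect.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.eventhubs.EventHubsUtils

object RecoverableStreamingApp {
  // Hypothetical folder for the driver's checkpoint data.
  val driverCheckpointDir = "/mydrivercheckpoint"

  // Placeholder: supply the eventhubs.* settings from section 3 here.
  val ehParams = Map[String, String]()

  // Called only when no checkpoint exists yet.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("RecoverableStreamingApp")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(driverCheckpointDir)
    // Define the whole pipeline inside this function so that it is
    // captured in the checkpoint and restored after a restart.
    val stream = EventHubsUtils.createUnionStream(ssc, ehParams)
    stream.count().print() // placeholder output operation
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Rebuilds the context from the checkpoint if one is found,
    // otherwise falls back to createContext().
    val ssc = StreamingContext.getOrCreate(driverCheckpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```

The important design point is that the DStream setup lives inside createContext(). Code placed after getOrCreate would not be part of the recovered context.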

For an example of a Spark Streaming application with HA, take a look at this GitHub project:

https://github.com/hdinsight/hdinsight-spark-examples

6. Storm vs. Spark Streaming

Since the inception of Spark Streaming, people have been comparing it with Apache Storm, an existing and successful distributed streaming platform. If you search online, you'll find many articles that describe the difference as Storm's one-message-at-a-time processing vs. Spark Streaming's micro-batching. This is only partially true. A regular Storm topology processes one message at a time, which results in low latency but also low throughput. Storm Trident, a software layer on top of Storm, can also do micro-batching along with a range of high-level data transformations.

These are my observations after working on both technologies for a while:

1) Storm is more flexible. With Storm, you can specify the exact processing topology and parallelism levels. With Trident, you can easily achieve exactly-once semantics, which is very hard to achieve in Spark Streaming (as of 1.3).

2) Storm is more reliable. We have been running Storm applications for months and they keep running as expected. As of Spark 1.3, we've observed occasional crashes for Spark Streaming.

3) Spark Streaming is easier to use. Spark Streaming provides higher-level abstractions and APIs that make it easier to write business logic. In particular, Spark Streaming provides windowing aggregates out of the box, which are not available in Storm.

4) Spark Streaming has an ecosystem. The biggest advantage of Spark Streaming is that it is part of the Spark ecosystem. You can use the rich Spark core data transformation functions once the data is pushed into Spark. Another benefit is that if you are interested in implementing a Lambda architecture, your real-time and batch data processing can share the same code base.

I also want to stress that Spark Streaming is evolving fast. It is still under heavy development, and new features are added constantly. For example, the write ahead log was introduced in 1.2 and the direct Kafka stream was introduced in 1.3. So it's reasonable to believe that Spark Streaming will become more feature rich and reliable over time.

So when should you use which? My recommendation is this: if you are invested in Spark and plan to move your data platform to Spark someday, choose Spark Streaming; otherwise stay with Storm. If you need exactly-once semantics in your application, I recommend Storm Trident.