Event Processor Host Best Practices Part 1

Azure Event Hubs is a very powerful telemetry ingestion service that was created by the Service Bus team. It went into General Availability in late October and can be used today to stream millions of events at very low cost. The key to scale for Event Hubs is the idea of partitioned consumers. In contrast to the competing consumers pattern, the partitioned consumers pattern enables very high scale by removing the contention bottleneck and facilitating end-to-end parallelism. This pattern does require some tradeoffs that can be difficult to deal with, specifically reader coordination and offset tracking, both of which are explained in the Event Hubs Overview on MSDN. To reduce this overhead, the Service Bus team has created EventProcessorHost, an intelligent agent for .NET consumers that manages partition access and per-partition offsets.

 

To use this class, you must first implement the IEventProcessor interface, which has three methods: OpenAsync, CloseAsync, and ProcessEventsAsync. A simple implementation is shown below.

 

 

 

After implementing this class, instantiate EventProcessorHost, providing the necessary parameters to the constructor:

  • Hostname - be sure not to hard-code this; each instance of EventProcessorHost must have a unique value for this within a consumer group.
  • EventHubPath - the name of the event hub to read from.
  • ConsumerGroupName - "$Default" is the name of the default consumer group, but it is generally a good idea to create a consumer group for your specific aspect of processing.
  • EventHubConnectionString - the connection string to the particular event hub, which can be retrieved from the Azure portal. This connection string should have Listen permissions on the Event Hub.
  • StorageConnectionString - the connection string for the storage account that will be used for partition distribution and leases. When checkpointing, the latest offset values are also stored here.

 

Finally, call RegisterEventProcessorAsync on the EventProcessorHost to register your implementation of IEventProcessor. At this point the agent will begin obtaining leases for partitions and creating receivers to read from them. For each partition for which a lease is acquired, an instance of your IEventProcessor class is created and then used to process events from that specific partition.

 

Thread safety & processor instances

It's important to know that, by default, EventProcessorHost is thread safe and behaves synchronously as far as your instance of IEventProcessor is concerned. When events arrive for a particular partition, ProcessEventsAsync is called on the IEventProcessor instance for that partition, and no further call to ProcessEventsAsync is made for that partition until the returned task completes. Subsequent messages and calls to ProcessEventsAsync queue up behind the scenes while the message pump continues to run in the background on other threads. Because calls for a given partition never overlap, state held by a processor instance does not need thread-safe collections, which avoids their synchronization overhead.
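The delivery rule described above can be modeled in a few lines. What follows is only a sketch in Python using asyncio, not the .NET API: `RecordingProcessor`, `pump`, and `run_host` are hypothetical names, and the real host's threading is more involved. It shows one pump per partition that waits for the previous call to finish before delivering the next batch, while the pumps for different partitions run concurrently.

```python
import asyncio


class RecordingProcessor:
    """Hypothetical stand-in for one IEventProcessor instance (one per partition)."""

    def __init__(self, partition_id, log):
        self.partition_id = partition_id
        self.log = log            # shared list of (partition, "start" | "end", batch_no)
        self.in_flight = 0        # calls currently running on this instance
        self.max_in_flight = 0    # highest overlap ever observed

    async def process_events(self, batch_no, events):
        self.in_flight += 1
        self.max_in_flight = max(self.max_in_flight, self.in_flight)
        self.log.append((self.partition_id, "start", batch_no))
        await asyncio.sleep(0.01)  # simulate slow processing work
        self.log.append((self.partition_id, "end", batch_no))
        self.in_flight -= 1


async def pump(processor, batches):
    # The next batch is delivered only after the previous call has completed.
    for batch_no, events in enumerate(batches):
        await processor.process_events(batch_no, events)


async def run_host(partition_batches):
    """Create one processor per partition and run all partition pumps concurrently."""
    log = []
    processors = {pid: RecordingProcessor(pid, log) for pid in partition_batches}
    await asyncio.gather(
        *(pump(processors[pid], batches) for pid, batches in partition_batches.items())
    )
    return processors, log
```

In this model no processor ever sees two overlapping calls, so its instance fields can be plain, unsynchronized state, while a slow batch on one partition does not hold up the others.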

 

Receiving Messages

Each call to ProcessEventsAsync delivers a collection of events. It is your responsibility to do whatever you intend to do with these events. Keep whatever you're doing relatively fast, i.e. don't try to run many different kinds of processing from here - that's what consumer groups are for. If you need to write to storage and do some routing, it is generally better to use two consumer groups and have two IEventProcessor implementations that run separately.

 

At some point during your processing you're going to want to keep track of what you have read and completed. This is critical if you have to restart reading, so that you don't start back at the beginning of the stream. EventProcessorHost greatly simplifies this with the concept of checkpoints. A checkpoint is a location, or offset, for a given partition, within a given consumer group, at which you are satisfied that you have processed the messages up to that point. It is where you are currently "done". Marking a checkpoint in EventProcessorHost is accomplished by calling the CheckpointAsync method on the PartitionContext object. This is generally done within the ProcessEventsAsync method but can be done in CloseAsync as well.

 

CheckpointAsync has two overloads. The first, with no parameters, checkpoints to the highest event offset within the collection delivered to ProcessEventsAsync. This is a "high water mark" in that it optimistically assumes you have processed all recent events when you call it. If you use this overload, call it only after the rest of your event processing code has completed. The second overload allows you to specify an EventData instance to checkpoint to, which lets you use a different type of watermark. With this you could implement a "low water mark" - the lowest sequenced event you are certain has been processed. This overload is provided to enable flexibility in offset management.
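The difference between the two watermarks can be illustrated with a small Python model. This is a sketch rather than the .NET API: events are plain dictionaries with an integer `sequence_number` (an assumption made for readability), both function names are hypothetical, and the low water mark is interpreted here as the last event up to which everything in the batch has been processed without a gap.

```python
def high_water_mark(batch):
    """Event the parameterless checkpoint corresponds to: the last one in the batch."""
    return max(batch, key=lambda e: e["sequence_number"])


def low_water_mark(batch, completed):
    """Last event such that it and every earlier event in the batch are completed.

    `completed` is a set of sequence numbers known to be fully processed.
    Returns None when even the first event in the batch is not done yet.
    """
    mark = None
    for event in sorted(batch, key=lambda e: e["sequence_number"]):
        if event["sequence_number"] not in completed:
            break  # first gap found: nothing after this point is safe to checkpoint
        mark = event
    return mark
```

For a batch with sequence numbers 10 to 13 in which 10, 11, and 13 have completed, the high water mark is event 13, while the low water mark is event 11, because event 12 is still outstanding. Passing that event to the second overload is the kind of use the overload is described as enabling.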

 

When the checkpoint is performed, a JSON file with partition-specific information, the offset in particular, is written to the storage account supplied in the constructor to EventProcessorHost. This file is updated on every checkpoint. It is critical to consider checkpointing in context - it would be unwise to checkpoint every message. The storage account used for checkpointing probably wouldn't handle this load, but more importantly, checkpointing every single event is indicative of a queued messaging pattern, for which a Service Bus Queue may be a better option than an Event Hub. The idea behind Event Hubs is that you get at-least-once delivery at great scale. By making your downstream systems idempotent, it is easy to recover from failures or restarts that result in the same events being received multiple times.
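The interplay between infrequent checkpoints and an idempotent downstream system can be sketched as follows. This is a Python model, not the .NET API: `CheckpointStore` stands in for the JSON file in the storage account, `IdempotentSink` for a downstream store keyed by a hypothetical event `id`, and the checkpoint interval is an arbitrary illustrative value.

```python
class CheckpointStore:
    """Stands in for the per-partition checkpoint file in the storage account."""

    def __init__(self):
        self.offset = None
        self.writes = 0

    def checkpoint(self, offset):
        self.offset = offset
        self.writes += 1


class IdempotentSink:
    """Downstream store keyed by event id, so a redelivered event overwrites itself."""

    def __init__(self):
        self.rows = {}
        self.deliveries = 0

    def upsert(self, event):
        self.deliveries += 1
        self.rows[event["id"]] = event["value"]


def batches_after(stream, offset, batch_size):
    """Events after the checkpointed offset (all events if there is none), batched."""
    pending = [e for e in stream if offset is None or e["offset"] > offset]
    return [pending[i:i + batch_size] for i in range(0, len(pending), batch_size)]


def process(batches, store, sink, checkpoint_every):
    since_checkpoint = 0
    for batch in batches:
        for event in batch:
            sink.upsert(event)
        since_checkpoint += len(batch)
        if since_checkpoint >= checkpoint_every:
            # Checkpoint only after the whole batch has been handled.
            store.checkpoint(batch[-1]["offset"])
            since_checkpoint = 0  # start counting again after each checkpoint
```

With ten events delivered in batches of two and a checkpoint every four events, stopping the first run after three batches leaves the checkpoint at offset 3. A restart then redelivers offsets 4 and 5, yet the sink still ends up with exactly ten rows, and only two checkpoint writes are made in total.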

 

Shutting down gracefully

Finally, EventProcessorHost.UnregisterEventProcessorAsync allows for the clean shutdown of all partition readers and should always be called when shutting down an instance of EventProcessorHost. Failure to do this can cause delays when starting other instances of EventProcessorHost due to lease expiration and Epoch conflicts.
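Why a clean shutdown matters can be seen in a toy lease model. This Python sketch is not how EventProcessorHost stores leases: `LeaseStore` is hypothetical, time is passed in explicitly as a number of seconds, and the 30 second lease duration used in the example is purely illustrative. It only shows that a lease that is never released blocks other owners until it expires.

```python
class LeaseStore:
    """Toy model of per-partition leases with an expiry time."""

    def __init__(self, lease_seconds):
        self.lease_seconds = lease_seconds
        self.leases = {}  # partition id -> (owner, expires_at)

    def try_acquire(self, partition, owner, now):
        current = self.leases.get(partition)
        if current is not None and current[0] != owner and current[1] > now:
            return False  # another host still holds an unexpired lease
        self.leases[partition] = (owner, now + self.lease_seconds)
        return True

    def release(self, partition, owner):
        # What a clean shutdown does in this model: give the lease back immediately.
        current = self.leases.get(partition)
        if current is not None and current[0] == owner:
            del self.leases[partition]


def shutdown(store, owner, partitions, clean):
    """Stop a host; only a clean shutdown releases its leases."""
    if clean:
        for partition in partitions:
            store.release(partition, owner)
```

If host A acquires partition "0" at time 0 and stops without releasing it, host B cannot acquire that partition until the lease has run out. After a clean shutdown of A, B can take it over right away.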

 

The next post will discuss lease management, auto scale, and runtime options.

Comments

  • Anonymous
    February 02, 2015
    stackoverflow.com/.../27616614 What role does the Task returned by ProcessEventsAsync serve? That is, is it the completion of that task that blocks additional calls to ProcessEventsAsync, or is it simply returning from the method that allows it to be called again? Does the EventProcessorHost give a thread to each partition for running ProcessEventsAsync so that they can be slow (as implied by "will block further calls to ProcessEventsAsync for the particular partition" vs all partitions)?

  • Anonymous
    February 02, 2015
    I guess the correct link to IEventProcessor is msdn.microsoft.com/.../microsoft.servicebus.messaging.ieventprocessor.aspx Is this the infrastructure technology used by Rx in the cloud for Cortana, the one Bart De Smet talked about?

  • Anonymous
    February 02, 2015
    Christian - The task is to wait on for completion - your second paragraph is almost right - it doesn't get an exclusive thread, but it does get thread separation (thread pool or something behind the scenes). All that happens is the message pump doesn't deliver to that IEventProcessor until the previous task is complete. It can get slow if you do slow things, but is not slow itself. If you're asking do we just Thread.Create for every partition, no.

  • Anonymous
    February 06, 2015
    It seems that ProcessEventsAsync gets all messages from the last checkpoint to the current time. Is that assumption correct? Does that mean ProcessEventsAsync finishes a loop after processing all these messages? What defines the size of the messages parameter? What is the protocol for calling ProcessEventsAsync? Do we have to call it in an infinite loop in our code, or do we just register the MyProcessor class and that's enough? Is it possible to add a sequence diagram? The whole process workflow is not clear.

  • Anonymous
    February 18, 2015
    Shouldn't "messageCount" be reset to zero after the CheckpointAsync operation has been called here?

  • Anonymous
    April 14, 2015
    I ran into an issue: my Event Hub often has periods with no data, and during these periods I have seen checkpointing fail and leases get lost (even if I assign one EventProcessor to all instances). This causes duplicate consumption of messages from my queue too often. What is the recommended way to avoid this? My downstream is okay with multiple events, but not too frequently, as it makes some calculations/telemetry unpredictable.

  • Anonymous
    April 28, 2015
    How do we handle the case where I want multiple processors (each using its own consumer group), one for each event type? For example, the sensor sends heartbeat and lowbattery events. I need two IEventProcessors to process these messages, and they will use "heartbeatsubscription" and "lowbatterysubscription" as the consumer groups.

  • Anonymous
    September 03, 2015
    On occasion I am in a position inside of ProcessEventsAsync where I cannot successfully process any of the events, for instance if I am storing the data and storage is temporarily unavailable. What is the best way for me to signal that I want the messages to be redelivered? I am not convinced that setting a checkpoint to an older version will do it. Is there something that I can return to tell the caller that I could not process anything?

    • Anonymous
      June 21, 2016
      If you can't process any messages, then don't call context.CheckpointAsync().
  • Anonymous
    July 21, 2016
    Is there a download link for this sample code?