Event Processor Host Best Practices Part 2

In the last post we covered the basics of EventProcessorHost. In this post we'll go deeper into lease management and the EventProcessorHost options.

Lease management

Checkpointing is not the only thing EventProcessorHost uses the storage connection string for. It also manages partition ownership (that is, reader ownership) for you, so that only a single reader can read from any given partition at a time within a consumer group. This is accomplished using Azure Storage Blob Leases and implemented using Epoch. It greatly simplifies the auto-scale nature of EventProcessorHost.

As an instance of EventProcessorHost starts, it will acquire as many leases as possible and begin reading events. As the leases draw near expiration, EventProcessorHost will attempt to renew them by placing a reservation. If a lease is available for renewal, the processor continues reading; if it is not, the reader is closed and CloseAsync is called. This is a good time to perform any final cleanup for that partition.
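To make the lease life cycle a bit more tangible, here is a minimal toy model in Python. It is not the EventProcessorHost implementation and does not touch Azure Storage. The names LeaseStore, Host, LEASE_SECONDS, try_acquire and try_renew are all hypothetical, and the 30 second lease length is an assumption chosen only for illustration. It shows a host greedily acquiring leases, a second host finding nothing free, and the first host closing its readers once its leases have been taken over.

```python
from dataclasses import dataclass, field

LEASE_SECONDS = 30  # hypothetical lease duration, for illustration only


@dataclass
class LeaseStore:
    """Stand-in for blob leases: partition id -> (owner, expiry time)."""
    leases: dict = field(default_factory=dict)

    def try_acquire(self, partition, owner, now):
        holder = self.leases.get(partition)
        # A lease is free if nobody holds it, it has expired, or we already own it.
        if holder is None or holder[1] <= now or holder[0] == owner:
            self.leases[partition] = (owner, now + LEASE_SECONDS)
            return True
        return False

    def try_renew(self, partition, owner, now):
        holder = self.leases.get(partition)
        # Renewal only succeeds while we still hold an unexpired lease.
        if holder is not None and holder[0] == owner and holder[1] > now:
            self.leases[partition] = (owner, now + LEASE_SECONDS)
            return True
        return False


@dataclass
class Host:
    name: str
    store: LeaseStore
    owned: set = field(default_factory=set)
    closed: list = field(default_factory=list)

    def acquire_all(self, partitions, now):
        # Greedy: take every lease that is currently available.
        for p in partitions:
            if self.store.try_acquire(p, self.name, now):
                self.owned.add(p)

    def renew_all(self, now):
        for p in sorted(self.owned):
            if not self.store.try_renew(p, self.name, now):
                # Lease lost: stop reading; this is where cleanup would run.
                self.owned.discard(p)
                self.closed.append(p)


partitions = ["0", "1", "2", "3"]
store = LeaseStore()
a, b = Host("A", store), Host("B", store)

a.acquire_all(partitions, now=0)   # A takes every partition
b.acquire_all(partitions, now=1)   # B finds nothing available
owned_by_b_early = set(b.owned)
a.renew_all(now=20)                # A renews in time, leases now expire at 50
b.acquire_all(partitions, now=60)  # A stalled, its leases expired, B takes over
a.renew_all(now=61)                # A's renewals fail, so its readers are closed
```

In this model the two hosts never talk to each other. The lease store is the only thing they share, which is the property that makes scaling instances in and out simple.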

EventProcessorHost has a member PartitionManagerOptions. This member allows for control over lease management. Set these options before registering your IEventProcessor implementation.

Controlling the runtime

Additionally, the call to RegisterEventProcessorAsync accepts an EventProcessorOptions parameter. This is where you can control the behavior of the EventProcessorHost itself. There are four properties and one event that you should be aware of.

MaxBatchSize - this is the maximum size of the collection you want to receive in an invocation of ProcessEventsAsync. Note that this is only the maximum, not the minimum. If fewer messages are available, ProcessEventsAsync will execute with as many as were available.

PrefetchCount - this is a value used by the underlying AMQP channel to determine the upper limit of how many messages the client should receive. This value should be greater than or equal to MaxBatchSize.

InvokeProcessorAfterReceiveTimeout - setting this parameter to true will result in ProcessEventsAsync being called when the underlying call to receive events on a partition times out. This is useful for taking time-based actions during periods of inactivity on the partition.

InitialOffsetProvider - this allows a function or lambda expression to be set that will be called to provide the initial offset when a reader begins reading a partition. Without setting this, the reader will start at the oldest event unless a JSON file with an offset has already been saved in the storage account supplied to the EventProcessorHost constructor. This is useful when you want to change the behavior of reader start-up. When this method is invoked, its parameter will contain the id of the partition that the reader is being started for.

ExceptionReceived - this event allows you to receive notification of any underlying exceptions that occur in the EventProcessorHost. If things aren't working as you expect, this is a great place to start looking.
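The MaxBatchSize and InvokeProcessorAfterReceiveTimeout descriptions above can be illustrated with a small simulation. This is a sketch in Python, not the real receive pump. The function run_pump and its arguments are hypothetical, and representing a timed-out receive as an empty batch is an assumption made for the model.

```python
def run_pump(arrivals, max_batch_size, invoke_after_receive_timeout):
    """Simulate the receive loop for one partition.

    arrivals holds one list per receive call: the events that became
    available before that call timed out (an empty list means none did).
    Returns the batches that would be handed to the processor.
    """
    pending = []    # events received but not yet delivered
    delivered = []  # one entry per simulated processor invocation
    for arrived in arrivals:
        pending.extend(arrived)
        if pending:
            # Hand over at most max_batch_size events, fewer if that is all we have.
            batch, pending = pending[:max_batch_size], pending[max_batch_size:]
            delivered.append(batch)
        elif invoke_after_receive_timeout:
            # Nothing arrived: still invoke the processor so it can do timed work.
            delivered.append([])
    return delivered


arrivals = [[1, 2, 3, 4, 5], [], [], [6]]
without_timeout_calls = run_pump(arrivals, 3, False)
with_timeout_calls = run_pump(arrivals, 3, True)
print(without_timeout_calls)  # [[1, 2, 3], [4, 5], [6]]
print(with_timeout_calls)     # [[1, 2, 3], [4, 5], [], [6]]
```

The second batch holds only two events because the maximum is a ceiling, not a quota. The extra empty batch in the second run is the hook you would use for time-based work during quiet periods.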

If there is one best practice you take from this post, make sure it is to use the ExceptionReceived event when things don't work the way you expect.
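Why this matters can be seen in a toy model of an event that reports failures instead of letting them propagate. The sketch below is Python, not the .NET event. Event, ToyHost, run_step and the "action" label are hypothetical names, and the assumption that the host catches the error and only surfaces it through the event is made for illustration.

```python
class Event:
    """Tiny multicast event: handlers are called in subscription order."""

    def __init__(self):
        self._handlers = []

    def subscribe(self, handler):
        self._handlers.append(handler)

    def fire(self, sender, error, action):
        for handler in self._handlers:
            handler(sender, error, action)


class ToyHost:
    def __init__(self):
        self.exception_received = Event()

    def run_step(self, action, step):
        # Failures are caught here; without a subscriber nobody would see them.
        try:
            step()
        except Exception as error:
            self.exception_received.fire(self, error, action)


def failing_renew():
    raise RuntimeError("lease lost")


log = []
host = ToyHost()
host.exception_received.subscribe(
    lambda sender, error, action: log.append(f"{action}: {error}")
)

host.run_step("renew lease", failing_renew)  # reported through the event
host.run_step("checkpoint", lambda: None)    # succeeds, nothing reported
print(log)  # ['renew lease: lease lost']
```

If no handler had been subscribed, the failed step in this model would leave no trace at all, which is the situation the advice above is meant to help you avoid.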

Comments

  • Anonymous
    February 06, 2015
    The purpose of the lease is not clear. Is it for managing the limited number of receivers or groups, or something else? What is the lease protocol / life cycle? Is it possible to add a diagram of it?

  • Anonymous
    February 26, 2015
    The lease is for "locking" the partition so that other readers know it is being read. The instances of EventProcessorHost don't need to talk directly to each other because they communicate via the leasing mechanism via a greedy algorithm.

  • Anonymous
    April 20, 2015
    Hi Dan, Is there any option to insert dependencies in IEventProcessor class implementation ?  Since RegisterEventProcessorAsync<> does not take any parameters, how do I pass my custom objects to my IEventProcessor class implementation ? Appreciate your inputs here.

  • Anonymous
    April 28, 2015
    During some testing at fairly moderate load with 2 instances hosted in Worker Roles, I noticed that on occasion the Event Processors started reprocessing large numbers of events, much larger than would have been received within the checkpoint time that I had set to 1 minute. In fact it seemed like the stream was completely rewound. As this data was displayed on a dashboard it meant the reporting was very inaccurate. The only way to then stop this was to stop and restart the worker roles after which the Event Processors seemed to pick up from the last checkpoint. In the end I had to stop and start every 30 minutes or this behavior would be seen. I simply followed various blog articles on how to create these in roles. Any thoughts on what may have happened, what I may have done wrong and how it could be avoided?

  • Anonymous
    August 10, 2015
    Shadab, You can use the register factory method and pass custom objects using its ctor to your implementation of CreateEventProcessor.

  • Anonymous
    September 08, 2015
    Hi Dan Rosanova MSFT, Could you elaborate on using lease in eventprocessorhost. How it behaves during auto scaling, will it process duplicate event data in event hub

  • Anonymous
    September 08, 2015
    Hi Dan Rosanova MSFT, Could you elaborate a little on using lease in event processor host. How to avoid duplicate processing of event data during auto scaling of event receivers.

  • Anonymous
    August 18, 2016
    Hi Dan Rosanova MSFT, While load testing with 12 sender instances, I was able to send 600K events to the Event Hub in less than a minute. On the receiver side, however, it takes 14 minutes to read all 600K events from the Event Hub. I am using the default consumer group and 32 partitions for the Event Hub. I would like to fetch the 600K events from the receiver for a particular consumer group. How should I do that? Regards, Anil