ChangeFeedEventHost Class

Definition

Caution

Switch to the ChangeFeedProcessorBuilder class and use the BuildAsync method for building the change feed processor host or the BuildEstimatorAsync method for building the remaining work estimator.
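
As a rough migration sketch, the builder-based host might be wired up as shown here. It assumes the version 2 change feed processor package, where observers implement the interface in the FeedProcessing namespace (aliased as FP to avoid clashing with the obsolete types of the same name). BuilderObserver and BuilderMigrationSketch are hypothetical names, and the two DocumentCollectionInfo arguments are expected to be filled in by the caller.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.ChangeFeedProcessor;
using FP = Microsoft.Azure.Documents.ChangeFeedProcessor.FeedProcessing;

// Hypothetical observer written against the non-obsolete observer interface.
class BuilderObserver : FP.IChangeFeedObserver
{
    public Task OpenAsync(FP.IChangeFeedObserverContext context)
    {
        Console.WriteLine("Opened {0}", context.PartitionKeyRangeId);
        return Task.CompletedTask;
    }

    public Task CloseAsync(FP.IChangeFeedObserverContext context, FP.ChangeFeedObserverCloseReason reason)
    {
        Console.WriteLine("Closed {0}: {1}", context.PartitionKeyRangeId, reason);
        return Task.CompletedTask;
    }

    public Task ProcessChangesAsync(
        FP.IChangeFeedObserverContext context,
        IReadOnlyList<Document> docs,
        CancellationToken cancellationToken)
    {
        Console.WriteLine("Received {0} doc(s)", docs.Count);
        return Task.CompletedTask;
    }
}

static class BuilderMigrationSketch
{
    // feed and leases describe the monitored collection and the lease collection.
    public static async Task RunAsync(DocumentCollectionInfo feed, DocumentCollectionInfo leases)
    {
        IChangeFeedProcessor processor = await new ChangeFeedProcessorBuilder()
            .WithHostName(Guid.NewGuid().ToString())
            .WithFeedCollection(feed)
            .WithLeaseCollection(leases)
            .WithObserver<BuilderObserver>()
            .BuildAsync();

        await processor.StartAsync();   // Starts processing and lease balancing.
        Console.WriteLine("Press Enter to stop...");
        Console.ReadLine();
        await processor.StopAsync();    // Stops processing on this host.
    }
}
```

Unlike RegisterObserverAsync on this class, building the processor does not start it; the sketch calls StartAsync explicitly.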

A simple host that distributes change feed events across observers, allowing those observers to scale. It distributes the load across its instances and supports dynamic scaling:

  • Partitions in partitioned collections are distributed across instances/observers.
  • A new instance takes leases from existing instances to even out the distribution.
  • If an instance dies, its leases are redistributed across the remaining instances.

This is useful when the partition count is so high that a single host/VM cannot process all of the change feed events. The client application needs to implement IChangeFeedObserver and register its implementation with ChangeFeedEventHost.
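
The equal distribution described above can be illustrated with a small, library-independent sketch. It only computes how many leases each host would own if leases were spread as evenly as possible; it is not the host's actual balancing code, and LeaseBalanceSketch and EvenShare are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

static class LeaseBalanceSketch
{
    // Returns the number of leases per host when leaseCount leases are
    // spread as evenly as possible; earlier hosts absorb the remainder.
    public static Dictionary<string, int> EvenShare(IReadOnlyList<string> hosts, int leaseCount)
    {
        if (hosts.Count == 0)
        {
            throw new ArgumentException("At least one host is required.", nameof(hosts));
        }

        var shares = new Dictionary<string, int>();
        int baseShare = leaseCount / hosts.Count;
        int remainder = leaseCount % hosts.Count;
        for (int i = 0; i < hosts.Count; i++)
        {
            shares[hosts[i]] = baseShare + (i < remainder ? 1 : 0);
        }
        return shares;
    }
}
```

With 10 partitions, two hosts would own 5 leases each. When a third host joins, the shares become 4, 3, and 3; if that host later dies, the shares return to 5 and 5.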
[System.Obsolete("Switch to the ChangeFeedProcessorBuilder class and use the BuildAsync method for building the change feed processor host or the BuildEstimatorAsync method for building the remaining work estimator.")]
public class ChangeFeedEventHost
[<System.Obsolete("Switch to the ChangeFeedProcessorBuilder class and use the BuildAsync method for building the change feed processor host or the BuildEstimatorAsync method for building the remaining work estimator.")>]
type ChangeFeedEventHost = class
Public Class ChangeFeedEventHost
Inheritance
Object → ChangeFeedEventHost
Attributes
ObsoleteAttribute

Examples

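using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.ChangeFeedProcessor;

// Observer that keeps a running count of the documents delivered by the change feed.
class DocumentFeedObserver : IChangeFeedObserver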
{
    private static int s_totalDocs = 0;
    public Task OpenAsync(ChangeFeedObserverContext context)
    {
        Console.WriteLine("Worker opened, {0}", context.PartitionKeyRangeId);
        return Task.CompletedTask;  // Requires targeting .NET 4.6+.
    }
    public Task CloseAsync(ChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
    {
        Console.WriteLine("Worker closed, {0}", context.PartitionKeyRangeId);
        return Task.CompletedTask;
    }
    public Task ProcessChangesAsync(ChangeFeedObserverContext context, IReadOnlyList<Document> docs)
    {
        Console.WriteLine("Change feed: total {0} doc(s)", Interlocked.Add(ref s_totalDocs, docs.Count));
        return Task.CompletedTask;
    }
}
static async Task StartChangeFeedHost()
{
    string hostName = Guid.NewGuid().ToString();
    DocumentCollectionInfo documentCollectionLocation = new DocumentCollectionInfo
    {
        Uri = new Uri("https://YOUR_SERVICE.documents.azure.com:443/"),
        MasterKey = "YOUR_SECRET_KEY==",
        DatabaseName = "db1",
        CollectionName = "documents"
    };
    DocumentCollectionInfo leaseCollectionLocation = new DocumentCollectionInfo
    {
        Uri = new Uri("https://YOUR_SERVICE.documents.azure.com:443/"),
        MasterKey = "YOUR_SECRET_KEY==",
        DatabaseName = "db1",
        CollectionName = "leases"
    };
    Console.WriteLine("Main program: Creating ChangeFeedEventHost...");
    ChangeFeedEventHost host = new ChangeFeedEventHost(hostName, documentCollectionLocation, leaseCollectionLocation);
    await host.RegisterObserverAsync<DocumentFeedObserver>();
    Console.WriteLine("Main program: press Enter to stop...");
    Console.ReadLine();
    await host.UnregisterObserversAsync();
}

Remarks

The host uses an auxiliary document collection to manage the leases for partitions. Every ChangeFeedEventHost instance performs the following two tasks:

  1) Renew leases: it keeps track of the leases currently owned by the host and continuously renews them.
  2) Acquire leases: each instance continuously polls all leases to check whether there are any it should acquire to bring the system into a balanced state.
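
The two tasks can be pictured with an in-memory sketch. This is a simplified illustration under stated assumptions, not the host's implementation: leases live in a plain list instead of a document collection, LeaseSketch and LeaseTasksSketch are hypothetical names, and the sketch only picks up unowned or expired leases (it does not take leases from healthy instances).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for a lease document.
class LeaseSketch
{
    public string PartitionId { get; set; }
    public string Owner { get; set; }            // null when nobody owns the lease
    public DateTime ExpiresAtUtc { get; set; }
}

static class LeaseTasksSketch
{
    // Task 1: extend every lease that this host currently owns.
    public static void RenewOwned(List<LeaseSketch> leases, string host, DateTime nowUtc, TimeSpan duration)
    {
        foreach (var lease in leases.Where(l => l.Owner == host))
        {
            lease.ExpiresAtUtc = nowUtc + duration;
        }
    }

    // Task 2: take unowned or expired leases until this host holds its
    // even share (total leases divided by live hosts, rounded up).
    // Returns the number of leases taken in this pass.
    public static int AcquireAvailable(
        List<LeaseSketch> leases, string host, int hostCount, DateTime nowUtc, TimeSpan duration)
    {
        int target = (leases.Count + hostCount - 1) / hostCount;
        int owned = leases.Count(l => l.Owner == host && l.ExpiresAtUtc > nowUtc);
        int taken = 0;
        foreach (var lease in leases)
        {
            if (owned + taken >= target) break;
            bool available = lease.Owner == null || lease.ExpiresAtUtc <= nowUtc;
            if (!available) continue;
            lease.Owner = host;
            lease.ExpiresAtUtc = nowUtc + duration;
            taken++;
        }
        return taken;
    }
}
```

In this model, a host that stops calling RenewOwned lets its leases expire, and the next AcquireAvailable pass on a surviving host picks them up.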

Constructors

ChangeFeedEventHost(String, DocumentCollectionInfo, DocumentCollectionInfo, ChangeFeedHostOptions)

Initializes a new instance of the ChangeFeedEventHost class.

ChangeFeedEventHost(String, DocumentCollectionInfo, DocumentCollectionInfo, ChangeFeedOptions, ChangeFeedHostOptions)

Initializes a new instance of the ChangeFeedEventHost class.

ChangeFeedEventHost(String, DocumentCollectionInfo, DocumentCollectionInfo)

Initializes a new instance of the ChangeFeedEventHost class.
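
As a hedged illustration of the five-argument overload, the sketch below passes both option objects. The property names are the ones I recall from the ChangeFeedOptions and ChangeFeedHostOptions classes and the values are purely illustrative, so check both against the package version in use. HostOptionsSketch is a hypothetical name.

```csharp
using System;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.ChangeFeedProcessor;

static class HostOptionsSketch
{
    public static ChangeFeedEventHost Create(
        string hostName, DocumentCollectionInfo feed, DocumentCollectionInfo leases)
    {
        // Options for reading the change feed itself (assumed property names).
        var feedOptions = new ChangeFeedOptions
        {
            StartFromBeginning = true,
            MaxItemCount = 100
        };

        // Options for lease handling and polling (assumed property names, illustrative values).
        var hostOptions = new ChangeFeedHostOptions
        {
            LeaseRenewInterval = TimeSpan.FromSeconds(17),
            LeaseAcquireInterval = TimeSpan.FromSeconds(13),
            LeaseExpirationInterval = TimeSpan.FromSeconds(60),
            FeedPollDelay = TimeSpan.FromSeconds(5)
        };

        return new ChangeFeedEventHost(hostName, feed, leases, feedOptions, hostOptions);
    }
}
```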

Properties

HostName

Gets the host name, which is a unique name for the instance.

Methods

GetEstimatedRemainingWork()

Asynchronously checks the existing leases and calculates an estimate of the remaining work for the leased partitions.
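
A minimal usage sketch, assuming the method returns an awaitable numeric estimate (the exact return type is not shown on this page, so the sketch uses var). RemainingWorkSketch and ReportAsync are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Documents.ChangeFeedProcessor;

static class RemainingWorkSketch
{
    // Logs the current estimate for a host that was created elsewhere.
    public static async Task ReportAsync(ChangeFeedEventHost host)
    {
        var remaining = await host.GetEstimatedRemainingWork();
        Console.WriteLine("Estimated remaining work: {0}", remaining);
    }
}
```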

RegisterObserverAsync<T>()

Asynchronously registers the observer interface implementation with the host. This method also starts the host and enables it to start participating in the partition distribution process.

RegisterObserverFactoryAsync(IChangeFeedObserverFactory)

Asynchronously registers the observer factory implementation with the host. This method also starts the host and enables it to start participating in the partition distribution process.
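
A factory is useful when observers need constructor arguments, which RegisterObserverAsync<T>() cannot supply. The sketch below assumes the factory interface exposes a single CreateObserver() method; LabeledObserver, LabeledObserverFactory, and the "orders" label are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.ChangeFeedProcessor;

// Hypothetical observer that needs a constructor argument.
class LabeledObserver : IChangeFeedObserver
{
    private readonly string label;

    public LabeledObserver(string label) { this.label = label; }

    public Task OpenAsync(ChangeFeedObserverContext context)
    {
        Console.WriteLine("[{0}] opened {1}", label, context.PartitionKeyRangeId);
        return Task.CompletedTask;
    }

    public Task CloseAsync(ChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
    {
        Console.WriteLine("[{0}] closed {1}: {2}", label, context.PartitionKeyRangeId, reason);
        return Task.CompletedTask;
    }

    public Task ProcessChangesAsync(ChangeFeedObserverContext context, IReadOnlyList<Document> docs)
    {
        Console.WriteLine("[{0}] received {1} doc(s)", label, docs.Count);
        return Task.CompletedTask;
    }
}

// Factory that hands the shared label to each observer it creates.
class LabeledObserverFactory : IChangeFeedObserverFactory
{
    private readonly string label;

    public LabeledObserverFactory(string label) { this.label = label; }

    public IChangeFeedObserver CreateObserver()
    {
        return new LabeledObserver(label);
    }
}

static class FactoryRegistrationSketch
{
    // Registers the factory; per the description above, this also starts the host.
    public static Task StartAsync(ChangeFeedEventHost host)
    {
        return host.RegisterObserverFactoryAsync(new LabeledObserverFactory("orders"));
    }
}
```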

UnregisterObserversAsync()

Asynchronously shuts down the host instance. This method maintains the leases on all partitions currently held, and enables each host instance to shut down cleanly by invoking the observer's CloseAsync method with a ChangeFeedObserverCloseReason object.

Applies to