Change feed processor in Azure Cosmos DB

APPLIES TO: NoSQL

The change feed processor is part of the Azure Cosmos DB .NET V3 and Java V4 SDKs. It simplifies the process of reading the change feed and distributes the event processing across multiple consumers effectively.

The main benefit of using the change feed processor is its fault-tolerant design, which assures an "at-least-once" delivery of all the events in the change feed.

Supported SDKs

.Net V3 Java Node.JS Python

Components of the change feed processor

The change feed processor has four main components:

  • The monitored container: The monitored container has the data from which the change feed is generated. Any inserts and updates to the monitored container are reflected in the change feed of the container.

  • The lease container: The lease container acts as state storage and coordinates the processing of the change feed across multiple workers. The lease container can be stored in the same account as the monitored container or in a separate account.

  • The compute instance: A compute instance hosts the change feed processor to listen for changes. Depending on the platform, it might be represented by a virtual machine (VM), a Kubernetes pod, an Azure App Service instance, or an actual physical machine. The compute instance has a unique identifier that's called the instance name throughout this article.

  • The delegate: The delegate is the code that defines what you, the developer, want to do with each batch of changes that the change feed processor reads.

To further understand how these four elements of the change feed processor work together, let's look at an example in the following diagram. The monitored container stores items and uses 'City' as the partition key. The partition key values are distributed in ranges (each range represents a physical partition) that contain items.

The diagram shows two compute instances, and the change feed processor assigns different ranges to each instance to maximize compute distribution. Each instance has a different, unique name.

Each range is read in parallel. A range's progress is maintained separately from other ranges in the lease container through a lease document. The combination of the leases represents the current state of the change feed processor.

Change feed processor example

Implement the change feed processor

The change feed processor in .NET is available for latest version mode and all versions and deletes mode. All versions and deletes mode is in preview and is supported for the change feed processor beginning in version 3.40.0-preview.0. The point of entry for both modes is always the monitored container.

To read using latest version mode, in a Container instance, you call GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

To read using all versions and deletes mode, call GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes from the Container instance:

Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

For both modes, the first parameter is a distinct name that describes the goal of this processor. The second name is the delegate implementation that handles changes.

Here's an example of a delegate for latest version mode:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Here's an example of a delegate for all versions and deletes mode:

static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ChangeFeedItem<ToDoItem> item in changes)
    {
        if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item.");
        }
        else
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
        }
        // Simulate work
        await Task.Delay(1);
    }
}

Afterward, you define the compute instance name or unique identifier by using WithInstanceName. The compute instance name should be unique and different for each compute instance you're deploying. You set the container to maintain the lease state by using WithLeaseContainer.

Calling Build gives you the processor instance that you can start by calling StartAsync.

Note

The preceding code snippets are taken from samples in GitHub. You can get the sample for latest version mode or all versions and deletes mode.

Processing life cycle

The normal life cycle of a host instance is:

  1. Read the change feed.
  2. If there are no changes, sleep for a predefined amount of time (customizable by using WithPollInterval in the Builder) and go to #1.
  3. If there are changes, send them to the delegate.
  4. When the delegate finishes processing the changes successfully, update the lease store with the latest processed point in time and go to #1.

Error handling

The change feed processor is resilient to user code errors. If your delegate implementation has an unhandled exception (step #4), the thread that is processing that particular batch of changes stops, and a new thread is eventually created. The new thread checks the latest point in time that the lease store has saved for that range of partition key values. The new thread restarts from there, effectively sending the same batch of changes to the delegate. This behavior continues until your delegate processes the changes correctly, and it's the reason the change feed processor has an "at least once" guarantee.

Note

In only one scenario, a batch of changes is not retried. If the failure happens on the first-ever delegate execution, the lease store has no previous saved state to be used on the retry. In those cases, the retry uses the initial starting configuration, which might or might not include the last batch.

To prevent your change feed processor from getting "stuck" continuously retrying the same batch of changes, you should add logic in your delegate code to write documents, upon exception, to an errored-message queue. This design ensures that you can keep track of unprocessed changes while still being able to continue to process future changes. The errored-message queue might be another Azure Cosmos DB container. The exact data store doesn't matter. You simply want the unprocessed changes to be persisted.

You also can use the change feed estimator to monitor the progress of your change feed processor instances as they read the change feed, or you can use life cycle notifications to detect underlying failures.

Life cycle notifications

You can connect the change feed processor to any relevant event in its life cycle. You can choose to be notified to one or all of them. The recommendation is to at least register the error notification:

  • Register a handler for WithLeaseAcquireNotification to be notified when the current host acquires a lease to start processing it.
  • Register a handler for WithLeaseReleaseNotification to be notified when the current host releases a lease and stops processing it.
  • Register a handler for WithErrorNotification to be notified when the current host encounters an exception during processing. You need to be able to distinguish whether the source is the user delegate (an unhandled exception) or an error that the processor encounters when it tries to access the monitored container (for example, networking issues).

Life cycle notifications are available in both change feed modes. Here's an example of life cycle notifications in latest version mode:

Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Deployment unit

A single change feed processor deployment unit consists of one or more compute instances that have the same value for processorName and the same lease container configuration, but different instance names. You can have many deployment units in which each unit has a different business flow for the changes and each deployment unit consists of one or more instances.

For example, you might have one deployment unit that triggers an external API each time there's a change in your container. Another deployment unit might move data in real time each time there's a change. When a change happens in your monitored container, all your deployment units are notified.

Dynamic scaling

As mentioned earlier, within a deployment unit, you can have one or more compute instances. To take advantage of the compute distribution within the deployment unit, the only key requirements are that:

  • All instances should have the same lease container configuration.
  • All instances should have the same value for processorName.
  • Each instance needs to have a different instance name (WithInstanceName).

If these three conditions apply, then the change feed processor distributes all the leases that are in the lease container across all running instances of that deployment unit, and it parallelizes compute by using an equal-distribution algorithm. A lease is owned by one instance at any time, so the number of instances shouldn't be greater than the number of leases.

The number of instances can grow and shrink. The change feed processor dynamically adjusts the load by redistributing it accordingly.

Moreover, the change feed processor can dynamically adjust a container's scale if the container's throughput or storage increases. When your container grows, the change feed processor transparently handles the scenario by dynamically increasing the leases and distributing the new leases among existing instances.

Starting time

By default, when a change feed processor starts for the first time, it initializes the lease container and starts its processing life cycle. Any changes that happened in the monitored container before the change feed processor is initialized for the first time aren't detected.

Reading from a previous date and time

It's possible to initialize the change feed processor to read changes starting at a specific date and time by passing an instance of DateTime to the WithStartTime builder extension:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

The change feed processor is initialized for that specific date and time, and it starts to read the changes that happened afterward.

Reading from the beginning

In other scenarios, like in data migrations or if you're analyzing the entire history of a container, you need to read the change feed from the beginning of that container's lifetime. You can use WithStartTime on the builder extension, but pass DateTime.MinValue.ToUniversalTime(), which generates the UTC representation of the minimum DateTime value like in this example:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

The change feed processor is initialized, and it starts reading changes from the beginning of the lifetime of the container.

Note

These customization options work only to set up the starting point in time of the change feed processor. After the lease container is initialized for the first time, changing these options has no effect.

Customizing the starting point is only available for latest version change feed mode. When using all versions and deletes mode you must start reading from the time the processor is started, or resume from a prior lease state that is within the continuous backup retention period of your account.

Change feed and provisioned throughput

Change feed read operations on the monitored container consume request units. Make sure that your monitored container isn't experiencing throttling. Throttling adds delays in receiving change feed events on your processors.

Operations on the lease container (updating and maintaining state) consume request units. The higher the number of instances that use the same lease container, the higher the potential consumption of request units. Make sure that your lease container isn't experiencing throttling. Throttling adds delays in receiving change feed events. Throttling can even completely end processing.

Share the lease container

You can share a lease container across multiple deployment units. In a shared lease container, each deployment unit listens to a different monitored container or has a different value for processorName. In this configuration, each deployment unit maintains an independent state on the lease container. Review the request unit consumption on a lease container to make sure that the provisioned throughput is enough for all the deployment units.

Advanced lease configuration

Three key configurations can affect how the change feed processor works. Each configuration affects the request unit consumption on the lease container. You can set one of these configurations when you create the change feed processor, but use them carefully:

  • Lease Acquire: By default, every 17 seconds. A host periodically checks the state of the lease store and consider acquiring leases as part of the dynamic scaling process. This process is done by executing a Query on the lease container. Reducing this value makes rebalancing and acquiring leases faster, but it increases request unit consumption on the lease container.
  • Lease Expiration: By default, 60 seconds. Defines the maximum amount of time that a lease can exist without any renewal activity before it's acquired by another host. When a host crashes, the leases it owned are picked up by other hosts after this period of time plus the configured renewal interval. Reducing this value makes recovering after a host crash faster, but the expiration value should never be lower than the renewal interval.
  • Lease Renewal: By default, every 13 seconds. A host that owns a lease periodically renews the lease, even if there are no new changes to consume. This process is done by executing a Replace on the lease. Reducing this value lowers the time that's required to detect leases lost by a host crashing, but it increases request unit consumption on the lease container.

Where to host the change feed processor

The change feed processor can be hosted in any platform that supports long-running processes or tasks. Here are some examples:

Although the change feed processor can run in short-lived environments because the lease container maintains the state, the startup cycle of these environments adds delays to the time it takes to receive notifications (due to the overhead of starting the processor every time the environment is started).

Role-based access requirements

When using Microsoft Entra ID as authentication mechanism, make sure the identity has the proper permissions:

  • On the monitored container:
    • Microsoft.DocumentDB/databaseAccounts/readMetadata
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
  • On the lease container:
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery

Additional resources

Next steps

Learn more about the change feed processor in the following articles: