你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Azure Cosmos DB 更改源处理器

适用范围: NoSQL

更改源处理器是 Azure Cosmos DB .NET V3Java V4 SDK 的一部分。 它简化了读取更改源的过程,可有效地在多个使用者之间分布事件处理。

使用更改源处理器的主要优点是其容错设计,可保证更改源中的所有事件传递“至少一次”。

支持的 SDK

.Net V3 Java Node.JS Python

更改源处理器的组件

更改源处理器有四个主要组件:

  • 监视的容器:监视的容器具有用于生成更改源的数据。 对受监视的容器的任何插入和更新都会反映在容器的更改源中。

  • 租用容器:租用容器充当状态存储并协调多个辅助角色之间的更改源的处理。 租用容器可以与受监视的容器存储在同一帐户中,也可以存储在单独的帐户中。

  • 计算实例:计算实例承载更改源处理器以便侦听更改。 它可以由虚拟机 (VM)、Kubernetes Pod、Azure 应用服务实例或实际的物理计算机来表示,具体取决于平台。 计算实例具有唯一标识符,在本文中称为“实例名称”。

  • 委托:委托是用于定义开发人员要对更改源处理器读取的每一批更改执行何种操作的代码。

若要进一步了解更改源处理器的四个元素是如何协同工作的,请看下图中的一个示例。 受监视的容器会存储项,并将“City”用作分区键。 分区键值分布在包含项的范围内(每个范围表示一个物理分区)。

图中显示了两个计算实例,更改源处理器向每个实例分配不同的范围,以最大程度地提高计算分布率。 每个实例拥有一个不同的唯一名称。

每个区域都是并行读取的。 范围进程的维护通过租用文档独立于租用容器中的其他范围。 租用的组合表示更改源处理器的当前状态。

更改源处理器示例

实现更改源处理器

.NET 中的更改源处理器适用于最新版本模式所有版本与删除模式。 所有版本与删除模式以预览版提供,从版本 3.40.0-preview.0 开始支持用于更改源处理器。 两种模式的入口点始终是受监视的容器。

若要使用最新版本模式进行读取,请在 Container 实例中调用 GetChangeFeedProcessorBuilder

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

若要使用所有版本与删除模式进行读取,请从 Container 实例调用 GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes

Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

对于这两种模式,第一个参数是描述此处理器的目标的非重复名称。 第二个名称是处理更改的委托实现。

下面是最新版本模式的委托示例:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

下面是所有版本与删除模式的委托示例:

static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ChangeFeedItem<ToDoItem> item in changes)
    {
        if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item.");
        }
        else
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
        }
        // Simulate work
        await Task.Delay(1);
    }
}

然后,使用 WithInstanceName 定义计算实例名称或唯一标识符。 对于要部署的每个计算实例,计算实例名称应该是唯一且不同的。 使用 WithLeaseContainer 设置容器以保持租约状态。

调用 Build 可让你获得可通过调用 StartAsync 启动的处理器实例。

注意

上述代码片段摘自 GitHub 中的示例。 可获取最新版本模式的示例,或所有版本和删除模式的示例。

处理生命周期

主机实例的正常生命周期为:

  1. 读取更改源。
  2. 如果没有发生更改,请在一段预定义的时间内保持睡眠状态(可在生成器中使用 WithPollInterval 进行自定义),然后转到 #1。
  3. 如果发生了更改,请将其发送给委托。
  4. 委托成功地处理更改后,以最新的处理时间点更新租用存储,然后转到 #1。

错误处理。

更改源处理器可在发生用户代码错误后复原。 如果委托实现发生未经处理的异常(步骤 #4),处理那一批特定更改的线程将停止,并最终创建新的线程。 新线程将检查租约存储为该分区键值范围保存的最新时间点。 新线程将从该时间点重启,从而有效地向委托发送同一批更改。 此行为将一直持续到委托正确处理完更改为止,这也是更改源处理器能够提供“至少一次”保证的原因。

注意

当仅有一个方案时,不会重试一批更改。 如果第一次执行委托时发生故障,则租用存储没有以前保存的状态可在重试时使用。 在这种情况下,重试将使用初始启动配置,该配置可能包含也可能不包含最后一批次。

若要防止更改源处理器不断地重试同一批更改,应在委托代码中添加逻辑,以便在出现异常时将文档写入出错的消息队列中。 此设计可确保你可以跟踪未处理的更改,同时仍然能够继续处理将来的更改。 出错消息队列可能是另一个 Azure Cosmos DB 容器。 确切的数据存储并不重要。 只需保留未处理的更改。

还可使用更改源估算器在更改源处理器实例读取更改源时监视其进度,或使用生命周期通知来检测潜在故障。

生命周期通知

可以将更改源处理器连接到其生命周期中的任何相关事件。 可以选择接收其中一个或全部事件的通知。 建议至少注册错误通知:

  • WithLeaseAcquireNotification 注册处理程序,以便在当前主机获得租约开始处理它时收到通知。
  • WithLeaseReleaseNotification 注册处理程序,以便在当前主机释放租约并停止处理它时收到通知。
  • WithErrorNotification 注册处理程序,以便当前主机在处理过程中遇到异常时收到通知。 你需要能够区分源是用户委托(未经处理的异常),还是处理器在尝试访问受监视容器时遇到的错误(例如网络问题)。

生命周期通知在两种更改源模式下均可用。 下面是最新版本模式下的生命周期通知示例:

Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

部署单元

单个更改源处理器部署单元包含一个或多个具有相同 processorName 值、相同租用容器配置但实例名称不同的计算实例。 可以有多个部署单元,其中每个单元可以具有不同的更改业务流,且每个部署单元可由一个或多个实例组成。

例如,你可能有一个部署单元,每次容器发生更改时,该部署单元就会触发外部 API。 另一个部署单元可能会在每次发生更改时实时移动数据。 当受监视的容器中发生更改时,所有部署单元都会收到通知。

动态缩放

如前所述,在某个部署单元中,可以有一个或多个计算实例。 若要充分利用部署单元内的计算分布,只需满足以下关键要求:

  • 所有实例应具有相同的租用容器配置。
  • 所有实例都应具有相同的 processorName 值。
  • 每个实例都需要具有不同的实例名称 (WithInstanceName)。

如果符合这三个条件,更改源处理器将使用均等分布算法,将租用容器中的所有租用分布到该部署单元的所有正在运行的实例,并将计算并行化。 在任意时间,一个租用归一个实例所有,因此,实例数应不大于租用数。

实例数可以增长和收缩。 更改源处理器通过相应地重新分发负载来动态调整负载。

而且,如果容器的吞吐量或存储增加,更改源处理器还可以动态调整到容器规模。 当容器增多时,更改源处理器会以透明方式应对这种情况,方法是动态增加租约并将新的租约分布到现有实例。

开始时间

默认情况下,首次启动更改源处理器时,它会初始化租用容器,并开始其处理生命周期。 不会检测到首次初始化更改源处理器之前在受监视容器中发生的任何更改。

从以前的某个日期和时间读取

DateTime 的实例传递给 WithStartTime 生成器扩展,可将更改源处理器初始化为从特定的日期和时间开始读取更改:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

更改源处理器将根据该特定日期和时间初始化,并开始读取此日期和时间之后发生的更改。

从头开始读取

在其他方案(例如数据迁移,或分析容器的整个历史记录)中,需要从该容器的生存期开始时间读取更改源。 可以在生成器扩展中使用 WithStartTime,但需要传递 DateTime.MinValue.ToUniversalTime(),以便生成最小 DateTime 值的 UTC 表示形式,如以下示例所示:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

更改源处理器将会初始化,并从容器生存期的开始时间读取更改。

注意

这些自定义选项仅用于设置更改源处理器的起始时间点。 首次初始化租用容器后,更改这些选项不起作用。

自定义起点操作仅适用于最新版本更改源模式。 使用所有版本与删除模式时,必须从处理器启动时开始读取,或者从帐户的连续备份保留期内的前一租用状态恢复。

更改源和预配吞吐量

针对受监视容器的更改源读取操作会消耗请求单位。 确保受监视容器未遇到限制。 限制会增加处理器接收更改源事件的延迟。

租用容器上的操作(更新和维护状态)将使用请求单位。 使用相同租用容器的实例数越多,潜在的请求单位消耗量就越高。 确保租用容器未遇到限制。 限制会增加接收更改源事件的延迟。 限制甚至可能完全中止处理。

共享租用容器

可以在多个部署单元之间共享租用容器。 在共享租用容器中,每个部署单元侦听不同的受监视容器或具有不同的 processorName 值。 在此配置中,每个部署单元都将在租用容器上保持独立状态。 查看租用容器上的请求单位消耗量,确保预配的吞吐量足以满足所有部署单元的需求。

高级租用配置

有三个关键配置会影响更改源处理器的工作方式。 每个配置都会影响租用容器上的请求单位消耗量。 可以在创建更改源处理器时设置以下配置之一,但请谨慎使用:

  • 租约获取:默认间隔为 17 秒。 主机将定期检查租约存储的状态,并考虑将租约获取操作设为动态缩放过程的一部分。 此过程是通过对租用容器执行查询来完成的。 减小此值可以加快重新平衡和获取租约的速度,但会增大租用容器上的请求单位消耗量
  • 租约有效期:默认为 60 秒。 定义租用在被另一个主机获取之前,在没有任何续订活动的情况下可以存在的最长时间量。 当某台主机崩溃时,它拥有的租约将在这段时间加上配置的续订间隔之后被其他主机拾取。 减小此值能够加快主机崩溃后的恢复速度,但有效期值永远不应低于续订间隔。
  • 租约续订:默认间隔为 13 秒。 拥有租约的主机将定期续订该租约,即使没有新的更改可供使用。 此过程是通过对租用执行替换操作来完成的。 减小此值能够减少检测租约因主机崩溃而丢失的问题所需的时间,但会增大租用容器上的请求单位消耗量

托管更改源处理器的位置

更改源处理器可以托管在任何支持长时间运行的进程或任务的平台中。 下面是一些示例:

虽然更改源处理器可以在生存期较短的环境中运行,但由于租用容器会对状态进行维护,这些环境的启动周期会导致接收通知的延迟时间增加(因为每次启动环境时存在启动处理器的开销)。

基于角色的访问要求

使用 Microsoft Entra ID 作为身份验证机制时,请确保标识具有适当的权限

  • 在被监视的容器上:
    • Microsoft.DocumentDB/databaseAccounts/readMetadata
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
  • 在租用容器上:
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery

其他资源

后续步骤

通过以下文章详细了解更改源处理器: