Поделиться через


Использование средства оценки канала изменений

ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL

В этой статье описывается, как можно отслеживать ход выполнения экземпляров обработчика канала изменений, считывающих канал изменений.

Почему процесс отслеживания хода выполнения важен?

Обработчик канала изменений действует как указатель, который перемещается по каналу изменений и доставляет изменения в реализацию делегата.

Экземпляр развертывания обработчика канала изменений может обрабатывать изменения с определенной скоростью на основе доступных ресурсов, таких как ЦП, память, сеть и т. д.

Если эта скорость медленнее, чем скорость, с которой происходят изменения в контейнере Azure Cosmos DB, процессор начинает отставать.

Определение этого сценария помогает понять, нужно ли масштабировать развертывание обработчика канала изменений.

Реализация средства оценки канала изменений

В качестве модели отправки автоматических уведомлений

Как и обработчик канала изменений, средство оценки канала изменений может использоваться как модель отправки уведомлений. Оценка измеряет разницу между последним обработанным элементом (определенным состоянием контейнера аренды) и последним изменением контейнера и отправляет это значение делегату. Интервал, с которым производится измерение, также можно настроить по умолчанию на 5 секунд.

Например, если обработчик канала изменений использует последний режим версии и определяется следующим образом:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Правильным способом инициализации средства оценки для измерения этого обработчика будет использование GetChangeFeedEstimatorBuilder следующим образом:

ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();

Здесь и обработчик, и средство оценки имеют одно значение leaseContainer и одно и то же имя.

Остальные два параметра — делегат, который получает число, представляющее количество ожидающих изменений обработчиком, и интервал времени, с которым требуется выполнить это измерение.

Пример делегата, который получает оценку:

static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    await Task.Delay(0);
}

Вы можете отправить эту оценку в решение для мониторинга и использовать ее, чтобы понять, как ход выполнения меняется с течением времени.

В качестве подробной оценки по запросу

Помимо модели отправки, существует альтернативный вариант, позволяющий получить оценку по запросу. Эта модель также предоставляет более подробные сведения:

  • Предполагаемое отставание на аренду.
  • Экземпляр владеет арендой и обрабатывает ее, что позволяет выяснить, есть ли в экземпляре проблемы.

Например, если обработчик канала изменений определен таким образом:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Вы можете создать оценщик с той же конфигурацией аренды:

ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);

И при необходимости можно получить подробную оценку с требуемой частотой:

Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}

Каждый ChangeFeedProcessorState содержит сведения об аренде и задержке, а также тот, кто является текущим экземпляром, владельцем которого он является.

Развертывание оценки

Проверяющий элемент канала изменений не требуется развертывать в составе обработчика канала изменений, а также не быть частью того же проекта. Рекомендуется развернуть оценщик на независимом экземпляре от процессоров. Один экземпляр оценки может отслеживать ход выполнения всех аренд и экземпляров в развертывании обработчика канала изменений.

Каждая оценка использует единицы запросов из отслеживаемых и арендных контейнеров. Частота в 1 минуту между хорошей отправной точкой, чем ниже частота, тем выше единиц запроса, потребляемых.

Поддерживаемые режимы канала изменений

Оценка канала изменений может использоваться как для последней версии, так и для всех версий и режима удаления. В обоих режимах указанная оценка не гарантирует точное количество невыполненных изменений в процессе.

Дополнительные ресурсы

Следующие шаги

Теперь вы можете узнать больше о обработчике канала изменений в следующей статье: