Поделиться через


Использование средства оценки канала изменений

ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL

В этой статье описывается, как можно отслеживать ход выполнения экземпляров обработчика канала изменений, считывающих канал изменений.

Почему процесс отслеживания хода выполнения важен?

Обработчик канала изменений действует как указатель, который перемещается по каналу изменений и доставляет изменения в реализацию делегата.

Экземпляр развертывания обработчика канала изменений может обрабатывать изменения с определенной скоростью на основе доступных ресурсов, таких как ЦП, память, сеть и т. д.

Если эта скорость медленнее, чем скорость, с которой происходят изменения в контейнере Azure Cosmos DB, ваш процессор начнет отставать.

Определение этого сценария помогает понять, нужно ли масштабировать развертывание обработчика канала изменений.

Реализация средства оценки канала изменений

В качестве модели отправки автоматических уведомлений

Как и обработчик канала изменений, средство оценки канала изменений может использоваться как модель отправки уведомлений. Средство оценки будет измерять разницу между последним обработанным элементом (определяемым состоянием контейнера аренд) и последним изменением в контейнере и передавать это значение делегату. Интервал, с которым производится измерение, также можно настроить по умолчанию на 5 секунд.

Например, если обработчик канала изменений определен таким образом:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Правильным способом инициализации средства оценки для измерения этого обработчика будет использование GetChangeFeedEstimatorBuilder следующим образом:

ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();

Здесь и обработчик, и средство оценки имеют одно значение leaseContainer и одно и то же имя.

Два других параметра — это делегат, который получит число, представляющее, сколько изменений ожидают чтения обработчиком, и интервал времени, в течение которого вы хотите, чтобы это измерение было выполнено.

Пример делегата, который получает оценку:

static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    await Task.Delay(0);
}

Вы можете отправить эту оценку в решение для мониторинга и использовать ее, чтобы понять, как ход выполнения меняется с течением времени.

В качестве подробной оценки по запросу

Помимо модели отправки, существует альтернативный вариант, позволяющий получить оценку по запросу. Эта модель также предоставляет более подробные сведения:

  • Предполагаемое отставание на аренду.
  • Экземпляр владеет арендой и обрабатывает ее, что позволяет выяснить, есть ли в экземпляре проблемы.

Например, если обработчик канала изменений определен таким образом:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Вы можете создать оценщик с той же конфигурацией аренды:

ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);

И при необходимости можно получить подробную оценку с требуемой частотой:

Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}

Каждый объект ChangeFeedProcessorState будет содержать сведения об аренде и отставании, а также о текущем экземпляре-владельце.

Развертывание оценщика

Средство оценки канала изменений не нужно развертывать как часть обработчика канала изменений. Оно также не должно быть частью того же проекта. Рекомендуется развернуть оценщик на независимом экземпляре, совершенно отличном от процессоров. Один экземпляр средства оценки может отслеживать ход выполнения всех аренд и экземпляров в развертывании обработчика канала изменений.

Каждая оценка будет использовать единицы запроса из отслеживаемых контейнеров и контейнеров аренды. Частота между 1 минутой является хорошей отправной точкой: чем ниже частота, тем выше использованные единицы запроса.

Дополнительные ресурсы

Дальнейшие действия

Вы можете продолжить знакомство с обработчиком канала изменений, перейдя к следующим статьям: