Поделиться через


Использовать оценщик потока изменений

В этой статье описывается, как отслеживать ход выполнения экземпляров обработчика канала изменений по мере их чтения канала изменений.

Почему важно отслеживать ход выполнения?

Обработчик канала изменений выступает в качестве указателя, который продвигается вперед по каналу изменений и передает изменения в реализацию делегата.

Развертывание обработчика канала изменений может обрабатывать изменения с определенной скоростью в зависимости от доступных ресурсов, таких как ЦП, память, сеть и т. д.

Если эта скорость медленнее, чем скорость, с которой происходят изменения в контейнере Azure Cosmos DB, процессор начинает отставать.

Определение этого сценария помогает понять, нужно ли масштабировать развертывание обработчика потока изменений.

Реализуйте эстиматор канала изменений

В качестве push-модели для автоматических уведомлений

Как и в обработчике канала изменений, оценщик канала изменений может работать по модели push. Оценщик оценивает разницу между последним обработанным элементом (определяемым состоянием контейнера арендованных элементов) и последним изменением этого контейнера, и отправляет это значение делегату. Интервал, с которым выполняется измерение, также можно настроить со значением по умолчанию 5 секунд.

Например, если обработчик канала изменений использует последний режим версии и определяется следующим образом:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Правильный способ инициализации оценщика для измерения этого процессора — использовать GetChangeFeedEstimatorBuilder следующим образом:

ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();

Где и процессор, и оценщик имеют одинаковые leaseContainer и одинаковое имя.

Остальные два параметра — это делегат, который получает число, представляющее количество изменений, ожидающих прочтения обработчиком, и интервал времени, с которым необходимо выполнить это измерение.

Пример делегата, получающего оценку:

static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    await Task.Delay(0);
}

Вы можете отправить этот расчёт в ваше решение для мониторинга и использовать его, чтобы понять, как изменяется ваш прогресс с течением времени.

Подробная оценка по запросу

В отличие от модели push-отправки, существует альтернатива, которая позволяет получить оценку по запросу. Эта модель также содержит более подробные сведения:

  • Оценочная задержка по аренде.
  • Экземпляр, принадлежащий и обрабатывающий каждую аренду, поэтому можно определить, есть ли проблема в экземпляре.

Если обработчик канала изменений определен следующим образом:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Вы можете создать оцениватель с той же конфигурацией аренды:

ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);

И всякий раз, когда вы хотите, с частотой, которую требуется, вы можете получить подробную оценку:

Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}

Каждый ChangeFeedProcessorState содержит сведения об аренде и задержке, а также о текущем экземпляре, которому он принадлежит.

Развертывание эстиматоров

Проверяющий элемент канала изменений не требуется развертывать в составе обработчика канала изменений, а также не быть частью того же проекта. Рекомендуется развернуть алгоритм оценки на отдельном экземпляре, независимом от процессоров. Один экземпляр оценки может отслеживать ход выполнения всех аренд и экземпляров в развертывании обработчика канала изменений.

Каждая оценка использует единицы запросов из отслеживаемых и арендованных контейнеров. Частота в 1 минуту является хорошей отправной точкой, чем ниже частота, тем больше потребляется единиц запроса.

Поддерживаемые режимы канала изменений

Оценка канала изменений может использоваться как для последней версии, так и для всех версий и режима удаления. В обоих режимах указанная оценка не гарантирует точное количество невыполненных изменений в процессе.

Дополнительные ресурсы

Дальнейшие шаги

Теперь вы можете узнать больше о обработчике канала изменений в следующей статье: