Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этой статье описывается, как отслеживать ход выполнения экземпляров обработчика канала изменений по мере их чтения канала изменений.
Почему важно отслеживать ход выполнения?
Обработчик канала изменений выступает в качестве указателя, который продвигается вперед по каналу изменений и передает изменения в реализацию делегата.
Развертывание обработчика канала изменений может обрабатывать изменения с определенной скоростью в зависимости от доступных ресурсов, таких как ЦП, память, сеть и т. д.
Если эта скорость медленнее, чем скорость, с которой происходят изменения в контейнере Azure Cosmos DB, процессор начинает отставать.
Определение этого сценария помогает понять, нужно ли масштабировать развертывание обработчика потока изменений.
Реализуйте эстиматор канала изменений
В качестве push-модели для автоматических уведомлений
Как и в обработчике канала изменений, оценщик канала изменений может работать по модели push. Оценщик оценивает разницу между последним обработанным элементом (определяемым состоянием контейнера арендованных элементов) и последним изменением этого контейнера, и отправляет это значение делегату. Интервал, с которым выполняется измерение, также можно настроить со значением по умолчанию 5 секунд.
Например, если обработчик канала изменений использует последний режим версии и определяется следующим образом:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Правильный способ инициализации оценщика для измерения этого процессора — использовать GetChangeFeedEstimatorBuilder следующим образом:
ChangeFeedProcessor changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
.WithLeaseContainer(leaseContainer)
.Build();
Где и процессор, и оценщик имеют одинаковые leaseContainer и одинаковое имя.
Остальные два параметра — это делегат, который получает число, представляющее количество изменений, ожидающих прочтения обработчиком, и интервал времени, с которым необходимо выполнить это измерение.
Пример делегата, получающего оценку:
static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
if (estimation > 0)
{
Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
}
await Task.Delay(0);
}
Вы можете отправить этот расчёт в ваше решение для мониторинга и использовать его, чтобы понять, как изменяется ваш прогресс с течением времени.
Подробная оценка по запросу
В отличие от модели push-отправки, существует альтернатива, которая позволяет получить оценку по запросу. Эта модель также содержит более подробные сведения:
- Оценочная задержка по аренде.
- Экземпляр, принадлежащий и обрабатывающий каждую аренду, поэтому можно определить, есть ли проблема в экземпляре.
Если обработчик канала изменений определен следующим образом:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Вы можете создать оцениватель с той же конфигурацией аренды:
ChangeFeedEstimator changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);
И всякий раз, когда вы хотите, с частотой, которую требуется, вы можете получить подробную оценку:
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
Каждый ChangeFeedProcessorState содержит сведения об аренде и задержке, а также о текущем экземпляре, которому он принадлежит.
Развертывание эстиматоров
Проверяющий элемент канала изменений не требуется развертывать в составе обработчика канала изменений, а также не быть частью того же проекта. Рекомендуется развернуть алгоритм оценки на отдельном экземпляре, независимом от процессоров. Один экземпляр оценки может отслеживать ход выполнения всех аренд и экземпляров в развертывании обработчика канала изменений.
Каждая оценка использует единицы запросов из отслеживаемых и арендованных контейнеров. Частота в 1 минуту является хорошей отправной точкой, чем ниже частота, тем больше потребляется единиц запроса.
Поддерживаемые режимы канала изменений
Оценка канала изменений может использоваться как для последней версии, так и для всех версий и режима удаления. В обоих режимах указанная оценка не гарантирует точное количество невыполненных изменений в процессе.
Дополнительные ресурсы
- Azure Cosmos DB SDK
- Примеры использования на GitHub (последняя версия .NET)
- Примеры использования на GitHub (.NET все версии и удаления)
- Примеры использования на GitHub (Java)
- Дополнительные примеры на GitHub
Дальнейшие шаги
Теперь вы можете узнать больше о обработчике канала изменений в следующей статье: