Миграция с библиотеки обработчика канала изменений в пакет SDK Azure Cosmos DB для .NET версии 3
ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL
В этой статье описано, как перенести код существующего приложения, использующего библиотеку обработчика канала изменений, в функцию канала изменений в последней версии пакета SDK для .NET (также называется пакетом SDK для .NET версии 3).
Необходимые изменения в коде
В пакет SDK для .NET версии 3 внесено несколько критических изменений, поэтому ниже приведены основные этапы миграции приложения:
- Преобразуйте экземпляры
DocumentCollectionInfo
в ссылкиContainer
для отслеживаемых контейнеров и контейнеров аренды. - Настройки, использующие
WithProcessorOptions
, нужно изменить, указав интервалыWithLeaseConfiguration
иWithPollInterval
, время началаWithStartTime
, а также максимальное число элементовWithMaxItems
. - Установите значение
processorName
в параметреGetChangeFeedProcessorBuilder
, совпадающее со значениемChangeFeedProcessorOptions.LeasePrefix
, или используйтеstring.Empty
. - Изменения больше не предоставляются в виде
IReadOnlyList<Document>
. ИспользуетсяIReadOnlyCollection<T>
, гдеT
— это тип, который нужно определить. Базового класса элементов больше нет. - В связи с этими изменениями реализация
IChangeFeedObserver
больше не требуется — вместо этого необходимо определить делегат. Делегат может быть статической функцией или, если необходимо сохранить состояние между выполнениями, вы можете создать собственный класс и передать метод экземпляра в качестве делегата.
Например, если исходный код для сборки обработчика канала изменений выглядит так:
ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.monitoredContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.leasesContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
.WithHostName("consoleHost")
.WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
{
StartFromBeginning = true,
LeasePrefix = "MyLeasePrefix",
MaxItemCount = 10,
FeedPollDelay = TimeSpan.FromSeconds(1)
})
.WithFeedCollection(monitoredCollectionInfo)
.WithLeaseCollection(leaseCollectionInfo)
.WithObserver<ChangeFeedObserver>()
.BuildAsync();
Перенесенный код будет выглядеть так:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithMaxItems(10)
.WithPollInterval(TimeSpan.FromSeconds(1))
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
Для делегата можно использовать статический способ получения событий. Если вы использовали информацию из IChangeFeedObserverContext
, вы можете перейти на использование ChangeFeedProcessorContext
:
ChangeFeedProcessorContext.LeaseToken
можно использовать вместоIChangeFeedObserverContext.PartitionKeyRangeId
ChangeFeedProcessorContext.Headers
можно использовать вместоIChangeFeedObserverContext.FeedResponse
ChangeFeedProcessorContext.Diagnostics
содержит подробные сведения о задержках запросов для устранения неполадок
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate work
await Task.Delay(1);
}
}
События работоспособности и наблюдаемость
Если вы ранее использовали или IHealthMonitor
использовались IChangeFeedObserver.OpenAsync
IChangeFeedObserver.CloseAsync
, используйте API уведомлений.
IChangeFeedObserver.OpenAsync
можно заменитьWithLeaseAcquireNotification
на .IChangeFeedObserver.CloseAsync
можно заменитьWithLeaseReleaseNotification
на .IHealthMonitor.InspectAsync
можно заменитьWithErrorNotification
на .
Контейнер состояния и аренды
Как и в случае с библиотекой обработчика канала изменений, функция канала изменений в пакете SDK для .NET версии 3 использует контейнер аренды для хранения состояния. При этом схемы различаются.
Обработчик канала изменений пакета SDK версии 3 обнаружит старое состояние библиотеки и автоматически перенесет его в новую схему при первом выполнении перенесенного кода приложения.
Вы можете безопасно остановить приложение, использующее старый код, перенести код в новую версию, а затем запустить перенесенное приложение. Все изменения, произошедшие во время простоя приложения, будут выявлены и обработаны новой версией.
Дополнительные ресурсы
Следующие шаги
Вы можете продолжить знакомство с обработчиком канала изменений, перейдя к следующим статьям:
- Обработчик канала изменений в Azure Cosmos DB
- Использование оценщика канала изменений
- Настройка времени запуска обработчика канала изменений
- Если вы планируете ресурсы для миграции в Azure Cosmos DB,
- Если вам известно только количество виртуальных ядер и серверов в существующем кластере баз данных, прочитайте об оценке единиц запроса на основе этих данных.
- Если вам известна стандартная частота запросов для текущей рабочей нагрузки базы данных, ознакомьтесь со статьей о расчете единиц запросов с помощью планировщика ресурсов Azure Cosmos DB