Поделиться через


Миграция с библиотеки обработчика канала изменений в пакет SDK Azure Cosmos DB для .NET версии 3

ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL

В этой статье описано, как перенести код существующего приложения, использующего библиотеку обработчика канала изменений, в функцию канала изменений в последней версии пакета SDK для .NET (также называется пакетом SDK для .NET версии 3).

Необходимые изменения в коде

В пакет SDK для .NET версии 3 внесено несколько критических изменений, поэтому ниже приведены основные этапы миграции приложения:

  1. Преобразуйте экземпляры DocumentCollectionInfo в ссылки Container для отслеживаемых контейнеров и контейнеров аренды.
  2. Настройки, использующие WithProcessorOptions, нужно изменить, указав интервалы WithLeaseConfiguration и WithPollInterval, время началаWithStartTime, а также максимальное число элементов WithMaxItems.
  3. Установите значение processorName в параметре GetChangeFeedProcessorBuilder, совпадающее со значением ChangeFeedProcessorOptions.LeasePrefix, или используйте string.Empty.
  4. Изменения больше не предоставляются в виде IReadOnlyList<Document>. Используется IReadOnlyCollection<T>, где T — это тип, который нужно определить. Базового класса элементов больше нет.
  5. В связи с этими изменениями реализация IChangeFeedObserver больше не требуется — вместо этого необходимо определить делегат. Делегат может быть статической функцией или, если необходимо сохранить состояние между выполнениями, вы можете создать собственный класс и передать метод экземпляра в качестве делегата.

Например, если исходный код для сборки обработчика канала изменений выглядит так:

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

Перенесенный код будет выглядеть так:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Для делегата можно использовать статический способ получения событий. Если вы использовали информацию из IChangeFeedObserverContext, вы можете перейти на использование ChangeFeedProcessorContext:

  • ChangeFeedProcessorContext.LeaseToken можно использовать вместо IChangeFeedObserverContext.PartitionKeyRangeId
  • ChangeFeedProcessorContext.Headers можно использовать вместо IChangeFeedObserverContext.FeedResponse
  • ChangeFeedProcessorContext.Diagnostics содержит подробные сведения о задержках запросов для устранения неполадок
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

События работоспособности и наблюдаемость

Если вы ранее использовали или IHealthMonitor использовались IChangeFeedObserver.OpenAsyncIChangeFeedObserver.CloseAsync, используйте API уведомлений.

  • IChangeFeedObserver.OpenAsync можно заменить WithLeaseAcquireNotificationна .
  • IChangeFeedObserver.CloseAsync можно заменить WithLeaseReleaseNotificationна .
  • IHealthMonitor.InspectAsync можно заменить WithErrorNotificationна .

Контейнер состояния и аренды

Как и в случае с библиотекой обработчика канала изменений, функция канала изменений в пакете SDK для .NET версии 3 использует контейнер аренды для хранения состояния. При этом схемы различаются.

Обработчик канала изменений пакета SDK версии 3 обнаружит старое состояние библиотеки и автоматически перенесет его в новую схему при первом выполнении перенесенного кода приложения.

Вы можете безопасно остановить приложение, использующее старый код, перенести код в новую версию, а затем запустить перенесенное приложение. Все изменения, произошедшие во время простоя приложения, будут выявлены и обработаны новой версией.

Дополнительные ресурсы

Следующие шаги

Вы можете продолжить знакомство с обработчиком канала изменений, перейдя к следующим статьям: