Migrate from the change feed processor library to the Azure Cosmos DB .NET V3 SDK

APPLIES TO: NoSQL

This article describes the steps required to migrate an existing application's code from the change feed processor library to the change feed feature in the latest version of the .NET SDK (also referred to as the .NET V3 SDK).

Required code changes

The .NET V3 SDK has several breaking changes. The key steps to migrate your application are:

  1. Convert the DocumentCollectionInfo instances into Container references for the monitored and leases containers.
  2. Update customizations that use WithProcessorOptions to use WithLeaseConfiguration and WithPollInterval for intervals, WithStartTime for the start time, and WithMaxItems for the maximum item count.
  3. Set the processorName on GetChangeFeedProcessorBuilder to match the value configured on ChangeFeedProcessorOptions.LeasePrefix, or use string.Empty otherwise.
  4. Changes are no longer delivered as an IReadOnlyList<Document>. Instead, they arrive as an IReadOnlyCollection<T>, where T is a type you define; there is no base item class anymore.
  5. To handle the changes, you no longer implement IChangeFeedObserver. Instead, you define a delegate. The delegate can be a static method or, if you need to maintain state across executions, an instance method of a class you create.

For example, if the original code to build the change feed processor looks as follows:

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

The migrated code looks like this:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();
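
The migrated snippet relies on a client instance and a ToDoItem type that this article doesn't show, and a built processor does nothing until it is started. The following is a minimal sketch of those missing pieces, not the article's own code: the ToDoItem shape (only the id and creationTime members used by the handler later in this article), the ProcessorLifecycle class, and its method names are assumptions made for illustration.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

// Assumed item type: with no base item class in the V3 SDK, T is any class
// that your documents can be deserialized into.
public class ToDoItem
{
    public string id { get; set; }
    public DateTime creationTime { get; set; }
}

// Hypothetical helper class, used only to keep this sketch self-contained.
public static class ProcessorLifecycle
{
    // Creates the client from the same endpoint and key that the old
    // DocumentCollectionInfo instances carried.
    public static CosmosClient CreateClient(string endpointUrl, string authorizationKey)
    {
        return new CosmosClient(endpointUrl, authorizationKey);
    }

    // Starts the processor, waits for a key press, and then stops it.
    public static async Task RunUntilKeyPressAsync(ChangeFeedProcessor processor)
    {
        await processor.StartAsync();
        Console.WriteLine("Change feed processor started. Press any key to stop.");
        Console.ReadKey();
        await processor.StopAsync();
    }
}
```

With these in place, client can be created as ProcessorLifecycle.CreateClient(configuration["EndPointUrl"], configuration["AuthorizationKey"]), and the changeFeedProcessor built above can be passed to RunUntilKeyPressAsync.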

For the delegate, you can use a static method to receive the events. If you were consuming information from the IChangeFeedObserverContext, you can migrate to the ChangeFeedProcessorContext:

  • ChangeFeedProcessorContext.LeaseToken can be used instead of IChangeFeedObserverContext.PartitionKeyRangeId
  • ChangeFeedProcessorContext.Headers can be used instead of IChangeFeedObserverContext.FeedResponse
  • ChangeFeedProcessorContext.Diagnostics contains detailed information about request latency for troubleshooting

static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken {context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics: {context.Diagnostics}");
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}
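
When the handler needs to keep state across executions, an instance method can replace the static one. The following is a minimal sketch under stated assumptions: ChangeCounter and ProcessedCount are hypothetical names that are not part of the SDK, and the counter is updated atomically on the assumption that the handler may be invoked concurrently for different leases.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

// Hypothetical stateful handler: counts every item seen by this instance.
public class ChangeCounter<T>
{
    private long processedCount;

    // Total number of items handled so far by this instance.
    public long ProcessedCount => Interlocked.Read(ref processedCount);

    // Same parameters as the static HandleChangesAsync, exposed as an instance method.
    public Task HandleChangesAsync(
        ChangeFeedProcessorContext context,
        IReadOnlyCollection<T> changes,
        CancellationToken cancellationToken)
    {
        // Atomic update, in case batches from different leases arrive in parallel.
        Interlocked.Add(ref processedCount, changes.Count);
        return Task.CompletedTask;
    }
}
```

To use it, create a ChangeCounter<ToDoItem> instance named, for example, counter, and pass counter.HandleChangesAsync to GetChangeFeedProcessorBuilder<ToDoItem> in place of Program.HandleChangesAsync.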

Health events and observability

If you previously used IHealthMonitor, or relied on IChangeFeedObserver.OpenAsync and IChangeFeedObserver.CloseAsync, use the Notifications API instead.

  • IChangeFeedObserver.OpenAsync can be replaced with WithLeaseAcquireNotification.
  • IChangeFeedObserver.CloseAsync can be replaced with WithLeaseReleaseNotification.
  • IHealthMonitor.InspectAsync can be replaced with WithErrorNotification.
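
The three replacements above can be sketched as follows. This is an illustrative example rather than the article's own code: the ProcessorNotifications class, its method names, and the console messages are assumptions, while the three With...Notification calls are the builder methods named in the list.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

// Hypothetical helper that groups the notification handlers.
public static class ProcessorNotifications
{
    // Stands in for IChangeFeedObserver.OpenAsync: the instance acquired a lease.
    public static Task OnLeaseAcquiredAsync(string leaseToken)
    {
        Console.WriteLine($"Lease {leaseToken} acquired; processing starts.");
        return Task.CompletedTask;
    }

    // Stands in for IChangeFeedObserver.CloseAsync: the instance released a lease.
    public static Task OnLeaseReleasedAsync(string leaseToken)
    {
        Console.WriteLine($"Lease {leaseToken} released; processing stops.");
        return Task.CompletedTask;
    }

    // Stands in for IHealthMonitor.InspectAsync: an error occurred on a lease.
    public static Task OnErrorAsync(string leaseToken, Exception exception)
    {
        Console.WriteLine($"Lease {leaseToken} reported an error: {exception.Message}");
        return Task.CompletedTask;
    }

    // Registers all three handlers on a builder and returns it for further chaining.
    public static ChangeFeedProcessorBuilder AddNotifications(ChangeFeedProcessorBuilder builder)
    {
        return builder
            .WithLeaseAcquireNotification(OnLeaseAcquiredAsync)
            .WithLeaseReleaseNotification(OnLeaseReleasedAsync)
            .WithErrorNotification(OnErrorAsync);
    }
}
```

AddNotifications can wrap the result of GetChangeFeedProcessorBuilder before the remaining calls such as WithInstanceName, WithLeaseContainer, and Build.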

State and lease container

Similar to the change feed processor library, the change feed feature in the .NET V3 SDK uses a lease container to store the state. However, the schemas are different.

The SDK V3 change feed processor will detect any old library state and migrate it to the new schema automatically upon the first execution of the migrated application code.

You can safely stop the application that uses the old code, migrate the code to the new version, and start the migrated application. Any changes that happened while the application was stopped will be picked up and processed by the new version.

Next steps

You can now proceed to learn more about change feed processor in the following articles: