Migrate from the change feed processor library to the Azure Cosmos DB .NET V3 SDK
APPLIES TO: NoSQL
This article describes the required steps to migrate an existing application's code that uses the change feed processor library to the change feed feature in the latest version of the .NET SDK (also referred as .NET V3 SDK).
Required code changes
The .NET V3 SDK has several breaking changes, the following are the key steps to migrate your application:
- Convert the
DocumentCollectionInfo
instances intoContainer
references for the monitored and leases containers. - Customizations that use
WithProcessorOptions
should be updated to useWithLeaseConfiguration
andWithPollInterval
for intervals,WithStartTime
for start time, andWithMaxItems
to define the maximum item count. - Set the
processorName
onGetChangeFeedProcessorBuilder
to match the value configured onChangeFeedProcessorOptions.LeasePrefix
, or usestring.Empty
otherwise. - The changes are no longer delivered as a
IReadOnlyList<Document>
, instead, it's aIReadOnlyCollection<T>
whereT
is a type you need to define, there is no base item class anymore. - To handle the changes, you no longer need an implementation of
IChangeFeedObserver
, instead you need to define a delegate. The delegate can be a static Function or, if you need to maintain state across executions, you can create your own class and pass an instance method as delegate.
For example, if the original code to build the change feed processor looks as follows:
ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.monitoredContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
DatabaseName = databaseId,
CollectionName = Program.leasesContainer,
Uri = new Uri(configuration["EndPointUrl"]),
MasterKey = configuration["AuthorizationKey"]
};
ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
.WithHostName("consoleHost")
.WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
{
StartFromBeginning = true,
LeasePrefix = "MyLeasePrefix",
MaxItemCount = 10,
FeedPollDelay = TimeSpan.FromSeconds(1)
})
.WithFeedCollection(monitoredCollectionInfo)
.WithLeaseCollection(leaseCollectionInfo)
.WithObserver<ChangeFeedObserver>()
.BuildAsync();
The migrated code will look like:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithMaxItems(10)
.WithPollInterval(TimeSpan.FromSeconds(1))
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
For the delegate, you can have a static method to receive the events. If you were consuming information from the IChangeFeedObserverContext
you can migrate to use the ChangeFeedProcessorContext
:
ChangeFeedProcessorContext.LeaseToken
can be used instead ofIChangeFeedObserverContext.PartitionKeyRangeId
ChangeFeedProcessorContext.Headers
can be used instead ofIChangeFeedObserverContext.FeedResponse
ChangeFeedProcessorContext.Diagnostics
contains detailed information about request latency for troubleshooting
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate work
await Task.Delay(1);
}
}
Health events and observability
If previously you were using IHealthMonitor
or you were leveraging IChangeFeedObserver.OpenAsync
and IChangeFeedObserver.CloseAsync
, use the Notifications API.
IChangeFeedObserver.OpenAsync
can be replaced withWithLeaseAcquireNotification
.IChangeFeedObserver.CloseAsync
can be replaced withWithLeaseReleaseNotification
.IHealthMonitor.InspectAsync
can be replaced withWithErrorNotification
.
State and lease container
Similar to the change feed processor library, the change feed feature in .NET V3 SDK uses a lease container to store the state. However, the schemas are different.
The SDK V3 change feed processor will detect any old library state and migrate it to the new schema automatically upon the first execution of the migrated application code.
You can safely stop the application using the old code, migrate the code to the new version, start the migrated application, and any changes that happened while the application was stopped, will be picked up and processed by the new version.
Additional resources
Next steps
You can now proceed to learn more about change feed processor in the following articles:
- Overview of change feed processor
- Using the change feed estimator
- Change feed processor start time
- Trying to do capacity planning for a migration to Azure Cosmos DB?
- If all you know is the number of vcores and servers in your existing database cluster, read about estimating request units using vCores or vCPUs
- If you know typical request rates for your current database workload, read about estimating request units using Azure Cosmos DB capacity planner