Use the change feed estimator
APPLIES TO: NoSQL
This article describes how you can monitor the progress of your change feed processor instances as they read the change feed.
Why is monitoring progress important?
The change feed processor acts as a pointer that moves forward across your change feed and delivers the changes to a delegate implementation.
Your change feed processor deployment can process changes at a particular rate based on its available resources like CPU, memory, network, and so on.
If this rate is slower than the rate at which your changes happen in your Azure Cosmos DB container, your processor starts to lag behind.
Identifying this scenario helps you determine whether you need to scale your change feed processor deployment.
Implement the change feed estimator
As a push model for automatic notifications
Like the change feed processor, the change feed estimator can work as a push model. The estimator measures the difference between the last processed item (defined by the state of the leases container) and the latest change in the container, and pushes this value to a delegate. The interval at which the measurement is taken can be customized and defaults to 5 seconds.
As an example, if your change feed processor is using latest version mode and is defined like this:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
    .WithInstanceName("consoleHost")
    .WithLeaseContainer(leaseContainer)
    .Build();
The correct way to initialize an estimator to measure that processor is to use GetChangeFeedEstimatorBuilder, like so:
ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();
Both the processor and the estimator share the same leaseContainer and the same name.
The other two parameters are the delegate, which receives a number that represents how many changes are pending to be read by the processor, and the time interval at which you want this measurement to be taken.
An example of a delegate that receives the estimation is:
static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    await Task.Delay(0);
}
You can send this estimation to your monitoring solution and use it to understand how the processor's progress changes over time.
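As a minimal sketch of what such monitoring logic might look like, the following delegate flags sustained lag instead of reacting to a single spike. The threshold, the number of consecutive readings, and the simulated values are assumptions chosen for illustration and aren't part of the SDK. Only the delegate shape (a long estimation and a CancellationToken) comes from the example above. The sketch uses top-level statements, so it assumes C# 9 or later.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical threshold: pending changes tolerated before a reading counts as "high".
const long lagThreshold = 1000;

// Hypothetical number of consecutive high readings required before alerting,
// so that a single spike doesn't raise an alert.
const int requiredConsecutiveReadings = 3;

int consecutiveHighReadings = 0;
int alertsRaised = 0;
bool alertActive = false;

// Same shape as the estimation delegate shown earlier: (long estimation, CancellationToken).
Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > lagThreshold)
    {
        consecutiveHighReadings++;
    }
    else
    {
        // Lag recovered: reset the counter and allow future alerts.
        consecutiveHighReadings = 0;
        alertActive = false;
    }

    if (consecutiveHighReadings >= requiredConsecutiveReadings && !alertActive)
    {
        alertActive = true;
        alertsRaised++;
        Console.WriteLine($"Estimated lag {estimation} stayed above {lagThreshold} for {consecutiveHighReadings} readings; consider scaling out.");
    }

    return Task.CompletedTask;
}

// Simulated readings stand in for the values the estimator would push.
foreach (long reading in new long[] { 200, 1500, 1800, 2500, 300 })
{
    await HandleEstimationAsync(reading, CancellationToken.None);
}
```

In a real deployment, the Console.WriteLine call would likely be replaced by a call to whatever metrics or alerting client your monitoring solution provides.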
As an on-demand detailed estimation
In contrast with the push model, there's an alternative that lets you obtain the estimation on demand. This model also provides more detailed information:
- The estimated lag per lease.
- The instance owning and processing each lease, so you can identify if there's an issue on an instance.
If your change feed processor is defined like this:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
    .WithInstanceName("consoleHost")
    .WithLeaseContainer(leaseContainer)
    .Build();
You can create the estimator with the same lease configuration:
ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);
You can then obtain the detailed estimation whenever you need it, at whatever frequency you require:
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}
Each ChangeFeedProcessorState contains the lease and lag information, and also identifies the instance that currently owns the lease.
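One way to use the per-lease detail is to total the estimated lag by owning instance, which can help show whether a single instance is falling behind. The sketch below is decoupled from the SDK so it can run on its own: the tuples are hypothetical stand-ins for the LeaseToken, InstanceName, and EstimatedLag values read from each ChangeFeedProcessorState, and the sample numbers are invented for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical snapshot of lease states. In a real application, these values would be
// copied from LeaseToken, InstanceName, and EstimatedLag on each ChangeFeedProcessorState.
var leaseStates = new List<(string LeaseToken, string? InstanceName, long EstimatedLag)>
{
    ("0", "hostA", 120),
    ("1", "hostA", 80),
    ("2", "hostB", 4500),
    ("3", null, 900), // lease not currently owned by any instance
};

// Total the estimated lag per owning instance, largest first.
var lagByInstance = leaseStates
    .GroupBy(state => state.InstanceName ?? "(unowned)")
    .Select(group => (
        Instance: group.Key,
        TotalLag: group.Sum(state => state.EstimatedLag),
        Leases: group.Count()))
    .OrderByDescending(entry => entry.TotalLag)
    .ToList();

foreach (var entry in lagByInstance)
{
    Console.WriteLine($"{entry.Instance}: {entry.TotalLag} pending changes across {entry.Leases} lease(s)");
}
```

An instance with far more lag than its peers may indicate a problem on that instance, while lag on unowned leases suggests that no instance has picked them up yet.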
Estimator deployment
The change feed estimator doesn't need to be deployed as part of your change feed processor, nor be part of the same project. We recommend deploying the estimator on an independent instance from your processors. A single estimator instance can track the progress for all the leases and instances in your change feed processor deployment.
Each estimation consumes request units from your monitored and lease containers. An interval of 1 minute between estimations is a good starting point; the shorter the interval, the more request units are consumed.
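To illustrate how the interval affects consumption, the following sketch computes how many estimations run per day at a few intervals. The cost per estimation is a made-up placeholder, not a documented figure; the real cost depends on your containers and number of leases, so measure it in your own environment.

```csharp
using System;

// Hypothetical cost of a single estimation in request units (RUs).
// This is a placeholder for illustration only; measure the real cost yourself.
const double assumedRequestUnitsPerEstimation = 10.0;

// Number of estimations that run in one day for a given interval between them.
double EstimationsPerDay(TimeSpan interval) =>
    TimeSpan.FromDays(1).TotalSeconds / interval.TotalSeconds;

TimeSpan[] intervals =
{
    TimeSpan.FromSeconds(5),
    TimeSpan.FromMinutes(1),
    TimeSpan.FromMinutes(5),
};

foreach (TimeSpan interval in intervals)
{
    double estimations = EstimationsPerDay(interval);
    double dailyRequestUnits = estimations * assumedRequestUnitsPerEstimation;
    Console.WriteLine($"Interval {interval}: {estimations} estimations per day, about {dailyRequestUnits} RUs per day (assumed cost)");
}
```

Whatever the real per-estimation cost turns out to be, moving from a 5-second interval to a 1-minute interval reduces the number of estimations, and therefore the request units consumed, by a factor of 12.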
Supported change feed modes
The change feed estimator can be used for both latest version mode and all versions and deletes mode. In both modes, the estimate provided isn't guaranteed to be an exact count of outstanding changes to process.
Additional resources
- Azure Cosmos DB SDK
- Usage samples on GitHub (.NET latest version)
- Usage samples on GitHub (.NET all versions and deletes)
- Usage samples on GitHub (Java)
- Additional samples on GitHub
Next steps
You can now proceed to learn more about change feed processor in the following article: