CosmosDB change feed processor - leases and work balancing strategy
Work distribution is one of the core features of the change feed processor. The purpose of the change feed processor is to receive and dispatch all the documents from all partitions of the feed collection for further processing. Under high load, it's necessary to scale out the number of processor instances so that the documents are processed within the desired time SLA.
This means the change feed processing system has to distribute the work between the instances somehow. That's the aim of the so-called "partitions balancing strategy". The change feed processor library contains a default implementation of this strategy called "EqualPartitionsBalancingStrategy".
EqualPartitionsBalancingStrategy
This strategy tries to distribute the work evenly between all active hosts. The algorithm is based on the lease documents.
Just a recap (taken from my first post in this series). There are two collections in the game:
- Data collection - it's also known as the feed collection. It's the collection of data over which the change feed is processed. It could be created as a partitioned collection so that the data is split across separate physical locations.
- Lease collection - it's the collection where the change feed processor keeps the pointer to the last processed document per partition.
Let's see an example of such a lease document:
{
  "id": "account.documents.azure.com_EH9XAA==_EH9XAP7EOwA=..2",
  "PartitionId": "2",
  "Owner": "unique_owner_id",
  "ContinuationToken": "\"2468759\"",
  "properties": {},
  "timestamp": "2018-08-01T20:43:54.9476294Z"
}
There will be as many lease documents created as there are partitions in the feed collection.
Id
Each lease document has an "id" in the following form:
{prefix}{feed database account}_{feed database resource id}_{collection resource id}..{partition range ID}
The prefix is used to differentiate the change feed processor groups if they use the same lease collection. The rest of the parameters in the "id" are self-explanatory.
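To make the format concrete, here is a minimal Python sketch that composes an id of this shape and reads the partition range ID back out of it. The function names are hypothetical and are not part of the change feed processor library; the sketch only mirrors the pattern described above.

```python
def build_lease_id(prefix: str, account: str, database_rid: str,
                   collection_rid: str, partition_range_id: str) -> str:
    """Compose a lease id following the pattern described in the post."""
    return f"{prefix}{account}_{database_rid}_{collection_rid}..{partition_range_id}"


def partition_range_id_of(lease_id: str) -> str:
    """Return the partition range ID, i.e. whatever follows the last '..'."""
    return lease_id.rsplit("..", 1)[1]


# Rebuild the id from the sample lease document (empty prefix).
sample_id = build_lease_id("", "account.documents.azure.com",
                           "EH9XAA==", "EH9XAP7EOwA=", "2")
print(sample_id)
print(partition_range_id_of(sample_id))
```

With a non-empty prefix, two processor groups sharing one lease collection get disjoint sets of lease ids, which is the point of the prefix.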
Owner
That's a string which uniquely identifies the processor instance. Make sure this string is unique for each processor instance. Otherwise the same documents will be processed as many times as you have processor instances. We used to name it the following way: {deployment id}_{processor name}_{instance id}
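The naming scheme can be sketched in a few lines of Python. Both helpers are hypothetical illustrations, not library code; the second one simply flags owner strings that occur more than once, which is the situation to avoid.

```python
from collections import Counter


def build_owner(deployment_id: str, processor_name: str, instance_id: str) -> str:
    """Compose the owner string as {deployment id}_{processor name}_{instance id}."""
    return f"{deployment_id}_{processor_name}_{instance_id}"


def duplicated_owners(owners: list[str]) -> list[str]:
    """Return the owner strings that appear more than once."""
    return sorted(name for name, count in Counter(owners).items() if count > 1)


# Three instances of the same processor in one deployment (made-up values).
owners = [build_owner("deploy42", "orders", str(i)) for i in range(3)]
print(owners)
print(duplicated_owners(owners))
```

A check like this could run at deployment time, before the instances start competing for leases.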
Continuation token
That's the pointer to the last successfully processed document change on the feed.
Timestamp
That's the UTC timestamp of the last change feed checkpoint or heartbeat. This timestamp is used by the work balancing strategy to determine whether the lease is expired or not.
If the lease is expired, it's available to be taken over by any change feed processor instance.
If the lease is not expired, it means that the owning instance is active and the feed is being processed.
The aim of the strategy is to distribute the work evenly between all active instances.
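The expiration rule can be sketched as follows. This is a simplified Python illustration, not the library's code: the function names are hypothetical, and treating a lease without an owner as available is an assumption based on the graceful-stop behaviour described in this post.

```python
import re
from datetime import datetime, timedelta, timezone


def parse_lease_timestamp(value: str) -> datetime:
    """Parse a timestamp such as 2018-08-01T20:43:54.9476294Z into an aware datetime."""
    match = re.fullmatch(r"(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})(?:\.(\d+))?Z", value)
    if match is None:
        raise ValueError(f"unexpected timestamp format: {value!r}")
    base, fraction = match.groups()
    # The sample has 7 fractional digits; Python datetimes keep 6 (microseconds).
    micros = int((fraction or "0")[:6].ljust(6, "0"))
    parsed = datetime.strptime(base, "%Y-%m-%dT%H:%M:%S")
    return parsed.replace(microsecond=micros, tzinfo=timezone.utc)


def is_lease_available(lease: dict, now: datetime, expiration_interval: timedelta) -> bool:
    """A lease can be taken if it has no owner (assumption) or its timestamp is too old."""
    if not lease.get("Owner"):
        return True
    return now - parse_lease_timestamp(lease["timestamp"]) > expiration_interval


lease = {"Owner": "unique_owner_id", "timestamp": "2018-08-01T20:43:54.9476294Z"}
interval = timedelta(seconds=60)
print(is_lease_available(lease, datetime(2018, 8, 1, 20, 44, 30, tzinfo=timezone.utc), interval))
print(is_lease_available(lease, datetime(2018, 8, 1, 20, 45, 0, tzinfo=timezone.utc), interval))
```

The first call is about 35 seconds after the last heartbeat, so the lease is still held; the second is about 65 seconds after it, so the lease counts as expired.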
Properties
That's a free-form dictionary of string to string. The properties are the way the strategy executions communicate with each other. They give you the possibility to create more advanced strategies, e.g. when shifting the load between "A/B" deployments.
The properties can be manipulated by the strategy execution, and they are persisted afterwards.
Configuration - Lease expiration interval
The strategy is configured by ChangeFeedProcessorOptions.LeaseExpirationInterval. It's the interval for which the lease is held by a particular owner. It also determines how often the strategy is executed.
So, if the lease is not renewed or checkpointed within this interval, it will be identified as expired and ownership of the partition will move to another change feed processor instance.
If the processor is stopped, the lease is freed up. It means that the partition won't be processed until some processor instance takes it (which depends on the lease expiration interval).
There are special situations when you need to enhance the default work balancing strategy. Imagine you stop an instance. The partitions processed by the stopped instance are not processed until some other instance takes them over. This "gap" depends on the expiration interval. So, if you have a strict time-limited SLA, you will need to write a custom strategy that leverages the properties. Ping me if you need more information about it.
Bear in mind, this is a really rare condition, and in almost all cases you will be more than happy with the default strategy. We run quite a heavy system using the change feed processor with the default strategy without issues.
Conclusion, consequences
If a change feed processor instance crashes and then restarts within the lease expiration interval, it will take back all previously owned leases immediately after start.
If the instance is gracefully stopped, the owner is reset, which means the lease is free to be owned by another change feed processor instance.
If the number of change feed processor instances is scaled out, the new ideal work balance is calculated based on the new set of active instances. The new instance(s) will try to take ownership of partitions one at a time, one per execution cycle as configured by the lease expiration interval.
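The balancing behaviour described above can be approximated with a short Python sketch. It is a simplified model under my own assumptions, not the actual EqualPartitionsBalancingStrategy code: the function name is hypothetical, the ideal share is taken as the partition count divided by the number of active hosts (rounded up), and a host takes a single lease from the busiest host only when nothing is free.

```python
import math
from collections import Counter


def select_leases_to_take(leases, my_owner, is_expired):
    """Pick the leases `my_owner` should try to acquire in one execution cycle."""
    available = []        # expired or unowned leases, free to take
    owned_by = Counter()  # number of active leases per live owner
    for lease in leases:
        if not lease.get("Owner") or is_expired(lease):
            available.append(lease)
        else:
            owned_by[lease["Owner"]] += 1
    owned_by[my_owner] += 0  # make sure this host counts as active

    # Ideal share: partitions split evenly between active hosts, rounded up.
    target = math.ceil(len(leases) / len(owned_by))
    needed = target - owned_by[my_owner]
    if needed <= 0:
        return []
    if available:
        return available[:needed]

    # Nothing is free: take one lease from the busiest host, but only if
    # doing so actually reduces the imbalance.
    busiest, count = owned_by.most_common(1)[0]
    if count > owned_by[my_owner] + 1:
        return [next(l for l in leases if l.get("Owner") == busiest)]
    return []


# host_A owns all four partitions; host_B has just been added.
leases = [{"PartitionId": str(i), "Owner": "host_A"} for i in range(4)]
taken = select_leases_to_take(leases, "host_B", lambda lease: False)
print([lease["PartitionId"] for lease in taken])
```

In this scenario host_B is below its share of two partitions but gets only one lease in the cycle, which matches the "one per execution cycle" behaviour; it would need a second cycle to reach the balance.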
Keep in mind that the leases are stored in the lease collection. The collection has its own throughput (== money). If you set a low expiration interval, you can end up with throttling issues because the leases are scanned once per lease expiration interval by each instance. As a result, leases may not be taken for a longer time and their processing will be delayed.
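A quick back-of-the-envelope calculation shows how fast the scan load grows. The Python sketch below only counts scans and lease documents read, assuming each instance scans every lease once per interval as described above. It does not estimate request units, since those depend on the actual queries and document sizes. The instance and partition counts are made-up example values.

```python
def lease_scans_per_hour(instance_count: int, lease_expiration_seconds: float) -> float:
    """Each instance scans the lease collection once per expiration interval."""
    return instance_count * 3600 / lease_expiration_seconds


def lease_documents_read_per_hour(instance_count: int, partition_count: int,
                                  lease_expiration_seconds: float) -> float:
    """Every scan reads one lease document per partition of the feed collection."""
    return lease_scans_per_hour(instance_count, lease_expiration_seconds) * partition_count


# 10 instances and 50 partitions, with a 60 second and a 5 second interval.
for seconds in (60, 5):
    print(seconds,
          lease_scans_per_hour(10, seconds),
          lease_documents_read_per_hour(10, 50, seconds))
```

Going from 60 seconds to 5 seconds multiplies the read load on the lease collection by twelve, with the same number of instances and partitions.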