Migrate from the bulk executor library to the bulk support in Azure Cosmos DB .NET V3 SDK
APPLIES TO: NoSQL
This article describes the required steps to migrate an existing application's code that uses the .NET bulk executor library to the bulk support feature in the latest version of the .NET SDK.
Enable bulk support
Enable bulk support on the CosmosClient instance through the AllowBulkExecution configuration:
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
Create Tasks for each operation
Bulk support in the .NET SDK works by using the Task Parallel Library and grouping operations that occur concurrently.
No single method in the SDK takes your list of documents or operations as an input parameter. Instead, you create a Task for each operation you want to execute in bulk, and then wait for all of them to complete.
For example, if your initial input is a list of items where each item has the following schema:
public class MyItem
{
    public string id { get; set; }
    public string pk { get; set; }
    public int operationCounter { get; set; } = 0;
}
If you want to do bulk import (similar to using BulkExecutor.BulkImportAsync), you need to have concurrent calls to CreateItemAsync. For example:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
If you want to do bulk update (similar to using BulkExecutor.BulkUpdateAsync), you need to have concurrent calls to the ReplaceItemAsync method after updating the item value. For example:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
And if you want to do bulk delete (similar to using BulkExecutor.BulkDeleteAsync), you need to have concurrent calls to DeleteItemAsync, with the id and partition key of each item. For example:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    // A delete only needs the id and partition key; the item body is not modified.
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
Capture task result state
In the previous code examples, we created a list of concurrent tasks and called the CaptureOperationResponse method on each of those tasks. This method is a helper that lets us maintain a response schema similar to BulkExecutor's, by capturing any errors and tracking the request units usage.
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            // Read the charge from the awaited response rather than task.Result.
            RequestUnitsConsumed = response.RequestCharge
        };
    }
    catch (Exception ex)
    {
        // Service errors carry the request charge of the failed operation.
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        // Any other exception is recorded without a request charge.
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}
Where the OperationResponse is declared as:
public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}
Execute operations concurrently
To track the scope of the entire list of Tasks, we use this helper class:
public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    // The stopwatch starts when the BulkOperations instance is created.
    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}
The ExecuteAsync method waits until all operations are completed. You can use it as follows:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
Capture statistics
The previous code waits until all operations are completed and calculates the required statistics. These statistics are similar to those of the bulk executor library's BulkImportResponse.
public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;
    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
The BulkOperationResponse contains:
- The total time taken to process the list of operations through bulk support.
- The number of successful operations.
- The total of request units consumed.
- If there are failures, a list of tuples that contain the associated item and the exception, for logging and identification purposes.
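As a rough illustration of how these fields might be consumed, the following sketch prints the statistics and lists the failed items. It assumes C# 9 top-level statements and repeats the MyItem and BulkOperationResponse declarations so that it compiles on its own. The Report function and the sample values are hypothetical stand-ins, not part of the SDK or of the migration sample.

```csharp
using System;
using System.Collections.Generic;

// Stand-in data so the sketch runs without a Cosmos DB account (hypothetical values).
var response = new BulkOperationResponse<MyItem>
{
    TotalTimeTaken = TimeSpan.FromSeconds(2),
    SuccessfulDocuments = 1,
    TotalRequestUnitsConsumed = 10.5,
    Failures = new List<(MyItem, Exception)>
    {
        (new MyItem { id = "2", pk = "a" }, new InvalidOperationException("simulated failure"))
    }
};

Report(response);

// Hypothetical helper: writes the summary, then one line per failed item.
static void Report(BulkOperationResponse<MyItem> result)
{
    Console.WriteLine($"Time taken: {result.TotalTimeTaken}");
    Console.WriteLine($"Successful operations: {result.SuccessfulDocuments}");
    Console.WriteLine($"Request units consumed: {result.TotalRequestUnitsConsumed}");

    foreach ((MyItem item, Exception error) in result.Failures)
    {
        Console.WriteLine($"Failed item id={item.id}, pk={item.pk}: {error.Message}");
    }
}

// Same shapes as the declarations used in this article.
public class MyItem
{
    public string id { get; set; }
    public string pk { get; set; }
    public int operationCounter { get; set; } = 0;
}

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;
    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
```

In an application, the response would come from bulkOperations.ExecuteAsync() rather than from hand-built sample data.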
Retry configuration
The bulk executor library's guidance recommended setting the MaxRetryWaitTimeInSeconds and MaxRetryAttemptsOnThrottledRequests properties of RetryOptions to 0 to delegate control to the library.
For bulk support in the .NET SDK, there is no hidden behavior. You can configure the retry options directly through the CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests and CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests properties.
Note
In cases where the provisioned request units are much lower than what the amount of data requires, consider setting these options to high values. The bulk operation takes longer, but it has a higher chance of succeeding completely because of the additional retries.
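A minimal sketch of such a configuration follows, assuming C# 9 top-level statements and the Microsoft.Azure.Cosmos package. The environment variable names and the retry values are illustrative assumptions, not recommendations; suitable values depend on your provisioned throughput and data volume.

```csharp
using System;
using Microsoft.Azure.Cosmos;

// Hypothetical environment variable names; supply the endpoint and key however your app does.
string endpoint = Environment.GetEnvironmentVariable("COSMOS_ENDPOINT");
string authKey = Environment.GetEnvironmentVariable("COSMOS_KEY");

CosmosClientOptions options = new CosmosClientOptions
{
    AllowBulkExecution = true,
    // Illustrative values only: allow more retries and a longer total wait when rate limited.
    MaxRetryAttemptsOnRateLimitedRequests = 50,
    MaxRetryWaitTimeOnRateLimitedRequests = TimeSpan.FromSeconds(120)
};

using CosmosClient client = new CosmosClient(endpoint, authKey, options);
```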
Performance improvements
As with other operations in the .NET SDK, using the stream APIs results in better performance and avoids any unnecessary serialization.
Using stream APIs is only possible if the nature of the data you use matches that of a stream of bytes (for example, file streams). In such cases, using the CreateItemStreamAsync, ReplaceItemStreamAsync, or DeleteItemStreamAsync methods and working with ResponseMessage (instead of ItemResponse) increases the throughput that can be achieved.
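The following sketch shows what a stream-based create might look like, assuming the payload is already available as UTF-8 JSON. StreamBulkSketch and CreateFromJsonAsync are hypothetical names introduced for illustration. Unlike the typed APIs, the stream APIs generally report a failed operation through the status code of the ResponseMessage instead of throwing a CosmosException, so the sketch checks IsSuccessStatusCode.

```csharp
using System.IO;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

public static class StreamBulkSketch
{
    // Hypothetical helper: creates one item from raw JSON and returns its outcome.
    public static async Task<(bool IsSuccessful, double RequestUnitsConsumed)> CreateFromJsonAsync(
        Container container, string json, string partitionKeyValue)
    {
        using MemoryStream payload = new MemoryStream(Encoding.UTF8.GetBytes(json));
        using ResponseMessage response =
            await container.CreateItemStreamAsync(payload, new PartitionKey(partitionKeyValue));

        // Inspect the status code; no typed deserialization of the response body takes place.
        return (response.IsSuccessStatusCode, response.Headers.RequestCharge);
    }
}
```

As with the typed methods, you would start one such task per item without awaiting it inside the loop, and then wait for all of them with Task.WhenAll.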
Next steps
- To learn more about the .NET SDK releases, see the Azure Cosmos DB SDK article.
- Get the complete migration source code from GitHub.
- Additional bulk samples on GitHub
- Trying to do capacity planning for a migration to Azure Cosmos DB?
  - If all you know is the number of vCores and servers in your existing database cluster, read about estimating request units using vCores or vCPUs.
  - If you know typical request rates for your current database workload, read about estimating request units using Azure Cosmos DB capacity planner.