你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

从批量执行程序库迁移到 Azure Cosmos DB .NET V3 SDK 中的批量支持

本文介绍将使用 .NET 批量执行程序库 的现有应用程序代码迁移到最新版本 .NET SDK 中的 批量支持 功能所需的步骤。

启用批量支持

通过 CosmosClient 配置对实例启用批量支持:

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

为每个操作创建任务

.NET SDK 中的批量支持通过利用 任务并行库 来对同时发生的操作进行分组。

SDK 中没有将文档或操作列表作为输入参数的单个方法,而是需要为要批量执行的每个操作创建一个 Task,然后只需等待这些任务完成。

例如,如果您的初始输入是一个每项具有以下架构的项目列表:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

如果要进行批量导入(类似于使用 BulkExecutor.BulkImportAsync),则需要对CreateItemAsync进行并发调用。 例如:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

如果要进行批量 更新 (类似于使用 BulkExecutor.BulkUpdateAsync),则需要在更新项值后对方法进行并发调用 ReplaceItemAsync 。 例如:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

如果要进行批量删除(类似于使用BulkExecutor.BulkDeleteAsync),需要对DeleteItemAsync进行并发调用,并传入每个项的id和分区键。 例如:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

捕获任务结果状态

在前面的代码示例中,我们创建了一个并发任务列表,并在其中每个任务上调用 CaptureOperationResponse 该方法。 此方法是一个扩展,通过捕获任何错误并跟踪请求单位使用情况,我们可以维护与 BulkExecutor 类似的响应架构

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

关于OperationResponse的声明如下:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

并发执行操作

若要跟踪整个任务列表的范围,我们将使用此帮助程序类:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

该方法 ExecuteAsync 将等到所有操作完成,您可以像下面这样使用它:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

捕获统计信息

前面的代码会等待所有操作完成后再计算所需的统计信息。 这些统计信息类似于批量执行程序库的 BulkImportResponse 的统计信息。

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

BulkOperationResponse 包含:

  1. 批量处理操作列表的总耗时。
  2. 成功的操作次数。
  3. 消耗的请求单位总数。
  4. 如果发生故障,它将显示一个包含异常和相关项的元组列表,以用于日志记录和识别目的。

重试配置

批量执行程序库提供了指导,指出需要设置MaxRetryWaitTimeInSecondsMaxRetryAttemptsOnThrottledRequestsRetryOptions0,以将控制权委托给库。

对于 .NET SDK 中的批量支持,没有隐藏的功能。 可以直接通过 CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequestsCosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests 配置重试选项。

注释

如果预配的请求单位远远低于预期的数据量,可能需要考虑将这些单位设置为高值。 批量操作需要更长的时间,但由于重试次数的增加,操作完全成功的可能性更高。

性能改进

与使用 .NET SDK 的其他作一样,使用流 API 可提供更好的性能,并避免任何不必要的序列化。

仅当使用的数据的性质与字节流(例如文件流)匹配时,才能使用流 API。 在这种情况下,使用CreateItemStreamAsyncReplaceItemStreamAsyncDeleteItemStreamAsync方法和处理ResponseMessage(而不是ItemResponse)可提高可以实现的吞吐量。

后续步骤