你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
本文介绍将使用 .NET 批量执行程序库 的现有应用程序代码迁移到最新版本 .NET SDK 中的 批量支持 功能所需的步骤。
启用批量支持
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
为每个操作创建任务
.NET SDK 中的批量支持通过利用 任务并行库 来对同时发生的操作进行分组。
SDK 中没有将文档或操作列表作为输入参数的单个方法,而是需要为要批量执行的每个操作创建一个 Task,然后只需等待这些任务完成。
例如,如果您的初始输入是一个每项具有以下架构的项目列表:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
如果要进行批量导入(类似于使用 BulkExecutor.BulkImportAsync),则需要对CreateItemAsync进行并发调用。 例如:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
如果要进行批量 更新 (类似于使用 BulkExecutor.BulkUpdateAsync),则需要在更新项值后对方法进行并发调用 ReplaceItemAsync 。 例如:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
如果要进行批量删除(类似于使用BulkExecutor.BulkDeleteAsync),需要对DeleteItemAsync进行并发调用,并传入每个项的id和分区键。 例如:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
捕获任务结果状态
在前面的代码示例中,我们创建了一个并发任务列表,并在其中每个任务上调用 CaptureOperationResponse 该方法。 此方法是一个扩展,通过捕获任何错误并跟踪请求单位使用情况,我们可以维护与 BulkExecutor 类似的响应架构。
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
关于OperationResponse的声明如下:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
并发执行操作
若要跟踪整个任务列表的范围,我们将使用此帮助程序类:
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
该方法 ExecuteAsync 将等到所有操作完成,您可以像下面这样使用它:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
捕获统计信息
前面的代码会等待所有操作完成后再计算所需的统计信息。 这些统计信息类似于批量执行程序库的 BulkImportResponse 的统计信息。
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
BulkOperationResponse 包含:
- 批量处理操作列表的总耗时。
- 成功的操作次数。
- 消耗的请求单位总数。
- 如果发生故障,它将显示一个包含异常和相关项的元组列表,以用于日志记录和识别目的。
重试配置
批量执行程序库提供了指导,指出需要设置MaxRetryWaitTimeInSecondsMaxRetryAttemptsOnThrottledRequests和RetryOptions的0,以将控制权委托给库。
对于 .NET SDK 中的批量支持,没有隐藏的功能。 可以直接通过 CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests 和 CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests 配置重试选项。
注释
如果预配的请求单位远远低于预期的数据量,可能需要考虑将这些单位设置为高值。 批量操作需要更长的时间,但由于重试次数的增加,操作完全成功的可能性更高。
性能改进
与使用 .NET SDK 的其他作一样,使用流 API 可提供更好的性能,并避免任何不必要的序列化。
仅当使用的数据的性质与字节流(例如文件流)匹配时,才能使用流 API。 在这种情况下,使用CreateItemStreamAsync或ReplaceItemStreamAsyncDeleteItemStreamAsync方法和处理ResponseMessage(而不是ItemResponse)可提高可以实现的吞吐量。
后续步骤
- 若要详细了解 .NET SDK 版本,请参阅 Azure Cosmos DB SDK 一文。
- 从 GitHub 获取完整的 迁移源代码 。
- GitHub 上的其他批量示例
- 尝试为迁移到 Azure Cosmos DB 进行容量规划?
- 如果只知道现有数据库群集中的 vCore 和服务器数量,请阅读使用 vCore 或 vCPU 估算请求单位
- 若知道当前数据库工作负载的典型请求速率,请阅读使用 Azure Cosmos DB 容量计划工具估算请求单位