本文說明將使用 .NET bulk executor函式庫 的現有應用程式程式碼遷移到最新版本.NET SDK的 批量支援 功能所需的步驟。
啟用批量支援
透過 AllowBulkExecution 設定啟用 CosmosClient 實例的批次執行支援:
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
為每個操作建立任務
.NET SDK 的批量支援是利用 任務平行函式庫 並進行同時進行的群組操作。
SDK 裡沒有單一方法會把你的文件或操作清單當作輸入參數,而是你需要為每個想批量執行的操作建立一個任務,然後等它們完成。
例如,如果你的初始輸入是一個項目清單,每個項目的結構如下:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
如果你想做批量匯入(類似使用 BulkExecutor.BulkImportAsync),你需要同時呼叫 CreateItemAsync。 例如:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
如果你想進行批量 更新(類似使用BulkExecutor.BulkUpdateAsync),在更新項目值後,需要並發呼叫ReplaceItemAsync方法。 例如:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
如果你想做批量刪除(類似使用BulkExecutor.BulkDeleteAsync),你需要同時呼叫DeleteItemAsync,並且提供id和每個項目的分割鍵。 例如:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
擷取任務結果的狀態
在前面的程式碼範例中,我們建立了一個並行任務清單,並在每個任務上呼叫了 CaptureOperationResponse 該方法。 此方法是一種擴充,讓我們能維持類似 BulkExecutor 的 回應結構 ,透過捕捉錯誤並追蹤 請求單元的使用情況。
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
其中宣告OperationResponse:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
同時執行操作
為了追蹤整個任務清單的範圍,我們使用這個輔助類別:
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
這個 ExecuteAsync 方法會等到所有操作都完成後,你可以這樣使用:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
捕獲統計
前一程式碼會等到所有操作都完成後,計算所需的統計數據。 這些統計數據與 bulk executor 函式庫的 BulkImportResponse 類似。
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
BulkOperationResponse 包含的內容包括:
- 透過批次支援處理作業清單所花費的總時間。
- 成功手術的數量。
- 消耗的請求單位總數。
- 若發生失敗,會顯示包含例外及相關項目的元組清單,以便記錄與識別。
重試配置
Bulk executor 函式庫有指引提到要將MaxRetryWaitTimeInSecondsMaxRetryAttemptsOnThrottledRequestsRetryOptions設為0,以便將控制權委派給函式庫。
在 .NET SDK 中,對於批次支援,沒有任何隱藏的行為。 你可以直接透過 CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests 和 CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests 直接設定重試選項。
備註
如果配置的請求單元遠低於根據資料量預期的數值,你可以考慮將它們設為較高的值。 批量操作會花較長時間,但因為重複次數較高,成功機率更高。
效能改善
與 .NET SDK 的其他操作一樣,使用 stream API 能提升效能並避免不必要的序列化。
只有當你所用的資料性質與位元組流(例如檔案串流)相符時,才可使用串流 API。 在這種情況下,使用 CreateItemStreamAsync、 ReplaceItemStreamAsync或 DeleteItemStreamAsync 方法並以 ResponseMessage (取代 ItemResponse)來工作,可以提升可達成的吞吐量。
後續步驟
- 想了解更多 .NET SDK 版本,請參閱 Azure Cosmos DB SDK 文章。
- 從 GitHub 取得完整的 遷移原始碼 。
- GitHub 上有更多大量樣本
- 正在嘗試為遷移至 Azure Cosmos DB 進行容量規劃嗎?
- 如果您知道現有資料庫叢集中的虛擬核心和伺服器數目,請參閱使用虛擬核心或 vCPU 來估計要求單位
- 如果您知道目前資料庫工作負載的一般要求率,請參閱使用 Azure Cosmos DB 容量規劃工具來估計要求單位