從大量執行程式程式庫移轉至 Azure Cosmos DB .NET 第 3 版 SDK 的大量支援
適用於:NoSQL
本文描述將使用 .NET 大量執行程式程式庫的現有應用程式程式碼移轉至最新版 .NET SDK 中的大量支援功能所需的步驟。
啟用大量支援
透過 AllowBulkExecution 設定在 CosmosClient
執行個體上啟用大量支援:
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
為每個作業建立工作
.NET SDK 大量支援的運作方式是利用工作平行程式庫和同時進行的群組作業。
SDK 中沒有任何單一方法可將您的文件或作業清單做為輸入參數,但您必須為您想要大量執行的每個作業建立工作,然後等候其完成。
例如,如果您的初始輸入是項目清單,其中每個項目都有下列結構描述:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
如果您想要進行大量匯入 (類似於使用 BulkExecutor.BulkImportAsync),您必須同時呼叫 CreateItemAsync
。 例如:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
如果您想要大量更新 (類似於使用 BulkExecutor.BulkUpdateAsync),則在更新項目值之後,您必須同時呼叫 ReplaceItemAsync
方法。 例如:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
如果您想要大量刪除 (類似於使用 BulkExecutor.BulkDeleteAsync),則您必須使用每個項目的 id
和分割區索引鍵,對 DeleteItemAsync
進行同時呼叫。 例如:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
擷取工作結果狀態
在先前的程式碼範例中,我們建立了一份並行的工作清單,並在每個工作上呼叫 CaptureOperationResponse
方法。 此方法是一個延伸模組,可讓我們藉由擷取任何錯誤並追蹤要求單位使用方式,來維持與 BulkExecutor 類似的回應結構描述。
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
其中會將 OperationResponse
宣告為:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
同時執行作業
為了追蹤整個工作清單的範圍,我們使用此協助程式類別:
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
ExecuteAsync
方法會等候所有作業完成,而且您可以如下所示加以使用:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
擷取統計資料
先前的程式碼會等候所有作業完成,並計算所需的統計資料。 這些統計資料類似於大量執行程式程式庫的 BulkImportResponse。
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
BulkOperationResponse
包含:
- 透過大量支援來處理作業清單所花費的時間總計。
- 成功的作業數目。
- 耗用的要求單位總計。
- 如果發生失敗,則會顯示包含例外狀況的元組清單,以及用於記錄和識別用途的相關聯項目。
重試設定
大量執行程式程式庫的指導,提到要將 RetryOptions 的 MaxRetryWaitTimeInSeconds
和 MaxRetryAttemptsOnThrottledRequests
設定為 0
,以將控制項委派給程式庫。
若要在 .NET SDK 中進行大量支援,則不會有隱藏的行為。 您可以直接透過 CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests 和 CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests 設定重試選項。
注意
如果佈建的要求單位遠低於預期的資料量,您可能會想要考慮將這些單位設定為較高的值。 大量作業需要較長的時間,但由於重試次數較高,完全成功的機率也較高。
效能改善
如同使用 .NET SDK 的其他作業,使用串流 API 會產生較佳的效能,並避免任何不必要的序列化。
只有當您使用的資料本質與位元組串流 (例如,檔案串流) 的本質相符時,才可能使用串流 API。 在這種情況下,使用 CreateItemStreamAsync
、ReplaceItemStreamAsync
或 DeleteItemStreamAsync
方法,並使用 ResponseMessage
(而不是 ItemResponse
) 會提高可達到的輸送量。
下一步
- 若要深入瞭解 .NET SDK 版本,請參閱 Azure Cosmos DB SDK 文章。
- 從 GitHub 取得完整的移轉原始程式碼。
- GitHub 上的其他大量樣本
- 正在嘗試為遷移至 Azure Cosmos DB 進行容量規劃嗎?
- 如果您知道現有資料庫叢集中的虛擬核心和伺服器數目,請參閱使用虛擬核心或 vCPU 來估計要求單位
- 如果您知道目前資料庫工作負載的一般要求率,請參閱使用 Azure Cosmos DB 容量規劃工具來估計要求單位