共用方式為


從批量執行庫遷移到 Azure Cosmos DB .NET V3 SDK 的批量支援

本文說明將使用 .NET bulk executor函式庫 的現有應用程式程式碼遷移到最新版本.NET SDK的 批量支援 功能所需的步驟。

啟用批量支援

透過 AllowBulkExecution 設定啟用 CosmosClient 實例的批次執行支援:

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

為每個操作建立任務

.NET SDK 的批量支援是利用 任務平行函式庫 並進行同時進行的群組操作。

SDK 裡沒有單一方法會把你的文件或操作清單當作輸入參數,而是你需要為每個想批量執行的操作建立一個任務,然後等它們完成。

例如,如果你的初始輸入是一個項目清單,每個項目的結構如下:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

如果你想做批量匯入(類似使用 BulkExecutor.BulkImportAsync),你需要同時呼叫 CreateItemAsync。 例如:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

如果你想進行批量 更新(類似使用BulkExecutor.BulkUpdateAsync),在更新項目值後,需要並發呼叫ReplaceItemAsync方法。 例如:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

如果你想做批量刪除(類似使用BulkExecutor.BulkDeleteAsync),你需要同時呼叫DeleteItemAsync,並且提供id和每個項目的分割鍵。 例如:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

擷取任務結果的狀態

在前面的程式碼範例中,我們建立了一個並行任務清單,並在每個任務上呼叫了 CaptureOperationResponse 該方法。 此方法是一種擴充,讓我們能維持類似 BulkExecutor 的 回應結構 ,透過捕捉錯誤並追蹤 請求單元的使用情況

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

其中宣告OperationResponse

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

同時執行操作

為了追蹤整個任務清單的範圍,我們使用這個輔助類別:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

這個 ExecuteAsync 方法會等到所有操作都完成後,你可以這樣使用:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

捕獲統計

前一程式碼會等到所有操作都完成後,計算所需的統計數據。 這些統計數據與 bulk executor 函式庫的 BulkImportResponse 類似。

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

BulkOperationResponse 包含的內容包括:

  1. 透過批次支援處理作業清單所花費的總時間。
  2. 成功手術的數量。
  3. 消耗的請求單位總數。
  4. 若發生失敗,會顯示包含例外及相關項目的元組清單,以便記錄與識別。

重試配置

Bulk executor 函式庫有指引提到要將MaxRetryWaitTimeInSecondsMaxRetryAttemptsOnThrottledRequestsRetryOptions設為0,以便將控制權委派給函式庫。

在 .NET SDK 中,對於批次支援,沒有任何隱藏的行為。 你可以直接透過 CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequestsCosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests 直接設定重試選項。

備註

如果配置的請求單元遠低於根據資料量預期的數值,你可以考慮將它們設為較高的值。 批量操作會花較長時間,但因為重複次數較高,成功機率更高。

效能改善

與 .NET SDK 的其他操作一樣,使用 stream API 能提升效能並避免不必要的序列化。

只有當你所用的資料性質與位元組流(例如檔案串流)相符時,才可使用串流 API。 在這種情況下,使用 CreateItemStreamAsyncReplaceItemStreamAsyncDeleteItemStreamAsync 方法並以 ResponseMessage (取代 ItemResponse)來工作,可以提升可達成的吞吐量。

後續步驟