分享方式:


從大量執行程式程式庫移轉至 Azure Cosmos DB .NET 第 3 版 SDK 的大量支援

適用於:NoSQL

本文描述將使用 .NET 大量執行程式程式庫的現有應用程式程式碼移轉至最新版 .NET SDK 中的大量支援功能所需的步驟。

啟用大量支援

透過 AllowBulkExecution 設定在 CosmosClient 執行個體上啟用大量支援:

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

為每個作業建立工作

.NET SDK 大量支援的運作方式是利用工作平行程式庫和同時進行的群組作業。

SDK 中沒有任何單一方法可將您的文件或作業清單做為輸入參數,但您必須為您想要大量執行的每個作業建立工作,然後等候其完成。

例如,如果您的初始輸入是項目清單,其中每個項目都有下列結構描述:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

如果您想要進行大量匯入 (類似於使用 BulkExecutor.BulkImportAsync),您必須同時呼叫 CreateItemAsync。 例如:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

如果您想要大量更新 (類似於使用 BulkExecutor.BulkUpdateAsync),則在更新項目值之後,您必須同時呼叫 ReplaceItemAsync 方法。 例如:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

如果您想要大量刪除 (類似於使用 BulkExecutor.BulkDeleteAsync),則您必須使用每個項目的 id 和分割區索引鍵,對 DeleteItemAsync 進行同時呼叫。 例如:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

擷取工作結果狀態

在先前的程式碼範例中,我們建立了一份並行的工作清單,並在每個工作上呼叫 CaptureOperationResponse 方法。 此方法是一個延伸模組,可讓我們藉由擷取任何錯誤並追蹤要求單位使用方式,來維持與 BulkExecutor 類似的回應結構描述

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

其中會將 OperationResponse 宣告為:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

同時執行作業

為了追蹤整個工作清單的範圍,我們使用此協助程式類別:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

ExecuteAsync 方法會等候所有作業完成,而且您可以如下所示加以使用:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

擷取統計資料

先前的程式碼會等候所有作業完成,並計算所需的統計資料。 這些統計資料類似於大量執行程式程式庫的 BulkImportResponse

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

BulkOperationResponse 包含:

  1. 透過大量支援來處理作業清單所花費的時間總計。
  2. 成功的作業數目。
  3. 耗用的要求單位總計。
  4. 如果發生失敗,則會顯示包含例外狀況的元組清單,以及用於記錄和識別用途的相關聯項目。

重試設定

大量執行程式程式庫的指導,提到要將 RetryOptionsMaxRetryWaitTimeInSecondsMaxRetryAttemptsOnThrottledRequests 設定為 0,以將控制項委派給程式庫。

若要在 .NET SDK 中進行大量支援,則不會有隱藏的行為。 您可以直接透過 CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequestsCosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests 設定重試選項。

注意

如果佈建的要求單位遠低於預期的資料量,您可能會想要考慮將這些單位設定為較高的值。 大量作業需要較長的時間,但由於重試次數較高,完全成功的機率也較高。

效能改善

如同使用 .NET SDK 的其他作業,使用串流 API 會產生較佳的效能,並避免任何不必要的序列化。

只有當您使用的資料本質與位元組串流 (例如,檔案串流) 的本質相符時,才可能使用串流 API。 在這種情況下,使用 CreateItemStreamAsyncReplaceItemStreamAsyncDeleteItemStreamAsync 方法,並使用 ResponseMessage (而不是 ItemResponse) 會提高可達到的輸送量。

下一步