Sdílet prostřednictvím


Migrace z knihovny Bulk Executor na podporu pro práci s velkým objemem dat v sadě .NET V3 SDK služby Azure Cosmos DB

PLATÍ PRO: NoSQL

Tento článek popisuje požadované kroky k migraci kódu existující aplikace, který používá knihovnu .NET bulk executor na funkci hromadné podpory v nejnovější verzi sady .NET SDK.

Umožnit hromadnou podporu

Povolení hromadné podpory instance CosmosClient prostřednictvím konfigurace AllowBulkExecution :

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Vytvoření úkolů pro každou operaci

Hromadná podpora v sadě .NET SDK funguje s využitím paralelní knihovny úloh a operací seskupování, ke kterým dochází souběžně.

Sada SDK neobsahuje jedinou metodu, která jako vstupní parametr vezme seznam dokumentů nebo operací, ale potřebujete vytvořit úlohu pro každou operaci, kterou chcete hromadně spustit, a pak jednoduše počkat, až se dokončí.

Pokud je například počáteční vstup seznamem položek, ve kterých má každá položka následující schéma:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Pokud chcete provést hromadný import (podobně jako BulkExecutor.BulkImportAsync), je nutné mít souběžná volání CreateItemAsync. Příklad:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Pokud chcete provést hromadnou aktualizaci (podobně jako pomocí BulkExecutor.BulkUpdateAsync), musíte mít souběžná volání metody ReplaceItemAsync po aktualizaci hodnoty položky. Příklad:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

A pokud chcete provést hromadné odstranění (podobně jako při použití BulkExecutor.BulkDeleteAsync), je třeba mít souběžná volání na DeleteItemAsync, spolu s klíčem oddílu id pro každou položku. Příklad:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Zaznamenat stav výsledku úkolu

V předchozích příkladech kódu jsme vytvořili souběžný seznam úkolů a pro každou z těchto úloh jsme volali metodu CaptureOperationResponse . Tato metoda je rozšíření, které nám umožňuje udržovat podobné schéma odpovědí jako BulkExecutor tím, že zachytává všechny chyby a sleduje využití jednotek žádostí.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

OperationResponse Kde je deklarován jako:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Souběžné spouštění operací

Ke sledování rozsahu celého seznamu úkolů používáme tuto pomocnou třídu:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

Metoda ExecuteAsync počká, dokud se nedokončí všechny operace, a můžete ji použít takto:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Zachytávání statistik

Předchozí kód čeká na dokončení všech operací a vypočítá požadované statistiky. Tyto statistiky jsou podobné statistice knihovny hromadného exekutora BulkImportResponse.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

Obsahuje BulkOperationResponse :

  1. Celková doba potřebná ke zpracování seznamu operací prostřednictvím hromadné podpory.
  2. Počet úspěšných operací.
  3. Celkový počet využitých jednotek žádostí
  4. Pokud dojde k selhání, zobrazí se seznam n-tic obsahujících výjimku a přidruženou položku pro účely protokolování a identifikace.

Konfigurace opakování

Knihovna Bulk Executor obsahovala pokyny, které se zmínily o nastavení MaxRetryWaitTimeInSeconds a MaxRetryAttemptsOnThrottledRequests v RetryOptions na 0, aby se řízení delegovalo na knihovnu.

Pro hromadnou podporu v sadě .NET SDK neexistuje žádné skryté chování. Možnosti opakování pokusů můžete nakonfigurovat přímo prostřednictvím CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests a CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

Poznámka:

V případech, kdy jsou zřízené jednotky žádostí mnohem nižší než očekávané na základě množství dat, můžete zvážit nastavení těchto jednotek na vysoké hodnoty. Hromadná operace bude trvat déle, ale má větší šanci na úspěch kvůli vyššímu počtu opakovaných pokusů.

Zlepšení výkonu

Stejně jako u jiných operací s .NET SDK, použití rozhraní API datových proudů zajišťuje lepší výkon a vyhýbá se zbytečné serializaci.

Použití rozhraní API streamu je možné pouze v případě, že povaha dat, která používáte, odpovídá datovému proudu bajtů (například datových proudů souborů). Vtakovýchch CreateItemStreamAsyncReplaceItemStreamAsyncDeleteItemStreamAsyncResponseMessageItemResponse

Další kroky