Sdílet prostřednictvím


Migrace z knihovny Bulk Executor na hromadnou podporu v sadě .NET SDK služby Azure Cosmos DB v3

PLATÍ PRO: NoSQL

Tento článek popisuje požadované kroky k migraci kódu existující aplikace, který používá knihovnu bulk Executor .NET k funkci hromadné podpory v nejnovější verzi sady .NET SDK.

Povolení hromadné podpory

Povolení hromadné podpory instance CosmosClient prostřednictvím konfigurace AllowBulkExecution :

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Vytvoření úkolů pro každou operaci

Hromadná podpora v sadě .NET SDK funguje s využitím paralelní knihovny úloh a operací seskupování, ke kterým dochází souběžně.

Sada SDK neobsahuje jedinou metodu, která jako vstupní parametr vezme seznam dokumentů nebo operací, ale potřebujete vytvořit úlohu pro každou operaci, kterou chcete hromadně spustit, a pak jednoduše počkat, až se dokončí.

Pokud je například počáteční vstup seznamem položek, ve kterých má každá položka následující schéma:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Pokud chcete provést hromadný import (podobně jako bulkExecutor.BulkImportAsync), musíte mít souběžná volání CreateItemAsync. Příklad:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Pokud chcete provést hromadnou aktualizaci (podobně jako pomocí BulkExecutor.BulkUpdateAsync), musíte mít souběžná volání metody ReplaceItemAsync po aktualizaci hodnoty položky. Příklad:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

A pokud chcete provést hromadné odstranění (podobně jako bulkExecutor.BulkDeleteAsync), musíte mít souběžná volání DeleteItemAsync, s klíčem oddílu id každé položky. Příklad:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Zaznamenat stav výsledku úkolu

V předchozích příkladech kódu jsme vytvořili souběžný seznam úkolů a pro každou z těchto úloh jsme volali metodu CaptureOperationResponse . Tato metoda je rozšíření, které nám umožňuje udržovat podobné schéma odpovědí jako BulkExecutor tím, že zachytává všechny chyby a sleduje využití jednotek žádostí.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

OperationResponse Kde je deklarován jako:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Souběžné spouštění operací

Ke sledování rozsahu celého seznamu úkolů používáme tuto pomocnou třídu:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

Metoda ExecuteAsync počká, dokud se nedokončí všechny operace, a můžete ji použít takto:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Zachytávání statistik

Předchozí kód čeká na dokončení všech operací a vypočítá požadované statistiky. Tyto statistiky jsou podobné této knihovně BulkImportResponse knihovny BulkImportResponse.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

Obsahuje BulkOperationResponse :

  1. Celková doba potřebná ke zpracování seznamu operací prostřednictvím hromadné podpory.
  2. Počet úspěšných operací.
  3. Celkový počet spotřebovaných jednotek žádostí
  4. Pokud dojde k selhání, zobrazí se seznam řazených kolekcí členů, které obsahují výjimku, a přidruženou položku pro účely protokolování a identifikace.

Konfigurace opakování

Knihovna Bulk Executor obsahovala pokyny, které se zmínily o nastavení MaxRetryWaitTimeInSeconds a MaxRetryAttemptsOnThrottledRequests opakování, aby 0 delegovala řízení do knihovny.

Pro hromadnou podporu v sadě .NET SDK neexistuje žádné skryté chování. Možnosti opakování můžete nakonfigurovat přímo prostřednictvím CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests a CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

Poznámka:

V případech, kdy jsou zřízené jednotky žádostí mnohem nižší než očekávané na základě množství dat, můžete zvážit nastavení těchto jednotek na vysoké hodnoty. Hromadná operace bude trvat déle, ale má větší šanci na to, že úspěšně proběhne kvůli vyšším opakovaným pokusům.

Zlepšení výkonu

Stejně jako u jiných operací se sadou .NET SDK má použití rozhraní API datových proudů za následek lepší výkon a vyhnout se zbytečnému serializaci.

Použití rozhraní API streamu je možné pouze v případě, že povaha dat, která používáte, odpovídá datovému proudu bajtů (například datových proudů souborů). Vtakovýchch CreateItemStreamAsyncReplaceItemStreamAsyncDeleteItemStreamAsync ResponseMessage ItemResponse

Další kroky