Delen via


Migreren van de bulkexecutorbibliotheek naar de bulkondersteuning in Azure Cosmos DB .NET V3 SDK

VAN TOEPASSING OP: NoSQL

In dit artikel worden de vereiste stappen beschreven voor het migreren van de code van een bestaande toepassing die gebruikmaakt van de .NET bulkexecutorbibliotheek naar de functie voor bulkondersteuning in de nieuwste versie van de .NET SDK.

Bulkondersteuning inschakelen

Schakel bulkondersteuning voor het CosmosClient exemplaar in via de AllowBulkExecution-configuratie :

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Taken maken voor elke bewerking

Bulkondersteuning in de .NET SDK werkt door gebruik te maken van de taakparallelbibliotheek en groeperingsbewerkingen die gelijktijdig plaatsvinden.

Er is geen enkele methode in de SDK die uw lijst met documenten of bewerkingen als invoerparameter gebruikt, maar u moet een taak maken voor elke bewerking die u bulksgewijs wilt uitvoeren en vervolgens gewoon wachten totdat ze zijn voltooid.

Als uw eerste invoer bijvoorbeeld een lijst is met items waarvan elk item het volgende schema heeft:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Als u bulkimport wilt uitvoeren (vergelijkbaar met het gebruik van BulkExecutor.BulkImportAsync), moet u gelijktijdige aanroepen naar CreateItemAsync. Voorbeeld:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Als u bulksgewijs wilt bijwerken (vergelijkbaar met het gebruik van BulkExecutor.BulkUpdateAsync), moet u gelijktijdige aanroepen naar de ReplaceItemAsync methode hebben na het bijwerken van de itemwaarde. Voorbeeld:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

En als u bulksgewijs wilt verwijderen (vergelijkbaar met het gebruik van BulkExecutor.BulkDeleteAsync), moet u gelijktijdige aanroepen naar DeleteItemAsync, met de id en partitiesleutel van elk item. Voorbeeld:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Status van taakresultaat vastleggen

In de vorige codevoorbeelden hebben we een gelijktijdige lijst met taken gemaakt en de CaptureOperationResponse methode voor elk van deze taken aangeroepen. Deze methode is een extensie waarmee we een vergelijkbaar antwoordschema kunnen onderhouden als BulkExecutor door fouten vast te leggen en het gebruik van aanvraageenheden bij te houden.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

Waar de OperationResponse wordt gedeclareerd als:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Gelijktijdige bewerkingen uitvoeren

Als u het bereik van de volledige lijst met taken wilt bijhouden, gebruiken we deze helperklasse:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

De ExecuteAsync methode wacht totdat alle bewerkingen zijn voltooid en u kunt deze als volgt gebruiken:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Statistieken vastleggen

De vorige code wacht totdat alle bewerkingen zijn voltooid en berekent de vereiste statistieken. Deze statistieken zijn vergelijkbaar met die van de bulkexecutorbibliotheek BulkImportResponse.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

De BulkOperationResponse volgende bevat:

  1. De totale tijd die nodig is om de lijst met bewerkingen te verwerken via bulkondersteuning.
  2. Het aantal geslaagde bewerkingen.
  3. Het totale aantal verbruikte aanvraageenheden.
  4. Als er fouten optreden, wordt er een lijst weergegeven met tuples die de uitzondering en het bijbehorende item bevatten voor logboekregistratie en identificatiedoeleinden.

Configuratie opnieuw proberen

Bulkexecutorbibliotheek had richtlijnen die vermeldden om het MaxRetryWaitTimeInSeconds en MaxRetryAttemptsOnThrottledRequests van RetryOptions in te stellen om 0 het beheer aan de bibliotheek te delegeren.

Voor bulkondersteuning in de .NET SDK is er geen verborgen gedrag. U kunt de opties voor opnieuw proberen rechtstreeks configureren via cosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests en CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

Notitie

In gevallen waarin de ingerichte aanvraageenheden veel lager zijn dan verwacht op basis van de hoeveelheid gegevens, kunt u overwegen deze in te stellen op hoge waarden. De bulkbewerking duurt langer, maar heeft een hogere kans om volledig te slagen vanwege de hogere nieuwe pogingen.

Prestatieverbeteringen

Net als bij andere bewerkingen met de .NET SDK leidt het gebruik van de stream-API's tot betere prestaties en voorkomt u onnodige serialisatie.

Het gebruik van stream-API's is alleen mogelijk als de aard van de gegevens die u gebruikt overeenkomt met die van een stroom van bytes (bijvoorbeeld bestandsstromen). In dergelijke gevallen verhoogt het gebruik van de CreateItemStreamAsync, ReplaceItemStreamAsyncof DeleteItemStreamAsync methoden en het werken met ResponseMessage (in plaats van ItemResponse) de doorvoer die kan worden bereikt.

Volgende stappen