Share via


Migreren van de bulkexecutorbibliotheek naar de bulkondersteuning in Azure Cosmos DB .NET V3 SDK

VAN TOEPASSING OP: NoSQL

In dit artikel worden de vereiste stappen beschreven voor het migreren van de code van een bestaande toepassing die gebruikmaakt van de .NET bulkexecutorbibliotheek naar de functie voor bulkondersteuning in de nieuwste versie van de .NET SDK.

Bulkondersteuning inschakelen

Schakel bulksgewijs ondersteuning voor het CosmosClient exemplaar in via de configuratie AllowBulkExecution :

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Taken maken voor elke bewerking

Bulkondersteuning in de .NET SDK werkt door gebruik te maken van de taakparallelbibliotheek en groeperingsbewerkingen die gelijktijdig plaatsvinden.

Er is geen enkele methode in de SDK die uw lijst met documenten of bewerkingen als invoerparameter gebruikt, maar in plaats daarvan moet u een taak maken voor elke bewerking die u bulksgewijs wilt uitvoeren en vervolgens wachten totdat ze zijn voltooid.

Als uw eerste invoer bijvoorbeeld een lijst met items is waarvoor elk item het volgende schema heeft:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Als u bulkimport wilt uitvoeren (vergelijkbaar met het gebruik van BulkExecutor.BulkImportAsync), moet u gelijktijdige aanroepen naar CreateItemAsynchebben. Bijvoorbeeld:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Als u bulksgewijs wilt bijwerken (vergelijkbaar met het gebruik van BulkExecutor.BulkUpdateAsync), moet u gelijktijdige aanroepen naar ReplaceItemAsync de methode hebben nadat de itemwaarde is bijgewerkt. Bijvoorbeeld:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

En als u bulksgewijs verwijderen wilt uitvoeren (vergelijkbaar met het gebruik van BulkExecutor.BulkDeleteAsync), moet u gelijktijdig aanroepen hebben naar DeleteItemAsync, met de id partitiesleutel en van elk item. Bijvoorbeeld:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Status van taakresultaat vastleggen

In de vorige codevoorbeelden hebben we een gelijktijdige lijst met taken gemaakt en de CaptureOperationResponse methode voor elk van deze taken aangeroepen. Deze methode is een extensie waarmee we een vergelijkbaar antwoordschema kunnen onderhouden als BulkExecutor, door eventuele fouten vast te leggen en het gebruik van aanvraageenheden bij te houden.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

Wanneer de OperationResponse wordt aangegeven als:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Bewerkingen gelijktijdig uitvoeren

We gebruiken deze helperklasse om het bereik van de volledige lijst met taken bij te houden:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

De ExecuteAsync methode wacht totdat alle bewerkingen zijn voltooid en u kunt deze als volgt gebruiken:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Statistieken vastleggen

De vorige code wacht totdat alle bewerkingen zijn voltooid en berekent de vereiste statistieken. Deze statistieken zijn vergelijkbaar met die van de bulkexecutorbibliotheek BulkImportResponse.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

De BulkOperationResponse bevat:

  1. De totale tijd die nodig is om de lijst met bewerkingen via bulksgewijs te verwerken.
  2. Het aantal geslaagde bewerkingen.
  3. Het totaal van de verbruikte aanvraageenheden.
  4. Als er fouten zijn, wordt een lijst met tuples weergegeven die de uitzondering en het bijbehorende item bevatten voor logboekregistratie en identificatiedoeleinden.

Configuratie opnieuw proberen

Bulkexecutorbibliotheek bevat richtlijnen voor het instellen van de MaxRetryWaitTimeInSeconds en MaxRetryAttemptsOnThrottledRequests van RetryOptions op 0 om het beheer naar de bibliotheek te delegeren.

Voor bulkondersteuning in de .NET SDK is er geen verborgen gedrag. U kunt de opties voor opnieuw proberen rechtstreeks configureren via CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests en CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests.

Notitie

In gevallen waarin de ingerichte aanvraageenheden veel lager zijn dan verwacht op basis van de hoeveelheid gegevens, kunt u overwegen deze in te stellen op hoge waarden. De bulkbewerking duurt langer, maar heeft een grotere kans om volledig te slagen vanwege de hogere nieuwe pogingen.

Prestatieverbeteringen

Net als bij andere bewerkingen met de .NET SDK leidt het gebruik van de stream-API's tot betere prestaties en wordt onnodige serialisatie voorkomen.

Het gebruik van stream-API's is alleen mogelijk als de aard van de gegevens die u gebruikt overeenkomt met die van een stroom bytes (bijvoorbeeld bestandsstromen). In dergelijke gevallen verhoogt het gebruik van de CreateItemStreamAsyncmethoden , ReplaceItemStreamAsyncof DeleteItemStreamAsync en het werken met ResponseMessage (in plaats van ItemResponse) de doorvoer die kan worden bereikt.

Volgende stappen