Freigeben über


Migrieren von der Bulk Executor-Bibliothek zur Unterstützung von Massenvorgängen im .NET SDK V3 von Azure Cosmos DB

GILT FÜR: NoSQL

In diesem Artikel werden die erforderlichen Schritte zum Migrieren eines vorhandenen Anwendungscodes, der die Bulk Executor-Bibliothek von .NET verwendet, zur Unterstützung von Massenvorgängen in der neuesten Version des .NET SDK beschrieben.

Aktivieren der Unterstützung von Massenvorgängen

Sie aktivieren die Unterstützung von Massenvorgängen für die CosmosClient-Instanz über die AllowBulkExecution-Konfiguration:

new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });

Erstellen von Aufgaben für jeden Vorgang

Für die Unterstützung von Massenvorgängen im .NET SDK werden die Task Parallel Library und parallel ausgeführte Gruppierungsvorgänge genutzt.

Im SDK gibt es keine einzelne Methode, die Ihre Liste mit Dokumenten oder Vorgängen als Eingabeparameter übernimmt. Sie müssen also eine Aufgabe für jeden Vorgang erstellen, den Sie in einem Massenvorgang ausführen möchten, und dann können Sie einfach warten, bis alle Aufgaben ausgeführt wurden.

Angenommen, die anfängliche Eingabe ist beispielsweise eine Liste von Elementen mit folgendem Schema:

public class MyItem
{
    public string id { get; set; }

    public string pk { get; set; }

    public int operationCounter { get; set; } = 0;
}

Wenn Sie einen Massenimport ausführen möchten (ähnlich der Verwendung von „BulkExecutor.BulkImportAsync“), erfordert dies gleichzeitige Aufrufe von CreateItemAsync. Beispiel:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}

Wenn Sie ein Massenupdate ausführen möchten (ähnlich der Verwendung von BulkExecutor.BulkUpdateAsync), erfordert dies gleichzeitige Aufrufe der ReplaceItemAsync-Methode nach dem Aktualisieren jedes Elementwerts. Beispiel:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}

Wenn Sie eine Massenlöschung ausführen möchten (ähnlich der Verwendung von BulkExecutor.BulkDeleteAsync), erfordert dies gleichzeitige Aufrufe von DeleteItemAsync mit der id und dem Partitionsschlüssel für jede Element. Beispiel:

BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
    document.operationCounter++;
    bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}

Ergebniszustand der Erfassungsaufgabe

In den vorherigen Codebeispielen haben wir eine Liste paralleler Aufgaben erstellt und für jede dieser Aufgaben die CaptureOperationResponse-Methode aufgerufen. Bei dieser Methode handelt es sich um eine Erweiterung, mit der Sie ein ähnliches Antwortschema wie bei BulkExecutor verwalten können, indem Sie Fehler erfassen und die Verwendung von Anforderungseinheiten verfolgen.

private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
    try
    {
        ItemResponse<T> response = await task;
        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = true,
            RequestUnitsConsumed = task.Result.RequestCharge
        };
    }
    catch (Exception ex)
    {
        if (ex is CosmosException cosmosException)
        {
            return new OperationResponse<T>()
            {
                Item = item,
                RequestUnitsConsumed = cosmosException.RequestCharge,
                IsSuccessful = false,
                CosmosException = cosmosException
            };
        }

        return new OperationResponse<T>()
        {
            Item = item,
            IsSuccessful = false,
            CosmosException = ex
        };
    }
}

Dabei ist die OperationResponse deklariert als:

public class OperationResponse<T>
{
    public T Item { get; set; }
    public double RequestUnitsConsumed { get; set; } = 0;
    public bool IsSuccessful { get; set; }
    public Exception CosmosException { get; set; }
}

Paralleles Ausführen von Vorgängen

Um den Umfang der gesamten Aufgabenliste nachzuverfolgen, verwenden wir die folgende Hilfsklasse:

public class BulkOperations<T>
{
    public readonly List<Task<OperationResponse<T>>> Tasks;

    private readonly Stopwatch stopwatch = Stopwatch.StartNew();

    public BulkOperations(int operationCount)
    {
        this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
    }

    public async Task<BulkOperationResponse<T>> ExecuteAsync()
    {
        await Task.WhenAll(this.Tasks);
        this.stopwatch.Stop();
        return new BulkOperationResponse<T>()
        {
            TotalTimeTaken = this.stopwatch.Elapsed,
            TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
            SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
            Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
        };
    }
}

Die ExecuteAsync-Methode wartet, bis alle Vorgänge abgeschlossen sind, und Sie können sie wie folgt verwenden:

BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();

Erfassen von Statistiken

Der vorherige Code wartet, bis alle Vorgänge abgeschlossen sind, und berechnet dann die erforderlichen Statistiken. Diese Statistiken ähneln der BulkImportResponse der Bulk Executor-Bibliothek.

public class BulkOperationResponse<T>
{
    public TimeSpan TotalTimeTaken { get; set; }
    public int SuccessfulDocuments { get; set; } = 0;
    public double TotalRequestUnitsConsumed { get; set; } = 0;

    public IReadOnlyList<(T, Exception)> Failures { get; set; }
}

Die BulkOperationResponse enthält Folgendes:

  1. Die Gesamtdauer der Verarbeitung der Vorgangsliste durch die Unterstützung von Massenvorgängen
  2. Die Anzahl der erfolgreichen Vorgänge
  3. Die Anzahl der verbrauchten Anforderungseinheiten
  4. Wenn Fehler auftreten, wird eine Liste von Tupeln angezeigt, die die Ausnahme und das zugehörige Element für die Protokollierung und Identifizierung enthalten.

Konfigurieren von Wiederholungen

Für die Bulk Executor-Bibliothek stand eine Anleitung zur Verfügung, in der das Festlegen von MaxRetryWaitTimeInSeconds und MaxRetryAttemptsOnThrottledRequests für RetryOptions auf 0 empfohlen wurde, um die Steuerung an die Bibliothek zu delegieren.

Bei der Unterstützung von Massenvorgängen im .NET SDK gibt es kein verborgenes Verhalten. Sie können die Wiederholungsoptionen direkt über CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests und CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests konfigurieren.

Hinweis

Falls die bereitgestellten Anforderungseinheiten erheblich geringer als aufgrund der Menge an Daten erwartet ausfallen, sollten Sie höhere Werte festlegen. Der Massenvorgang dauert dann zwar länger, aber die Wahrscheinlichkeit, dass er vollständig erfolgreich ausgeführt wird, ist aufgrund der höheren Wiederholungsrate höher.

Leistungsverbesserungen

Wie bei anderen Vorgängen mit dem .NET SDK führt die Verwendung der Stream-APIs zu einer besseren Leistung und einer Vermeidung unnötiger Serialisierungen.

Die Verwendung von Stream-APIs ist nur möglich, wenn die verwendeten Datentypen, mit denen eines Bytestreams (z. B. Dateidatenströme) übereinstimmen. In solchen Fällen erhöht sich der Durchsatz, der erreicht werden kann, wenn Sie die Methoden CreateItemStreamAsync, ReplaceItemStreamAsync oder DeleteItemStreamAsync und die ResponseMessage (anstelle der ItemResponse) verwenden.

Nächste Schritte