Migrieren von der Bulk Executor-Bibliothek zur Unterstützung von Massenvorgängen im .NET SDK V3 von Azure Cosmos DB
GILT FÜR: NoSQL
In diesem Artikel werden die erforderlichen Schritte zum Migrieren eines vorhandenen Anwendungscodes, der die Bulk Executor-Bibliothek von .NET verwendet, zur Unterstützung von Massenvorgängen in der neuesten Version des .NET SDK beschrieben.
Aktivieren der Unterstützung von Massenvorgängen
Sie aktivieren die Unterstützung von Massenvorgängen für die CosmosClient
-Instanz über die AllowBulkExecution-Konfiguration:
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
Erstellen von Aufgaben für jeden Vorgang
Für die Unterstützung von Massenvorgängen im .NET SDK werden die Task Parallel Library und parallel ausgeführte Gruppierungsvorgänge genutzt.
Im SDK gibt es keine einzelne Methode, die Ihre Liste mit Dokumenten oder Vorgängen als Eingabeparameter übernimmt. Sie müssen also eine Aufgabe für jeden Vorgang erstellen, den Sie in einem Massenvorgang ausführen möchten, und dann können Sie einfach warten, bis alle Aufgaben ausgeführt wurden.
Angenommen, die anfängliche Eingabe ist beispielsweise eine Liste von Elementen mit folgendem Schema:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
Wenn Sie einen Massenimport ausführen möchten (ähnlich der Verwendung von „BulkExecutor.BulkImportAsync“), erfordert dies gleichzeitige Aufrufe von CreateItemAsync
. Beispiel:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
Wenn Sie ein Massenupdate ausführen möchten (ähnlich der Verwendung von BulkExecutor.BulkUpdateAsync), erfordert dies gleichzeitige Aufrufe der ReplaceItemAsync
-Methode nach dem Aktualisieren jedes Elementwerts. Beispiel:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
Wenn Sie eine Massenlöschung ausführen möchten (ähnlich der Verwendung von BulkExecutor.BulkDeleteAsync), erfordert dies gleichzeitige Aufrufe von DeleteItemAsync
mit der id
und dem Partitionsschlüssel für jede Element. Beispiel:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
Ergebniszustand der Erfassungsaufgabe
In den vorherigen Codebeispielen haben wir eine Liste paralleler Aufgaben erstellt und für jede dieser Aufgaben die CaptureOperationResponse
-Methode aufgerufen. Bei dieser Methode handelt es sich um eine Erweiterung, mit der Sie ein ähnliches Antwortschema wie bei BulkExecutor verwalten können, indem Sie Fehler erfassen und die Verwendung von Anforderungseinheiten verfolgen.
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
Dabei ist die OperationResponse
deklariert als:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
Paralleles Ausführen von Vorgängen
Um den Umfang der gesamten Aufgabenliste nachzuverfolgen, verwenden wir die folgende Hilfsklasse:
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
Die ExecuteAsync
-Methode wartet, bis alle Vorgänge abgeschlossen sind, und Sie können sie wie folgt verwenden:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
Erfassen von Statistiken
Der vorherige Code wartet, bis alle Vorgänge abgeschlossen sind, und berechnet dann die erforderlichen Statistiken. Diese Statistiken ähneln der BulkImportResponse der Bulk Executor-Bibliothek.
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
Die BulkOperationResponse
enthält Folgendes:
- Die Gesamtdauer der Verarbeitung der Vorgangsliste durch die Unterstützung von Massenvorgängen
- Die Anzahl der erfolgreichen Vorgänge
- Die Anzahl der verbrauchten Anforderungseinheiten
- Wenn Fehler auftreten, wird eine Liste von Tupeln angezeigt, die die Ausnahme und das zugehörige Element für die Protokollierung und Identifizierung enthalten.
Konfigurieren von Wiederholungen
Für die Bulk Executor-Bibliothek stand eine Anleitung zur Verfügung, in der das Festlegen von MaxRetryWaitTimeInSeconds
und MaxRetryAttemptsOnThrottledRequests
für RetryOptions auf 0
empfohlen wurde, um die Steuerung an die Bibliothek zu delegieren.
Bei der Unterstützung von Massenvorgängen im .NET SDK gibt es kein verborgenes Verhalten. Sie können die Wiederholungsoptionen direkt über CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests und CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests konfigurieren.
Hinweis
Falls die bereitgestellten Anforderungseinheiten erheblich geringer als aufgrund der Menge an Daten erwartet ausfallen, sollten Sie höhere Werte festlegen. Der Massenvorgang dauert dann zwar länger, aber die Wahrscheinlichkeit, dass er vollständig erfolgreich ausgeführt wird, ist aufgrund der höheren Wiederholungsrate höher.
Leistungsverbesserungen
Wie bei anderen Vorgängen mit dem .NET SDK führt die Verwendung der Stream-APIs zu einer besseren Leistung und einer Vermeidung unnötiger Serialisierungen.
Die Verwendung von Stream-APIs ist nur möglich, wenn die verwendeten Datentypen, mit denen eines Bytestreams (z. B. Dateidatenströme) übereinstimmen. In solchen Fällen erhöht sich der Durchsatz, der erreicht werden kann, wenn Sie die Methoden CreateItemStreamAsync
, ReplaceItemStreamAsync
oder DeleteItemStreamAsync
und die ResponseMessage
(anstelle der ItemResponse
) verwenden.
Nächste Schritte
- Weitere Informationen zu den .NET SDK-Releases finden Sie im Artikel Azure Cosmos DB SDK.
- Den vollständigen Quellcode zur Migration finden Sie auf GitHub.
- Weitere Beispiele für Massenvorgänge auf GitHub
- Versuchen Sie, eine Kapazitätsplanung für eine Migration auf Azure Cosmos DB vorzunehmen?
- Wenn Sie nur die Anzahl der virtuellen Kerne und Server in Ihrem vorhandenen Datenbankcluster kennen, lesen Sie die Informationen zum Schätzen von Anforderungseinheiten mithilfe von virtuellen Kernen oder virtuellen CPUs
- Wenn Sie die typischen Anforderungsraten für Ihre aktuelle Datenbankworkload kennen, lesen Sie die Informationen zum Schätzen von Anforderungseinheiten mit dem Azure Cosmos DB-Kapazitätsplaner