Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
GILT FÜR: NoSQL
In diesem Artikel werden die erforderlichen Schritte zum Migrieren eines vorhandenen Anwendungscodes, der die Bulk Executor-Bibliothek von .NET verwendet, zur Unterstützung von Massenvorgängen in der neuesten Version des .NET SDK beschrieben.
Aktivieren der Unterstützung von Massenvorgängen
Sie aktivieren die Unterstützung von Massenvorgängen für die CosmosClient-Instanz über die AllowBulkExecution-Konfiguration:
new CosmosClient(endpoint, authKey, new CosmosClientOptions() { AllowBulkExecution = true });
Erstellen von Aufgaben für jeden Vorgang
Für die Unterstützung von Massenvorgängen im .NET SDK werden die Task Parallel Library und parallel ausgeführte Gruppierungsvorgänge genutzt.
Im SDK gibt es keine einzelne Methode, die Ihre Liste mit Dokumenten oder Vorgängen als Eingabeparameter übernimmt. Sie müssen also eine Aufgabe für jeden Vorgang erstellen, den Sie in einem Massenvorgang ausführen möchten, und dann können Sie einfach warten, bis alle Aufgaben ausgeführt wurden.
Angenommen, die anfängliche Eingabe ist beispielsweise eine Liste von Elementen mit folgendem Schema:
public class MyItem
{
public string id { get; set; }
public string pk { get; set; }
public int operationCounter { get; set; } = 0;
}
Wenn Sie einen Massenimport ausführen möchten (ähnlich der Verwendung von „BulkExecutor.BulkImportAsync“), erfordert dies gleichzeitige Aufrufe von CreateItemAsync. Beispiel:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
bulkOperations.Tasks.Add(CaptureOperationResponse(container.CreateItemAsync(document, new PartitionKey(document.pk)), document));
}
Wenn Sie ein Massenupdate ausführen möchten (ähnlich der Verwendung von BulkExecutor.BulkUpdateAsync), erfordert dies gleichzeitige Aufrufe der ReplaceItemAsync-Methode nach dem Aktualisieren jedes Elementwerts. Beispiel:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.ReplaceItemAsync(document, document.id, new PartitionKey(document.pk)), document));
}
Wenn Sie eine Massenlöschung ausführen möchten (ähnlich der Verwendung von BulkExecutor.BulkDeleteAsync), erfordert dies gleichzeitige Aufrufe von DeleteItemAsync mit der id und dem Partitionsschlüssel für jede Element. Beispiel:
BulkOperations<MyItem> bulkOperations = new BulkOperations<MyItem>(documentsToWorkWith.Count);
foreach (MyItem document in documentsToWorkWith)
{
document.operationCounter++;
bulkOperations.Tasks.Add(CaptureOperationResponse(container.DeleteItemAsync<MyItem>(document.id, new PartitionKey(document.pk)), document));
}
Ergebniszustand der Erfassungsaufgabe
In den vorherigen Codebeispielen haben wir eine Liste paralleler Aufgaben erstellt und für jede dieser Aufgaben die CaptureOperationResponse-Methode aufgerufen. Bei dieser Methode handelt es sich um eine Erweiterung, mit der Sie ein ähnliches Antwortschema wie bei BulkExecutor verwalten können, indem Sie Fehler erfassen und die Verwendung von Anforderungseinheiten verfolgen.
private static async Task<OperationResponse<T>> CaptureOperationResponse<T>(Task<ItemResponse<T>> task, T item)
{
try
{
ItemResponse<T> response = await task;
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = true,
RequestUnitsConsumed = task.Result.RequestCharge
};
}
catch (Exception ex)
{
if (ex is CosmosException cosmosException)
{
return new OperationResponse<T>()
{
Item = item,
RequestUnitsConsumed = cosmosException.RequestCharge,
IsSuccessful = false,
CosmosException = cosmosException
};
}
return new OperationResponse<T>()
{
Item = item,
IsSuccessful = false,
CosmosException = ex
};
}
}
Dabei ist die OperationResponse deklariert als:
public class OperationResponse<T>
{
public T Item { get; set; }
public double RequestUnitsConsumed { get; set; } = 0;
public bool IsSuccessful { get; set; }
public Exception CosmosException { get; set; }
}
Paralleles Ausführen von Vorgängen
Um den Umfang der gesamten Aufgabenliste nachzuverfolgen, verwenden wir die folgende Hilfsklasse:
public class BulkOperations<T>
{
public readonly List<Task<OperationResponse<T>>> Tasks;
private readonly Stopwatch stopwatch = Stopwatch.StartNew();
public BulkOperations(int operationCount)
{
this.Tasks = new List<Task<OperationResponse<T>>>(operationCount);
}
public async Task<BulkOperationResponse<T>> ExecuteAsync()
{
await Task.WhenAll(this.Tasks);
this.stopwatch.Stop();
return new BulkOperationResponse<T>()
{
TotalTimeTaken = this.stopwatch.Elapsed,
TotalRequestUnitsConsumed = this.Tasks.Sum(task => task.Result.RequestUnitsConsumed),
SuccessfulDocuments = this.Tasks.Count(task => task.Result.IsSuccessful),
Failures = this.Tasks.Where(task => !task.Result.IsSuccessful).Select(task => (task.Result.Item, task.Result.CosmosException)).ToList()
};
}
}
Die ExecuteAsync-Methode wartet, bis alle Vorgänge abgeschlossen sind, und Sie können sie wie folgt verwenden:
BulkOperationResponse<MyItem> bulkOperationResponse = await bulkOperations.ExecuteAsync();
Erfassen von Statistiken
Der vorherige Code wartet, bis alle Vorgänge abgeschlossen sind, und berechnet dann die erforderlichen Statistiken. Diese Statistiken ähneln der BulkImportResponse der Bulk Executor-Bibliothek.
public class BulkOperationResponse<T>
{
public TimeSpan TotalTimeTaken { get; set; }
public int SuccessfulDocuments { get; set; } = 0;
public double TotalRequestUnitsConsumed { get; set; } = 0;
public IReadOnlyList<(T, Exception)> Failures { get; set; }
}
Die BulkOperationResponse enthält Folgendes:
- Die Gesamtdauer der Verarbeitung der Vorgangsliste durch die Unterstützung von Massenvorgängen.
- Die Anzahl der erfolgreichen Vorgänge
- Die Anzahl der verbrauchten Anforderungseinheiten.
- Wenn Fehler auftreten, wird eine Liste von Tupeln angezeigt, die die Ausnahme und das zugehörige Element für die Protokollierung und Identifizierung enthalten.
Konfigurieren von Wiederholungen
Für die Bulk Executor-Bibliothek stand eine Anleitung zur Verfügung, in der das Festlegen von MaxRetryWaitTimeInSeconds und MaxRetryAttemptsOnThrottledRequests für RetryOptions auf 0 empfohlen wurde, um die Steuerung an die Bibliothek zu delegieren.
Bei der Unterstützung von Massenoperationen im .NET SDK gibt es kein verstecktes Verhalten. Sie können die Wiederholungsoptionen direkt über CosmosClientOptions.MaxRetryAttemptsOnRateLimitedRequests und CosmosClientOptions.MaxRetryWaitTimeOnRateLimitedRequests konfigurieren.
Hinweis
Falls die bereitgestellten Anforderungseinheiten erheblich geringer als aufgrund der Menge an Daten erwartet ausfallen, sollten Sie höhere Werte festlegen. Der Massenvorgang dauert dann zwar länger, aber die Wahrscheinlichkeit, dass er vollständig erfolgreich ist, ist wegen der höheren Anzahl an Wiederholungen höher.
Leistungsverbesserungen
Wie bei anderen Vorgängen mit dem .NET SDK führt die Verwendung der Stream-APIs zu einer besseren Leistung und einer Vermeidung unnötiger Serialisierungen.
Die Verwendung von Stream-APIs ist nur möglich, wenn die verwendeten Datentypen, mit denen eines Bytestreams (z. B. Dateidatenströme) übereinstimmen. In solchen Fällen erhöht die Verwendung der Methoden CreateItemStreamAsync, ReplaceItemStreamAsync oder DeleteItemStreamAsync und die Arbeit mit ResponseMessage (anstelle von ItemResponse) den Durchsatz, der erreicht werden kann.
Nächste Schritte
- Weitere Informationen zu den .NET SDK-Releases finden Sie im Artikel Azure Cosmos DB SDK.
- Den vollständigen Quellcode zur Migration finden Sie auf GitHub.
- Weitere Beispiele für Massenvorgänge auf GitHub
- Versuchen Sie, eine Kapazitätsplanung für eine Migration auf Azure Cosmos DB vorzunehmen?
- Wenn Sie nur die Anzahl der virtuellen Kerne und Server in Ihrem vorhandenen Datenbankcluster kennen, lesen Sie die Informationen zum Schätzen von Anforderungseinheiten mithilfe von virtuellen Kernen oder virtuellen CPUs
- Wenn Sie die typischen Anforderungsraten für Ihre aktuelle Datenbankworkload kennen, lesen Sie die Informationen zum Schätzen von Anforderungseinheiten mit dem Azure Cosmos DB-Kapazitätsplaner