Pullmodell für den Änderungsfeed in Azure Cosmos DB
GILT FÜR: NoSQL
Mit dem Pullmodell für den Änderungsfeed können Sie den Azure Cosmos DB-Änderungsfeed in Ihrem eigenen Tempo verwenden. Ähnlich wie beim Änderungsfeedprozessor können Sie das Pullmodell für den Änderungsfeed verwenden, um die Verarbeitung von Änderungen über mehrere Änderungsfeedconsumer zu parallelisieren.
Vergleichen mit dem Änderungsfeedprozessor
In vielen Szenarien können Sie den Änderungsfeed entweder mit dem Änderungsfeedprozessor oder mit dem Änderungsfeed-Pullmodell verarbeiten. Die Fortsetzungstoken des Pullmodells und der Leasecontainer des Änderungsfeedprozessors fungieren beide als „Lesezeichen“ für das zuletzt verarbeitete Element oder den Batch von Elementen im Änderungsfeed.
Es ist jedoch nicht möglich, Fortsetzungstoken in eine Lease oder umgekehrt zu konvertieren.
Hinweis
In den meisten Fällen ist es am einfachsten, zum Lesen des Änderungsfeeds den Änderungsfeedprozessor zu verwenden.
In folgenden Szenarien sollten Sie die Verwendung des Pullmodells in Erwägung ziehen:
- Zum Lesen der Änderungen für einen bestimmten Partitionsschlüsselwert.
- Zum Steuern der Geschwindigkeit, mit der der Client Änderungen für die Verarbeitung empfängt.
- Zum Ausführen eines einmaligen Lesevorgangs über die vorhandenen Daten im Änderungsfeed (z. B. für eine Datenmigration).
Hier sind einige wesentliche Unterschiede zwischen dem Änderungsfeedprozessor und dem Änderungsfeed-Pullmodell:
Funktion | Change Feed Processor | Pullmodell für Änderungsfeed |
---|---|---|
Verfolgen der aktuellen Position bei der Verarbeitung des Änderungsfeeds | Lease (gespeichert in einem Azure Cosmos DB-Container) | Fortsetzungstoken (im Arbeitsspeicher gespeichert oder manuell persistent gespeichert) |
Möglichkeit zur Wiederholung vergangener Änderungen | Ja, mit Pushmodell | Ja, mit Pullmodell |
Pullen zukünftiger Änderungen | Automatische Überprüfung auf Änderungen basierend auf einem vom Benutzer angegebenen WithPollInterval -Wert |
Manuell |
Verhalten ohne neue Änderungen | Warten Sie automatisch auf den Wert WithPollInterval , und überprüfen Sie dann erneut |
Der Status muss überprüft und manuell erneut überprüft werden |
Verarbeiten von Änderungen aus dem gesamten Container | Ja, und automatische Parallelisierung über mehrere Threads/Computer hinweg, die denselben Container verwenden | Ja, mit manueller Parallelisierung mithilfe von FeedRange |
Verarbeiten von Änderungen von nur einem Partitionsschlüssel | Nicht unterstützt | Ja |
Hinweis
Wenn Sie das Pullmodell nutzen, müssen Sie, im Gegensatz zum Lesen mithilfe des Änderungsfeedprozessors Fälle explizit behandeln, bei denen keine neuen Änderungen vorliegen.
Arbeiten mit dem Pullmodell
Zum Verarbeiten des Änderungsfeeds mithilfe des Pullmodells erstellen Sie eine FeedIterator
-Instanz. Wenn Sie erstmalig einen FeedIterator
erstellen, müssen Sie einen erforderlichen ChangeFeedStartFrom
-Wert angeben, der sowohl die Anfangsposition für das Lesen von Änderungen als auch den gewünschten Wert umfasst, den Sie für FeedRange
nutzen möchten. Der FeedRange
ist ein Bereich von Partitionsschlüsselwerten und gibt die Elemente an, die mithilfe dieses speziellen FeedIterator
aus dem Änderungsfeed gelesen werden können. Sie müssen auch einen erforderlichen ChangeFeedMode
-Wert für den Modus angeben, in dem Sie Änderungen verarbeiten möchten: Neueste Version oder Alle Versionen und Löschvorgänge. Verwenden Sie entweder ChangeFeedMode.LatestVersion
oder ChangeFeedMode.AllVersionsAndDeletes
, um anzugeben, welchen Modus Sie zum Lesen des Änderungsfeeds verwenden möchten. Wenn Sie den Modus „Alle Versionen und Löschvorgänge“ verwenden, müssen Sie als Startwert für den Änderungsfeed entweder Now()
oder ein bestimmtes Fortsetzungstoken auswählen.
Optional können Sie ChangeFeedRequestOptions
zum Festlegen eines PageSizeHint
angeben. Diese Eigenschaft legt die Höchstzahl von Elementen fest, die pro Seite empfangen werden können. Wenn Vorgänge in der überwachten Sammlung über gespeicherte Prozeduren ausgeführt werden, wird der Transaktionsbereich beim Lesen von Elementen aus dem Änderungsfeed beibehalten. Dadurch kann die Anzahl der empfangenen Elemente unter Umständen höher als der angegebene Wert sein, sodass die von derselben Transaktion geänderten Elemente als Teil eines atomischen Batches zurückgegeben werden.
Hier sehen Sie ein Beispiel für das Abrufen eines FeedIterator
im Modus „Neueste Version“, der Entitätsobjekte zurückgibt (in diesem Fall ein User
-Objekt):
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Tipp
Vor Version 3.34.0
kann der Modus „Neueste Version“ durch Festlegen von ChangeFeedMode.Incremental
verwendet werden. Sowohl Incremental
als auch LatestVersion
verweisen auf den Modus „Neueste Version“ des Änderungsfeeds, und Anwendungen, die einen dieser Modi verwenden, weisen das gleiche Verhalten auf.
Der Modus „Alle Versionen und Löschvorgänge“ befindet sich in der Vorschauphase und kann mit .NET SDK-Vorschauversionen >= 3.32.0-preview
verwendet werden. Ein Beispiel für das Abrufen von FeedIterator
im Modus „Alle Versionen und Löschvorgänge“, der User
-Objekte zurückgibt:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Hinweis
Im Modus „Neueste Version“ erhalten Sie Objekte, die das Element darstellen, das mit einigen zusätzlichen Metadaten geändert wurde. Im Modus „Alle Versionen und Löschvorgänge“ wird ein anderes Datenmodell zurückgegeben. Weitere Informationen finden Sie unter Analysieren des Antwortobjekts.
Sie können das vollständige Beispiel für den Modus „Neueste Version“ oder den Modus „Alle Versionen und Löschvorgänge“ abrufen.
Verwenden des Änderungsfeeds über Streams
FeedIterator
für beide Änderungsfeedmodi gibt es zwei Optionen. Zusätzlich zu den Beispielen, die Entitätsobjekte zurückgeben, können Sie die Antwort auch mit Stream
-Unterstützung abrufen. Datenströme ermöglichen Ihnen, Daten zu lesen, ohne sie zuvor deserialisieren zu müssen. Auf diese Weise sparen Sie Clientressourcen.
Hier sehen Sie ein Beispiel für das Abrufen eines FeedIterator
im Modus „Neueste Version“, der einen Stream
zurückgibt:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Verwenden der Änderungen für einen vollständigen Container
Wenn Sie keinen ParameterFeedRange
für einen FeedIterator
angeben, können Sie den Änderungsfeed eines vollständigen Containers in Ihrem eigenen Tempo verarbeiten. Im folgenden Beispiel wird mit dem Lesen aller Änderungen ab dem aktuellen Zeitpunkt im Modus „Neueste Version“ begonnen:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Da es sich beim Änderungsfeed im Grunde um eine unbegrenzte Liste von Elementen handelt, die alle zukünftigen Schreib- und Updatevorgänge enthält, ist der Wert von HasMoreResults
immer true
. Wenn Sie versuchen, den Änderungsfeed zu lesen, und keine neuen Änderungen vorliegen, erhalten Sie eine Antwort mit dem Status NotModified
. Im vorherigen Beispiel wird es so gehandhabt, dass fünf Sekunden gewartet wird, bevor nochmal auf Änderungen geprüft wird.
Verwenden der Änderungen für einen Partitionsschlüssel
In einigen Fällen können Sie nur die Änderungen für einen bestimmten Partitionsschlüssel verarbeiten. Sie können FeedIterator
für einen bestimmten Partitionsschlüssel abrufen und die Änderungen auf die gleiche Weise wie für einen vollständigen Container verarbeiten.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Verwenden von FeedRange für die Parallelisierung
Beim Änderungsfeedprozessor wird die Arbeit automatisch auf mehrere Consumer verteilt. Beim Pullmodell für den Änderungsfeed können Sie FeedRange
verwenden, um die Verarbeitung des Änderungsfeeds zu parallelisieren. Ein FeedRange
stellt einen Bereich von Partitionsschlüsselwerten dar.
Das folgende Beispiel zeigt, wie Sie eine Liste von Bereichen für Ihren Container abrufen:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Wenn Sie eine Liste von FeedRange
-Werten für Ihren Container abrufen, erhalten Sie einen FeedRange
pro physische Partition.
Durch Verwenden eines FeedRange
können Sie dann einen FeedIterator
erstellen, um die Verarbeitung des Änderungsfeeds parallel auf mehrere Computer oder Threads zu verteilen. Im Gegensatz zum vorherigen Beispiel, in dem gezeigt wurde, wie Sie einen FeedIterator
für den gesamten Container oder einen einzelnen Partitionsschlüssel abrufen, können Sie mithilfe von FeedRanges mehrere FeedIterators abrufen, über die der Änderungsfeed parallel verarbeitet werden kann.
Wenn Sie FeedRanges verwenden möchten, benötigen Sie einen Orchestratorprozess, der sie abruft und auf diese Computer verteilt. Diese Verteilung kann wie folgt sein:
- Verwenden von
FeedRange.ToJsonString
und Verteilen dieses Zeichenfolgenwerts. Die Consumer können diesen Wert mitFeedRange.FromJsonString
verwenden. - Wenn die Verteilung bereits bearbeitet wird: Übergeben eines Verweises auf das
FeedRange
-Objekt
Das folgende Beispiel zeigt, wie Sie vom Anfang des Änderungsfeeds des Containers lesen und dafür zwei hypothetische, getrennte Computer verwenden, die parallel lesen:
Computer 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Computer 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Speichern von Fortsetzungstoken
Sie können die Position Ihres FeedIterator
speichern, indem Sie das Fortsetzungstoken abrufen. Ein Fortsetzungstoken ist ein Zeichenfolgenwert, der die letzten verarbeiteten Änderungen Ihres FeedIterators nachverfolgt und dem FeedIterator
eine spätere Fortsetzung an dieser Stelle ermöglicht. Das Fortsetzungstoken (sofern angegeben) hat Vorrang vor den Werten für die Startzeit und das Beginnen am Anfang. Der folgende Code liest den Änderungsfeed seit der Erstellung des Containers. Wenn keine weiteren Änderungen mehr verfügbar sind, wird ein Fortsetzungstoken persistent gespeichert, sodass der Änderungsfeed später weiter verarbeitet werden kann.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
Wenn Sie den Modus der neuesten Version verwenden, läuft das FeedIterator
Fortsetzungstoken nie ab, solange der Azure Cosmos DB-Container noch vorhanden ist. Wenn Sie alle Versionen und den Löschmodus verwenden, ist das FeedIterator
Fortsetzungstoken gültig, solange die Änderungen im Aufbewahrungsfenster für fortlaufende Sicherungen vorgenommen wurden.