Pullmodell für den Änderungsfeed in Azure Cosmos DB

GILT FÜR: NoSQL

Mit dem Pullmodell für den Änderungsfeed können Sie den Azure Cosmos DB-Änderungsfeed in Ihrem eigenen Tempo verwenden. Ähnlich wie beim Änderungsfeedprozessor können Sie das Pullmodell für den Änderungsfeed verwenden, um die Verarbeitung von Änderungen über mehrere Änderungsfeedconsumer zu parallelisieren.

Vergleichen mit dem Änderungsfeedprozessor

In vielen Szenarien können Sie den Änderungsfeed entweder mit dem Änderungsfeedprozessor oder mit dem Änderungsfeed-Pullmodell verarbeiten. Die Fortsetzungstoken des Pullmodells und der Leasecontainer des Änderungsfeedprozessors fungieren beide als „Lesezeichen“ für das zuletzt verarbeitete Element oder den Batch von Elementen im Änderungsfeed.

Es ist jedoch nicht möglich, Fortsetzungstoken in eine Lease oder umgekehrt zu konvertieren.

Hinweis

In den meisten Fällen ist es am einfachsten, zum Lesen des Änderungsfeeds den Änderungsfeedprozessor zu verwenden.

In folgenden Szenarien sollten Sie die Verwendung des Pullmodells in Erwägung ziehen:

  • Zum Lesen der Änderungen für einen bestimmten Partitionsschlüsselwert.
  • Zum Steuern der Geschwindigkeit, mit der der Client Änderungen für die Verarbeitung empfängt.
  • Zum Ausführen eines einmaligen Lesevorgangs über die vorhandenen Daten im Änderungsfeed (z. B. für eine Datenmigration).

Hier sind einige wesentliche Unterschiede zwischen dem Änderungsfeedprozessor und dem Änderungsfeed-Pullmodell:

Funktion Change Feed Processor Pullmodell für Änderungsfeed
Verfolgen der aktuellen Position bei der Verarbeitung des Änderungsfeeds Lease (gespeichert in einem Azure Cosmos DB-Container) Fortsetzungstoken (im Arbeitsspeicher gespeichert oder manuell persistent gespeichert)
Möglichkeit zur Wiederholung vergangener Änderungen Ja, mit Pushmodell Ja, mit Pullmodell
Pullen zukünftiger Änderungen Automatische Überprüfung auf Änderungen basierend auf einem vom Benutzer angegebenen WithPollInterval-Wert Manuell
Verhalten ohne neue Änderungen Warten Sie automatisch auf den Wert WithPollInterval , und überprüfen Sie dann erneut Der Status muss überprüft und manuell erneut überprüft werden
Verarbeiten von Änderungen aus dem gesamten Container Ja, und automatische Parallelisierung über mehrere Threads/Computer hinweg, die denselben Container verwenden Ja, mit manueller Parallelisierung mithilfe von FeedRange
Verarbeiten von Änderungen von nur einem Partitionsschlüssel Nicht unterstützt Ja

Hinweis

Wenn Sie das Pullmodell nutzen, müssen Sie, im Gegensatz zum Lesen mithilfe des Änderungsfeedprozessors Fälle explizit behandeln, bei denen keine neuen Änderungen vorliegen.

Arbeiten mit dem Pullmodell

Zum Verarbeiten des Änderungsfeeds mithilfe des Pullmodells erstellen Sie eine FeedIterator-Instanz. Wenn Sie erstmalig einen FeedIterator erstellen, müssen Sie einen erforderlichen ChangeFeedStartFrom-Wert angeben, der sowohl die Anfangsposition für das Lesen von Änderungen als auch den gewünschten Wert umfasst, den Sie für FeedRange nutzen möchten. Der FeedRange ist ein Bereich von Partitionsschlüsselwerten und gibt die Elemente an, die mithilfe dieses speziellen FeedIterator aus dem Änderungsfeed gelesen werden können. Sie müssen auch einen erforderlichen ChangeFeedMode-Wert für den Modus angeben, in dem Sie Änderungen verarbeiten möchten: Neueste Version oder Alle Versionen und Löschvorgänge. Verwenden Sie entweder ChangeFeedMode.LatestVersion oder ChangeFeedMode.AllVersionsAndDeletes, um anzugeben, welchen Modus Sie zum Lesen des Änderungsfeeds verwenden möchten. Wenn Sie den Modus „Alle Versionen und Löschvorgänge“ verwenden, müssen Sie als Startwert für den Änderungsfeed entweder Now() oder ein bestimmtes Fortsetzungstoken auswählen.

Optional können Sie ChangeFeedRequestOptions zum Festlegen eines PageSizeHint angeben. Diese Eigenschaft legt die Höchstzahl von Elementen fest, die pro Seite empfangen werden können. Wenn Vorgänge in der überwachten Sammlung über gespeicherte Prozeduren ausgeführt werden, wird der Transaktionsbereich beim Lesen von Elementen aus dem Änderungsfeed beibehalten. Dadurch kann die Anzahl der empfangenen Elemente unter Umständen höher als der angegebene Wert sein, sodass die von derselben Transaktion geänderten Elemente als Teil eines atomischen Batches zurückgegeben werden.

Hier sehen Sie ein Beispiel für das Abrufen eines FeedIterator im Modus „Neueste Version“, der Entitätsobjekte zurückgibt (in diesem Fall ein User-Objekt):

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Tipp

Vor Version 3.34.0 kann der Modus „Neueste Version“ durch Festlegen von ChangeFeedMode.Incremental verwendet werden. Sowohl Incremental als auch LatestVersion verweisen auf den Modus „Neueste Version“ des Änderungsfeeds, und Anwendungen, die einen dieser Modi verwenden, weisen das gleiche Verhalten auf.

Der Modus „Alle Versionen und Löschvorgänge“ befindet sich in der Vorschauphase und kann mit .NET SDK-Vorschauversionen >= 3.32.0-preview verwendet werden. Hier sehen Sie ein Beispiel für das Abrufen von FeedIterator im Modus „Alle Versionen und Löschvorgänge“, der dynamische Objekte zurückgibt:

FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Hinweis

Im Modus „Neueste Version“ erhalten Sie Objekte, die das Element darstellen, das mit einigen zusätzlichen Metadaten geändert wurde. Im Modus „Alle Versionen und Löschvorgänge“ wird ein anderes Datenmodell zurückgegeben. Weitere Informationen finden Sie unter Analysieren des Antwortobjekts.

Verwenden des Änderungsfeeds über Streams

FeedIteratorfür beide Änderungsfeedmodi gibt es zwei Optionen. Zusätzlich zu den Beispielen, die Entitätsobjekte zurückgeben, können Sie die Antwort auch mit Stream-Unterstützung abrufen. Datenströme ermöglichen Ihnen, Daten zu lesen, ohne sie zuvor deserialisieren zu müssen. Auf diese Weise sparen Sie Clientressourcen.

Hier sehen Sie ein Beispiel für das Abrufen eines FeedIterator im Modus „Neueste Version“, der einen Stream zurückgibt:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Verwenden der Änderungen für einen vollständigen Container

Wenn Sie keinen ParameterFeedRange für einen FeedIterator angeben, können Sie den Änderungsfeed eines vollständigen Containers in Ihrem eigenen Tempo verarbeiten. Im folgenden Beispiel wird mit dem Lesen aller Änderungen ab dem aktuellen Zeitpunkt im Modus „Neueste Version“ begonnen:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Da es sich beim Änderungsfeed im Grunde um eine unbegrenzte Liste von Elementen handelt, die alle zukünftigen Schreib- und Updatevorgänge enthält, ist der Wert von HasMoreResults immer true. Wenn Sie versuchen, den Änderungsfeed zu lesen, und keine neuen Änderungen vorliegen, erhalten Sie eine Antwort mit dem Status NotModified. Im vorherigen Beispiel wird es so gehandhabt, dass fünf Sekunden gewartet wird, bevor nochmal auf Änderungen geprüft wird.

Verwenden der Änderungen für einen Partitionsschlüssel

In einigen Fällen können Sie nur die Änderungen für einen bestimmten Partitionsschlüssel verarbeiten. Sie können FeedIterator für einen bestimmten Partitionsschlüssel abrufen und die Änderungen auf die gleiche Weise wie für einen vollständigen Container verarbeiten.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Verwenden von FeedRange für die Parallelisierung

Beim Änderungsfeedprozessor wird die Arbeit automatisch auf mehrere Consumer verteilt. Beim Pullmodell für den Änderungsfeed können Sie FeedRange verwenden, um die Verarbeitung des Änderungsfeeds zu parallelisieren. Ein FeedRange stellt einen Bereich von Partitionsschlüsselwerten dar.

Das folgende Beispiel zeigt, wie Sie eine Liste von Bereichen für Ihren Container abrufen:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Wenn Sie eine Liste von FeedRange-Werten für Ihren Container abrufen, erhalten Sie einen FeedRange pro physische Partition.

Durch Verwenden eines FeedRange können Sie dann einen FeedIterator erstellen, um die Verarbeitung des Änderungsfeeds parallel auf mehrere Computer oder Threads zu verteilen. Im Gegensatz zum vorherigen Beispiel, in dem gezeigt wurde, wie Sie einen FeedIterator für den gesamten Container oder einen einzelnen Partitionsschlüssel abrufen, können Sie mithilfe von FeedRanges mehrere FeedIterators abrufen, über die der Änderungsfeed parallel verarbeitet werden kann.

Wenn Sie FeedRanges verwenden möchten, benötigen Sie einen Orchestratorprozess, der sie abruft und auf diese Computer verteilt. Diese Verteilung kann wie folgt sein:

  • Verwenden von FeedRange.ToJsonString und Verteilen dieses Zeichenfolgenwerts. Die Consumer können diesen Wert mit FeedRange.FromJsonString verwenden.
  • Wenn die Verteilung bereits bearbeitet wird: Übergeben eines Verweises auf das FeedRange-Objekt

Das folgende Beispiel zeigt, wie Sie vom Anfang des Änderungsfeeds des Containers lesen und dafür zwei hypothetische, getrennte Computer verwenden, die parallel lesen:

Computer 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Computer 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Speichern von Fortsetzungstoken

Sie können die Position Ihres FeedIterator speichern, indem Sie das Fortsetzungstoken abrufen. Ein Fortsetzungstoken ist ein Zeichenfolgenwert, der die letzten verarbeiteten Änderungen Ihres FeedIterators nachverfolgt und dem FeedIterator eine spätere Fortsetzung an dieser Stelle ermöglicht. Das Fortsetzungstoken (sofern angegeben) hat Vorrang vor den Werten für die Startzeit und das Beginnen am Anfang. Der folgende Code liest den Änderungsfeed seit der Erstellung des Containers. Wenn keine weiteren Änderungen mehr verfügbar sind, wird ein Fortsetzungstoken persistent gespeichert, sodass der Änderungsfeed später weiter verarbeitet werden kann.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

Wenn Sie den Modus der neuesten Version verwenden, läuft das FeedIterator Fortsetzungstoken nie ab, solange der Azure Cosmos DB-Container noch vorhanden ist. Wenn Sie alle Versionen und den Löschmodus verwenden, ist das FeedIterator Fortsetzungstoken gültig, solange die Änderungen im Aufbewahrungsfenster für fortlaufende Sicherungen vorgenommen wurden.

Nächste Schritte