Share via


Pull-model voor wijzigingenfeeds in Azure Cosmos DB

VAN TOEPASSING OP: NoSQL

U kunt het pull-model van de wijzigingenfeed gebruiken om de Azure Cosmos DB-wijzigingenfeed in uw eigen tempo te gebruiken. Net als bij de verwerker van de wijzigingenfeed kunt u het pull-model van de wijzigingenfeed gebruiken om de verwerking van wijzigingen voor meerdere gebruikers van wijzigingenfeeds te parallelliseren.

Vergelijken met de verwerker van de wijzigingenfeed

Veel scenario's kunnen de wijzigingenfeed verwerken met behulp van de wijzigingenfeedprocessor of het pull-model van de wijzigingenfeed. De vervolgtokens van het pull-model en de leasecontainer van de wijzigingenfeedprocessor werken allebei als bladwijzers voor het laatst verwerkte item of de laatste batch items in de wijzigingenfeed.

U kunt vervolgtokens echter niet converteren naar een lease of omgekeerd.

Notitie

In de meeste gevallen, wanneer u uit de wijzigingenfeed moet lezen, is de eenvoudigste optie om de verwerker van de wijzigingenfeed te gebruiken.

Overweeg het gebruik van het pull-model in deze scenario's:

  • Wijzigingen van een specifieke partitiesleutel lezen.
  • Om het tempo te bepalen waarop uw klant wijzigingen ontvangt voor verwerking.
  • Als u een eenmalige leesbewerking wilt uitvoeren van de bestaande gegevens in de wijzigingenfeed (bijvoorbeeld om een gegevensmigratie uit te voeren).

Hier volgen enkele belangrijke verschillen tussen de wijzigingenfeedprocessor en het pull-model van de wijzigingenfeed:

Functie Verwerker van wijzigingenfeed Pull-model voor feed wijzigen
Het huidige punt bijhouden bij het verwerken van de wijzigingenfeed Lease (opgeslagen in een Azure Cosmos DB-container) Vervolgtoken (opgeslagen in het geheugen of handmatig persistent)
Mogelijkheid om eerdere wijzigingen opnieuw af te spelen Ja, met pushmodel Ja, met pull-model
Polling voor toekomstige wijzigingen Automatisch controleren op wijzigingen op basis van de door de gebruiker opgegeven WithPollInterval waarde Handmatig
Gedrag waarbij er geen nieuwe wijzigingen zijn Automatisch wachten op de waarde WithPollInterval en vervolgens opnieuw controleren Moet de status controleren en handmatig opnieuw controleren
Wijzigingen van een hele container verwerken Ja, en automatisch geparallelliseerd voor meerdere threads en machines die van dezelfde container gebruikmaken Ja, en handmatig geparallelliseerd met behulp van FeedRange
Wijzigingen verwerken van slechts één partitiesleutel Niet ondersteund Ja

Notitie

Wanneer u het pull-model gebruikt, moet u, in tegenstelling tot bij het lezen met behulp van de verwerker van de wijzigingenfeed, expliciet zaken afhandelen waarbij er geen nieuwe wijzigingen zijn.

Werken met het pull-model

Als u de wijzigingenfeed wilt verwerken met behulp van het pull-model, maakt u een exemplaar van FeedIterator. Wanneer u in eerste instantie maakt FeedIterator, moet u een vereiste ChangeFeedStartFrom waarde opgeven, die bestaat uit zowel de beginpositie voor het lezen van wijzigingen als de waarde waarvoor u wilt gebruiken FeedRange. Het FeedRange is een bereik van partitiesleutelwaarden en geeft de items op die kunnen worden gelezen uit de wijzigingenfeed met behulp van die specifieke FeedIterator. U moet ook een vereiste ChangeFeedMode waarde opgeven voor de modus waarin u wijzigingen wilt verwerken: de nieuwste versie of alle versies en verwijderingen. Gebruik of ChangeFeedMode.LatestVersion ChangeFeedMode.AllVersionsAndDeletes geef aan welke modus u wilt gebruiken om de wijzigingenfeed te lezen. Wanneer u alle versies gebruikt en de modus Verwijdert, moet u een wijzigingsfeed selecteren die begint met de waarde van Now() of van een specifiek vervolgtoken.

U kunt desgewenst opgeven ChangeFeedRequestOptions om een PageSizeHint. Wanneer deze eigenschap is ingesteld, wordt het maximum aantal ontvangen items per pagina ingesteld. Als bewerkingen in de bewaakte verzameling worden uitgevoerd via opgeslagen procedures, blijft het transactiebereik behouden bij het lezen van items uit de wijzigingenfeed. Als gevolg hiervan kan het aantal ontvangen items hoger zijn dan de opgegeven waarde, zodat de items die door dezelfde transactie zijn gewijzigd, worden geretourneerd als onderdeel van één atomische batch.

Hier volgt een voorbeeld van het verkrijgen FeedIterator van de meest recente versiemodus die entiteitsobjecten retourneert, in dit geval een User object:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Tip

Vóór de versie 3.34.0kan de meest recente versiemodus worden gebruikt door de instelling in te stellen ChangeFeedMode.Incremental. Zowel Incremental als LatestVersion verwijzen naar de nieuwste versiemodus van de wijzigingenfeed en toepassingen die beide modus gebruiken, zien hetzelfde gedrag.

Alle versies en verwijderingsmodus is in preview en kan worden gebruikt met preview .NET SDK-versies >= 3.32.0-preview. Hier volgt een voorbeeld voor het verkrijgen FeedIterator in alle versies en het verwijderen User van objecten:

FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Notitie

In de nieuwste versiemodus ontvangt u objecten die het item vertegenwoordigen dat is gewijzigd, met enkele extra metagegevens. Alle versies en verwijdermodus retourneert een ander gegevensmodel. Zie Het antwoordobject parseren voor meer informatie.

U kunt het volledige voorbeeld ophalen voor de meest recente versiemodus of alle versies en de modus Voor verwijderen.

De wijzigingenfeed via streams gebruiken

FeedIterator voor beide modi voor wijzigingenfeeds zijn twee opties beschikbaar. Naast de voorbeelden die entiteitsobjecten retourneren, kunt u ook het antwoord verkrijgen met Stream ondersteuning. Met streams kunt u gegevens lezen zonder deze eerst gedeserialiseerd te hebben, zodat u op clientresources bespaart.

Hier volgt een voorbeeld van hoe u de meest recente versiemodus kunt verkrijgen FeedIterator die retourneert Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

De wijzigingen voor een hele container gebruiken

Als u geen parameter opgeeft FeedRange FeedIterator, kunt u de wijzigingenfeed van een hele container in uw eigen tempo verwerken. Hier volgt een voorbeeld, dat begint met het lezen van alle wijzigingen, beginnend bij het huidige tijdstip met behulp van de nieuwste versiemodus:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Omdat de wijzigingenfeed effectief een oneindige lijst is met items die alle toekomstige schrijf- en updates omvatten, is de waarde altijd HasMoreResults true. Wanneer u de wijzigingenfeed probeert te lezen en er geen nieuwe wijzigingen beschikbaar zijn, ontvangt u een antwoord met NotModified de status. In het voorgaande voorbeeld wordt het verwerkt door vijf seconden te wachten voordat u opnieuw controleert op wijzigingen.

De wijzigingen voor een partitiesleutel gebruiken

In sommige gevallen wilt u mogelijk alleen de wijzigingen voor een specifieke partitiesleutel verwerken. U kunt een specifieke partitiesleutel verkrijgen FeedIterator en de wijzigingen op dezelfde manier verwerken als voor een hele container.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

FeedRange gebruiken voor parallelle uitvoering

In de wijzigingenfeedprocessor wordt werk automatisch verspreid over meerdere consumenten. In het pull-model van de wijzigingenfeed kunt u de FeedRange verwerking van de wijzigingenfeed parallelliseren. A FeedRange vertegenwoordigt een bereik van partitiesleutelwaarden.

Hier volgt een voorbeeld waarin wordt getoond hoe u een lijst met bereiken voor uw container kunt ophalen:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Wanneer u een lijst FeedRange met waarden voor uw container krijgt, krijgt u er één FeedRange per fysieke partitie.

Met behulp van een FeedRangekunt u een FeedIterator maken om de verwerking van de wijzigingenfeed te parallelliseren op meerdere computers of threads. In tegenstelling tot het vorige voorbeeld dat laat zien hoe u een FeedIterator voor de hele container of één partitiesleutel kunt verkrijgen, kunt u FeedRanges gebruiken om meerdere FeedIterators te verkrijgen, waarmee de wijzigingenfeed parallel kan worden verwerkt.

In het geval dat u FeedRanges wilt gebruiken, moet u een orchestratorproces hebben dat FeedRanges verkrijgt en distribueert naar die machines. Deze distributie kan het volgende zijn:

  • Deze tekenreekswaarde gebruiken FeedRange.ToJsonString en distribueren. De consumenten kunnen deze waarde gebruiken met FeedRange.FromJsonString.
  • Als de distributie wordt verwerkt, geeft u de FeedRange objectverwijzing door.

Hier volgt een voorbeeld waarin wordt getoond hoe u kunt lezen vanaf het begin van de wijzigingenfeed van de container met behulp van twee hypothetische afzonderlijke machines die parallel worden gelezen:

Machine 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Machine 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Vervolgtokens opslaan

U kunt de positie van uw FeedIterator bestand opslaan door het vervolgtoken te verkrijgen. Een vervolgtoken is een tekenreekswaarde die de laatst verwerkte wijzigingen van uw FeedIterator bijhoudt en de FeedIterator laatste verwerkte wijzigingen later kan hervatten. Het vervolgtoken, indien opgegeven, heeft voorrang op de begintijd en begint vanaf beginwaarden. De volgende code leest de wijzigingenfeed door sinds het maken van de container. Nadat er geen wijzigingen meer beschikbaar zijn, blijft er een vervolgtoken behouden, zodat het verbruik van de wijzigingenfeed later kan worden hervat.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

Wanneer u de nieuwste versiemodus gebruikt, verloopt het FeedIterator vervolgtoken nooit zolang de Azure Cosmos DB-container nog steeds bestaat. Wanneer u alle versies gebruikt en de modus verwijdert, is het FeedIterator vervolgtoken geldig zolang de wijzigingen in het bewaarvenster voor continue back-ups zijn opgetreden.

Volgende stappen