Pull-model voor wijzigingenfeeds in Azure Cosmos DB
VAN TOEPASSING OP: NoSQL
U kunt het pull-model van de wijzigingenfeed gebruiken om de Azure Cosmos DB-wijzigingenfeed in uw eigen tempo te gebruiken. Net als bij de verwerker van de wijzigingenfeed kunt u het pull-model van de wijzigingenfeed gebruiken om de verwerking van wijzigingen voor meerdere gebruikers van wijzigingenfeeds te parallelliseren.
Vergelijken met de verwerker van de wijzigingenfeed
Veel scenario's kunnen de wijzigingenfeed verwerken met behulp van de wijzigingenfeedprocessor of het pull-model van de wijzigingenfeed. De vervolgtokens van het pull-model en de leasecontainer van de wijzigingenfeedprocessor werken allebei als bladwijzers voor het laatst verwerkte item of de laatste batch items in de wijzigingenfeed.
U kunt vervolgtokens echter niet converteren naar een lease of omgekeerd.
Notitie
In de meeste gevallen, wanneer u uit de wijzigingenfeed moet lezen, is de eenvoudigste optie om de verwerker van de wijzigingenfeed te gebruiken.
Overweeg het gebruik van het pull-model in deze scenario's:
- Wijzigingen van een specifieke partitiesleutel lezen.
- Om het tempo te bepalen waarop uw klant wijzigingen ontvangt voor verwerking.
- Als u een eenmalige leesbewerking wilt uitvoeren van de bestaande gegevens in de wijzigingenfeed (bijvoorbeeld om een gegevensmigratie uit te voeren).
Hier volgen enkele belangrijke verschillen tussen de wijzigingenfeedprocessor en het pull-model van de wijzigingenfeed:
Functie | Verwerker van wijzigingenfeed | Pull-model voor feed wijzigen |
---|---|---|
Het huidige punt bijhouden bij het verwerken van de wijzigingenfeed | Lease (opgeslagen in een Azure Cosmos DB-container) | Vervolgtoken (opgeslagen in het geheugen of handmatig persistent) |
Mogelijkheid om eerdere wijzigingen opnieuw af te spelen | Ja, met pushmodel | Ja, met pull-model |
Polling voor toekomstige wijzigingen | Automatisch controleren op wijzigingen op basis van de door de gebruiker opgegeven WithPollInterval waarde |
Handmatig |
Gedrag waarbij er geen nieuwe wijzigingen zijn | Automatisch wachten op de waarde WithPollInterval en vervolgens opnieuw controleren |
Moet de status controleren en handmatig opnieuw controleren |
Wijzigingen van een hele container verwerken | Ja, en automatisch geparallelliseerd voor meerdere threads en machines die van dezelfde container gebruikmaken | Ja, en handmatig geparallelliseerd met behulp van FeedRange |
Wijzigingen verwerken van slechts één partitiesleutel | Niet ondersteund | Ja |
Notitie
Wanneer u het pull-model gebruikt, moet u, in tegenstelling tot bij het lezen met behulp van de verwerker van de wijzigingenfeed, expliciet zaken afhandelen waarbij er geen nieuwe wijzigingen zijn.
Werken met het pull-model
Als u de wijzigingenfeed wilt verwerken met behulp van het pull-model, maakt u een exemplaar van FeedIterator
. Wanneer u in eerste instantie maakt FeedIterator
, moet u een vereiste ChangeFeedStartFrom
waarde opgeven, die bestaat uit zowel de beginpositie voor het lezen van wijzigingen als de waarde waarvoor u wilt gebruiken FeedRange
. Het FeedRange
is een bereik van partitiesleutelwaarden en geeft de items op die kunnen worden gelezen uit de wijzigingenfeed met behulp van die specifieke FeedIterator
. U moet ook een vereiste ChangeFeedMode
waarde opgeven voor de modus waarin u wijzigingen wilt verwerken: de nieuwste versie of alle versies en verwijderingen. Gebruik of ChangeFeedMode.LatestVersion
ChangeFeedMode.AllVersionsAndDeletes
geef aan welke modus u wilt gebruiken om de wijzigingenfeed te lezen. Wanneer u alle versies gebruikt en de modus Verwijdert, moet u een wijzigingsfeed selecteren die begint met de waarde van Now()
of van een specifiek vervolgtoken.
U kunt desgewenst opgeven ChangeFeedRequestOptions
om een PageSizeHint
. Wanneer deze eigenschap is ingesteld, wordt het maximum aantal ontvangen items per pagina ingesteld. Als bewerkingen in de bewaakte verzameling worden uitgevoerd via opgeslagen procedures, blijft het transactiebereik behouden bij het lezen van items uit de wijzigingenfeed. Als gevolg hiervan kan het aantal ontvangen items hoger zijn dan de opgegeven waarde, zodat de items die door dezelfde transactie zijn gewijzigd, worden geretourneerd als onderdeel van één atomische batch.
Hier volgt een voorbeeld van het verkrijgen FeedIterator
van de meest recente versiemodus die entiteitsobjecten retourneert, in dit geval een User
object:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Tip
Vóór de versie 3.34.0
kan de meest recente versiemodus worden gebruikt door de instelling in te stellen ChangeFeedMode.Incremental
. Zowel Incremental
als LatestVersion
verwijzen naar de nieuwste versiemodus van de wijzigingenfeed en toepassingen die beide modus gebruiken, zien hetzelfde gedrag.
Alle versies en verwijderingsmodus is in preview en kan worden gebruikt met preview .NET SDK-versies >= 3.32.0-preview
. Hier volgt een voorbeeld voor het verkrijgen FeedIterator
in alle versies en het verwijderen User
van objecten:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Notitie
In de nieuwste versiemodus ontvangt u objecten die het item vertegenwoordigen dat is gewijzigd, met enkele extra metagegevens. Alle versies en verwijdermodus retourneert een ander gegevensmodel. Zie Het antwoordobject parseren voor meer informatie.
U kunt het volledige voorbeeld ophalen voor de meest recente versiemodus of alle versies en de modus Voor verwijderen.
De wijzigingenfeed via streams gebruiken
FeedIterator
voor beide modi voor wijzigingenfeeds zijn twee opties beschikbaar. Naast de voorbeelden die entiteitsobjecten retourneren, kunt u ook het antwoord verkrijgen met Stream
ondersteuning. Met streams kunt u gegevens lezen zonder deze eerst gedeserialiseerd te hebben, zodat u op clientresources bespaart.
Hier volgt een voorbeeld van hoe u de meest recente versiemodus kunt verkrijgen FeedIterator
die retourneert Stream
:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
De wijzigingen voor een hele container gebruiken
Als u geen parameter opgeeft FeedRange
FeedIterator
, kunt u de wijzigingenfeed van een hele container in uw eigen tempo verwerken. Hier volgt een voorbeeld, dat begint met het lezen van alle wijzigingen, beginnend bij het huidige tijdstip met behulp van de nieuwste versiemodus:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Omdat de wijzigingenfeed effectief een oneindige lijst is met items die alle toekomstige schrijf- en updates omvatten, is de waarde altijd HasMoreResults
true
. Wanneer u de wijzigingenfeed probeert te lezen en er geen nieuwe wijzigingen beschikbaar zijn, ontvangt u een antwoord met NotModified
de status. In het voorgaande voorbeeld wordt het verwerkt door vijf seconden te wachten voordat u opnieuw controleert op wijzigingen.
De wijzigingen voor een partitiesleutel gebruiken
In sommige gevallen wilt u mogelijk alleen de wijzigingen voor een specifieke partitiesleutel verwerken. U kunt een specifieke partitiesleutel verkrijgen FeedIterator
en de wijzigingen op dezelfde manier verwerken als voor een hele container.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
FeedRange gebruiken voor parallelle uitvoering
In de wijzigingenfeedprocessor wordt werk automatisch verspreid over meerdere consumenten. In het pull-model van de wijzigingenfeed kunt u de FeedRange
verwerking van de wijzigingenfeed parallelliseren. A FeedRange
vertegenwoordigt een bereik van partitiesleutelwaarden.
Hier volgt een voorbeeld waarin wordt getoond hoe u een lijst met bereiken voor uw container kunt ophalen:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Wanneer u een lijst FeedRange
met waarden voor uw container krijgt, krijgt u er één FeedRange
per fysieke partitie.
Met behulp van een FeedRange
kunt u een FeedIterator
maken om de verwerking van de wijzigingenfeed te parallelliseren op meerdere computers of threads. In tegenstelling tot het vorige voorbeeld dat laat zien hoe u een FeedIterator
voor de hele container of één partitiesleutel kunt verkrijgen, kunt u FeedRanges gebruiken om meerdere FeedIterators te verkrijgen, waarmee de wijzigingenfeed parallel kan worden verwerkt.
In het geval dat u FeedRanges wilt gebruiken, moet u een orchestratorproces hebben dat FeedRanges verkrijgt en distribueert naar die machines. Deze distributie kan het volgende zijn:
- Deze tekenreekswaarde gebruiken
FeedRange.ToJsonString
en distribueren. De consumenten kunnen deze waarde gebruiken metFeedRange.FromJsonString
. - Als de distributie wordt verwerkt, geeft u de
FeedRange
objectverwijzing door.
Hier volgt een voorbeeld waarin wordt getoond hoe u kunt lezen vanaf het begin van de wijzigingenfeed van de container met behulp van twee hypothetische afzonderlijke machines die parallel worden gelezen:
Machine 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Machine 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Vervolgtokens opslaan
U kunt de positie van uw FeedIterator
bestand opslaan door het vervolgtoken te verkrijgen. Een vervolgtoken is een tekenreekswaarde die de laatst verwerkte wijzigingen van uw FeedIterator bijhoudt en de FeedIterator
laatste verwerkte wijzigingen later kan hervatten. Het vervolgtoken, indien opgegeven, heeft voorrang op de begintijd en begint vanaf beginwaarden. De volgende code leest de wijzigingenfeed door sinds het maken van de container. Nadat er geen wijzigingen meer beschikbaar zijn, blijft er een vervolgtoken behouden, zodat het verbruik van de wijzigingenfeed later kan worden hervat.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
Wanneer u de nieuwste versiemodus gebruikt, verloopt het FeedIterator
vervolgtoken nooit zolang de Azure Cosmos DB-container nog steeds bestaat. Wanneer u alle versies gebruikt en de modus verwijdert, is het FeedIterator
vervolgtoken geldig zolang de wijzigingen in het bewaarvenster voor continue back-ups zijn opgetreden.