Pull-modell för ändringsflöde i Azure Cosmos DB
GÄLLER FÖR: NoSQL
Du kan använda pull-modellen för ändringsflöde för att använda Azure Cosmos DB-ändringsflödet i din egen takt. På samma sätt som ändringsflödesprocessorn kan du använda pull-modellen för ändringsflöde för att parallellisera bearbetningen av ändringar mellan flera ändringsflödeskonsumenter.
Jämför med ändringsflödesprocessorn
Många scenarier kan bearbeta ändringsflödet med hjälp av antingen ändringsflödesprocessorn eller pull-modellen för ändringsflöde. Pull-modellens fortsättningstoken och ändringsflödesprocessorns lånecontainer fungerar båda som bokmärken för det senast bearbetade objektet eller batchen med objekt i ändringsflödet.
Du kan dock inte konvertera fortsättningstoken till ett lån eller tvärtom.
Kommentar
I de flesta fall är det enklaste alternativet att använda ändringsflödesprocessorn när du behöver läsa från ändringsflödet.
Du bör överväga att använda pull-modellen i följande scenarier:
- Så här läser du ändringar från en specifik partitionsnyckel.
- För att kontrollera i vilken takt klienten får ändringar för bearbetning.
- För att utföra en engångsläsning av befintliga data i ändringsflödet (till exempel för att utföra en datamigrering).
Här följer några viktiga skillnader mellan ändringsflödesprocessorn och pull-modellen för ändringsflöde:
Funktion | Ändringsflödesprocessor | Hämtningsmodell för ändringsflöde |
---|---|---|
Hålla reda på den aktuella punkten i bearbetningen av ändringsflödet | Lån (lagras i en Azure Cosmos DB-container) | Fortsättningstoken (lagras i minnet eller sparas manuellt) |
Möjlighet att spela upp tidigare ändringar | Ja, med push-modell | Ja, med pull-modell |
Avsökning för framtida ändringar | Söker automatiskt efter ändringar baserat på användarangivet WithPollInterval värde |
Manuell |
Beteende där det inte finns några nya ändringar | Vänta automatiskt på värdet och WithPollInterval kontrollera sedan igen |
Måste kontrollera statusen och markera igen manuellt |
Bearbeta ändringar från en hel container | Ja, och parallelliseras automatiskt över flera trådar och datorer som förbrukar från samma container | Ja, och parallelliseras manuellt med hjälp av FeedRange |
Bearbeta ändringar från endast en enda partitionsnyckel | Stöds inte | Ja |
Kommentar
När du använder pull-modellen, till skillnad från när du läser med hjälp av ändringsflödesprocessorn, måste du uttryckligen hantera fall där det inte finns några nya ändringar.
Arbeta med pull-modellen
Om du vill bearbeta ändringsflödet med hjälp av pull-modellen skapar du en instans av FeedIterator
. När du först skapar FeedIterator
måste du ange ett obligatoriskt ChangeFeedStartFrom
värde, som består av både startpositionen för att läsa ändringar och det värde som du vill använda för FeedRange
. FeedRange
är ett intervall med partitionsnyckelvärden och anger de objekt som kan läsas från ändringsflödet med hjälp av den specifika FeedIterator
. Du måste också ange ett obligatoriskt ChangeFeedMode
värde för det läge där du vill bearbeta ändringar: den senaste versionen eller alla versioner och borttagningar. Använd antingen ChangeFeedMode.LatestVersion
eller ChangeFeedMode.AllVersionsAndDeletes
för att ange vilket läge du vill använda för att läsa ändringsflödet. När du använder alla versioner och tar bort läge måste du välja en ändringsflödesstart från värdet för antingen Now()
eller från en specifik fortsättningstoken.
Du kan också ange ChangeFeedRequestOptions
för att ange en PageSizeHint
. När den här egenskapen anges anger den maximala antalet mottagna objekt per sida. Om åtgärder i den övervakade samlingen utförs via lagrade procedurer bevaras transaktionsomfånget vid läsning av objekt från ändringsflödet. Därför kan antalet mottagna objekt vara högre än det angivna värdet så att objekten som ändras av samma transaktion returneras som en del av en atomisk batch.
Här är ett exempel på hur du hämtar FeedIterator
i senaste versionsläge som returnerar entitetsobjekt, i det här fallet ett User
objekt:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Dricks
Före version 3.34.0
kan det senaste versionsläget användas genom att ange ChangeFeedMode.Incremental
. Både Incremental
och LatestVersion
refererar till det senaste versionsläget för ändringsflödet och program som använder något av lägena ser samma beteende.
Alla versioner och borttagningsläge är i förhandsversion och kan användas med förhandsversionen av .NET SDK-versioner >= 3.32.0-preview
. Här är ett exempel för att FeedIterator
hämta i alla versioner och tar bort läge som returnerar User
objekt:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Kommentar
I det senaste versionsläget får du objekt som representerar objektet som har ändrats, med några extra metadata. Alla versioner och borttagningsläge returnerar en annan datamodell. Mer information finns i Parsa svarsobjektet.
Du kan hämta det fullständiga exemplet för det senaste versionsläget eller alla versioner och borttagningsläget.
Använda ändringsflödet via strömmar
FeedIterator
för båda ändringsflödeslägena finns två alternativ. Förutom de exempel som returnerar entitetsobjekt kan du också få svaret med Stream
stöd. Med strömmar kan du läsa data utan att först deserialisera dem, så att du sparar på klientresurser.
Här är ett exempel på hur du hämtar FeedIterator
i senaste versionsläge som returnerar Stream
:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Använda ändringarna för en hel container
Om du inte anger en FeedRange
parameter till FeedIterator
kan du bearbeta en hel containers ändringsflöde i din egen takt. Här är ett exempel som börjar läsa alla ändringar, med början vid den aktuella tidpunkten med hjälp av det senaste versionsläget:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Eftersom ändringsflödet i praktiken är en oändlig lista över objekt som omfattar alla framtida skrivningar och uppdateringar är värdet HasMoreResults
för alltid true
. När du försöker läsa ändringsflödet och det inte finns några nya ändringar tillgängliga får du ett svar med NotModified
status. I föregående exempel hanteras det genom att vänta fem sekunder innan du söker efter ändringar igen.
Använda ändringarna för en partitionsnyckel
I vissa fall kanske du bara vill bearbeta ändringarna för en specifik partitionsnyckel. Du kan hämta FeedIterator
för en specifik partitionsnyckel och bearbeta ändringarna på samma sätt som för en hel container.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Använda FeedRange för parallellisering
I ändringsflödesprocessorn sprids arbetet automatiskt över flera konsumenter. I pull-modellen för ändringsflöde kan du använda FeedRange
för att parallellisera bearbetningen av ändringsflödet. A FeedRange
representerar ett intervall med partitionsnyckelvärden.
Här är ett exempel som visar hur du hämtar en lista över intervall för din container:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
När du får en lista med FeedRange
värden för containern får du en FeedRange
per fysisk partition.
Med hjälp av kan FeedRange
du skapa en FeedIterator
för att parallellisera bearbetningen av ändringsflödet mellan flera datorer eller trådar. Till skillnad från föregående exempel som visade hur du hämtar en FeedIterator
för hela containern eller en enda partitionsnyckel kan du använda FeedRanges för att hämta flera FeedIterators, vilket kan bearbeta ändringsflödet parallellt.
Om du vill använda FeedRanges måste du ha en orkestreringsprocess som hämtar FeedRanges och distribuerar dem till dessa datorer. Den här fördelningen kan vara:
- Använda
FeedRange.ToJsonString
och distribuera det här strängvärdet. Konsumenterna kan använda det här värdet medFeedRange.FromJsonString
. - Om fördelningen pågår skickar du objektreferensen
FeedRange
.
Här är ett exempel som visar hur du läser från början av containerns ändringsflöde med hjälp av två hypotetiska separata datorer som läser parallellt:
Dator 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Dator 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Spara fortsättningstoken
Du kan spara positionen för din FeedIterator
genom att hämta fortsättningstoken. En fortsättningstoken är ett strängvärde som håller reda på feediteratorns senast bearbetade ändringar och tillåter att den återupptas vid den här tidpunkten FeedIterator
senare. Fortsättningstoken, om den anges, har företräde framför starttiden och börjar från startvärdena. Följande kod läser igenom ändringsflödet sedan containern skapades. När inga fler ändringar är tillgängliga bevaras en fortsättningstoken så att ändringsflödesförbrukningen kan återupptas senare.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
När du använder det senaste versionsläget FeedIterator
upphör fortsättningstoken aldrig att gälla så länge Azure Cosmos DB-containern fortfarande finns. När du använder alla versioner och tar bort läget FeedIterator
är fortsättningstoken giltig så länge ändringarna har gjorts i kvarhållningsfönstret för kontinuerliga säkerhetskopieringar.