Du kan använda pull-modellen för ändringsflöde för att använda Azure Cosmos DB-ändringsflödet i din egen takt. På samma sätt som ändringsflödesprocessorn kan du använda pull-modellen för ändringsflöde för att parallellisera bearbetningen av ändringar mellan flera ändringsflödeskonsumenter.
Du kan dock inte konvertera fortsättningstoken till ett leasingavtal eller tvärtom.
Här följer några viktiga skillnader mellan ändringsflödesprocessorn och pull-modellen för ändringsflöde:
Om du vill bearbeta ändringsflödet med hjälp av pull-modellen skapar du en instans av FeedIterator
. När du först skapar FeedIterator
måste du ange ett obligatoriskt ChangeFeedStartFrom
värde, som består av både startpositionen för att läsa ändringar och det värde som du vill använda för FeedRange
.
FeedRange
är ett intervall med partitionsnyckelvärden och anger de objekt som kan läsas från ändringsflödet med hjälp av den specifika FeedIterator
. Du måste också ange ett obligatoriskt ChangeFeedMode
värde för det läge där du vill bearbeta ändringar: den senaste versionen eller alla versioner och borttagningar. Använd antingen ChangeFeedMode.LatestVersion
eller ChangeFeedMode.AllVersionsAndDeletes
för att ange vilket läge du vill använda för att läsa ändringsflödet. När du använder alla versioner och raderingsläge måste du välja en startpunkt för ändringsflödet från värdet för antingen Now()
eller från en specifik fortsättningstoken.
Du kan också ange ChangeFeedRequestOptions
för att ställa in en PageSizeHint
. När den här egenskapen anges anger den maximala antalet mottagna objekt per sida. Om åtgärder i den övervakade samlingen utförs via lagrade procedurer bevaras transaktionsomfånget vid läsning av objekt från ändringsflödet. Därför kan antalet mottagna objekt vara högre än det angivna värdet så att objekten som ändras av samma transaktion returneras som en del av en atomisk batch.
Här är ett exempel på hur du hämtar FeedIterator
i senaste versionsläge som returnerar entitetsobjekt, i det här fallet ett User
objekt:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Tips
Före version 3.34.0
kan det senaste versionsläget användas genom att ange ChangeFeedMode.Incremental
. Både Incremental
och LatestVersion
refererar till det senaste versionsläget för ändringsflödet och program som använder något av lägena ser samma beteende.
Alla versioner och borttagningsläge är i förhandsversion och kan användas med förhandsversionen av .NET SDK-versioner >= 3.32.0-preview
. Här är ett exempel för att FeedIterator
hämta i alla versioner och tar bort läge som returnerar User
objekt:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Konsumera ändringsflödet via strömningar
FeedIterator
för båda ändringsflödeslägena finns två alternativ. Förutom de exempel som returnerar entitetsobjekt kan du också få svaret med Stream
stöd. Med strömmar kan du läsa data utan att först deserialisera dem, så att du sparar på klientresurser.
Här är ett exempel på hur du hämtar FeedIterator
i senaste versionsläge som returnerar Stream
:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Tillämpa ändringarna för en hel container
Om du inte anger en FeedRange
parameter till FeedIterator
kan du bearbeta en hel containers ändringsflöde i din egen takt. Här är ett exempel som börjar läsa alla ändringar, med början vid den aktuella tidpunkten med hjälp av det senaste versionsläget:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Eftersom ändringsflödet i praktiken är en oändlig lista över objekt som omfattar alla framtida skrivningar och uppdateringar är värdet HasMoreResults
för alltid true
. När du försöker läsa ändringsflödet och det inte finns några nya ändringar tillgängliga får du ett svar med NotModified
status. I föregående exempel hanteras det genom att vänta fem sekunder innan du söker efter ändringar igen.
Bearbeta ändringarna för en partitionsnyckel
I vissa fall kanske du bara vill bearbeta ändringarna för en specifik partitionsnyckel. Du kan hämta FeedIterator
för en specifik partitionsnyckel och bearbeta ändringarna på samma sätt som för en hel container.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Använda FeedRange för parallellisering
I ändringsflödesprocessorn sprids arbetet automatiskt över flera konsumenter. I pull-modellen för ändringsflöde kan du använda FeedRange
för att parallellisera bearbetningen av ändringsflödet. A FeedRange
representerar ett intervall med partitionsnyckelvärden.
Här är ett exempel som visar hur du hämtar en lista över intervall för din container:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
När du får en lista med FeedRange
värden för containern får du en FeedRange
per fysisk partition.
Med hjälp av kan FeedRange
du skapa en FeedIterator
för att parallellisera bearbetningen av ändringsflödet mellan flera datorer eller trådar. Till skillnad från föregående exempel som visade hur du hämtar en FeedIterator
för hela containern eller en enda partitionsnyckel kan du använda FeedRanges för att hämta flera FeedIterators, vilket kan bearbeta ändringsflödet parallellt.
Om du vill använda FeedRanges måste du ha en orkestreringsprocess som hämtar FeedRanges och distribuerar dem till dessa datorer. Den här fördelningen kan vara:
- Använda
FeedRange.ToJsonString
och distribuera det här strängvärdet. Konsumenterna kan använda det här värdet med FeedRange.FromJsonString
.
- Om fördelningen pågår skickar du objektreferensen
FeedRange
.
Här är ett exempel som visar hur du läser från början av containerns ändringsflöde med hjälp av två hypotetiska separata datorer som läser parallellt:
Dator 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Dator 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Spara fortsättningstoken
Du kan spara positionen för din FeedIterator
genom att hämta fortsättningstoken. En fortsättningstoken är ett strängvärde som håller reda på FeedIterators senast bearbetade ändringar och gör det möjligt för FeedIterator att återupptas vid den här tidpunkten FeedIterator
senare. Fortsättningstoken, om den anges, har företräde framför starttiden och börjar från startvärdena. Följande kod läser igenom ändringsflödet sedan containern skapades. När inga fler ändringar är tillgängliga bevaras en fortsättningstoken så att ändringsflödesförbrukningen kan återupptas senare.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
När du använder det senaste versionsläget FeedIterator
upphör fortsättningstoken aldrig att gälla så länge Azure Cosmos DB-containern fortfarande finns. När du använder alla versioner och tar bort läget FeedIterator
är fortsättningstoken giltig så länge ändringarna har gjorts i kvarhållningsfönstret för kontinuerliga säkerhetskopieringar.
Om du vill bearbeta ändringsflödet med hjälp av pull-modellen skapar du en instans av Iterator<FeedResponse<JsonNode>> responseIterator
. När du skapar CosmosChangeFeedRequestOptions
måste du ange var du ska börja läsa ändringsflödet från och skicka den FeedRange
parameter som du vill använda.
FeedRange
är ett intervall med partitionsnyckelvärden som anger de objekt som kan läsas från ändringsflödet.
Om du vill läsa ändringsflödet i läget för alla versioner och raderingar måste du också ange när du skapar allVersionsAndDeletes()
. Alla versioner och borttagningsläget stöder inte bearbetning av ändringsflödet, vare sig från början eller från en specifik tidpunkt. Du måste antingen bearbeta ändringar från och med nu eller från en fortsättningstoken. Alla versioner och borttagningsläge är i förhandsversion och är tillgängligt i Java SDK-version >= 4.42.0
.
Tillämpa ändringarna för en hel container
Om du anger FeedRange.forFullRange()
kan du bearbeta ändringsflödet för en hel container i din egen takt. Du kan också ange ett värde i byPage()
. När den här egenskapen anges anger den maximala antalet mottagna objekt per sida.
Här är ett exempel på hur du hämtar ett responseIterator
värde i senaste versionsläge:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange());
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
Här är ett exempel på hur du hämtar en responseIterator
i alla versioner och i borttagningsläge:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(FeedRange.forFullRange())
.allVersionsAndDeletes();
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
Vi kan sedan iterera över resultaten. Eftersom ändringsflödet i praktiken är en oändlig lista över objekt som omfattar alla framtida skrivningar och uppdateringar är värdet responseIterator.hasNext()
för alltid true
. Här är ett exempel i senaste versionsläge som läser alla ändringar från början. Varje iteration bevarar en fortsättningstoken efter att den har bearbetat alla händelser. Den plockas upp från den senast bearbetade punkten i ändringsflödet och hanteras med hjälp av createForProcessingFromContinuation
.
int i = 0;
List<JsonNode> results;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s)");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
i++;
if (i >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Använda ändringar i en partitionsnyckel
I vissa fall kanske du bara vill bearbeta ändringarna för en specifik partitionsnyckel. Du kan bearbeta ändringarna för en specifik partitionsnyckel på samma sätt som för en hel container. Här är ett exempel som använder det senaste versionsläget:
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(partitionKey)));
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int pkIndex = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
pkIndex++;
if (pkIndex >= 5) {
// artificially breaking out of loop
System.out.println("breaking....");
break;
}
}
Använda FeedRange för parallellisering
I ändringsflödesprocessorn sprids arbetet automatiskt över flera konsumenter. I pull-modellen för ändringsflöde kan du använda FeedRange
för att parallellisera bearbetningen av ändringsflödet. A FeedRange
representerar ett intervall med partitionsnyckelvärden.
Här är ett exempel som använder det senaste versionsläget som visar hur du hämtar en lista över intervall för din container:
Mono<List<FeedRange>> feedranges = resources.container.getFeedRanges();
List<FeedRange> feedRangeList = feedranges.block();
När du får en lista över FeedRanges för din container, får du en FeedRange
per fysisk partition.
Med hjälp av kan FeedRange
du parallellisera bearbetningen av ändringsflödet mellan flera datorer eller trådar. Till skillnad från föregående exempel som visade hur du bearbetar ändringar för hela containern eller en enda partitionsnyckel kan du använda FeedRanges för att bearbeta ändringsflödet parallellt.
Om du vill använda FeedRanges måste du ha en orkestreringsprocess som hämtar FeedRanges och distribuerar dem till dessa datorer. Den här fördelningen kan vara:
- Använda
FeedRange.toString()
och distribuera det här strängvärdet.
- Om fördelningen pågår skickar du objektreferensen
FeedRange
.
Här är ett exempel som använder det senaste versionsläget. Den visar hur du läser från början av containerns ändringsflöde med hjälp av två hypotetiska separata datorer som läser parallellt:
Dator 1:
FeedRange range1 = feedRangeList.get(0);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range1);
int machine1index = 0;
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine1index++;
if (machine1index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Dator 2:
FeedRange range2 = feedRangeList.get(1);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range2);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int machine2index = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine2index++;
if (machine2index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Om du vill bearbeta ändringsflödet med hjälp av pull-modellen skapar du en instans av responseIterator med typen ItemPaged[Dict[str, Any]]
.
När du anropar API för ändringsflöde måste du ange var du ska börja läsa ändringsflödet från och skicka den feed_range
parameter som du vill använda.
feed_range
är ett intervall med partitionsnyckelvärden som anger de objekt som kan läsas från ändringsflödet.
Du kan också ange mode
parametern för ändringsflödesläget där du vill bearbeta ändringar: LatestVersion eller AllVersionsAndDeletes (standardvärdet: LatestVersion
).
Använd antingen LatestVersion
eller AllVersionsAndDeletes
för att ange vilket läge du vill använda för att läsa ändringsflödet.
När du använder AllVersionsAndDeletes
läget kan du antingen börja bearbeta ändringar från och med nu eller från en continuation
token.
Läsning av ändringsflödet från början eller från en viss tidpunkt med hjälp av start_time
stöds inte.
Kommentar
AllVersionsAndDeletes
läget är i förhandsversion och är tillgängligt i Python SDK-version >= 4.9.1b1.
Tillämpa ändringarna för en hel container
Om du inte anger en feed_range
parameter kan du bearbeta en hel containers ändringsflöde i din egen takt.
Kommentar
Alla följande kodfragment tas från ett exempel i GitHub. Du kan använda exemplen.
Här är ett exempel på hur du hämtar responseIterator
i LatestVersion
läge från Beginning
. Eftersom LatestVersion
är ett standardläge mode
behöver parametern inte skickas:
responseIterator = container.query_items_change_feed(start_time="Beginning")
Här är ett exempel på hur du hämtar responseIterator
i AllVersionsAndDeletes
läge från Now
. Eftersom Now
är standardvärdet för start_time
-parametern behöver det inte skickas:
responseIterator = container.query_items_change_feed(mode="AllVersionsAndDeletes")
Vi kan sedan iterera över resultaten. Eftersom ändringsflödet i praktiken är en oändlig lista över objekt som omfattar alla framtida skrivningar och uppdateringar kan responseIterator loopa oändligt.
Här är ett exempel i senaste versionsläge som läser alla ändringar från början.
Varje iteration skriver ut ändringsfeeds för dokument.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
Använda ändringar i en partitionsnyckel
I vissa fall kanske du bara vill bearbeta ändringarna för en specifik partitionsnyckel.
Du kan bearbeta ändringarna på samma sätt som för en hel container med parametern partition_key
.
Här är ett exempel som använder LatestVersion
läge:
pk = "partition_key_value"
responseIterator = container.query_items_change_feed(start_time="Beginning", partition_key=pk)
for doc in responseIterator:
print(doc)
Använda FeedRange för parallellisering
I pull-modellen för ändringsflöde kan du använda feed_range
för att parallellisera bearbetningen av ändringsflödet.
A feed_range
representerar ett intervall med partitionsnyckelvärden.
Här är ett exempel som visar hur du hämtar en lista över intervall för din container.
list
kommandot konverterar iteratorn till en lista:
rangesIterator = container.read_feed_ranges(force_refresh=False)
ranges = list(rangesIterator)
När du får en lista med feed_range
värden för containern får du en feed_range
per fysisk partition.
Med hjälp av en feed_range
kan du skapa iterator för att parallellisera bearbetningen av ändringsflödet mellan flera datorer eller trådar.
Till skillnad från föregående exempel som visade hur du hämtar en responseIterator
för hela containern eller en enda partitionsnyckel kan du använda feed_range
för att hämta flera iteratorer, vilket kan bearbeta ändringsflödet parallellt.
Här är ett exempel som visar hur du läser från början av containerns ändringsflöde med hjälp av två hypotetiska separata datorer som läser parallellt:
Dator 1:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[0])
for doc in responseIterator:
print(doc)
Dator 2:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[1])
for doc in responseIterator:
print(doc)
Spara fortsättningstoken
Du kan spara iteratorns position genom att hämta fortsättningstoken.
En fortsättningstoken är ett strängvärde som håller reda på dina responseIterator
senast bearbetade ändringar och gör att iteratorn kan återupptas vid den här tidpunkten senare.
Fortsättningstoken, om den anges, har företräde framför starttiden och börjar från startvärdena.
Följande kod läser igenom ändringsflödet sedan containern skapades.
När inga fler ändringar är tillgängliga bevaras en fortsättningstoken så att ändringsflödesförbrukningen kan återupptas senare.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
continuation_token = container.client_connection.last_response_headers['etag']
Kommentar
Eftersom continuation
-token innehåller den tidigare använda mode
-parametern, kommer continuation
-parametern att ignoreras och mode
kommer att användas från mode
-token i stället om continuation
har använts.
Här är ett exempel som visar hur du läser från containerns ändringsflöde med hjälp av en continuation
token:
responseIterator = container.query_items_change_feed(continuation=continuation_token)
for doc in responseIterator:
print(doc)
Om du vill bearbeta ändringsflödet med hjälp av pull-modellen skapar du en instans av ChangeFeedPullModelIterator
. När du först skapar ChangeFeedPullModelIterator
måste du ange ett obligatoriskt changeFeedStartFrom
värde inuti ChangeFeedIteratorOptions
som består av både startpositionen för att läsa ändringar och resursen (en partitionsnyckel eller en FeedRange) som ändringarna ska hämtas för.
Kommentar
Om inget changeFeedStartFrom
värde anges hämtas changefeed för en hel container från Now().
För närvarande stöds endast den senaste versionen av JS SDK och väljs som standard.
Du kan också använda maxItemCount
i ChangeFeedIteratorOptions
för att ange det maximala antalet objekt som tas emot per sida.
Här är ett exempel på hur du hämtar iteratorn i det senaste versionsläget som returnerar entitetsobjekt:
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Now()
};
const iterator = container.items.getChangeFeedIterator(options);
Tillämpa ändringarna för en hel container
Om du inte anger en FeedRange
- eller PartitionKey
-parameter inuti ChangeFeedStartFrom
, kan du bearbeta en hel containers ändringsflöde i din egen takt. Här är ett exempel som börjar läsa alla ändringar, med början vid den aktuella tidpunkten:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Eftersom ändringsflödet i praktiken är en oändlig lista över objekt som omfattar alla framtida skrivningar och uppdateringar är värdet hasMoreResults
för alltid true
. När du försöker läsa ändringsflödet och det inte finns några nya ändringar tillgängliga får du ett svar med NotModified
status. I föregående exempel hanteras det genom att vänta fem sekunder innan du söker efter ändringar igen.
Bearbeta ändringarna för en partitionsnyckel
I vissa fall kanske du bara vill bearbeta ändringarna för en specifik partitionsnyckel. Du kan hämta iteratorn för en specifik partitionsnyckel och bearbeta ändringarna på samma sätt som för en hel container.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning("partitionKeyValue")
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Använda FeedRange för parallellisering
I pull-modellen för ändringsflöde kan du använda FeedRange
för att parallellisera bearbetningen av ändringsflödet. A FeedRange
representerar ett intervall med partitionsnyckelvärden.
Här är ett exempel som visar hur du hämtar en lista över intervall för din container:
const ranges = await container.getFeedRanges();
När du får en lista med FeedRange
värden för containern får du en FeedRange
per fysisk partition.
Med hjälp av en FeedRange
kan du skapa iterator för att parallellisera bearbetningen av ändringsflödet mellan flera datorer eller trådar. Till skillnad från föregående exempel som visade hur du hämtar en changefeed-iterator för hela containern eller en enda partitionsnyckel kan du använda FeedRanges för att hämta flera iteratorer, vilket kan bearbeta ändringsflödet parallellt.
Här är ett exempel som visar hur du läser från början av containerns ändringsflöde med hjälp av två hypotetiska separata datorer som läser parallellt:
Dator 1:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[0])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Dator 2:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[1])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Spara fortsättningstoken
Du kan spara iteratorns position genom att hämta fortsättningstoken. En fortsättningstoken är ett strängvärde som håller reda på de senast bearbetade ändringarna i changefeed-iteratorn och gör att iteratorn kan återupptas senare. Fortsättningstoken, om den anges, har företräde framför starttiden och börjar från startvärdena. Följande kod läser igenom ändringsflödet sedan containern skapades. När inga fler ändringar är tillgängliga bevaras en fortsättningstoken så att ändringsflödesförbrukningen kan återupptas senare.
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
let continuation = "";
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
continuation = response.continuationToken;
break;
}
else {
console.log("Result found", response.result);
}
}
// For checking any new changes using the continuation token
const continuationOptions = {
changeFeedStartFrom: ChangeFeedStartFrom(continuation)
}
const newIterator = container.items.getChangeFeedIterator(continuationOptions);
Fortsättningstoken upphör aldrig att gälla så länge Azure Cosmos DB-containern fortfarande finns.
Använd AsyncIterator
Du kan använda JavaScript Async Iterator för att hämta ändringsfeeden. Här är ett exempel på hur du använder Async Iterator.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
let timeout = 0;
for await(const result of container.items.getChangeFeedIterator(options).getAsyncIterator()) {
if (result.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", result.result);
timeout = 0;
}
await waitFor(timeout);
}