Model vyžádání obsahu kanálu změn můžete použít k využívání kanálu změn služby Azure Cosmos DB vlastním tempem. Podobně jako u procesoru kanálu změn můžete použít tento model načítání kanálu změn k paralelizaci zpracování změn napříč více spotřebiteli kanálu změn.
Tokeny pokračování ale nemůžete převést na nájem nebo naopak.
Tady jsou některé klíčové rozdíly mezi procesorem kanálu změn a modelem vyžádání kanálu změn:
Chcete-li zpracovat kanál změn pomocí pull modelu, vytvořte instanci FeedIterator. Při počátečním vytvoření FeedIteratormusíte zadat požadovanou ChangeFeedStartFrom hodnotu, která se skládá z počáteční pozice pro čtení změn a hodnoty, pro FeedRangekterou chcete použít .
FeedRange je rozsah hodnot klíče oddílu a určuje položky, které lze číst z kanálu změn pomocí tohoto konkrétního FeedIterator. Musíte také zadat požadovanou ChangeFeedMode hodnotu pro režim, ve kterém chcete zpracovávat změny: nejnovější verzi nebo všechny verze a odstranění. Použijte buď ChangeFeedMode.LatestVersion nebo ChangeFeedMode.AllVersionsAndDeletes, abyste určili, který režim chcete použít ke čtení kanálu změn. Pokud používáte režim pro všechny verze a mazání, musíte vybrat, zda informační kanál změn začne hodnotou Now() nebo konkrétním tokenem pokračování.
Volitelně můžete zadat ChangeFeedRequestOptions pro nastavení PageSizeHint. Při nastavení nastaví tato vlastnost maximální počet položek přijatých na stránku. Pokud se operace v monitorované kolekci provádějí prostřednictvím uložených procedur, při čtení položek z kanálu změn se zachová obor transakce. V důsledku toho může být počet přijatých položek vyšší než zadaná hodnota, aby položky změněné stejnou transakcí byly vráceny jako součást jedné atomické dávky.
Tady je příklad, jak získat FeedIterator v režimu nejnovější verze, který vrací objekty entity, v tomto případě User objekt:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Návod
Pro verze starší než 3.34.0, nejnovější režim verze lze použít nastavením ChangeFeedMode.Incremental. Jak Incremental tak LatestVersion odkazují na nejnovější režim verze kanálu změn a aplikace, které používají oba režimy, vidí stejné chování.
Všechny verze a režim odstranění jsou v náhledu a je lze použít s verzemi .NET SDK >= 3.32.0-preview. Tady je příklad získání FeedIterator ve všech verzích a režimu odstranění, který vrací User objekty:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Využívání kanálu změn prostřednictvím datových proudů
FeedIterator pro oba režimy kanálu změn má dvě možnosti. Kromě příkladů, které vracejí objekty entity, můžete také získat odpověď s Stream podporou. Streamy umožňují číst data, aniž byste je museli nejprve deserializovat, takže ušetříte prostředky klienta.
Tady je příklad, jak získat FeedIterator v nejnovějším režimu verze, který vrací Stream:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Využití změn pro celý kontejner
Pokud nezadáte parametr FeedRange pro FeedIterator, můžete zpracovat kanál změn celého kontejneru vlastním tempem. Tady je příklad, který začne číst všechny změny počínaje aktuálním časem pomocí nejnovějšího režimu verze:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Protože kanál změn je v podstatě nekonečný seznam položek, které zahrnují všechny budoucí zápisy a aktualizace, hodnota HasMoreResults je vždy true. Když se pokusíte přečíst informační kanál změn a nejsou k dispozici žádné nové změny, obdržíte odpověď se stavem NotModified . To se liší od přijetí odpovědi beze změn a OK stavu. Je možné získat prázdné odpovědi kanálu změn, zatímco jsou k dispozici další změny a měli byste pokračovat v dotazování, dokud nepřijde NotModified. V předchozím příkladu NotModified se zpracovává čekáním pěti sekund před opětovnou kontrolou změn.
Využijte změny pro klíč oddílu
V některých případech můžete chtít zpracovat pouze změny pro specifický klíč oddílu. Můžete získat FeedIterator konkrétní klíč oddílu a zpracovat změny stejným způsobem jako pro celý kontejner.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Použití FeedRange pro paralelizaci
V procesoru kanálu změn se práce automaticky rozdělí mezi více příjemců. V modelu vyžádání kanálu změn můžete pomocí FeedRange paralelizovat zpracování kanálu změn. A FeedRange představuje rozsah hodnot klíče oddílu.
Tady je příklad, který ukazuje, jak získat seznam oblastí kontejneru:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Když získáte seznam FeedRange hodnot kontejneru, získáte jeden FeedRange na fyzický oddíl.
Pomocí FeedRange můžete vytvořit FeedIterator pro paralelizaci zpracovávání kanálu změn napříč více počítači nebo vlákny. Na rozdíl od předchozího příkladu, který ukázal, jak získat FeedIterator pro celý kontejner nebo jediný klíč oddílu, můžete pomocí FeedRanges získat více instancí FeedIterators, které mohou zpracovat změnový kanál paralelně.
V případě, že chcete použít FeedRanges, musíte mít proces orchestrátor, který získává FeedRanges a distribuuje je do těchto počítačů. Toto rozdělení může být:
- Použití
FeedRange.ToJsonString a distribuce této řetězcové hodnoty Spotřebitelé mohou tuto hodnotu použít s FeedRange.FromJsonString.
- Pokud je distribuce v procesu, předávejte odkaz na objekt
FeedRange.
Tady je ukázka, která ukazuje, jak od začátku číst záznam změn kontejneru pomocí dvou hypotetických samostatných počítačů, které čtou paralelně:
Počítač 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Počítač 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Uložení tokenů pro pokračování
Pozici svého FeedIterator objektu můžete uložit získáním tokenu pro pokračování. Token pokračování je řetězcová hodnota, která uchovává přehled o naposledy zpracovaných změnách vašeho FeedIteratoru a umožňuje obnovení v tomto okamžiku FeedIterator později. Pokud je zadán token pro pokračování, má přednost před časem zahájení a začíná od počátečních hodnot. Následující kód čte kanál změn od vytvoření kontejneru. Jakmile nejsou k dispozici žádné další změny, uchová token pro pokračování, aby bylo možné později obnovit čtení kanálu změn.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
Pokud používáte nejnovější režim verze, platnost tokenu FeedIterator pokračování nikdy nevyprší, dokud kontejner Azure Cosmos DB stále existuje. Pokud používáte režim všech verzí a odstranění, token pokračování je platný, pokud změny proběhly během doby uchovávání pro průběžné zálohování.
Chcete-li zpracovat kanál změn pomocí pull modelu, vytvořte instanci Iterator<FeedResponse<JsonNode>> responseIterator. Při vytváření CosmosChangeFeedRequestOptionsmusíte určit, kde začít číst kanál změn a předat FeedRange parametr, který chcete použít. Rozsah hodnot klíče oddílu FeedRange určuje položky, které lze číst z datového proudu změn.
Pokud chcete číst kanál změn v všech verzích a odstraněných verzích, musíte také určit allVersionsAndDeletes() při vytváření CosmosChangeFeedRequestOptions. Režim všech verzí a režim odstranění nepodporují zpracování kanálu změn od začátku nebo od určitého bodu v čase. Změny musíte zpracovat buď od tohoto okamžiku, nebo z pokračovacího tokenu. Všechny verze a režim odstranění jsou ve verzi Preview a jsou k dispozici ve verzi >Java SDK = 4.42.0.
Využití změn pro celý kontejner
Pokud zadáte FeedRange.forFullRange(), můžete feed změn zpracovat pro celý kontejner na vlastním tempem. Volitelně můžete zadat hodnotu v byPage(). Při nastavení nastaví tato vlastnost maximální počet položek přijatých na stránku.
Poznámka:
Všechny následující fragmenty kódu pocházejí z ukázek na GitHubu. Můžete použít ukázku režimu nejnovější verze a ukázku režimu všech verzí a odstranění.
Tady je příklad, jak získat responseIterator hodnotu v režimu nejnovější verze:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange());
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
Tady je příklad, jak získat responseIterator ve všech verzích a režimech odstranění:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(FeedRange.forFullRange())
.allVersionsAndDeletes();
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
Pak můžeme procházet výsledky. Protože kanál změn je v podstatě nekonečný seznam položek, které zahrnují všechny budoucí zápisy a aktualizace, hodnota responseIterator.hasNext() je vždy true. Tady je příklad v nejnovějším režimu verze, který čte všechny změny od začátku. Každá iterace po zpracování všech událostí zachová token pokračování. Navazuje na poslední zpracovaný bod v kanálu změn a zpracovává se pomocí createForProcessingFromContinuation:
int i = 0;
List<JsonNode> results;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s)");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
i++;
if (i >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Využití změn klíče oddílu
V některých případech můžete chtít zpracovat pouze změny pro specifický klíč oddílu. Můžete zpracovat změny pro konkrétní klíč oddílu stejným způsobem, jakým můžete pracovat s celým kontejnerem. Tady je příklad, který používá nejnovější režim verze:
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(partitionKey)));
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int pkIndex = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
pkIndex++;
if (pkIndex >= 5) {
// artificially breaking out of loop
System.out.println("breaking....");
break;
}
}
Použití FeedRange pro paralelizaci
V procesoru kanálu změn se práce automaticky rozdělí mezi více příjemců. V modelu vyžádání kanálu změn můžete pomocí FeedRange paralelizovat zpracování kanálu změn. A FeedRange představuje rozsah hodnot klíče oddílu.
Tady je příklad, který používá režim nejnovější verze, který ukazuje, jak získat seznam rozsahů kontejneru:
Mono<List<FeedRange>> feedranges = resources.container.getFeedRanges();
List<FeedRange> feedRangeList = feedranges.block();
Když získáte seznam FeedRanges pro váš kontejner, získáte jeden FeedRange pro každý fyzický oddíl.
Pomocí FeedRange můžete paralelizovat zpracování toku změn na více počítačích nebo vláknech. Na rozdíl od předchozího příkladu, který ukázal, jak zpracovat změny pro celý kontejner nebo jeden klíč oddílu, můžete pomocí možnosti FeedRanges zpracovat kanál změn paralelně.
V případě, že chcete použít FeedRanges, musíte mít proces orchestrátor, který získává FeedRanges a distribuuje je do těchto počítačů. Toto rozdělení může být:
- Použití
FeedRange.toString() a distribuce této řetězcové hodnoty
- Pokud je distribuce v procesu, předávejte odkaz na objekt
FeedRange.
Tady je ukázka, která používá nejnovější režim verze. Ukazuje, jak číst od začátku kanálu změn kontejneru pomocí dvou hypotetických samostatných počítačů, které čtou paralelně:
Počítač 1:
FeedRange range1 = feedRangeList.get(0);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range1);
int machine1index = 0;
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine1index++;
if (machine1index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Počítač 2:
FeedRange range2 = feedRangeList.get(1);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range2);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int machine2index = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine2index++;
if (machine2index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Pokud chcete zpracovat kanál změn pomocí pull modelu, vytvořte instanci responseIterator s typem ItemPaged[Dict[str, Any]].
Při volání rozhraní API kanálu změn musíte určit, odkud chcete začít číst kanál změn, a předat feed_range parametr, který chcete použít.
Rozsah hodnot klíče oddílu feed_range určuje položky, které lze číst z datového proudu změn.
Můžete také zadat mode parametr pro režim kanálu změn, ve kterém chcete zpracovat změny: LatestVersion nebo AllVersionsAndDeletes. Výchozí hodnota je LatestVersion.
Použijte buď LatestVersion nebo AllVersionsAndDeletes, abyste určili, který režim chcete použít ke čtení kanálu změn.
Při použití AllVersionsAndDeletes režimu můžete buď začít zpracovávat změny od nynějška, nebo z tokenu continuation.
Čtení změnového kanálu od začátku nebo od bodu v čase pomocí start_time není podporováno.
Poznámka:
AllVersionsAndDeletes režim je v režimu preview a je k dispozici v sadě Python SDK verze 4.9.1b1 nebo novější.
Využití změn pro celý kontejner
Pokud nezadáte parametr feed_range, můžete zpracovat změnový kanál celého kontejneru vlastním tempem.
Tady je příklad, jak získat responseIterator v LatestVersion režimu z Beginning. Vzhledem k tomu, že LatestVersion je výchozí režim, mode nemusí být parametr předán:
responseIterator = container.query_items_change_feed(start_time="Beginning")
Tady je příklad, jak získat responseIterator v režimu AllVersionsAndDeletes od Now. Protože Now je výchozí hodnota parametru start_time, nemusí být předán.
responseIterator = container.query_items_change_feed(mode="AllVersionsAndDeletes")
Pak můžeme procházet výsledky. Protože kanál změn je v podstatě nekonečný seznam položek, které zahrnují všechny budoucí zápisy a aktualizace, responseIterator může nekonečně opakovat.
Tady je příklad v nejnovějším režimu verze, který čte všechny změny od začátku.
Každá iterace tiskne kanály změn pro dokumenty.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
Využití změn klíče oddílu
V některých případech můžete chtít zpracovat pouze změny pro specifický klíč oddílu.
Změny můžete zpracovat stejným způsobem jako u celého kontejneru s parametrem partition_key .
Tady je příklad, který používá LatestVersion režim:
pk = "partition_key_value"
responseIterator = container.query_items_change_feed(start_time="Beginning", partition_key=pk)
for doc in responseIterator:
print(doc)
Použití FeedRange pro paralelizaci
V modelu vyžádání kanálu změn můžete pomocí feed_range paralelizovat zpracování kanálu změn.
A feed_range představuje rozsah hodnot klíče oddílu.
Tady je příklad, který ukazuje, jak získat seznam rozsahů pro váš kontejner.
list příkaz převede iterátor na seznam:
rangesIterator = container.read_feed_ranges(force_refresh=False)
ranges = list(rangesIterator)
Když získáte seznam feed_range hodnot kontejneru, získáte jeden feed_range na fyzický oddíl.
Pomocí nástroje feed_rangemůžete vytvořit iterátor pro paralelizaci zpracování kanálu změn ve více počítačích nebo vláknech.
Na rozdíl od předchozího příkladu, který ukázal, jak získat responseIterator pro celý kontejner nebo jeden klíč oddílu, můžete použít feed_range k získání více iterátorů, které mohou paralelně zpracovávat tok změn.
Tady je ukázka, která ukazuje, jak od začátku číst záznam změn kontejneru pomocí dvou hypotetických samostatných počítačů, které čtou paralelně:
Počítač 1:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[0])
for doc in responseIterator:
print(doc)
Počítač 2:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[1])
for doc in responseIterator:
print(doc)
Uložení tokenů pro pokračování
Umístění iterátoru můžete uložit získáním tokenu pro pokračování.
Token pokračování je řetězcová hodnota, která uchovává přehled o naposledy responseIterator zpracovaných změnách a umožňuje iterátoru pokračovat v tomto okamžiku později.
Pokud je zadán token pro pokračování, má přednost před časem zahájení a začíná od počátečních hodnot.
Následující kód čte kanál změn od vytvoření kontejneru.
Jakmile nejsou k dispozici žádné další změny, uchová token pro pokračování, aby bylo možné později obnovit čtení kanálu změn.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
continuation_token = container.client_connection.last_response_headers['etag']
Poznámka:
Vzhledem k tomu, že token continuation obsahuje dříve použitý mode parametr, pokud byl použit continuation, mode parametr se ignoruje a namísto něj se použije mode z tokenu continuation.
Tady je ukázka, která ukazuje, jak číst z kanálu změn kontejneru pomocí tokenu continuation :
responseIterator = container.query_items_change_feed(continuation=continuation_token)
for doc in responseIterator:
print(doc)
Chcete-li zpracovat kanál změn pomocí pull modelu, vytvořte instanci ChangeFeedPullModelIterator. Při vytváření ChangeFeedPullModelIterator je nutné zadat požadovanou hodnotu changeFeedStartFrom uvnitř ChangeFeedIteratorOptions, která se skládá z počáteční pozice pro čtení změn a specifikace prostředku (klíč oddílu nebo "FeedRange"), pro které mají být načteny změny.
Poznámka:
Pokud není zadána hodnota changeFeedStartFrom, kanál změn se načte z Now() pro celý kontejner.
JavaScript SDK v současné době podporuje pouze nejnovější verzi a je nastavena automaticky.
Volitelně můžete použít maxItemCount v ChangeFeedIteratorOptions k nastavení maximálního počtu přijatých položek na jednu stránku.
Tady je příklad, jak získat iterátor v nejnovějším režimu verze, který vrací objekty entity:
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Now()
};
const iterator = container.items.getChangeFeedIterator(options);
Využití změn pro celý kontejner
Pokud nezadáte parametr FeedRange nebo PartitionKey v rámci ChangeFeedStartFrom, můžete zpracovat informační kanál změn celého kontejneru vlastním tempem. Tady je příklad, který začne číst všechny změny počínaje aktuálním časem:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Protože kanál změn je v podstatě nekonečný seznam položek, které zahrnují všechny budoucí zápisy a aktualizace, hodnota hasMoreResults je vždy true. Když se pokusíte přečíst informační kanál změn a nejsou k dispozici žádné nové změny, obdržíte odpověď se stavem NotModified . To se liší od přijetí odpovědi beze změn a OK stavu. Je možné získat prázdné odpovědi kanálu změn, zatímco jsou k dispozici další změny a měli byste pokračovat v dotazování, dokud nepřijde NotModified. V předchozím příkladu NotModified se zpracovává čekáním pěti sekund před opětovnou kontrolou změn.
Využijte změny pro klíč oddílu
V některých případech můžete chtít zpracovat pouze změny pro specifický klíč oddílu. Iterátor pro konkrétní klíč oddílu můžete získat a změny zpracovat stejným způsobem, jakým to můžete provést pro celý kontejner.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning("partitionKeyValue")
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Použití FeedRange pro paralelizaci
V modelu vyžádání kanálu změn můžete pomocí FeedRange paralelizovat zpracování kanálu změn. A FeedRange představuje rozsah hodnot klíče oddílu.
Tady je příklad, který ukazuje, jak získat seznam oblastí kontejneru:
const ranges = await container.getFeedRanges();
Když získáte seznam FeedRange hodnot kontejneru, získáte jeden FeedRange na fyzický oddíl.
Pomocí nástroje FeedRangemůžete vytvořit iterátor pro paralelizaci zpracování kanálu změn ve více počítačích nebo vláknech. Na rozdíl od předchozího příkladu, který ukázal, jak získat iterátor záznamu změn pro celý kontejner nebo jediný klíč oddílu, můžete použít FeedRanges k získání více iterátorů, které mohou zpracovat záznam změn paralelně.
Tady je ukázka, která ukazuje, jak od začátku číst záznam změn kontejneru pomocí dvou hypotetických samostatných počítačů, které čtou paralelně:
Počítač 1:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[0])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Počítač 2:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[1])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Uložení tokenů pro pokračování
Umístění iterátoru můžete uložit získáním tokenu pro pokračování. Token pro pokračování je řetězcová hodnota, která uchovává přehled o naposledy zpracovaných změnách iterátoru kanálu změn a umožňuje iterátoru pokračovat v tomto okamžiku později. Pokud je zadán token pro pokračování, má přednost před časem zahájení a začíná od počátečních hodnot. Následující kód čte kanál změn od vytvoření kontejneru. Jakmile nejsou k dispozici žádné další změny, uchová token pro pokračování, aby bylo možné později obnovit čtení kanálu změn.
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
let continuation = "";
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
continuation = response.continuationToken;
break;
}
else {
console.log("Result found", response.result);
}
}
// For checking any new changes using the continuation token
const continuationOptions = {
changeFeedStartFrom: ChangeFeedStartFrom(continuation)
}
const newIterator = container.items.getChangeFeedIterator(continuationOptions);
Token pro pokračování nikdy nevyprší, dokud kontejner Azure Cosmos DB stále existuje.
Použití AsyncIteratoru
K načtení kanálu změn můžete použít JavaScript AsyncIterator. Tady je příklad AsyncIterator.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
let timeout = 0;
for await(const result of container.items.getChangeFeedIterator(options).getAsyncIterator()) {
if (result.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", result.result);
timeout = 0;
}
await waitFor(timeout);
}