Model vyžádání kanálu změn ve službě Azure Cosmos DB
PLATÍ PRO: NoSQL
Model vyžádání obsahu kanálu změn můžete použít k využívání kanálu změn služby Azure Cosmos DB vlastním tempem. Podobně jako u procesoru kanálu změn můžete použít model vyžádání kanálu změn k paralelizaci zpracování změn napříč více příjemci kanálu změn.
Porovnání s procesorem kanálu změn
Mnoho scénářů může kanál změn zpracovat pomocí procesoru kanálu změn nebo modelu vyžádání kanálu změn. Tokeny pokračování modelu vyžádání a kontejner zapůjčení procesoru kanálu změn fungují jako záložky pro poslední zpracovanou položku nebo dávku položek v kanálu změn.
Tokeny pokračování ale nemůžete převést na zapůjčení nebo naopak.
Poznámka:
Ve většině případů, když potřebujete číst z kanálu změn, nejjednodušší možností je použít procesor kanálu změn.
V těchto scénářích byste měli zvážit použití modelu vyžádané replikace:
- Čtení změn z konkrétního klíče oddílu
- Pokud chcete řídit tempo, jakým klient přijímá změny ke zpracování.
- Jednorázové čtení existujících dat v kanálu změn (například při migraci dat)
Tady jsou některé klíčové rozdíly mezi procesorem kanálu změn a modelem vyžádání kanálu změn:
Funkce | Procesor kanálu změn | Model vyžádání kanálu změn |
---|---|---|
Sledování aktuálního bodu zpracování kanálu změn | Zapůjčení (uložené v kontejneru Azure Cosmos DB) | Token pro pokračování (uložený v paměti nebo ručně uložený) |
Možnost přehrání minulých změn | Ano, s modelem push | Ano, s modelem vyžádané replikace |
Dotazování na budoucí změny | Automaticky kontroluje změny na základě uživatelem zadané WithPollInterval hodnoty. |
Ruční |
Chování, kdy neexistují žádné nové změny | Automaticky počkejte na hodnotu WithPollInterval a pak ji znovu zkontrolujte. |
Musí zkontrolovat stav a ručně znovu zkontrolovat. |
Zpracování změn z celého kontejneru | Ano, a automaticky paralelizovány napříč několika vlákny a počítači, které využívají stejný kontejner | Ano a ručně paralelizováno pomocí FeedRange |
Zpracování změn pouze z jednoho klíče oddílu | Nepodporováno | Ano |
Poznámka:
Při použití modelu vyžádání obsahu, na rozdíl od čtení pomocí procesoru kanálu změn, musíte explicitně zpracovat případy, kdy neexistují žádné nové změny.
Práce s modelem vyžádané replikace
Chcete-li zpracovat kanál změn pomocí modelu vyžádání změn, vytvořte instanci FeedIterator
. Při počátečním vytvoření FeedIterator
musíte zadat požadovanou ChangeFeedStartFrom
hodnotu, která se skládá z počáteční pozice pro čtení změn a hodnoty, pro FeedRange
kterou chcete použít . Jedná se FeedRange
o rozsah hodnot klíče oddílu a určuje položky, které lze číst z kanálu změn pomocí tohoto konkrétního FeedIterator
. Musíte také zadat požadovanou ChangeFeedMode
hodnotu pro režim, ve kterém chcete zpracovávat změny: nejnovější verzi nebo všechny verze a odstranění. Použijte buď ChangeFeedMode.LatestVersion
nebo ChangeFeedMode.AllVersionsAndDeletes
určete, který režim chcete použít ke čtení kanálu změn. Pokud používáte všechny verze a režim odstranění, musíte vybrat kanál změn, který začíná hodnotou tokenu Now()
pokračování nebo z konkrétního tokenu pokračování.
Volitelně můžete zadat ChangeFeedRequestOptions
nastavení PageSizeHint
. Při nastavení nastaví tato vlastnost maximální počet položek přijatých na stránku. Pokud se operace v monitorované kolekci provádějí prostřednictvím uložených procedur, při čtení položek z kanálu změn se zachovají obor transakce. V důsledku toho může být počet přijatých položek vyšší než zadaná hodnota, aby položky změněné stejnou transakcí byly vráceny jako součást jedné atomické dávky.
Tady je příklad, jak získat FeedIterator
v režimu nejnovější verze, který vrací objekty entity, v tomto případě User
objekt:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Tip
Před verzí 3.34.0
lze použít režim nejnovější verze nastavením ChangeFeedMode.Incremental
. Oba Incremental
režimy LatestVersion
verzí kanálu změn a aplikací, které používají oba režimy, se zobrazí stejné chování.
Všechny verze a režim odstranění jsou ve verzi Preview a lze jej použít s verzemi >.NET SDK = 3.32.0-preview
. Tady je příklad získání FeedIterator
ve všech verzích a režimu odstranění, který vrací User
objekty:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Poznámka:
V režimu nejnovější verze obdržíte objekty, které představují položku, která se změnila, s některými dalšími metadaty. Všechny verze a režim odstranění vrátí jiný datový model. Další informace naleznete v tématu Parsování objektu odpovědi.
Kompletní ukázku můžete získat pro nejnovější režim verze nebo všechny verze a režim odstranění.
Využívání kanálu změn prostřednictvím datových proudů
FeedIterator
pro oba režimy kanálu změn mají dvě možnosti. Kromě příkladů, které vracejí objekty entity, můžete také získat odpověď s Stream
podporou. Streamy umožňují číst data, aniž byste je museli nejprve deserializovat, takže ušetříte prostředky klienta.
Tady je příklad, jak získat FeedIterator
v nejnovějším režimu verze, který vrací Stream
:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Využití změn pro celý kontejner
Pokud parametr nezadáte FeedRange
FeedIterator
, můžete zpracovat kanál změn celého kontejneru vlastním tempem. Tady je příklad, který začne číst všechny změny počínaje aktuálním časem pomocí nejnovějšího režimu verze:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Protože kanál změn je v podstatě nekonečný seznam položek, které zahrnují všechny budoucí zápisy a aktualizace, hodnota HasMoreResults
je vždy true
. Když se pokusíte přečíst informační kanál změn a nejsou k dispozici žádné nové změny, obdržíte odpověď se stavem NotModified
. V předchozím příkladu se zpracovává tak, že před opětovnou kontrolou změn čeká pět sekund.
Využití změn klíče oddílu
V některých případech můžete chtít zpracovat pouze změny konkrétního klíče oddílu. Můžete získat FeedIterator
konkrétní klíč oddílu a zpracovat změny stejným způsobem jako pro celý kontejner.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Použití FeedRange pro paralelizaci
V procesoru kanálu změn se práce automaticky rozdělí mezi více příjemců. V modelu vyžádání kanálu změn můžete použít FeedRange
paralelizaci zpracování kanálu změn. A FeedRange
představuje rozsah hodnot klíče oddílu.
Tady je příklad, který ukazuje, jak získat seznam oblastí kontejneru:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Když získáte seznam FeedRange
hodnot kontejneru, získáte jeden FeedRange
na fyzický oddíl.
Pomocí příkazu FeedRange
můžete vytvořit FeedIterator
paralelizaci zpracování kanálu změn ve více počítačích nebo vláknech. Na rozdíl od předchozího příkladu, který ukázal, jak získat FeedIterator
pro celý kontejner nebo jeden klíč oddílu, můžete pomocí FeedRanges získat více FeedIterators, které mohou zpracovat informační kanál změn paralelně.
V případě, že chcete použít FeedRanges, musíte mít orchestrátor proces, který získá FeedRanges a distribuuje je do těchto počítačů. Toto rozdělení může být:
- Použití
FeedRange.ToJsonString
a distribuce této řetězcové hodnoty Spotřebitelé mohou tuto hodnotu použít sFeedRange.FromJsonString
. - Pokud je distribuce v procesu, předejte
FeedRange
odkaz na objekt.
Tady je ukázka, která ukazuje, jak číst od začátku kanálu změn kontejneru pomocí dvou hypotetických samostatných počítačů, které čtou paralelně:
Počítač 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Počítač 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Uložení tokenů pro pokračování
Pozici svého FeedIterator
objektu můžete uložit získáním tokenu pro pokračování. Token pokračování je řetězcová hodnota, která uchovává přehled o naposledy zpracovaných změnách vašeho FeedIteratoru a umožňuje obnovení v tomto okamžiku FeedIterator
později. Pokud je zadán token pro pokračování, má přednost před časem zahájení a začíná od počátečních hodnot. Následující kód čte kanál změn od vytvoření kontejneru. Jakmile nebudou k dispozici žádné další změny, zachovají se token pro pokračování, aby bylo možné později obnovit spotřebu kanálu změn.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
Pokud používáte nejnovější režim verze, platnost tokenu FeedIterator
pokračování nikdy nevyprší, dokud kontejner Azure Cosmos DB stále existuje. Pokud používáte všechny verze a režim odstranění, token pokračování je platný, FeedIterator
pokud se změny v okně uchovávání informací pro průběžné zálohování provádí.