Sdílet prostřednictvím


Model vyžádání kanálu změn ve službě Azure Cosmos DB

PLATÍ PRO: NoSQL

Model vyžádání obsahu kanálu změn můžete použít k využívání kanálu změn služby Azure Cosmos DB vlastním tempem. Podobně jako u procesoru kanálu změn můžete použít model vyžádání kanálu změn k paralelizaci zpracování změn napříč více příjemci kanálu změn.

Porovnání s procesorem kanálu změn

Mnoho scénářů může kanál změn zpracovat pomocí procesoru kanálu změn nebo modelu vyžádání kanálu změn. Tokeny pokračování modelu vyžádání a kontejner zapůjčení procesoru kanálu změn fungují jako záložky pro poslední zpracovanou položku nebo dávku položek v kanálu změn.

Tokeny pokračování ale nemůžete převést na zapůjčení nebo naopak.

Poznámka:

Ve většině případů, když potřebujete číst z kanálu změn, nejjednodušší možností je použít procesor kanálu změn.

V těchto scénářích byste měli zvážit použití modelu vyžádané replikace:

  • Čtení změn z konkrétního klíče oddílu
  • Pokud chcete řídit tempo, jakým klient přijímá změny ke zpracování.
  • Jednorázové čtení existujících dat v kanálu změn (například při migraci dat)

Tady jsou některé klíčové rozdíly mezi procesorem kanálu změn a modelem vyžádání kanálu změn:

Funkce Procesor kanálu změn Model vyžádání kanálu změn
Sledování aktuálního bodu zpracování kanálu změn Zapůjčení (uložené v kontejneru Azure Cosmos DB) Token pro pokračování (uložený v paměti nebo ručně uložený)
Možnost přehrání minulých změn Ano, s modelem push Ano, s modelem vyžádané replikace
Dotazování na budoucí změny Automaticky kontroluje změny na základě uživatelem zadané WithPollInterval hodnoty. Ruční
Chování, kdy neexistují žádné nové změny Automaticky počkejte na hodnotu WithPollInterval a pak ji znovu zkontrolujte. Musí zkontrolovat stav a ručně znovu zkontrolovat.
Zpracování změn z celého kontejneru Ano, a automaticky paralelizovány napříč několika vlákny a počítači, které využívají stejný kontejner Ano a ručně paralelizováno pomocí FeedRange
Zpracování změn pouze z jednoho klíče oddílu Nepodporováno Ano

Poznámka:

Při použití modelu vyžádání obsahu, na rozdíl od čtení pomocí procesoru kanálu změn, musíte explicitně zpracovat případy, kdy neexistují žádné nové změny.

Práce s modelem vyžádané replikace

Chcete-li zpracovat kanál změn pomocí modelu vyžádání změn, vytvořte instanci FeedIterator. Při počátečním vytvoření FeedIteratormusíte zadat požadovanou ChangeFeedStartFrom hodnotu, která se skládá z počáteční pozice pro čtení změn a hodnoty, pro FeedRangekterou chcete použít . Jedná se FeedRange o rozsah hodnot klíče oddílu a určuje položky, které lze číst z kanálu změn pomocí tohoto konkrétního FeedIterator. Musíte také zadat požadovanou ChangeFeedMode hodnotu pro režim, ve kterém chcete zpracovávat změny: nejnovější verzi nebo všechny verze a odstranění. Použijte buď ChangeFeedMode.LatestVersion nebo ChangeFeedMode.AllVersionsAndDeletes určete, který režim chcete použít ke čtení kanálu změn. Pokud používáte všechny verze a režim odstranění, musíte vybrat kanál změn, který začíná hodnotou tokenu Now() pokračování nebo z konkrétního tokenu pokračování.

Volitelně můžete zadat ChangeFeedRequestOptions nastavení PageSizeHint. Při nastavení nastaví tato vlastnost maximální počet položek přijatých na stránku. Pokud se operace v monitorované kolekci provádějí prostřednictvím uložených procedur, při čtení položek z kanálu změn se zachovají obor transakce. V důsledku toho může být počet přijatých položek vyšší než zadaná hodnota, aby položky změněné stejnou transakcí byly vráceny jako součást jedné atomické dávky.

Tady je příklad, jak získat FeedIterator v režimu nejnovější verze, který vrací objekty entity, v tomto případě User objekt:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Tip

Před verzí 3.34.0lze použít režim nejnovější verze nastavením ChangeFeedMode.Incremental. Oba Incremental režimy LatestVersion verzí kanálu změn a aplikací, které používají oba režimy, se zobrazí stejné chování.

Všechny verze a režim odstranění jsou ve verzi Preview a lze jej použít s verzemi >.NET SDK = 3.32.0-preview. Tady je příklad získání FeedIterator ve všech verzích a režimu odstranění, který vrací dynamické objekty:

FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Poznámka:

V režimu nejnovější verze obdržíte objekty, které představují položku, která se změnila, s některými dalšími metadaty. Všechny verze a režim odstranění vrátí jiný datový model. Další informace naleznete v tématu Parsování objektu odpovědi.

Využívání kanálu změn prostřednictvím datových proudů

FeedIterator pro oba režimy kanálu změn mají dvě možnosti. Kromě příkladů, které vracejí objekty entity, můžete také získat odpověď s Stream podporou. Streamy umožňují číst data, aniž byste je museli nejprve deserializovat, takže ušetříte prostředky klienta.

Tady je příklad, jak získat FeedIterator v nejnovějším režimu verze, který vrací Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Využití změn pro celý kontejner

Pokud parametr nezadáte FeedRangeFeedIterator, můžete zpracovat kanál změn celého kontejneru vlastním tempem. Tady je příklad, který začne číst všechny změny počínaje aktuálním časem pomocí nejnovějšího režimu verze:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Protože kanál změn je v podstatě nekonečný seznam položek, které zahrnují všechny budoucí zápisy a aktualizace, hodnota HasMoreResults je vždy true. Když se pokusíte přečíst informační kanál změn a nejsou k dispozici žádné nové změny, obdržíte odpověď se stavem NotModified . V předchozím příkladu se zpracovává tak, že před opětovnou kontrolou změn čeká pět sekund.

Využití změn klíče oddílu

V některých případech můžete chtít zpracovat pouze změny konkrétního klíče oddílu. Můžete získat FeedIterator konkrétní klíč oddílu a zpracovat změny stejným způsobem jako pro celý kontejner.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Použití FeedRange pro paralelizaci

V procesoru kanálu změn se práce automaticky rozdělí mezi více příjemců. V modelu vyžádání kanálu změn můžete použít FeedRange paralelizaci zpracování kanálu změn. A FeedRange představuje rozsah hodnot klíče oddílu.

Tady je příklad, který ukazuje, jak získat seznam oblastí kontejneru:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Když získáte seznam FeedRange hodnot kontejneru, získáte jeden FeedRange na fyzický oddíl.

Pomocí příkazu FeedRangemůžete vytvořit FeedIterator paralelizaci zpracování kanálu změn ve více počítačích nebo vláknech. Na rozdíl od předchozího příkladu, který ukázal, jak získat FeedIterator pro celý kontejner nebo jeden klíč oddílu, můžete pomocí FeedRanges získat více FeedIterators, které mohou zpracovat informační kanál změn paralelně.

V případě, že chcete použít FeedRanges, musíte mít orchestrátor proces, který získá FeedRanges a distribuuje je do těchto počítačů. Toto rozdělení může být:

  • Použití FeedRange.ToJsonString a distribuce této řetězcové hodnoty Spotřebitelé mohou tuto hodnotu použít s FeedRange.FromJsonString.
  • Pokud je distribuce v procesu, předejte FeedRange odkaz na objekt.

Tady je ukázka, která ukazuje, jak číst od začátku kanálu změn kontejneru pomocí dvou hypotetických samostatných počítačů, které čtou paralelně:

Počítač 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Počítač 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Uložení tokenů pro pokračování

Pozici svého FeedIterator objektu můžete uložit získáním tokenu pro pokračování. Token pokračování je řetězcová hodnota, která uchovává přehled o naposledy zpracovaných změnách vašeho FeedIteratoru a umožňuje obnovení v tomto okamžiku FeedIterator později. Pokud je zadán token pro pokračování, má přednost před časem zahájení a začíná od počátečních hodnot. Následující kód čte kanál změn od vytvoření kontejneru. Jakmile nebudou k dispozici žádné další změny, zachovají se token pro pokračování, aby bylo možné později obnovit spotřebu kanálu změn.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

Pokud používáte nejnovější režim verze, platnost tokenu FeedIterator pokračování nikdy nevyprší, dokud kontejner Azure Cosmos DB stále existuje. Pokud používáte všechny verze a režim odstranění, token pokračování je platný, FeedIterator pokud se změny v okně uchovávání informací pro průběžné zálohování provádí.

Další kroky