Sdílet prostřednictvím


Model stahování kanálu změn ve službě Azure Cosmos DB

PLATÍ PRO: NoSQL

Model vyžádání obsahu kanálu změn můžete použít k využívání kanálu změn služby Azure Cosmos DB vlastním tempem. Podobně jako u procesoru kanálu změn můžete použít tento model načítání kanálu změn k paralelizaci zpracování změn napříč více spotřebiteli kanálu změn.

Porovnejte s procesorem změn kanálu

Mnoho scénářů může zpracovat kanál změn pomocí procesoru kanálu změn nebo modelu tahového kanálu změn. Tokeny pokračování pull modelu a kontejner pro výpůjčky procesoru change feedu fungují jako záložky pro poslední zpracovanou položku nebo dávku položek v change feedu.

Tokeny pokračování ale nemůžete převést na nájem nebo naopak.

Poznámka:

Ve většině případů, když potřebujete číst z kanálu změn, je nejjednodušší možností použít procesor kanálu změn.

V těchto scénářích byste měli zvážit použití pull modelu:

  • Čtení změn z konkrétního klíče oddílu
  • Řízení tempa, jakým klient přijímá změny ke zpracování
  • Jednorázové čtení existujících dat v kanálu změn (například migrace dat)

Tady jsou některé klíčové rozdíly mezi procesorem kanálu změn a modelem vyžádání kanálu změn:

Funkce Procesor změnového kanálu Model poptávkového kanálu změn
Sledování aktuálního stavu zpracování kanálu změn Zapůjčení (uložené v kontejneru Azure Cosmos DB) Token pro pokračování (uložený v paměti nebo ručně uložený)
Možnost přehrání minulých změn Ano, s modelem push Ano, s modelem na vyžádání
Dotazování na budoucí změny Automaticky kontroluje změny na základě uživatelem zadané WithPollInterval hodnoty. Příručka
Chování v případě, že nedojde k žádným novým změnám Automaticky počkejte na hodnotu WithPollInterval a pak ji znovu zkontrolujte. Musí zkontrolovat stav a ručně znovu zkontrolovat.
Zpracování změn z celého kontejneru Ano, a automaticky paralelizovány napříč několika vlákny a počítači, které využívají stejný kontejner Ano a ručně paralelizováno pomocí FeedRange
Zpracování změn z jediného klíče dělení Nepodporováno Ano

Poznámka:

Na rozdíl od čtení pomocí procesoru změnového kanálu, při použití pull modelu musíte výslovně zpracovat případy, kdy nejsou žádné nové změny. Toto je indikováno kódem HTTP 304 NotModified. Odpověď kanálu změn vracející 0 dokumentů se stavovým kódem HTTP 200 OK nemusí nutně znamenat, že jste dosáhli konce kanálu změn a měli byste pokračovat v dotazování.

Práce s taženým modelem

Chcete-li zpracovat kanál změn pomocí pull modelu, vytvořte instanci FeedIterator. Při počátečním vytvoření FeedIteratormusíte zadat požadovanou ChangeFeedStartFrom hodnotu, která se skládá z počáteční pozice pro čtení změn a hodnoty, pro FeedRangekterou chcete použít . FeedRange je rozsah hodnot klíče oddílu a určuje položky, které lze číst z kanálu změn pomocí tohoto konkrétního FeedIterator. Musíte také zadat požadovanou ChangeFeedMode hodnotu pro režim, ve kterém chcete zpracovávat změny: nejnovější verzi nebo všechny verze a odstranění. Použijte buď ChangeFeedMode.LatestVersion nebo ChangeFeedMode.AllVersionsAndDeletes, abyste určili, který režim chcete použít ke čtení kanálu změn. Pokud používáte režim pro všechny verze a mazání, musíte vybrat, zda informační kanál změn začne hodnotou Now() nebo konkrétním tokenem pokračování.

Volitelně můžete zadat ChangeFeedRequestOptions pro nastavení PageSizeHint. Při nastavení nastaví tato vlastnost maximální počet položek přijatých na stránku. Pokud se operace v monitorované kolekci provádějí prostřednictvím uložených procedur, při čtení položek z kanálu změn se zachová obor transakce. V důsledku toho může být počet přijatých položek vyšší než zadaná hodnota, aby položky změněné stejnou transakcí byly vráceny jako součást jedné atomické dávky.

Tady je příklad, jak získat FeedIterator v režimu nejnovější verze, který vrací objekty entity, v tomto případě User objekt:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Návod

Pro verze starší než 3.34.0, nejnovější režim verze lze použít nastavením ChangeFeedMode.Incremental. Jak Incremental tak LatestVersion odkazují na nejnovější režim verze kanálu změn a aplikace, které používají oba režimy, vidí stejné chování.

Všechny verze a režim odstranění jsou v náhledu a je lze použít s verzemi .NET SDK >= 3.32.0-preview. Tady je příklad získání FeedIterator ve všech verzích a režimu odstranění, který vrací User objekty:

FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Poznámka:

V režimu nejnovější verze obdržíte objekty, které představují položku, která se změnila, s některými dalšími metadaty. Všechny verze a režim odstranění vrátí jiný datový model.

Kompletní ukázku můžete získat pro režim nejnovější verze nebo režim všech verzí a odstranění.

Využívání kanálu změn prostřednictvím datových proudů

FeedIterator pro oba režimy kanálu změn má dvě možnosti. Kromě příkladů, které vracejí objekty entity, můžete také získat odpověď s Stream podporou. Streamy umožňují číst data, aniž byste je museli nejprve deserializovat, takže ušetříte prostředky klienta.

Tady je příklad, jak získat FeedIterator v nejnovějším režimu verze, který vrací Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Využití změn pro celý kontejner

Pokud nezadáte parametr FeedRange pro FeedIterator, můžete zpracovat kanál změn celého kontejneru vlastním tempem. Tady je příklad, který začne číst všechny změny počínaje aktuálním časem pomocí nejnovějšího režimu verze:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Protože kanál změn je v podstatě nekonečný seznam položek, které zahrnují všechny budoucí zápisy a aktualizace, hodnota HasMoreResults je vždy true. Když se pokusíte přečíst informační kanál změn a nejsou k dispozici žádné nové změny, obdržíte odpověď se stavem NotModified . To se liší od přijetí odpovědi beze změn a OK stavu. Je možné získat prázdné odpovědi kanálu změn, zatímco jsou k dispozici další změny a měli byste pokračovat v dotazování, dokud nepřijde NotModified. V předchozím příkladu NotModified se zpracovává čekáním pěti sekund před opětovnou kontrolou změn.

Využijte změny pro klíč oddílu

V některých případech můžete chtít zpracovat pouze změny pro specifický klíč oddílu. Můžete získat FeedIterator konkrétní klíč oddílu a zpracovat změny stejným způsobem jako pro celý kontejner.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Použití FeedRange pro paralelizaci

V procesoru kanálu změn se práce automaticky rozdělí mezi více příjemců. V modelu vyžádání kanálu změn můžete pomocí FeedRange paralelizovat zpracování kanálu změn. A FeedRange představuje rozsah hodnot klíče oddílu.

Tady je příklad, který ukazuje, jak získat seznam oblastí kontejneru:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Když získáte seznam FeedRange hodnot kontejneru, získáte jeden FeedRange na fyzický oddíl.

Pomocí FeedRange můžete vytvořit FeedIterator pro paralelizaci zpracovávání kanálu změn napříč více počítači nebo vlákny. Na rozdíl od předchozího příkladu, který ukázal, jak získat FeedIterator pro celý kontejner nebo jediný klíč oddílu, můžete pomocí FeedRanges získat více instancí FeedIterators, které mohou zpracovat změnový kanál paralelně.

V případě, že chcete použít FeedRanges, musíte mít proces orchestrátor, který získává FeedRanges a distribuuje je do těchto počítačů. Toto rozdělení může být:

  • Použití FeedRange.ToJsonString a distribuce této řetězcové hodnoty Spotřebitelé mohou tuto hodnotu použít s FeedRange.FromJsonString.
  • Pokud je distribuce v procesu, předávejte odkaz na objekt FeedRange.

Tady je ukázka, která ukazuje, jak od začátku číst záznam změn kontejneru pomocí dvou hypotetických samostatných počítačů, které čtou paralelně:

Počítač 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Počítač 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Uložení tokenů pro pokračování

Pozici svého FeedIterator objektu můžete uložit získáním tokenu pro pokračování. Token pokračování je řetězcová hodnota, která uchovává přehled o naposledy zpracovaných změnách vašeho FeedIteratoru a umožňuje obnovení v tomto okamžiku FeedIterator později. Pokud je zadán token pro pokračování, má přednost před časem zahájení a začíná od počátečních hodnot. Následující kód čte kanál změn od vytvoření kontejneru. Jakmile nejsou k dispozici žádné další změny, uchová token pro pokračování, aby bylo možné později obnovit čtení kanálu změn.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

Pokud používáte nejnovější režim verze, platnost tokenu FeedIterator pokračování nikdy nevyprší, dokud kontejner Azure Cosmos DB stále existuje. Pokud používáte režim všech verzí a odstranění, token pokračování je platný, pokud změny proběhly během doby uchovávání pro průběžné zálohování.

Další kroky