Változáscsatorna-lekérési modell az Azure Cosmos DB-ben

A KÖVETKEZŐKRE VONATKOZIK: NoSQL

A változáscsatorna lekérési modelljét használva saját tempójában használhatja fel az Azure Cosmos DB változáscsatornáját. A változáscsatorna-feldolgozóhoz hasonlóan a változáscsatorna lekérési modelljével párhuzamosíthatja a módosítások feldolgozását több változáscsatorna-fogyasztó között.

Összehasonlítás a változáscsatorna-feldolgozóval

Számos forgatókönyv képes feldolgozni a változáscsatornát a változáscsatorna feldolgozójának vagy a változáscsatorna lekérési modelljének használatával. A lekérési modell folytatási jogkivonatai és a változáscsatorna feldolgozójának bérlettárolója egyaránt könyvjelzőként működik a változáscsatorna utolsó feldolgozott eleméhez vagy tételkötegéhez.

A folytatási jogkivonatokat azonban nem konvertálhatja bérletté, vagy fordítva.

Megjegyzés

A legtöbb esetben, amikor a változáscsatornából kell olvasnia, a legegyszerűbb megoldás a változáscsatorna feldolgozójának használata.

Az alábbi esetekben érdemes megfontolnia a lekéréses modell használatát:

  • Egy adott partíciókulcs módosításainak beolvasása.
  • Annak szabályozása, hogy az ügyfél milyen ütemben fogadja a módosításokat a feldolgozáshoz.
  • A változáscsatorna meglévő adatainak egyszeri olvasásához (például adatmigráláshoz).

Íme néhány fő különbség a változáscsatorna feldolgozója és a változáscsatorna lekérési modellje között:

Szolgáltatás Változáscsatorna feldolgozója Változáscsatorna lekérési modellje
A változáscsatorna feldolgozásának aktuális pontjának nyomon követése Bérlet (egy Azure Cosmos DB-tárolóban tárolva) Folytatási jogkivonat (a memóriában tárolva vagy manuálisan megőrzve)
A korábbi módosítások visszajátszásának lehetősége Igen, leküldéses modellel Igen, lekéréses modellel
Lekérdezés a jövőbeli változásokról Automatikusan ellenőrzi a felhasználó által megadott WithPollInterval értéken alapuló módosításokat Kézi
Viselkedés, ahol nincsenek új módosítások Automatikusan várja meg az értéket, WithPollInterval majd ellenőrizze újra Ellenőriznie kell az állapotot, és manuálisan újra kell ellenőriznie
Módosítások feldolgozása egy teljes tárolóból Igen, és automatikusan párhuzamosított több szál és gép között, amelyek ugyanabból a tárolóból származnak Igen, és manuálisan párhuzamosítva a következő használatával: FeedRange
Csak egyetlen partíciókulcs módosításainak feldolgozása Nem támogatott Yes

Megjegyzés

A lekéréses modell használatakor – a változáscsatorna feldolgozójának használatával történő olvasástól eltérően – explicit módon kell kezelnie azokat az eseteket, amikor nincsenek új módosítások.

A lekéréses modell működése

A változáscsatorna lekéréses modellel történő feldolgozásához hozza létre a példányát.FeedIterator A létrehozásakor FeedIteratormeg kell adnia egy kötelező ChangeFeedStartFrom értéket, amely a módosítások olvasásának kezdőpozícióját és a használni FeedRangekívánt értéket tartalmazza. A FeedRange a partíciókulcs-értékek tartománya, és meghatározza azokat az elemeket, amelyek az adott FeedIteratoradatcsatorna használatával olvashatók a változáscsatornából. Meg kell adnia egy kötelező ChangeFeedMode értéket ahhoz a módhoz is, amelyben a módosításokat szeretné feldolgozni: a legújabb verziót vagy az összes verziót és törlést. ChangeFeedMode.LatestVersion A vagy ChangeFeedMode.AllVersionsAndDeletes a használatával jelezheti, hogy melyik módot szeretné használni a változáscsatorna olvasásához. Ha az összes verziót és törlési módot használja, ki kell választania egy változáscsatorna kezdőértékét vagy Now() egy adott folytatási jogkivonatból.

Igény szerint megadhatja ChangeFeedRequestOptions a beállítását PageSizeHint. Ha be van állítva, ez a tulajdonság beállítja az oldalonként fogadott elemek maximális számát. Ha a figyelt gyűjteményben tárolt eljárásokkal hajtják végre a műveleteket, a tranzakció hatóköre megmarad, amikor elemeket olvas be a változáscsatornából. Ennek eredményeképpen a fogadott elemek száma magasabb lehet a megadott értéknél, így az ugyanazon tranzakció által módosított elemek egy atomi köteg részeként lesznek visszaadva.

Íme egy példa arra, hogyan szerezhető be FeedIterator a legújabb verzió módban, amely entitásobjektumokat ad vissza, ebben az esetben egy User objektumot:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Tipp

A verzió 3.34.0előtt a legújabb verziómód használható a beállítással ChangeFeedMode.Incremental. LatestVersion A változáscsatorna legújabb verziómódja és a két módot használó alkalmazások is Incremental ugyanazt a viselkedést fogják tapasztalni.

Minden verzió és törlési mód előzetes verzióban érhető el, és a .NET SDK előzetes verziójával >használható = 3.32.0-preview. Íme egy példa a dinamikus objektumokat visszamutató FeedIterator összes verzió és törlési mód beszerzésére:

FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Megjegyzés

A legújabb verzió módban olyan objektumokat kap, amelyek a módosított elemet jelölik, néhány további metaadattal együtt. Minden verzió és törlési mód egy másik adatmodellt ad vissza. További információ: A válaszobjektum elemzése.

A változáscsatorna felhasználása streameken keresztül

FeedIterator mindkét változáscsatorna módhoz két lehetőség áll rendelkezésre. Az entitásobjektumokat visszaadandó példákon kívül a választ is beszerezheti támogatással Stream . A streamek lehetővé teszik az adatok olvasását anélkül, hogy először deszerializálni kellene őket, így mentheti az ügyfélerőforrásokat.

Íme egy példa arra, hogyan szerezhető be FeedIterator a legújabb verzió módban, amely a következőt adja Streamvissza:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Egy teljes tároló módosításainak felhasználása

Ha nem ad meg paramétert FeedRange a számára FeedIterator, a teljes tároló változáscsatornáját a saját tempójában dolgozhatja fel. Íme egy példa, amely elkezdi beolvasni az összes módosítást az aktuális időponttól kezdve a legújabb verziómód használatával:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Mivel a változáscsatorna gyakorlatilag az összes jövőbeli írást és frissítést magában foglaló elemek végtelen listája, a értéke HasMoreResults mindig true. Amikor megpróbálja elolvasni a változáscsatornát, és nincsenek elérhető új módosítások, állapotú NotModified választ kap. Az előző példában úgy kezeli a rendszer, hogy öt másodpercet vár a módosítások ismételt ellenőrzése előtt.

Partíciókulcs módosításainak felhasználása

Bizonyos esetekben előfordulhat, hogy csak egy adott partíciókulcs módosításait szeretné feldolgozni. Beszerezhet egy adott partíciókulcsot, és ugyanúgy dolgozhatja FeedIterator fel a módosításokat, mint egy teljes tároló esetében.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

A FeedRange használata párhuzamosításhoz

A változáscsatorna feldolgozójában a munka automatikusan több felhasználó között oszlik meg. A változáscsatorna lekérési modelljében a FeedRange használatával párhuzamosíthatja a változáscsatorna feldolgozását. Az A FeedRange a partíciókulcs-értékek tartományát jelöli.

Íme egy példa, amely bemutatja, hogyan szerezheti be a tároló tartományainak listáját:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Ha lekér egy értéklistát FeedRange a tárolóhoz, fizikai partíciónként egyet FeedRange kap.

A használatával FeedRangelétrehozhat egy parancsot FeedIterator , amellyel párhuzamosíthatja a változáscsatorna feldolgozását több gépen vagy szálon. Az előző példától eltérően, amely azt mutatta be, hogyan szerezhető be a FeedIterator teljes tárolóhoz vagy egyetlen partíciókulcshoz, a FeedRanges használatával több FeedIteratort is beszerezhet, amelyek párhuzamosan tudják feldolgozni a változáscsatornát.

Abban az esetben, ha a FeedRanges-t szeretné használni, rendelkeznie kell egy vezénylőfolyamatkal, amely beszerzi a FeedRanges-eket, és osztja el őket az adott gépeken. Ez a disztribúció a következő lehet:

  • A sztringérték használata FeedRange.ToJsonString és terjesztése. A fogyasztók ezt az értéket használhatják a következővel FeedRange.FromJsonString: .
  • Ha az eloszlás folyamatban van, adja át az FeedRange objektumhivatkozást.

Íme egy minta, amely bemutatja, hogyan olvashatja be a tároló változáscsatornájának kezdetét két, párhuzamosan olvasható, hipotetikus, különálló gép használatával:

1. gép:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

2. gép:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Folytatási jogkivonatok mentése

A folytatási jogkivonat beszerzésével mentheti a FeedIterator pozícióját. A folytatási jogkivonat egy sztringérték, amely nyomon követi a FeedIterator utolsó feldolgozott módosításait, és lehetővé teszi a FeedIterator későbbi folytatást. Ha meg van adva a folytatási jogkivonat, elsőbbséget élvez a kezdési időponttal szemben, és az első értékektől indul. A következő kód végigolvassa a változáscsatornát a tároló létrehozása óta. Miután nem érhető el több módosítás, megőrzi a folytatási jogkivonatot, hogy a változáscsatorna-használat később folytatódjon.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

A legújabb verziómód használata esetén a FeedIterator folytatási jogkivonat soha nem jár le, amíg az Azure Cosmos DB-tároló még létezik. Ha az összes verziót és törlési módot használja, a folytatási FeedIterator jogkivonat akkor érvényes, ha a módosítások a folyamatos biztonsági mentések megőrzési időszakában történtek.

Következő lépések