Megosztás a következőn keresztül:


Adatcsatorna lekérési modelljének módosítása az Azure Cosmos DB-ben

A KÖVETKEZŐRE VONATKOZIK: NoSQL

A változáscsatorna lekérési modelljének használatával saját tempójában használhatja az Azure Cosmos DB változáscsatornáját. A változáscsatorna-feldolgozóhoz hasonlóan a változáscsatorna lekérési modelljével párhuzamossá teheti a módosítások feldolgozását több változáscsatorna-fogyasztó között.

Összehasonlítás a változáscsatorna-feldolgozóval

Számos forgatókönyv feldolgozhatja a változáscsatornát a változáscsatorna-feldolgozó vagy a változáscsatorna lekérési modelljének használatával. A lekéréses modell folytatási jogkivonatai és a változáscsatorna-feldolgozó bérlettárolója egyaránt könyvjelzőként szolgál a változáscsatorna utolsó feldolgozott eleméhez vagy tételkötegéhez.

A folytatási jogkivonatokat azonban nem konvertálhatja bérletté, vagy fordítva.

Feljegyzés

A legtöbb esetben, amikor a változáscsatornából kell olvasnia, a legegyszerűbb megoldás a változáscsatorna-feldolgozó használata.

Érdemes megfontolni a lekéréses modell használatát az alábbi esetekben:

  • Egy adott partíciókulcs módosításainak olvasása.
  • Annak szabályozása, hogy az ügyfél milyen ütemben fogadja a módosításokat a feldolgozáshoz.
  • A változáscsatorna meglévő adatainak egyszeri olvasásához (például adatmigráláshoz).

Íme néhány fontos különbség a változáscsatorna processzora és a változáscsatorna lekérési modellje között:

Szolgáltatás Változáscsatorna feldolgozója Változáscsatorna lekérési modellje
A változáscsatorna feldolgozásának aktuális pontjának nyomon követése Bérlet (Azure Cosmos DB-tárolóban tárolva) Folytatási jogkivonat (memóriában tárolva vagy manuálisan megőrzve)
A korábbi módosítások visszajátszásának képessége Igen, leküldéses modellel Igen, lekéréses modellel
Lekérdezés a jövőbeli változásokról A felhasználó által megadott WithPollInterval érték alapján automatikusan ellenőrzi a módosításokat Manuális
Olyan viselkedés, amelyben nincsenek új módosítások Az érték automatikus várakozása WithPollInterval , majd újraellenőrzése Ellenőriznie kell az állapotot, és manuálisan újra kell ellenőriznie
Változások feldolgozása egy teljes tárolóból Igen, és automatikusan párhuzamosított több szál és gép között, amelyek ugyanazon tárolóból származnak Igen, és manuálisan párhuzamosítva a következő használatával: FeedRange
Csak egyetlen partíciókulcs módosításainak feldolgozása Nem támogatott Igen

Feljegyzés

A lekéréses modell használata esetén – a változáscsatorna-feldolgozóval való olvasástól eltérően – kifejezetten kezelnie kell azokat az eseteket, amikor nincsenek új módosítások.

A lekéréses modell működése

A változáscsatorna lekéréses modellel történő feldolgozásához hozzon létre egy példányt.FeedIterator Az első létrehozáskor FeedIteratormeg kell adnia egy kötelező ChangeFeedStartFrom értéket, amely a módosítások olvasásának kezdő pozíciójából és a használni FeedRangekívánt értékből áll. Ez FeedRange a partíciókulcs-értékek tartománya, és meghatározza azokat az elemeket, amelyek az adott adatcsatornával FeedIteratorolvashatók a változáscsatornából. Meg kell adnia egy kötelező ChangeFeedMode értéket ahhoz a módhoz is, amelyben a módosításokat feldolgozni szeretné: a legújabb verziót vagy az összes verziót és törlést. Használja vagy ChangeFeedMode.LatestVersionChangeFeedMode.AllVersionsAndDeletes jelezze, hogy melyik módot szeretné használni a változáscsatorna olvasásához. Ha az összes verziót és törlési módot használja, ki kell választania egy változáscsatornát, amely egy Now() adott folytatási jogkivonat értékéből indul ki.

Igény szerint megadhatja ChangeFeedRequestOptions , hogy be legyen-e állítva egy PageSizeHint. Ha be van állítva, ez a tulajdonság beállítja az oldalonként fogadott elemek maximális számát. Ha a figyelt gyűjtemény műveletei tárolt eljárásokon keresztül történnek, a tranzakció hatóköre megmarad a változáscsatorna elemeinek beolvasásakor. Ennek eredményeképpen a fogadott elemek száma magasabb lehet a megadott értéknél, így az ugyanazon tranzakcióval módosított elemek egy atomi köteg részeként lesznek visszaadva.

Íme egy példa arra, hogyan szerezhető be FeedIterator a legújabb verziómód, amely entitásobjektumokat ad vissza, ebben az esetben egy User objektumot:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Tipp.

A verzió 3.34.0előtt a legújabb verziómód használható a beállítással ChangeFeedMode.Incremental. LatestVersion A Incremental változáscsatorna legújabb verziómódjára és a mindkét módot használó alkalmazásokra is ugyanez a viselkedés fog vonatkozni.

Az összes verzió és törlési mód előzetes verzióban érhető el, és a .NET SDK előzetes verziójával >is használható . 3.32.0-preview Íme egy példa az összes olyan verzió és törlési mód beszerzésére FeedIterator , amely dinamikus objektumokat ad vissza:

FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Feljegyzés

A legújabb verzió módban olyan objektumokat kap, amelyek a módosított elemet jelölik, néhány további metaadattal. Minden verzió és törlési mód egy másik adatmodellt ad vissza. További információ: A válaszobjektum elemzése.

A változáscsatorna felhasználása streameken keresztül

FeedIterator mindkét változáscsatorna-módhoz két lehetőség áll rendelkezésre. Az entitásobjektumokat visszaadandó példák mellett a választ is beszerezheti támogatással Stream . A streamek lehetővé teszik az adatok olvasását anélkül, hogy azokat először deszerializálni kellene, így az ügyfélerőforrásokra menthet.

Íme egy példa a legújabb verzió módban való beszerzésére FeedIterator , amely a következőt adja Streamvissza:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

A teljes tároló módosításainak felhasználása

Ha nem ad meg paramétert FeedRangeFeedIterator, saját ütemben feldolgozhatja egy teljes tároló változáscsatornáját. Íme egy példa, amely elkezdi olvasni az összes módosítást a jelenlegi időponttól kezdve a legújabb verziómód használatával:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Mivel a változáscsatorna gyakorlatilag az összes jövőbeli írást és frissítést magában foglaló elemek végtelen listája, az érték HasMoreResults mindig true. Amikor megpróbálja elolvasni a változáscsatornát, és nincsenek új módosítások, állapotú NotModified választ kap. Az előző példában úgy kezeli a rendszer, hogy öt másodpercet vár a módosítások újbóli ellenőrzése előtt.

Partíciókulcs módosításainak felhasználása

Bizonyos esetekben előfordulhat, hogy csak egy adott partíciókulcs módosításait szeretné feldolgozni. Beszerezhet FeedIterator egy adott partíciókulcsot, és ugyanúgy dolgozhatja fel a módosításokat, mint egy teljes tároló esetében.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

A FeedRange használata párhuzamosításhoz

A változáscsatorna-feldolgozóban a munka automatikusan több felhasználó között oszlik meg. A változáscsatorna lekérési modelljében a FeedRange változáscsatorna feldolgozásának párhuzamossá gombra is használhatja. Az A FeedRange a partíciókulcs értékeinek tartományát jelöli.

Íme egy példa, amely bemutatja, hogyan szerezheti be a tároló tartományainak listáját:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Amikor lekéri a tároló értékeinek listájátFeedRange, fizikai partíciónként egyet FeedRange kap.

A módosítási FeedRangehírcsatorna több gépen vagy szálon történő feldolgozását párhuzamossá teheti FeedIterator . Az előző példától eltérően, amely azt mutatta be, hogyan szerezhet be egy FeedIterator teljes tárolót vagy egy partíciókulcsot, a FeedRanges használatával több FeedIteratort is beszerezhet, amelyek párhuzamosan feldolgozhatják a változáscsatornát.

Abban az esetben, ha a FeedRanges-t szeretné használni, rendelkeznie kell egy vezénylési eljárással, amely lekéri a FeedRanges-eket, és elosztja őket az adott gépeken. Ez a disztribúció a következő lehet:

  • A sztringérték használata FeedRange.ToJsonString és terjesztése. A felhasználók ezt az értéket használhatják a következővel FeedRange.FromJsonString: .
  • Ha az eloszlás folyamatban van, adja át az FeedRange objektumhivatkozást.

Íme egy példa, amely bemutatja, hogyan olvasható be a tároló változáscsatornájának elejétől két, párhuzamosan olvasható, hipotetikus különálló gép:

1. gép:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

2. gép:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Folytatási jogkivonatok mentése

A folytatási jogkivonat beszerzésével mentheti a FeedIterator pozícióját. A folytatási jogkivonat egy sztringérték, amely nyomon követi a FeedIterator utolsó feldolgozott módosításait, és lehetővé teszi a folytatást ezen a FeedIterator ponton később. A folytatási jogkivonat, ha meg van adva, elsőbbséget élvez a kezdési időponttal szemben, és az első értékekből indul ki. A következő kód beolvassa a változáscsatornát a tároló létrehozása óta. Miután nem érhető el több módosítás, a rendszer megőrzi a folytatási jogkivonatot, hogy a változáscsatorna-felhasználás később újrainduljon.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

A legújabb verziómód használata esetén a FeedIterator folytatási jogkivonat soha nem jár le, amíg az Azure Cosmos DB-tároló még létezik. Ha az összes verziót és törlési módot használja, a FeedIterator folytatási jogkivonat akkor érvényes, ha a módosítások a folyamatos biztonsági mentések megőrzési időszakában történtek.

Következő lépések