Változáscsatorna-lekérési modell az Azure Cosmos DB-ben
A KÖVETKEZŐKRE VONATKOZIK: NoSQL
A változáscsatorna lekérési modelljét használva saját tempójában használhatja fel az Azure Cosmos DB változáscsatornáját. A változáscsatorna-feldolgozóhoz hasonlóan a változáscsatorna lekérési modelljével párhuzamosíthatja a módosítások feldolgozását több változáscsatorna-fogyasztó között.
Összehasonlítás a változáscsatorna-feldolgozóval
Számos forgatókönyv képes feldolgozni a változáscsatornát a változáscsatorna feldolgozójának vagy a változáscsatorna lekérési modelljének használatával. A lekérési modell folytatási jogkivonatai és a változáscsatorna feldolgozójának bérlettárolója egyaránt könyvjelzőként működik a változáscsatorna utolsó feldolgozott eleméhez vagy tételkötegéhez.
A folytatási jogkivonatokat azonban nem konvertálhatja bérletté, vagy fordítva.
Megjegyzés
A legtöbb esetben, amikor a változáscsatornából kell olvasnia, a legegyszerűbb megoldás a változáscsatorna feldolgozójának használata.
Az alábbi esetekben érdemes megfontolnia a lekéréses modell használatát:
- Egy adott partíciókulcs módosításainak beolvasása.
- Annak szabályozása, hogy az ügyfél milyen ütemben fogadja a módosításokat a feldolgozáshoz.
- A változáscsatorna meglévő adatainak egyszeri olvasásához (például adatmigráláshoz).
Íme néhány fő különbség a változáscsatorna feldolgozója és a változáscsatorna lekérési modellje között:
Szolgáltatás | Változáscsatorna feldolgozója | Változáscsatorna lekérési modellje |
---|---|---|
A változáscsatorna feldolgozásának aktuális pontjának nyomon követése | Bérlet (egy Azure Cosmos DB-tárolóban tárolva) | Folytatási jogkivonat (a memóriában tárolva vagy manuálisan megőrzve) |
A korábbi módosítások visszajátszásának lehetősége | Igen, leküldéses modellel | Igen, lekéréses modellel |
Lekérdezés a jövőbeli változásokról | Automatikusan ellenőrzi a felhasználó által megadott WithPollInterval értéken alapuló módosításokat |
Kézi |
Viselkedés, ahol nincsenek új módosítások | Automatikusan várja meg az értéket, WithPollInterval majd ellenőrizze újra |
Ellenőriznie kell az állapotot, és manuálisan újra kell ellenőriznie |
Módosítások feldolgozása egy teljes tárolóból | Igen, és automatikusan párhuzamosított több szál és gép között, amelyek ugyanabból a tárolóból származnak | Igen, és manuálisan párhuzamosítva a következő használatával: FeedRange |
Csak egyetlen partíciókulcs módosításainak feldolgozása | Nem támogatott | Yes |
Megjegyzés
A lekéréses modell használatakor – a változáscsatorna feldolgozójának használatával történő olvasástól eltérően – explicit módon kell kezelnie azokat az eseteket, amikor nincsenek új módosítások.
A lekéréses modell működése
A változáscsatorna lekéréses modellel történő feldolgozásához hozza létre a példányát.FeedIterator
A létrehozásakor FeedIterator
meg kell adnia egy kötelező ChangeFeedStartFrom
értéket, amely a módosítások olvasásának kezdőpozícióját és a használni FeedRange
kívánt értéket tartalmazza. A FeedRange
a partíciókulcs-értékek tartománya, és meghatározza azokat az elemeket, amelyek az adott FeedIterator
adatcsatorna használatával olvashatók a változáscsatornából. Meg kell adnia egy kötelező ChangeFeedMode
értéket ahhoz a módhoz is, amelyben a módosításokat szeretné feldolgozni: a legújabb verziót vagy az összes verziót és törlést. ChangeFeedMode.LatestVersion
A vagy ChangeFeedMode.AllVersionsAndDeletes
a használatával jelezheti, hogy melyik módot szeretné használni a változáscsatorna olvasásához. Ha az összes verziót és törlési módot használja, ki kell választania egy változáscsatorna kezdőértékét vagy Now()
egy adott folytatási jogkivonatból.
Igény szerint megadhatja ChangeFeedRequestOptions
a beállítását PageSizeHint
. Ha be van állítva, ez a tulajdonság beállítja az oldalonként fogadott elemek maximális számát. Ha a figyelt gyűjteményben tárolt eljárásokkal hajtják végre a műveleteket, a tranzakció hatóköre megmarad, amikor elemeket olvas be a változáscsatornából. Ennek eredményeképpen a fogadott elemek száma magasabb lehet a megadott értéknél, így az ugyanazon tranzakció által módosított elemek egy atomi köteg részeként lesznek visszaadva.
Íme egy példa arra, hogyan szerezhető be FeedIterator
a legújabb verzió módban, amely entitásobjektumokat ad vissza, ebben az esetben egy User
objektumot:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Tipp
A verzió 3.34.0
előtt a legújabb verziómód használható a beállítással ChangeFeedMode.Incremental
. LatestVersion
A változáscsatorna legújabb verziómódja és a két módot használó alkalmazások is Incremental
ugyanazt a viselkedést fogják tapasztalni.
Minden verzió és törlési mód előzetes verzióban érhető el, és a .NET SDK előzetes verziójával >használható = 3.32.0-preview
. Íme egy példa a dinamikus objektumokat visszamutató FeedIterator
összes verzió és törlési mód beszerzésére:
FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Megjegyzés
A legújabb verzió módban olyan objektumokat kap, amelyek a módosított elemet jelölik, néhány további metaadattal együtt. Minden verzió és törlési mód egy másik adatmodellt ad vissza. További információ: A válaszobjektum elemzése.
A változáscsatorna felhasználása streameken keresztül
FeedIterator
mindkét változáscsatorna módhoz két lehetőség áll rendelkezésre. Az entitásobjektumokat visszaadandó példákon kívül a választ is beszerezheti támogatással Stream
. A streamek lehetővé teszik az adatok olvasását anélkül, hogy először deszerializálni kellene őket, így mentheti az ügyfélerőforrásokat.
Íme egy példa arra, hogyan szerezhető be FeedIterator
a legújabb verzió módban, amely a következőt adja Stream
vissza:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Egy teljes tároló módosításainak felhasználása
Ha nem ad meg paramétert FeedRange
a számára FeedIterator
, a teljes tároló változáscsatornáját a saját tempójában dolgozhatja fel. Íme egy példa, amely elkezdi beolvasni az összes módosítást az aktuális időponttól kezdve a legújabb verziómód használatával:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Mivel a változáscsatorna gyakorlatilag az összes jövőbeli írást és frissítést magában foglaló elemek végtelen listája, a értéke HasMoreResults
mindig true
. Amikor megpróbálja elolvasni a változáscsatornát, és nincsenek elérhető új módosítások, állapotú NotModified
választ kap. Az előző példában úgy kezeli a rendszer, hogy öt másodpercet vár a módosítások ismételt ellenőrzése előtt.
Partíciókulcs módosításainak felhasználása
Bizonyos esetekben előfordulhat, hogy csak egy adott partíciókulcs módosításait szeretné feldolgozni. Beszerezhet egy adott partíciókulcsot, és ugyanúgy dolgozhatja FeedIterator
fel a módosításokat, mint egy teljes tároló esetében.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
A FeedRange használata párhuzamosításhoz
A változáscsatorna feldolgozójában a munka automatikusan több felhasználó között oszlik meg. A változáscsatorna lekérési modelljében a FeedRange
használatával párhuzamosíthatja a változáscsatorna feldolgozását. Az A FeedRange
a partíciókulcs-értékek tartományát jelöli.
Íme egy példa, amely bemutatja, hogyan szerezheti be a tároló tartományainak listáját:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
Ha lekér egy értéklistát FeedRange
a tárolóhoz, fizikai partíciónként egyet FeedRange
kap.
A használatával FeedRange
létrehozhat egy parancsot FeedIterator
, amellyel párhuzamosíthatja a változáscsatorna feldolgozását több gépen vagy szálon. Az előző példától eltérően, amely azt mutatta be, hogyan szerezhető be a FeedIterator
teljes tárolóhoz vagy egyetlen partíciókulcshoz, a FeedRanges használatával több FeedIteratort is beszerezhet, amelyek párhuzamosan tudják feldolgozni a változáscsatornát.
Abban az esetben, ha a FeedRanges-t szeretné használni, rendelkeznie kell egy vezénylőfolyamatkal, amely beszerzi a FeedRanges-eket, és osztja el őket az adott gépeken. Ez a disztribúció a következő lehet:
- A sztringérték használata
FeedRange.ToJsonString
és terjesztése. A fogyasztók ezt az értéket használhatják a következővelFeedRange.FromJsonString
: . - Ha az eloszlás folyamatban van, adja át az
FeedRange
objektumhivatkozást.
Íme egy minta, amely bemutatja, hogyan olvashatja be a tároló változáscsatornájának kezdetét két, párhuzamosan olvasható, hipotetikus, különálló gép használatával:
1. gép:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
2. gép:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Folytatási jogkivonatok mentése
A folytatási jogkivonat beszerzésével mentheti a FeedIterator
pozícióját. A folytatási jogkivonat egy sztringérték, amely nyomon követi a FeedIterator utolsó feldolgozott módosításait, és lehetővé teszi a FeedIterator
későbbi folytatást. Ha meg van adva a folytatási jogkivonat, elsőbbséget élvez a kezdési időponttal szemben, és az első értékektől indul. A következő kód végigolvassa a változáscsatornát a tároló létrehozása óta. Miután nem érhető el több módosítás, megőrzi a folytatási jogkivonatot, hogy a változáscsatorna-használat később folytatódjon.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
A legújabb verziómód használata esetén a FeedIterator
folytatási jogkivonat soha nem jár le, amíg az Azure Cosmos DB-tároló még létezik. Ha az összes verziót és törlési módot használja, a folytatási FeedIterator
jogkivonat akkor érvényes, ha a módosítások a folyamatos biztonsági mentések megőrzési időszakában történtek.