A változáscsatorna feldolgozója az Azure Cosmos DB-ben
A KÖVETKEZŐRE VONATKOZIK: NoSQL
A változáscsatorna feldolgozója az Azure Cosmos DB .NET V3 és Java V4 SDK-k része. Leegyszerűsíti a változáscsatorna olvasásának folyamatát, és hatékonyan osztja el az eseményfeldolgozást több felhasználó között.
A változáscsatorna-feldolgozó használatának fő előnye a hibatűrő kialakítás, amely biztosítja a változáscsatorna összes eseményének "legalább egyszer" történő továbbítását.
Támogatott SDK-k
.Net V3 | Java | Node.JS | Python |
---|---|---|---|
✓ | ✓ | ✕ | ✕ |
A változáscsatorna-feldolgozó összetevői
A változáscsatorna-feldolgozó négy fő összetevővel rendelkezik:
A figyelt tároló: A figyelt tároló azokat az adatokat adja meg, amelyekből a változáscsatorna létre lett hozva. A monitorozott tárolóba való beszúrások és a tároló frissítései megjelennek a tároló változáscsatornájában.
A bérlettároló: A bérlettároló állapottárolóként működik, és koordinálja a változáscsatorna feldolgozását több feldolgozó között. A bérlettároló tárolható ugyanabban a fiókban, mint a monitorozott tároló, de akár külön fiókban is.
A számítási példány: Egy számítási példány üzemelteti a változáscsatorna-feldolgozót a változások figyeléséhez. A platformtól függően virtuális gép (VM), Kubernetes-pod, Azure-alkalmazás szolgáltatáspéldány vagy tényleges fizikai gép is képviselheti. A számítási példány egyedi azonosítóval rendelkezik, amelyet ebben a cikkben példánynévnek nevezünk.
A meghatalmazott: A meghatalmazott az a kód, amely meghatározza, hogy Ön, a fejlesztő mit szeretne elvégezni a változáscsatorna-feldolgozó által beolvasott módosítások minden egyes kötegével.
A változáscsatorna-feldolgozó ezen négy elemének együttműködésének további megértéséhez lássunk egy példát az alábbi ábrán. A figyelt tároló tárolja az elemeket, és a "Város" partíciókulcsot használja. A partíciókulcs értékei elemeket tartalmazó tartományokban vannak elosztva (minden tartomány egy fizikai partíciót jelöl).
Az ábrán két számítási példány látható, a változáscsatorna feldolgozója pedig különböző tartományokat rendel minden példányhoz a számítási eloszlás maximalizálása érdekében. Minden példánynak más, egyedi neve van.
Az egyes tartományok párhuzamosan lesznek beolvasva. A tartomány előrehaladása a bérlettároló többi tartományától elkülönítve, egy bérletdokumentumon keresztül tartható fenn. A bérletek kombinációja a változáscsatorna-feldolgozó aktuális állapotát jelöli.
A változáscsatorna-feldolgozó implementálása
A .NET változáscsatorna-feldolgozója a legújabb verziómódhoz, valamint az összes verzióhoz és törlési módhoz érhető el. Az összes verzió és törlési mód előzetes verzióban érhető el, és a verziótól 3.40.0-preview.0
kezdve támogatott a változáscsatorna-feldolgozó. Mindkét mód belépési pontja mindig a figyelt tároló.
A legújabb verziómóddal való olvasáshoz egy Container
példányban a következőt kell meghívnia GetChangeFeedProcessorBuilder
:
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
return changeFeedProcessor;
}
Az összes verzió és törlési mód használatával történő olvasáshoz hívja meg GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes
a Container
példányt:
Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Mindkét mód esetében az első paraméter egy külön név, amely leírja a processzor célját. A második név a módosításokat kezelő delegált implementáció.
Íme egy példa egy delegáltra a legújabb verziómódhoz:
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<ToDoItem> changes,
CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate some asynchronous operation
await Task.Delay(10);
}
Console.WriteLine("Finished handling changes.");
}
Íme egy példa egy meghatalmazottra az összes verzióhoz és törlési módhoz:
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ChangeFeedItem<ToDoItem> item in changes)
{
if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Previous.id}.");
}
else
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
}
// Simulate work
await Task.Delay(1);
}
}
Ezt követően a számítási példány nevét vagy az egyedi azonosítót a használatával WithInstanceName
határozhatja meg. A számítási példány nevének egyedinek és eltérőnek kell lennie az egyes üzembe helyezett számítási példányok esetében. A tárolót úgy állíthatja be, hogy fenntartsa a bérlet állapotát a következő használatával WithLeaseContainer
: .
A hívással Build
elindíthatja StartAsync
a processzorpéldányt.
Feljegyzés
Az előző kódrészletek a GitHubon található mintákból származnak. A legújabb verziómódhoz, illetve az összes verzióhoz és törlési módhoz is beszerezheti a mintát.
Feldolgozási életciklus
A gazdagéppéldány normál életciklusa:
- A változáscsatorna olvasása.
- Ha nincsenek változások, aludjon előre meghatározott ideig (a Builder használatával
WithPollInterval
testre szabható), és lépjen az 1. - Ha vannak módosítások, küldje el őket a meghatalmazottnak.
- Amikor a meghatalmazott sikeresen befejezi a módosítások feldolgozását, frissítse a bérlettárolót a legújabb feldolgozott időpontra, és lépjen az 1. pontra.
Hibakezelés
A változáscsatorna feldolgozója rugalmas a felhasználói kódhibákkal szemben. Ha a delegált implementáció nem kezelt kivételt (4. lépés) tapasztal, a módosításokat feldolgozó szál leáll, és végül létrejön egy új szál. Az új szál ellenőrzi a bérlettároló által az adott partíciókulcs-értékek tartományához mentett legújabb időpontot. Az új szál onnan újraindul, és gyakorlatilag ugyanazt a módosítási köteget küldi el a meghatalmazottnak. Ez a viselkedés mindaddig folytatódik, amíg a meghatalmazott megfelelően nem dolgozza fel a módosításokat, és ez az oka annak, hogy a változáscsatorna-feldolgozónak "legalább egyszer" garanciája van.
Feljegyzés
Egyetlen esetben a rendszer nem próbálkozik újra a módosítások kötegével. Ha a hiba az első alkalommal delegált végrehajtáskor következik be, a bérlettároló nem rendelkezik korábbi mentett állapotmal az újrapróbálkozási művelethez. Ezekben az esetekben az újrapróbálkozás a kezdeti indítási konfigurációt használja, amely lehet, hogy nem tartalmazza az utolsó köteget.
Ha meg szeretné akadályozni, hogy a változáscsatorna feldolgozója folyamatosan "elakadjon" ugyanazon módosítási köteg ismételt próbálkozásában, a delegált kódban logikát kell hozzáadnia, hogy kivétel nélkül dokumentumokat írjon egy hibás üzenetsorba. Ez a kialakítás biztosítja, hogy nyomon tudja követni a feldolgozatlan módosításokat, miközben továbbra is feldolgozhatja a jövőbeli módosításokat. A hibaüzenetek üzenetsora lehet egy másik Azure Cosmos DB-tároló. A pontos adattár nem számít. Egyszerűen csak azt szeretné, hogy a feldolgozatlan módosítások megmaradjanak.
A változáscsatorna-becslést is használhatja a változáscsatorna-feldolgozó példányok előrehaladásának figyelésére a változáscsatorna olvasása közben, vagy életciklus-értesítések használatával észlelheti az alapul szolgáló hibákat.
Életciklus-értesítések
A változáscsatorna-feldolgozót az életciklusa bármely releváns eseményéhez csatlakoztathatja. Dönthet úgy, hogy értesítést kap egy vagy mindegyikről. A javaslat az, hogy legalább regisztrálja a hibaértesítést:
- Regisztráljon egy kezelőt
WithLeaseAcquireNotification
, hogy értesítést kapjon, amikor az aktuális gazdagép bérletet szerez be a feldolgozás megkezdéséhez. - Regisztráljon egy kezelőt
WithLeaseReleaseNotification
, hogy értesítést kapjon, amikor az aktuális gazdagép kiad egy bérletet, és leállítja a feldolgozást. - Regisztráljon egy kezelőt
WithErrorNotification
, hogy értesítést kapjon, ha az aktuális gazdagép kivételt tapasztal a feldolgozás során. Meg kell tudnia különböztetni, hogy a forrás a felhasználó delegáltja (nem kezelt kivétel), vagy olyan hiba, amelyet a processzor a figyelt tároló elérésekor tapasztal (például hálózati problémák).
Az életciklus-értesítések mindkét változáscsatorna-módban elérhetők. Íme egy példa az életciklus-értesítésekre a legújabb verzió módban:
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
}
else
{
Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
}
return Task.CompletedTask;
};
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
.WithLeaseAcquireNotification(onLeaseAcquiredAsync)
.WithLeaseReleaseNotification(onLeaseReleaseAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Üzembehelyezési egység
Egyetlen változáscsatorna-feldolgozó üzembehelyezési egysége egy vagy több számítási példányból áll, amelyek azonos értékkel processorName
és ugyanazzal a bérlettároló-konfigurációval rendelkeznek, de különböző példánynevek. Számos olyan üzembehelyezési egység lehet, amelyben minden egység eltérő üzleti folyamatokkal rendelkezik a módosításokhoz, és minden üzembe helyezési egység egy vagy több példányból áll.
Előfordulhat például, hogy egy központi telepítési egység minden alkalommal elindít egy külső API-t, amikor változás történt a tárolóban. Egy másik üzembehelyezési egység valós időben helyezheti át az adatokat minden alkalommal, amikor változás történt. Ha változás történik a figyelt tárolóban, az összes üzembehelyezési egység értesítést kap.
Dinamikus méretezés
Ahogy korábban említettük, az üzembe helyezési egységen belül egy vagy több számítási példány is lehet. Az üzembehelyezési egységen belüli számítási eloszlás előnyeinek kihasználásához az egyetlen fő követelmény a következők:
- Minden példánynak ugyanazzal a bérlettároló-konfigurációval kell rendelkeznie.
- Minden példánynak ugyanazzal az értékkel kell rendelkeznie a következőhöz
processorName
: . - Minden példánynak különböző példánynévvel kell rendelkeznie (
WithInstanceName
).
Ha ez a három feltétel érvényes, akkor a változáscsatorna-feldolgozó elosztja a bérlettárolóban lévő összes bérletet az üzembehelyezési egység összes futó példánya között, és egy egyenlőségelosztó algoritmus használatával párhuzamosítja a számítást. A bérletek mindig egy példány tulajdonában vannak, így a példányok száma nem lehet nagyobb, mint a bérletek száma.
A példányok száma nőhet és csökkenhet. A változáscsatorna-feldolgozó dinamikusan módosítja a terhelést a megfelelő újraelosztással.
Emellett a változáscsatorna-feldolgozó dinamikusan módosíthatja a tárolók skáláját, ha a tároló átviteli sebessége vagy tárhelye nő. A tároló növekedésével a változáscsatorna feldolgozója transzparensen kezeli a forgatókönyvet a bérletek dinamikus növelésével és az új bérletek meglévő példányok közötti elosztásával.
Kezdési időpont
Alapértelmezés szerint amikor egy változáscsatorna-feldolgozó első alkalommal indul el, inicializálja a bérlettárolót, és elindítja annak feldolgozási életciklusát. A rendszer nem észlel minden olyan módosítást, amely a figyelt tárolóban történt a változáscsatorna-feldolgozó első inicializálása előtt.
Olvasás egy korábbi dátumból és időpontból
A változáscsatorna-feldolgozó inicializálható egy adott dátumtól és időponttól kezdődő módosítások olvasásához, ha átad egy példányt DateTime
a WithStartTime
szerkesztőbővítménynek:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(particularPointInTime)
.Build();
A változáscsatorna-feldolgozó inicializálva van az adott dátumhoz és időponthoz, és elkezdi olvasni a későbbi módosításokat.
Olvasás az elejétől
Más esetekben, például adatmigráláskor vagy egy tároló teljes előzményeinek elemzésekor a tároló élettartamának kezdetétől be kell olvasnia a változáscsatornát. Használhatja WithStartTime
a szerkesztőbővítményt, de passz DateTime.MinValue.ToUniversalTime()
, amely a minimális DateTime
érték UTC-ábrázolását hozza létre, mint ebben a példában:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
A változáscsatorna-feldolgozó inicializálva van, és a tároló élettartamának kezdetétől kezdi olvasni a módosításokat.
Feljegyzés
Ezek a testreszabási lehetőségek csak a változáscsatorna-feldolgozó kezdőpontjának beállítására szolgálnak. A bérlettároló első inicializálása után ezeknek a beállításoknak a módosítása nincs hatással.
A kezdőpont testreszabása csak a legújabb verziómódosítási módhoz érhető el. Ha az összes verziót és törlési módot használja, a processzor indításától kezdve kell elkezdeni az olvasást, vagy a fiók folyamatos biztonsági mentési megőrzési időszakán belüli korábbi bérletállapotból kell folytatnia az olvasást.
A hírcsatorna és a kiosztott átviteli sebesség módosítása
A figyelt tároló adatcsatorna-olvasási műveleteinek módosítása kérelemegységeket használ fel. Győződjön meg arról, hogy a figyelt tároló nem tapasztal szabályozást. A szabályozás késlelteti a változáscsatorna-események fogadását a processzorokon.
A bérlettároló műveletei (az állapot frissítése és karbantartása) kérelemegységeket használnak fel. Minél több példány használja ugyanazt a bérlettárolót, annál nagyobb a kérelemegységek potenciális felhasználása. Győződjön meg arról, hogy a bérlettároló nem tapasztal szabályozást. A szabályozás késlelteti a változáscsatorna-események fogadását. A szabályozás akár teljesen befejezheti a feldolgozást.
A bérlettároló megosztása
A bérlettárolókat több üzembehelyezési egységben is megoszthatja. A megosztott bérlettárolókban minden üzembehelyezési egység figyel egy másik figyelt tárolót, vagy más értéket ad meg.processorName
Ebben a konfigurációban minden üzembe helyezési egység egy független állapotot tart fenn a bérlettárolón. Tekintse át a kérelemegység-felhasználást egy bérlettárolón , és győződjön meg arról, hogy a kiosztott átviteli sebesség elegendő az összes üzembe helyezési egységhez.
Speciális bérletkonfiguráció
Három kulcskonfiguráció befolyásolhatja a változáscsatorna processzor működését. Minden konfiguráció hatással van a bérlettároló kérelemegység-felhasználására. A változáscsatorna-feldolgozó létrehozásakor az alábbi konfigurációk egyikét állíthatja be, de gondosan használja őket:
- Bérlet beszerzése: Alapértelmezés szerint 17 másodpercenként. A gazdagép rendszeresen ellenőrzi a bérlettároló állapotát, és megfontolja a bérletek beszerzését a dinamikus skálázási folyamat részeként. Ez a folyamat úgy történik, hogy végrehajt egy lekérdezést a bérlettárolón. Ennek az értéknek a csökkentése felgyorsítja a bérletek újraelosztását és beszerzését, de növeli a kérelemegység-felhasználást a bérlettárolón.
- Bérlet lejárata: Alapértelmezés szerint 60 másodperc. Meghatározza, hogy a bérletek mennyi ideig létezhetnek megújítási tevékenység nélkül, mielőtt egy másik gazdagép beolvasta volna. Amikor egy gazdagép összeomlik, a tulajdonában lévő bérleteket más gazdagépek veszik át ezt az időtartamot követően, valamint a konfigurált megújítási időközt. Ennek az értéknek a csökkentése gyorsabb helyreállítást tesz lehetővé a gazdagép összeomlása után, de a lejárati értéknek soha nem szabad alacsonyabbnak lennie a megújítási időköznél.
- Bérlet megújítása: Alapértelmezés szerint 13 másodpercenként. A bérletet birtoklő gazdagép rendszeresen megújítja a bérletet, még akkor is, ha nincsenek új használatban lévő módosítások. Ez a folyamat egy csere bérleten való végrehajtásával történik. Ennek az értéknek a csökkentése csökkenti a gazdagép összeomlása által elveszett bérletek észleléséhez szükséges időt, de növeli a kérelemegység-felhasználást a bérlettárolón.
A változáscsatorna feldolgozójának helye
A változáscsatorna-feldolgozó bármely olyan platformon üzemeltethető, amely támogatja a hosszú ideig futó folyamatokat vagy feladatokat. Íme néhány példa:
- A WebJobs folyamatos futó példánya a Azure-alkalmazás Szolgáltatásban
- Folyamat az Azure Virtual Machines egy példányában
- Háttérfeladat az Azure Kubernetes Service-ben
- Kiszolgáló nélküli függvény az Azure Functionsben
- Egy ASP.NET üzemeltetett szolgáltatás
Bár a változáscsatorna-feldolgozó rövid élettartamú környezetekben is futtatható, mert a bérlettároló fenntartja az állapotot, a környezetek indítási ciklusa késlelteti az értesítések fogadásához szükséges időt (mivel a környezet minden indításakor túl nagy a terhelés a processzor indításához).
Szerepköralapú hozzáférési követelmények
Ha a Microsoft Entra ID-t hitelesítési mechanizmusként használja, győződjön meg arról, hogy az identitás rendelkezik a megfelelő engedélyekkel:
- A figyelt tárolón:
Microsoft.DocumentDB/databaseAccounts/readMetadata
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
- A bérlettárolón:
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery
További erőforrások
- Azure Cosmos DB SDK
- Mintaalkalmazás befejezése a GitHubon
- További használati minták a GitHubon
- Azure Cosmos DB workshop tesztkörnyezetek változáscsatorna-feldolgozóhoz
Következő lépések
További információ a változáscsatorna-feldolgozóról az alábbi cikkekben: