A változáscsatorna feldolgozója az Azure Cosmos DB-ben

A KÖVETKEZŐRE VONATKOZIK: NoSQL

A változáscsatorna feldolgozója az Azure Cosmos DB .NET V3 és Java V4 SDK-k része. Leegyszerűsíti a változáscsatorna olvasásának folyamatát, és hatékonyan osztja el az eseményfeldolgozást több felhasználó között.

A változáscsatorna-feldolgozó használatának fő előnye a hibatűrő kialakítás, amely biztosítja a változáscsatorna összes eseményének "legalább egyszer" történő továbbítását.

A változáscsatorna-feldolgozó összetevői

A változáscsatorna-feldolgozó négy fő összetevővel rendelkezik:

  • A figyelt tároló: A figyelt tároló azokat az adatokat adja meg, amelyekből a változáscsatorna létre lett hozva. A monitorozott tárolóba való beszúrások és a tároló frissítései megjelennek a tároló változáscsatornájában.

  • A bérlettároló: A bérlettároló állapottárolóként működik, és koordinálja a változáscsatorna feldolgozását több feldolgozó között. A bérlettároló tárolható ugyanabban a fiókban, mint a monitorozott tároló, de akár külön fiókban is.

  • A számítási példány: Egy számítási példány üzemelteti a változáscsatorna-feldolgozót a változások figyeléséhez. A platformtól függően virtuális gép (VM), kubernetes-pod, Azure-alkalmazás szolgáltatáspéldány vagy tényleges fizikai gép is képviselheti. A számítási példány egyedi azonosítóval rendelkezik, amelyet ebben a cikkben példánynévnek nevezünk.

  • A meghatalmazott: A meghatalmazott az a kód, amely meghatározza, hogy Ön, a fejlesztő mit szeretne elvégezni a változáscsatorna-feldolgozó által beolvasott módosítások minden egyes kötegével.

A változáscsatorna-feldolgozó ezen négy elemének együttműködésének további megértéséhez lássunk egy példát az alábbi ábrán. A figyelt tároló tárolja az elemeket, és a "Város" partíciókulcsot használja. A partíciókulcs értékei elemeket tartalmazó tartományokban vannak elosztva (minden tartomány egy fizikai partíciót jelöl).

Az ábrán két számítási példány látható, a változáscsatorna feldolgozója pedig különböző tartományokat rendel minden példányhoz a számítási eloszlás maximalizálása érdekében. Minden példánynak más, egyedi neve van.

Az egyes tartományok párhuzamosan lesznek beolvasva. A tartomány előrehaladása a bérlettároló többi tartományától elkülönítve, egy bérletdokumentumon keresztül tartható fenn. A bérletek kombinációja a változáscsatorna-feldolgozó aktuális állapotát jelöli.

Példa adatcsatorna-feldolgozó módosítására

A változáscsatorna-feldolgozó implementálása

A változáscsatorna feldolgozója a .NET-ben jelenleg csak a legújabb verziómódhoz érhető el. A belépési pont mindig a figyelt tároló. Container Egy példányban a következőt kell meghívniGetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Az első paraméter egy különálló név, amely leírja a processzor célját. A második név a módosításokat kezelő delegált implementáció.

Íme egy példa egy meghatalmazottra:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Ezt követően a számítási példány nevét vagy az egyedi azonosítót a használatával WithInstanceNamehatározhatja meg. A számítási példány nevének egyedinek és eltérőnek kell lennie az egyes üzembe helyezett számítási példányok esetében. A tárolót úgy állíthatja be, hogy fenntartsa a bérlet állapotát a következő használatával WithLeaseContainer: .

A hívással Build elindíthatja StartAsynca processzorpéldányt.

Feldolgozási életciklus

A gazdagéppéldány normál életciklusa:

  1. A változáscsatorna olvasása.
  2. Ha nincsenek változások, aludjon előre meghatározott ideig (a Builder használatával WithPollInterval testre szabható), és lépjen az 1.
  3. Ha vannak módosítások, küldje el őket a meghatalmazottnak.
  4. Amikor a meghatalmazott sikeresen befejezi a módosítások feldolgozását, frissítse a bérlettárolót a legújabb feldolgozott időpontra, és lépjen az 1. pontra.

Hibakezelés

A változáscsatorna feldolgozója rugalmas a felhasználói kódhibákkal szemben. Ha a delegált implementáció nem kezelt kivételt (4. lépés) tapasztal, a módosításokat feldolgozó szál leáll, és végül létrejön egy új szál. Az új szál ellenőrzi a bérlettároló által az adott partíciókulcs-értékek tartományához mentett legújabb időpontot. Az új szál onnan újraindul, és gyakorlatilag ugyanazt a módosítási köteget küldi el a meghatalmazottnak. Ez a viselkedés mindaddig folytatódik, amíg a meghatalmazott megfelelően nem dolgozza fel a módosításokat, és ez az oka annak, hogy a változáscsatorna-feldolgozónak "legalább egyszer" garanciája van. A változáscsatorna végleges konzisztenciaszinten való felhasználása ismétlődő eseményeket is eredményezhet a változáscsatorna későbbi olvasási műveletei között. Egy olvasási művelet utolsó eseménye például a következő művelet első eseményeként jelenhet meg.

Feljegyzés

Egyetlen esetben a rendszer nem próbálkozik újra a módosítások kötegével. Ha a hiba az első delegálási végrehajtáskor következik be, a bérlettároló nem rendelkezik korábbi mentett állapotmal az újrapróbálkozáskor. Ezekben az esetekben az újrapróbálkozás a kezdeti indítási konfigurációt használja, amely lehet, hogy nem tartalmazza az utolsó köteget.

Ha meg szeretné akadályozni, hogy a változáscsatorna feldolgozója folyamatosan "elakadjon" ugyanazon módosítási köteg ismételt próbálkozásában, a delegált kódban logikát kell hozzáadnia, hogy kivétel nélkül dokumentumokat írjon egy hibás üzenetsorba. Ez a kialakítás biztosítja, hogy nyomon tudja követni a feldolgozatlan módosításokat, miközben továbbra is feldolgozhatja a jövőbeli módosításokat. A hibaüzenetek üzenetsora lehet egy másik Azure Cosmos DB-tároló. A pontos adattár nem számít. Egyszerűen csak azt szeretné, hogy a feldolgozatlan módosítások megmaradjanak.

A változáscsatorna-becslést is használhatja a változáscsatorna-feldolgozó példányok előrehaladásának figyelésére a változáscsatorna olvasása közben, vagy életciklus-értesítések használatával észlelheti az alapul szolgáló hibákat.

Életciklus-értesítések

A változáscsatorna-feldolgozót az életciklusa bármely releváns eseményéhez csatlakoztathatja. Dönthet úgy, hogy értesítést kap egy vagy mindegyikről. A javaslat az, hogy legalább regisztrálja a hibaértesítést:

  • Regisztráljon egy kezelőt WithLeaseAcquireNotification , hogy értesítést kapjon, amikor az aktuális gazdagép bérletet szerez be a feldolgozás megkezdéséhez.
  • Regisztráljon egy kezelőt WithLeaseReleaseNotification , hogy értesítést kapjon, amikor az aktuális gazdagép kiad egy bérletet, és leállítja a feldolgozást.
  • Regisztráljon egy kezelőt WithErrorNotification , hogy értesítést kapjon, ha az aktuális gazdagép kivételt tapasztal a feldolgozás során. Meg kell tudnia különböztetni, hogy a forrás a felhasználó delegáltja (nem kezelt kivétel), vagy olyan hiba, amelyet a processzor a figyelt tároló elérésekor tapasztal (például hálózati problémák).
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Üzembehelyezési egység

Egyetlen változáscsatorna-feldolgozó üzembehelyezési egysége egy vagy több számítási példányból áll, amelyek azonos értékkel processorName és ugyanazzal a bérlettároló-konfigurációval rendelkeznek, de különböző példánynevek. Számos olyan üzembehelyezési egység lehet, amelyben minden egység eltérő üzleti folyamatokkal rendelkezik a módosításokhoz, és minden üzembe helyezési egység egy vagy több példányból áll.

Előfordulhat például, hogy egy központi telepítési egység minden alkalommal elindít egy külső API-t, amikor változás történt a tárolóban. Egy másik üzembehelyezési egység valós időben helyezheti át az adatokat minden alkalommal, amikor változás történt. Ha változás történik a figyelt tárolóban, az összes üzembehelyezési egység értesítést kap.

Dinamikus méretezés

Ahogy korábban említettük, az üzembe helyezési egységen belül egy vagy több számítási példány is lehet. Az üzembehelyezési egységen belüli számítási eloszlás előnyeinek kihasználásához az egyetlen fő követelmény a következők:

  • Minden példánynak ugyanazzal a bérlettároló-konfigurációval kell rendelkeznie.
  • Minden példánynak ugyanazzal az értékkel kell rendelkeznie a következőhöz processorName: .
  • Minden példánynak különböző példánynévvel kell rendelkeznie (WithInstanceName).

Ha ez a három feltétel érvényes, akkor a változáscsatorna-feldolgozó elosztja a bérlettárolóban lévő összes bérletet az üzembehelyezési egység összes futó példánya között, és egy egyenlőségelosztó algoritmus használatával párhuzamosítja a számítást. A bérletek mindig egy példány tulajdonában vannak, így a példányok száma nem lehet nagyobb, mint a bérletek száma.

A példányok száma nőhet és csökkenhet. A változáscsatorna-feldolgozó dinamikusan módosítja a terhelést a megfelelő újraelosztással.

Emellett a változáscsatorna-feldolgozó dinamikusan módosíthatja a tárolók skáláját, ha a tároló átviteli sebessége vagy tárhelye nő. A tároló növekedésével a változáscsatorna feldolgozója transzparensen kezeli a forgatókönyvet a bérletek dinamikus növelésével és az új bérletek meglévő példányok közötti elosztásával.

Kezdési időpont

Alapértelmezés szerint amikor egy változáscsatorna-feldolgozó első alkalommal indul el, inicializálja a bérlettárolót, és elindítja annak feldolgozási életciklusát. A rendszer nem észlel minden olyan módosítást, amely a figyelt tárolóban történt a változáscsatorna-feldolgozó első inicializálása előtt.

Olvasás egy korábbi dátumból és időpontból

A változáscsatorna-feldolgozó inicializálható egy adott dátumtól és időponttól kezdődő módosítások olvasásához, ha átad egy példányt DateTime a WithStartTime szerkesztőbővítménynek:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

A változáscsatorna-feldolgozó inicializálva van az adott dátumhoz és időponthoz, és elkezdi olvasni a későbbi módosításokat.

Olvasás az elejétől

Más esetekben, például adatmigráláskor vagy egy tároló teljes előzményeinek elemzésekor a tároló élettartamának kezdetétől be kell olvasnia a változáscsatornát. Használhatja WithStartTime a szerkesztőbővítményt, de passz DateTime.MinValue.ToUniversalTime(), amely a minimális DateTime érték UTC-ábrázolását hozza létre, mint ebben a példában:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

A változáscsatorna-feldolgozó inicializálva van, és a tároló élettartamának kezdetétől kezdi olvasni a módosításokat.

Feljegyzés

Ezek a testreszabási lehetőségek csak a változáscsatorna-feldolgozó kezdőpontjának beállítására szolgálnak. A bérlettároló első inicializálása után ezeknek a beállításoknak a módosítása nincs hatással.

A hírcsatorna és a kiosztott átviteli sebesség módosítása

A figyelt tároló adatcsatorna-olvasási műveleteinek módosítása kérelemegységeket használ fel. Győződjön meg arról, hogy a figyelt tároló nem tapasztal szabályozást. A szabályozás késlelteti a változáscsatorna-események fogadását a processzorokon.

A bérlettároló műveletei (az állapot frissítése és karbantartása) kérelemegységeket használnak fel. Minél több példány használja ugyanazt a bérlettárolót, annál nagyobb a kérelemegységek potenciális felhasználása. Győződjön meg arról, hogy a bérlettároló nem tapasztal szabályozást. A szabályozás késlelteti a változáscsatorna-események fogadását. A szabályozás akár teljesen befejezheti a feldolgozást.

A bérlettároló megosztása

A bérlettárolókat több üzembehelyezési egységben is megoszthatja. A megosztott bérlettárolókban minden üzembehelyezési egység figyel egy másik figyelt tárolót, vagy más értéket ad meg.processorName Ebben a konfigurációban minden üzembe helyezési egység egy független állapotot tart fenn a bérlettárolón. Tekintse át a kérelemegység-felhasználást egy bérlettárolón , és győződjön meg arról, hogy a kiosztott átviteli sebesség elegendő az összes üzembe helyezési egységhez.

Speciális bérletkonfiguráció

Három kulcskonfiguráció befolyásolhatja a változáscsatorna processzor működését. Minden konfiguráció hatással van a bérlettároló kérelemegység-felhasználására. A változáscsatorna-feldolgozó létrehozásakor az alábbi konfigurációk egyikét állíthatja be, de gondosan használja őket:

  • Bérlet beszerzése: Alapértelmezés szerint 17 másodpercenként. A gazdagép rendszeresen ellenőrzi a bérlettároló állapotát, és megfontolja a bérletek beszerzését a dinamikus skálázási folyamat részeként. Ez a folyamat úgy történik, hogy végrehajt egy lekérdezést a bérlettárolón. Ennek az értéknek a csökkentése felgyorsítja a bérletek újraelosztását és beszerzését, de növeli a kérelemegység-felhasználást a bérlettárolón.
  • Bérlet lejárata: Alapértelmezés szerint 60 másodperc. Meghatározza, hogy a bérletek mennyi ideig létezhetnek megújítási tevékenység nélkül, mielőtt egy másik gazdagép beolvasta volna. Amikor egy gazdagép összeomlik, a tulajdonában lévő bérleteket más gazdagépek veszik át ezt az időtartamot követően, valamint a konfigurált megújítási időközt. Ennek az értéknek a csökkentése gyorsabb helyreállítást tesz lehetővé a gazdagép összeomlása után, de a lejárati értéknek soha nem szabad alacsonyabbnak lennie a megújítási időköznél.
  • Bérlet megújítása: Alapértelmezés szerint 13 másodpercenként. A bérletet birtoklő gazdagép rendszeresen megújítja a bérletet, még akkor is, ha nincsenek új használatban lévő módosítások. Ez a folyamat egy csere bérleten való végrehajtásával történik. Ennek az értéknek a csökkentése csökkenti a gazdagép összeomlása által elveszett bérletek észleléséhez szükséges időt, de növeli a kérelemegység-felhasználást a bérlettárolón.

A változáscsatorna feldolgozójának helye

A változáscsatorna-feldolgozó bármely olyan platformon üzemeltethető, amely támogatja a hosszú ideig futó folyamatokat vagy feladatokat. Íme néhány példa:

Bár a változáscsatorna-feldolgozó rövid élettartamú környezetekben is futtatható, mert a bérlettároló fenntartja az állapotot, a környezetek indítási ciklusa késlelteti az értesítések fogadásához szükséges időt (mivel a környezet minden indításakor túl nagy a terhelés a processzor indításához).

További erőforrások

Következő lépések

További információ a változáscsatorna-feldolgozóról az alábbi cikkekben: