Delen via


Processor voor wijzigingenfeed in Azure Cosmos DB

VAN TOEPASSING OP: NoSQL

De wijzigingenfeedprocessor maakt deel uit van de SDK's van Azure Cosmos DB .NET V3 en Java V4 . Het vereenvoudigt het proces van het lezen van de wijzigingenfeed en distribueert de gebeurtenisverwerking effectief over meerdere consumenten.

Het belangrijkste voordeel van het gebruik van de wijzigingenfeedprocessor is het fouttolerante ontwerp, dat zorgt voor een 'ten minste één keer'-levering van alle gebeurtenissen in de wijzigingenfeed.

Ondersteunde SDK's

.Net V3 Java Node.JS Python

Onderdelen van de wijzigingenfeedprocessor

De wijzigingenfeedprocessor heeft vier hoofdonderdelen:

  • De bewaakte container: de bewaakte container bevat de gegevens waaruit de wijzigingenfeed wordt gegenereerd. Eventuele toevoegingen en updates van de bewaakte container worden weergegeven in de wijzigingenfeed van de container.

  • De leasecontainer: De leasecontainer fungeert als statusopslag en coördineert de verwerking van de wijzigingenfeed voor meerdere werkrollen. De leasecontainer kan worden opgeslagen in hetzelfde account als de bewaakte container of in een afzonderlijk account.

  • Het rekenproces: een rekenproces fungeert als host voor de processor van de wijzigingenfeed om te luisteren naar wijzigingen. Afhankelijk van het platform kan het worden vertegenwoordigd door een virtuele machine (VM), een Kubernetes-pod, een Azure-app Service-exemplaar of een werkelijke fysieke machine. Het rekenproces heeft een unieke id die de naam van het exemplaar in dit artikel wordt genoemd.

  • De gemachtigde: De gemachtigde is de code die definieert wat u, de ontwikkelaar, wilt doen met elke batch wijzigingen die de wijzigingenfeedprocessor leest.

Laten we eens kijken naar een voorbeeld in het volgende diagram voor meer informatie over hoe deze vier elementen van de wijzigingenfeedprocessor samenwerken. De bewaakte container slaat items op en gebruikt 'Plaats' als partitiesleutel. De partitiesleutelwaarden worden gedistribueerd in bereiken (elk bereik vertegenwoordigt een fysieke partitie) die items bevatten.

Het diagram toont twee rekeninstanties en de wijzigingenfeedprocessor wijst verschillende bereiken toe aan elk exemplaar om de rekendistributie te maximaliseren. Elk exemplaar heeft een andere, unieke naam.

Elk bereik wordt parallel gelezen. De voortgang van een bereik wordt afzonderlijk bijgehouden van andere bereiken in de leasecontainer via een leasedocument . De combinatie van de leases vertegenwoordigt de huidige status van de wijzigingenfeedverwerker.

Voorbeeld van wijzigingenfeedprocessor

De wijzigingenfeedprocessor implementeren

De processor van de wijzigingenfeed in .NET is beschikbaar voor de nieuwste versiemodus en alle versies en verwijdermodus. Alle versies en verwijderingsmodus is in preview en wordt ondersteund voor de wijzigingenfeedprocessor vanaf versie 3.40.0-preview.0. Het toegangspunt voor beide modi is altijd de bewaakte container.

Als u wilt lezen met de nieuwste versiemodus, roept GetChangeFeedProcessorBuilderu in een Container exemplaar het volgende aan:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Als u wilt lezen met alle versies en de modus Voor verwijderen, roept u het volgende GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes aan vanuit het Container exemplaar:

Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Voor beide modi is de eerste parameter een afzonderlijke naam die het doel van deze processor beschrijft. De tweede naam is de gedelegeerde-implementatie die wijzigingen verwerkt.

Hier volgt een voorbeeld van een gemachtigde voor de nieuwste versiemodus:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Hier volgt een voorbeeld van een gemachtigde voor alle versies en de modus Voor verwijderen:

static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ChangeFeedItem<ToDoItem> item in changes)
    {
        if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Previous.id}.");
        }
        else
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
        }
        // Simulate work
        await Task.Delay(1);
    }
}

Daarna definieert u de naam van het rekenproces of de unieke id met behulp van WithInstanceName. De naam van het rekenproces moet uniek en verschillend zijn voor elk rekenproces dat u implementeert. U stelt de container in om de leasestatus te behouden met behulp van WithLeaseContainer.

Aanroepen Build geeft u het processorexemplaren dat u kunt starten door aan te roepen StartAsync.

Notitie

De voorgaande codefragmenten worden genomen uit voorbeelden in GitHub. U kunt het voorbeeld ophalen voor de meest recente versiemodus of alle versies en de modus Voor verwijderen.

Levenscyclus verwerken

De normale levenscyclus van een host-exemplaar is:

  1. Lees de wijzigingenfeed.
  2. Als er geen wijzigingen zijn, slaapstand voor een vooraf gedefinieerde hoeveelheid tijd (aanpasbaar met behulp van WithPollInterval de opbouwfunctie) en gaat u naar #1.
  3. Als er wijzigingen zijn, stuurt u deze naar de gemachtigde.
  4. Wanneer de gemachtigde klaar is met het verwerken van de wijzigingen, werkt u het leasearchief bij met het laatst verwerkte tijdstip en gaat u naar #1.

Foutafhandeling

De wijzigingenfeedprocessor is bestand tegen fouten in gebruikerscode. Als uw gedelegeerde-implementatie een niet-verwerkte uitzondering heeft (stap 4), wordt de thread die die bepaalde batch wijzigingen verwerkt, gestopt en wordt er uiteindelijk een nieuwe thread gemaakt. De nieuwe thread controleert het laatste tijdstip dat het leasearchief heeft opgeslagen voor dat bereik van partitiesleutelwaarden. De nieuwe thread wordt van daaruit opnieuw opgestart, zodat dezelfde batch wijzigingen in de gemachtigde worden verzonden. Dit gedrag wordt voortgezet totdat de gedelegeerde de wijzigingen correct verwerkt. Dit is de reden waarom de wijzigingenfeedverwerker ten minste één keer een garantie heeft.

Notitie

In slechts één scenario wordt een batch wijzigingen niet opnieuw geprobeerd. Als de fout optreedt bij de eerste gedelegeerde uitvoering, heeft het leasearchief geen eerder opgeslagen status die moet worden gebruikt voor de nieuwe poging. In dergelijke gevallen wordt voor de nieuwe poging de eerste beginconfiguratie gebruikt, die de laatste batch al dan niet bevat.

Als u wilt voorkomen dat uw wijzigingenfeedprocessor continu 'vastgelopen' blijft proberen om dezelfde batch wijzigingen opnieuw uit te voeren, moet u logica toevoegen aan uw gedelegeerde-code om documenten te schrijven, op uitzondering, naar een wachtrij met foutberichten. Dit ontwerp zorgt ervoor dat u niet-verwerkte wijzigingen kunt bijhouden terwijl u toekomstige wijzigingen nog steeds kunt blijven verwerken. De wachtrij met foutberichten kan een andere Azure Cosmos DB-container zijn. Het exacte gegevensarchief maakt niet uit. U wilt gewoon dat de niet-verwerkte wijzigingen behouden blijven.

U kunt de wijzigingsfeedschatter ook gebruiken om de voortgang van de exemplaren van de wijzigingenfeedprocessor te controleren terwijl ze de wijzigingenfeed lezen, of u kunt levenscyclusmeldingen gebruiken om onderliggende fouten te detecteren.

Meldingen over levenscyclus

U kunt de wijzigingenfeedprocessor verbinden met elke relevante gebeurtenis in de levenscyclus. U kunt ervoor kiezen om een of meer meldingen te ontvangen. De aanbeveling is om ten minste de foutmelding te registreren:

  • Registreer een handler om WithLeaseAcquireNotification op de hoogte te worden gesteld wanneer de huidige host een lease verkrijgt om deze te verwerken.
  • Registreer een handler om WithLeaseReleaseNotification op de hoogte te worden gesteld wanneer de huidige host een lease vrijgeeft en stopt met verwerken.
  • Registreer een handler voor WithErrorNotification een melding wanneer de huidige host tijdens de verwerking een uitzondering tegenkomt. U moet kunnen onderscheiden of de bron delegeren van de gebruiker is (een niet-verwerkte uitzondering) of een fout die de processor tegenkomt wanneer deze toegang probeert te krijgen tot de bewaakte container (bijvoorbeeld netwerkproblemen).

Meldingen over levenscyclus zijn beschikbaar in beide modi voor wijzigingenfeeds. Hier volgt een voorbeeld van levenscyclusmeldingen in de nieuwste versiemodus:

Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Implementatie-eenheid

Eén implementatie-eenheid voor de wijzigingenfeedprocessor bestaat uit een of meer rekeninstanties met dezelfde waarde voor processorName en dezelfde leasecontainerconfiguratie, maar verschillende exemplaarnamen. U kunt veel implementatie-eenheden hebben waarin elke eenheid een andere bedrijfsstroom heeft voor de wijzigingen en elke implementatie-eenheid bestaat uit een of meer exemplaren.

U hebt bijvoorbeeld één implementatie-eenheid die een externe API activeert telkens wanneer er een wijziging in uw container is. Een andere implementatie-eenheid kan gegevens in realtime verplaatsen telkens wanneer er een wijziging is. Wanneer er een wijziging in uw bewaakte container plaatsvindt, worden al uw implementatie-eenheden op de hoogte gebracht.

Dynamische schaalbaarheid

Zoals eerder vermeld, kunt u binnen een implementatie-eenheid een of meer rekeninstanties hebben. Als u wilt profiteren van de rekendistributie binnen de implementatie-eenheid, zijn de enige belangrijke vereisten:

  • Alle instanties moeten dezelfde configuratie voor de leasecontainer hebben.
  • Alle exemplaren moeten dezelfde waarde hebben voor processorName.
  • Elk exemplaar moet een andere naam hebben (WithInstanceName).

Als deze drie voorwaarden van toepassing zijn, distribueert de verwerker van de wijzigingenfeed alle leases die zich in de leasecontainer bevinden voor alle actieve exemplaren van die implementatie-eenheid en wordt de berekening parallelliseren met behulp van een algoritme voor gelijke distributie. Een lease is op elk gewenst moment eigendom van één exemplaar, dus het aantal exemplaren mag niet groter zijn dan het aantal leases.

Het aantal exemplaren kan toenemen en verkleinen. De wijzigingenfeedprocessor past de belasting dynamisch aan door deze dienovereenkomstig te distribueren.

Bovendien kan de wijzigingenfeedprocessor dynamisch de schaal van een container aanpassen als de doorvoer of opslag van de container toeneemt. Wanneer uw container groeit, verwerkt de wijzigingenfeedprocessor het scenario transparant door de leases dynamisch te verhogen en de nieuwe leases tussen bestaande exemplaren te distribueren.

Begintijd

Wanneer een verwerker van een wijzigingenfeed voor het eerst wordt gestart, wordt de leasecontainer geïnitialiseerd en wordt de levenscyclus van de verwerking gestart. Wijzigingen die zijn aangebracht in de bewaakte container voordat de verwerker van de wijzigingenfeed voor het eerst wordt geïnitialiseerd, worden niet gedetecteerd.

Lezen vanaf een vorige datum en tijd

Het is mogelijk om de wijzigingenfeedprocessor te initialiseren om wijzigingen te lezen vanaf een specifieke datum en tijd door een exemplaar van DateTime de WithStartTime opbouwuitbreiding door te geven:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

De wijzigingenfeedverwerker wordt geïnitialiseerd voor die specifieke datum en tijd en begint de wijzigingen te lezen die daarna zijn aangebracht.

Lezen vanaf het begin

In andere scenario's, zoals in gegevensmigraties of als u de hele geschiedenis van een container analyseert, moet u de wijzigingenfeed lezen vanaf het begin van de levensduur van die container. U kunt de opbouwfunctieextensie gebruikenWithStartTime, maar doorgeven, waarmee de UTC-weergave van de minimale DateTime waarde wordt gegenereerdDateTime.MinValue.ToUniversalTime(), zoals in dit voorbeeld:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

De processor van de wijzigingenfeed wordt geïnitialiseerd en begint met het lezen van wijzigingen vanaf het begin van de levensduur van de container.

Notitie

Deze aanpassingsopties werken alleen om het beginpunt van de wijzigingenfeedprocessor in te stellen. Nadat de leasecontainer voor het eerst is geïnitialiseerd, heeft het wijzigen van deze opties geen effect.

Het aanpassen van het beginpunt is alleen beschikbaar voor de nieuwste versie van de wijzigingenfeedmodus. Wanneer u alle versies gebruikt en de modus Verwijdert, moet u beginnen met lezen vanaf het moment dat de processor wordt gestart of hervatten vanuit een eerdere leasestatus die zich binnen de retentieperiode voor continue back-up van uw account bevindt.

Wijzigingenfeed en ingerichte doorvoer

Leesbewerkingen voor wijzigingenfeeds voor de bewaakte container verbruiken aanvraageenheden. Zorg ervoor dat de bewaakte container geen beperking ondervindt. Met beperking worden vertragingen toegevoegd bij het ontvangen van wijzigingenfeed-gebeurtenissen op uw processors.

Bewerkingen in de leasecontainer (bijwerken en onderhouden van status) verbruiken aanvraageenheden. Hoe hoger het aantal exemplaren dat dezelfde leasecontainer gebruikt, hoe hoger het potentiële verbruik van aanvraageenheden. Zorg ervoor dat uw leasecontainer geen beperking ondervindt. Met beperking worden vertragingen toegevoegd bij het ontvangen van wijzigingenfeed-gebeurtenissen. Beperking kan zelfs de verwerking volledig beëindigen.

De leasecontainer delen

U kunt een leasecontainer delen over meerdere implementatie-eenheden. In een gedeelde leasecontainer luistert elke implementatie-eenheid naar een andere bewaakte container of heeft een andere waarde voor processorName. In deze configuratie onderhoudt elke implementatie-eenheid een onafhankelijke status op de leasecontainer. Controleer het verbruik van de aanvraageenheid voor een leasecontainer om ervoor te zorgen dat de ingerichte doorvoer voldoende is voor alle implementatie-eenheden.

Geavanceerde leaseconfiguratie

Drie sleutelconfiguraties kunnen van invloed zijn op de werking van de wijzigingenfeedprocessor. Elke configuratie is van invloed op het verbruik van de aanvraageenheid in de leasecontainer. U kunt een van deze configuraties instellen wanneer u de wijzigingenfeedprocessor maakt, maar deze zorgvuldig gebruiken:

  • Lease verkrijgen: standaard om de 17 seconden. Een host controleert regelmatig de status van het leasearchief en overweeg leases te verkrijgen als onderdeel van het dynamische schaalproces . Dit proces wordt uitgevoerd door een query uit te voeren op de leasecontainer. Door deze waarde te verminderen, wordt herverdeling en het verkrijgen van leases sneller, maar wordt het verbruik van aanvraageenheden voor de leasecontainer verhoogd.
  • Verlooptijd van lease: standaard 60 seconden. Definieert de maximale hoeveelheid tijd die een lease kan bestaan zonder verlengingsactiviteit voordat deze wordt verkregen door een andere host. Wanneer een host vastloopt, worden de leases waarvan deze eigenaar is, opgehaald door andere hosts na deze periode plus het geconfigureerde verlengingsinterval. Als u deze waarde vermindert, wordt het herstellen na een crash van de host sneller, maar de verloopwaarde mag nooit lager zijn dan het vernieuwingsinterval.
  • Verlenging van lease: standaard om de 13 seconden. Een host die eigenaar is van een lease, verlengt de lease periodiek, zelfs als er geen nieuwe wijzigingen zijn die moeten worden gebruikt. Dit proces wordt uitgevoerd door een replace uit te voeren voor de lease. Als u deze waarde verlaagt, wordt de tijd verlaagd die nodig is om leases te detecteren die verloren gaan door een host die vastloopt, maar het verhoogt het verbruik van aanvraageenheden voor de leasecontainer.

Waar kan ik de wijzigingenfeedprocessor hosten?

De wijzigingenfeedprocessor kan worden gehost in elk platform dat langlopende processen of taken ondersteunt. Hieronder volgen een aantal voorbeelden:

Hoewel de wijzigingenfeedprocessor kan worden uitgevoerd in kortdurende omgevingen omdat de leasecontainer de status behoudt, voegt de opstartcyclus van deze omgevingen vertragingen toe aan de tijd die nodig is om meldingen te ontvangen (vanwege de overhead van het starten van de processor telkens wanneer de omgeving wordt gestart).

Toegangsvereisten op basis van rollen

Wanneer u Microsoft Entra ID als verificatiemechanisme gebruikt, moet u ervoor zorgen dat de identiteit over de juiste machtigingen beschikt:

  • Op de bewaakte container:
    • Microsoft.DocumentDB/databaseAccounts/readMetadata
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
  • In de leasecontainer:
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery

Aanvullende bronnen

Volgende stappen

Meer informatie over de verwerker van wijzigingenfeeds vindt u in de volgende artikelen: