Dela via


Ändringsflödesprocessorn i Azure Cosmos DB

GÄLLER FÖR: NoSQL

Ändringsflödesprocessorn är en del av Azure Cosmos DB .NET V3 och Java V4 SDK:er. Det förenklar processen för att läsa ändringsflödet och distribuerar händelsebearbetningen över flera konsumenter effektivt.

Den största fördelen med att använda ändringsflödesprocessorn är dess feltoleranta design, vilket garanterar en "minst en gång"-leverans av alla händelser i ändringsflödet.

SDK:er som stöds

.Net V3 Java Node.JS Python

Komponenter i ändringsflödesprocessorn

Ändringsflödesprocessorn har fyra huvudkomponenter:

  • Den övervakade containern: Den övervakade containern har de data som ändringsflödet genereras från. Infogningar och uppdateringar till den övervakade containern visas i containerns ändringsflöde.

  • Lånecontainern: Lånecontainern fungerar som tillståndslagring och samordnar bearbetningen av ändringsflödet mellan flera arbetare. Lånecontainern kan lagras i samma konto som den övervakade containern eller i ett separat konto.

  • Beräkningsinstansen: En beräkningsinstans är värd för ändringsflödesprocessorn för att lyssna efter ändringar. Beroende på plattform kan den representeras av en virtuell dator (VM), en Kubernetes-podd, en Azure App Service-instans eller en verklig fysisk dator. Beräkningsinstansen har en unik identifierare som kallas instansnamnet i den här artikeln.

  • Ombudet: Ombudet är den kod som definierar vad du, utvecklaren, vill göra med varje batch med ändringar som ändringsflödesprocessorn läser.

För att ytterligare förstå hur dessa fyra element i ändringsflödesprocessorn fungerar tillsammans ska vi titta på ett exempel i följande diagram. Den övervakade containern lagrar objekt och använder "City" som partitionsnyckel. Partitionsnyckelvärdena distribueras i intervall (varje intervall representerar en fysisk partition) som innehåller objekt.

Diagrammet visar två beräkningsinstanser och ändringsflödesprocessorn tilldelar olika intervall till varje instans för att maximera beräkningsfördelningen. Varje instans har ett annat unikt namn.

Varje intervall läss parallellt. Ett intervalls förlopp underhålls separat från andra intervall i lånecontainern via ett lånedokument . Kombinationen av lånen representerar det aktuella tillståndet för ändringsflödesprocessorn.

Exempel på ändringsflödesprocessor

Implementera ändringsflödesprocessorn

Ändringsflödesprocessorn i .NET är tillgänglig för senaste versionsläge och alla versioner och borttagningsläge. Alla versioner och borttagningsläge är i förhandsversion och stöds för ändringsflödesprocessorn som börjar i version 3.40.0-preview.0. Startpunkten för båda lägena är alltid den övervakade containern.

Om du vill läsa med det senaste versionsläget anropar GetChangeFeedProcessorBuilderdu i en Container instans :

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Om du vill läsa med alla versioner och borttagningsläge anropar GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes du från instansen Container :

Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

För båda lägena är den första parametern ett distinkt namn som beskriver målet med den här processorn. Det andra namnet är den delegatimplementering som hanterar ändringar.

Här är ett exempel på ett ombud för senaste versionsläge:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Här är ett exempel på ett ombud för alla versioner och borttagningsläge:

static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ChangeFeedItem<ToDoItem> item in changes)
    {
        if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Previous.id}.");
        }
        else
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
        }
        // Simulate work
        await Task.Delay(1);
    }
}

Efteråt definierar du namnet på beräkningsinstansen eller den unika identifieraren med hjälp WithInstanceNameav . Namnet på beräkningsinstansen ska vara unikt och olika för varje beräkningsinstans som du distribuerar. Du anger att containern ska behålla lånetillståndet med hjälp WithLeaseContainerav .

Samtal Build ger dig den processorinstans som du kan starta genom att anropa StartAsync.

Kommentar

Föregående kodfragment tas från exempel i GitHub. Du kan hämta exemplet för det senaste versionsläget eller alla versioner och borttagningsläget.

Livscykel för bearbetning

Den normala livscykeln för en värdinstans är:

  1. Läs ändringsflödet.
  2. Om det inte finns några ändringar kan du viloläge under en fördefinierad tid (anpassningsbar med hjälp WithPollInterval av i Builder) och gå till #1.
  3. Om det finns ändringar skickar du dem till ombudet.
  4. När ombudet har slutfört bearbetningen av ändringarna uppdaterar du lånearkivet med den senaste bearbetade tidpunkten och går till #1.

Felhantering

Ändringsflödesprocessorn är motståndskraftig mot fel i användarkod. Om delegatimplementeringen har ett ohanterat undantag (steg 4) stoppas tråden som bearbetar den specifika batchen med ändringar och en ny tråd skapas så småningom. Den nya tråden kontrollerar den senaste tidpunkten som lånearkivet har sparat för det intervallet med partitionsnyckelvärden. Den nya tråden startas om därifrån och skickar i praktiken samma batch med ändringar till ombudet. Det här beteendet fortsätter tills ombudet bearbetar ändringarna korrekt, och det är anledningen till att ändringsflödesprocessorn har en "minst en gång"-garanti.

Kommentar

I endast ett scenario görs inte en batch med ändringar på nytt. Om felet inträffar vid den första ombudskörningen någonsin har lånearkivet inget tidigare sparat tillstånd som ska användas vid återförsöket. I dessa fall använder återförsöket den inledande startkonfigurationen, som kanske eller kanske inte innehåller den sista batchen.

Om du vill förhindra att ändringsflödesprocessorn "fastnar" och kontinuerligt försöker utföra samma batch med ändringar igen bör du lägga till logik i ombudskoden för att skriva dokument, med undantag, till en kö med felmeddelanden. Den här designen säkerställer att du kan hålla reda på obearbetade ändringar samtidigt som du kan fortsätta att bearbeta framtida ändringar. Den felande meddelandekön kan vara en annan Azure Cosmos DB-container. Det exakta datalagret spelar ingen roll. Du vill bara att de obearbetade ändringarna ska sparas.

Du kan också använda ändringsflödesestimatorn för att övervaka förloppet för dina instanser av ändringsflödesprocessorn när de läser ändringsflödet, eller så kan du använda livscykelmeddelanden för att identifiera underliggande fel.

Livscykelmeddelanden

Du kan ansluta ändringsflödesprocessorn till alla relevanta händelser i livscykeln. Du kan välja att bli meddelad till en eller alla av dem. Rekommendationen är att minst registrera felmeddelandet:

  • Registrera en hanterare för att meddelas när den aktuella värden skaffar ett lån för WithLeaseAcquireNotification att börja bearbeta det.
  • Registrera en hanterare för WithLeaseReleaseNotification att meddelas när den aktuella värden släpper ett lån och slutar bearbeta det.
  • Registrera en hanterare för WithErrorNotification att meddelas när den aktuella värden stöter på ett undantag under bearbetningen. Du måste kunna skilja på om källan är användardelegaten (ett ohanterat undantag) eller ett fel som processorn stöter på när den försöker komma åt den övervakade containern (till exempel nätverksproblem).

Livscykelmeddelanden är tillgängliga i båda ändringsflödeslägena. Här är ett exempel på livscykelmeddelanden i senaste versionsläge:

Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Distributionsenhet

En distributionsenhet för en ändringsflödesprocessor består av en eller flera beräkningsinstanser som har samma värde för processorName och samma lånecontainerkonfiguration, men olika instansnamn. Du kan ha många distributionsenheter där varje enhet har ett annat affärsflöde för ändringarna och varje distributionsenhet består av en eller flera instanser.

Du kan till exempel ha en distributionsenhet som utlöser ett externt API varje gång det sker en ändring i containern. En annan distributionsenhet kan flytta data i realtid varje gång en ändring sker. När en ändring sker i din övervakade container meddelas alla dina distributionsenheter.

Dynamisk skalning

Som tidigare nämnts kan du i en distributionsenhet ha en eller flera beräkningsinstanser. För att dra nytta av beräkningsdistributionen i distributionsenheten är de enda viktiga kraven att:

  • Alla instanser ska ha samma lånecontainerkonfiguration.
  • Alla instanser bör ha samma värde för processorName.
  • Varje instans måste ha ett unikt instansnamn (WithInstanceName).

Om dessa tre villkor gäller distribuerar ändringsflödesprocessorn alla lån som finns i lånecontainern över alla pågående instanser av distributionsenheten och parallelliserar beräkning med hjälp av en algoritm för lika fördelning. Ett lån ägs av en instans när som helst, så antalet instanser bör inte vara större än antalet lån.

Antalet instanser kan öka och minska. Ändringsflödesprocessorn justerar belastningen dynamiskt genom att omdistribuera den.

Dessutom kan ändringsflödesprocessorn dynamiskt justera en containers skala om containerns dataflöde eller lagring ökar. När containern växer hanterar ändringsflödesprocessorn scenariot transparent genom att dynamiskt öka lånen och distribuera de nya lånen mellan befintliga instanser.

Starttid

När en ändringsflödesprocessor startar för första gången initierar den som standard lånecontainern och startar bearbetningslivscykeln. Ändringar som har gjorts i den övervakade containern innan ändringsflödesprocessorn initieras för första gången identifieras inte.

Läsa från ett tidigare datum och en tidigare tid

Det är möjligt att initiera ändringsflödesprocessorn för att läsa ändringar som börjar vid ett visst datum och en viss tid genom att skicka en instans av DateTime till WithStartTime builder-tillägget:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

Ändringsflödesprocessorn initieras för det specifika datumet och tiden, och den börjar läsa ändringarna som skedde efteråt.

Läsa från början

I andra scenarier, till exempel i datamigreringar eller om du analyserar hela historiken för en container, måste du läsa ändringsflödet från början av containerns livslängd. Du kan använda WithStartTime på builder-tillägget, men skicka DateTime.MinValue.ToUniversalTime(), vilket genererar UTC-representationen av minimivärdet DateTime som i det här exemplet:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Ändringsflödesprocessorn initieras och börjar läsa ändringar från början av containerns livslängd.

Kommentar

De här anpassningsalternativen fungerar bara för att konfigurera startpunkten i tiden för ändringsflödesprocessorn. När lånecontainern har initierats för första gången har det ingen effekt att ändra de här alternativen.

Det går bara att anpassa startpunkten för senaste ändringsflödesläget. När du använder alla versioner och tar bort läge måste du börja läsa från den tidpunkt då processorn startas, eller återuppta från ett tidigare lånetillstånd som ligger inom kvarhållningsperioden för kontinuerlig säkerhetskopiering för ditt konto.

Ändra feed och etablerat dataflöde

Läsåtgärder för ändringsflöde på den övervakade containern förbrukar enheter för begäranden. Kontrollera att den övervakade containern inte har någon begränsning. Begränsning lägger till fördröjningar i mottagandet av ändringsflödeshändelser på dina processorer.

Åtgärder i lånecontainern (uppdatering och underhåll av tillstånd) förbrukar enheter för begäranden. Ju högre antal instanser som använder samma lånecontainer, desto högre är den potentiella förbrukningen av enheter för begäranden. Kontrollera att lånecontainern inte har någon begränsning. Begränsning lägger till fördröjningar i mottagandet av ändringsflödeshändelser. Begränsning kan till och med helt avsluta bearbetningen.

Dela lånecontainern

Du kan dela en lånecontainer i flera distributionsenheter. I en container med delat lån lyssnar varje distributionsenhet på en annan övervakad container eller har ett annat värde för processorName. I den här konfigurationen upprätthåller varje distributionsenhet ett oberoende tillstånd för lånecontainern. Granska enhetsförbrukningen för begäran för en lånecontainer för att se till att det etablerade dataflödet räcker för alla distributionsenheter.

Avancerad lånekonfiguration

Tre nyckelkonfigurationer kan påverka hur ändringsflödesprocessorn fungerar. Varje konfiguration påverkar förbrukningen för begärandeenheten i lånecontainern. Du kan ange någon av dessa konfigurationer när du skapar ändringsflödesprocessorn, men använd dem noggrant:

  • Låneförvärv: Som standard var 17:e sekund. En värd kontrollerar regelbundet tillståndet för lånearkivet och överväger att skaffa lån som en del av den dynamiska skalningsprocessen . Den här processen utförs genom att köra en fråga på lånecontainern. Om du minskar det här värdet går det snabbare att ombalansera och förvärva lån, men det ökar förbrukningen för begärandeenheter i lånecontainern.
  • Förfallotid för lån: Som standard 60 sekunder. Definierar den maximala tid som ett lån kan finnas utan någon förnyelseaktivitet innan det förvärvas av en annan värd. När en värd kraschar hämtas lånen som den ägde av andra värdar efter den här tidsperioden plus det konfigurerade förnyelseintervallet. Om du minskar det här värdet går det snabbare att återställa efter en värdkrasch, men förfallovärdet bör aldrig vara lägre än förnyelseintervallet.
  • Förnyelse av lån: Som standard var 13:e sekund. En värd som äger ett lån förnyar lånet regelbundet, även om det inte finns några nya ändringar att använda. Den här processen görs genom att köra en Replace på lånet. Om du minskar det här värdet minskar den tid som krävs för att identifiera lån som förlorats av en värd som kraschar, men det ökar förbrukningen för begärandeenhet på lånecontainern.

Var du ska vara värd för ändringsflödesprocessorn

Ändringsflödesprocessorn kan finnas på valfri plattform som stöder tidskrävande processer eller uppgifter. Nedan följer några exempel:

Även om ändringsflödesprocessorn kan köras i kortvariga miljöer eftersom lånecontainern upprätthåller tillståndet, lägger startcykeln för dessa miljöer till fördröjningar i den tid det tar att ta emot meddelanden (på grund av kostnaden för att starta processorn varje gång miljön startas).

Rollbaserade åtkomstkrav

När du använder Microsoft Entra-ID som autentiseringsmekanism kontrollerar du att identiteten har rätt behörigheter:

  • I den övervakade containern:
    • Microsoft.DocumentDB/databaseAccounts/readMetadata
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
  • I lånecontainern:
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery

Ytterligare resurser

Nästa steg

Läs mer om ändringsflödesprocessorn i följande artiklar: