Anteckning
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
GÄLLER FÖR: NoSQL
Ändringsflödesprocessorn är en del av Azure Cosmos DB .NET V3 och Java V4 SDK:er. Det förenklar processen för att läsa ändringsflödet och distribuerar händelsebearbetningen över flera konsumenter effektivt.
Den största fördelen med att använda ändringsflödesprocessorn är dess feltoleranta design, vilket garanterar en "minst en gång"-leverans av alla händelser i ändringsflödet.
SDK:er som stöds
.Net V3 | Java | Node.JS | Python |
---|---|---|---|
✓ | ✓ | ✕ | ✕ |
Komponenter i ändringsflödesprocessorn
Ändringsflödesprocessorn har fyra huvudkomponenter:
Den övervakade containern: Den övervakade containern har de data som ändringsflödet genereras från. Infogningar och uppdateringar till den övervakade containern visas i containerns ändringsflöde.
Leasingbehållaren: Leasingbehållaren fungerar som tillståndslagring och koordinerar bearbetningen av ändringsflödet över flera arbetsinstanser. Lånecontainern kan lagras i samma konto som den övervakade containern eller i ett separat konto.
Beräkningsinstansen: En beräkningsinstans är värd för ändringsflödesprocessorn för att lyssna efter ändringar. Beroende på plattform kan den representeras av en virtuell dator (VM), en Kubernetes-podd, en Azure App Service-instans eller en verklig fysisk dator. Beräkningsinstansen har en unik identifierare som kallas instansnamnet i den här artikeln.
Ombudet: Ombudet är den kod som definierar vad du, utvecklaren, vill göra med varje batch med ändringar som ändringsflödesprocessorn läser.
För att ytterligare förstå hur dessa fyra element i ändringsflödesprocessorn fungerar tillsammans ska vi titta på ett exempel i följande diagram. Den övervakade containern lagrar objekt och använder "City" som partitionsnyckel. Partitionsnyckelvärdena distribueras i intervall (varje intervall representerar en fysisk partition) som innehåller objekt.
Diagrammet visar två beräkningsinstanser och ändringsflödesprocessorn tilldelar olika intervall till varje instans för att maximera beräkningsfördelningen. Varje instans har ett annat unikt namn.
Varje intervall läss parallellt. Ett intervalls förlopp underhålls separat från andra intervall i lånecontainern via ett lånedokument . Kombinationen av lånen representerar det aktuella tillståndet för ändringsflödesprocessorn.
Implementera ändringsflödesprocessorn
Ändringsflödesprocessorn i .NET är tillgänglig för senaste versionsläge och alla versioner och borttagningsläge. Alla versioner och borttagningsläge är i förhandsversion och stöds för ändringsflödesprocessorn som börjar i version 3.40.0-preview.0
. Startpunkten för båda lägena är alltid den övervakade containern.
För att läsa med det senaste versionsläget, i en GetChangeFeedProcessorBuilder
-instans, anropar du Container
:
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
return changeFeedProcessor;
}
För att läsa med alla versioner och raderingsläge, anropa GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes
från instansen Container
:
Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
För båda lägena är den första parametern ett distinkt namn som beskriver målet med den här processorn. Det andra namnet är den delegatimplementering som hanterar ändringar.
Här är ett exempel på ett ombud för senaste versionsläge:
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<ToDoItem> changes,
CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate some asynchronous operation
await Task.Delay(10);
}
Console.WriteLine("Finished handling changes.");
}
Här är ett exempel på en delegering för alla versioner och raderingsläge.
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ChangeFeedItem<ToDoItem> item in changes)
{
if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item.");
}
else
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
}
// Simulate work
await Task.Delay(1);
}
}
Efteråt definierar du namnet på beräkningsinstansen eller den unika identifieraren med hjälp WithInstanceName
av . Namnet på beräkningsinstansen ska vara unikt och olika för varje beräkningsinstans som du distribuerar. Du anger att containern ska behålla lånetillståndet med hjälp av WithLeaseContainer
.
Att anropa Build
ger dig processorinstansen som du kan starta genom att anropa StartAsync
.
Anteckning
Föregående kodfragment tas från exempel i GitHub. Du kan hämta exemplet för det senaste versionsläget eller alla versioner och borttagningsläget.
Livscykel för bearbetning
Den normala livscykeln för en värdinstans är:
- Läs ändringsflödet.
- Om det inte finns några ändringar, vänta en fördefinierad tid (anpassningsbar med hjälp av
WithPollInterval
i Builder) och återgå till #1. - Om det finns ändringar skickar du dem till ombudet.
- När ombudet har framgångsrikt slutfört ändringarna, uppdaterar du leasingbutiken med den senaste bearbetade tidpunkten i tiden och går till #1.
Felhantering
Ändringsflödesprocessorn är motståndskraftig mot fel i användarkod. Om delegatimplementeringen har ett ohanterat undantag (steg 4) stoppas tråden som bearbetar den specifika batchen med ändringar och en ny tråd skapas så småningom. Den nya tråden kontrollerar den senaste tidpunkten som leasingdatabasen har sparat för det intervallet av partitionsnyckelvärden. Den nya tråden startas om därifrån och skickar i praktiken samma batch med ändringar till ombudet. Det här beteendet fortsätter tills ombudet bearbetar ändringarna korrekt, och det är anledningen till att ändringsflödesprocessorn har en "minst en gång"-garanti.
Kommentar
I endast ett scenario görs inte en batch med ändringar på nytt. Om felet inträffar vid den första ombudskörningen någonsin har lånearkivet inget tidigare sparat tillstånd som ska användas vid återförsöket. I dessa fall använder återförsöket den inledande startkonfigurationen, som kanske eller kanske inte innehåller den sista batchen.
Om du vill förhindra att ändringsflödesprocessorn "fastnar" och kontinuerligt försöker utföra samma batch med ändringar igen bör du lägga till logik i ombudskoden för att skriva dokument, med undantag, till en kö med felmeddelanden. Den här designen säkerställer att du kan hålla reda på obearbetade ändringar samtidigt som du kan fortsätta att bearbeta framtida ändringar. Den felaktiga meddelandekön kan vara en annan Azure Cosmos DB-container. Det exakta datalagret spelar ingen roll. Du vill bara att de obearbetade ändringarna ska sparas.
Du kan också använda ändringsflödesestimatorn för att övervaka förloppet för dina instanser av ändringsflödesprocessorn när de läser ändringsflödet, eller så kan du använda livscykelmeddelanden för att identifiera underliggande fel.
Livscykelmeddelanden
Du kan ansluta ändringsflödesprocessorn till alla relevanta händelser i livscykeln. Du kan välja att få meddelanden om en eller alla. Rekommendationen är att minst registrera felmeddelandet:
- Registrera en hanterare för
WithLeaseAcquireNotification
för att meddelas när den aktuella värden skaffar en leasing för att börja bearbeta den. - Registrera en hanterare för
WithLeaseReleaseNotification
för att få meddelande när den aktuella värden släpper ett leasingavtal och slutar bearbeta det. - Registrera en hanterare för
WithErrorNotification
för att få meddelanden när den aktuella värddatorn stöter på ett undantag under bearbetningen. Du måste kunna skilja på om källan är användardelegaten (ett ohanterat undantag) eller ett fel som processorn stöter på när den försöker komma åt den övervakade containern (till exempel nätverksproblem).
Livscykelmeddelanden är tillgängliga i båda ändringsflödeslägena. Här är ett exempel på livscykelmeddelanden i senaste versionsläge:
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
}
else
{
Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
}
return Task.CompletedTask;
};
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
.WithLeaseAcquireNotification(onLeaseAcquiredAsync)
.WithLeaseReleaseNotification(onLeaseReleaseAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Distributionsenhet
En enda distributionsenhet för en ändringsflödesprocessor består av en eller flera beräkningsinstanser som har samma värde för processorName
och samma leasingcontainerkonfiguration, men olika instansnamn. Du kan ha många distributionsenheter där varje enhet har ett annat affärsflöde för ändringarna och varje distributionsenhet består av en eller flera instanser.
Du kan till exempel ha en distributionsenhet som utlöser ett externt API varje gång det sker en ändring i containern. En annan distributionsenhet kan flytta data i realtid varje gång en ändring sker. När en ändring sker i din övervakade container meddelas alla dina distributionsenheter.
Dynamisk skalning
Som tidigare nämnts kan du i en distributionsenhet ha en eller flera beräkningsinstanser. För att dra nytta av beräkningskapaciteten i distributionsenheten, är de enda nyckelkraven att:
- Alla instanser ska ha samma lånecontainerkonfiguration.
- Alla instanser bör ha samma värde för
processorName
. - Varje instans måste ha ett unikt instansnamn (
WithInstanceName
).
Om dessa tre villkor gäller distribuerar ändringsflödesprocessorn alla lån som finns i lånecontainern över alla pågående instanser av distributionsenheten och parallelliserar beräkning med hjälp av en algoritm för lika fördelning. Ett hyresavtal ägs av en instans vid varje given tid, så antalet instanser bör inte vara större än antalet hyresavtal.
Antalet instanser kan öka och minska. Ändringsflödesprocessorn justerar belastningen dynamiskt genom att omdistribuera den.
Dessutom kan ändringsflödesprocessorn dynamiskt justera en containers skala om containerns dataflöde eller lagring ökar. När containern växer, hanterar ändringsloggprocessorn scenariot transparent genom att dynamiskt öka antalet leasingavtal och fördela dessa nya avtal mellan de befintliga instanserna.
Starttid
När en ändringsflödesprocessor startar för första gången initierar den som standard lånecontainern och startar bearbetningslivscykeln. Ändringar som har gjorts i den övervakade containern innan ändringsflödesprocessorn initieras för första gången identifieras inte.
Läsa från ett tidigare datum och en tidigare tid
Det är möjligt att initiera ändringsflödesprocessorn för att läsa ändringar som börjar vid ett visst datum och en viss tid genom att skicka en instans av DateTime
till WithStartTime
builder-tillägget:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(particularPointInTime)
.Build();
Ändringsflödesprocessorn initieras för det specifika datumet och tiden, och den börjar läsa ändringarna som skedde efteråt.
Läsa från början
I andra scenarier, till exempel i datamigreringar eller om du analyserar hela historiken för en container, måste du läsa ändringsflödet från början av containerns livslängd. Du kan använda WithStartTime
på builder-tillägget, men skicka DateTime.MinValue.ToUniversalTime()
, vilket genererar UTC-representationen av minimivärdet DateTime
som i det här exemplet:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
Ändringsflödesprocessorn initieras och börjar läsa ändringar från början av containerns livslängd.
Kommentar
De här anpassningsalternativen fungerar bara för att konfigurera startpunkten i tiden för ändringsflödesprocessorn. När lånecontainern har initierats för första gången har det ingen effekt att ändra de här alternativen.
Det går bara att anpassa startpunkten för senaste ändringsflödesläget. När du använder alla versioner och tar bort läge måste du börja läsa från den tidpunkt då processorn startas, eller återuppta från ett tidigare lånetillstånd som ligger inom kvarhållningsperioden för kontinuerlig säkerhetskopiering för ditt konto.
Ändra feed och tilldelad genomströmning
Läsåtgärder för ändringsflöde på den övervakade containern förbrukar enheter för begäranden. Kontrollera att den övervakade containern inte har någon begränsning. Begränsning lägger till fördröjningar i mottagandet av ändringsflödeshändelser på dina processorer.
Åtgärder i lånecontainern (uppdatering och underhåll av tillstånd) förbrukar begärandenheter. Ju högre antal instanser som använder samma lånecontainer, desto högre är den potentiella förbrukningen av enheter för begäranden. Kontrollera att lånecontainern inte har någon begränsning. Strypning lägger till fördröjningar i mottagandet av ändringsflödeshändelser. Begränsning kan till och med helt avsluta bearbetningen.
Dela lånecontainern
Du kan dela en lånecontainer i flera distributionsenheter. I en container med delat lån lyssnar varje distributionsenhet på en annan övervakad container eller har ett annat värde för processorName
. I den här konfigurationen upprätthåller varje distributionsenhet ett oberoende tillstånd för lånecontainern. Granska förbrukningen av begärandenheter för en leasecontainer för att se till att den tilldelade genomströmningen räcker för alla distributionsenheter.
Avancerad lånekonfiguration
Tre nyckelkonfigurationer kan påverka hur ändringsflödesprocessorn fungerar. Varje konfiguration påverkar förbrukningen för begärandeenheten i lånecontainern. Du kan ange någon av dessa konfigurationer när du skapar ändringsflödesprocessorn, men använd dem noggrant:
- Leasingförvärv: Som standard var 17:e sekund. En värd kontrollerar regelbundet tillståndet för leasearkivet och överväger att förvärva leasings som en del av den dynamiska skalningsprocessen. Den här processen utförs genom att utföra en fråga på hyrescontainern. Om du minskar det här värdet går det snabbare att ombalansera och förvärva lån, men det ökar förbrukningen för begärandeenheter i lånecontainern.
- Förfallotid för lån: Som standard 60 sekunder. Definierar den maximala tid som ett leasingavtal kan vara utan att förnyas innan det förvärvas av en annan host. När en värd kraschar plockas leasingavtal som den ägde upp av andra värdar efter den här tidsperioden samt det konfigurerade förnyelseintervallet. Om du minskar det här värdet går det snabbare att återställa efter en värdkrasch, men förfallovärdet bör aldrig vara lägre än förnyelseintervallet.
- Förnyelse av lån: Som standard var 13:e sekund. En värd som äger ett leasingavtal förnyar regelbundet leasingavtalet, även om det inte finns några nya ändringar att bearbeta. Den här processen görs genom att genomföra en ersättning av leasingavtalet. Om du minskar det här värdet minskar den tid som krävs för att identifiera lån som förlorats av en värd som kraschar, men det ökar förbrukningen för begärandeenhet på lånecontainern.
Var du ska köra ändringsflödesprocessorn
Ändringsflödesprocessorn kan finnas på valfri plattform som stöder tidskrävande processer eller uppgifter. Nedan följer några exempel:
- En instans av WebJobs i Azure App Service som körs kontinuerligt
- En process i en instans av Azure Virtual Machines
- Ett bakgrundsjobb i Azure Kubernetes Service
- En serverlös funktion i Azure Functions
- En ASP.NET värdbaserad tjänst
Även om ändringsflödesprocessorn kan köras i kortvariga miljöer eftersom lånecontainern upprätthåller tillståndet, lägger startcykeln för dessa miljöer till fördröjningar i den tid det tar att ta emot meddelanden (på grund av kostnaden för att starta processorn varje gång miljön startas).
Rollbaserade åtkomstkrav
När du använder Microsoft Entra-ID som autentiseringsmekanism kontrollerar du att identiteten har rätt behörigheter:
- I den övervakade containern:
Microsoft.DocumentDB/databaseAccounts/readMetadata
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
- I lånecontainern:
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery
Ytterligare resurser
- Azure Cosmos DB SDK
- Slutför exempelprogrammet på GitHub
- Ytterligare användningsexempel på GitHub
- Azure Cosmos DB-workshoplabb för ändringsflödesprocessor
Nästa steg
Läs mer om ändringsflödesprocessorn i följande artiklar: