Dela via


Använda ändringsflödesestimatorn

GÄLLER FÖR: NoSQL

Den här artikeln beskriver hur du kan övervaka förloppet för dina instanser av ändringsflödesprocessorn när de läser ändringsflödet.

Varför är det viktigt att övervaka förloppet?

Ändringsflödesprocessorn fungerar som en pekare som går framåt i ändringsflödet och levererar ändringarna till en delegatimplementering.

Distributionen av ändringsflödesprocessorn kan bearbeta ändringar med en viss hastighet baserat på dess tillgängliga resurser, till exempel CPU, minne, nätverk och så vidare.

Om den här hastigheten är långsammare än den hastighet med vilken dina ändringar sker i Azure Cosmos DB-containern börjar processorn släpa efter.

Att identifiera det här scenariot hjälper dig att förstå om vi behöver skala distributionen av ändringsflödesprocessorn.

Implementera ändringsflödesestimatorn

Som push-modell för automatiska meddelanden

Precis som ändringsflödesprocessorn kan ändringsflödesberäknaren fungera som en push-modell. Skattaren mäter skillnaden mellan det senast bearbetade objektet (definierat av tillståndet för lånecontainern) och den senaste ändringen i containern och skickar det här värdet till ett ombud. Det intervall med vilket mätningen utförs kan också anpassas med ett standardvärde på 5 sekunder.

Om ändringsflödesprocessorn till exempel använder det senaste versionsläget och definieras så här:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Rätt sätt att initiera en uppskattning för att mäta att processorn skulle använda GetChangeFeedEstimatorBuilder så här:

ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();

Där både processorn och skattaren delar samma leaseContainer och samma namn.

De andra två parametrarna är ombudet, som tar emot ett tal som representerar hur många ändringar som väntar på att läsas av processorn och det tidsintervall som du vill att mätningen ska utföras med.

Ett exempel på ett ombud som tar emot uppskattningen är:

static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    await Task.Delay(0);
}

Du kan skicka den här uppskattningen till din övervakningslösning och använda den för att förstå hur dina framsteg fungerar över tid.

Som en detaljerad uppskattning på begäran

Till skillnad från push-modellen finns det ett alternativ som gör att du kan hämta uppskattningen på begäran. Den här modellen innehåller också mer detaljerad information:

  • Den uppskattade fördröjningen per lån.
  • Instansen äger och bearbetar varje lån, så att du kan identifiera om det finns ett problem på en instans.

Om ändringsflödesprocessorn definieras så här:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Du kan skapa uppskattningen med samma lånekonfiguration:

ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);

Och när du vill ha det, med den frekvens du behöver, kan du få den detaljerade uppskattningen:

Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}

Var ChangeFeedProcessorState och en innehåller låne- och fördröjningsinformationen, samt vem som är den aktuella instansen som äger den.

Beräknardistribution

Ändringsflödesberäknaren behöver inte distribueras som en del av ändringsflödesprocessorn eller vara en del av samma projekt. Vi rekommenderar att du distribuerar uppskattningen på en oberoende instans från dina processorer. En enskild beräkningsinstans kan spåra förloppet för alla lån och instanser i distributionen av ändringsflödesprocessorn.

Varje uppskattning förbrukar enheter för begäranden från dina övervakade containrar och lånecontainrar. En frekvens på 1 minut mellan är en bra startpunkt, desto lägre frekvens, desto högre förbrukas enheter för begäran.

Ändringsflödeslägen som stöds

Ändringsflödesberäknaren kan användas för både det senaste versionsläget och för alla versioner och borttagningsläge. I båda lägena är den angivna uppskattningen inte garanterad att vara ett exakt antal utestående ändringar att bearbeta.

Ytterligare resurser

Nästa steg

Du kan nu fortsätta med att lära dig mer om ändringsflödesprocessorn i följande artikel: