De schatting van de wijzigingenfeed gebruiken

VAN TOEPASSING OP: NoSQL

In dit artikel wordt beschreven hoe u de voortgang van de wijzigingenfeedprocessorexemplaren kunt bewaken terwijl ze de wijzigingenfeed lezen.

Waarom is het bewaken van de voortgang belangrijk?

De verwerker van de wijzigingenfeed fungeert als een aanwijzer die verder gaat in uw wijzigingenfeed en de wijzigingen aan een gedelegeerde implementatie levert.

De implementatie van de wijzigingenfeedprocessor kan wijzigingen met een bepaalde snelheid verwerken op basis van de beschikbare resources, zoals CPU, geheugen, netwerk, enzovoort.

Als deze snelheid lager is dan de snelheid waarmee uw wijzigingen plaatsvinden in uw Azure Cosmos DB-container, loopt uw processor achter.

Als u dit scenario identificeert, krijgt u inzicht in de vraag of de implementatie van de wijzigingenfeedprocessor moet worden geschaald.

De schatting van de wijzigingenfeed implementeren

Als pushmodel voor automatische meldingen

Net als de verwerker van de wijzigingenfeed kan de wijzigingsfeedschatter als een pushmodel werken. De estimator meet het verschil tussen het laatste verwerkte item (gedefinieerd door de status van de leasecontainer) en de meest recente wijziging in de container en pusht deze waarde naar een gemachtigde. Het interval waarmee de meting wordt uitgevoerd, kan ook worden aangepast met een standaardwaarde van 5 seconden.

Als de verwerker van uw wijzigingenfeed bijvoorbeeld als volgt is gedefinieerd:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

De juiste manier om een estimator te initialiseren om deze processor te meten, zou als volgt worden gebruikt GetChangeFeedEstimatorBuilder :

ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();

Waarbij zowel de processor als de estimator dezelfde leaseContainer en dezelfde naam hebben.

De andere twee parameters zijn de gemachtigde, die een getal ontvangt dat aangeeft hoeveel wijzigingen in behandeling zijn om te worden gelezen door de processor, en het tijdsinterval waarin u deze meting wilt laten uitvoeren.

Een voorbeeld van een gemachtigde die de schatting ontvangt, is:

static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    await Task.Delay(0);
}

U kunt deze schatting naar uw bewakingsoplossing verzenden en deze gebruiken om te begrijpen hoe uw voortgang zich in de loop van de tijd gedraagt.

Als een gedetailleerde schatting op aanvraag

In tegenstelling tot het pushmodel is er een alternatief waarmee u de schatting op aanvraag kunt verkrijgen. Dit model biedt ook meer gedetailleerde informatie:

  • De geschatte vertraging per lease.
  • Het exemplaar dat eigenaar is van elke lease en deze verwerkt, zodat u kunt bepalen of er een probleem is met een exemplaar.

Als de verwerker van uw wijzigingenfeed als volgt is gedefinieerd:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

U kunt de estimator maken met dezelfde leaseconfiguratie:

ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);

En wanneer u het wilt, met de frequentie die u nodig hebt, kunt u de gedetailleerde schatting verkrijgen:

Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}

Elk ChangeFeedProcessorState bevat de lease- en vertragingsgegevens, en ook wie het huidige exemplaar is dat de eigenaar is.

Estimator-implementatie

De wijzigingsfeedschatter hoeft niet te worden geïmplementeerd als onderdeel van uw wijzigingenfeedverwerker en hoeft ook geen deel uit te maken van hetzelfde project. We raden u aan de estimator te implementeren op een onafhankelijk en volledig ander exemplaar dan uw processors. Eén estimator-exemplaar kan de voortgang bijhouden voor alle leases en exemplaren in de implementatie van de wijzigingenfeedprocessor.

Elke schatting verbruikt aanvraageenheden van uw bewaakte en leasecontainers. Een frequentie van 1 minuut ertussen is een goed uitgangspunt, hoe lager de frequentie, hoe hoger de verbruikte aanvraageenheden.

Aanvullende resources

Volgende stappen

U kunt nu verdergaan met meer informatie over de verwerker van wijzigingenfeeds in de volgende artikelen: