Verwenden des Änderungsfeed-Estimators

GILT FÜR: NoSQL

In diesem Artikel erfahren Sie, wie Sie den Fortschritt Ihrer Änderungsfeedprozessor-Instanzen beim Lesen des Änderungsfeeds überwachen können.

Warum ist die Überwachung des Fortschritts wichtig?

Der Änderungsfeedprozessor fungiert als Zeiger, der sich im Änderungsfeed vorwärts bewegt und die Änderungen an eine Delegatimplementierung übergibt.

Durch die Bereitstellung eines Änderungsfeedprozessors können Änderungen mit einer bestimmten Rate verarbeitet werden, die auf den verfügbaren Ressourcen wie CPU, Arbeitsspeicher, Netzwerk usw. basiert.

Wenn diese Rate langsamer ist als die Rate, mit der Ihre Änderungen in Ihrem Azure Cosmos DB-Container durchgeführt werden, gerät der Prozessor in Rückstand.

Mit diesem Szenario können wir ermitteln, ob wir unsere Bereitstellung des Änderungsfeedprozessors skalieren müssen.

Implementieren des Änderungsfeed-Estimators

Als Push-Modell für automatische Benachrichtigungen

Wie der Änderungsvorschubprozessor funktioniert auch der Änderungsfeed-Estimator als Push-Modell. Der Estimator misst die Differenz zwischen dem letzten verarbeiteten Element (durch den Zustand des Leasescontainers definiert) und der letzten Änderung im Container und überträgt diesen Wert per Pushvorgang an einen Delegat. Das Intervall, in dem die Messung durchgeführt wird, kann auch mit einem Standardwert von fünf Sekunden angepasst werden.

Hier sehen Sie ein Beispiel für die Definition eines Änderungsfeedprozessors:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Die richtige Methode zum Initialisieren eines Estimators zum Messen dieses Prozessors wäre die Verwendung von GetChangeFeedEstimatorBuilder wie folgt:

ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();

Hier stimmen sowohl leaseContainer als auch die Namen von Prozessor und Estimator überein.

Die anderen beiden Parameter sind der Delegat. Dieser erhält eine Zahl, die angibt, wie viele Änderungen noch vom Prozessor gelesen werden müssen und in welchem Zeitintervall diese Messung durchgeführt werden soll.

Hier sehen Sie ein Beispiel für einen Delegat, der die Schätzung empfängt:

static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    await Task.Delay(0);
}

Sie können diese Schätzung an Ihre Überwachungslösung senden, um Ihren Fortschritt im Laufe der Zeit zu prüfen.

Als detaillierte Schätzung auf Abruf

Im Gegensatz zum Push-Modell gibt es eine Alternative, bei der Sie die Schätzung auf Anforderung erhalten. Dieses Modell bietet auch ausführlichere Informationen:

  • Die geschätzte Verzögerung pro Lease.
  • Die-Instanz, welche die einzelnen Lease besitzt und bearbeitet, damit Sie ermitteln können, ob ein Problem in einer Instanz vorliegt.

Wenn Ihr Änderungsfeed-Prozessor wie folgt definiert ist:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Sie können die Schätzung mit der gleichen Lease-Konfiguration erstellen:

ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);

Und wenn Sie die gewünschte Häufigkeit benötigen, können Sie die detaillierte Schätzung abrufen:

Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}

Jede ChangeFeedProcessorState enthält die Lease-und Lag-Informationen sowie die aktuelle Instanz, die es besitzt.

Bereitstellung des Schätzers

Der Änderungsfeed-Estimator muss weder im Rahmen Ihres Änderungsfeedprozessors bereitgestellt werden noch Teil desselben Projekts sein. Es wird empfohlen, den Schätzer auf einer unabhängigen und ganz anderen Instanz als Ihre Prozessoren bereitzustellen. Eine einzelne Schätzerinstanz kann den Fortschritt für alle Leases und Instanzen in Ihrer Änderungsfeed-Prozessorbereitstellung nachverfolgen.

Jede Schätzung verbraucht Anforderungseinheiten aus Ihren überwachten Containern und Leasecontainern. Ein guter Ausgangspunkt ist eine Häufigkeit mit 1 Minute Abstand dazwischen, je niedriger die Häufigkeit, desto mehr Anforderungseinheiten werden verbraucht.

Zusätzliche Ressourcen

Nächste Schritte

In den folgenden Artikeln erfahren Sie mehr über den Änderungsfeedprozessor: