Verwenden des Änderungsfeed-Estimators
GILT FÜR: NoSQL
In diesem Artikel erfahren Sie, wie Sie den Fortschritt Ihrer Änderungsfeedprozessor-Instanzen beim Lesen des Änderungsfeeds überwachen können.
Warum ist die Überwachung des Fortschritts wichtig?
Der Änderungsfeedprozessor fungiert als Zeiger, der sich im Änderungsfeed vorwärts bewegt und die Änderungen an eine Delegatimplementierung übergibt.
Durch die Bereitstellung eines Änderungsfeedprozessors können Änderungen mit einer bestimmten Rate verarbeitet werden, die auf den verfügbaren Ressourcen wie CPU, Arbeitsspeicher, Netzwerk usw. basiert.
Wenn diese Rate langsamer als die Rate ist, mit der Ihre Änderungen in Ihrem Azure Cosmos DB-Container erfolgen, beginnt ihr Prozessor hinterherzuhinken.
Mit diesem Szenario können wir ermitteln, ob wir unsere Bereitstellung des Änderungsfeedprozessors skalieren müssen.
Implementieren des Änderungsfeed-Estimators
Als Push-Modell für automatische Benachrichtigungen
Wie der Änderungsvorschubprozessor funktioniert auch der Änderungsfeed-Estimator als Push-Modell. Der Schätzer misst die Differenz zwischen dem letzten verarbeiteten Element (durch den Zustand des Leasescontainers definiert) und der letzten Änderung im Container und pusht diesen Wert an einen Delegaten. Das Intervall, in dem die Messung durchgeführt wird, kann auch mit einem Standardwert von fünf Sekunden angepasst werden.
Wenn Ihr Änderungsfeedprozessor z. B. den Modus „Neueste Version“ verwendet und wie folgt definiert ist:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Die richtige Methode zum Initialisieren eines Estimators zum Messen dieses Prozessors wäre die Verwendung von GetChangeFeedEstimatorBuilder
wie folgt:
ChangeFeedProcessor changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
.WithLeaseContainer(leaseContainer)
.Build();
Hier stimmen sowohl leaseContainer
als auch die Namen von Prozessor und Estimator überein.
Die anderen beiden Parameter sind der Delegat. Dieser erhält eine Zahl, die angibt, wie viele Änderungen noch vom Prozessor gelesen werden müssen und in welchem Zeitintervall diese Messung durchgeführt werden soll.
Hier sehen Sie ein Beispiel für einen Delegat, der die Schätzung empfängt:
static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
if (estimation > 0)
{
Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
}
await Task.Delay(0);
}
Sie können diese Schätzung an Ihre Überwachungslösung senden, um Ihren Fortschritt im Laufe der Zeit zu prüfen.
Als detaillierte Schätzung auf Abruf
Im Gegensatz zum Push-Modell gibt es eine Alternative, bei der Sie die Schätzung auf Anforderung erhalten. Dieses Modell bietet auch ausführlichere Informationen:
- Die geschätzte Verzögerung pro Lease.
- Die-Instanz, welche die einzelnen Lease besitzt und bearbeitet, damit Sie ermitteln können, ob ein Problem in einer Instanz vorliegt.
Wenn Ihr Änderungsfeed-Prozessor wie folgt definiert ist:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Sie können die Schätzung mit der gleichen Lease-Konfiguration erstellen:
ChangeFeedEstimator changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);
Und wenn Sie die gewünschte Häufigkeit benötigen, können Sie die detaillierte Schätzung abrufen:
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
Jede ChangeFeedProcessorState
enthält die Lease- und Verzögerungsinformationen sowie die aktuelle Instanz, die es besitzt.
Bereitstellung des Schätzers
Der Änderungsfeedschätzer muss weder zusammen mit Ihrem Änderungsfeedprozessor bereitgestellt werden noch Teil desselben Projekts sein. Es wird empfohlen, den Schätzer auf einer von Ihren Prozessoren unabhängigen Instanz bereitzustellen. Eine einzelne Schätzerinstanz kann den Fortschritt für alle Leases und Instanzen in Ihrer Änderungsfeed-Prozessorbereitstellung nachverfolgen.
Jede Schätzung verbraucht Anforderungseinheiten aus Ihren überwachten Containern und Leasecontainern. Ein guter Ausgangspunkt ist eine Häufigkeit mit 1 Minute Abstand dazwischen, je niedriger die Häufigkeit, desto mehr Anforderungseinheiten werden verbraucht.
Unterstützte Änderungsfeedmodi
Der Änderungsfeedschätzer kann sowohl im Modus „Neueste Version“ als auch im Modus „Alle Versionen und Löschvorgänge“ verwendet werden. In beiden Modi kann nicht garantiert werden, dass die bereitgestellte Schätzung der genauen Anzahl noch ausstehender zu verarbeitender Änderungen entspricht.
Zusätzliche Ressourcen
- Azure Cosmos DB SDK
- Verwendungsbeispiele auf GitHub (.NET, „Neueste Version“)
- Verwendungsbeispiele auf GitHub (.NET, „Alle Versionen und Löschvorgänge“)
- Verwendungsbeispiele auf GitHub (Java)
- Zusätzliche Beispiele auf GitHub
Nächste Schritte
In den folgenden Artikeln erfahren Sie mehr über den Änderungsfeedprozessor: