Implementieren des Änderungsfeed-Estimators

Abgeschlossen

Während der Änderungsfeedprozessor Änderungen verarbeitet, fungiert er als zeitbasierter Zeiger. Der Zeiger bewegt sich in der Zeit über den Änderungsfeed vorwärts und sendet Batches von Änderungen an den Delegaten, um Geschäftslogik auszuführen.

Der Änderungsfeedprozessor kann möglicherweise durch die physischen Ressourcen der Hostanwendung eingeschränkt werden. Wenn dies der Fall ist, wird fast sofort angenommen, dass der Änderungsfeedprozessor über mehrere Hosts horizontal skaliert werden sollte, die alle gleichzeitig aus dem Änderungsfeed lesen.

Die Ermittlung, ob Ihre Änderungsfeedlösung horizontal skaliert werden muss, kann jedoch schwierig sein und erfordert eine Schätzerfunktion. Der Änderungsfeedschätzer ist eine Zusatzfunktion des Prozessors, die die Anzahl der Änderungen misst, die zu einem beliebigen Zeitpunkt zum Lesen durch den Prozessor ausstehen.

Um den Schätzer zu implementieren, müssen Sie zuerst den Prozessor analysieren. In diesem Beispiel werden ein Prozessor mit einem bestimmten Leasecontainer und ein Delegat zum Verarbeiten von Änderungen erstellt.

Container sourceContainer = client.GetContainer("cosmicworks", "products");

Container leaseContainer = client.GetContainer("cosmicworks", "productslease");

ChangeFeedProcessor processor = sourceContainer.GetChangeFeedProcessorBuilder<Product>(
    processorName: "productItemProcessor",
    onChangesDelegate: changeHandlerDelegate)
    .WithInstanceName("desktopApplication")
    .WithLeaseContainer(leaseContainer)
    .Build();

Im nächsten Schritt sollten Sie einen Delegaten für die Verwendung des Typs ChangesEstimationHandler implementieren, der jedes Mal, wenn der Schätzer den Änderungsfeed abfragt, um festzustellen, wie viele Änderungen noch nicht verarbeitet wurden, die Verarbeitung übernimmt.

ChangesEstimationHandler changeEstimationDelegate = async (
    long estimation, 
    CancellationToken cancellationToken
) => {
    // Do something with the estimation
};

Schließlich erstellen Sie den Schätzer auf ähnliche Weise wie der Prozessor unter Wiederverwendung desselben Leasecontainers.

ChangeFeedProcessor estimator = sourceContainer.GetChangeFeedEstimatorBuilder(
    processorName: "productItemEstimator",
    estimationDelegate: changeEstimationDelegate)
    .WithLeaseContainer(leaseContainer)
    .Build();