Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
W tym artykule opisano, jak można monitorować postęp wystąpień procesora zestawienia zmian podczas odczytywania zestawienia zmian.
Dlaczego postęp monitorowania jest ważny?
Procesor zestawienia zmian działa jako wskaźnik, który porusza się do przodu w kanale zmian i przekazuje zmiany do implementacji delegata.
Twoje wdrożenie procesora strumienia zmian może przetwarzać zmiany z określoną szybkością w oparciu o dostępne zasoby, takie jak procesor CPU, pamięć, sieć itd.
Jeśli ta szybkość jest niższa niż szybkość, w jakiej zmiany występują w kontenerze usługi Azure Cosmos DB, procesor zacznie się opóźniać.
Zidentyfikowanie tego scenariusza pomaga zrozumieć, czy musimy skalować wdrożenie procesora strumienia zmian.
Implementacja estymatora zmian w strumieniu danych
Jako tryb push dla powiadomień automatycznych
Narzędzie do szacowania zestawienia zmian może działać jako model oparty na wypychaniu, podobnie jak procesor zestawienia zmian. Narzędzie do szacowania mierzy różnicę między ostatnim przetworzonym elementem (zdefiniowanym przez stan kontenera dzierżaw) a najnowszą zmianą w kontenerze oraz przekazuje tę wartość do delegata. Interwał, w którym pomiar jest wykonywany, można również dostosować z wartością domyślną wynoszącą 5 sekund.
Przykładowo, jeśli procesor strumienia zmian używa trybu najnowszej wersji i jest zdefiniowany w następujący sposób:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Właściwym sposobem na zainicjowanie estymatora do pomiaru tego procesora jest użycie GetChangeFeedEstimatorBuilder w następujący sposób:
ChangeFeedProcessor changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
.WithLeaseContainer(leaseContainer)
.Build();
Gdzie procesor i narzędzie do szacowania mają taką samą leaseContainer i taką samą nazwę.
Pozostałe dwa parametry to delegat, który otrzymuje liczbę, która reprezentuje liczbę oczekujących zmian do odczytania przez procesor, oraz interwał czasu, w którym ma zostać podjęta ta miara.
Przykładem delegata, który otrzymuje oszacowanie, jest:
static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
if (estimation > 0)
{
Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
}
await Task.Delay(0);
}
Możesz wysłać to oszacowanie do rozwiązania do monitorowania i użyć go, aby zrozumieć, jak postęp działa w czasie.
Jako szczegółowe szacowanie na żądanie
W przeciwieństwie do modelu wypychania istnieje model alternatywny umożliwiający uzyskanie oszacowania na żądanie. Ten model zawiera również bardziej szczegółowe informacje:
- Szacowane opóźnienie na dzierżawę.
- Każda instancja posiada i przetwarza każdą dzierżawę, dzięki czemu możesz zidentyfikować, czy występuje problem w instancji.
Jeśli procesor przesyłu zmian jest zdefiniowany w następujący sposób:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Estymator można utworzyć przy użyciu tej samej konfiguracji dzierżawy.
ChangeFeedEstimator changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);
I za każdym razem, gdy chcesz, z wymaganą częstotliwością, możesz uzyskać szczegółowe oszacowanie:
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
Każdy ChangeFeedProcessorState zawiera informacje o dzierżawie i opóźnieniu, a także o tym, która bieżąca instancja jest jego właścicielem.
Wdrażanie modułu szacującego
Narzędzie do szacowania zestawienia zmian nie musi być wdrażane jako część procesora zestawienia zmian ani być częścią tego samego projektu. Zalecamy wdrożenie narzędzia do szacowania na niezależnym wystąpieniu z procesorów. Pojedyncze wystąpienie estimatora może śledzić postęp dla dzierżaw i wystąpień we wdrożeniu procesora przepływu zmian.
Każde oszacowanie zużywa jednostki żądań z monitorowanych kontenerów i dzierżaw. Częstotliwość co 1 minutę jest dobrym punktem wyjścia, im niższa częstotliwość, tym większe zużycie jednostek żądania.
Obsługiwane tryby zestawienia zmian
Narzędzie do szacowania przepływu zmian może być używane zarówno w trybie najnowszej wersji, jak i w trybie wszystkich wersji i usunięć. W obu trybach podane oszacowanie nie gwarantuje dokładnej liczby zaległych zmian w procesie.
Dodatkowe zasoby
- Azure Cosmos DB SDK
- Przykłady użycia w witrynie GitHub (najnowsza wersja platformy .NET)
- Przykłady użycia na GitHubie (.NET - wszystkie wersje i przykłady usunięć)
- Przykłady użycia w witrynie GitHub (Java)
- Dodatkowe przykłady w witrynie GitHub
Dalsze kroki
Teraz możesz dowiedzieć się więcej o procesorze śledzenia zmian w poniższym artykule.