Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Procesor strumienia zmian jest częścią zestawów SDK usługi Azure Cosmos DB dla platformy .NET w wersji 3 i Java w wersji 4, co upraszcza proces odczytywania strumienia zmian oraz efektywny podział przetwarzania zdarzeń między wielu odbiorców.
Główną zaletą korzystania z procesora kanału zmian jest konstrukcja odporna na uszkodzenia, która zapewnia co najmniej jednokrotne dostarczanie wszystkich zdarzeń w kanale zmian.
Obsługiwane zestawy SDK
| .NET V3 | Java | Node.JS | Python |
|---|---|---|---|
| ✓ | ✓ | ✕ | ✕ |
Składniki procesora strumienia zmian
Procesor przepływu zmian ma cztery główne składniki:
Monitorowany kontener: monitorowany kontener zawiera dane, z których jest generowany kanał zmian. Wszystkie wstawienia i aktualizacje monitorowanego kontenera są odzwierciedlone w jego zestawieniu zmian.
Kontener dzierżawy: Kontener dzierżawy służy jako magazyn stanów i koordynuje przetwarzanie strumienia zmian między wieloma procesami roboczymi. Kontener dzierżawy może być przechowywany na tym samym koncie co monitorowany kontener lub na osobnym koncie.
Wystąpienie obliczeniowe: instancja obliczeniowa hostuje procesor strumienia zmian w celu nasłuchiwania zmian. W zależności od platformy może być reprezentowana przez maszynę wirtualną, zasobnik Kubernetes, instancję usługi Azure App Service lub rzeczywistą maszynę fizyczną. Instancja obliczeniowa ma unikatowy identyfikator, nazwany w tym artykule nazwą instancji.
Delegat: Delegat to fragment kodu, który definiuje, co jako deweloper chcesz zrobić z każdą partią zmian odczytanych przez procesor zmian.
Aby dokładniej zrozumieć, jak te cztery elementy procesora zmiany danych współpracują ze sobą, przyjrzyjmy się przykładowi na poniższym diagramie. Monitorowany kontener przechowuje elementy i używa wartości "Miasto" jako klucza partycji. Wartości klucza partycji są dystrybuowane w zakresach (każdy zakres reprezentuje partycję fizyczną), która zawiera elementy.
Diagram przedstawia dwa elementy obliczeniowe, a procesor strumienia zmian przypisuje różne zakresy każdemu z nich, aby maksymalnie rozłożyć obciążenie obliczeniowe. Każde wystąpienie ma niepowtarzalną nazwę.
Każdy zakres jest odczytywany równolegle. Postęp zakresu jest utrzymywany oddzielnie od innych zakresów w kontenerze dzierżawy za pośrednictwem dokumentu dzierżawy . Kombinacja umów najmu reprezentuje bieżący stan procesora strumienia zmian.
Implementuj procesor strumienia zmian
Procesor zestawienia zmian na platformie .NET jest dostępny w trybie najnowszej wersji oraz trybie wszystkich wersji i usunięć. Wszystkie wersje i tryb usunięć są dostępne w wersji zapoznawczej i są obsługiwane dla procesora zestawienia zmian począwszy od wersji 3.40.0-preview.0. Punkt wejścia dla obu trybów jest zawsze monitorowany kontener.
Aby odczytać przy użyciu najnowszego trybu wersji, w wystąpieniu wywołasz metodę ContainerGetChangeFeedProcessorBuilder:
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
return changeFeedProcessor;
}
Aby odczytać przy użyciu wszystkich wersji i trybu usuwania, wywołaj GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes z wystąpienia Container.
Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
W obu trybach pierwszy parametr jest odrębną nazwą, która opisuje cel tego procesora. Druga nazwa to implementacja delegata, która obsługuje zmiany.
Oto przykład delegata dla trybu najnowszej wersji:
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<ToDoItem> changes,
CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate some asynchronous operation
await Task.Delay(10);
}
Console.WriteLine("Finished handling changes.");
}
Oto przykład delegata dla wszystkich wersji i trybu usuwania:
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ChangeFeedItem<ToDoItem> item in changes)
{
if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item.");
}
else
{
Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
}
// Simulate work
await Task.Delay(1);
}
}
Następnie należy zdefiniować nazwę wystąpienia obliczeniowego lub unikatowy identyfikator przy użyciu polecenia WithInstanceName. Nazwa wystąpienia obliczeniowego powinna być unikatowa i inna dla każdego wdrażanego wystąpienia obliczeniowego. Stan dzierżawy kontenera ustawia się przy użyciu WithLeaseContainer.
Wywołanie Build daje wystąpienie procesora, które można rozpocząć, wywołując polecenie StartAsync.
Ważne
Podczas tworzenia CosmosClient kontenerów strumienia danych i dzierżawy oraz inicjowania nowego obciążenia procesora zmiany strumienia:
Używanie globalnego punktu końcowego
- Zawsze określ globalny punkt końcowy (na przykład
contoso.documents.azure.com) zamiast regionalnego punktu końcowego (na przykładcontoso-westus.documents.azure.com).
Przełączanie regionów za pomocą ApplicationRegion lub ApplicationPreferredRegions
- Aby przekierować ruch kanału informacyjnego zmian między regionami, należy polegać na właściwości
ApplicationRegionlubApplicationPreferredRegions. - Procesor strumienia zmian tworzy dokumenty dzierżawy, które są przypisane do skonfigurowanego punktu końcowego. Dlatego zmiana punktów końcowych powoduje utworzenie nowych, niezależnych dokumentów dzierżawy.
✅ Zrób to — użyj globalnego punktu końcowego z regionem aplikacji:
CosmosClient client = new CosmosClient(
"https://contoso.documents.azure.com:443/", // Global endpoint
"<account-key>",
new CosmosClientOptions()
{
ApplicationRegion = Regions.WestUS2 // Specify region here
});
Container monitoredContainer = client.GetContainer("myDatabase", "myContainer");
Container leaseContainer = client.GetContainer("myDatabase", "leases");
✅ Zrób to — użyj globalnego punktu końcowego z regionami preferowanymi dla aplikacji:
CosmosClient client = new CosmosClient(
"https://contoso.documents.azure.com:443/", // Global endpoint
"<account-key>",
new CosmosClientOptions()
{
ApplicationPreferredRegions = new List<string> { Regions.WestUS2, Regions.EastUS2 }
});
Container monitoredContainer = client.GetContainer("myDatabase", "myContainer");
Container leaseContainer = client.GetContainer("myDatabase", "leases");
❌ Nie rób tego — unikaj regionalnych punktów końcowych:
// DON'T: Using regional endpoint will create region-scoped lease documents
CosmosClient client = new CosmosClient(
"https://contoso-westus.documents.azure.com:443/", // Regional endpoint - AVOID
"<account-key>");
Ważne
Unikaj asynchronicznego przetwarzania w metodach delegatów: w przypadku używania asynchronicznych interfejsów API w handleChanges() metodzie delegata należy pamiętać, że procesor zestawienia zmian może wskazać dzierżawę przed zakończeniem wszystkich operacji asynchronicznych. Może to prowadzić do nieodebrania wydarzeń, jeśli aplikacja napotka problemy podczas odzyskiwania. Rozważ użycie przetwarzania synchronicznego lub zaimplementowanie odpowiedniego śledzenia ukończenia przed zezwoleniem delegatowi na powrót.
Uwaga / Notatka
Poprzednie fragmenty kodu pochodzą z przykładów w usłudze GitHub. Możesz pobrać przykład dla trybu najnowszej wersji lub trybu wszystkich wersji i usunięć.
Cykl życia procesu
Normalny cykl życiowy wystąpienia hosta wygląda następująco:
- Odczyt zestawienia zmian.
- Jeśli nie ma żadnych zmian, uśpij na wstępnie zdefiniowaną ilość czasu (można dostosować za pomocą
WithPollIntervalw Builderze) i przejdź do kroku 1. - Jeśli istnieją zmiany, wyślij je do pełnomocnika.
- Po pomyślnym zakończeniu przetwarzania zmian przez delegata, zaktualizuj magazyn dzierżawy przy użyciu najnowszego przetworzonego momentu w czasie i przejdź do kroku 1.
Obsługa błędów
Procesor informacji o zmianach jest odporny na błędy kodu użytkownika. Jeśli implementacja delegata ma nieobsługiwany wyjątek (krok 4), wątek, który przetwarza tę konkretną partię zmian, zostaje zatrzymany, a ostatecznie tworzony jest nowy wątek. Nowy wątek sprawdza ostatni punkt w czasie zapisany w magazynie dzierżaw dla tego zakresu wartości klucza partycji. Nowy wątek restartuje się z tego miejsca, efektywnie wysyłając tę samą partię zmian do delegata. To zachowanie będzie kontynuowane do momentu, aż delegat prawidłowo przetworzy zmiany i jest to przyczyna, dla którego procesor zestawienia zmian ma gwarancję "co najmniej raz".
Uwaga / Notatka
W jednym scenariuszu partia zmian nie jest ponawiana. Jeśli awaria wystąpi podczas pierwszego w historii wykonania delegata, to magazyn dzierżawy nie ma poprzedniego zapisanego stanu do użycia podczas ponawiania próby. W takich przypadkach ponawianie próby używa początkowej konfiguracji, która może zawierać lub nie ostatnią partię.
Aby zapobiec ciągłemu blokowaniu procesora kanału zmian, który ponownie próbuje przetworzyć tę samą partię zmian, należy dodać logikę w kodzie delegata, która w przypadku wystąpienia wyjątku zapisze dokumenty do kolejki komunikatów o błędzie. Ten projekt gwarantuje, że można śledzić nieprzetworzone zmiany, a jednocześnie nadal być w stanie przetworzyć przyszłe zmiany. Kolejka z błędami komunikatów może być innym kontenerem usługi Azure Cosmos DB. Dokładny magazyn danych nie ma znaczenia. Po prostu chcesz, aby nieprzetworzone zmiany zostały utrwalone.
Możesz również użyć estymatora strumienia zmian, aby monitorować postęp instancji procesora strumienia zmian podczas ich odczytywania lub użyć powiadomień cyklu życia w celu wykrywania błędów bazowych.
Upewnij się, że limit czasu żądania sieci klienta jest dłuższy niż limit czasu po stronie serwera, aby zapobiec niezgodności limitu czasu, które mogą prowadzić do zatrzymania przetwarzania. Domyślny limit czasu po stronie serwera wynosi 5 sekund. Monitoruj błędy związane z przekroczeniem limitu czasu i odpowiednio dostosuj konfiguracje.
Powiadomienia dotyczące cyklu życia
Procesor kanału zmian można połączyć z dowolnym odpowiednim zdarzeniem w jego cyklu życia. Możesz otrzymywać powiadomienia do jednego lub wszystkich z nich. Zaleca się co najmniej zarejestrowanie powiadomienia o błędzie:
- Zarejestruj procedurę obsługi
WithLeaseAcquireNotification, aby otrzymywać powiadomienia, gdy bieżący host uzyskuje dzierżawę na rozpoczęcie przetwarzania. - Zarejestruj procedurę obsługi, aby
WithLeaseReleaseNotificationotrzymywać powiadomienia, gdy bieżący host zwalnia dzierżawę i przestaje ją przetwarzać. - Zarejestruj procedurę obsługi
WithErrorNotification, aby otrzymywać powiadomienia, gdy bieżący host napotka wyjątek podczas przetwarzania. Musisz mieć możliwość odróżnienia, czy źródłem jest delegat użytkownika (nieobsługiwany wyjątek) lub błąd, który procesor napotka podczas próby uzyskania dostępu do monitorowanego kontenera (na przykład problemów z siecią).
Powiadomienia o cyklu życia są dostępne w obu trybach zestawienia zmian. Oto przykład powiadomień cyklu życia w trybie najnowszej wersji:
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
return Task.CompletedTask;
};
Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
if (exception is ChangeFeedProcessorUserException userException)
{
Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
}
else
{
Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
}
return Task.CompletedTask;
};
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
.WithLeaseAcquireNotification(onLeaseAcquiredAsync)
.WithLeaseReleaseNotification(onLeaseReleaseAsync)
.WithErrorNotification(onErrorAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Jednostka wdrożenia
Pojedyncza jednostka wdrażania procesora strumienia zmian składa się z co najmniej jednego wystąpienia obliczeniowego, które mają tę samą wartość dla processorName i tę samą konfigurację kontenera dzierżawy, ale różne nazwy wystąpień. Istnieje wiele jednostek wdrażania, w których każda jednostka ma inny przepływ biznesowy dla zmian, a każda jednostka wdrożenia składa się z co najmniej jednego wystąpienia.
Na przykład może istnieć jedna jednostka wdrożenia, która wyzwala zewnętrzny interfejs API za każdym razem, gdy nastąpi zmiana kontenera. Inna jednostka wdrożenia może przenosić dane w czasie rzeczywistym za każdym razem, gdy nastąpi zmiana. Gdy w monitorowanym kontenerze nastąpi zmiana, wszystkie jednostki wdrożenia zostaną powiadomione.
Dynamiczne skalowanie
Jak wspomniano wcześniej, w ramach jednostki wdrożeniowej można mieć jedną lub więcej instancję obliczeniową. Aby skorzystać z dystrybucji obliczeniowej w ramach jednostki wdrażania, jedynymi kluczowymi wymaganiami są następujące:
- Wszystkie wystąpienia powinny mieć taką samą konfigurację kontenera dzierżawy.
- Wszystkie wystąpienia powinny mieć tę samą wartość dla elementu
processorName. - Każde wystąpienie musi mieć inną nazwę (
WithInstanceName).
Jeśli te trzy warunki mają zastosowanie, procesor zestawienia zmian dystrybuuje wszystkie dzierżawy, które znajdują się w kontenerze dzierżawy we wszystkich uruchomionych wystąpieniach tej jednostki wdrażania, i równoległie oblicza przy użyciu algorytmu równomiernego dystrybucji. Dzierżawa jest przypisana do jednej instancji w dowolnym momencie, więc liczba instancji nie powinna być większa niż liczba dzierżaw.
Liczba wystąpień może rosnąć i zmniejszać. Procesor przepływu zmian dynamicznie dostosowuje obciążenie, redistribuując je odpowiednio.
Ponadto procesor zestawienia zmian może dynamicznie dostosowywać skalę kontenera w przypadku zwiększenia jego przepustowości lub pojemności magazynowej. Gdy kontener rośnie, procesor strumienia zmian przezroczysto obsługuje scenariusz, dynamicznie zwiększając dzierżawy i dystrybuując nowe dzierżawy między istniejące wystąpienia.
Godzina rozpoczęcia
Domyślnie gdy procesor zestawienia zmian uruchamia się po raz pierwszy, inicjuje kontener dzierżawy i uruchamia cykl życia przetwarzania. Wszelkie zmiany, które wystąpiły w monitorowanym kontenerze przed pierwszym zainicjowaniem procesora zestawienia zmian, nie są wykrywane.
Odczytywanie danych z wcześniejszej daty i godziny
Istnieje możliwość zainicjowania procesora zestawienia zmian, aby odczytywać zmiany rozpoczynające się od określonej daty i godziny, poprzez przekazanie instancji do rozszerzenia konstruktora DateTime: WithStartTime.
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(particularPointInTime)
.Build();
Procesor przepływu zmian jest inicjowany dla tej konkretnej daty i godziny, a następnie zaczyna odczytywać zmiany, które nastąpiły po tej chwili.
Czytanie od początku
W innych scenariuszach, takich jak w przypadku migracji danych lub analizowania całej historii kontenera, musisz odczytać zestawienie zmian od początku okresu istnienia tego kontenera. Można użyć WithStartTime w rozszerzeniu konstruktora, ale przekazać DateTime.MinValue.ToUniversalTime(), która generuje reprezentację UTC minimalnej wartości DateTime, jak w tym przykładzie:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.WithStartTime(DateTime.MinValue.ToUniversalTime())
.Build();
Procesor strumienia zmian jest inicjowany i rozpoczyna odczytywanie zmian od początku okresu istnienia kontenera.
Uwaga / Notatka
Te opcje dostosowywania działają tylko w celu skonfigurowania początkowego momentu czasowego procesora strumienia zmian. Po zainicjowaniu kontenera leasingowego po raz pierwszy zmiana tych opcji nie ma wpływu.
Dostosowywanie punktu początkowego jest dostępne tylko dla najnowszego trybu zestawienia zmian wersji. W przypadku korzystania ze wszystkich wersji i trybu usuwania należy rozpocząć odczytywanie od momentu uruchomienia procesora lub wznowić z wcześniejszego stanu dzierżawy, który znajduje się w okresie ciągłego przechowywania kopii zapasowej konta.
Zestawienie zmian i aprowizowana przepływność
Zmiana operacji odczytu kanału informacyjnego w monitorowanym kontenerze zużywa jednostki żądań. Upewnij się, że monitorowany kontener nie ma ograniczenia przepustowości. Ograniczanie zwiększa opóźnienia w odbieraniu zdarzeń strumienia zmian przez twoje procesory.
Operacje na kontenerze dzierżawy (aktualizowanie i utrzymywanie stanu) zużywają jednostki żądań. Im większa liczba wystąpień korzystających z tego samego kontenera dzierżawy, tym większe jest potencjalne zużycie jednostek operacyjnych. Upewnij się, że kontener dzierżawy nie ma ograniczenia przepustowości. Ograniczanie powoduje dodanie opóźnień w odbieraniu zdarzeń przepływu danych o zmianach. Ograniczanie przepustowości może nawet całkowicie zakończyć przetwarzanie.
Udostępnianie kontenera dzierżawy
Można udostępnić kontener dzierżawy pomiędzy wieloma jednostkami wdrażania. W współdzielonym kontenerze dzierżawy każda jednostka wdrożeniowa nasłuchuje innego monitorowanego kontenera lub ma inną wartość dla elementu processorName. W tej konfiguracji każda jednostka wdrożeniowa zachowuje niezależny stan w kontenerze dzierżawy.
Przejrzyj użycie jednostek żądania w kontenerze dzierżawy, aby upewnić się, że aprowizowana przepływność jest wystarczająca dla wszystkich jednostek wdrożenia.
Zaawansowana konfiguracja dzierżawy
Trzy kluczowe konfiguracje mogą wpływać na funkcjonowanie procesora danych o zmianach. Każda konfiguracja ma wpływ na konsumpcję jednostek żądania w kontenerze dzierżawy. Podczas tworzenia procesora zestawienia zmian można ustawić jedną z tych konfiguracji, ale należy ich ostrożnie użyć:
- Uzyskiwanie dzierżawy: automatycznie co 17 sekund. Host okresowo sprawdza stan magazynu dzierżaw i rozważa nabycie dzierżaw w ramach procesu dynamicznego skalowania. Czynność ta odbywa się przez wykonanie zapytania na kontenerze dzierżawy. Zmniejszenie tej wartości sprawia, że ponowne równoważenie i uzyskiwanie dzierżaw jest szybsze, ale zwiększa zużycie jednostek żądań w kontenerze na dzierżawy.
- Wygaśnięcie dzierżawy: domyślnie 60 sekund. Definiuje maksymalny czas, przez jaki dzierżawa może istnieć bez żadnych działań odnawiania przed uzyskaniem go przez innego hosta. W przypadku awarii hosta dzierżawy, które do niego należą, są przejmowane przez innych hostów po upływie tego czasu oraz skonfigurowanego interwału odnawiania. Zmniejszenie tej wartości sprawia, że odzyskiwanie po awarii hosta jest szybsze, ale wartość wygaśnięcia nigdy nie powinna być niższa niż okres odnawiania.
- Odnawianie dzierżawy: domyślnie co 13 sekund. Host, który jest właścicielem dzierżawy, okresowo ją odnawia, nawet jeśli nie ma nowych zmian do wykorzystania. Ten proces jest wykonywany przez wykonanie zamiany w dzierżawie. Zmniejszenie tej wartości skraca czas potrzebny do wykrywania dzierżaw utraconych przez awarię hosta, ale zwiększa zużycie jednostek żądań w kontenerze dzierżawy.
Gdzie hostować procesor strumienia zmian
Procesor zestawienia zmian może być hostowany na dowolnej platformie, która obsługuje długotrwałe procesy lub zadania. Oto kilka przykładów:
- Proces w instancji usługi Azure Virtual Machines
- Zadanie w tle w usłudze Azure Kubernetes Service
- Funkcja bezserwerowa w usłudze Azure Functions
- Hostowana usługa ASP.NET
Chociaż procesor strumienia zmian może działać w środowiskach krótkotrwałych, ponieważ kontener dzierżawy utrzymuje stan, cykl uruchamiania tych środowisk dodaje opóźnienia do czasu potrzebnego na odbieranie powiadomień (ze względu na obciążenie związane z uruchamianiem procesora za każdym razem, gdy środowisko jest uruchamiane).
Wymagania dotyczące dostępu opartego na rolach
W przypadku korzystania z identyfikatora Entra firmy Microsoft jako mechanizmu uwierzytelniania upewnij się, że tożsamość ma odpowiednie uprawnienia:
- W monitorowanym kontenerze:
Microsoft.DocumentDB/databaseAccounts/readMetadataMicrosoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
- W kontenerze dzierżawy:
Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/readMicrosoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/createMicrosoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replaceMicrosoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/deleteMicrosoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery
Więcej zasobów
- Azure Cosmos DB SDK
- Kompletna przykładowa aplikacja w usłudze GitHub
- Przykłady użycia w usłudze GitHub
- Warsztaty laboratoryjne Azure Cosmos DB dla procesora strumienia zmian
Dalsze kroki
Dowiedz się więcej o procesorze Change Feed w następujących artykułach: