Udostępnij za pośrednictwem


Procesor zestawienia zmian w usłudze Azure Cosmos DB

DOTYCZY: NoSQL

Procesor zestawienia zmian jest częścią zestawów SDK platformy .NET usługi Azure Cosmos DB w wersji 3 i języka Java w wersji 4 . Upraszcza to proces odczytywania zestawienia zmian i efektywnego dystrybuowania przetwarzania zdarzeń między wieloma użytkownikami.

Główną zaletą korzystania z procesora zestawienia zmian jest jego odporny na uszkodzenia projekt, który zapewnia "co najmniej jednokrotne" dostarczanie wszystkich zdarzeń w zestawieniach zmian.

Obsługiwane zestawy SDK

.Net V3 Java Node.JS Python

Składniki procesora zestawienia zmian

Procesor zestawienia zmian ma cztery główne składniki:

  • Monitorowany kontener: monitorowany kontener zawiera dane, z których jest generowany kanał zmian. Wszystkie wstawienia i aktualizacje monitorowanego kontenera są odzwierciedlone w jego zestawieniu zmian.

  • Kontener dzierżawy: kontener dzierżawy działa jako magazyn stanu i koordynuje przetwarzanie zestawienia zmian między wieloma procesami roboczymi. Kontener dzierżawy może być przechowywany na tym samym koncie co monitorowany kontener lub na osobnym koncie.

  • Wystąpienie obliczeniowe: wystąpienie obliczeniowe hostuje procesor zestawienia zmian w celu nasłuchiwania zmian. W zależności od platformy może być reprezentowana przez maszynę wirtualną, zasobnik Kubernetes, wystąpienie usługi aplikacja systemu Azure Service lub rzeczywistą maszynę fizyczną. Wystąpienie obliczeniowe ma unikatowy identyfikator, który jest nazywany nazwą wystąpienia w tym artykule.

  • Delegat: Delegat to kod, który definiuje, co ty, deweloper, chcesz zrobić z każdą partią zmian odczytanych przez procesor zestawienia zmian.

Aby dokładniej zrozumieć, jak te cztery elementy procesora zestawienia zmian współpracują ze sobą, przyjrzyjmy się przykładowi na poniższym diagramie. Monitorowany kontener przechowuje elementy i używa wartości "Miasto" jako klucza partycji. Wartości klucza partycji są dystrybuowane w zakresach (każdy zakres reprezentuje partycję fizyczną), która zawiera elementy.

Na diagramie przedstawiono dwa wystąpienia obliczeniowe, a procesor zestawienia zmian przypisuje różne zakresy do każdego wystąpienia w celu zmaksymalizowania dystrybucji obliczeniowej. Każde wystąpienie ma inną, unikatową nazwę.

Każdy zakres jest odczytywany równolegle. Postęp zakresu jest utrzymywany oddzielnie od innych zakresów w kontenerze dzierżawy za pośrednictwem dokumentu dzierżawy . Kombinacja dzierżaw reprezentuje bieżący stan procesora zestawienia zmian.

Przykład procesora zestawienia zmian

Implementowanie procesora zestawienia zmian

Procesor zestawienia zmian na platformie .NET jest dostępny dla najnowszego trybu wersji oraz wszystkich wersji i trybu usuwania. Wszystkie wersje i tryb usuwania są dostępne w wersji zapoznawczej i są obsługiwane dla procesora zestawienia zmian rozpoczynającego się w wersji 3.40.0-preview.0. Punkt wejścia dla obu trybów jest zawsze monitorowany kontener.

Aby odczytać przy użyciu najnowszego trybu wersji, w wystąpieniu wywołasz metodę Container GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Aby odczytać przy użyciu wszystkich wersji i trybu usuwania, wywołaj metodę GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes Container z wystąpienia:

Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

W obu trybach pierwszy parametr jest odrębną nazwą, która opisuje cel tego procesora. Druga nazwa to implementacja delegata, która obsługuje zmiany.

Oto przykład delegata dla najnowszego trybu wersji:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Oto przykład delegata dla wszystkich wersji i trybu usuwania:

static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ChangeFeedItem<ToDoItem> item in changes)
    {
        if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Previous.id}.");
        }
        else
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
        }
        // Simulate work
        await Task.Delay(1);
    }
}

Następnie należy zdefiniować nazwę wystąpienia obliczeniowego lub unikatowy identyfikator przy użyciu polecenia WithInstanceName. Nazwa wystąpienia obliczeniowego powinna być unikatowa i inna dla każdego wdrażanego wystąpienia obliczeniowego. Dla kontenera należy ustawić stan dzierżawy przy użyciu polecenia WithLeaseContainer.

Wywołanie Build daje wystąpienie procesora, które można rozpocząć, wywołując polecenie StartAsync.

Uwaga

Poprzednie fragmenty kodu pochodzą z przykładów w usłudze GitHub. Możesz pobrać przykład dla najnowszego trybu wersji lub wszystkich wersji i usunąć tryb.

Cykl życia przetwarzania

Normalny cykl życiowy wystąpienia hosta wygląda następująco:

  1. Odczyt zestawienia zmian.
  2. Jeśli nie ma żadnych zmian, uśpij wstępnie zdefiniowany czas (dostosowywalny przy użyciu narzędzia WithPollInterval Builder) i przejdź do pliku #1.
  3. Jeśli istnieją zmiany, wyślij je do pełnomocnika.
  4. Po pomyślnym zakończeniu przetwarzania zmian przez pełnomocnika zaktualizuj magazyn dzierżawy przy użyciu najnowszego przetworzonego punktu w czasie i przejdź do pliku #1.

Obsługa błędów

Procesor zestawienia zmian jest odporny na błędy kodu użytkownika. Jeśli implementacja delegata ma nieobsługiwany wyjątek (krok 4), wątek, który przetwarza daną partię zmian, zatrzyma się, a nowy wątek zostanie ostatecznie utworzony. Nowy wątek sprawdza ostatni punkt w czasie zapisany przez magazyn dzierżaw dla tego zakresu wartości klucza partycji. Nowy wątek jest uruchamiany ponownie z tego miejsca, skutecznie wysyłając tę samą partię zmian do delegata. To zachowanie będzie kontynuowane do momentu, aż delegat prawidłowo przetworzy zmiany i jest to przyczyna, dla którego procesor zestawienia zmian ma gwarancję "co najmniej raz".

Uwaga

W jednym scenariuszu partia zmian nie jest ponawiana. Jeśli awaria wystąpi podczas pierwszego delegata wykonania, magazyn dzierżawy nie ma poprzedniego zapisanego stanu, który ma być używany podczas ponawiania próby. W takich przypadkach ponawianie próby używa początkowej konfiguracji początkowej, która może lub nie może zawierać ostatniej partii.

Aby zapobiec ciągłemu ponawianiu próby tej samej partii zmian przez procesor zestawienia zmian, należy dodać logikę w kodzie delegata, aby napisać dokumenty, z wyjątkiem, do kolejki komunikatów o błędzie. Ten projekt gwarantuje, że można śledzić nieprzetworzone zmiany, a jednocześnie nadal być w stanie przetworzyć przyszłe zmiany. Błąd kolejki komunikatów może być innym kontenerem usługi Azure Cosmos DB. Dokładny magazyn danych nie ma znaczenia. Po prostu chcesz, aby nieprzetworzone zmiany zostały utrwalone.

Możesz również użyć narzędzia do szacowania zestawienia zmian, aby monitorować postęp wystąpień procesora zestawienia zmian podczas odczytywania zestawienia zmian lub użyć powiadomień cyklu życia w celu wykrywania błędów bazowych.

Powiadomienia dotyczące cyklu życia

Procesor zestawienia zmian można połączyć z dowolnym odpowiednim zdarzeniem w swoim cyklu życia. Możesz otrzymywać powiadomienia do jednego lub wszystkich z nich. Zaleca się co najmniej zarejestrowanie powiadomienia o błędzie:

  • Zarejestruj procedurę obsługi WithLeaseAcquireNotification , aby otrzymywać powiadomienia, gdy bieżący host uzyskuje dzierżawę, aby rozpocząć jej przetwarzanie.
  • Zarejestruj procedurę obsługi, aby WithLeaseReleaseNotification otrzymywać powiadomienia, gdy bieżący host zwalnia dzierżawę i przestaje go przetwarzać.
  • Zarejestruj procedurę obsługi WithErrorNotification , aby otrzymywać powiadomienia, gdy bieżący host napotka wyjątek podczas przetwarzania. Musisz mieć możliwość odróżnienia, czy źródłem jest delegat użytkownika (nieobsługiwany wyjątek) lub błąd, który procesor napotka podczas próby uzyskania dostępu do monitorowanego kontenera (na przykład problemów z siecią).

Powiadomienia o cyklu życia są dostępne w obu trybach zestawienia zmian. Oto przykład powiadomień cyklu życia w trybie najnowszej wersji:

Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Jednostka wdrożenia

Pojedyncza jednostka wdrażania procesora zestawienia zmian składa się z co najmniej jednego wystąpienia obliczeniowego, które mają tę samą wartość dla processorName i tej samej konfiguracji kontenera dzierżawy, ale różne nazwy wystąpień. Istnieje wiele jednostek wdrażania, w których każda jednostka ma inny przepływ biznesowy dla zmian, a każda jednostka wdrożenia składa się z co najmniej jednego wystąpienia.

Na przykład może istnieć jedna jednostka wdrożenia, która wyzwala zewnętrzny interfejs API za każdym razem, gdy nastąpi zmiana kontenera. Inna jednostka wdrożenia może przenosić dane w czasie rzeczywistym za każdym razem, gdy nastąpi zmiana. Gdy w monitorowanym kontenerze nastąpi zmiana, wszystkie jednostki wdrożenia zostaną powiadomione.

Dynamiczne skalowanie

Jak wspomniano wcześniej, w ramach lekcji wdrażania można mieć co najmniej jedno wystąpienie obliczeniowe. Aby skorzystać z dystrybucji obliczeniowej w ramach jednostki wdrażania, jedynymi kluczowymi wymaganiami są następujące:

  • Wszystkie wystąpienia powinny mieć taką samą konfigurację kontenera dzierżawy.
  • Wszystkie wystąpienia powinny mieć tę samą wartość dla elementu processorName.
  • Każde wystąpienie musi mieć inną nazwę (WithInstanceName).

Jeśli te trzy warunki mają zastosowanie, procesor zestawienia zmian dystrybuuje wszystkie dzierżawy, które znajdują się w kontenerze dzierżawy we wszystkich uruchomionych wystąpieniach tej jednostki wdrażania, i równoległie oblicza przy użyciu algorytmu równomiernego dystrybucji. Dzierżawa jest własnością jednego wystąpienia w dowolnym momencie, więc liczba wystąpień nie powinna być większa niż liczba dzierżaw.

Liczba wystąpień może rosnąć i zmniejszać. Procesor zestawienia zmian dynamicznie dostosowuje obciążenie, rozpowszechniając je odpowiednio.

Ponadto procesor zestawienia zmian może dynamicznie dostosowywać skalę kontenera w przypadku zwiększenia przepływności lub magazynu kontenera. Gdy kontener rośnie, procesor zestawienia zmian w sposób niewidoczny obsługuje scenariusz, dynamicznie zwiększając dzierżawy i dystrybuując nowe dzierżawy między istniejące wystąpienia.

Godzina rozpoczęcia

Domyślnie gdy procesor zestawienia zmian uruchamia się po raz pierwszy, inicjuje kontener dzierżawy i uruchamia cykl życia przetwarzania. Wszelkie zmiany, które wystąpiły w monitorowanym kontenerze przed zainicjowaniem procesora zestawienia zmian po raz pierwszy, nie są wykrywane.

Odczytywanie z poprzedniej daty i godziny

Istnieje możliwość zainicjowania procesora zestawienia zmian w celu odczytu zmian rozpoczynających się od określonej daty i godziny przez przekazanie wystąpienia do rozszerzenia konstruktoraWithStartTime:DateTime

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

Procesor zestawienia zmian jest inicjowany dla tej konkretnej daty i godziny i zaczyna odczytywać zmiany, które wystąpiły później.

Czytanie od początku

W innych scenariuszach, takich jak w przypadku migracji danych lub analizowania całej historii kontenera, musisz odczytać zestawienie zmian od początku okresu istnienia tego kontenera. Można użyć WithStartTime w rozszerzeniu konstruktora, ale przekazać DateTime.MinValue.ToUniversalTime()wartość , która generuje reprezentację UTC minimalnej DateTime wartości, jak w tym przykładzie:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Procesor zestawienia zmian jest inicjowany i rozpoczyna odczytywanie zmian od początku okresu istnienia kontenera.

Uwaga

Te opcje dostosowywania działają tylko w celu skonfigurowania punktu początkowego w czasie procesora zestawienia zmian. Po zainicjowaniu kontenera dzierżawy po raz pierwszy zmiana tych opcji nie ma wpływu.

Dostosowywanie punktu początkowego jest dostępne tylko dla najnowszego trybu zestawienia zmian wersji. W przypadku korzystania ze wszystkich wersji i usuwania trybu należy rozpocząć odczytywanie od momentu uruchomienia procesora lub wznowić z wcześniejszego stanu dzierżawy, który znajduje się w okresie ciągłego przechowywania kopii zapasowej konta.

Zestawienie zmian i aprowizowana przepływność

Zmiana operacji odczytu kanału informacyjnego w monitorowanym kontenerze zużywa jednostki żądań. Upewnij się, że monitorowany kontener nie ma ograniczenia przepustowości. Ograniczanie zwiększa opóźnienia w odbieraniu zdarzeń zestawienia zmian na procesorach.

Operacje na kontenerze dzierżawy (aktualizowanie i utrzymywanie stanu) zużywają jednostki żądań. Im większa liczba wystąpień korzystających z tego samego kontenera dzierżawy, tym większe jest potencjalne użycie jednostek żądań. Upewnij się, że kontener dzierżawy nie ma ograniczenia przepustowości. Ograniczanie powoduje dodanie opóźnień w odbieraniu zdarzeń zestawienia zmian. Ograniczanie przepustowości może nawet całkowicie zakończyć przetwarzanie.

Udostępnianie kontenera dzierżawy

Kontener dzierżawy można udostępnić w wielu jednostkach wdrażania. W kontenerze dzierżawy udostępnionej każda jednostka wdrożenia nasłuchuje innego monitorowanego kontenera lub ma inną wartość dla elementu processorName. W tej konfiguracji każda jednostka wdrożenia zachowuje stan niezależny w kontenerze dzierżawy. Przejrzyj użycie jednostek żądania w kontenerze dzierżawy, aby upewnić się, że aprowizowana przepływność jest wystarczająca dla wszystkich jednostek wdrożenia.

Zaawansowana konfiguracja dzierżawy

Trzy kluczowe konfiguracje mogą mieć wpływ na sposób działania procesora zestawienia zmian. Każda konfiguracja ma wpływ na użycie jednostek żądania w kontenerze dzierżawy. Podczas tworzenia procesora zestawienia zmian można ustawić jedną z tych konfiguracji, ale należy ich ostrożnie użyć:

  • Uzyskiwanie dzierżawy: domyślnie co 17 sekund. Host okresowo sprawdza stan magazynu dzierżaw i rozważa uzyskanie dzierżaw w ramach procesu dynamicznego skalowania . Ten proces odbywa się przez wykonanie zapytania w kontenerze dzierżawy. Zmniejszenie tej wartości sprawia, że ponowne równoważenie i uzyskiwanie dzierżaw jest szybsze, ale zwiększa użycie jednostek żądań w kontenerze dzierżawy.
  • Wygaśnięcie dzierżawy: domyślnie 60 sekund. Definiuje maksymalny czas, przez jaki dzierżawa może istnieć bez żadnych działań odnawiania przed uzyskaniem go przez innego hosta. W przypadku awarii hosta dzierżawy, które należą do niego, są pobierane przez innych hostów po tym okresie oraz skonfigurowany interwał odnawiania. Zmniejszenie tej wartości sprawia, że odzyskiwanie po awarii hosta jest szybsze, ale wartość wygaśnięcia nigdy nie powinna być niższa niż interwał odnawiania.
  • Odnawianie dzierżawy: domyślnie co 13 sekund. Host, który jest właścicielem dzierżawy okresowo odnawia dzierżawę, nawet jeśli nie ma nowych zmian w użyciu. Ten proces jest wykonywany przez wykonanie zamiany w dzierżawie. Zmniejszenie tej wartości zmniejsza czas wymagany do wykrywania dzierżaw utraconych przez awarię hosta, ale zwiększa zużycie jednostek żądania w kontenerze dzierżawy.

Gdzie hostować procesor zestawienia zmian

Procesor zestawienia zmian może być hostowany na dowolnej platformie, która obsługuje długotrwałe procesy lub zadania. Oto kilka przykładów:

Mimo że procesor zestawienia zmian może działać w środowiskach krótkotrwałych, ponieważ kontener dzierżawy utrzymuje stan, cykl uruchamiania tych środowisk zwiększa opóźnienia w czasie odbierania powiadomień (ze względu na obciążenie związane z uruchamianiem procesora za każdym razem, gdy środowisko jest uruchomione).

Wymagania dotyczące dostępu opartego na rolach

W przypadku korzystania z identyfikatora Entra firmy Microsoft jako mechanizmu uwierzytelniania upewnij się, że tożsamość ma odpowiednie uprawnienia:

  • W monitorowanym kontenerze:
    • Microsoft.DocumentDB/databaseAccounts/readMetadata
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
  • W kontenerze dzierżawy:
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery

Dodatkowe zasoby

Następne kroki

Dowiedz się więcej o procesorze zestawienia zmian w następujących artykułach: