Eksplorowanie zestawienia zmian w usłudze Azure Cosmos DB

Ukończone

Zestawienie zmian w usłudze Azure Cosmos DB to trwały rekord zmian w kontenerze w kolejności ich wystąpienia. Obsługa zestawienia zmian w usłudze Azure Cosmos DB działa przez nasłuchiwanie zmian w kontenerze usługi Azure Cosmos DB. Następnie tworzone są dane wyjściowe w postaci posortowanej listy zmienionych dokumentów w kolejności, w której zostały zmodyfikowane. Utrwalone zmiany mogą być przetwarzane asynchronicznie i przyrostowo, a dane wyjściowe mogą być dystrybuowane między co najmniej jednym odbiorcą przetwarzania równoległego.

Zestawienie zmian i różne operacje

Obecnie wszystkie wstawki i aktualizacje są widoczne w kanale informacyjnym zmian. Nie można filtrować zestawienia zmian dla określonego typu operacji. Obecnie kanał informacyjny zmian nie rejestruje operacji usuwania. Aby obejść ten problem, można dodać znacznik nietrwały do elementów, które są usuwane. Na przykład możesz dodać atrybut w elemencie o nazwie "deleted", ustawić jego wartość na "true", a następnie ustawić wartość czasu wygaśnięcia (TTL) dla elementu. Ustawienie czasu wygaśnięcia gwarantuje, że element zostanie automatycznie usunięty.

Odczytywanie zestawienia zmian w usłudze Azure Cosmos DB

Możesz pracować z zestawieniem zmian usługi Azure Cosmos DB przy użyciu modelu wypychania lub modelu ściągania. W przypadku modelu wypychania procesor zestawienia zmian wypycha działanie do klienta, który ma logikę biznesową do przetwarzania tej pracy. Jednak złożoność sprawdzania pracy i przechowywania stanu ostatniej przetworzonej pracy jest obsługiwana w procesorze zestawienia zmian.

W przypadku modelu ściągania klient musi ściągnąć pracę z serwera. W tym przypadku klient ma nie tylko logikę biznesową do przetwarzania pracy, ale także przechowywanie stanu ostatniej przetworzonej pracy, obsługę równoważenia obciążenia w wielu klientach przetwarzania pracy równoległej i obsługę błędów.

Uwaga

Zaleca się użycie modelu wypychania, ponieważ nie trzeba martwić się o sondowanie zestawienia zmian pod kątem przyszłych zmian, przechowywanie stanu dla ostatniej przetworzonej zmiany i innych korzyści.

Większość scenariuszy korzystających ze zestawienia zmian usługi Azure Cosmos DB używa jednej z opcji modelu wypychania. Istnieją jednak pewne scenariusze, w których można chcieć uzyskać dodatkową kontrolę niskiego poziomu nad modelem ściągania. Są to:

  • Odczytywanie zmian z określonego klucza partycji
  • Kontrolowanie tempa, w którym klient otrzymuje zmiany do przetwarzania
  • Jednorazowe odczytywanie istniejących danych w kanale zmian (na przykład w celu przeprowadzenia migracji danych)

Odczytywanie zestawienia zmian za pomocą modelu wypychania

Istnieją dwa sposoby odczytywania ze zestawienia zmian za pomocą modelu wypychania: wyzwalacze usługi Azure Functions w usłudze Azure Cosmos DB i biblioteka procesora zestawienia zmian. Usługa Azure Functions używa procesora zestawienia zmian w tle, więc są to oba podobne sposoby odczytywania zestawienia zmian. Usługa Azure Functions jest po prostu platformą hostingu dla procesora zestawienia zmian, a nie zupełnie innym sposobem odczytywania zestawienia zmian. Usługa Azure Functions używa procesora zestawienia zmian w tle, a automatycznie przetwarza zmiany w partycjach kontenera.

Azure Functions

Możesz utworzyć małe reaktywne funkcje platformy Azure, które zostaną automatycznie wyzwolone na każdym nowym zdarzeniu w kanale zmian kontenera usługi Azure Cosmos DB. Za pomocą wyzwalacza usługi Azure Functions dla usługi Azure Cosmos DB można użyć funkcji skalowania i niezawodnego wykrywania zdarzeń procesora zestawienia zmian bez konieczności obsługi infrastruktury procesu roboczego.

Diagram showing the change feed triggering Azure Functions for processing.

Procesor zestawienia zmian

Procesor zestawienia zmian jest częścią zestawów SDK platformy .NET usługi Azure Cosmos DB w wersji 3 i języka Java w wersji 4 . Upraszcza to proces odczytywania zestawienia zmian i efektywnego dystrybuowania przetwarzania zdarzeń między wieloma użytkownikami.

Są cztery główne składniki implementacji procesora zestawienia zmian:

  1. Monitorowany kontener: monitorowany kontener zawiera dane, z których jest generowany kanał zmian. Wszystkie wstawienia i aktualizacje monitorowanego kontenera są odzwierciedlone w jego zestawieniu zmian.

  2. Kontener dzierżawy: kontener dzierżawy działa jako magazyn stanu i koordynuje przetwarzanie zestawienia zmian między wieloma procesami roboczymi. Kontener dzierżawy może być przechowywany na tym samym koncie co monitorowany kontener lub na osobnym koncie.

  3. Wystąpienie obliczeniowe: wystąpienie obliczeniowe hostuje procesor zestawienia zmian w celu nasłuchiwania zmian. W zależności od platformy może ona być reprezentowana przez maszynę wirtualną, zasobnik kubernetes, wystąpienie usługi aplikacja systemu Azure Service, rzeczywistą maszynę fizyczną. Ma on unikatowy identyfikator, do których odwołuje się nazwa wystąpienia w tym artykule.

  4. Delegat: Delegat to kod, który definiuje, co ty, deweloper, chcesz zrobić z każdą partią zmian odczytanych przez procesor zestawienia zmian.

Podczas implementowania procesora zestawienia zmian punkt wejścia jest zawsze monitorowany kontener, z Container wystąpienia, które wywołujesz GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Gdzie pierwszy parametr jest odrębną nazwą opisającą cel tego procesora, a drugą nazwą jest implementacja delegata, która będzie obsługiwać zmiany. Oto przykład delegata:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Następnie zdefiniujesz nazwę wystąpienia obliczeniowego lub unikatowy identyfikator WithInstanceNameza pomocą polecenia . Powinno to być unikatowe i inne w każdym wdrażanych wystąpieniach obliczeniowych, a na koniec jest to kontener do obsługi stanu dzierżawy za pomocą WithLeaseContainerpolecenia .

Wywołanie Build daje wystąpienie procesora, które można rozpocząć, wywołując polecenie StartAsync.

Normalny cykl życiowy wystąpienia hosta wygląda następująco:

  1. Odczyt zestawienia zmian.
  2. Jeśli nie ma żadnych zmian, uśpij wstępnie zdefiniowany czas (dostosowywalny za pomocą WithPollInterval polecenia w pliku Builder) i przejdź do pliku #1.
  3. Jeśli istnieją zmiany, wysłanie ich do obiektu delegowanego.
  4. Po pomyślnym zakończeniu przetwarzania zmian przez obiekt delegowany następuje aktualizacja magazynu dzierżawy informacjami o ostatnim przetworzonym punkcie w czasie i przejście do punktu 1.