Udostępnij za pośrednictwem


Model ściągania zestawienia zmian w usłudze Azure Cosmos DB

DOTYCZY: NoSQL

Możesz użyć modelu ściągania zestawienia zmian, aby korzystać z zestawienia zmian usługi Azure Cosmos DB we własnym tempie. Podobnie jak w przypadku procesora zestawienia zmian, można użyć modelu ściągania zestawienia zmian, aby zrównoleglić przetwarzanie zmian w wielu odbiorcach zestawienia zmian.

Porównanie z procesorem zestawienia zmian

Wiele scenariuszy może przetwarzać zestawienie zmian przy użyciu procesora zestawienia zmian lub modelu ściągania zestawienia zmian. Tokeny kontynuacji modelu ściągania i kontener dzierżawy procesora zestawienia zmian działają jako zakładki dla ostatniego przetworzonego elementu lub partii elementów w zestawieniach zmian.

Nie można jednak przekonwertować tokenów kontynuacji na dzierżawę lub odwrotnie.

Uwaga

W większości przypadków, gdy trzeba odczytać ze zestawienia zmian, najprostszą opcją jest użycie procesora zestawienia zmian.

Należy rozważyć użycie modelu ściągania w następujących scenariuszach:

  • Aby odczytać zmiany z określonego klucza partycji.
  • Aby kontrolować tempo, w którym klient otrzymuje zmiany do przetwarzania.
  • Aby wykonać jednorazowy odczyt istniejących danych w kanale informacyjnym zmian (na przykład w celu przeprowadzenia migracji danych).

Poniżej przedstawiono kilka kluczowych różnic między procesorem zestawienia zmian a modelem ściągania zestawienia zmian:

Funkcja Procesor zestawienia zmian Model ściągania zestawienia zmian
Śledzenie bieżącego punktu przetwarzania zestawienia zmian Dzierżawa (przechowywana w kontenerze usługi Azure Cosmos DB) Token kontynuacji (przechowywany w pamięci lub ręcznie utrwalone)
Możliwość odtwarzania poprzednich zmian Tak, z modelem wypychania Tak, z modelem ściągania
Sondowanie pod kątem przyszłych zmian Automatycznie sprawdza zmiany na podstawie wartości określonej przez WithPollInterval użytkownika Ręcznie
Zachowanie w przypadku braku nowych zmian Automatycznie zaczekaj na wartość WithPollInterval , a następnie ponownie sprawdź Należy sprawdzić stan i ręcznie ponownie sprawdzić
Przetwarzanie zmian z całego kontenera Tak, i automatycznie zrównoleglizowane między wieloma wątkami i maszynami korzystającymi z tego samego kontenera Tak, i ręcznie zrównane przy użyciu polecenia FeedRange
Przetwarzanie zmian tylko z jednego klucza partycji Nieobsługiwane Tak

Uwaga

W przypadku korzystania z modelu ściągania, w przeciwieństwie do odczytywania przy użyciu procesora zestawienia zmian, należy jawnie obsługiwać przypadki, w których nie ma nowych zmian.

Praca z modelem ściągania

Aby przetworzyć zestawienie zmian przy użyciu modelu ściągania, utwórz wystąpienie klasy FeedIterator. Podczas początkowego tworzenia FeedIteratornależy określić wymaganą ChangeFeedStartFrom wartość, która składa się zarówno z pozycji początkowej do odczytywania zmian, jak i wartości, której chcesz użyć dla FeedRangeelementu . Jest FeedRange to zakres wartości klucza partycji i określa elementy, które mogą być odczytywane z zestawienia zmian przy użyciu tego konkretnego FeedIteratorelementu . Należy również określić wymaganą ChangeFeedMode wartość dla trybu, w którym chcesz przetworzyć zmiany: najnowsza wersja lub wszystkie wersje i usunięcia. Użyj opcji ChangeFeedMode.LatestVersion lub ChangeFeedMode.AllVersionsAndDeletes , aby wskazać tryb, którego chcesz użyć do odczytania zestawienia zmian. W przypadku używania wszystkich wersji i trybu usuwania należy wybrać źródło zmian rozpoczynające się od wartości Now() lub z określonego tokenu kontynuacji.

Opcjonalnie można określić ChangeFeedRequestOptions , aby ustawić wartość PageSizeHint. Po ustawieniu ta właściwość ustawia maksymalną liczbę odebranych elementów na stronę. Jeśli operacje w monitorowanej kolekcji są wykonywane za pomocą procedur składowanych, zakres transakcji jest zachowywany podczas odczytywania elementów ze źródła zmian. W związku z tym liczba odebranych elementów może być większa niż określona wartość, aby elementy zmienione przez tę samą transakcję zostały zwrócone w ramach jednej partii atomowej.

Oto przykład sposobu uzyskiwania FeedIterator w trybie najnowszej wersji, który zwraca obiekty jednostki, w tym przypadku User obiekt:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Napiwek

Przed wersją 3.34.0można użyć najnowszego trybu wersji przez ustawienie .ChangeFeedMode.Incremental Zarówno Incremental , jak i LatestVersion odwołują się do najnowszego trybu wersji zestawienia zmian i aplikacji korzystających z dowolnego trybu, będą widzieć to samo zachowanie.

Wszystkie wersje i tryb usuwania są dostępne w wersji zapoznawczej i mogą być używane z wersjami >zestawu .NET SDK w wersji zapoznawczej = 3.32.0-preview. Oto przykład uzyskiwania FeedIterator we wszystkich wersjach i usuwania trybu, który zwraca obiekty dynamiczne:

FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Uwaga

W trybie najnowszej wersji są odbierane obiekty reprezentujące zmieniony element z dodatkowymi metadanymi. Wszystkie wersje i tryb usuwania zwraca inny model danych. Aby uzyskać więcej informacji, zobacz Analizowanie obiektu odpowiedzi.

Korzystanie ze zestawienia zmian za pośrednictwem strumieni

FeedIterator Dla obu trybów zestawienia zmian dostępne są dwie opcje. Oprócz przykładów, które zwracają obiekty jednostki, można również uzyskać odpowiedź z Stream pomocą techniczną. Strumienie umożliwiają odczytywanie danych bez wcześniejszego deserializacji, dzięki czemu można zaoszczędzić na zasobach klienta.

Oto przykład sposobu uzyskiwania FeedIterator w trybie najnowszej wersji, który zwraca wartość Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Korzystanie ze zmian dla całego kontenera

Jeśli nie podasz parametru FeedRange do FeedIterator, możesz przetworzyć źródło zmian całego kontenera we własnym tempie. Oto przykład, który rozpoczyna odczytywanie wszystkich zmian, począwszy od bieżącej godziny przy użyciu najnowszego trybu wersji:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Ponieważ kanał informacyjny zmian jest w rzeczywistości nieskończoną listą elementów, które obejmują wszystkie przyszłe zapisy i aktualizacje, wartość HasMoreResults to zawsze true. Podczas próby odczytania zestawienia zmian i braku dostępnych nowych zmian otrzymasz odpowiedź ze stanem NotModified . W poprzednim przykładzie jest obsługiwana przez odczekanie pięciu sekund przed ponownym sprawdzeniem zmian.

Korzystanie ze zmian klucza partycji

W niektórych przypadkach można przetworzyć tylko zmiany dla określonego klucza partycji. Można uzyskać FeedIterator dla określonego klucza partycji i przetworzyć zmiany w taki sam sposób, jak w przypadku całego kontenera.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Używanie elementu FeedRange do przetwarzania równoległego

W procesorze zestawienia zmian praca jest automatycznie rozłożona na wielu odbiorców. W modelu ściągania zestawienia zmian można użyć FeedRange elementu , aby zrównać przetwarzanie zestawienia zmian. Obiekt FeedRange reprezentuje zakres wartości klucza partycji.

Oto przykład pokazujący, jak uzyskać listę zakresów dla kontenera:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

Po otrzymaniu FeedRange listy wartości dla kontenera uzyskasz jedną FeedRange partycję fizyczną.

Za pomocą elementu FeedRangemożna utworzyć element , FeedIterator aby zrównoleglić przetwarzanie zestawienia zmian na wielu maszynach lub wątkach. W przeciwieństwie do poprzedniego przykładu, w którym pokazano, jak uzyskać element FeedIterator dla całego kontenera lub pojedynczego klucza partycji, można użyć elementu FeedRanges, aby uzyskać wiele elementów FeedIterators, które mogą przetwarzać źródło zmian równolegle.

W przypadku, gdy chcesz użyć biblioteki FeedRanges, musisz mieć proces orkiestratora, który uzyskuje elementy FeedRanges i dystrybuuje je do tych maszyn. Ta dystrybucja może być następująca:

  • Używanie FeedRange.ToJsonString i rozpowszechnianie tej wartości ciągu. Użytkownicy mogą używać tej wartości z wartością FeedRange.FromJsonString.
  • Jeśli dystrybucja jest w trakcie przetwarzania FeedRange , przekazanie odwołania do obiektu.

Oto przykład pokazujący, jak odczytywać od początku zestawienia zmian kontenera przy użyciu dwóch hipotetycznych oddzielnych maszyn, które odczytują równolegle:

Maszyna 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Maszyna 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Zapisywanie tokenów kontynuacji

Możesz zapisać pozycję, FeedIterator uzyskując token kontynuacji. Token kontynuacji to wartość ciągu, która śledzi ostatnie przetworzone zmiany usługi FeedIterator i umożliwia FeedIterator wznowienie w tym momencie później. Token kontynuacji, jeśli zostanie określony, ma pierwszeństwo przed godziną rozpoczęcia i zaczyna się od wartości początkowych. Poniższy kod odczytuje zestawienie zmian od momentu utworzenia kontenera. Po dokonaniu kolejnych zmian będzie ona utrwalać token kontynuacji, aby można było później wznowić użycie zestawienia zmian.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

W przypadku korzystania z trybu najnowszej wersji FeedIterator token kontynuacji nigdy nie wygasa tak długo, jak kontener usługi Azure Cosmos DB nadal istnieje. Jeśli używasz wszystkich wersji i trybu usuwania, FeedIterator token kontynuacji jest prawidłowy, o ile zmiany wystąpiły w oknie przechowywania dla ciągłych kopii zapasowych.

Następne kroki