Модель извлечения канала изменений в Azure Cosmos DB

ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL

Вы можете использовать модель извлечения канала изменений, чтобы использовать канал изменений Azure Cosmos DB в собственном темпе. Как и обработчик канала изменений, можно использовать модель извлечения канала изменений для параллелизации обработки изменений несколькими потребителями канала изменений.

Сравнение с обработчиком канала изменений

Во многих сценариях канал изменений можно обрабатывать с помощью обработчика канала изменений или модели извлечения канала изменений. Маркеры продолжения модели извлечения и контейнер аренды обработчика канала изменений работают как закладки для последнего обработанного элемента или пакета элементов в канале изменений.

Однако вы не можете преобразовать маркеры продолжения в аренду или наоборот.

Примечание

В большинстве случаев, когда требуется чтение из канала изменений, самым простым вариантом является использование обработчика канала изменений.

Мы рекомендуем применять модель извлечения в следующих ситуациях:

  • Чтение изменений из определенного ключа секции.
  • Для управления скоростью, с которой клиент получает изменения для обработки.
  • Для однократного чтения существующих данных в канале изменений (например, для переноса данных).

Ниже приведены некоторые основные различия между обработчиком канала изменений и моделью извлечения канала изменений.

Компонент Обработчик канала изменений Модель извлечения данных канала изменений
Отслеживание текущей точки при обработке канала изменений Аренда (хранится в контейнере Azure Cosmos DB) Маркер продолжения (сохраненный в памяти или вручную)
Возможность воспроизведения прошлых изменений Да, с моделью отправки Да, с моделью извлечения
Опрос предстоящих изменений Автоматическая проверка наличия изменений на основе указанного WithPollInterval пользователем значения Вручную
Поведение при отсутствии новых изменений Автоматическое ожидание значения и WithPollInterval повторная проверка Необходимо проверить состояние и вручную выполнить повторную проверку
Обработка изменений из всего контейнера Да, и автоматически распараллеливается в нескольких потоках и компьютерах, которые потребляют из одного контейнера. Да, и параллелизуются вручную с помощью FeedRange
Обработка изменений только из одного ключа секции Не поддерживается Да

Примечание

При использовании модели извлечения, в отличие от чтения с помощью обработчика канала изменений, необходимо явно обрабатывать случаи, когда новые изменения отсутствуют.

Работа с моделью извлечения

Чтобы обработать канал изменений с помощью модели извлечения, создайте экземпляр FeedIterator. При первоначальном создании FeedIteratorнеобходимо указать обязательное ChangeFeedStartFrom значение, которое состоит из начальной позиции для чтения изменений и значения, которое вы хотите использовать для FeedRange. FeedRange— это диапазон значений ключа секции и указывает элементы, которые можно считывать из канала изменений с помощью этого конкретного FeedIterator. Необходимо также указать обязательное ChangeFeedMode значение для режима обработки изменений: последняя версия или все версии и удаления. Используйте или ChangeFeedMode.LatestVersionChangeFeedMode.AllVersionsAndDeletes , чтобы указать, какой режим вы хотите использовать для чтения канала изменений. При использовании всех версий и режима удаления необходимо выбрать канал изменений, начинающийся со значения или из определенного Now() маркера продолжения.

При необходимости можно указать ChangeFeedRequestOptions для задания PageSizeHint. Если это свойство задано, оно устанавливает максимальное число полученных элементов на страницу. Если операции в отслеживаемой коллекции выполняются с помощью хранимых процедур, область транзакций сохраняются при чтении элементов из канала изменений. В результате количество полученных элементов может быть больше указанного значения, поэтому элементы, измененные той же транзакцией, возвращаются в составе одного атомарного пакета.

Ниже приведен пример получения FeedIterator в режиме последней версии, который возвращает объекты сущности, в данном случае объект User :

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Совет

До версии 3.34.0можно использовать режим последней версии, задав параметр ChangeFeedMode.Incremental. И Incremental , и LatestVersion ссылаются на режим последней версии канала изменений, и приложения, использующие любой из этих режимов, будут видеть одинаковое поведение.

Все версии и режим удаления находится в предварительной версии и может использоваться с предварительными версиями >пакета SDK для .NET = 3.32.0-preview. Ниже приведен пример получения FeedIterator во всех версиях и режиме удаления, который возвращает динамические объекты:

FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Примечание

В режиме последней версии вы получаете объекты, представляющие измененный элемент, с некоторыми дополнительными метаданными. Все версии и режим удаления возвращает другую модель данных. Дополнительные сведения см. в разделе Анализ объекта ответа.

Использование канала изменений через потоки

FeedIterator Для обоих режимов канала изменений есть два варианта. В дополнение к примерам, возвращающим объекты сущностей, можно также получить ответ с Stream поддержкой. Потоки позволяют считывать данные без предварительной десериализации, что позволяет сэкономить на клиентских ресурсах.

Ниже приведен пример получения FeedIterator в режиме последней версии, который возвращает Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Использование изменений для всего контейнера

Если не указать FeedRange параметр в FeedIterator, вы можете обработать весь канал изменений контейнера в собственном темпе. Ниже приведен пример, который начинает считывать все изменения, начиная с текущего времени в режиме последней версии:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Так как канал изменений фактически представляет собой бесконечный список элементов, охватывающих все будущие операции записи и обновления, значение HasMoreResults всегда trueравно . При попытке прочитать веб-канал изменений, когда новые изменения отсутствуют, вы получаете ответ с NotModified состоянием. В предыдущем примере это обрабатывается путем ожидания пяти секунд перед повторной проверки на наличие изменений.

Использование изменений для ключа секции

В некоторых случаях может потребоваться обработать только изменения для определенного ключа секции. Вы можете получить FeedIterator для определенного ключа секции и обработать изменения так же, как и для всего контейнера.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Использование FeedRange для параллелизации

Обработчик канала изменений автоматически распределяет нагрузку между несколькими потребителями. В модели извлечения веб-канала изменений можно применить FeedRange для параллелизации обработки веб-канала изменений. FeedRange представляет определенный диапазон значений ключа секции.

Ниже приведен пример, показывающий, как получить список диапазонов для контейнера:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

При получении списка значений FeedRange для контейнера вы получаете по одному FeedRange для каждой физической секции.

С помощью FeedRangeможно создать FeedIterator для параллелизации обработки канала изменений на нескольких компьютерах или в потоках. В отличие от предыдущего примера, в котором показано, как получить FeedIterator для всего контейнера или одного ключа раздела, вы можете с помощью FeedRanges получить несколько FeedIterator, способных обрабатывать канал изменений параллельно.

Если вы хотите использовать FeedRange, создайте процесс оркестрации, который получает экземпляры FeedRange и распространяет их по компьютерам. Это распределение может быть следующим:

  • Используйте FeedRange.ToJsonString и передайте это строковое значение. Потребители могут использовать это значение с FeedRange.FromJsonString.
  • Если распределение выполняется в том же процессе, передавайте ссылку на объект FeedRange.

Ниже приведен пример, показывающий, как считывать данные с начала канала изменений контейнера с помощью двух гипотетических отдельных компьютеров, которые считывают параллельно:

Компьютер 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Компьютер 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Сохранение маркеров продолжения

Вы можете сохранить текущее расположение FeedIterator, создав маркер продолжения. Маркер продолжения — это строковое значение, которое отслеживает последние изменения, обработанные в FeedIterator, и позволяет установить FeedIterator, чтобы позднее продолжить работу с этого места. Маркер продолжения, если он указан, имеет приоритет над временем начала и начинается с начальных значений. Следующий код считывает канал изменений с момента создания контейнера. Когда не останется доступных изменений, он сохраняет маркер продолжения, чтобы позднее возобновить обработку этого веб-канала изменений.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

При использовании режима последней версии срок действия маркера продолжения никогда не истечет до тех пор, FeedIterator пока контейнер Azure Cosmos DB все еще существует. При использовании всех версий и режима удаления маркер продолжения действителен при условии, FeedIterator что изменения произошли в течение периода хранения для непрерывного резервного копирования.

Дальнейшие действия