Поделиться через


Модель извлечения канала изменений в Azure Cosmos DB

ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL

Вы можете использовать модель извлечения канала изменений для использования канала изменений Azure Cosmos DB в собственном темпе. Аналогично обработчику канала изменений, вы можете использовать модель извлечения канала изменений для параллелизации обработки изменений для нескольких потребителей канала изменений.

Сравнение с обработчиком канала изменений

Многие сценарии могут обрабатывать канал изменений с помощью обработчика канала изменений или модели извлечения канала изменений. Маркеры продолжения модели извлечения и контейнер аренды обработчика канала изменений работают как закладки для последнего обработанного элемента или пакета элементов в канале изменений.

Однако невозможно преобразовать маркеры продолжения в аренду или наоборот.

Примечание.

В большинстве случаев, когда необходимо прочитать из канала изменений, самый простой вариант — использовать обработчик канала изменений.

Мы рекомендуем применять модель извлечения в следующих ситуациях:

  • Чтение изменений из определенного ключа секции.
  • Чтобы контролировать темпы, с которыми клиент получает изменения для обработки.
  • Чтобы выполнить однократное чтение существующих данных в канале изменений (например, для переноса данных).

Ниже приведены некоторые основные различия между обработчиком канала изменений и моделью извлечения канала изменений:

Функция Обработчик канала изменений Модель извлечения данных канала изменений
Отслеживание текущей точки обработки канала изменений Аренда (хранится в контейнере Azure Cosmos DB) Маркер продолжения (сохраненный в памяти или вручную)
Возможность воспроизведения прошлых изменений Да, с моделью отправки Да, с моделью извлечения
Опрос предстоящих изменений Автоматически проверяет наличие изменений на основе указанного WithPollInterval пользователем значения. Руководство
Поведение при отсутствии новых изменений Автоматическое ожидание значения WithPollInterval и повторная проверка Необходимо проверить состояние и вручную выполнить повторную проверку
Обработка изменений из всего контейнера Да, и автоматически параллелизируется между несколькими потоками и компьютерами, которые используются из одного контейнера. Да, а также параллелизация вручную с помощью FeedRange
Обработка изменений только из одного ключа секции Не поддерживается Да

Примечание.

При использовании модели извлечения, в отличие от чтения с помощью обработчика канала изменений, необходимо явно обрабатывать случаи, когда новые изменения отсутствуют.

Работа с моделью извлечения

Чтобы обработать канал изменений с помощью модели извлечения, создайте экземпляр FeedIterator. При первоначальном создании FeedIteratorнеобходимо указать необходимое ChangeFeedStartFrom значение, которое состоит как из начальной позиции для чтения изменений, так и значения, для FeedRangeкоторого требуется использовать. Это FeedRange диапазон значений ключа секции и указывает элементы, которые можно считывать из веб-канала изменений с помощью этого конкретного FeedIteratorпараметра. Необходимо также указать необходимое ChangeFeedMode значение для режима, в котором требуется обработать изменения: последняя версия или все версии и удаления. Используйте или ChangeFeedMode.LatestVersionChangeFeedMode.AllVersionsAndDeletes укажите, какой режим вы хотите использовать для чтения канала изменений. При использовании всех версий и удалений режима необходимо выбрать канал изменений, начинающийся с значения или из определенного Now() маркера продолжения.

При необходимости можно указать ChangeFeedRequestOptions для задания PageSizeHint. Если это свойство задано, оно устанавливает максимальное число полученных элементов на страницу. Если операции в отслеживаемой коллекции выполняются с помощью хранимых процедур, область транзакции сохраняется при считывании элементов из канала изменений. В результате число полученных элементов может быть выше указанного значения, чтобы элементы, измененные той же транзакцией, возвращались как часть одного атомарного пакета.

Ниже приведен пример получения FeedIterator в режиме последней версии, который возвращает объекты сущностей, в этом случае User объект:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Совет

До версии 3.34.0режим последней версии можно использовать с помощью параметра ChangeFeedMode.Incremental. LatestVersion Оба Incremental и ссылаются на последний режим версии канала изменений и приложений, использующих любой режим, будут видеть то же поведение.

Все версии и режим удаления доступны в предварительной версии и можно использовать с предварительными версиями >пакета SDK для .NET = 3.32.0-preview. Ниже приведен пример получения FeedIterator во всех версиях и удалении режима, возвращающего динамические объекты:

FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Примечание.

В последнем режиме версии вы получаете объекты, представляющие измененный элемент с некоторыми дополнительными метаданными. Все версии и режим удаления возвращают другую модель данных. Дополнительные сведения см. в разделе "Анализ объекта ответа".

Использование канала изменений через потоки

FeedIterator для обоих режимов канала изменений есть два варианта. Помимо примеров, возвращающих объекты сущностей, можно также получить ответ с Stream поддержкой. Потоки позволяют считывать данные без первой десериализации, поэтому вы экономите ресурсы клиента.

Ниже приведен пример получения FeedIterator в режиме последней версии, который возвращает Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Использование изменений для всего контейнера

Если параметр не указан FeedRangeFeedIterator, можно обрабатывать весь канал изменений контейнера в собственном темпе. Ниже приведен пример, который начинает считывать все изменения, начиная с текущего времени с помощью последней версии:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Поскольку веб-канал изменений фактически является бесконечным списком элементов, охватывающих все будущие записи и обновления, всегда имеет значение HasMoreResultstrue. При попытке прочитать веб-канал изменений и нет новых изменений, вы получите ответ с NotModified состоянием. В предыдущем примере она обрабатывается ожиданием пяти секунд до повторной проверки изменений.

Использование изменений для ключа секции

В некоторых случаях может потребоваться обработать только изменения для определенного ключа секции. Для определенного ключа секции можно получить FeedIterator и обработать изменения так же, как и для всего контейнера.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Использование FeedRange для параллелизации

Обработчик канала изменений автоматически распределяет нагрузку между несколькими потребителями. В модели извлечения веб-канала изменений можно применить FeedRange для параллелизации обработки веб-канала изменений. FeedRange представляет определенный диапазон значений ключа секции.

Ниже приведен пример, в который показано, как получить список диапазонов для контейнера:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

При получении списка значений FeedRange для контейнера вы получите по одному FeedRange на физическую секцию.

С помощью средства FeedRangeможно создать FeedIterator параллелизацию обработки канала изменений на нескольких компьютерах или потоках. В отличие от предыдущего примера, в котором показано, как получить FeedIterator для всего контейнера или одного ключа раздела, вы можете с помощью FeedRanges получить несколько FeedIterator, способных обрабатывать канал изменений параллельно.

Если вы хотите использовать FeedRange, создайте процесс оркестрации, который получает экземпляры FeedRange и распространяет их по компьютерам. Это распределение может быть следующим:

  • Используйте FeedRange.ToJsonString и передайте это строковое значение. Потребители могут использовать это значение с FeedRange.FromJsonString.
  • Если распределение выполняется в том же процессе, передавайте ссылку на объект FeedRange.

Ниже приведен пример, показывающий, как считывать с начала канала изменений контейнера с помощью двух гипотетических отдельных компьютеров, которые считываются параллельно:

компьютер 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

компьютер 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Сохранение маркеров продолжения

Вы можете сохранить текущее расположение FeedIterator, создав маркер продолжения. Маркер продолжения — это строковое значение, которое отслеживает последние изменения, обработанные в FeedIterator, и позволяет установить FeedIterator, чтобы позднее продолжить работу с этого места. Маркер продолжения, если он указан, имеет приоритет над временем начала и начинается с начальных значений. Следующий код считывает канал изменений с момента создания контейнера. Когда не останется доступных изменений, он сохраняет маркер продолжения, чтобы позднее возобновить обработку этого веб-канала изменений.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

При использовании последней версии маркер продолжения никогда не истекает до тех пор, FeedIterator пока контейнер Azure Cosmos DB по-прежнему существует. Если вы используете все версии и удаляете режим, маркер продолжения действителен до тех пор, FeedIterator пока изменения произошли в окне хранения для непрерывных резервных копий.

Следующие шаги