Поделиться через


Модель извлечения изменений в Azure Cosmos DB

ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL

Вы можете использовать модель потребления канала изменений, чтобы обрабатывать изменения в Azure Cosmos DB в удобном для вас темпе. Аналогично обработчику потока изменений, вы можете использовать модель извлечения потока изменений для параллелизации обработки изменений для нескольких потребителей потока изменений.

Сравнение с обработчиком канала изменений

Многие сценарии могут обрабатывать канал изменений с помощью обработчика канала изменений или модели извлечения канала изменений. Маркеры продолжения модели запроса и хранилище аренды процессора потока изменений работают как закладки для последнего обработанного элемента или пакета элементов в потоке изменений.

Однако невозможно преобразовать токены продолжения в аренду и наоборот.

Примечание.

В большинстве случаев, когда необходимо читать из канала изменений, самый простой вариант — использовать обработчик канала изменений.

Вы можете рассмотреть возможность использования pull-модели в следующих ситуациях:

  • Чтение изменений из определенного ключа раздела.
  • Чтобы контролировать темпы, с которыми клиент получает изменения для обработки.
  • Чтобы выполнить однократное чтение существующих данных в ленте изменений (например, для переноса данных).

Ниже приведены некоторые основные различия между обработчиком канала изменений и моделью извлечения канала изменений:

Функция Обработчик потока изменений Модель выборки данных канала изменений
Отслеживание текущей точки обработки канала изменений Аренда (хранится в контейнере Azure Cosmos DB) Маркер продолжения (сохраненный в памяти или вручную сохраненный)
Возможность воспроизведения прошлых изменений Да, с моделью отправки Да, с моделью извлечения
Опрос предстоящих изменений Автоматически проверяет наличие изменений на основе указанного WithPollInterval пользователем значения. Руководство
Поведение при отсутствии новых изменений Автоматическое ожидание значения WithPollInterval и повторная проверка Необходимо проверить состояние и вручную выполнить повторную проверку
Обработка изменений в пределах всего контейнера Да, и автоматически распараллеливается между несколькими потоками и машинами, которые потребляют данные из одного контейнера. Да, параллелизация вручную с помощью FeedRange
Обработка изменений только для одного ключа раздела Не поддерживается Да

Примечание.

При использовании модели извлечения, в отличие от чтения с помощью обработчика канала обновлений, необходимо явно обрабатывать случаи, когда новые изменения отсутствуют.

Работа с моделью вытягивания

Чтобы обработать канал изменений с помощью модели извлечения, создайте экземпляр FeedIterator. При первоначальном создании FeedIterator необходимо задать обязательное значение для ChangeFeedStartFrom, которое включает начальную позицию для чтения изменений и значение, которое вы хотите использовать для FeedRange. Это FeedRange диапазон значений ключа раздела и указывает элементы, которые можно считывать из канала изменений с помощью этого конкретного FeedIterator параметра. Необходимо также указать необходимое ChangeFeedMode значение для режима, в котором требуется обработать изменения: последняя версия или все версии и удаления. Используйте либо ChangeFeedMode.LatestVersion либо ChangeFeedMode.AllVersionsAndDeletes, чтобы указать, какой режим вы хотите использовать для чтения канала изменений. При использовании режима всех версий и удалений необходимо выбрать канал изменений, начинающийся с значения Now() или из определенного маркера продолжения.

При необходимости можно указать ChangeFeedRequestOptions для задания PageSizeHint. Если это свойство задано, оно устанавливает максимальное число полученных элементов на страницу. Если операции в отслеживаемой коллекции выполняются с помощью хранимых процедур, область транзакции сохраняется при считывании элементов из канала изменений. В результате число полученных элементов может быть выше указанного значения, чтобы элементы, измененные той же транзакцией, возвращались как часть одного атомарного пакета.

Пример, как получить FeedIterator в последней версии режима, возвращающей объекты сущностей, например объект User.

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Совет

До версии 3.34.0режим последней версии можно использовать с помощью параметра ChangeFeedMode.Incremental. Как Incremental, так и LatestVersion ссылаются на режим последней версии канала изменений, и приложения, использующие любой из этих режимов, будут наблюдать одно и то же поведение.

Все версии и режим удаления доступны в предварительной версии и могут быть использованы с предварительными версиями >.NET SDK = 3.32.0-preview. Ниже приведен пример получения FeedIterator во всех версиях и удалении режимов, возвращающих User объекты:

FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Примечание.

В последнем режиме версии вы получаете объекты, представляющие измененный элемент с некоторыми дополнительными метаданными. Все версии и режим удаления возвращают другую модель данных. Дополнительные сведения см. в разделе "Анализ объекта ответа".

Полный пример можно получить для режима последней версии или режима всех версий и удаления.

Использование канала изменений через потоки

FeedIterator для обоих режимов канала изменений предусмотрены два варианта. Помимо примеров, возвращающих объекты сущностей, можно также получить ответ с Stream поддержкой. Потоки позволяют считывать данные без первой десериализации, поэтому вы экономите ресурсы клиента.

Ниже приведен пример получения FeedIterator в режиме последней версии, который возвращает Stream:

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

Использование изменений для всего контейнера

Если вы не предоставите параметр FeedRange для FeedIterator, можно будет обрабатывать ленту изменений всего контейнера в собственном темпе. Вот пример, который начинает считывать все изменения в режиме последней версии, начиная с текущего времени.

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Поскольку веб-канал изменений фактически является бесконечным списком элементов, охватывающим все будущие записи и обновления, значение HasMoreResults всегда true. При попытке прочитать фид изменений и при отсутствии новых изменений, вы получите ответ с NotModified статусом. В предыдущем примере это обрабатывается путем ожидания в течение пяти секунд перед повторной проверкой изменений.

Потребление изменений для ключа раздела

В некоторых случаях может потребоваться обработать только изменения для определенного ключа раздела. Для определенного ключа секции можно получить FeedIterator и обработать изменения так же, как и для всего контейнера.

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Использование FeedRange для параллелизации

Обработчик канала изменений автоматически распределяет работу между несколькими потребителями. В модели извлечения канала изменений можно использовать FeedRange для параллелизации обработки канала изменений. FeedRange представляет диапазон значений ключа разбивки.

Ниже приведен пример, в который показано, как получить список диапазонов для контейнера:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

При получении списка значений FeedRange для контейнера вы получите по одному FeedRange на физическую секцию.

С помощью средства FeedRange можно создать FeedIterator для параллелизации обработки канала изменений на нескольких компьютерах или потоках. В отличие от предыдущего примера, в котором показано, как получить FeedIterator для всего контейнера или одного ключа раздела, с помощью FeedRanges вы можете получить несколько FeedIterator'ов, способных обрабатывать поток изменений параллельно.

Если вы хотите использовать FeedRanges, вам нужно создать процесс оркестрации, который получает экземпляры FeedRanges и распределяет их между компьютерами. Это распределение может быть следующим:

  • Используйте FeedRange.ToJsonString и распространяйте это строковое значение. Потребители могут использовать это значение с FeedRange.FromJsonString.
  • Если распределение происходит в процессе, передавайте ссылку на объект FeedRange.

Ниже приведен пример, показывающий, как считывать с начала канала изменений контейнера с помощью двух гипотетических отдельных компьютеров, которые считывают параллельно:

компьютер 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

компьютер 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

Сохранение маркеров продолжения

Вы можете сохранить текущее расположение FeedIterator, получив маркер продолжения. Маркер продолжения — это строковое значение, которое фиксирует последние изменения, обработанные в FeedIterator, и позволяет FeedIterator продолжить выполнение с этой точки. Маркер продолжения, если он указан, имеет приоритет над временем начала и начинается с начальных значений. Следующий код считывает ленту изменений с момента создания контейнера. После того, как не останется доступных изменений, система сохранит маркер продолжения, чтобы позднее возобновить обработку этого потока изменений.

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

При использовании режима последней версии FeedIterator маркер продолжения никогда не истекает до тех пор, пока контейнер Azure Cosmos DB по-прежнему существует. Если вы используете режим всех версий и удаления, маркер продолжения FeedIterator действителен до тех пор, пока изменения произошли в периоде хранения для непрерывных резервных копий.

Следующие шаги