Модель извлечения канала изменений в Azure Cosmos DB
ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL
Вы можете использовать модель извлечения канала изменений, чтобы использовать канал изменений Azure Cosmos DB в собственном темпе. Как и обработчик канала изменений, можно использовать модель извлечения канала изменений для параллелизации обработки изменений несколькими потребителями канала изменений.
Сравнение с обработчиком канала изменений
Во многих сценариях канал изменений можно обрабатывать с помощью обработчика канала изменений или модели извлечения канала изменений. Маркеры продолжения модели извлечения и контейнер аренды обработчика канала изменений работают как закладки для последнего обработанного элемента или пакета элементов в канале изменений.
Однако вы не можете преобразовать маркеры продолжения в аренду или наоборот.
Примечание
В большинстве случаев, когда требуется чтение из канала изменений, самым простым вариантом является использование обработчика канала изменений.
Мы рекомендуем применять модель извлечения в следующих ситуациях:
- Чтение изменений из определенного ключа секции.
- Для управления скоростью, с которой клиент получает изменения для обработки.
- Для однократного чтения существующих данных в канале изменений (например, для переноса данных).
Ниже приведены некоторые основные различия между обработчиком канала изменений и моделью извлечения канала изменений.
Компонент | Обработчик канала изменений | Модель извлечения данных канала изменений |
---|---|---|
Отслеживание текущей точки при обработке канала изменений | Аренда (хранится в контейнере Azure Cosmos DB) | Маркер продолжения (сохраненный в памяти или вручную) |
Возможность воспроизведения прошлых изменений | Да, с моделью отправки | Да, с моделью извлечения |
Опрос предстоящих изменений | Автоматическая проверка наличия изменений на основе указанного WithPollInterval пользователем значения |
Вручную |
Поведение при отсутствии новых изменений | Автоматическое ожидание значения и WithPollInterval повторная проверка |
Необходимо проверить состояние и вручную выполнить повторную проверку |
Обработка изменений из всего контейнера | Да, и автоматически распараллеливается в нескольких потоках и компьютерах, которые потребляют из одного контейнера. | Да, и параллелизуются вручную с помощью FeedRange |
Обработка изменений только из одного ключа секции | Не поддерживается | Да |
Примечание
При использовании модели извлечения, в отличие от чтения с помощью обработчика канала изменений, необходимо явно обрабатывать случаи, когда новые изменения отсутствуют.
Работа с моделью извлечения
Чтобы обработать канал изменений с помощью модели извлечения, создайте экземпляр FeedIterator
. При первоначальном создании FeedIterator
необходимо указать обязательное ChangeFeedStartFrom
значение, которое состоит из начальной позиции для чтения изменений и значения, которое вы хотите использовать для FeedRange
. FeedRange
— это диапазон значений ключа секции и указывает элементы, которые можно считывать из канала изменений с помощью этого конкретного FeedIterator
. Необходимо также указать обязательное ChangeFeedMode
значение для режима обработки изменений: последняя версия или все версии и удаления. Используйте или ChangeFeedMode.LatestVersion
ChangeFeedMode.AllVersionsAndDeletes
, чтобы указать, какой режим вы хотите использовать для чтения канала изменений. При использовании всех версий и режима удаления необходимо выбрать канал изменений, начинающийся со значения или из определенного Now()
маркера продолжения.
При необходимости можно указать ChangeFeedRequestOptions
для задания PageSizeHint
. Если это свойство задано, оно устанавливает максимальное число полученных элементов на страницу. Если операции в отслеживаемой коллекции выполняются с помощью хранимых процедур, область транзакций сохраняются при чтении элементов из канала изменений. В результате количество полученных элементов может быть больше указанного значения, поэтому элементы, измененные той же транзакцией, возвращаются в составе одного атомарного пакета.
Ниже приведен пример получения FeedIterator
в режиме последней версии, который возвращает объекты сущности, в данном случае объект User
:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Совет
До версии 3.34.0
можно использовать режим последней версии, задав параметр ChangeFeedMode.Incremental
. И Incremental
, и LatestVersion
ссылаются на режим последней версии канала изменений, и приложения, использующие любой из этих режимов, будут видеть одинаковое поведение.
Все версии и режим удаления находится в предварительной версии и может использоваться с предварительными версиями >пакета SDK для .NET = 3.32.0-preview
. Ниже приведен пример получения FeedIterator
во всех версиях и режиме удаления, который возвращает динамические объекты:
FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Примечание
В режиме последней версии вы получаете объекты, представляющие измененный элемент, с некоторыми дополнительными метаданными. Все версии и режим удаления возвращает другую модель данных. Дополнительные сведения см. в разделе Анализ объекта ответа.
Использование канала изменений через потоки
FeedIterator
Для обоих режимов канала изменений есть два варианта. В дополнение к примерам, возвращающим объекты сущностей, можно также получить ответ с Stream
поддержкой. Потоки позволяют считывать данные без предварительной десериализации, что позволяет сэкономить на клиентских ресурсах.
Ниже приведен пример получения FeedIterator
в режиме последней версии, который возвращает Stream
:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Использование изменений для всего контейнера
Если не указать FeedRange
параметр в FeedIterator
, вы можете обработать весь канал изменений контейнера в собственном темпе. Ниже приведен пример, который начинает считывать все изменения, начиная с текущего времени в режиме последней версии:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Так как канал изменений фактически представляет собой бесконечный список элементов, охватывающих все будущие операции записи и обновления, значение HasMoreResults
всегда true
равно . При попытке прочитать веб-канал изменений, когда новые изменения отсутствуют, вы получаете ответ с NotModified
состоянием. В предыдущем примере это обрабатывается путем ожидания пяти секунд перед повторной проверки на наличие изменений.
Использование изменений для ключа секции
В некоторых случаях может потребоваться обработать только изменения для определенного ключа секции. Вы можете получить FeedIterator
для определенного ключа секции и обработать изменения так же, как и для всего контейнера.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Использование FeedRange для параллелизации
Обработчик канала изменений автоматически распределяет нагрузку между несколькими потребителями. В модели извлечения веб-канала изменений можно применить FeedRange
для параллелизации обработки веб-канала изменений. FeedRange
представляет определенный диапазон значений ключа секции.
Ниже приведен пример, показывающий, как получить список диапазонов для контейнера:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
При получении списка значений FeedRange
для контейнера вы получаете по одному FeedRange
для каждой физической секции.
С помощью FeedRange
можно создать FeedIterator
для параллелизации обработки канала изменений на нескольких компьютерах или в потоках. В отличие от предыдущего примера, в котором показано, как получить FeedIterator
для всего контейнера или одного ключа раздела, вы можете с помощью FeedRanges получить несколько FeedIterator, способных обрабатывать канал изменений параллельно.
Если вы хотите использовать FeedRange, создайте процесс оркестрации, который получает экземпляры FeedRange и распространяет их по компьютерам. Это распределение может быть следующим:
- Используйте
FeedRange.ToJsonString
и передайте это строковое значение. Потребители могут использовать это значение сFeedRange.FromJsonString
. - Если распределение выполняется в том же процессе, передавайте ссылку на объект
FeedRange
.
Ниже приведен пример, показывающий, как считывать данные с начала канала изменений контейнера с помощью двух гипотетических отдельных компьютеров, которые считывают параллельно:
Компьютер 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Компьютер 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Сохранение маркеров продолжения
Вы можете сохранить текущее расположение FeedIterator
, создав маркер продолжения. Маркер продолжения — это строковое значение, которое отслеживает последние изменения, обработанные в FeedIterator, и позволяет установить FeedIterator
, чтобы позднее продолжить работу с этого места. Маркер продолжения, если он указан, имеет приоритет над временем начала и начинается с начальных значений. Следующий код считывает канал изменений с момента создания контейнера. Когда не останется доступных изменений, он сохраняет маркер продолжения, чтобы позднее возобновить обработку этого веб-канала изменений.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
При использовании режима последней версии срок действия маркера продолжения никогда не истечет до тех пор, FeedIterator
пока контейнер Azure Cosmos DB все еще существует. При использовании всех версий и режима удаления маркер продолжения действителен при условии, FeedIterator
что изменения произошли в течение периода хранения для непрерывного резервного копирования.