Модель извлечения канала изменений в Azure Cosmos DB
ОБЛАСТЬ ПРИМЕНЕНИЯ: NoSQL
Вы можете использовать модель извлечения канала изменений для использования канала изменений Azure Cosmos DB в собственном темпе. Аналогично обработчику канала изменений, вы можете использовать модель извлечения канала изменений для параллелизации обработки изменений для нескольких потребителей канала изменений.
Сравнение с обработчиком канала изменений
Многие сценарии могут обрабатывать канал изменений с помощью обработчика канала изменений или модели извлечения канала изменений. Маркеры продолжения модели извлечения и контейнер аренды обработчика канала изменений работают как закладки для последнего обработанного элемента или пакета элементов в канале изменений.
Однако невозможно преобразовать маркеры продолжения в аренду или наоборот.
Примечание.
В большинстве случаев, когда необходимо прочитать из канала изменений, самый простой вариант — использовать обработчик канала изменений.
Мы рекомендуем применять модель извлечения в следующих ситуациях:
- Чтение изменений из определенного ключа секции.
- Чтобы контролировать темпы, с которыми клиент получает изменения для обработки.
- Чтобы выполнить однократное чтение существующих данных в канале изменений (например, для переноса данных).
Ниже приведены некоторые основные различия между обработчиком канала изменений и моделью извлечения канала изменений:
Функция | Обработчик канала изменений | Модель извлечения данных канала изменений |
---|---|---|
Отслеживание текущей точки обработки канала изменений | Аренда (хранится в контейнере Azure Cosmos DB) | Маркер продолжения (сохраненный в памяти или вручную) |
Возможность воспроизведения прошлых изменений | Да, с моделью отправки | Да, с моделью извлечения |
Опрос предстоящих изменений | Автоматически проверяет наличие изменений на основе указанного WithPollInterval пользователем значения. |
Руководство |
Поведение при отсутствии новых изменений | Автоматическое ожидание значения WithPollInterval и повторная проверка |
Необходимо проверить состояние и вручную выполнить повторную проверку |
Обработка изменений из всего контейнера | Да, и автоматически параллелизируется между несколькими потоками и компьютерами, которые используются из одного контейнера. | Да, а также параллелизация вручную с помощью FeedRange |
Обработка изменений только из одного ключа секции | Не поддерживается | Да |
Примечание.
При использовании модели извлечения, в отличие от чтения с помощью обработчика канала изменений, необходимо явно обрабатывать случаи, когда новые изменения отсутствуют.
Работа с моделью извлечения
Чтобы обработать канал изменений с помощью модели извлечения, создайте экземпляр FeedIterator
. При первоначальном создании FeedIterator
необходимо указать необходимое ChangeFeedStartFrom
значение, которое состоит как из начальной позиции для чтения изменений, так и значения, для FeedRange
которого требуется использовать. Это FeedRange
диапазон значений ключа секции и указывает элементы, которые можно считывать из веб-канала изменений с помощью этого конкретного FeedIterator
параметра. Необходимо также указать необходимое ChangeFeedMode
значение для режима, в котором требуется обработать изменения: последняя версия или все версии и удаления. Используйте или ChangeFeedMode.LatestVersion
ChangeFeedMode.AllVersionsAndDeletes
укажите, какой режим вы хотите использовать для чтения канала изменений. При использовании всех версий и удалений режима необходимо выбрать канал изменений, начинающийся с значения или из определенного Now()
маркера продолжения.
При необходимости можно указать ChangeFeedRequestOptions
для задания PageSizeHint
. Если это свойство задано, оно устанавливает максимальное число полученных элементов на страницу. Если операции в отслеживаемой коллекции выполняются с помощью хранимых процедур, область транзакции сохраняется при считывании элементов из канала изменений. В результате число полученных элементов может быть выше указанного значения, чтобы элементы, измененные той же транзакцией, возвращались как часть одного атомарного пакета.
Ниже приведен пример получения FeedIterator
в режиме последней версии, который возвращает объекты сущностей, в этом случае User
объект:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Совет
До версии 3.34.0
режим последней версии можно использовать с помощью параметра ChangeFeedMode.Incremental
. LatestVersion
Оба Incremental
и ссылаются на последний режим версии канала изменений и приложений, использующих любой режим, будут видеть то же поведение.
Все версии и режим удаления доступны в предварительной версии и можно использовать с предварительными версиями >пакета SDK для .NET = 3.32.0-preview
. Ниже приведен пример получения FeedIterator
во всех версиях и удалении режимов, возвращающих User
объекты:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Примечание.
В последнем режиме версии вы получаете объекты, представляющие измененный элемент с некоторыми дополнительными метаданными. Все версии и режим удаления возвращают другую модель данных. Дополнительные сведения см. в разделе "Анализ объекта ответа".
Полный пример можно получить для последнего режима версии или всех версий и режима удаления.
Использование канала изменений через потоки
FeedIterator
для обоих режимов канала изменений есть два варианта. Помимо примеров, возвращающих объекты сущностей, можно также получить ответ с Stream
поддержкой. Потоки позволяют считывать данные без первой десериализации, поэтому вы экономите ресурсы клиента.
Ниже приведен пример получения FeedIterator
в режиме последней версии, который возвращает Stream
:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Использование изменений для всего контейнера
Если параметр не указан FeedRange
FeedIterator
, можно обрабатывать весь канал изменений контейнера в собственном темпе. Ниже приведен пример, который начинает считывать все изменения, начиная с текущего времени с помощью последней версии:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Поскольку веб-канал изменений фактически является бесконечным списком элементов, охватывающих все будущие записи и обновления, всегда имеет значение HasMoreResults
true
. При попытке прочитать веб-канал изменений и нет новых изменений, вы получите ответ с NotModified
состоянием. В предыдущем примере она обрабатывается ожиданием пяти секунд до повторной проверки изменений.
Использование изменений для ключа секции
В некоторых случаях может потребоваться обработать только изменения для определенного ключа секции. Для определенного ключа секции можно получить FeedIterator
и обработать изменения так же, как и для всего контейнера.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Использование FeedRange для параллелизации
Обработчик канала изменений автоматически распределяет нагрузку между несколькими потребителями. В модели извлечения веб-канала изменений можно применить FeedRange
для параллелизации обработки веб-канала изменений. FeedRange
представляет определенный диапазон значений ключа секции.
Ниже приведен пример, в который показано, как получить список диапазонов для контейнера:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
При получении списка значений FeedRange
для контейнера вы получите по одному FeedRange
на физическую секцию.
С помощью средства FeedRange
можно создать FeedIterator
параллелизацию обработки канала изменений на нескольких компьютерах или потоках. В отличие от предыдущего примера, в котором показано, как получить FeedIterator
для всего контейнера или одного ключа раздела, вы можете с помощью FeedRanges получить несколько FeedIterator, способных обрабатывать канал изменений параллельно.
Если вы хотите использовать FeedRange, создайте процесс оркестрации, который получает экземпляры FeedRange и распространяет их по компьютерам. Это распределение может быть следующим:
- Используйте
FeedRange.ToJsonString
и передайте это строковое значение. Потребители могут использовать это значение сFeedRange.FromJsonString
. - Если распределение выполняется в том же процессе, передавайте ссылку на объект
FeedRange
.
Ниже приведен пример, показывающий, как считывать с начала канала изменений контейнера с помощью двух гипотетических отдельных компьютеров, которые считываются параллельно:
компьютер 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
компьютер 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Сохранение маркеров продолжения
Вы можете сохранить текущее расположение FeedIterator
, создав маркер продолжения. Маркер продолжения — это строковое значение, которое отслеживает последние изменения, обработанные в FeedIterator, и позволяет установить FeedIterator
, чтобы позднее продолжить работу с этого места. Маркер продолжения, если он указан, имеет приоритет над временем начала и начинается с начальных значений. Следующий код считывает канал изменений с момента создания контейнера. Когда не останется доступных изменений, он сохраняет маркер продолжения, чтобы позднее возобновить обработку этого веб-канала изменений.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
При использовании последней версии маркер продолжения никогда не истекает до тех пор, FeedIterator
пока контейнер Azure Cosmos DB по-прежнему существует. Если вы используете все версии и удаляете режим, маркер продолжения действителен до тех пор, FeedIterator
пока изменения произошли в окне хранения для непрерывных резервных копий.