Вы можете использовать модель потребления канала изменений, чтобы обрабатывать изменения в Azure Cosmos DB в удобном для вас темпе.
Аналогично обработчику потока изменений, вы можете использовать модель извлечения потока изменений для параллелизации обработки изменений для нескольких потребителей потока изменений.
Однако невозможно преобразовать токены продолжения в аренду и наоборот.
Вы можете рассмотреть возможность использования pull-модели в следующих ситуациях:
Ниже приведены некоторые основные различия между обработчиком канала изменений и моделью извлечения канала изменений:
Чтобы обработать канал изменений с помощью модели извлечения, создайте экземпляр FeedIterator. При первоначальном создании FeedIterator необходимо задать обязательное значение для ChangeFeedStartFrom, которое включает начальную позицию для чтения изменений и значение, которое вы хотите использовать для FeedRange. Это FeedRange диапазон значений ключа раздела и указывает элементы, которые можно считывать из канала изменений с помощью этого конкретного FeedIterator параметра. Необходимо также указать необходимое ChangeFeedMode значение для режима, в котором требуется обработать изменения: последняя версия или все версии и удаления. Используйте либо ChangeFeedMode.LatestVersion либо ChangeFeedMode.AllVersionsAndDeletes, чтобы указать, какой режим вы хотите использовать для чтения канала изменений. При использовании режима всех версий и удалений необходимо выбрать канал изменений, начинающийся с значения Now() или из определенного маркера продолжения.
При необходимости можно указать ChangeFeedRequestOptions для задания PageSizeHint. Если это свойство задано, оно устанавливает максимальное число полученных элементов на страницу. Если операции в отслеживаемой коллекции выполняются с помощью хранимых процедур, область транзакции сохраняется при считывании элементов из канала изменений. В результате число полученных элементов может быть выше указанного значения, чтобы элементы, измененные той же транзакцией, возвращались как часть одного атомарного пакета.
Пример, как получить FeedIterator в последней версии режима, возвращающей объекты сущностей, например объект User.
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Подсказка
Для более ранних 3.34.0версий можно использовать режим последней версии, задав параметр ChangeFeedMode.Incremental. Как Incremental, так и LatestVersion относятся к последнему режиму версии ленты изменений, и приложения, использующие любой из режимов, демонстрируют одинаковое поведение.
Все версии и режим удаления доступны в предварительной версии и могут быть использованы с предварительными версиями >.NET SDK = 3.32.0-preview. Ниже приведен пример получения FeedIterator во всех версиях и удалении режимов, возвращающих User объекты:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Использование канала изменений через потоки
FeedIterator для обоих режимов канала изменений предусмотрены два варианта. Помимо примеров, возвращающих объекты сущностей, можно также получить ответ с Stream поддержкой. Потоки позволяют считывать данные без первой десериализации, поэтому вы экономите ресурсы клиента.
Ниже приведен пример получения FeedIterator в режиме последней версии, который возвращает Stream:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Использование изменений для всего контейнера
Если вы не предоставите параметр FeedRange для FeedIterator, можно будет обрабатывать ленту изменений всего контейнера в собственном темпе. Вот пример, который начинает считывать все изменения в режиме последней версии, начиная с текущего времени.
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Поскольку веб-канал изменений фактически является бесконечным списком элементов, охватывающим все будущие записи и обновления, значение HasMoreResults всегда true. При попытке прочитать фид изменений и при отсутствии новых изменений, вы получите ответ с NotModified статусом. Это отличается от получения ответа без изменений и статуса OK. Можно получить пустые ответы канала изменений, пока доступны дополнительные изменения, и вы должны продолжать опрос до получения NotModified. В предыдущем примере NotModified обрабатывается путем ожидания пяти секунд перед повторной проверкой изменений.
Потребление изменений для ключа раздела
В некоторых случаях может потребоваться обработать только изменения для определенного ключа раздела. Для определенного ключа секции можно получить FeedIterator и обработать изменения так же, как и для всего контейнера.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Использование FeedRange для параллелизации
Обработчик канала изменений автоматически распределяет работу между несколькими потребителями. В модели извлечения канала изменений можно использовать FeedRange для параллелизации обработки канала изменений.
FeedRange представляет диапазон значений ключа разбивки.
Ниже приведен пример, в который показано, как получить список диапазонов для контейнера:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
При получении списка значений FeedRange для контейнера вы получите по одному FeedRange на физическую секцию.
С помощью средства FeedRange можно создать FeedIterator для параллелизации обработки канала изменений на нескольких компьютерах или потоках. В отличие от предыдущего примера, в котором показано, как получить FeedIterator для всего контейнера или одного ключа раздела, с помощью FeedRanges вы можете получить несколько FeedIterator'ов, способных обрабатывать поток изменений параллельно.
Если вы хотите использовать FeedRanges, вам нужно создать процесс оркестрации, который получает экземпляры FeedRanges и распределяет их между компьютерами. Это распределение может быть следующим:
- Используйте
FeedRange.ToJsonString и распространяйте это строковое значение. Потребители могут использовать это значение с FeedRange.FromJsonString.
- Если распределение происходит в процессе, передавайте ссылку на объект
FeedRange.
Ниже приведен пример, показывающий, как считывать с начала канала изменений контейнера с помощью двух гипотетических отдельных компьютеров, которые считывают параллельно:
компьютер 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
компьютер 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Сохранение маркеров продолжения
Вы можете сохранить текущее расположение FeedIterator, получив маркер продолжения. Маркер продолжения — это строковое значение, которое фиксирует последние изменения, обработанные в FeedIterator, и позволяет FeedIterator продолжить выполнение с этой точки. Маркер продолжения, если он указан, имеет приоритет над временем начала и начинается с начальных значений. Следующий код считывает ленту изменений с момента создания контейнера. Когда больше нет изменений, сохраняется маркер продолжения, чтобы потребление потока изменений можно было возобновить позже.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
При использовании режима последней версии FeedIterator маркер продолжения никогда не истекает до тех пор, пока контейнер Azure Cosmos DB по-прежнему существует. Если вы используете режим всех версий и удаления, маркер продолжения FeedIterator действителен до тех пор, пока изменения произошли в периоде хранения для непрерывных резервных копий.
Чтобы обработать канал изменений с помощью модели извлечения, создайте экземпляр Iterator<FeedResponse<JsonNode>> responseIterator. При создании CosmosChangeFeedRequestOptions необходимо указать, где начать чтение канала изменений, и указать параметр FeedRange, который вы хотите использовать. Это FeedRange диапазон ключевых значений секции, определяющий элементы, которые можно считывать из потока изменений.
Если вы хотите прочитать веб-канал изменений в режиме всех версий и удалений, необходимо также указать allVersionsAndDeletes() при создании CosmosChangeFeedRequestOptions. Все версии и режим удаления не поддерживают обработку потока изменений с начала или с определенного момента времени. Необходимо либо обработать изменения, начиная с этого момента, либо с маркера продолжения. Все версии и режим удаления доступны в предварительной версии и доступен в версии >пакета SDK для Java = 4.42.0.
Использование изменений для всего контейнера
При указании FeedRange.forFullRange() вы можете обрабатывать ленту изменений для всего контейнера в удобном для вас темпе. При необходимости можно указать значение в byPage(). Если это свойство задано, оно устанавливает максимальное число полученных элементов на страницу.
Замечание
Все приведенные ниже фрагменты кода взяты из примеров в GitHub. Вы можете использовать пример режима последней версии и пример режима всех версий и удалений .
Ниже приведен пример получения значения в режиме последней responseIterator версии:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange());
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
Ниже приведен пример получения responseIterator в режиме всех версий и удаления:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(FeedRange.forFullRange())
.allVersionsAndDeletes();
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
Затем можно выполнить итерацию по результатам. Поскольку веб-канал изменений фактически является бесконечным списком элементов, охватывающим все будущие записи и обновления, значение responseIterator.hasNext() всегда true равно. Ниже приведен пример в режиме последней версии, который считывает все изменения, начиная с начала. Каждая итерация сохраняет маркер продолжения после обработки всех событий. Он выбирается из последней обработанной точки в канале изменений и обрабатывается с помощью createForProcessingFromContinuation:
int i = 0;
List<JsonNode> results;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s)");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
i++;
if (i >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Обработка изменений разделительного ключа.
В некоторых случаях может потребоваться обработать только изменения для определенного ключа раздела. Изменения для определенного ключа раздела можно обрабатывать так же, как и изменения для всего контейнера. Ниже приведен пример использования последней версии:
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(partitionKey)));
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int pkIndex = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
pkIndex++;
if (pkIndex >= 5) {
// artificially breaking out of loop
System.out.println("breaking....");
break;
}
}
Использование FeedRange для параллелизации
Обработчик канала изменений автоматически распределяет работу между несколькими потребителями. В модели извлечения канала изменений можно использовать FeedRange для параллелизации обработки канала изменений.
FeedRange представляет диапазон значений ключа разбивки.
Ниже приведен пример, в котором используется последний режим версии, в котором показано, как получить список диапазонов для контейнера:
Mono<List<FeedRange>> feedranges = resources.container.getFeedRanges();
List<FeedRange> feedRangeList = feedranges.block();
При получении списка диапазонов потока для контейнера вы получите по одному FeedRange на каждый физический раздел.
С помощью функции FeedRangeможно параллелизировать обработку канала изменений на нескольких компьютерах или потоках. В отличие от предыдущего примера, в котором показано, как обработать изменения для всего контейнера или одного ключа раздела, вы можете с помощью FeedRanges обрабатывать канал изменений параллельно.
Если вы хотите использовать FeedRanges, вам нужно создать процесс оркестрации, который получает экземпляры FeedRanges и распределяет их между компьютерами. Это распределение может быть следующим:
- Используйте
FeedRange.toString() и распространяйте это строковое значение.
- Если распределение происходит в процессе, передавайте ссылку на объект
FeedRange.
Ниже приведен пример, использующий последний режим версии. В нем показано, как читать с начала канала изменений контейнера с помощью двух гипотетических отдельных компьютеров, которые читают параллельно.
компьютер 1:
FeedRange range1 = feedRangeList.get(0);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range1);
int machine1index = 0;
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine1index++;
if (machine1index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
компьютер 2:
FeedRange range2 = feedRangeList.get(1);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range2);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int machine2index = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine2index++;
if (machine2index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Чтобы обработать поток изменений с помощью модели вытягивания, создайте экземпляр responseIterator с типом ItemPaged[Dict[str, Any]].
При вызове API канала изменений необходимо указать, откуда начать чтение канала изменений и передать нужный feed_range параметр.
Это feed_range диапазон ключевых значений секции, определяющий элементы, которые можно считывать из потока изменений.
Можно также указать mode параметр для режима канала изменений, в котором требуется обработать изменения: LatestVersion или AllVersionsAndDeletes. Значение по умолчанию — LatestVersion.
Используйте либо LatestVersion либо AllVersionsAndDeletes, чтобы указать, какой режим вы хотите использовать для чтения канала изменений.
При использовании AllVersionsAndDeletes режима можно либо начать обработку изменений с этого момента, либо с маркера continuation .
Чтение канала изменений с начала или с определённого момента времени с использованием start_time не поддерживается.
Использование изменений для всего контейнера
Если вы не предоставляете параметр feed_range, вы можете обрабатывать весь журнал изменений контейнера в собственном темпе.
Ниже приведен пример получения responseIterator в режиме LatestVersion из Beginning. Так как LatestVersion это режим по умолчанию, mode параметр не требуется передавать:
responseIterator = container.query_items_change_feed(start_time="Beginning")
Ниже приведен пример получения responseIterator в режиме AllVersionsAndDeletes из Now. Поскольку Now — это значение по умолчанию параметра start_time, его не нужно передавать:
responseIterator = container.query_items_change_feed(mode="AllVersionsAndDeletes")
Затем можно выполнить итерацию по результатам. Так как канал изменений фактически является бесконечным списком элементов, охватывающих все будущие записи и обновления, responseIterator может зациклиться бесконечно.
Ниже приведен пример в режиме последней версии, который считывает все изменения, начиная с начала.
Каждая итерация представляет изменения для документов.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
Обработка изменений разделительного ключа.
В некоторых случаях может потребоваться обработать только изменения для определенного ключа раздела.
Изменения можно обрабатывать так же, как и для всего контейнера с параметром partition_key .
Ниже приведен пример использования LatestVersion режима:
pk = "partition_key_value"
responseIterator = container.query_items_change_feed(start_time="Beginning", partition_key=pk)
for doc in responseIterator:
print(doc)
Использование FeedRange для параллелизации
В модели извлечения канала изменений можно использовать feed_range для параллелизации обработки канала изменений.
feed_range представляет диапазон значений ключа разбивки.
Ниже приведен пример, показывающий, как получить список диапазонов для контейнера.
list команда преобразует итератор в список:
rangesIterator = container.read_feed_ranges(force_refresh=False)
ranges = list(rangesIterator)
При получении списка значений feed_range для контейнера вы получите по одному feed_range на физическую секцию.
С помощью итератора feed_rangeможно создать итератор для параллелизации обработки канала изменений на нескольких компьютерах или потоках.
В отличие от предыдущего примера, в котором показано, как получить responseIterator для всего контейнера или одного ключа раздела, можно использовать feed_range для получения нескольких итераторов, которые могут параллельно обрабатывать поток изменений.
Ниже приведен пример, показывающий, как считывать с начала канала изменений контейнера с помощью двух гипотетических отдельных компьютеров, которые считывают параллельно:
компьютер 1:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[0])
for doc in responseIterator:
print(doc)
компьютер 2:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[1])
for doc in responseIterator:
print(doc)
Сохранение маркеров продолжения
Вы можете сохранить позицию итератора, получив маркер продолжения.
Маркер продолжения — это строковое значение, которое ведёт учёт responseIterator последних обработанных изменений и позволяет итератору возобновить с этого места позже.
Маркер продолжения, если он указан, имеет приоритет над временем начала и начинается с начальных значений.
Следующий код считывает ленту изменений с момента создания контейнера.
Когда больше нет изменений, сохраняется маркер продолжения, чтобы потребление потока изменений можно было возобновить позже.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
continuation_token = container.client_connection.last_response_headers['etag']
Замечание
Так как continuation маркер содержит ранее использованный mode параметр, если использован continuation, то mode параметр игнорируется и используется mode из continuation маркера.
Вот пример, который показывает, как считывать из канала изменений контейнера с использованием токена continuation.
responseIterator = container.query_items_change_feed(continuation=continuation_token)
for doc in responseIterator:
print(doc)
Чтобы обработать канал изменений с помощью модели извлечения, создайте экземпляр ChangeFeedPullModelIterator. При первоначальном создании ChangeFeedPullModelIterator необходимо указать требуемое значение changeFeedStartFrom внутри ChangeFeedIteratorOptions, которое включает как начальную позицию для чтения изменений, так и ресурс (ключ раздела или FeedRange), для которого необходимо получить изменения.
Замечание
Если значение для changeFeedStartFrom не указано, поток изменений будет извлекаться для всего контейнера от Now().
В настоящее время только последняя версия поддерживается пакетом SDK JavaScript и выбрана по умолчанию.
При необходимости можно использовать maxItemCount в ChangeFeedIteratorOptions для задания максимального количества элементов на странице.
Ниже приведен пример получения итератора в режиме последней версии, возвращающего объекты сущностей:
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Now()
};
const iterator = container.items.getChangeFeedIterator(options);
Использование изменений для всего контейнера
Если вы не предоставляете параметр FeedRange или PartitionKey внутри ChangeFeedStartFrom, вы можете обрабатывать весь поток изменений контейнера в своем темпе. Ниже приведен пример, который начинает считывать все изменения, начиная с текущего времени:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Поскольку веб-канал изменений фактически является бесконечным списком элементов, охватывающим все будущие записи и обновления, значение hasMoreResults всегда true. При попытке прочитать фид изменений и при отсутствии новых изменений, вы получите ответ с NotModified статусом. Это отличается от получения ответа без изменений и статуса OK. Можно получить пустые ответы канала изменений, пока доступны дополнительные изменения, и вы должны продолжать опрос до получения NotModified. В предыдущем примере NotModified обрабатывается путем ожидания пяти секунд перед повторной проверкой изменений.
Потребление изменений для ключа раздела
В некоторых случаях может потребоваться обработать только изменения для определенного ключа раздела. Вы можете запросить итератор для определенного ключа разбивки и обработать изменения так же, как и для всего контейнера.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning("partitionKeyValue")
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Использование FeedRange для параллелизации
В модели извлечения канала изменений можно использовать FeedRange для параллелизации обработки канала изменений.
FeedRange представляет диапазон значений ключа разбивки.
Ниже приведен пример, в который показано, как получить список диапазонов для контейнера:
const ranges = await container.getFeedRanges();
При получении списка значений FeedRange для контейнера вы получите по одному FeedRange на физическую секцию.
С помощью итератора FeedRangeможно создать итератор для параллелизации обработки канала изменений на нескольких компьютерах или потоках. В отличие от предыдущего примера, в котором показано, как получить итератор канала изменений для всего контейнера или одного ключа секции, можно использовать FeedRanges для получения нескольких итераторов, которые могут обрабатывать канал изменений параллельно.
Ниже приведен пример, показывающий, как считывать с начала канала изменений контейнера с помощью двух гипотетических отдельных компьютеров, которые считывают параллельно:
компьютер 1:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[0])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
компьютер 2:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[1])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Сохранение маркеров продолжения
Вы можете сохранить позицию итератора, получив маркер продолжения. Маркер продолжения — это строковое значение, которое отслеживает последние обработанные изменения канала изменений и позволяет итератору возобновить работу с этого момента позже. Маркер продолжения, если он указан, имеет приоритет над временем начала и начинается с начальных значений. Следующий код считывает ленту изменений с момента создания контейнера. Когда больше нет изменений, сохраняется маркер продолжения, чтобы потребление потока изменений можно было возобновить позже.
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
let continuation = "";
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
continuation = response.continuationToken;
break;
}
else {
console.log("Result found", response.result);
}
}
// For checking any new changes using the continuation token
const continuationOptions = {
changeFeedStartFrom: ChangeFeedStartFrom(continuation)
}
const newIterator = container.items.getChangeFeedIterator(continuationOptions);
Маркер продолжения никогда не истекает до тех пор, пока контейнер Azure Cosmos DB по-прежнему существует.
Использование AsyncIterator
JavaScript AsyncIterator можно использовать для получения потока изменений. Ниже приведен пример AsyncIterator.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
let timeout = 0;
for await(const result of container.items.getChangeFeedIterator(options).getAsyncIterator()) {
if (result.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", result.result);
timeout = 0;
}
await waitFor(timeout);
}