Вы можете использовать модель потребления канала изменений, чтобы обрабатывать изменения в Azure Cosmos DB в удобном для вас темпе.
Аналогично обработчику потока изменений, вы можете использовать модель извлечения потока изменений для параллелизации обработки изменений для нескольких потребителей потока изменений.
Однако невозможно преобразовать токены продолжения в аренду и наоборот.
Вы можете рассмотреть возможность использования pull-модели в следующих ситуациях:
Ниже приведены некоторые основные различия между обработчиком канала изменений и моделью извлечения канала изменений:
Чтобы обработать канал изменений с помощью модели извлечения, создайте экземпляр FeedIterator
. При первоначальном создании FeedIterator
необходимо задать обязательное значение для ChangeFeedStartFrom
, которое включает начальную позицию для чтения изменений и значение, которое вы хотите использовать для FeedRange
. Это FeedRange
диапазон значений ключа раздела и указывает элементы, которые можно считывать из канала изменений с помощью этого конкретного FeedIterator
параметра. Необходимо также указать необходимое ChangeFeedMode
значение для режима, в котором требуется обработать изменения: последняя версия или все версии и удаления. Используйте либо ChangeFeedMode.LatestVersion
либо ChangeFeedMode.AllVersionsAndDeletes
, чтобы указать, какой режим вы хотите использовать для чтения канала изменений. При использовании режима всех версий и удалений необходимо выбрать канал изменений, начинающийся с значения Now()
или из определенного маркера продолжения.
При необходимости можно указать ChangeFeedRequestOptions
для задания PageSizeHint
. Если это свойство задано, оно устанавливает максимальное число полученных элементов на страницу. Если операции в отслеживаемой коллекции выполняются с помощью хранимых процедур, область транзакции сохраняется при считывании элементов из канала изменений. В результате число полученных элементов может быть выше указанного значения, чтобы элементы, измененные той же транзакцией, возвращались как часть одного атомарного пакета.
Пример, как получить FeedIterator
в последней версии режима, возвращающей объекты сущностей, например объект User
.
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Совет
До версии 3.34.0
режим последней версии можно использовать с помощью параметра ChangeFeedMode.Incremental
. Как Incremental
, так и LatestVersion
ссылаются на режим последней версии канала изменений, и приложения, использующие любой из этих режимов, будут наблюдать одно и то же поведение.
Все версии и режим удаления доступны в предварительной версии и могут быть использованы с предварительными версиями >.NET SDK = 3.32.0-preview
. Ниже приведен пример получения FeedIterator
во всех версиях и удалении режимов, возвращающих User
объекты:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
Использование канала изменений через потоки
FeedIterator
для обоих режимов канала изменений предусмотрены два варианта. Помимо примеров, возвращающих объекты сущностей, можно также получить ответ с Stream
поддержкой. Потоки позволяют считывать данные без первой десериализации, поэтому вы экономите ресурсы клиента.
Ниже приведен пример получения FeedIterator
в режиме последней версии, который возвращает Stream
:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
Использование изменений для всего контейнера
Если вы не предоставите параметр FeedRange
для FeedIterator
, можно будет обрабатывать ленту изменений всего контейнера в собственном темпе. Вот пример, который начинает считывать все изменения в режиме последней версии, начиная с текущего времени.
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Поскольку веб-канал изменений фактически является бесконечным списком элементов, охватывающим все будущие записи и обновления, значение HasMoreResults
всегда true
. При попытке прочитать фид изменений и при отсутствии новых изменений, вы получите ответ с NotModified
статусом. В предыдущем примере это обрабатывается путем ожидания в течение пяти секунд перед повторной проверкой изменений.
Потребление изменений для ключа раздела
В некоторых случаях может потребоваться обработать только изменения для определенного ключа раздела. Для определенного ключа секции можно получить FeedIterator
и обработать изменения так же, как и для всего контейнера.
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Использование FeedRange для параллелизации
Обработчик канала изменений автоматически распределяет работу между несколькими потребителями. В модели извлечения канала изменений можно использовать FeedRange
для параллелизации обработки канала изменений.
FeedRange
представляет диапазон значений ключа разбивки.
Ниже приведен пример, в который показано, как получить список диапазонов для контейнера:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
При получении списка значений FeedRange
для контейнера вы получите по одному FeedRange
на физическую секцию.
С помощью средства FeedRange
можно создать FeedIterator
для параллелизации обработки канала изменений на нескольких компьютерах или потоках. В отличие от предыдущего примера, в котором показано, как получить FeedIterator
для всего контейнера или одного ключа раздела, с помощью FeedRanges вы можете получить несколько FeedIterator'ов, способных обрабатывать поток изменений параллельно.
Если вы хотите использовать FeedRanges, вам нужно создать процесс оркестрации, который получает экземпляры FeedRanges и распределяет их между компьютерами. Это распределение может быть следующим:
- Используйте
FeedRange.ToJsonString
и распространяйте это строковое значение. Потребители могут использовать это значение с FeedRange.FromJsonString
.
- Если распределение происходит в процессе, передавайте ссылку на объект
FeedRange
.
Ниже приведен пример, показывающий, как считывать с начала канала изменений контейнера с помощью двух гипотетических отдельных компьютеров, которые считывают параллельно:
компьютер 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
компьютер 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
Сохранение маркеров продолжения
Вы можете сохранить текущее расположение FeedIterator
, получив маркер продолжения. Маркер продолжения — это строковое значение, которое фиксирует последние изменения, обработанные в FeedIterator, и позволяет FeedIterator
продолжить выполнение с этой точки. Маркер продолжения, если он указан, имеет приоритет над временем начала и начинается с начальных значений. Следующий код считывает ленту изменений с момента создания контейнера. После того, как не останется доступных изменений, система сохранит маркер продолжения, чтобы позднее возобновить обработку этого потока изменений.
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
При использовании режима последней версии FeedIterator
маркер продолжения никогда не истекает до тех пор, пока контейнер Azure Cosmos DB по-прежнему существует. Если вы используете режим всех версий и удаления, маркер продолжения FeedIterator
действителен до тех пор, пока изменения произошли в периоде хранения для непрерывных резервных копий.
Чтобы обработать канал изменений с помощью модели извлечения, создайте экземпляр Iterator<FeedResponse<JsonNode>> responseIterator
. При создании CosmosChangeFeedRequestOptions
необходимо указать, где начать чтение канала изменений, и указать параметр FeedRange
, который вы хотите использовать. Это FeedRange
диапазон ключевых значений секции, определяющий элементы, которые можно считывать из потока изменений.
Если вы хотите прочитать веб-канал изменений в режиме всех версий и удалений, необходимо также указать allVersionsAndDeletes()
при создании CosmosChangeFeedRequestOptions
. Все версии и режим удаления не поддерживают обработку потока изменений с начала или с определенного момента времени. Необходимо либо обработать изменения, начиная с этого момента, либо с маркера продолжения. Все версии и режим удаления доступны в предварительной версии и доступен в версии >пакета SDK для Java = 4.42.0
.
Использование изменений для всего контейнера
При указании FeedRange.forFullRange()
вы можете обрабатывать ленту изменений для всего контейнера в удобном для вас темпе. При необходимости можно указать значение в byPage()
. Если это свойство задано, оно устанавливает максимальное число полученных элементов на страницу.
Примечание.
Все приведенные ниже фрагменты кода взяты из примеров в GitHub. Вы можете использовать пример режима последней версии и пример режима всех версий и удалений .
Ниже приведен пример получения значения в режиме последней responseIterator
версии:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange());
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
Ниже приведен пример получения responseIterator
в режиме всех версий и удаления:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(FeedRange.forFullRange())
.allVersionsAndDeletes();
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
Затем можно выполнить итерацию по результатам. Поскольку веб-канал изменений фактически является бесконечным списком элементов, охватывающим все будущие записи и обновления, значение responseIterator.hasNext()
всегда true
равно. Ниже приведен пример в режиме последней версии, который считывает все изменения, начиная с начала. Каждая итерация сохраняет маркер продолжения после обработки всех событий. Он выбирается из последней обработанной точки в канале изменений и обрабатывается с помощью createForProcessingFromContinuation
:
int i = 0;
List<JsonNode> results;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s)");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
i++;
if (i >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Обработка изменений разделительного ключа.
В некоторых случаях может потребоваться обработать только изменения для определенного ключа раздела. Изменения для определенного ключа раздела можно обрабатывать так же, как и изменения для всего контейнера. Ниже приведен пример использования последней версии:
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(partitionKey)));
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int pkIndex = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
pkIndex++;
if (pkIndex >= 5) {
// artificially breaking out of loop
System.out.println("breaking....");
break;
}
}
Использование FeedRange для параллелизации
Обработчик канала изменений автоматически распределяет работу между несколькими потребителями. В модели извлечения канала изменений можно использовать FeedRange
для параллелизации обработки канала изменений.
FeedRange
представляет диапазон значений ключа разбивки.
Ниже приведен пример, в котором используется последний режим версии, в котором показано, как получить список диапазонов для контейнера:
Mono<List<FeedRange>> feedranges = resources.container.getFeedRanges();
List<FeedRange> feedRangeList = feedranges.block();
При получении списка диапазонов потока для контейнера вы получите по одному FeedRange
на каждый физический раздел.
С помощью функции FeedRange
можно параллелизировать обработку канала изменений на нескольких компьютерах или потоках. В отличие от предыдущего примера, в котором показано, как обработать изменения для всего контейнера или одного ключа раздела, вы можете с помощью FeedRanges обрабатывать канал изменений параллельно.
Если вы хотите использовать FeedRanges, вам нужно создать процесс оркестрации, который получает экземпляры FeedRanges и распределяет их между компьютерами. Это распределение может быть следующим:
- Используйте
FeedRange.toString()
и распространяйте это строковое значение.
- Если распределение происходит в процессе, передавайте ссылку на объект
FeedRange
.
Ниже приведен пример, использующий последний режим версии. В нем показано, как читать с начала канала изменений контейнера с помощью двух гипотетических отдельных компьютеров, которые читают параллельно.
компьютер 1:
FeedRange range1 = feedRangeList.get(0);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range1);
int machine1index = 0;
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine1index++;
if (machine1index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
компьютер 2:
FeedRange range2 = feedRangeList.get(1);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range2);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int machine2index = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine2index++;
if (machine2index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
Чтобы обработать поток изменений с помощью модели вытягивания, создайте экземпляр responseIterator с типом ItemPaged[Dict[str, Any]]
.
При вызове API канала изменений необходимо указать, откуда начать чтение канала изменений и передать нужный feed_range
параметр.
Это feed_range
диапазон ключевых значений секции, определяющий элементы, которые можно считывать из потока изменений.
Можно также указать mode
параметр для режима канала изменений, в котором требуется обработать изменения: LatestVersion или AllVersionsAndDeletes (значение по умолчанию: ). LatestVersion
Используйте либо LatestVersion
либо AllVersionsAndDeletes
, чтобы указать, какой режим вы хотите использовать для чтения канала изменений.
При использовании AllVersionsAndDeletes
режима можно либо начать обработку изменений с этого момента, либо с маркера continuation
.
Чтение канала изменений с начала или с определённого момента времени с использованием start_time
не поддерживается.
Примечание.
AllVersionsAndDeletes
режим находится в режиме предварительной версии и доступен в пакете >SDK для Python = 4.9.1b1.
Использование изменений для всего контейнера
Если вы не предоставляете параметр feed_range
, вы можете обрабатывать весь журнал изменений контейнера в собственном темпе.
Примечание.
Все приведенные ниже фрагменты кода взяты из примеров в GitHub. Вы можете использовать примеры.
Ниже приведен пример получения responseIterator
в режиме LatestVersion
из Beginning
. Так как LatestVersion
это режим по умолчанию, mode
параметр не требуется передавать:
responseIterator = container.query_items_change_feed(start_time="Beginning")
Ниже приведен пример получения responseIterator
в режиме AllVersionsAndDeletes
из Now
. Поскольку Now
— это значение по умолчанию параметра start_time
, его не нужно передавать:
responseIterator = container.query_items_change_feed(mode="AllVersionsAndDeletes")
Затем можно выполнить итерацию по результатам. Поскольку канал изменений фактически является бесконечным списком элементов, охватывающим все будущие записи и обновления, responseIterator может зацикливаться бесконечно.
Ниже приведен пример в режиме последней версии, который считывает все изменения, начиная с начала.
Каждая итерация представляет изменения для документов.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
Обработка изменений разделительного ключа.
В некоторых случаях может потребоваться обработать только изменения для определенного ключа раздела.
Изменения можно обрабатывать так же, как и для всего контейнера с параметром partition_key
.
Ниже приведен пример использования LatestVersion
режима:
pk = "partition_key_value"
responseIterator = container.query_items_change_feed(start_time="Beginning", partition_key=pk)
for doc in responseIterator:
print(doc)
Использование FeedRange для параллелизации
В модели извлечения канала изменений можно использовать feed_range
для параллелизации обработки канала изменений.
feed_range
представляет диапазон значений ключа разбивки.
Ниже приведен пример, показывающий, как получить список диапазонов для контейнера.
list
команда преобразует итератор в список:
rangesIterator = container.read_feed_ranges(force_refresh=False)
ranges = list(rangesIterator)
При получении списка значений feed_range
для контейнера вы получите по одному feed_range
на физическую секцию.
С помощью итератора feed_range
можно создать итератор для параллелизации обработки канала изменений на нескольких компьютерах или потоках.
В отличие от предыдущего примера, в котором показано, как получить responseIterator
для всего контейнера или одного ключа раздела, можно использовать feed_range
для получения нескольких итераторов, которые могут параллельно обрабатывать поток изменений.
Ниже приведен пример, показывающий, как считывать с начала канала изменений контейнера с помощью двух гипотетических отдельных компьютеров, которые считывают параллельно:
компьютер 1:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[0])
for doc in responseIterator:
print(doc)
компьютер 2:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[1])
for doc in responseIterator:
print(doc)
Сохранение маркеров продолжения
Вы можете сохранить позицию итератора, получив маркер продолжения.
Маркер продолжения — это строковое значение, которое ведёт учёт responseIterator
последних обработанных изменений и позволяет итератору возобновить с этого места позже.
Маркер продолжения, если он указан, имеет приоритет над временем начала и начинается с начальных значений.
Следующий код считывает ленту изменений с момента создания контейнера.
После того, как не останется доступных изменений, система сохранит маркер продолжения, чтобы позднее возобновить обработку этого потока изменений.
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
continuation_token = container.client_connection.last_response_headers['etag']
Примечание.
Так как continuation
маркер содержит ранее использованный mode
параметр, если был использован continuation
, mode
параметр будет игнорироваться и использовать mode
из continuation
маркера.
Вот пример, который показывает, как считывать из канала изменений контейнера с использованием токена continuation
.
responseIterator = container.query_items_change_feed(continuation=continuation_token)
for doc in responseIterator:
print(doc)
Чтобы обработать канал изменений с помощью модели извлечения, создайте экземпляр ChangeFeedPullModelIterator
. При первоначальном создании ChangeFeedPullModelIterator
необходимо указать обязательное значение changeFeedStartFrom
внутри ChangeFeedIteratorOptions
, которое задаёт начальную позицию для чтения изменений и ресурс (ключ секции или FeedRange), для которого нужно получить изменения.
Примечание.
Если значение changeFeedStartFrom
не указано, то changefeed будет извлечён для всего контейнера, начиная с настоящего момента.
В настоящее время только последняя версия поддерживается пакетом SDK JS и выбрана по умолчанию.
При необходимости можно использовать maxItemCount
в ChangeFeedIteratorOptions
для задания максимального количества элементов на странице.
Ниже приведен пример получения итератора в режиме последней версии, возвращающего объекты сущностей:
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Now()
};
const iterator = container.items.getChangeFeedIterator(options);
Использование изменений для всего контейнера
Если вы не предоставляете параметр FeedRange
или PartitionKey
внутри ChangeFeedStartFrom
, вы можете обрабатывать весь поток изменений контейнера в своем темпе. Ниже приведен пример, который начинает считывать все изменения, начиная с текущего времени:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Поскольку веб-канал изменений фактически является бесконечным списком элементов, охватывающим все будущие записи и обновления, значение hasMoreResults
всегда true
. При попытке прочитать фид изменений и при отсутствии новых изменений, вы получите ответ с NotModified
статусом. В предыдущем примере это обрабатывается путем ожидания в течение пяти секунд перед повторной проверкой изменений.
Потребление изменений для ключа раздела
В некоторых случаях может потребоваться обработать только изменения для определенного ключа раздела. Вы можете запросить итератор для определенного ключа разбивки и обработать изменения так же, как и для всего контейнера.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning("partitionKeyValue")
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Использование FeedRange для параллелизации
В модели извлечения канала изменений можно использовать FeedRange
для параллелизации обработки канала изменений.
FeedRange
представляет диапазон значений ключа разбивки.
Ниже приведен пример, в который показано, как получить список диапазонов для контейнера:
const ranges = await container.getFeedRanges();
При получении списка значений FeedRange
для контейнера вы получите по одному FeedRange
на физическую секцию.
С помощью итератора FeedRange
можно создать итератор для параллелизации обработки канала изменений на нескольких компьютерах или потоках. В отличие от предыдущего примера, в котором показано, как получить итератор для всего контейнера или одного ключа партиции, вы можете использовать FeedRanges для получения нескольких итераторов, которые могут параллельно обрабатывать ленту изменений.
Ниже приведен пример, показывающий, как считывать с начала канала изменений контейнера с помощью двух гипотетических отдельных компьютеров, которые считывают параллельно:
компьютер 1:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[0])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
компьютер 2:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[1])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
Сохранение маркеров продолжения
Вы можете сохранить позицию итератора, получив маркер продолжения. Маркер продолжения — это строковое значение, которое отслеживает последние обработанные изменения в потоке изменений и позволяет итератору возобновить работу с этого момента позже. Маркер продолжения, если он указан, имеет приоритет над временем начала и начинается с начальных значений. Следующий код считывает ленту изменений с момента создания контейнера. После того, как не останется доступных изменений, система сохранит маркер продолжения, чтобы позднее возобновить обработку этого потока изменений.
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
let continuation = "";
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
continuation = response.continuationToken;
break;
}
else {
console.log("Result found", response.result);
}
}
// For checking any new changes using the continuation token
const continuationOptions = {
changeFeedStartFrom: ChangeFeedStartFrom(continuation)
}
const newIterator = container.items.getChangeFeedIterator(continuationOptions);
Маркер продолжения никогда не истекает до тех пор, пока контейнер Azure Cosmos DB по-прежнему существует.
Использование AsyncIterator
Для получения канала изменений можно использовать Асинхронный итератор JavaScript. Ниже приведен пример использования Асинхронного итератора.
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
let timeout = 0;
for await(const result of container.items.getChangeFeedIterator(options).getAsyncIterator()) {
if (result.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", result.result);
timeout = 0;
}
await waitFor(timeout);
}