若要使用提取模型來處理變更摘要,請建立 FeedIterator 的執行個體。 一開始建立 FeedIterator 時,您必須指定必要的 ChangeFeedStartFrom 值,其中包含讀取變更的起始位置以及您想要用於 FeedRange 的值。
FeedRange 是分割區索引鍵值的範圍,並會使用特定的 FeedIterator 指定可從變更摘要讀取的項目。 您也必須指定您想要處理變更的模式的必要 ChangeFeedMode 值:最新版本或所有版本和刪除。 使用 ChangeFeedMode.LatestVersion 或 ChangeFeedMode.AllVersionsAndDeletes 來指出您想要用來讀取變更摘要的模式。 當您使用所有版本和刪除模式時,必須從 Now() 的值或從特定接續權杖開始選取變更摘要。
您可以選擇性指定 ChangeFeedRequestOptions 來設定 PageSizeHint。 完成設定後,此屬性會設定每個頁面可收到的項目數上限。 如果受監視集合中的作業是透過預存程序所執行,則從變更摘要中讀取項目時會保留交易範圍。 因此,所接收的項目數可能會高於指定的值,這樣同一次交易所變更的項目就會在一個不可部分完成的批次中傳回。
以下範例顯示如何以最新版本模式取得可傳回實體物件的 FeedIterator,在此案例中為 User 物件:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
小提示
對於早於 3.34.0的版本,您可以藉由設定 ChangeFeedMode.Incremental來使用最新版本模式。
Incremental和LatestVersion都指的是變更摘要的最新版本模式,使用任一模式的應用程式都會看到相同的行為。
所有版本和刪除模式處於預覽狀態,而且可以與預覽 .NET SDK 版本 >= 3.32.0-preview 搭配使用。 以下範例顯示如何在所有版本和刪除模式中取得可傳回 FeedIterator 物件的 User:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
透過串流取用變更摘要
這兩種變更摘要模式的 FeedIterator 都有兩個選項。 除了可傳回實體物件的範例之外,您也可以使用 Stream 支援來取得回應。 串流可讓您讀取資料,而不需要先將其還原序列化,因此可以節省用戶端資源。
以下範例顯示如何以最新版本模式取得可傳回 FeedIterator 的 Stream:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
取用整個容器的變更
如果您不將 FeedRange 參數提供給 FeedIterator,則可以依照自己的步調來處理整個容器的變更饋送。 以下範例使用最新版本模式來開始讀取目前時間起的所有變更:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
因為變更摘要實際上是包含所有未來寫入和更新的無止盡項目清單,所以 HasMoreResults 的值一律為 true。 嘗試讀取變更摘要而且沒有可用的新變更時,您會收到含有 NotModified 狀態的回應。 這與收到不含變更且具有 OK 狀態的回應不同。 當有更多變更可用時,可能會取得空白的變更摘要回應,而且您應該繼續輪詢,直到收到 NotModified為止。 在上述範例中, NotModified 透過等待五秒鐘再重新檢查變更來處理。
取用分割區索引鍵的變更
在某些情況下,您可能只想要處理特定分割區索引鍵的變更。 您可以取得特定分割區索引鍵的 FeedIterator,並使用與您對整個容器所使用的相同方式處理變更。
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
使用 FeedRange 進行平行處理
在變更摘要處理器 (部分機器翻譯) 中,工作會自動分散至多個取用者。 在變更摘要提取模型中,您可以使用 FeedRange 來平行處理變更摘要的處理。
FeedRange 代表分割區索引鍵值的範圍。
以下範例顯示如何取得容器的範圍清單:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
當您取得容器的 FeedRange 值清單時,每個FeedRange (部分機器翻譯) 都會得到一個 。
使用 FeedRange,您可以建立 FeedIterator,以將變更回饋的處理平行化至多部機器或執行緒。 不同於先前範例 (顯示如何取得整個容器或單一分割區索引鍵的 FeedIterator),您可以使用 FeedRange 來取得多個可平行處理變更摘要的 FeedIterator。
在您想要使用 FeedRange 的案例中,您需要有可取得 FeedRange 並將其散發到這些機器的協調器流程。 此散發可能會:
- 使用
FeedRange.ToJsonString 並散發此字串值。 消費者可以將此值與 FeedRange.FromJsonString 搭配使用。
- 如果分發正在處理中,則傳遞
FeedRange 物件參考。
以下範例顯示如何使用兩個會平行讀取的假設個別機器,從容器的變更摘要開頭進行讀取:
電腦 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
電腦 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
儲存接續權杖
您可以藉由取得接續權杖來儲存 FeedIterator 的位置。 延續標記是一個字串值,用來追蹤 FeedIterator 最後處理過的變更,並允許 FeedIterator 在此時間點之後繼續執行。 接續權杖 (若有指定) 的優先順序高過開始時間值和從頭開始值。 下列程式碼會讀取建立容器之後的變更摘要。 在沒有其他的變更可用之後,其會保存接續權杖,讓變更摘要耗用量可以在稍後繼續。
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
當您要使用最新版本模式時,只要 Azure Cosmos DB 容器仍然存在,FeedIterator 接續權杖就永遠不會過期。 當您使用所有版本和刪除模式時,只要在連續備份的保留時間範圍內有發生變更,FeedIterator 接續權杖就會有效。
若要使用提取模型來處理變更摘要,請建立 Iterator<FeedResponse<JsonNode>> responseIterator 的執行個體。 建立 CosmosChangeFeedRequestOptions 時,您必須指定要從何處開始讀取變更摘要,並傳遞您想要使用的 FeedRange 參數。
FeedRange 是分割區鍵值範圍,用於指定可從更改資料流中讀取的項目。
如果您想要以所有版本和刪除模式讀取變更摘要,則也必須在建立 allVersionsAndDeletes() 時指定 CosmosChangeFeedRequestOptions。 所有版本和刪除模式不支援處理開頭或某個時間點的變更摘要。 您必須從現在或從接續權杖處理變更。 所有版本和刪除模式處於預覽狀態,而且可以在 Java SDK 版本 >= 4.42.0 中使用。
取用整個容器的變更
如果您指定 FeedRange.forFullRange(),則可以依自己的步調處理整個容器的變更摘要。 您可以在 byPage() 中選擇性指定值。 完成設定後,此屬性會設定每個頁面可收到的項目數上限。
以下範例顯示如何以最新版本模式取得 responseIterator 值:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange());
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
以下範例顯示如何以所有版本和刪除模式取得 responseIterator:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(FeedRange.forFullRange())
.allVersionsAndDeletes();
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
我們接著可以逐一查看結果。 因為變更摘要實際上是包含所有未來寫入和更新的無止盡項目清單,所以 responseIterator.hasNext() 的值一律為 true。 以下是以最新版本模式運行的範例,會從頭開始讀取所有變更。 每次反覆運算都會在處理所有事件之後持續保存接續權杖。 其會從變更摘要中的最後一個已處理點進行挑選,並使用 createForProcessingFromContinuation 進行處理:
int i = 0;
List<JsonNode> results;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s)");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
i++;
if (i >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
取用分割區索引鍵的變更
在某些情況下,您可能只想要處理特定分割區索引鍵的變更。 您可以使用用於整個容器的相同方式來處理特定分割區索引鍵的變更。 以下是使用最新版本模式的範例:
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(partitionKey)));
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int pkIndex = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
pkIndex++;
if (pkIndex >= 5) {
// artificially breaking out of loop
System.out.println("breaking....");
break;
}
}
使用 FeedRange 進行平行處理
在變更摘要處理器 (部分機器翻譯) 中,工作會自動分散至多個取用者。 在變更摘要提取模型中,您可以使用 FeedRange 來平行處理變更摘要的處理。
FeedRange 代表分割區索引鍵值的範圍。
以下是使用最新版本模式的範例,顯示如何取得容器的範圍清單:
Mono<List<FeedRange>> feedranges = resources.container.getFeedRanges();
List<FeedRange> feedRangeList = feedranges.block();
當您取得容器的 FeedRange 清單時,每個FeedRange (部分機器翻譯) 都會得到一個 。
藉由使用 FeedRange,您可以在多個機器或執行緒平行處理變更摘要。 與先前範例顯示如何處理整個容器或單一分割區索引鍵的變更不同,您可以使用 FeedRange 來平行處理變更摘要。
在您想要使用 FeedRange 的案例中,您需要有可取得 FeedRange 並將其散發到這些機器的協調器流程。 此散發可能會:
- 使用
FeedRange.toString() 並散發此字串值。
- 如果分發正在處理中,則傳遞
FeedRange 物件參考。
以下是使用最新版本模式的範例。 其會顯示如何使用兩個會平行讀取的假設個別機器,從容器的變更摘要開頭進行讀取:
電腦 1:
FeedRange range1 = feedRangeList.get(0);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range1);
int machine1index = 0;
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine1index++;
if (machine1index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
電腦 2:
FeedRange range2 = feedRangeList.get(1);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range2);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int machine2index = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine2index++;
if (machine2index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
若要使用提取模型處理變更摘要,請使用 類型 ItemPaged[Dict[str, Any]]建立 responseIterator 實例。
當您呼叫變更摘要 API 時,您必須指定要從何處開始讀取變更摘要,並傳遞 feed_range 您想要使用的參數。
feed_range 是分割區鍵值範圍,用於指定可從更改資料流中讀取的項目。
您也可以指定要處理變更之變更摘要模式的 mode 參數:LatestVersion 或 AllVersionsAndDeletes。 預設值是 LatestVersion。
使用 LatestVersion 或 AllVersionsAndDeletes 來指出您想要用來讀取變更摘要的模式。
當您使用 AllVersionsAndDeletes 模式時,您可以從現在開始處理變更,或從 continuation 令牌開始處理變更。
不支援從開頭或從某個時間點 (使用 start_time) 讀取變更摘要。
取用整個容器的變更
如果您沒有提供 feed_range 參數,您可以按照自己的步調讀取整個容器的變更記錄。
以下是如何在responseIterator模式中從LatestVersion取得Beginning的範例。 由於 LatestVersion 是預設模式, mode 因此不需要傳遞 參數:
responseIterator = container.query_items_change_feed(start_time="Beginning")
以下是如何在responseIterator模式下,從AllVersionsAndDeletes取得Now的範例,由於Now是start_time參數的預設值,因此不需要傳遞:
responseIterator = container.query_items_change_feed(mode="AllVersionsAndDeletes")
我們接著可以逐一查看結果。 由於變更摘要實際上是包含所有未來寫入和更新的無止盡項目清單,所以 responseIterator 可以無限重複。
以下是以最新版本模式運行的範例,會從頭開始讀取所有變更。
每次反覆運算都會列印文件的變更摘要。
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
取用分割區索引鍵的變更
在某些情況下,您可能只想要處理特定分割區索引鍵的變更。
您可以使用 partition_key 參數,以與處理整個容器相同的方式來處理變更。
以下是使用 LatestVersion 模式的範例:
pk = "partition_key_value"
responseIterator = container.query_items_change_feed(start_time="Beginning", partition_key=pk)
for doc in responseIterator:
print(doc)
使用 FeedRange 進行平行處理
在變更摘要提取模型中,您可以使用 feed_range 來平行處理變更摘要的處理。
feed_range 代表分割區索引鍵值的範圍。
以下是示範如何取得容器範圍清單的範例。
list 命令會將迭代器轉換成清單:
rangesIterator = container.read_feed_ranges(force_refresh=False)
ranges = list(rangesIterator)
當您取得容器的 feed_range 值清單時,每個feed_range (部分機器翻譯) 都會得到一個 。
透過使用 feed_range,您可以創建一個迭代器,以便在多台機器或多個執行緒上並行處理變更饋送。
與先前展示如何取得 responseIterator 的整個容器或單一分割區金鑰的範例不同,您可以使用 feed_range 來取得多個迭代器,以平行方式處理變更資料流。
以下範例顯示如何使用兩個會平行讀取的假設個別機器,從容器的變更摘要開頭進行讀取:
電腦 1:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[0])
for doc in responseIterator:
print(doc)
電腦 2:
responseIterator = container.query_items_change_feed(start_time="Beginning", feed_range=ranges[1])
for doc in responseIterator:
print(doc)
儲存接續權杖
您可以取得續傳標記來保存迭代器的位置。
接續權杖是一個字串值,可追蹤 responseIterator 上次處理的變更,並讓迭代器之後從這個時間點繼續執行。
接續權杖 (若有指定) 的優先順序高過開始時間值和從頭開始值。
下列程式碼會讀取建立容器之後的變更摘要。
在沒有其他的變更可用之後,其會保存接續權杖,讓變更摘要耗用量可以在稍後繼續。
responseIterator = container.query_items_change_feed(start_time="Beginning")
for doc in responseIterator:
print(doc)
continuation_token = container.client_connection.last_response_headers['etag']
備註
由於 continuation 標記包含先前使用 mode 的參數,如果使用 continuation,則 mode 參數會被忽略,並改為使用來自 mode 標記的 continuation。
以下範例示範如何使用 continuation 權杖從容器的變更摘要進行讀取:
responseIterator = container.query_items_change_feed(continuation=continuation_token)
for doc in responseIterator:
print(doc)
若要使用提取模型來處理變更摘要,請建立 ChangeFeedPullModelIterator 的執行個體。 當您一開始建立 ChangeFeedPullModelIterator 時,必須在 changeFeedStartFrom 中指定必要的 ChangeFeedIteratorOptions 值,這其中包含讀取變更的起始位置,以及用來擷取變更的資源(分割區金鑰或 FeedRange)。
備註
如果changeFeedStartFrom未指定任何值,則會從Now()查詢整個容器的變更記錄。
目前,JavaScript SDK 僅支援 最新版本 ,且預設為選取。
您可以在 maxItemCount 中選擇性使用 ChangeFeedIteratorOptions,以設定每個頁面收到的項目數上限。
以下範例顯示如何以最新版本模式取得可傳回實體物件的迭代器:
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Now()
};
const iterator = container.items.getChangeFeedIterator(options);
取用整個容器的變更
如果您未在 FeedRange 內提供 PartitionKey 或 ChangeFeedStartFrom 參數,則可以依照自己的步調處理整個容器的變更摘要。 以下範例顯示如何從目前時間開始讀取所有變更:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
因為變更摘要實際上是包含所有未來寫入和更新的無止盡項目清單,所以 hasMoreResults 的值一律為 true。 嘗試讀取變更摘要而且沒有可用的新變更時,您會收到含有 NotModified 狀態的回應。 這與收到不含變更且具有 OK 狀態的回應不同。 當有更多變更可用時,可能會取得空白的變更摘要回應,而且您應該繼續輪詢,直到收到 NotModified為止。 在上述範例中, NotModified 透過等待五秒鐘再重新檢查變更來處理。
取用分割區索引鍵的變更
在某些情況下,您可能只想要處理特定分割區索引鍵的變更。 您可以取得特定分割區索引鍵的迭代器,並使用與您對整個容器所使用的相同方式處理變更。
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning("partitionKeyValue")
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
使用 FeedRange 進行平行處理
在變更摘要提取模型中,您可以使用 FeedRange 來平行處理變更摘要的處理。
FeedRange 代表分割區索引鍵值的範圍。
以下範例顯示如何取得容器的範圍清單:
const ranges = await container.getFeedRanges();
當您取得容器的 FeedRange 值清單時,每個FeedRange (部分機器翻譯) 都會得到一個 。
透過使用 FeedRange,您可以創建一個迭代器,以便在多台機器或多個執行緒上並行處理變更饋送。 與先前示範如何取得整個容器或單一資料分割索引鍵的變更摘要列舉程式不同,您可以使用FeedRanges 取得多個列舉程式,以平行方式處理變更摘要。
以下範例顯示如何使用兩個會平行讀取的假設個別機器,從容器的變更摘要開頭進行讀取:
電腦 1:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[0])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
電腦 2:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[1])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
儲存接續權杖
您可以藉由取得接續令牌來儲存迭代器的位置。 接續權杖是一個字串值,可追蹤變更摘要反列舉程式上次處理的變更,並允許列舉程式稍後從該點繼續執行。 接續權杖 (若有指定) 的優先順序高過開始時間值和從頭開始值。 下列程式碼會讀取建立容器之後的變更摘要。 在沒有其他的變更可用之後,其會保存接續權杖,讓變更摘要耗用量可以在稍後繼續。
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
let continuation = "";
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
continuation = response.continuationToken;
break;
}
else {
console.log("Result found", response.result);
}
}
// For checking any new changes using the continuation token
const continuationOptions = {
changeFeedStartFrom: ChangeFeedStartFrom(continuation)
}
const newIterator = container.items.getChangeFeedIterator(continuationOptions);
只要 Azure Cosmos DB 容器仍然存在,接續令牌就永遠不會過期。
使用 AsyncIterator
您可以使用 JavaScript AsyncIterator 來擷取變更摘要。 以下是 的 AsyncIterator範例。
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
let timeout = 0;
for await(const result of container.items.getChangeFeedIterator(options).getAsyncIterator()) {
if (result.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", result.result);
timeout = 0;
}
await waitFor(timeout);
}