若要使用提取模型來處理變更摘要,請建立 FeedIterator
執行個體。 一開始建立 FeedIterator
時,您必須指定必要的 ChangeFeedStartFrom
值,其中包含讀取變更的起始位置以及您想要用於 FeedRange
的值。 FeedRange
是分割區索引鍵值的範圍,並使用特定 FeedIterator
來指定可從變更摘要中讀取的項目。 您也必須指定您想要處理變更的模式的必要 ChangeFeedMode
值:最新版本或所有版本和刪除。 使用 ChangeFeedMode.LatestVersion
或 ChangeFeedMode.AllVersionsAndDeletes
來指出您想要用來讀取變更摘要的模式。 當您使用所有版本和刪除模式時,必須從 Now()
或特定接續權杖的值開始選取變更摘要。
您可以選擇性指定 ChangeFeedRequestOptions
來設定 PageSizeHint
。 完成設定後,此屬性會設定每個頁面可收到的項目數上限。 如果受監視集合中的作業是透過預存程序所執行,則從變更摘要中讀取項目時會保留交易範圍。 因此,所接收的項目數可能會高於指定的值,這樣同一次交易所變更的項目就會在一個不可部分完成的批次中傳回。
以下範例顯示如何以最新版本模式取得可傳回實體物件的 FeedIterator
,在此案例中為 User
物件:
FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
提示
在 3.34.0
版之前,設定 ChangeFeedMode.Incremental
即可使用最新版本模式。 Incremental
和 LatestVersion
都參照變更摘要的最新版本模式,而且使用任一模式的應用程式都會看到相同的行為。
所有版本和刪除模式處於預覽狀態,而且可以與預覽 .NET SDK 版本 >= 3.32.0-preview
搭配使用。 以下範例顯示如何在所有版本和刪除模式中取得可傳回 User
物件的 FeedIterator
:
FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);
透過串流來使用變更摘要
這兩種變更摘要模式的FeedIterator
都有兩個選項。 除了可傳回實體物件的範例之外,您也可以使用 Stream
支援來取得回應。 串流可讓您讀取資料,而不需要先將其還原序列化,因此您必須儲存至用戶端資源。
以下範例顯示如何以最新版本模式取得可傳回 Stream
的 FeedIterator
:
FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
使用整個容器的變更
如果您未將 FeedRange
參數提供給 FeedIterator
,則可以依照自己的步調來處理整個容器的變更摘要。 以下範例使用最新版本模式來開始讀取目前時間起的所有變更:
FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);
while (iteratorForTheEntireContainer.HasMoreResults)
{
FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
因為變更摘要實際上是包含所有未來寫入和更新的無止盡項目清單,所以 HasMoreResults
的值一律為 true
。 嘗試讀取變更摘要而且沒有可用的新變更時,您會收到含有 NotModified
狀態的回應。 在先前的範例中,處理方式是先等候 5 秒,再重新檢查變更。
使用分割區索引鍵的變更
在某些情況下,您可能只想要處理特定分割區索引鍵的變更。 您可以取得特定分割區索引鍵的 FeedIterator
,並使用與您對整個容器相同的方式來處理變更。
FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));
while (iteratorForThePartitionKey.HasMoreResults)
{
FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
使用 FeedRange 進行平行處理
在變更摘要處理器中,工作會自動分散至多個取用者。 在變更摘要提取模型中,您可以使用 FeedRange
來平行處理變更摘要的處理。 FeedRange
代表分割區索引鍵值的範圍。
以下範例顯示如何取得容器的範圍清單:
IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();
當您取得容器的 FeedRange
值清單時,每個實體分割區都會取得一個 FeedRange
。
使用 FeedRange
,您可以建立 FeedIterator
,以將變更摘要的處理平行處理至多部機器或執行緒。 不同於先前範例示範如何取得整個容器或單一分割區索引鍵的 FeedIterator
,您可以使用 FeedRange 來取得可平行處理變更摘要的多個 FeedIterator。
在您想要使用 FeedRanges 的案例中,您需要有可取得 FeedRanges 的協調器處理,並將其散發到這些電腦。 此散發可能是:
- 使用
FeedRange.ToJsonString
並散發此字串值。 取用者可以搭配使用此值與 FeedRange.FromJsonString
。
- 如果是內含式散發,則傳遞
FeedRange
物件參考。
以下範例顯示如何使用兩個平行讀取的假設個別機器,以從容器的變更摘要開頭進行讀取:
電腦 1:
FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
電腦 2:
FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
FeedResponse<User> response = await iteratorA.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
await Task.Delay(TimeSpan.FromSeconds(5));
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
儲存接續權杖
您可以取得接續權杖來儲存 FeedIterator
的位置。 接續權杖是一個字串值,可追蹤 FeedIterator 上次處理的變更,並允許 FeedIterator
之後從這個時間點繼續執行。 接續權杖 (若指定) 優先於開始時間和從頭開始值。 下列程式碼會讀取建立容器之後的變更摘要。 在沒有其他的變更可用之後,其會保存接續權杖,讓變更摘要耗用量可以在稍後繼續。
FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);
string continuation = null;
while (iterator.HasMoreResults)
{
FeedResponse<User> response = await iterator.ReadNextAsync();
if (response.StatusCode == HttpStatusCode.NotModified)
{
Console.WriteLine($"No new changes");
continuation = response.ContinuationToken;
// Stop the consumption since there are no new changes
break;
}
else
{
foreach (User user in response)
{
Console.WriteLine($"Detected change for user with id {user.id}");
}
}
}
// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);
當您要使用最新版本模式時,只要 Azure Cosmos DB 容器仍然存在,FeedIterator
接續權杖就永遠不會過期。 當您要使用所有版本和刪除模式時,只要連續備份保留時間範圍內發生變更,FeedIterator
接續權杖就會有效。
若要使用提取模型來處理變更摘要,請建立 Iterator<FeedResponse<JsonNode>> responseIterator
執行個體。 建立 CosmosChangeFeedRequestOptions
時,您必須指定要從何處開始讀取變更摘要,並傳遞您想要使用的 FeedRange
參數。 FeedRange
是分割區索引鍵值的範圍,而此範圍指定可從變更摘要中讀取的項目。
如果您想要以所有版本和刪除模式讀取變更摘要,則也必須在建立 CosmosChangeFeedRequestOptions
時指定 allVersionsAndDeletes()
。 所有版本和刪除模式不支援處理開頭或某個時間點的變更摘要。 您必須從現在或從接續權杖來處理變更。 所有版本和刪除模式處於預覽狀態,而且可以在 Java SDK 版本 >= 4.42.0
中使用。
使用整個容器的變更
如果您指定 FeedRange.forFullRange()
,則可以依自己的步調來處理整個容器的變更摘要。 您可以在 byPage()
中選擇性指定值。 完成設定後,此屬性會設定每個頁面可收到的項目數上限。
以下範例顯示如何以最新版本模式取得 responseIterator
值:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forFullRange());
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
以下範例顯示如何以所有版本和刪除模式取得 responseIterator
:
CosmosChangeFeedRequestOptions options = CosmosChangeFeedRequestOptions
.createForProcessingFromNow(FeedRange.forFullRange())
.allVersionsAndDeletes();
Iterator<FeedResponse<JsonNode>> responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
我們接著可以逐一查看結果。 因為變更摘要實際上是包含所有未來寫入和更新的無止盡項目清單,所以 responseIterator.hasNext()
的值一律為 true
。 以下最新版本模式範例會從頭開始讀取所有變更。 每次反覆運算都會在處理所有事件之後持續保存接續權杖。 其會從變更摘要中的最後一個已處理點進行挑選,並使用 createForProcessingFromContinuation
進行處理:
int i = 0;
List<JsonNode> results;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s)");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
i++;
if (i >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
使用分割區索引鍵的變更
在某些情況下,您可能只想要處理特定分割區索引鍵的變更。 您可以使用用於整個容器的相同方式來處理特定分割區索引鍵的變更。 以下是使用最新版本模式的範例:
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(FeedRange.forLogicalPartition(new PartitionKey(partitionKey)));
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int pkIndex = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
pkIndex++;
if (pkIndex >= 5) {
// artificially breaking out of loop
System.out.println("breaking....");
break;
}
}
使用 FeedRange 進行平行處理
在變更摘要處理器中,工作會自動分散至多個取用者。 在變更摘要提取模型中,您可以使用 FeedRange
來平行處理變更摘要的處理。 FeedRange
代表分割區索引鍵值的範圍。
以下是使用最新版本模式的範例,顯示如何取得容器的範圍清單:
Mono<List<FeedRange>> feedranges = resources.container.getFeedRanges();
List<FeedRange> feedRangeList = feedranges.block();
當您取得容器的 FeedRanges 清單時,每個實體分割區都會取得一個 FeedRange
。
使用 FeedRange
,您可以將變更摘要的處理平行處理至多部電腦或執行緒。 與先前範例顯示如何處理整個容器或單一分割區索引鍵的變更不同,您可以使用 FeedRange 來平行處理變更摘要。
在您想要使用 FeedRanges 的案例中,您需要有可取得 FeedRanges 的協調器處理,並將其散發到這些電腦。 此散發可能是:
- 使用
FeedRange.toString()
並散發此字串值。
- 如果是內含式散發,則傳遞
FeedRange
物件參考。
以下是使用最新版本模式的範例。 其顯示如何使用兩個平行讀取的假設個別機器,以從容器的變更摘要開頭進行讀取:
電腦 1:
FeedRange range1 = feedRangeList.get(0);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range1);
int machine1index = 0;
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine1index++;
if (machine1index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
電腦 2:
FeedRange range2 = feedRangeList.get(1);
options = CosmosChangeFeedRequestOptions
.createForProcessingFromBeginning(range2);
responseIterator = container
.queryChangeFeed(options, JsonNode.class)
.byPage()
.toIterable()
.iterator();
int machine2index = 0;
while (responseIterator.hasNext()) {
FeedResponse<JsonNode> response = responseIterator.next();
results = response.getResults();
logger.info("Got " + results.size() + " items(s) retrieved");
// applying the continuation token
// only after processing all events
options = CosmosChangeFeedRequestOptions
.createForProcessingFromContinuation(response.getContinuationToken());
machine2index++;
if (machine2index >= 5) {
// artificially breaking out of loop - not required in a real app
System.out.println("breaking....");
break;
}
}
若要使用提取模型來處理變更摘要,請建立 ChangeFeedPullModelIterator
執行個體。 一開始建立 ChangeFeedPullModelIterator
時,必須在 ChangeFeedIteratorOptions
內指定必要的 changeFeedStartFrom
值,其包含讀取變更的起始位置以及要擷取其變更的資源 (分割區索引鍵或 FeedRange)。
注意
如果未指定 changeFeedStartFrom
值,則會從 Now() 擷取整個容器的變更摘要。
目前,JS SDK 僅支援最新版本,而且預設會予以選取。
您可以在 ChangeFeedIteratorOptions
中選擇性使用 maxItemCount
,以設定每個頁面收到的項目數上限。
以下範例顯示如何以最新版本模式取得可傳回實體物件的迭代器:
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Now()
};
const iterator = container.items.getChangeFeedIterator(options);
使用整個容器的變更
如果您未提供 ChangeFeedStartFrom
內的 FeedRange
或 PartitionKey
參數,則可以依照自己的步調來處理整個容器的變更摘要。 以下範例會開始讀取目前時間起的所有變更:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
因為變更摘要實際上是包含所有未來寫入和更新的無止盡項目清單,所以 hasMoreResults
的值一律為 true
。 嘗試讀取變更摘要而且沒有可用的新變更時,您會收到含有 NotModified
狀態的回應。 在先前的範例中,處理方式是先等候 5 秒,再重新檢查變更。
使用分割區索引鍵的變更
在某些情況下,您可能只想要處理特定分割區索引鍵的變更。 您可以取得特定分割區索引鍵的迭代器,並使用與您對整個容器相同的方式來處理變更。
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning("partitionKeyValue")
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
使用 FeedRange 進行平行處理
在變更摘要提取模型中,您可以使用 FeedRange
來平行處理變更摘要的處理。 FeedRange
代表分割區索引鍵值的範圍。
以下範例顯示如何取得容器的範圍清單:
const ranges = await container.getFeedRanges();
當您取得容器的 FeedRange
值清單時,每個實體分割區都會取得一個 FeedRange
。
使用 FeedRange
,您可以建立迭代器,以將變更摘要的處理平行處理至多部機器或執行緒。 與先前範例顯示如何取得整個容器或單一分割區索引鍵的迭代器不同,您可以使用 FeedRanges 來取得可平行處理變更摘要的多個迭代器。
以下範例顯示如何使用兩個平行讀取的假設個別機器,以從容器的變更摘要開頭進行讀取:
電腦 1:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[0])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
電腦 2:
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning(ranges[1])
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", response.result);
timeout = 0;
}
await waitFor(timeout);
}
儲存接續權杖
您可以取得接續權杖來儲存迭代器的位置。 接續權杖是一個字串值,可追蹤變更摘要迭代器上次處理的變更,並允許迭代器之後從這個時間點繼續執行。 接續權杖 (若指定) 優先於開始時間和從頭開始值。 下列程式碼會讀取建立容器之後的變更摘要。 在沒有其他的變更可用之後,其會保存接續權杖,讓變更摘要耗用量可以在稍後繼續。
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
const iterator = container.items.getChangeFeedIterator(options);
let timeout = 0;
let continuation = "";
while(iterator.hasMoreResults) {
const response = await iterator.readNext();
if (response.statusCode === StatusCodes.NotModified) {
continuation = response.continuationToken;
break;
}
else {
console.log("Result found", response.result);
}
}
// For checking any new changes using the continuation token
const continuationOptions = {
changeFeedStartFrom: ChangeFeedStartFrom(continuation)
}
const newIterator = container.items.getChangeFeedIterator(continuationOptions);
只要 Azure Cosmos DB 容器仍然存在,接續權杖就永遠不會過期。
使用 AsyncIterator
您可以使用 JavaScript Async Iterator 來擷取變更摘要。 以下是使用 Async Iterator 的範例。
async function waitFor(milliseconds: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, milliseconds));
}
const options = {
changeFeedStartFrom: ChangeFeedStartFrom.Beginning()
};
let timeout = 0;
for await(const result of container.items.getChangeFeedIterator(options).getAsyncIterator()) {
if (result.statusCode === StatusCodes.NotModified) {
timeout = 5000;
}
else {
console.log("Result found", result.result);
timeout = 0;
}
await waitFor(timeout);
}