Azure Cosmos DB 中的變更摘要提取模型

適用於:NoSQL

您可以使用變更摘要提取模型,依自己的步調來使用 Azure Cosmos DB 變更摘要。 與變更摘要處理器類似,您可以使用變更摘要提取模型來平行處理多個變更摘要取用者之間的變更。

與變更摘要處理器進行比較

許多情節都可以使用變更摘要處理器或變更摘要提取模型來處理變更摘要。 提取模型的接續權杖和變更摘要處理器的租用容器,都是變更摘要中最後已處理項目或項目批次的書籤。

不過,您無法將接續權杖轉換成租用,反之亦然。

注意

大部分情況下,需要從變更摘要中進行讀取時,最簡單的選項是使用變更摘要處理器

在這些情況下,您應該考慮使用提取模型:

  • 讀取特定分割區索引鍵的變更。
  • 控制用戶端接收變更以進行處理的步調。
  • 在變更摘要中對現有資料執行一次性讀取 (例如,進行資料移轉)。

以下是變更摘要處理器與變更摘要提取模型之間的一些主要差異:

功能 變更摘要處理器 變更摘要提取模型
追蹤處理變更摘要的目前時間點 租用 (儲存在 Azure Cosmos DB 容器中) 接續權杖 (儲存在記憶體中或以手動方式保存)
重新執行過去變更的能力 是,使用推送模型 是,使用提取模型
輪詢未來的變更 根據使用者指定的 WithPollInterval 值來自動檢查變更 手動
沒有新變更時的行為 自動等候 WithPollInterval 的值,然後重新檢查 必須檢查狀態並手動重新檢查
處理整個容器的變更 是,自動平行處理來自相同容器的多個執行緒和機器 是,使用 FeedRange 手動平行處理
只處理單一分割區索引鍵的變更 不支援 Yes

注意

當您使用提取模型時,與使用變更摘要處理器進行讀取時不同,您必須明確處理沒有任何新變更的情況。

使用提取模型

若要使用提取模型來處理變更摘要,請建立 FeedIterator 執行個體。 一開始建立 FeedIterator 時,您必須指定必要的 ChangeFeedStartFrom 值,其中包含讀取變更的起始位置以及您想要用於 FeedRange 的值。 FeedRange 是分割區索引鍵值的範圍,並使用特定 FeedIterator 來指定可從變更摘要中讀取的項目。 您也必須指定您想要處理變更的模式的必要 ChangeFeedMode 值:最新版本所有版本和刪除。 使用 ChangeFeedMode.LatestVersionChangeFeedMode.AllVersionsAndDeletes 來指出您想要用來讀取變更摘要的模式。 當您使用所有版本和刪除模式時,必須從 Now() 或特定接續權杖的值開始選取變更摘要。

您可以選擇性指定 ChangeFeedRequestOptions 來設定 PageSizeHint。 完成設定後,此屬性會設定每個頁面可收到的項目數上限。 如果受監視集合中的作業是透過預存程序所執行,則從變更摘要中讀取項目時會保留交易範圍。 因此,所接收的項目數可能會高於指定的值,這樣同一次交易所變更的項目就會在一個不可部分完成的批次中傳回。

以下範例顯示如何以最新版本模式取得可傳回實體物件的 FeedIterator,在此案例中為 User 物件:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

提示

3.34.0 版之前,設定 ChangeFeedMode.Incremental 即可使用最新版本模式。 IncrementalLatestVersion 都參照變更摘要的最新版本模式,而且使用任一模式的應用程式都會看到相同的行為。

所有版本和刪除模式處於預覽狀態,而且可以與預覽 .NET SDK 版本 >= 3.32.0-preview 搭配使用。 以下範例顯示如何以所有版本和刪除模式取得可傳回動態物件的 FeedIterator

FeedIterator<dynamic> InteratorWithDynamic = container.GetChangeFeedIterator<dynamic>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

注意

在最新版本模式中,您會收到代表已變更項目的物件,其中有些額外中繼資料。 所有版本和刪除模式會傳回不同的資料模型。 如需詳細資訊,請參閱剖析回應物件

透過串流來使用變更摘要

這兩種變更摘要模式的FeedIterator 都有兩個選項。 除了可傳回實體物件的範例之外,您也可以使用 Stream 支援來取得回應。 串流可讓您讀取資料,而不需要先將其還原序列化,因此您必須儲存至用戶端資源。

以下範例顯示如何以最新版本模式取得可傳回 StreamFeedIterator

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

使用整個容器的變更

如果您未將 FeedRange 參數提供給 FeedIterator,則可以依照自己的步調來處理整個容器的變更摘要。 以下範例使用最新版本模式來開始讀取目前時間起的所有變更:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

因為變更摘要實際上是包含所有未來寫入和更新的無止盡項目清單,所以 HasMoreResults 的值一律為 true。 嘗試讀取變更摘要而且沒有可用的新變更時,您會收到含有 NotModified 狀態的回應。 在先前的範例中,處理方式是先等候 5 秒,再重新檢查變更。

使用分割區索引鍵的變更

在某些情況下,您可能只想要處理特定分割區索引鍵的變更。 您可以取得特定分割區索引鍵的 FeedIterator,並使用與您對整個容器相同的方式來處理變更。

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

使用 FeedRange 進行平行處理

變更摘要處理器中,工作會自動分散至多個取用者。 在變更摘要提取模型中,您可以使用 FeedRange 來平行處理變更摘要的處理。 FeedRange 代表分割區索引鍵值的範圍。

以下範例顯示如何取得容器的範圍清單:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

當您取得容器的 FeedRange 值清單時,每個實體分割區都會取得一個 FeedRange

使用 FeedRange,您可以建立 FeedIterator,以將變更摘要的處理平行處理至多部機器或執行緒。 不同於先前範例示範如何取得整個容器或單一分割區索引鍵的 FeedIterator,您可以使用 FeedRange 來取得可平行處理變更摘要的多個 FeedIterator。

在您想要使用 FeedRanges 的案例中,您需要有可取得 FeedRanges 的協調器處理,並將其散發到這些電腦。 此散發可能是:

  • 使用 FeedRange.ToJsonString 並散發此字串值。 取用者可以搭配使用此值與 FeedRange.FromJsonString
  • 如果是內含式散發,則傳遞 FeedRange 物件參考。

以下範例顯示如何使用兩個平行讀取的假設個別機器,以從容器的變更摘要開頭進行讀取:

電腦 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

電腦 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

儲存接續權杖

您可以取得接續權杖來儲存 FeedIterator 的位置。 接續權杖是一個字串值,可追蹤 FeedIterator 上次處理的變更,並允許 FeedIterator 之後從這個時間點繼續執行。 接續權杖 (若指定) 優先於開始時間和從頭開始值。 下列程式碼會讀取建立容器之後的變更摘要。 在沒有其他的變更可用之後,其會保存接續權杖,讓變更摘要耗用量可以在稍後繼續。

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

當您要使用最新版本模式時,只要 Azure Cosmos DB 容器仍然存在,FeedIterator 接續權杖就永遠不會過期。 當您要使用所有版本和刪除模式時,只要連續備份保留時間範圍內發生變更,FeedIterator 接續權杖就會有效。

下一步