共用方式為


Azure Cosmos DB 中的變更摘要提取模型

您可以使用變更摘要提取模型,依自己的步調取用 Azure Cosmos DB 變更摘要。 與變更摘要處理器類似,您可以使用變更摘要提取模型來平行處理多個變更摘要取用者之間的變更。

與變更摘要處理器進行比較

許多情節都可以使用變更摘要處理器或變更摘要提取模型來處理變更摘要。 提取模型的接續權杖和變更摘要處理器的租用容器,都是變更摘要中最後已處理項目或項目批次的書籤。

不過,您無法將接續權杖轉換成租用,反之亦然。

備註

大部分情況下,需要從變更摘要中進行讀取時,最簡單的選項是使用變更摘要處理器

在這些情況下,您應該考慮使用提取模型:

  • 從特定分割區鍵讀取變更
  • 控制客戶端接收變更的速度以便處理
  • 若要從變更資料流中一次性讀取現有資料(例如,進行資料遷移)

以下是變更摘要處理器與變更摘要提取模型之間的一些主要差異:

特徵 / 功能 變更摘要處理器 變更資料流擷取模型
在處理變更摘要時追蹤目前的時間點 租用 (儲存在 Azure Cosmos DB 容器中) 接續權杖 (儲存在記憶體中或以手動方式保存)
重新執行過去變更的能力 是,使用推送模型 是,使用提取模型
輪詢未來的變更 根據使用者指定的 WithPollInterval 值來自動檢查變更 說明書
沒有新變更時的行為 自動等候 WithPollInterval 的值,然後重新檢查 必須檢查狀態並手動重新檢查
處理整個容器的變更 是,並且會自動在從相同容器取用的多個執行緒和機器中平行處理 是,藉由使用 FeedRange 進行手動平行化。
只處理單一分割區索引鍵的變更 不支援 Yes

備註

當您使用提取模型時,與使用變更摘要處理器進行讀取時不同,您必須明確處理沒有任何新變更的情況。 這由 HTTP 304 NotModified表示。 傳回 0 個具有 HTTP 200 OK 狀態碼的文件的變更摘要回應,不一定表示您已到達變更摘要的結尾,您應繼續進行輪詢。

使用提取模型

若要使用提取模型來處理變更摘要,請建立 FeedIterator 的執行個體。 一開始建立 FeedIterator 時,您必須指定必要的 ChangeFeedStartFrom 值,其中包含讀取變更的起始位置以及您想要用於 FeedRange 的值。 FeedRange 是分割區索引鍵值的範圍,並會使用特定的 FeedIterator 指定可從變更摘要讀取的項目。 您也必須指定您想要處理變更的模式的必要 ChangeFeedMode 值:最新版本所有版本和刪除。 使用 ChangeFeedMode.LatestVersionChangeFeedMode.AllVersionsAndDeletes 來指出您想要用來讀取變更摘要的模式。 當您使用所有版本和刪除模式時,必須從 Now() 的值或從特定接續權杖開始選取變更摘要。

您可以選擇性指定 ChangeFeedRequestOptions 來設定 PageSizeHint。 完成設定後,此屬性會設定每個頁面可收到的項目數上限。 如果受監視集合中的作業是透過預存程序所執行,則從變更摘要中讀取項目時會保留交易範圍。 因此,所接收的項目數可能會高於指定的值,這樣同一次交易所變更的項目就會在一個不可部分完成的批次中傳回。

以下範例顯示如何以最新版本模式取得可傳回實體物件的 FeedIterator,在此案例中為 User 物件:

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

小提示

對於早於 3.34.0的版本,您可以藉由設定 ChangeFeedMode.Incremental來使用最新版本模式。 IncrementalLatestVersion都指的是變更摘要的最新版本模式,使用任一模式的應用程式都會看到相同的行為。

所有版本和刪除模式處於預覽狀態,而且可以與預覽 .NET SDK 版本 >= 3.32.0-preview 搭配使用。 以下範例顯示如何在所有版本和刪除模式中取得可傳回 FeedIterator 物件的 User

FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

備註

在最新版本模式中,您會收到代表已變更項目的物件,其中有些額外中繼資料。 所有版本和刪除模式會傳回不同的資料模型。

您可以取得最新版本模式 (英文) 或所有版本和刪除模式 (英文) 的完整範例。

透過串流取用變更摘要

這兩種變更摘要模式的 FeedIterator 都有兩個選項。 除了可傳回實體物件的範例之外,您也可以使用 Stream 支援來取得回應。 串流可讓您讀取資料,而不需要先將其還原序列化,因此可以節省用戶端資源。

以下範例顯示如何以最新版本模式取得可傳回 FeedIteratorStream

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

取用整個容器的變更

如果您不將 FeedRange 參數提供給 FeedIterator,則可以依照自己的步調來處理整個容器的變更饋送。 以下範例使用最新版本模式來開始讀取目前時間起的所有變更:

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

因為變更摘要實際上是包含所有未來寫入和更新的無止盡項目清單,所以 HasMoreResults 的值一律為 true。 嘗試讀取變更摘要而且沒有可用的新變更時,您會收到含有 NotModified 狀態的回應。 這與收到不含變更且具有 OK 狀態的回應不同。 當有更多變更可用時,可能會取得空白的變更摘要回應,而且您應該繼續輪詢,直到收到 NotModified為止。 在上述範例中, NotModified 透過等待五秒鐘再重新檢查變更來處理。

取用分割區索引鍵的變更

在某些情況下,您可能只想要處理特定分割區索引鍵的變更。 您可以取得特定分割區索引鍵的 FeedIterator,並使用與您對整個容器所使用的相同方式處理變更。

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

使用 FeedRange 進行平行處理

變更摘要處理器 (部分機器翻譯) 中,工作會自動分散至多個取用者。 在變更摘要提取模型中,您可以使用 FeedRange 來平行處理變更摘要的處理。 FeedRange 代表分割區索引鍵值的範圍。

以下範例顯示如何取得容器的範圍清單:

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

當您取得容器的 FeedRange 值清單時,每個FeedRange (部分機器翻譯) 都會得到一個

使用 FeedRange,您可以建立 FeedIterator,以將變更回饋的處理平行化至多部機器或執行緒。 不同於先前範例 (顯示如何取得整個容器或單一分割區索引鍵的 FeedIterator),您可以使用 FeedRange 來取得多個可平行處理變更摘要的 FeedIterator。

在您想要使用 FeedRange 的案例中,您需要有可取得 FeedRange 並將其散發到這些機器的協調器流程。 此散發可能會:

  • 使用 FeedRange.ToJsonString 並散發此字串值。 消費者可以將此值與 FeedRange.FromJsonString 搭配使用。
  • 如果分發正在處理中,則傳遞 FeedRange 物件參考。

以下範例顯示如何使用兩個會平行讀取的假設個別機器,從容器的變更摘要開頭進行讀取:

電腦 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

電腦 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

儲存接續權杖

您可以藉由取得接續權杖來儲存 FeedIterator 的位置。 延續標記是一個字串值,用來追蹤 FeedIterator 最後處理過的變更,並允許 FeedIterator 在此時間點之後繼續執行。 接續權杖 (若有指定) 的優先順序高過開始時間值和從頭開始值。 下列程式碼會讀取建立容器之後的變更摘要。 在沒有其他的變更可用之後,其會保存接續權杖,讓變更摘要耗用量可以在稍後繼續。

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

當您要使用最新版本模式時,只要 Azure Cosmos DB 容器仍然存在,FeedIterator 接續權杖就永遠不會過期。 當您使用所有版本和刪除模式時,只要在連續備份的保留時間範圍內有發生變更,FeedIterator 接續權杖就會有效。

後續步驟