次の方法で共有


Azure Cosmos DB の変更フィード プル モデル

適用対象: NoSQL

変更フィード プル モデルを使用すると、Azure Cosmos DB の変更フィードを自分のペースで使用できます。 変更フィード プロセッサと同様に、変更フィード プル モデルを使用して、複数の変更フィード コンシューマーの変更処理を並列化することができます。

変更フィード プロセッサとの比較

変更フィード プロセッサと変更フィード プル モデルのどちらかを使用することにより、多くのシナリオで変更フィードを処理できます。 プル モデルの継続トークンと変更フィード プロセッサのリース コンテナーは、どちらも変更フィードで最後に処理された項目または項目のバッチのブックマークとして機能します。

しかし、継続トークンをリース (またはその逆) に変換することはできません。

Note

変更フィードから読み取りを行う必要があるときは、ほとんどの場合、変更フィード プロセッサを使用することが最も簡単な方法です。

次のシナリオでは、プル モデルの使用を検討することをお勧めします。

  • 特定のパーティション キーから変更を読み取る
  • クライアントが処理のために変更を受け取るペースを制御する
  • 変更フィードの既存のデータを 1 回だけ読み取る (データ移行を行う場合など)

変更フィード プロセッサと変更フィード プル モデルとの主な違いを次に示します。

特徴量 変更フィード プロセッサ 変更フィードのプル モデル
変更フィード処理の現在位置の追跡 リース (Azure Cosmos DB コンテナーに保存) 継続トークン (メモリに保存または手動で保持)
過去の変更の再生機能 あり。プッシュ モデルを使用 あり。プル モデルを使用
将来の変更のポーリング ユーザー指定の WithPollInterval 値に基づき変更を自動チェック マニュアル
新しい変更がない場合の動作 WithPollInterval の値を自動的に待機して再確認する 状態の確認と手動での再確認が必要
コンテナー全体からの変更処理 可能。同一のコンテナーから使用している複数のスレッドとマシンの処理を自動的に並列化 可能。FeedRange を使用して手動で並列化
単一のパーティション キーのみからの変更処理 サポートされていません はい

Note

プル モデルを使用する場合は、変更フィード プロセッサを使用して読み取る場合と異なり、新しい変更がないというケースを明示的に処理する必要があります。

プル モデルの操作

プル モデルを使用して変更フィードを処理するために、FeedIterator のインスタンスを作成します。 最初に FeedIterator を作成するときには、必要な ChangeFeedStartFrom 値を指定する必要があります。この値は、変更の読み取りの開始位置と FeedRange に使用する値の両方で構成されます。 FeedRange はパーティション キー値の範囲であり、該当する特定の FeedIterator を使用して変更フィードから読み取れる項目を指定します。 また、変更を処理するモード (最新バージョンまたはすべてのバージョンと削除) に必要な ChangeFeedMode 値を指定する必要があります。 ChangeFeedMode.LatestVersion または ChangeFeedMode.AllVersionsAndDeletes を使用して、変更フィードの読み取りに使用するモードを指定します。 すべてのバージョンと削除モードを使用する場合は、Now() または特定の継続トークンの値から変更フィードの開始を選択する必要があります。

必要に応じて ChangeFeedRequestOptions を指定し、PageSizeHint を設定できます。 設定すると、ページあたりの受信アイテムの最大数がこのプロパティによって設定されます。 監視対象のコレクションでストアド プロシージャによって操作が実行されると、変更フィードから項目が読み取られるとき、トランザクション スコープが保持されます。 その結果、受信した項目数が指定した値よりも多くなり、同じトランザクションで変更された項目が 1 つのアトミック バッチの一部として返される可能性があります。

次に、エンティティ オブジェクト (この例では User オブジェクト) を返す FeedIterator を最新バージョンモードで取得する方法の例を示します。

FeedIterator<User> InteratorWithPOCOS = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

ヒント

バージョン 3.34.0 よりも前では、ChangeFeedMode.Incremental を設定して最新バージョン モード使用できます。 IncrementalLatestVersion は両方とも、変更フィードの最新バージョン モードを参照します。また、どちらのモードを使用するアプリケーションも動作は同じになります。

すべてのバージョンと削除モードはプレビュー段階であり、プレビュー .NET SDK バージョン >= 3.32.0-preview として使用できます。 すべてのバージョンと削除モードで FeedIterator を取得し、User オブジェクトを返す例を次に示します。

FeedIterator<ChangeFeedItem<User>> InteratorWithPOCOS = container.GetChangeFeedIterator<ChangeFeedItem<User>>(ChangeFeedStartFrom.Now(), ChangeFeedMode.AllVersionsAndDeletes);

Note

最新バージョン モードでは、追加のメタデータで変更された項目を表すオブジェクトを受け取ります。 すべてのバージョンと削除モードでは、別のデータ モデルが返されます。 詳細については、「応答オブジェクトを解析する」を参照してください。

最新バージョン モードまたはすべてのバージョンと削除モードの完全なサンプルを入手できます。

ストリーム経由で変更フィードを使用する

両方の変更フィード モードの FeedIterator には、2 つのオプションがあります。 エンティティ オブジェクトを返す例に加えて、Stream のサポートを使用して応答を取得することもできます。 ストリームを使用すると、最初にデータを逆シリアル化しなくてもデータを読み取れるため、クライアント リソースを節約できます。

Stream を返す FeedIterator を最新バージョンモードで取得する方法の例を次に示します。

FeedIterator iteratorWithStreams = container.GetChangeFeedStreamIterator(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

コンテナー全体の変更を使用する

FeedIterator に対する FeedRange パラメーターを指定しない場合は、コンテナー全体の変更フィードを自分のペースで処理できます。 最新バージョンモードを使ってすべての変更の読み取りが現在の時刻から開始される例を次に示します。

FeedIterator<User> iteratorForTheEntireContainer = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Now(), ChangeFeedMode.LatestVersion);

while (iteratorForTheEntireContainer.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForTheEntireContainer.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else 
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

変更フィードは事実上、今後のすべての書き込みと更新を含む無限の項目の一覧であるため、HasMoreResults の値は常に true です。 変更フィードを読み取ろうとして、新しい変更がない場合は、NotModified ステータスの応答が返されます。 上の例では、これは、変更を再確認する前に 5 秒間待機することによって処理されます。

パーティション キーの変更を使用する

特定のパーティション キーの変更のみを処理することが必要になる場合があります。 特定のパーティション キーの FeedIterator を取得し、その変更をコンテナー全体の場合と同じ方法で処理することができます。

FeedIterator<User> iteratorForPartitionKey = container.GetChangeFeedIterator<User>(
    ChangeFeedStartFrom.Beginning(FeedRange.FromPartitionKey(new PartitionKey("PartitionKeyValue")), ChangeFeedMode.LatestVersion));

while (iteratorForThePartitionKey.HasMoreResults)
{
    FeedResponse<User> response = await iteratorForThePartitionKey.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

並列処理のために FeedRange を使用する

変更フィード プロセッサでは、作業は複数のコンシューマーに自動的に分散されます。 変更フィード プル モデルでは、FeedRange を使用して、変更フィードの処理を並列化できます。 FeedRange は、パーティション キー値の範囲を表します。

コンテナーの範囲の一覧を取得する方法の例を次に示します。

IReadOnlyList<FeedRange> ranges = await container.GetFeedRangesAsync();

コンテナーの FeedRange の値の一覧を取得すると、物理パーティションごとに 1 つの FeedRange が得られます。

FeedRange を使用することで、FeedIterator を作成して、複数のマシンやスレッドの変更フィードの処理を並列化できます。 前の例では、コンテナー全体または単一のパーティション キーに対応する 1 つの FeedIterator を取得する方法を示しましたが、FeedRanges を使用して、変更フィードを並列処理できる複数の FeedIterator を取得することも可能です。

FeedRange を使用する場合は、FeedRange を取得して対象のマシンに配布するオーケストレーター プロセスが必要になります。 この配布は次のように行います。

  • FeedRange.ToJsonString を使用して、この文字列値を配布する。 コンシューマーは、この値を FeedRange.FromJsonString で使用できます。
  • 配布がインプロセスの場合は、FeedRange オブジェクト参照を渡す。

並列で読み取りを行う 2 台の別個の仮定のマシンを使用して、コンテナーの変更フィードを最初から読み取る方法を示すサンプルを次に示します。

マシン 1:

FeedIterator<User> iteratorA = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[0]), ChangeFeedMode.LatestVersion);
while (iteratorA.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

マシン 2:

FeedIterator<User> iteratorB = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(ranges[1]), ChangeFeedMode.LatestVersion);
while (iteratorB.HasMoreResults)
{
    FeedResponse<User> response = await iteratorA.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        await Task.Delay(TimeSpan.FromSeconds(5));
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

継続トークンを保存する

継続トークンを取得すれば、FeedIterator の位置を保存できます。 継続トークンとは、FeedIterator が最後に処理した変更を追跡し、FeedIterator が将来この位置から再開できるようにする文字列値のことです。 後続トークンを指定している場合、開始時間と初めから開始の値よりも優先されます。 次のコードでは、コンテナーの作成以降の変更フィードを読み取ります。 それ以上読み取るべき変更がなくなると、変更フィードの使用を後で再開できるように継続トークンが保持されます。

FeedIterator<User> iterator = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.Beginning(), ChangeFeedMode.LatestVersion);

string continuation = null;

while (iterator.HasMoreResults)
{
    FeedResponse<User> response = await iterator.ReadNextAsync();

    if (response.StatusCode == HttpStatusCode.NotModified)
    {
        Console.WriteLine($"No new changes");
        continuation = response.ContinuationToken;
        // Stop the consumption since there are no new changes
        break;
    }
    else
    {
        foreach (User user in response)
        {
            Console.WriteLine($"Detected change for user with id {user.id}");
        }
    }
}

// Some time later when I want to check changes again
FeedIterator<User> iteratorThatResumesFromLastPoint = container.GetChangeFeedIterator<User>(ChangeFeedStartFrom.ContinuationToken(continuation), ChangeFeedMode.LatestVersion);

最新バージョン モードを使用している場合は、 Azure Cosmos DB コンテナーが存在する限り、FeedIterator の継続トークンが期限切れになることはありません。 すべてのバージョンと削除モードを使用している場合は、継続的バックアップの保持期間内に変更が行われる限り、FeedIterator の継続トークンは有効です。

次のステップ