Azure Cosmos DB の変更フィード プロセッサ

適用対象: NoSQL

変更フィード プロセッサは Azure Cosmos DB .NET V3 および Java V4 SDK の一部です。 それにより、変更フィードを読み取り、イベント処理を複数のコンシューマーに効率的に分散させるプロセスが簡単になります。

変更フィード プロセッサ ライブラリの主な利点は、変更フィード内のすべてのイベントが確実に "少なくとも 1 回" は配信されるフォールト トレラントな動作です。

変更フィード プロセッサのコンポーネント

変更フィード プロセッサの実装には、4 つの主要なコンポーネントがあります。

  1. 監視対象コンテナー: 監視対象コンテナーには、変更フィードの生成元となるデータが含まれています。 監視対象コンテナーに対する挿入と更新が、コンテナーの変更フィードに反映されます。

  2. リース コンテナー: リース コンテナーは、状態ストレージとして機能し、複数の worker 間での変更フィードの処理を調整します。 リース コンテナーは、監視対象コンテナーと同じアカウントまたは別のアカウントに格納できます。

  3. コンピューティング インスタンス: コンピューティング インスタンスでは、変更をリッスンする変更フィード プロセッサをホストします。 プラットフォームによっては、これは VM、kubernetes ポッド、Azure App Service インスタンス、実際の物理マシンによって表される場合があります。 これには、この記事全体を通して "インスタンス名" と呼ばれている、一意の識別子があります。

  4. デリゲート: デリゲートは、変更フィード プロセッサによって読み取られる変更の各バッチについて、開発者が行いたいことが定義されているコードです。

変更フィード プロセッサのこれら 4 要素の連携のしくみについて理解を深めるために、次の図の例を見てみましょう。 監視対象コンテナーでは、ドキュメントが保存され、パーティション キーとして "City" が使われます。 パーティション キーの値は、項目を含む範囲内 (物理パーティションを表す各範囲) に分散されていることがわかります。 2 つのコンピューティング インスタンスがあります。コンピューティングの分散を最大にするため、変更フィード プロセッサによって、各インスタンスにさまざまな範囲が割り当てられます。各インスタンスには一意の異なる名前があります。 各範囲は並列に読み取られ、その進行状況は、"リース" ドキュメントを使用して、リース コンテナー内の他の範囲とは別に保管されます。 リースの組み合わせは、変更フィード プロセッサの現在の状態を表します。

変更フィード プロセッサの例

変更フィード プロセッサの実装

エントリ ポイントは常に監視対象コンテナーであり、Container インスタンスから GetChangeFeedProcessorBuilder を呼び出します。

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

1 番目のパラメーターは、このプロセッサの目的を説明する一意の名前です。2番目の名前は、変更を処理するデリゲートの実装です。

デリゲートの例を次に示します。

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

その後、WithInstanceName を使用してコンピューティング インスタンスの名前または一意の識別子を定義します。これは、デプロイしようとしているコンピューティング インスタンスごとに一意で異なっている必要があります。そして最後に、WithLeaseContainer を使用して、どれがリース状態を保持するコンテナーであるかを定義します。

Build を呼び出すとプロセッサ インスタンスが提供され、StartAsync を呼び出すことによってそれを開始できます。

処理のライフ サイクル

ホスト インスタンスの通常のライフ サイクルは次のとおりです。

  1. 変更フィードを読み取ります。
  2. 変更がない場合は、事前に定義された時間 (ビルダーの WithPollInterval でカスタマイズ可能) だけスリープし、#1 に移ります。
  3. 変更がある場合は、それらをデリゲートに送信します。
  4. デリゲートによる変更の処理が正常に完了すると、最後に処理された時点でリース ストアを更新し、#1 に移ります。

エラー処理

変更フィード プロセッサには、ユーザー コード エラーに対する回復性があります。 つまり、デリゲートの実装にハンドルされない例外がある場合 (ステップ #4)、その特定の変更バッチを処理しているスレッドは停止され、新しいスレッドが作成されます。 新しいスレッドでは、そのパーティション キー値範囲に対してリース ストアで保持されている最新時点が確認され、そこから処理が再開されて、同じ変更バッチがデリゲートに効率的に送信されます。 デリゲートによって変更が正しく処理されるまで、この動作が続けられます。そのため、変更フィード プロセッサでは "少なくとも 1 回" が保証されます。

Note

変更のバッチが再試行されないシナリオは 1 つだけです。 最初に発生したデリゲートの実行でエラーが発生した場合、リース ストアには再試行時に使用される以前の保存状態がありません。 このような場合、再試行では 最初の開始構成が使用されますが、最後のバッチが含まれていない場合もあります。

変更フィード プロセッサが同じバッチの変更を継続的に再試行して "行き詰まる" ことがないように、例外が発生した場合に、ドキュメントに書き込むためのデリゲート コードのロジックをエラー メッセージ キューに追加する必要があります。 このように設計することで、今後の変更の処理を継続しながら、未処理の変更を追跡できます。 エラー メッセージ キューは、別の Azure Cosmos DB コンテナーである可能性があります。 問題になるのは、データ ストア自体ではなく、ただ未処理の変更が永続化されることだけです。

さらに、変更フィード推定機能を使用して、変更フィードを読み取る変更フィード プロセッサ インスタンスの進行状況を監視できます。または、ライフ サイクル通知を使用して基になっている障害を検出できます。

ライフサイクルの通知

変更フィード プロセッサを使用すると、ライフ サイクルで関連するイベントにフックでき、それらの 1 つまたはすべてに対して通知を受けるかを選択できます。 少なくとも、エラー通知を登録することをお勧めします。

  • WithLeaseAcquireNotification のハンドラーを登録して、現在のホストがリースを取得して処理を開始したときに通知されるようにします。
  • WithLeaseReleaseNotification のハンドラーを登録して、現在のホストがリースをリリースして処理を停止したときに通知されるようにします。
  • WithErrorNotification のハンドラーを登録して、現在のホストで処理中に例外が発生したときに通知されるようにします。ソースがユーザー デリゲート (ハンドルされない例外) であるのか、または監視対象のコンテナーにアクセスしようとしているときにプロセッサが検出したエラー (ネットワークの問題など) であるのかを識別できます。
Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

展開単位

変更フィード プロセッサの 1 つの展開単位は、同じ processorName とリース コンテナーの構成を持つ 1 つまたは複数のインスタンスから構成されますが、それぞれに異なるインスタンス名があります。 変更に対してそれぞれに異なるビジネス フローを備えた展開単位を多数保持することができ、各展開単位は 1 つまたは複数のインスタンスで構成されています。

たとえば、コンテナーに変更が発生するたびに、外部 API をトリガーする 1 つの展開単位があるとします。 別の展開単位では、変更が発生するたびに、リアルタイムでデータを移動することができます。 監視対象のコンテナーで変更が発生すると、すべての展開単位に対して通知が行われます。

動的スケーリング

前述したように、1 つの展開単位内には、1 つまたは複数のコンピューティング インスタンスを保持できます。 展開単位内でコンピューティングの分散を利用するには、次の要件のみが重要になります。

  1. すべてのインスタンスのリース コンテナーの構成が同じである必要があります。
  2. すべてのインスタンスが、同じ processorName を保持している必要があります。
  3. 各インスタンスには、異なるインスタンス名が設定されている必要があります (WithInstanceName)。

これら 3 つの条件に当てはまる場合、変更フィード プロセッサでは、リース コンテナー内のすべてのリースが該当の展開単位に含まれる実行中の全インスタンスに分散され、均等分散アルゴリズムを使ってコンピューティングが並列化されます。 1 つのリースは一度に 1 つのインスタンスによってのみ所有されるため、インスタンスの数はリースの数を超えてはなりません。

インスタンス数は増減する可能性があり、変更フィード プロセッサではそれに従って再配布することで負荷が動的に調整されます。

さらに、スループットまたはストレージの増加に応じて、変更フィード プロセッサでコンテナーを動的に調整できます。 コンテナーが拡張されると、変更フィード プロセッサでは、リースを動的に増やし、既存のインスタンス間で新しいリースを分散することによって、これらのシナリオが透過的に処理されます。

変更フィードとプロビジョニング済みスループット

監視対象コンテナーで変更フィード読み取り操作を行うと、要求ユニットが消費されます。 監視対象のコンテナーでスロットリングが発生していないことを確認します。そうしないと、プロセッサでの変更フィード イベントの受信に遅延が発生します。

リース コンテナーに対する操作 (状態の更新と保守) では、要求ユニットが消費されます。 同じリース コンテナーを使用しているインスタンスの数が多いほど、要求ユニットの潜在的な消費量が大きくなります。 リース コンテナーでスロットリングが発生していないことを確認します。そうしないと、プロセッサでの変更フィード イベントの受信に遅延が発生します。スロットリングが高い場合は、プロセッサが処理を完全に停止してしまう可能性があります。

開始時刻

既定では、変更フィード プロセッサは、最初に開始したときに、リース コンテナーを初期化し、その処理のライフ サイクルを開始します。 変更フィード プロセッサが初めて初期化される前に監視対象コンテナー内で発生した変更が検出されることはありません。

以前の日時からの読み取り

DateTime のインスタンスを WithStartTime ビルダー拡張機能に渡すことで、特定の日時以降の変更を読み取るよう変更フィード プロセッサを初期化することができます。

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

変更フィード プロセッサは、その特定の日時に対して初期化され、それ以降に発生した変更の読み取りを開始します。

Note

変更フィード プロセッサを特定の日時に起動することは、マルチリージョン書き込みアカウントではサポートされていません。

最初からの読み取り

データの移行やコンテナーの履歴全体の分析など、他のシナリオでは、そのコンテナーの有効期間の最初から変更フィードを読み取る必要があります。 これを行うために、ビルダー拡張機能で WithStartTime を使用できますが、DateTime.MinValue.ToUniversalTime() を渡すと、次のように DateTime の最小値の UTC 表現が生成されます。

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

この変更フィード プロセッサは初期化され、コンテナーの有効期間の最初から変更の読み取りを開始します。

Note

これらのカスタマイズ オプションは、変更フィード プロセッサの開始時点を設定するためだけに機能します。 リース コンテナーが初めて初期化された後、それらを変更しても影響はありません。

リース コンテナーの共有

リース コンテナーは、複数のデプロイ ユニット間で共有できます。各デプロイ ユニットは、異なる監視対象コンテナーをリッスンしているか、異なる processorName を持っています。 この構成では、各デプロイ ユニットは、リース コンテナーで独立した状態を維持します。 リース コンテナーでの要求ユニットの消費量を確認し、プロビジョニングされたスループットがすべてのデプロイ ユニットに対して十分であることを確認します。

変更フィード プロセッサをホストする場所

変更フィード プロセッサは、長時間実行されるプロセスまたはタスクをサポートする任意のプラットフォームでホストできます。

変更フィード プロセッサは有効期間が短い環境で実行できますが、その状態がリース コンテナーによって維持されるため、これらの環境の開始および停止サイクルでは、通知の受信に遅延が加えられます (環境が開始されるたびにプロセッサを開始するオーバーヘッドが生じるため)。

その他のリソース

次のステップ

以下の記事で、変更フィード プロセッサに関してさらに詳しく知ることができます。