変更フィード推定機能を使用する

適用対象: NoSQL

この記事では、変更フィードを読み取るお使いの変更フィード プロセッサ インスタンスの進行状況を監視する方法について説明します。

進行状況の監視が重要な理由

変更フィード プロセッサは、お使いの変更フィードを前方に移動し、変更をデリゲート実装に配信するポインターとして機能します。

お使いの展開されている変更フィード プロセッサは、CPU、メモリ、ネットワークなどの使用可能なリソースに基づいた特定の速度で変更を処理できます。

この速度が、お使いの Azure Cosmos DB コンテナーで変更が発生する速度よりも遅い場合は、お使いのプロセッサが遅延するようになります。

このシナリオを特定することにより、変更フィード プロセッサの展開を拡張する必要があるかどうかがわかります。

変更フィード推定機能の実装

自動通知のプッシュ モデルとして

変更フィード推定機能は、変更フィード プロセッサと同様に、プッシュ モデルとして機能できます。 推定機能は、最後に処理された項目 (リース コンテナーの状態で定義される) とコンテナーの最新の変更の差を測定し、この値をデリゲートにプッシュします。 この測定値が取得される間隔 (既定値は 5 秒) は、カスタマイズも可能です。

たとえば、お使いの変更フィード プロセッサが次のように定義されているとします。

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

プロセッサで GetChangeFeedEstimatorBuilder が使用されるよう、推定機能を初期化する正しい方法は次のようになります。

ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();

ここでは、プロセッサと推定機能の両方の leaseContainer と名前が同じです。

他の 2 つのパラメーターはデリゲートです。このデリゲートは、プロセッサによって読み取られる保留中の変更の数と、この測定値を取得する時間間隔を表す数値を受け取ります。

推定を受け取るデリゲートの例は次のとおりです。

static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    await Task.Delay(0);
}

この推定はお使いの監視ソリューションに送信して、時間と共に進行状況がどのようなものかを把握するのに使用できます。

オンデマンドの詳細な推定として

プッシュ モデルとは異なり、オンデマンドで推定を取得できる代替手段があります。 このモデルでは、次のような、より詳細な情報も提供されます。

  • リースあたりの推定ラグ。
  • 各リースを所有して処理するインスタンス。これにより、インスタンスに問題があるかどうかを識別できます。

たとえば、変更フィード プロセッサがこのように定義されているとします。

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

同じリース構成で、次のように推定器を作成できます。

ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);

必要なときに必要な頻度で、詳細な推定を取得できます。

Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}

ChangeFeedProcessorState には、リース情報とラグ情報、およびそれを所有している現在のインスタンスがだれであるかが含まれます。

推定機能のデプロイ

変更フィード推定機能は、お使いの変更フィード プロセッサの一部として配置する必要はありません。また、同じプロジェクトに含める必要もありません。 プロセッサとは完全に異なる、独立したインスタンスに推定機能をデプロイすることをお勧めします。 単一の推定機能インスタンスは、変更フィード プロセッサ デプロイ内のすべてのリースとインスタンスの進行状況を追跡できます。

各見積もりでは、監視対象コンテナーとリース コンテナーからの要求ユニットが使用されます。 まずは 1 分間隔の頻度から始めるのがお勧めです。頻度を短くするほど、使用される要求ユニットは高くなります。

その他のリソース

次のステップ

以下の記事で、変更フィード プロセッサに関してさらに詳しく知ることができます。