変更フィード プロセッサ ライブラリから Azure Cosmos DB .NET V3 SDK に移行する

適用対象: NoSQL

この記事では、変更フィード プロセッサ ライブラリを使用する既存のアプリケーションのコードを、最新バージョンの .NET SDK (.NET V3 SDK とも呼ばれる) の変更フィード機能に移行するために必要な手順について説明します。

必要なコード変更

.NET V3 SDK には、いくつかの破壊的変更があります。ご利用のアプリケーションを移行する上で重要な手順を次に示します。

  1. DocumentCollectionInfo インスタンスを、監視されているコンテナーおよびリース コンテナーに対する Container 参照に変換します。
  2. WithProcessorOptions を使用しているカスタマイズについては、間隔に WithLeaseConfigurationWithPollInterval を使用し、開始時刻WithStartTime を使用し、最大項目数の定義する場合に WithMaxItems を使用するように更新する必要があります。
  3. GetChangeFeedProcessorBuilder 上の processorName については ChangeFeedProcessorOptions.LeasePrefix 上で構成されている値と一致するように設定します。それ以外の場合は string.Empty を使用します。
  4. 変更は IReadOnlyList<Document> として配信されなくなりました。代わりに、それは IReadOnlyCollection<T> となります。ここで、T は定義する必要のある型です。基本項目クラスはもうありません。
  5. 変更を処理する場合、IChangeFeedObserver の実装は不要になりました。代わりに、デリゲートを定義する必要があります。 デリゲートは静的関数とすることができます。実行中に状態を維持する必要がある場合は、独自のクラスを作成して、インスタンス メソッドをデリゲートとして渡すこともできます。

たとえば、変更フィード プロセッサを構築する元のコードが次のようであるとします。

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

移行されたコードは次のようになります。

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

デリゲートの場合は、静的メソッドを使用してイベントを受信できます。 IChangeFeedObserverContext からの情報を使用していた場合は、ChangeFeedProcessorContext を使用するように移行できます。

  • IChangeFeedObserverContext.PartitionKeyRangeId の代わりに ChangeFeedProcessorContext.LeaseToken を使用できます
  • IChangeFeedObserverContext.FeedResponse の代わりに ChangeFeedProcessorContext.Headers を使用できます
  • ChangeFeedProcessorContext.Diagnostics には、トラブルシューティングのために、要求の待機時間に関する詳細情報が含まれています
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

正常性イベントと可観測性

以前に IHealthMonitor を使用していた場合、または IChangeFeedObserver.OpenAsyncIChangeFeedObserver.CloseAsync を利用していた場合は、Notifications API を使用します。

  • IChangeFeedObserver.OpenAsyncWithLeaseAcquireNotification に置き換えることができます。
  • IChangeFeedObserver.CloseAsyncWithLeaseReleaseNotification に置き換えることができます。
  • IHealthMonitor.InspectAsyncWithErrorNotification に置き換えることができます。

静的なリース コンテナー

変更フィード プロセッサ ライブラリと同様に、.NET V3 SDK の変更フィード機能でもリース コンテナーを使用して状態が格納されます。 ただし、スキーマは異なります。

SDK V3 変更フィード プロセッサでは、移行されたアプリケーション コードの最初の実行時に、古いライブラリの状態が検出され、新しいスキーマに自動的に移行されます。

古いコードを使用するアプリケーションを安全に停止し、そのコードを新しいバージョンに移行し、移行されたアプリケーションを起動することができ、アプリケーションの停止中に発生した変更はいずれも新しいバージョンによって取得および処理されます。

その他のリソース

次のステップ

以下の記事で、変更フィード プロセッサに関してさらに詳しく知ることができます。