Migrieren von der Änderungsfeed-Verarbeitungsbibliothek zum Azure Cosmos DB .NET V3 SDK

GILT FÜR: NoSQL

In diesem Artikel werden die erforderlichen Schritte zum Migrieren eines vorhandenen Anwendungscodes, der die Änderungsfeed-Verarbeitungsbibliothek verwendet, zum Änderungsfeed-Feature in der neuesten Version des .NET SDK (auch als .NET V3 SDK bezeichnet) beschrieben.

Erforderliche Codeänderungen

Das .NET V3 SDK weist mehrere wichtige Änderungen auf. Nachfolgend sind die wichtigsten Schritte zum Migrieren Ihrer Anwendung aufgeführt:

  1. Wandeln Sie die DocumentCollectionInfo-Instanzen in Container-Verweise für die überwachten Container und Leasescontainer um.
  2. Anpassungen, die WithProcessorOptions verwenden, sollten für die Verwendung von WithLeaseConfiguration und WithPollInterval für Intervalle, WithStartTimefür die Startzeit und WithMaxItems zum Definieren der maximalen Anzahl von Elementen aktualisiert werden.
  3. Legen Sie processorName für GetChangeFeedProcessorBuilder entsprechend dem für ChangeFeedProcessorOptions.LeasePrefix konfigurierten Wert fest, oder verwenden Sie andernfalls string.Empty.
  4. Die Änderungen werden nicht mehr als IReadOnlyList<Document> übermittelt, sondern es handelt sich um eine IReadOnlyCollection<T>, wobei T für einen von Ihnen zu definierenden Typ steht. Es gibt keine Basiselementklasse mehr.
  5. Zum Verarbeiten der Änderungen ist keine Implementierung von IChangeFeedObserver mehr erforderlich, sondern Sie müssen stattdessen einen Delegaten definieren. Der Delegat kann eine statische Funktion sein, oder Sie können eine eigene Klasse erstellen und eine Instanzmethode als Delegat übergeben, wenn Sie den Status über mehrere Ausführungen hinweg beibehalten müssen.

Wenn der ursprüngliche Code zum Erstellen des Änderungsfeedprozessors beispielsweise wie folgt aussieht:

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

Sieht der migrierte Code folgendermaßen aus:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Für den Delegaten können Sie eine statische Methode einrichten, um die Ereignisse zu empfangen. Wenn Sie Informationen aus IChangeFeedObserverContext verwendet haben, können Sie zur Verwendung von ChangeFeedProcessorContext migrieren:

  • ChangeFeedProcessorContext.LeaseToken kann anstelle von IChangeFeedObserverContext.PartitionKeyRangeId verwendet werden.
  • ChangeFeedProcessorContext.Headers kann anstelle von IChangeFeedObserverContext.FeedResponse verwendet werden.
  • ChangeFeedProcessorContext.Diagnostics enthält ausführliche Informationen zur Anforderungslatenz für die Problembehandlung.
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

Integritätsereignisse und Einblick

Wenn Sie zuvor IHealthMonitor verwendet oder IChangeFeedObserver.OpenAsync und IChangeFeedObserver.CloseAsync genutzt haben, verwenden Sie die Benachrichtigungs-API.

  • IChangeFeedObserver.OpenAsync kann durch WithLeaseAcquireNotification ersetzt werden.
  • IChangeFeedObserver.CloseAsync kann durch WithLeaseReleaseNotification ersetzt werden.
  • IHealthMonitor.InspectAsync kann durch WithErrorNotification ersetzt werden.

Status und Leasecontainer

Ähnlich wie bei der Änderungsfeed-Verarbeitungsbibliothek verwendet das Feature für Änderungsfeeds im .NET V3 SDK einen Leasecontainer zum Speichern des Status. Allerdings ist das Schema unterschiedlich.

Der SDK V3-Änderungsfeedprozessor erkennt jeden alten Bibliotheksstatus und migriert ihn bei der ersten Ausführung des migrierten Anwendungscodes automatisch in das neue Schema.

Sie können die Anwendung mithilfe des alten Codes gefahrlos beenden, den Code zur neuen Version migrieren und die migrierte Anwendung starten. Alle Änderungen, die während des Beendens der Anwendung auftraten, werden von der neuen Version übernommen und verarbeitet.

Zusätzliche Ressourcen

Nächste Schritte

In den folgenden Artikeln erfahren Sie mehr über den Änderungsfeedprozessor: