Migrálás a változáscsatorna processzortárából az Azure Cosmos DB .NET V3 SDK-ba

A KÖVETKEZŐRE VONATKOZIK: NoSQL

Ez a cikk a .NET SDK (más néven .NET V3 SDK) legújabb verziójában a változáscsatorna-feldolgozó kódtárat használó meglévő alkalmazás kódjának áttelepítéséhez szükséges lépéseket ismerteti.

Szükséges kódmódosítások

A .NET V3 SDK számos kompatibilitástörő változást tartalmaz, az alkalmazás áttelepítésének legfontosabb lépései a következők:

  1. Konvertálja a DocumentCollectionInfo példányokat Container a figyelt és a tárolók bérletére szolgáló hivatkozássá.
  2. A használt WithProcessorOptions testreszabásokat frissíteni kell a használathoz WithLeaseConfiguration és WithPollInterval az intervallumokhoz, WithStartTimea kezdési időponthoz és WithMaxItems a maximális elemszám meghatározásához.
  3. Állítsa be a processorName beállítást GetChangeFeedProcessorBuilder úgy, hogy megfeleljen a konfigurált ChangeFeedProcessorOptions.LeasePrefixértéknek, vagy használja string.Empty másként.
  4. A módosítások már nem lesznek kézbesítve IReadOnlyList<Document>, hanem egy olyan típus, IReadOnlyCollection<T> amelyet T meg kell határoznia, és már nincs alapelemosztály.
  5. A módosítások kezeléséhez már nincs szükség implementációra IChangeFeedObserver, hanem meg kell határoznia egy meghatalmazottat. A meghatalmazott lehet statikus függvény, vagy ha a végrehajtások során állapotot kell fenntartania, létrehozhatja saját osztályát, és meghatalmazottként átadhat egy példánymetódust.

Ha például a változáscsatorna-feldolgozó létrehozásához használt eredeti kód a következőképpen néz ki:

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

A migrált kód a következőképpen fog kinézni:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

A meghatalmazott számára statikus metódussal fogadhatja az eseményeket. Ha a migrálás ChangeFeedProcessorContextsorán a következő adatokat használta fel:IChangeFeedObserverContext

  • ChangeFeedProcessorContext.LeaseToken használható ahelyett, hogy IChangeFeedObserverContext.PartitionKeyRangeId
  • ChangeFeedProcessorContext.Headers használható ahelyett, hogy IChangeFeedObserverContext.FeedResponse
  • ChangeFeedProcessorContext.Diagnostics részletes információkat tartalmaz a hibaelhárítási kérelmek késéséről
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

Egészségügyi események és megfigyelhetőség

Ha korábban használta IHealthMonitor vagy használta IChangeFeedObserver.OpenAsyncIChangeFeedObserver.CloseAsync, használja az Értesítések API-t.

  • IChangeFeedObserver.OpenAsync helyére a következőt lehet helyezni WithLeaseAcquireNotification: .
  • IChangeFeedObserver.CloseAsync helyére a következőt lehet helyezni WithLeaseReleaseNotification: .
  • IHealthMonitor.InspectAsync helyére a következőt lehet helyezni WithErrorNotification: .

Állapot- és bérlettároló

A változáscsatorna processzortárához hasonlóan a .NET V3 SDK változáscsatorna funkciója egy bérlettárolót használ az állapot tárolásához. A sémák azonban eltérőek.

Az SDK V3 változáscsatorna feldolgozója észleli a régi kódtár állapotát, és a migrált alkalmazáskód első végrehajtásakor automatikusan áttelepíti azt az új sémába.

Biztonságosan leállíthatja az alkalmazást a régi kóddal, migrálhatja a kódot az új verzióra, elindíthatja az áttelepített alkalmazást, és az alkalmazás leállítása közben történt módosításokat az új verzió fogja átvenni és feldolgozni.

Additional resources

További lépések

A változáscsatorna-feldolgozóról az alábbi cikkekben olvashat bővebben: