Sdílet prostřednictvím


Migrace z knihovny zpracovávající datové kanály změn na .NET SDK V3 pro Azure Cosmos DB

PLATÍ PRO: NoSQL

Tento článek popisuje požadované kroky k migraci kódu existující aplikace, který používá knihovnu procesoru změnového kanálu do funkce změnového kanálu v nejnovější verzi sady .NET SDK, též označované jako .NET V3 SDK.

Požadované změny kódu

Sada .NET V3 SDK obsahuje několik zásadních změn, následující kroky jsou klíčovými kroky pro migraci aplikace:

  1. Převeďte instance DocumentCollectionInfo na odkazy Container pro monitorované a najaté kontejnery.
  2. Vlastní nastavení, která používají WithProcessorOptions, by měla být aktualizována na použití WithLeaseConfiguration a WithPollInterval pro intervaly, WithStartTimepro počáteční čas a WithMaxItems k definování maximálního počtu položek.
  3. Nastavte processorName na GetChangeFeedProcessorBuilder, aby odpovídalo hodnotě nakonfigurované na ChangeFeedProcessorOptions.LeasePrefix, nebo jinak použijte string.Empty.
  4. Změny již nejsou doručovány jako IReadOnlyList<Document>, místo toho jako IReadOnlyCollection<T>, kde T je typ, který je třeba definovat; třída základní položky již neexistuje.
  5. Ke zpracování změn už nepotřebujete implementaci IChangeFeedObserver, místo toho musíte definovat delegáta. Delegátem může být statická funkce, nebo pokud potřebujete zachovat stav napříč spuštěními, můžete vytvořit vlastní třídu a předat metodu instance jako delegát.

Například pokud původní kód pro vytvoření procesoru zpracovávajícího změny vypadá takto:

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

Migrovaný kód bude vypadat takto:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Pro delegáta můžete mít statickou metodu pro příjem událostí. Pokud jste získávali informace z IChangeFeedObserverContext, můžete přejít na používání ChangeFeedProcessorContext.

  • ChangeFeedProcessorContext.LeaseToken lze použít místo IChangeFeedObserverContext.PartitionKeyRangeId
  • ChangeFeedProcessorContext.Headers lze použít místo IChangeFeedObserverContext.FeedResponse
  • ChangeFeedProcessorContext.Diagnostics obsahuje podrobné informace o latenci požadavků pro řešení potíží.
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

Události stavu a pozorovatelnost

Pokud jste dříve používali IHealthMonitor nebo jste využívali IChangeFeedObserver.OpenAsync a IChangeFeedObserver.CloseAsync, použijte notifikační API.

  • IChangeFeedObserver.OpenAsync lze nahradit znakem WithLeaseAcquireNotification.
  • IChangeFeedObserver.CloseAsync lze nahradit znakem WithLeaseReleaseNotification.
  • IHealthMonitor.InspectAsync lze nahradit znakem WithErrorNotification.

Kontejner pro stav a pronájem

Podobně jako knihovna procesoru kanálu změn používá funkce kanálu změn v sadě .NET V3 SDK kontejner zápůjček k uložení stavu. Schémata se ale liší.

Procesor kanálu změn SADY SDK V3 rozpozná jakýkoli starý stav knihovny a automaticky ho migruje do nového schématu při prvním spuštění migrovaného kódu aplikace.

Aplikaci můžete bezpečně zastavit pomocí starého kódu, migrovat kód do nové verze, spustit migrovanou aplikaci a všechny změny, ke kterým došlo v době, kdy byla aplikace zastavena, budou vyzvednuty a zpracovány novou verzí.

Další materiály

Další kroky

Teď můžete pokračovat a získat další informace o procesoru kanálu změn v následujících článcích: