Migreren van de processorbibliotheek van de wijzigingenfeed naar de Azure Cosmos DB .NET V3 SDK

VAN TOEPASSING OP: NoSQL

In dit artikel worden de vereiste stappen beschreven voor het migreren van de code van een bestaande toepassing die gebruikmaakt van de processorbibliotheek voor wijzigingenfeeds naar de functie voor wijzigingenfeeds in de nieuwste versie van de .NET SDK (ook wel .NET V3 SDK genoemd).

Vereiste codewijzigingen

De .NET V3 SDK heeft verschillende belangrijke wijzigingen. Dit zijn de belangrijkste stappen voor het migreren van uw toepassing:

  1. Converteer de DocumentCollectionInfo exemplaren naar Container verwijzingen voor de bewaakte containers en leases.
  2. Aanpassingen die worden gebruikt WithProcessorOptions , moeten worden bijgewerkt voor gebruik WithLeaseConfiguration en WithPollInterval voor intervallen, WithStartTimevoor de begintijd en WithMaxItems voor het definiëren van het maximumaantal items.
  3. Stel de processorName aan in GetChangeFeedProcessorBuilder zodat deze overeenkomt met de waarde die is geconfigureerd voor ChangeFeedProcessorOptions.LeasePrefixof gebruik anderszins string.Empty .
  4. De wijzigingen worden niet meer geleverd als een IReadOnlyList<Document>, in plaats daarvan is het een IReadOnlyCollection<T> type dat T u moet definiëren, er is geen basisitemklasse meer.
  5. Als u de wijzigingen wilt afhandelen, hebt u geen implementatie meer nodig van, in plaats daarvan IChangeFeedObservermoet u een gemachtigde definiëren. De gemachtigde kan een statische functie zijn of, als u de status wilt behouden tijdens uitvoeringen, kunt u uw eigen klasse maken en een instantiemethode doorgeven als gemachtigde.

Als de oorspronkelijke code voor het bouwen van de wijzigingenfeedprocessor er bijvoorbeeld als volgt uitziet:

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

De gemigreerde code ziet er als volgt uit:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Voor de gemachtigde kunt u een statische methode hebben om de gebeurtenissen te ontvangen. Als u gegevens van de IChangeFeedObserverContext migratie gebruikt om het ChangeFeedProcessorContextvolgende te gebruiken:

  • ChangeFeedProcessorContext.LeaseToken kan worden gebruikt in plaats van IChangeFeedObserverContext.PartitionKeyRangeId
  • ChangeFeedProcessorContext.Headers kan worden gebruikt in plaats van IChangeFeedObserverContext.FeedResponse
  • ChangeFeedProcessorContext.Diagnostics bevat gedetailleerde informatie over de latentie van aanvragen voor probleemoplossing
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

Status-gebeurtenissen en waarneembaarheid

Als u eerder hebt gebruikt IHealthMonitor of als u gebruikmaakt van IChangeFeedObserver.OpenAsync en IChangeFeedObserver.CloseAsyncgebruikt, gebruikt u de Meldingen-API.

  • IChangeFeedObserver.OpenAsync kan worden vervangen door WithLeaseAcquireNotification.
  • IChangeFeedObserver.CloseAsync kan worden vervangen door WithLeaseReleaseNotification.
  • IHealthMonitor.InspectAsync kan worden vervangen door WithErrorNotification.

Status- en leasecontainer

Net als bij de processorbibliotheek van de wijzigingenfeed gebruikt de functie voor wijzigingenfeeds in .NET V3 SDK een leasecontainer om de status op te slaan. De schema's verschillen echter.

De SDK V3-wijzigingenfeedprocessor detecteert een oude bibliotheekstatus en migreert deze automatisch naar het nieuwe schema bij de eerste uitvoering van de gemigreerde toepassingscode.

U kunt de toepassing veilig stoppen met behulp van de oude code, de code migreren naar de nieuwe versie, de gemigreerde toepassing starten en eventuele wijzigingen die zijn aangebracht tijdens het stoppen van de toepassing, worden opgehaald en verwerkt door de nieuwe versie.

Aanvullende bronnen

Volgende stappen

U kunt nu verdergaan met meer informatie over de verwerker van wijzigingenfeeds in de volgende artikelen: