Compartilhar via


Migrar da biblioteca do processador do feed de alterações para o SDK do Azure Cosmos DB .NET V3

APLICA-SE A: NoSQL

Este artigo descreve as etapas necessárias para migrar o código de um aplicativo existente que usa a biblioteca do processador do feed de alterações para o recurso do feed de alterações na versão mais recente do SDK do .NET (também conhecido como SDK do .NET V3).

Alterações de código necessárias

O SDK do .NET V3 tem várias alterações interruptivas. A seguir estão as principais etapas para migrar seu aplicativo:

  1. Converta as instâncias DocumentCollectionInfo para as referências Container para os contêineres monitorados e de concessões.
  2. As personalizações que usam WithProcessorOptions devem ser atualizadas para usar WithLeaseConfiguration e WithPollInterval para os intervalos, WithStartTimepara a hora de início e WithMaxItems para definir a contagem máxima de itens.
  3. Defina o processorName em GetChangeFeedProcessorBuilder para corresponder ao valor configurado em ChangeFeedProcessorOptions.LeasePrefix ou use string.Empty caso contrário.
  4. As alterações não são mais entregues como um IReadOnlyList<Document>, em vez disso, é um IReadOnlyCollection<T>, onde T é um tipo que você precisa definir, não há mais nenhuma classe de item base.
  5. Para lidar com as alterações, você não precisa mais de uma implementação de IChangeFeedObserver, em vez disso, você precisa definir um delegado. O delegado pode ser uma função estática ou, se você precisar manter o estado entre execuções, pode criar sua própria classe e passar um método de instância como delegado.

Por exemplo, se o código original para criar o processador do feed de alterações tiver a seguinte aparência:

ChangeFeedProcessorLibrary.DocumentCollectionInfo monitoredCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.monitoredContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.DocumentCollectionInfo leaseCollectionInfo = new ChangeFeedProcessorLibrary.DocumentCollectionInfo()
{
    DatabaseName = databaseId,
    CollectionName = Program.leasesContainer,
    Uri = new Uri(configuration["EndPointUrl"]),
    MasterKey = configuration["AuthorizationKey"]
};

ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder builder = new ChangeFeedProcessorLibrary.ChangeFeedProcessorBuilder();
var oldChangeFeedProcessor = await builder
    .WithHostName("consoleHost")
    .WithProcessorOptions(new ChangeFeedProcessorLibrary.ChangeFeedProcessorOptions
    {
        StartFromBeginning = true,
        LeasePrefix = "MyLeasePrefix",
        MaxItemCount = 10,
        FeedPollDelay = TimeSpan.FromSeconds(1)
    })
    .WithFeedCollection(monitoredCollectionInfo)
    .WithLeaseCollection(leaseCollectionInfo)
    .WithObserver<ChangeFeedObserver>()
    .BuildAsync();

O código que foi migrado terá esta aparência:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("MyLeasePrefix", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithMaxItems(10)
        .WithPollInterval(TimeSpan.FromSeconds(1))
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Para o delegado, você pode ter um método estático para receber os eventos. Se você estivesse consumindo informações do IChangeFeedObserverContext, pode migrar para usar o ChangeFeedProcessorContext:

  • ChangeFeedProcessorContext.LeaseToken poderá ser usado em lugar de IChangeFeedObserverContext.PartitionKeyRangeId
  • ChangeFeedProcessorContext.Headers poderá ser usado em lugar de IChangeFeedObserverContext.FeedResponse
  • ChangeFeedProcessorContext.Diagnostics contém informações detalhadas sobre a latência de solicitação para solução de problemas
static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ToDoItem> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"\tDetected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate work
        await Task.Delay(1);
    }
}

Eventos de integridade e observabilidade

Se anteriormente você estava usando IHealthMonitor ou aproveitando IChangeFeedObserver.OpenAsync e IChangeFeedObserver.CloseAsync, use a API de Notificações.

  • IChangeFeedObserver.OpenAsync pode ser substituído por WithLeaseAcquireNotification.
  • IChangeFeedObserver.CloseAsync pode ser substituído por WithLeaseReleaseNotification.
  • IHealthMonitor.InspectAsync pode ser substituído por WithErrorNotification.

Contêiner de estado e de concessão

De forma semelhante à biblioteca do processador do feed de alterações, o recurso de feed de alterações no SDK do .NET V3 usa um contêiner de concessão para armazenar o estado. No entanto, os esquemas são diferentes.

O processador do feed de alterações do SDK V3 detecta qualquer estado de biblioteca antigo e o migra para o novo esquema automaticamente após a primeira execução do código do aplicativo migrado.

Você pode interromper o aplicativo de forma segurança usando o código antigo, migrar o código para a nova versão e iniciar o aplicativo migrado. Todas as alterações ocorridas enquanto o aplicativo foi interrompido serão coletadas e processadas pela nova versão.

Recursos adicionais

Próximas etapas

Agora continue para saber mais sobre o processador do feed de alterações nos seguintes artigos: