Explorar o feed de alterações no Azure Cosmos DB

Concluído

O feed de alterações no Azure Cosmos DB é um registro persistente de alterações em um contêiner na ordem em que ocorrem. O suporte para feed de alterações no Azure Cosmos DB funciona ouvindo um contêiner do Azure Cosmos DB para quaisquer alterações. Ele gera a lista classificada de documentos que foram alterados na ordem em que eles foram modificados. As alterações mantidas podem ser processadas de maneira assíncrona e incremental, e a saída pode ser distribuída em um ou mais consumidores para processamento paralelo.

Feed de alterações e operações diferentes

Hoje você vê todas as inserções e atualizações no feed de alterações. Não é possível filtrar o feed de alterações para um tipo específico de operação. No momento, o feed de alterações não registra as operações de exclusão em log. Como alternativa, você pode adicionar um marcador flexível nos itens que estão sendo excluídos. Por exemplo, você pode adicionar um atributo ao item chamado "excluído", definir o valor dele como "true" e definir um valor de TTL (vida útil) no item. A definição da TTL garante que o item seja excluído automaticamente.

Lendo o Azure Cosmos DB alterar feed

Você pode trabalhar com o feed de alterações do Azure Cosmos DB usando um modelo push ou pull. Com um modelo de push, o processador do feed de alterações envia o trabalho por push para um cliente que tem lógica de negócios para processar esse trabalho. No entanto, a complexidade na verificação do trabalho e no armazenamento do estado do último trabalho processadp é tratada dentro do processador de feed de alterações.

Com um modelo de pull, o cliente precisa efetuar pull do trabalho do servidor. O cliente, nesse caso, não só tem lógica de negócios para processar o trabalho, mas também armazenar o estado do último trabalho processado, lidando com o balanceamento de carga entre vários clientes que processam trabalhos em paralelo e manipulando erros.

Observação

É recomendável usar o modelo de push porque você não precisará se preocupar em sondar o feed de alterações para alterações futuras, armazenar o estado da última alteração processada e outros benefícios.

A maioria dos cenários que usam o feed de alterações do Azure Cosmos DB usará uma das opções de modelo de push. No entanto, há alguns cenários em que você talvez queira o controle de nível inferior adicional do modelo pull. Eles incluem:

  • Ler as alterações de uma chave de partição específica
  • Controlar o ritmo no qual seu cliente recebe alterações para processamento
  • Fazer uma leitura única dos dados existentes no feed de alterações (por exemplo, fazer uma migração de dados)

Ler o feed de alterações com um modelo push

Há duas maneiras de ler do feed de alterações com um modelo de push: gatilhos do Azure Functions para o Azure Cosmos DB e a biblioteca do processador de feed de alterações. O Azure Functions usa o processador do feed de alterações nos bastidores, portanto, são maneiras muito semelhantes de ler o feed de alterações. Imagine o Azure Functions como uma plataforma de hospedagem para o processador do feed de alterações, não uma maneira totalmente diferente de ler o feed de alterações. O Azure Functions usa o processador do feed de alterações nos bastidores e paraleliza automaticamente o processamento de alterações nas partições do contêiner.

Funções do Azure

Você pode criar pequenas Azure Functions reativas que serão disparadas automaticamente a cada novo evento no feed de alterações do contêiner do Azure Cosmos DB. Com o gatilho do Azure Functions para o Azure Cosmos DB, você pode usar a funcionalidade de colocação em escala e detecção de eventos confiáveis do Processador do Feed de Alterações sem a necessidade de manter nenhuma infraestrutura de trabalho.

Diagram showing the change feed triggering Azure Functions for processing.

Alterar o processador de feed

O processador do feed de alterações faz parte dos SDKs do .NET V3 e do Java V4 do Azure Cosmos DB. Ele simplifica o processo de leitura do feed de alterações e distribui o processamento de eventos entre vários consumidores com eficiência.

Há quatro componentes principais de implementação do processador de feed de alterações:

  1. Contêiner monitorado: o contêiner monitorado tem os dados com base nos quais o feed de alterações é gerado. Todas as inserções e atualizações no contêiner monitorado são refletidas no feed de alterações do contêiner.

  2. Contêiner de concessão: o contêiner de concessão atua como um armazenamento de estado e coordena o processamento do feed de alterações em vários trabalhos. O contêiner de concessão pode ser armazenado na mesma conta que o contêiner monitorado ou em uma conta separada.

  3. Instância de computação: uma instância de computação hospeda o processador do feed de alterações para escutar as alterações. A depender da plataforma, ela pode ser representada por uma VM, um pod kubernetes, uma instância do Serviço de Aplicativo do Azure, um computador físico real. Ela tem um identificador exclusivo referenciado como o nome da instância em todo este artigo.

  4. Delegado: o delegado é o código que define o que você, desenvolvedor, deseja fazer com cada lote de alterações lido pelo processador do feed de alterações.

Ao implementar o processador do feed de alterações, o ponto de entrada é sempre o contêiner monitorado, de uma instância de Container que você chama de GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Onde o primeiro parâmetro é um nome distinto que descreve a meta desse processador e o segundo nome é a implementação delegada que manipulará as alterações. A seguir, veja um exemplo de delegado:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Posteriormente, você define o nome da instância de computação ou o identificador exclusivo com WithInstanceName, que deve ser exclusivo e diferente em cada instância de computação que você está implantando e, finalmente, o contêiner no qual será mantido o estado de concessão com WithLeaseContainer.

Chamar Build fornecerá a você a instância do processador que você pode iniciar chamando StartAsync.

O ciclo de vida normal de uma instância de host é:

  1. Ler o feed de alterações.
  2. Se não houver alteração, suspenda por um período de tempo predefinido (personalizável com WithPollInterval no Builder) e vá para o n. 1.
  3. Se houver alterações, envie-as para o delegado.
  4. Quando o delegado terminar de processar as alterações com êxito, atualize o repositório de concessão com o último ponto processado no tempo e vá para 1.