Exploración de la fuente de cambios en Azure Cosmos DB

Completado

La fuente de cambios de Azure Cosmos DB es un registro persistente de cambios en un contenedor en el orden en que se producen. La compatibilidad con la fuente de cambios en Azure Cosmos DB se proporciona al observar si hay algún cambio en un contenedor de Azure Cosmos DB. A continuación, muestra la lista ordenada de los documentos que han cambiado en el orden en el que se modificaron. Los cambios se conservan y se pueden procesar de manera asincrónica e incremental, y la salida se puede distribuir entre uno o varios consumidores para procesarse en paralelo.

Fuente de cambios y operaciones diferentes

Actualmente, verá todas las inserciones y actualizaciones en la fuente de cambios. No se puede filtrar la fuente de cambios para un tipo de operación específico. Actualmente, la fuente de cambios no registra las operaciones de eliminación. Como solución alternativa, puede agregar un marcador flexible en los elementos que se van a eliminar. Por ejemplo, puede agregar un atributo en el elemento denominado "deleted", establecer su valor en "true" y, luego, definir un valor de período de vida (TTL) en el elemento. Al establecer el TTL se garantiza que el elemento se elimine automáticamente.

Lectura de la fuente de cambios de Azure Cosmos DB

Puede trabajar con la fuente de cambios de Azure Cosmos DB mediante un modelo de inserción o un modelo de extracción. Con un modelo de inserción, el procesador de fuente de cambios envía el trabajo a un cliente que tiene la lógica de negocios para procesarlo. Sin embargo, el procesador de fuente de cambios se encarga de la complejidad de comprobar el trabajo y almacenar el estado del último trabajo procesado.

Con un modelo de extracción, el cliente tiene que extraer el trabajo del servidor. En este caso, el cliente no solo tiene lógica de negocios para procesar el trabajo, sino que también almacena el estado del último trabajo procesado. De este modo, se encarga del equilibrio de carga entre varios clientes que procesan el trabajo en paralelo, así como de los errores.

Nota

Se recomienda usar el modelo de inserción porque no tendrá que preocuparse por sondear la fuente de cambios para futuros cambios, almacenar el estado del último cambio procesado y otras ventajas.

La mayoría de los escenarios que usan la fuente de cambios de Azure Cosmos DB usarán una de las opciones del modelo de inserción. Sin embargo, hay algunas situaciones en las que es preferible el control de bajo nivel adicional del modelo de extracción. Entre ellas se incluyen las siguientes:

  • Leer cambios de una clave de partición determinada.
  • Controlar el ritmo con el que el cliente recibe los cambios para procesarlos.
  • Realizar una lectura única de los datos existentes en la fuente de cambios (por ejemplo, para realizar una migración de datos).

Lectura de la fuente de cambios con un modelo de inserción

Hay dos formas de leer desde la fuente de cambios con un modelo de inserción: los desencadenadores de Azure Functions para Azure Cosmos DB y la biblioteca de procesadores de fuente de cambios. Azure Functions usa el procesador de fuente de cambios en segundo plano, por lo que ambas son formas similares de leer la fuente de cambios. Simplemente, piense en Azure Functions como una plataforma de hospedaje para el procesador de fuente de cambios, no como una forma totalmente diferente de leer la fuente de cambios. Azure Functions usa el procesador de fuente de cambios en segundo plano, ejecuta en paralelo de forma automática el procesamiento de los cambios en las particiones del contenedor.

Azure Functions

Puede crear pequeñas funciones reactivas de Azure Functions que se desencadenarán automáticamente en cada nuevo evento en la fuente de cambios de su contenedor de Azure Cosmos DB. Con el desencadenador de Azure Functions para Azure Cosmos DB, puede usar el escalado del procesador de fuente de cambios y la funcionalidad de detección de eventos confiable sin necesidad de mantener ninguna infraestructura de trabajo.

Diagram showing the change feed triggering Azure Functions for processing.

Procesador de fuente de cambios

El procesador de fuente de cambios es parte de los SDK .NET V3 and Java V4 de Azure Cosmos DB. Simplifica el proceso de lectura de la fuente de cambios y distribuye el procesamiento de eventos entre varios consumidores de manera eficaz.

Hay cuatro componentes principales en la implementación del procesador de fuente de cambios:

  1. El contenedor supervisado: el contenedor supervisado tiene los datos a partir de los cuales se genera la fuente de cambios. Todas las inserciones y actualizaciones realizadas en el contenedor supervisado se reflejan en la fuente de cambios del contenedor.

  2. El contenedor de concesión: el contenedor de concesión actúa como un almacenamiento de estado y coordina el procesamiento de la fuente de cambios entre varios trabajadores. El contenedor de concesión se puede almacenar en la misma cuenta que el contenedor supervisado o en una cuenta independiente.

  3. Instancia de proceso: una instancia de proceso hospeda el procesador de fuente de cambios para escuchar los cambios. En función de la plataforma, podría representarse mediante una máquina virtual, un pod de Kubernetes, una instancia de Azure App Service, una máquina física real. Tiene un identificador único al que se hace referencia como nombre de instancia a lo largo de este artículo.

  4. El delegado: el delegado es el código que define lo que usted, el desarrollador, desea hacer con cada lote de cambios que el procesador de la fuente de cambios lea.

Al implementar el procesador de fuente de cambios, el punto de entrada siempre es el contenedor supervisado, desde una instancia de Container a la que llama GetChangeFeedProcessorBuilder:

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

El primer parámetro es un nombre distintivo que describe el objetivo de este procesador y el segundo nombre es la implementación de delegado que controlará los cambios. A continuación se expone un ejemplo de un delegado:

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Después, defina el nombre de instancia de proceso o el identificador único con WithInstanceName; debe ser único y diferente en cada instancia de proceso que va a implementar y, por último, cuál es el contenedor para mantener el estado de concesión con WithLeaseContainer.

La llamada a Build le proporcionará la instancia del procesador que puede iniciar mediante una llamada a StartAsync.

El ciclo de vida normal de una instancia de host es:

  1. Leer la fuente de cambios.
  2. Si no hay ningún cambio, mantenerse suspendida durante un período de tiempo predefinido (personalizable con WithPollInterval en Builder) e ir a n.º 1.
  3. Si hay cambios, enviarlos al delegado.
  4. Cuando el delegado termina de procesar los cambios correctamente, se actualiza el almacén de concesión con el último punto en el tiempo procesado y se vuelve al primer paso.