Exploración de la fuente de cambios en Azure Cosmos DB
La fuente de cambios de Azure Cosmos DB es un registro persistente de cambios en un contenedor en el orden en que se producen. La compatibilidad con la fuente de cambios en Azure Cosmos DB se proporciona al observar si hay algún cambio en un contenedor de Azure Cosmos DB. A continuación, muestra la lista ordenada de los documentos que han cambiado en el orden en el que se modificaron. Los cambios se conservan y se pueden procesar de manera asincrónica e incremental, y la salida se puede distribuir entre uno o varios consumidores para procesarse en paralelo.
Fuente de cambios y operaciones diferentes
Actualmente, verá todas las inserciones y actualizaciones en la fuente de cambios. No se puede filtrar la fuente de cambios para un tipo de operación específico. Actualmente, la fuente de cambios no registra las operaciones de eliminación. Como solución alternativa, puede agregar un marcador flexible en los elementos que se van a eliminar. Por ejemplo, puede agregar un atributo en el elemento denominado "deleted", establecer su valor en "true" y, luego, definir un valor de período de vida (TTL) en el elemento. Al establecer el TTL se garantiza que el elemento se elimine automáticamente.
Lectura de la fuente de cambios de Azure Cosmos DB
Puede trabajar con la fuente de cambios de Azure Cosmos DB mediante un modelo de inserción o un modelo de extracción. Con un modelo de inserción, el procesador de fuente de cambios envía el trabajo a un cliente que tiene la lógica de negocios para procesarlo. Sin embargo, el procesador de fuente de cambios se encarga de la complejidad de comprobar el trabajo y almacenar el estado del último trabajo procesado.
Con un modelo de extracción, el cliente tiene que extraer el trabajo del servidor. En este caso, el cliente tiene una lógica de negocios para el trabajo de procesamiento y también almacena el estado del último trabajo procesado. El cliente controla el equilibrio de carga entre varios clientes que procesan el trabajo en paralelo y controla los errores.
Nota:
Se recomienda usar el modelo de inserción porque no tendrá que preocuparse por sondear la fuente de cambios para futuros cambios, almacenar el estado del último cambio procesado y otras ventajas.
La mayoría de los escenarios que usan la fuente de cambios de Azure Cosmos DB usarán una de las opciones del modelo de inserción. Sin embargo, hay algunas situaciones en las que es preferible el control de bajo nivel adicional del modelo de extracción. El control de bajo nivel adicional incluye:
- Leer cambios de una clave de partición determinada.
- Controlar el ritmo con el que el cliente recibe los cambios para procesarlos.
- Realizar una lectura única de los datos existentes en la fuente de cambios (por ejemplo, para realizar una migración de datos).
Lectura de la fuente de cambios con un modelo de inserción
Hay dos formas de leer desde la fuente de cambios con un modelo de inserción: los desencadenadores de Azure Functions para Azure Cosmos DB y la biblioteca de procesadores de fuente de cambios. Azure Functions usa el procesador de fuente de cambios en segundo plano, por lo que ambas son formas similares de leer la fuente de cambios. Simplemente, piense en Azure Functions como una plataforma de hospedaje para el procesador de fuente de cambios, no como una forma totalmente diferente de leer la fuente de cambios. Azure Functions usa el procesador de fuente de cambios en segundo plano. Paraleliza automáticamente el procesamiento de cambios en las particiones del contenedor.
Funciones de Azure
Puede crear pequeñas funciones reactivas de Azure Functions que se desencadenarán automáticamente en cada nuevo evento en la fuente de cambios de su contenedor de Azure Cosmos DB. Con el desencadenador de Azure Functions para Azure Cosmos DB, puede usar el escalado del procesador de fuente de cambios y la funcionalidad de detección de eventos confiable sin necesidad de mantener ninguna infraestructura de trabajo.
Procesador de fuente de cambios
El procesador de fuente de cambios es parte de los SDK .NET V3 and Java V4 de Azure Cosmos DB. Simplifica el proceso de lectura de la fuente de cambios y distribuye el procesamiento de eventos entre varios consumidores de manera eficaz.
Hay cuatro componentes principales en la implementación del procesador de fuente de cambios:
El contenedor supervisado: el contenedor supervisado tiene los datos a partir de los cuales se genera la fuente de cambios. Todas las inserciones y actualizaciones realizadas en el contenedor supervisado se reflejan en la fuente de cambios del contenedor.
El contenedor de concesión: el contenedor de concesión actúa como un almacenamiento de estado y coordina el procesamiento de la fuente de cambios entre varios trabajadores. El contenedor de concesión se puede almacenar en la misma cuenta que el contenedor supervisado o en una cuenta independiente.
Instancia de proceso: una instancia de proceso hospeda el procesador de fuente de cambios para escuchar los cambios. En función de la plataforma, podría representarse mediante una máquina virtual, un pod de Kubernetes, una instancia de Azure App Service, una máquina física real. Tiene un identificador único al que se hace referencia como nombre de instancia a lo largo de este artículo.
El delegado: el delegado es el código que define lo que usted, el desarrollador, desea hacer con cada lote de cambios que el procesador de la fuente de cambios lea.
Al implementar el procesador de fuente de cambios, el punto de entrada siempre es el contenedor supervisado, desde una instancia de Container
a la que llama GetChangeFeedProcessorBuilder
:
/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
CosmosClient cosmosClient,
IConfiguration configuration)
{
string databaseName = configuration["SourceDatabaseName"];
string sourceContainerName = configuration["SourceContainerName"];
string leaseContainerName = configuration["LeasesContainerName"];
Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
.GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Console.WriteLine("Starting Change Feed Processor...");
await changeFeedProcessor.StartAsync();
Console.WriteLine("Change Feed Processor started.");
return changeFeedProcessor;
}
El primer parámetro es un nombre distintivo que describe el objetivo de este procesador y el segundo nombre es la implementación delegada que controla los cambios. A continuación se expone un ejemplo de un delegado:
/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
ChangeFeedProcessorContext context,
IReadOnlyCollection<ToDoItem> changes,
CancellationToken cancellationToken)
{
Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
// SessionToken if needed to enforce Session consistency on another client instance
Console.WriteLine($"SessionToken ${context.Headers.Session}");
// We may want to track any operation's Diagnostics that took longer than some threshold
if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
{
Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
}
foreach (ToDoItem item in changes)
{
Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
// Simulate some asynchronous operation
await Task.Delay(10);
}
Console.WriteLine("Finished handling changes.");
}
Después, defina el nombre de instancia de proceso o el identificador único con WithInstanceName
; debe ser único y diferente en cada instancia de proceso que va a implementar y, por último, cuál es el contenedor para mantener el estado de concesión con WithLeaseContainer
.
La llamada a Build
le proporcionará la instancia del procesador que puede iniciar mediante una llamada a StartAsync
.
El ciclo de vida normal de una instancia de host es:
- Leer la fuente de cambios.
- Si no hay ningún cambio, mantenerse suspendida durante un período de tiempo predefinido (personalizable con
WithPollInterval
enBuilder
) e ir a n.º 1. - Si hay cambios, enviarlos al delegado.
- Cuando el delegado termina de procesar los cambios correctamente, se actualiza el almacén de concesión con el último punto en el tiempo procesado y se vuelve al primer paso.