Uso del calculador de la fuente de cambios

SE APLICA A: NoSQL

En este artículo se describe cómo puede supervisar el progreso de las instancias del procesador de la fuente de cambios a medida que leen la fuente de cambios.

¿Por qué es importante el progreso de la supervisión?

El procesador de la fuente de cambios actúa como un puntero que avanza por la fuente de cambios y entrega los cambios a una implementación de delegado.

La implementación del procesador de la fuente de cambios puede procesar los cambios a una velocidad determinada en función de los recursos disponibles, como la CPU, la memoria, la red, etc.

Si esta velocidad es más lenta que la velocidad a la que se producen los cambios en el contenedor de Azure Cosmos DB, el procesador comenzará a retrasarse.

La identificación de este escenario ayuda a comprender si es necesario escalar la implementación del procesador de la fuente de cambios.

Implementación del calculador de la fuente de cambios

Como modelo de inserción para las notificaciones automáticas

Al igual que el procesador de la fuente de cambios, el estimador de la fuente de cambios funciona como un modelo de inserción. El calculador medirá la diferencia entre el último elemento procesado (definido por el estado del contenedor de concesiones) y el cambio más reciente en el contenedor, e insertará este valor en un delegado. El intervalo en el que se toma la medida también puede personalizarse con un valor predeterminado de 5 segundos.

Por ejemplo, si el procesador de la fuente de cambios se define de la siguiente manera:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

La manera correcta de inicializar un calculador para medir ese procesador sería usar GetChangeFeedEstimatorBuilder como se explica a continuación:

ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();

Donde tanto el procesador como el calculador comparten el mismo leaseContainer y el mismo nombre.

Los otros dos parámetros son el delegado, que recibirá un número que representa cuántos cambios están pendientes de ser leídos por el procesador y el intervalo de tiempo en el que desea que se realice esta medida.

Un ejemplo de un delegado que recibe la estimación es:

static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    await Task.Delay(0);
}

Puede enviarla a su solución de supervisión y usarla para saber cómo se comporta el progreso con el tiempo.

Como una estimación detallada a petición

A diferencia del modelo de inserción, hay una alternativa que le permite obtener la estimación a petición. Este modelo también proporciona información más detallada:

  • Retardo estimado por concesión.
  • Instancia que posee y procesa cada concesión, de modo que puede identificar si hay algún problema en una instancia.

Si el procesador de la fuente de cambios se define de la siguiente manera:

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Puede crear el estimador con la misma configuración de concesión:

ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);

Y siempre que quiera, con la frecuencia que necesite, puede obtener la estimación detallada:

Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}

Cada ChangeFeedProcessorState contendrá la información de la concesión y del retardo, y también quién es la instancia actual propietaria.

Implementación del estimador

No es necesario implementar el calculador de la fuente de cambios como parte del procesador de la fuente de cambios ni que formen parte del mismo proyecto. Se recomienda implementar el estimador en una instancia independiente y completamente diferente de los procesadores. Una sola instancia del estimador puede realizar un seguimiento del progreso de todas las concesiones e instancias de la implementación del procesador de fuente de cambios.

Cada estimación consumirá unidades de solicitud de los contenedores supervisados y de concesión. Una frecuencia de 1 minuto entre sí es un buen punto de partida; cuanto menor sea la frecuencia, mayor será la cantidad de unidades de solicitud consumidas.

Recursos adicionales

Pasos siguientes

Puede obtener más información sobre el procesador de la fuente de cambios en los siguientes artículos: