Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En este artículo se describe cómo puede supervisar el progreso de las instancias del procesador de la fuente de cambios a medida que leen la fuente de cambios.
¿Por qué es importante supervisar el progreso?
El procesador de la fuente de cambios actúa como un puntero que avanza por la fuente de cambios y entrega los cambios a una implementación de delegado.
La implementación del procesador de flujo de cambios puede procesar los cambios a un ritmo específico en función de sus recursos disponibles, como la CPU, la memoria, la red, etc.
Si esta velocidad es más lenta que la velocidad a la que se producen los cambios en el contenedor de Azure Cosmos DB, el procesador comienza a retardar.
La identificación de este escenario ayuda a comprender si necesitamos escalar la implementación del procesador de fuente de cambios.
Implementar el estimador de fuente de cambios
Como modelo de empuje para notificaciones automáticas
Al igual que el procesador de la fuente de cambios, el estimador de la fuente de cambios funciona como un modelo de inserción. El estimador mide la diferencia entre el último elemento procesado (definido por el estado del contenedor de arrendamientos) y el último cambio en el contenedor, y envía este valor a un delegado. El intervalo en el que se toma la medida también se puede personalizar con un valor predeterminado de 5 segundos.
Por ejemplo, si el procesador de fuente de cambios usa el modo de versión más reciente y se define de esta manera:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
La manera correcta de inicializar un estimador para medir ese procesador estaría usando GetChangeFeedEstimatorBuilder de la siguiente manera:
ChangeFeedProcessor changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
.WithLeaseContainer(leaseContainer)
.Build();
Donde tanto el procesador como el estimador comparten el mismo leaseContainer y el mismo nombre.
Los otros dos parámetros son el delegado, que recibe un número que representa cuántos cambios están pendientes de ser leídos por el procesador y el intervalo de tiempo en el que desea que se tome esta medida.
Un ejemplo de delegado que recibe la estimación es:
static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
if (estimation > 0)
{
Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
}
await Task.Delay(0);
}
Puede enviar esta estimación a la solución de supervisión y usarla para comprender cómo se comporta el progreso con el tiempo.
Como estimación detallada a petición
A diferencia del modelo push, hay una alternativa que le permite obtener la estimación bajo demanda. Este modelo también proporciona información más detallada:
- Retardo estimado por concesión.
- Instancia que posee y procesa cada concesión, de modo que puede identificar si hay algún problema en una instancia.
Si el procesador de flujo de cambios se define de la siguiente manera:
Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
.GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
.WithInstanceName("consoleHost")
.WithLeaseContainer(leaseContainer)
.Build();
Puede crear el estimador con la misma configuración de concesión:
ChangeFeedEstimator changeFeedEstimator = monitoredContainer
.GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);
Y siempre que quiera, con la frecuencia que necesite, puede obtener la estimación detallada:
Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
foreach (ChangeFeedProcessorState leaseState in states)
{
string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
}
}
Cada ChangeFeedProcessorState contiene la información de concesión y retraso, y también quién es la instancia actual que lo posee.
Implementación del estimador
No es necesario implementar el estimador de fuente de cambios como parte del procesador de fuente de cambios ni formar parte del mismo proyecto. Se recomienda implementar el estimador en una instancia independiente de los procesadores. Una sola instancia del estimador puede realizar un seguimiento del progreso de todas las concesiones e instancias de la implementación del procesador de fuente de cambios.
Cada estimación consume unidades de solicitud de los contenedores supervisados y de concesión. Una frecuencia de 1 minuto entre sí es un buen punto de partida; cuanto menor sea la frecuencia, mayor será la cantidad de unidades de solicitud consumidas.
Modos de fuente de cambios admitidos
El estimador de fuente de cambios se puede usar para modo de versión más reciente y todas las versiones y elimina el modo. En ambos modos, no se garantiza que la estimación proporcionada sea un recuento exacto de cambios pendientes en el proceso.
Recursos adicionales
- SDK de Azure Cosmos DB
- Ejemplos de uso en GitHub (versión más reciente de .NET)
- Ejemplos de uso en GitHub (.NET todas las versiones y eliminaciones)
- Ejemplos de uso en GitHub (Java)
- Ejemplos adicionales en GitHub
Pasos siguientes
Ahora puede continuar para obtener más información sobre el procesador de fuente de cambios en el siguiente artículo: