Partager via


Utilisation de l’estimateur de flux de modification

S’APPLIQUE À : NoSQL

Cet article décrit comment superviser la progression de vos instances de processeur de flux de modification au fur et à mesure qu’elles lisent le flux de modification.

Pourquoi la supervision de la progression est-elle importante ?

Le processeur de flux de modification agit comme un pointeur qui avance dans votre flux de modification et fournit les modifications à une implémentation de délégué.

Votre déploiement de processeur de flux de modification peut traiter les modifications à une vitesse particulière en fonction de ses ressources disponibles, telles que le CPU, la mémoire, le réseau, et ainsi de suite.

Si cette vitesse est inférieure à celle à laquelle vos modifications se produisent dans votre conteneur Azure Cosmos DB, votre processeur commencera à accumuler du retard.

L’identification de ce scénario aide à comprendre si nous devons mettre à l’échelle notre déploiement du processeur de flux de modification.

Utilisation de l’estimateur de flux de modification

En tant que modèle push pour les notifications automatiques

Comme le processeur de flux de modification, l’estimateur de flux de modification peut fonctionner en tant que modèle Push. L’estimateur mesure la différence entre le dernier élément traité (défini par l’état du conteneur de baux) et la dernière modification dans le conteneur, et il transmet cette valeur à un délégué. Vous pouvez également personnaliser l’intervalle de prise de cette mesure avec une valeur par défaut de cinq secondes.

Par exemple, si votre processeur de flux de modification est défini comme suit :

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

La méthode correcte pour initialiser un estimateur afin de mesurer ce processeur consisterait à utiliser GetChangeFeedEstimatorBuilder comme suit :

ChangeFeedProcessor changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimatorBuilder("changeFeedEstimator", Program.HandleEstimationAsync, TimeSpan.FromMilliseconds(1000))
    .WithLeaseContainer(leaseContainer)
    .Build();

Où le processeur et l’estimateur partagent le même leaseContainer et le même nom.

Les deux autres paramètres sont le délégué, qui recevra un nombre représentant le nombre de modifications en attente de lecture par le processeur, et l’intervalle de temps auquel vous souhaitez que cette mesure soit prise.

Voici un exemple de délégué qui reçoit l’estimation :

static async Task HandleEstimationAsync(long estimation, CancellationToken cancellationToken)
{
    if (estimation > 0)
    {
        Console.WriteLine($"\tEstimator detected {estimation} items pending to be read by the Processor.");
    }

    await Task.Delay(0);
}

Vous pouvez envoyer cette estimation à votre solution de supervision et l’utiliser pour comprendre le fonctionnement de votre progression au fil du temps.

Comme une estimation détaillée à la demande

Contrairement au modèle push, il existe une alternative qui vous permet d’obtenir l’estimation à la demande. Ce modèle fournit également des informations plus détaillées :

  • Le décalage estimé par bail.
  • L’instance possédant et traitant chaque bail, afin que vous puissiez déterminer s’il existe un problème sur une instance.

Si votre processeur de flux de modification est défini comme suit :

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedEstimator", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Vous pouvez créer l’estimateur avec la même configuration de bail :

ChangeFeedEstimator changeFeedEstimator = monitoredContainer
    .GetChangeFeedEstimator("changeFeedEstimator", leaseContainer);

À chaque fois que vous le souhaitez, avec la fréquence requise, vous pouvez obtenir l’estimation détaillée :

Console.WriteLine("Checking estimation...");
using FeedIterator<ChangeFeedProcessorState> estimatorIterator = changeFeedEstimator.GetCurrentStateIterator();
while (estimatorIterator.HasMoreResults)
{
    FeedResponse<ChangeFeedProcessorState> states = await estimatorIterator.ReadNextAsync();
    foreach (ChangeFeedProcessorState leaseState in states)
    {
        string host = leaseState.InstanceName == null ? $"not owned by any host currently" : $"owned by host {leaseState.InstanceName}";
        Console.WriteLine($"Lease [{leaseState.LeaseToken}] {host} reports {leaseState.EstimatedLag} as estimated lag.");
    }
}

Chaque ChangeFeedProcessorState contient les informations relatives au bail et au décalage, ainsi que l’identité de l’instance actuelle qui en est la propriétaire.

Déploiement de l’estimateur

L’estimateur de flux de modification n’a pas besoin d’être déployé dans le cadre de votre processeur de flux de modification, ni de faire partie du même projet. Nous vous recommandons le déploiement de l’estimateur sur une instance indépendante et complètement différente de vos processeurs. Une unique instance de l’estimateur peut suivre la progression de l’ensemble des baux et des instances de votre déploiement du processeur de flux de modification.

Chaque estimation consomme des unités de requête de vos conteneurs surveillés et de bail. Une fréquence de 1 minute entre les deux est un bon point de départ ; une fréquence plus faible augmente le nombre d’unités de requête consommées.

Ressources supplémentaires

Étapes suivantes

Pour plus d’informations sur le processeur de flux de modification, consultez les articles suivants :