Partage via


Processeur de flux de modification dans Azure Cosmos DB

S’APPLIQUE À : NoSQL

Le processeur de flux de modification fait partie des kits de développement logiciel Azure Cosmos DB .NET V3 et Java V4. Il simplifie le processus de lecture du flux de modification et répartit efficacement le traitement des événements sur plusieurs consommateurs.

Le principal avantage du processeur de flux de modification est sa conception de tolérance de panne qui garantit une livraison « au moins une fois » de tous les événements dans le flux de modification.

Kits SDK pris en charge

.Net V3 Java Node.JS Python

Composants du processeur de flux de modification

Le processeur de flux de modification est constitué de quatre composants principaux :

  • Conteneur supervisé : le conteneur supervisé est constitué des données à partir desquelles le flux de modification est généré. Toutes les insertions et les mises à jour apportées au conteneur supervisé sont répercutées dans le flux de modification du conteneur.

  • Le conteneur de baux : le conteneur de baux fait office d’emplacement de stockage d’état et coordonne le traitement du flux de modification entre plusieurs Workers. Le conteneur de baux peut être stocké dans le même compte que le conteneur surveillé ou dans un compte distinct.

  • L’instance de calcul : une instance de calcul héberge le processeur de flux de modification pour repérer les modifications. Selon la plateforme, il peut être représenté par une machine virtuelle, un pod Kubernetes, une instance Azure App Service ou une machine physique réelle. L’instance de calcul a un identificateur unique appelé nom d’instance tout au long de cet article.

  • Délégué : Le délégué est le code qui définit ce que vous, le développeur, souhaitez faire avec chaque lot de modifications que le processeur de flux de modification lit.

Pour mieux comprendre comment ces quatre éléments du processeur de flux de modification interagissent, examinons un exemple à l’aide du diagramme suivant. Le conteneur surveillé stocke les éléments et utilise « City » comme clé de partition. Les valeurs de clé de partition sont distribuées dans des plages (chaque plage représentant une partition physique) qui contiennent des éléments.

Le diagramme montre deux instances de calcul, et le processeur de flux de modification affecte différentes plages à chaque instance pour optimiser la distribution de calcul. Chaque instance contient un nom unique différent.

Chaque plage est lue en parallèle. La progression d’une plage est gérée séparément des autres plages dans le conteneur de baux au travers un document de bail. La combinaison des baux représente l’état actuel du processeur de flux de modification.

Exemple de processeur de flux de modification

Implémenter le processeur de flux de modification

Le processeur de flux de modifications dans .NET est disponible pour le mode dernière version et pour toutes les versions et le mode suppression. Toutes les versions et le mode suppression sont en préversion et sont pris en charge pour le processeur de flux de modification à partir de la version 3.40.0-preview.0. Le point d’entrée des deux modes est toujours le conteneur surveillé.

Pour lire à l’aide du mode de version le plus récent, dans une instance de Container, vous appelez GetChangeFeedProcessorBuilder :

/// <summary>
/// Start the Change Feed Processor to listen for changes and process them with the HandleChangesAsync implementation.
/// </summary>
private static async Task<ChangeFeedProcessor> StartChangeFeedProcessorAsync(
    CosmosClient cosmosClient,
    IConfiguration configuration)
{
    string databaseName = configuration["SourceDatabaseName"];
    string sourceContainerName = configuration["SourceContainerName"];
    string leaseContainerName = configuration["LeasesContainerName"];

    Container leaseContainer = cosmosClient.GetContainer(databaseName, leaseContainerName);
    ChangeFeedProcessor changeFeedProcessor = cosmosClient.GetContainer(databaseName, sourceContainerName)
        .GetChangeFeedProcessorBuilder<ToDoItem>(processorName: "changeFeedSample", onChangesDelegate: HandleChangesAsync)
            .WithInstanceName("consoleHost")
            .WithLeaseContainer(leaseContainer)
            .Build();

    Console.WriteLine("Starting Change Feed Processor...");
    await changeFeedProcessor.StartAsync();
    Console.WriteLine("Change Feed Processor started.");
    return changeFeedProcessor;
}

Pour lire à l’aide de toutes les versions et supprimer le mode, appelez GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes à partir de l’instance de Container :

Container leaseContainer = client.GetContainer(Program.databaseName, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(Program.databaseName, containerName);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilderWithAllVersionsAndDeletes<ToDoItem>(processorName: "changeFeedBasic", onChangesDelegate: Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Pour les deux modes, le premier paramètre est un nom distinct qui décrit l’objectif de ce processeur. Le deuxième nom est l’implémentation du délégué qui gère les modifications.

Voici un exemple de délégué(e) pour le mode de version le plus récent :

/// <summary>
/// The delegate receives batches of changes as they are generated in the change feed and can process them.
/// </summary>
static async Task HandleChangesAsync(
    ChangeFeedProcessorContext context,
    IReadOnlyCollection<ToDoItem> changes,
    CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ToDoItem item in changes)
    {
        Console.WriteLine($"Detected operation for item with id {item.id}, created at {item.creationTime}.");
        // Simulate some asynchronous operation
        await Task.Delay(10);
    }

    Console.WriteLine("Finished handling changes.");
}

Voici un exemple de délégué pour toutes les versions et tous les modes de suppression :

static async Task HandleChangesAsync(ChangeFeedProcessorContext context, IReadOnlyCollection<ChangeFeedItem<ToDoItem>> changes, CancellationToken cancellationToken)
{
    Console.WriteLine($"Started handling changes for lease {context.LeaseToken}...");
    Console.WriteLine($"Change Feed request consumed {context.Headers.RequestCharge} RU.");
    // SessionToken if needed to enforce Session consistency on another client instance
    Console.WriteLine($"SessionToken ${context.Headers.Session}");

    // We may want to track any operation's Diagnostics that took longer than some threshold
    if (context.Diagnostics.GetClientElapsedTime() > TimeSpan.FromSeconds(1))
    {
        Console.WriteLine($"Change Feed request took longer than expected. Diagnostics:" + context.Diagnostics.ToString());
    }

    foreach (ChangeFeedItem<ToDoItem> item in changes)
    {
        if (item.Metadata.OperationType == ChangeFeedOperationType.Delete)
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Previous.id}.");
        }
        else
        {
            Console.WriteLine($"\tDetected {item.Metadata.OperationType} operation for item with id {item.Current.id}.");
        }
        // Simulate work
        await Task.Delay(1);
    }
}

Ensuite, vous définissez le nom ou l’identificateur unique de l’instance de calcul à l’aide de WithInstanceName. Le nom de l’instance de calcul doit être unique et différent pour chaque instance de calcul déployée. Vous définissez le conteneur pour gérer l’état du bail à l’aide de WithLeaseContainer.

Appeler Build vous donne l’instance de processeur que vous pouvez démarrer en appelant StartAsync.

Remarque

Les extraits de code précédents sont extraits d’exemples dans GitHub. Vous pouvez obtenir l’exemple pour le mode dernière version ou toutes les versions et le mode de suppression.

Cycle de vie du traitement

Le cycle de vie normal d’une instance d’hôte est le suivant :

  1. Lire le flux de modification.
  2. En l’absence de modifications, activer la mise en veille pendant une durée prédéfinie (personnalisable avec WithPollInterval dans le générateur) et retourner à l’étape 1.
  3. En cas de modifications, envoyez-les au délégué.
  4. Lorsque le délégué finit de traiter correctement les modifications, mettez à jour le magasin de baux avec le dernier point traité dans le temps et accédez à #1.

Gestion des erreurs

Le processeur de flux de modification résiste aux erreurs de code utilisateur. Si votre implémentation de délégué a une exception non gérée (étape 4), le thread qui traite ce lot de modifications particulier s’arrête et un nouveau thread est créé. Le nouveau thread vérifie le dernier point dans le temps que le magasin de baux a enregistré pour cette plage de valeurs de clé de partition. Le nouveau thread redémarre à partir de là, en envoyant le même lot de modifications au délégué. Ce comportement se poursuit jusqu’à ce que votre délégué traite correctement les modifications. C’est la raison pour laquelle le processeur de flux de modification a une garantie « au moins une fois ».

Remarque

Dans un seul scénario, un lot de modifications n’est pas retenté. Si l’échec se produit lors de la première exécution déléguée, le magasin de bail ne dispose d’aucun état enregistré précédent à utiliser pour la nouvelle tentative. Dans ce cas-là, la nouvelle tentative utilise la configuration de démarrage initiale, qui peut inclure ou non le dernier lot.

Pour empêcher votre processeur de flux de modification de « se bloquer » en relançant continuellement le même lot de modifications, vous devez ajouter une logique dans votre code de délégué pour écrire des documents, en cas d’exception, dans une file d’attente d’un message contenant des erreurs. Cette conception garantit que vous pouvez effectuer le suivi des modifications non traitées tout en continuant à traiter les futures modifications. La file d’attente du message contenant des erreurs peut être un autre conteneur Azure Cosmos DB. Le magasin de données exact n’a pas d’importance. Vous souhaitez simplement que les modifications non traitées soient conservées.

Vous pouvez également utiliser l’estimateur du flux de modification pour surveiller la progression des instances de votre processeur de flux de modification à mesure qu’elles lisent le flux de modification, ou vous pouvez utiliser les notifications du cycle de vie pour détecter les défaillances sous-jacentes.

Notifications de cycle de vie

Vous pouvez connecter le processeur de flux de modification à n’importe quel événement pertinent de son cycle de vie. Vous pouvez choisir d’être notifié à l’un d’eux ou à tous les événements. Il est recommandé d’enregistrer au moins la notification d’erreur :

  • Inscrivez un responsable pour WithLeaseAcquireNotification afin d’être averti lorsque l’hôte actuel acquiert un bail pour commencer à le traiter.
  • Inscrivez un responsable pour WithLeaseReleaseNotification afin d’être averti lorsque l’hôte actuel libère un bail et arrête de le traiter.
  • Inscrivez un gestionnaire pour WithErrorNotification afin d’être averti quand l’hôte actuel rencontre une exception pendant le traitement. Vous devez être en mesure de déterminer si la source est le délégué de l’utilisateur (une exception non prise en charge) ou s’il s’agit d’une erreur que le processeur rencontre quand il tente d’accéder au conteneur surveillé (par exemple, problèmes de mise en réseau).

Les notifications de cycle de vie sont disponibles dans les deux modes de flux de modification. Voici un exemple de notifications de cycle de vie en mode version la plus récente :

Container.ChangeFeedMonitorLeaseAcquireDelegate onLeaseAcquiredAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is acquired and will start processing");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorLeaseReleaseDelegate onLeaseReleaseAsync = (string leaseToken) =>
{
    Console.WriteLine($"Lease {leaseToken} is released and processing is stopped");
    return Task.CompletedTask;
};

Container.ChangeFeedMonitorErrorDelegate onErrorAsync = (string LeaseToken, Exception exception) =>
{
    if (exception is ChangeFeedProcessorUserException userException)
    {
        Console.WriteLine($"Lease {LeaseToken} processing failed with unhandled exception from user delegate {userException.InnerException}");
    }
    else
    {
        Console.WriteLine($"Lease {LeaseToken} failed with {exception}");
    }

    return Task.CompletedTask;
};

ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedNotifications", handleChanges)
        .WithLeaseAcquireNotification(onLeaseAcquiredAsync)
        .WithLeaseReleaseNotification(onLeaseReleaseAsync)
        .WithErrorNotification(onErrorAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .Build();

Unité de déploiement

Une unité de déploiement du processeur de flux de modification unique se compose d’au moins une instance de calcul ayant la même valeur pour processorName et la même configuration de conteneur de baux, mais des noms d’instance différents. Vous pouvez avoir de nombreuses unités de déploiement dans lesquelles chacune d’elles a un flux d’entreprise différent pour les modifications et chaque unité de déploiement composée d’une ou de plusieurs instances.

Par exemple, vous pouvez avoir une unité de déploiement qui déclenche une API externe à chaque fois qu’une modification est apportée à votre conteneur. Une autre unité de déploiement peut déplacer des données en temps réel chaque fois qu’une modification est apportée. Quand une modification est apportée dans votre conteneur surveillé, toutes vos unités de déploiement sont notifiées.

Mise à l’échelle dynamique

Comme mentionné précédemment, vous pouvez avoir au moins une instance de calcul au sein d’une unité de déploiement. Pour tirer parti de la distribution de calcul au sein de l’unité de déploiement, les seules exigences principales sont les suivantes :

  • Toutes les instances doivent avoir la même configuration de conteneur de baux.
  • Toutes les instances doivent avoir la même valeur pour processorName.
  • Chaque instance doit avoir un nom d’instance différent (WithInstanceName).

Si ces trois conditions s’appliquent, le processeur de flux de modification répartit tous les baux dans le conteneur de baux sur toutes les instances en cours de cette unité de déploiement et parallélise le calcul à l’aide d’un algorithme de distribution égale. Un bail appartient à une seule instance à la fois, ce qui signifie que le nombre d’instances ne doit pas dépasser le nombre de baux.

Le nombre d’instances peut augmenter et diminuer. Le processeur de flux de modification ajuste de manière dynamique la charge en la redistribuant en conséquence.

En outre, le processeur de flux de modification peut ajuster de manière dynamique l’échelle d’un conteneur si le débit ou le stockage de celui-ci augmente. Quand votre conteneur croît, le processeur de flux de modification gère en toute transparence le scénario en augmentant les baux de manière dynamique et en distribuant les nouveaux baux aux instances existantes.

Heure de début

Par défaut, quand un processeur de flux de modification démarre pour la première fois, il initialise le conteneur de baux et démarre son cycle de vie de traitement. Les modifications survenues dans le conteneur analysé avant la première initialisation du processeur de flux de modification ne sont pas détectées.

Lecture à partir d’une date et d’une heure précédentes

Il est possible d’initialiser le processeur de flux de modification pour lire les modifications à partir d’une date et d’une heure spécifiques en transmettant une instance de DateTime à l’extension de générateur WithStartTime :

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedTime", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(particularPointInTime)
        .Build();

Le processeur de flux de modification est initialisé pour cette date et cette heure spécifiques et commence à lire les modifications survenues en amont.

Lecture à partir du début

Dans d’autres scénarios tels que la migration de données ou l’analyse de l’intégralité de l’historique d’un conteneur, vous devez lire le flux de modification à partir du début de la durée de vie de ce conteneur. Vous pouvez utiliser WithStartTime sur l’extension du générateur, mais transmettre DateTime.MinValue.ToUniversalTime(), ce qui génère la représentation UTC de la valeur DateTime minimale, comme dans cet exemple :

Container leaseContainer = client.GetContainer(databaseId, Program.leasesContainer);
Container monitoredContainer = client.GetContainer(databaseId, Program.monitoredContainer);
ChangeFeedProcessor changeFeedProcessor = monitoredContainer
    .GetChangeFeedProcessorBuilder<ToDoItem>("changeFeedBeginning", Program.HandleChangesAsync)
        .WithInstanceName("consoleHost")
        .WithLeaseContainer(leaseContainer)
        .WithStartTime(DateTime.MinValue.ToUniversalTime())
        .Build();

Le processeur de flux de modification est initialisé et commence à lire les modifications à partir du début de la durée de vie du conteneur.

Notes

Ces options de personnalisation ne fonctionnent que pour configurer le point de départ dans le temps du processeur de flux de modification. Après la première initialisation du conteneur de baux, la modification de ces options n’a plus aucun effet.

La personnalisation du point de départ n’est disponible que pour le dernier mode de flux de modification de version. Lorsque vous utilisez toutes les versions et le mode suppression, vous devez commencer à lire à partir du démarrage du processeur, ou reprendre à partir d’un état de bail antérieur qui se trouve dans la sauvegarde continue période de rétention de votre compte.

Flux de modification et débit provisionné

Les opérations de lecture du flux de modification sur le conteneur analysé consomment des unités de requête. Assurez-vous que votre conteneur surveillé ne subit pas de limitation. La limitation ajoute des retards dans la réception des événements de flux de modification sur vos processeurs.

Les opérations sur le conteneur de bail (mise à jour et maintenance de l’état) consomment des unités de requête. Plus le nombre d’instances qui utilisent le même conteneur de baux est élevé, plus la consommation potentielle d’unités de requête est importante. Assurez-vous que votre conteneur de baux ne subit pas de limitation. La limitation ajoute des retards dans la réception des événements de flux de modification. La limitation peut même mettre fin complètement au traitement.

Partager le conteneur de baux

Vous pouvez partager un conteneur de baux entre plusieurs unités de déploiement. Dans un conteneur de baux partagé, chaque unité de déploiement écoute un conteneur surveillé différent ou a une valeur différente pour processorName. Dans cette configuration, chaque unité de déploiement conserve un état indépendant sur le conteneur de baux. Passez en revue la consommation d’unités de requête sur un conteneur de baux pour vous assurer que le débit approvisionné est suffisant pour toutes les unités de déploiement.

Configuration avancée des baux

Trois configurations principales peuvent affecter le fonctionnement du processeur de flux de modification. Chaque configuration affecte la consommation d’unités de requête sur le conteneur de baux. Vous pouvez définir l’une de ces configurations quand vous créez le processeur de flux de modification, mais utilisez-les avec prudence :

  • Acquisition des baux : par défaut, toutes les 17 secondes. Un hôte vérifie régulièrement l’état du magasin de baux et envisage d’acquérir des baux dans le cadre du processus de mise à l’échelle dynamique. Ce processus est réalisé en exécutant une requête sur le conteneur de baux. La réduction de cette valeur accélère le rééquilibrage et l’acquisition des baux, mais augmente la consommation d’unités de requête sur le conteneur de baux.
  • Expiration des baux : par défaut, 60 secondes. Définit la durée maximale pendant laquelle un bail peut exister sans aucune activité de renouvellement avant d’être acquis par un autre hôte. En cas d’incident d’un hôte, les baux qu’il possédait sont récupérés par d’autres hôtes après cette période et l’intervalle de renouvellement configuré. La réduction de cette valeur accélère la récupération après l’incident de l’hôte, mais la valeur d’expiration ne doit jamais être inférieure à l’intervalle de renouvellement.
  • Renouvellement des baux : par défaut, toutes les 13 secondes. Un hôte qui possède un bail le renouvelle régulièrement, même s’il n’y a pas de nouvelles modifications à consommer. Ce processus est réalisé en exécutant un remplacement sur le bail. La réduction de cette valeur diminue le temps nécessaire pour détecter les baux perdus en cas d’incident de l’hôte, mais augmente la consommation d’unités de requête sur le conteneur de baux.

Où héberger le processeur de flux de modification

Le processeur de flux de modification peut être hébergé sur n’importe quelle plateforme prenant en charge des processus ou des tâches de longue durée. Voici quelques exemples :

Bien que le processeur de flux de modification puisse s’exécuter dans des environnements à courte durée de vie étant donné que le conteneur de baux conserve l’état, le cycle de démarrage de ces environnements ajoute des retards à la durée de réception des notifications (en raison de la surcharge liée au démarrage du processeur chaque fois que l’environnement est démarré).

Exigences d’accès en fonction du rôle

Lorsque vous utilisez Microsoft Entra ID comme mécanisme d’authentification, assurez-vous que l’identité dispose des autorisations appropriées :

  • Sur le conteneur supervisé :
    • Microsoft.DocumentDB/databaseAccounts/readMetadata
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/readChangeFeed
  • Sur le conteneur de bail :
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/read
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/create
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/replace
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/delete
    • Microsoft.DocumentDB/databaseAccounts/sqlDatabases/containers/items/executeQuery

Ressources supplémentaires

Étapes suivantes

Pour en savoir plus sur le processeur de flux de modification, consultez les articles suivants :