Partager via


Exécuteurs réinitialisables

Aperçu

Les exécuteurs dans les flux de travail sont souvent avec état , par exemple, ils peuvent accumuler des messages, suivre les nombres de tour ou mettre en cache les résultats intermédiaires. Lorsqu’un flux de travail est réutilisé sur plusieurs exécutions avec des instances d’exécuteur partagées, l’état de reste d’une exécution précédente peut fuiter dans des exécutions ultérieures, ce qui provoque une altération inattendue du comportement ou des données.

L’interface IResettableExecutor résout ce problème en fournissant un contrat pour que les exécuteurs effacent leur état interne entre les exécutions. Le runtime de flux de travail appelle automatiquement ResetAsync() sur les instances d’exécuteur partagé lorsqu’une exécution est terminée, ce qui garantit un environnement propre pour l’exécution suivante.

Le problème

Considérez un exécuteur qui collecte les messages pendant une exécution de flux de travail :

internal sealed partial class AggregationExecutor() : Executor("AggregationExecutor")
{
    private readonly List<string> _messages = [];

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this._messages.Add(message);
        // Process aggregated messages...
    }
}

Si cet exécuteur est partagé entre les exécutions de flux de travail, _messages conserve les données de l’exécution précédente. La deuxième exécution voit des messages périmés qui ne sont pas destinés à celle-ci.

Interface IResettableExecutor

IResettableExecutor définit une méthode unique que le runtime de workflow appelle entre les exécutions :

public interface IResettableExecutor
{
    ValueTask ResetAsync();
}

Lorsqu’un exécuteur implémente cette interface, le runtime peut le réinitialiser en toute sécurité après chaque exécution, ce qui permet au workflow d’être réutilisé sans état obsolète.

Implémentation d’IResettableExecutor

Pour rendre un exécuteur ayant un état réinitialisable, implémentez l’interface et effacez tout état mutable dans ResetAsync():

internal sealed partial class AggregationExecutor()
    : Executor("AggregationExecutor"), IResettableExecutor
{
    private readonly List<string> _messages = [];

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this._messages.Add(message);
        // Process aggregated messages...
    }

    public ValueTask ResetAsync()
    {
        this._messages.Clear();
        return default;
    }
}

Pour obtenir un exemple de travail complet d’un flux de travail qui utilise des exécuteurs réinitialisés, consultez l’exemple WorkflowAsAnAgent.

Quand implémenter

Tous les exécuteurs n’ont pas besoin d’implémenter IResettableExecutor. Utilisez ce guide de décision :

Scénario Implémenter? Reason
L’exécuteur a un état mutable (listes, compteurs, caches) et est partagé entre les exécutions Oui L’état d’un run se transmettra au suivant
L’exécuteur est sans état Non Rien à réinitialiser
L'exécuteur est créé à neuf pour chaque workflow (via une méthode de fabrique) Non Chaque exécution reçoit une nouvelle instance avec un état initial ou vierge
L'exécuteur est déclaré comme étant partageable entre différentes exécutions (declareCrossRunShareable: true) Non Les exécuteurs partageables entre exécutions prennent en charge l’utilisation simultanée sans réinitialisation

Avertissement

Si un exécuteur avec état partagé n'implémente pas IResettableExecutor, la réutilisation du workflow lève un InvalidOperationException:

"Cannot reuse Workflow with shared Executor instances that do not implement IResettableExecutor."

Comment l'environnement d'exécution l'utilise

Le runtime de workflow gère automatiquement le cycle de vie de réinitialisation. Vous n'avez pas besoin d'invoquer ResetAsync() vous-même. La séquence est la suivante :

  1. Propriété acquise : lorsqu’une exécution de flux de travail démarre, le runtime prend la propriété de l’instance de flux de travail et note les exécuteurs qui ont besoin de réinitialiser.
  2. Exécutions : les exécuteurs traitent les messages et peuvent accumuler l’état.
  3. Propriété publiée : une fois l’exécution terminée (ou supprimée), le runtime libère la propriété et appelle ResetAsync() toutes les instances d’exécuteur partagées qui implémentent IResettableExecutor.
  4. Prêt à être réutilisé : après une réinitialisation réussie, le flux de travail peut être utilisé pour une nouvelle exécution.

Si un exécuteur partagé ne parvient pas à se réinitialiser (car il n’implémente pas l’interface), le flux de travail est marqué comme non réutilisable et les exécutions suivantes provoqueront une exception.

Relation à l’isolation de l’état

IResettableExecutor complète le modèle de méthode d’assistance décrit dans State Management. Les deux approches servent des besoins différents :

  • Les méthodes d’assistance (création d’instances nouvelles par exécution) fournissent les garanties d’isolation les plus fortes et sont recommandées comme approche par défaut.
  • IResettableExecutor est utile lorsque vous devez partager des instances d’exécuteur entre les exécutions , par exemple, lorsque la construction de l’exécuteur est coûteuse ou lorsqu’un flux de travail est exposé en tant qu’agent et réutilisé sur plusieurs appels.

Choisissez l’approche qui convient le mieux à votre scénario. Pour la plupart des flux de travail, les méthodes d’assistance sont suffisantes. Utiliser IResettableExecutor lors du partage d’instances est un choix de conception délibéré.

Ce concept ne s’applique pas à Python. Pour une isolation complète de l’état, générez des instances de workflow et d’exécuteur fraîches pour chaque exécution indépendante. Consultez Gestion de l’état pour obtenir des modèles et des exemples.

Étapes suivantes