Partager via


Flux de travail Microsoft Agent Framework - État

Ce document fournit une vue d’ensemble de l’état dans le système de flux de travail Microsoft Agent Framework.

Vue d’ensemble

L’état permet à plusieurs exécuteurs au sein d’un flux de travail d’accéder aux données communes et de les modifier. Cette fonctionnalité est essentielle pour les scénarios où différentes parties du flux de travail doivent partager des informations où le passage de messages directs n’est pas réalisable ou efficace.

Écriture vers l’état

using Microsoft.Agents.AI.Workflows;

internal sealed partial class FileReadExecutor(): Executor("FileReadExecutor")
{
    /// <summary>
    /// Reads a file and stores its content in a shared state.
    /// </summary>
    /// <param name="message">The path to the embedded resource file.</param>
    /// <param name="context">The workflow context for accessing shared states.</param>
    /// <returns>The ID of the shared state where the file content is stored.</returns>
    [MessageHandler]
    private async ValueTask<string> HandleAsync(string message, IWorkflowContext context)
    {
        // Read file content from embedded resource
        string fileContent = File.ReadAllText(message);
        // Store file content in a shared state for access by other executors
        string fileID = Guid.NewGuid().ToString();
        await context.QueueStateUpdateAsync<string>(fileID, fileContent, scopeName: "FileContent");

        return fileID;
    }
}
from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class FileReadExecutor(Executor):

    @handler
    async def handle(self, file_path: str, ctx: WorkflowContext[str]):
        # Read file content from embedded resource
        with open(file_path, 'r') as file:
            file_content = file.read()
        # Store file content in state for access by other executors
        file_id = str(uuid.uuid4())
        ctx.set_state(file_id, file_content)

        await ctx.send_message(file_id)

Accès à l’état

using Microsoft.Agents.AI.Workflows;

internal sealed partial class WordCountingExecutor() : Executor("WordCountingExecutor")
{
    /// <summary>
    /// Counts the number of words in the file content stored in a shared state.
    /// </summary>
    /// <param name="message">The ID of the shared state containing the file content.</param>
    /// <param name="context">The workflow context for accessing shared states.</param>
    /// <returns>The number of words in the file content.</returns>
    [MessageHandler]
    private async ValueTask<int> HandleAsync(string message, IWorkflowContext context)
    {
        // Retrieve the file content from the shared state
        var fileContent = await context.ReadStateAsync<string>(message, scopeName: "FileContent")
            ?? throw new InvalidOperationException("File content state not found");

        return fileContent.Split([' ', '\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
    }
}
from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class WordCountingExecutor(Executor):

    @handler
    async def handle(self, file_id: str, ctx: WorkflowContext[int]):
        # Retrieve the file content from state
        file_content = ctx.get_state(file_id)
        if file_content is None:
            raise ValueError("File content state not found")

        await ctx.send_message(len(file_content.split()))

Isolation d'état

Dans les applications réelles, la gestion correcte de l’état est essentielle lors de la gestion de plusieurs tâches ou requêtes. Sans isolation appropriée, l’état partagé entre différentes exécutions de flux de travail peut entraîner un comportement inattendu, une altération des données et des conditions de concurrence. Cette section explique comment garantir l’isolation de l’état dans les flux de travail du Microsoft Agent Framework, en fournissant des aperçus sur les meilleures pratiques et les pièges courants.

Générateurs de flux de travail mutables et flux de travail immuables

Les flux de travail sont créés par les générateurs de flux de travail. Les générateurs de flux de travail sont généralement considérés comme mutables, où l’on peut ajouter, modifier l’exécuteur de démarrage ou d’autres configurations une fois le générateur créé ou même après la création d’un flux de travail. En revanche, les flux de travail sont immuables lorsqu’un flux de travail est généré, il ne peut pas être modifié (aucune API publique pour modifier un flux de travail).

Cette distinction est importante, car elle affecte la façon dont l’état est géré dans différentes exécutions de flux de travail. Il n’est pas recommandé de réutiliser une instance de flux de travail unique pour plusieurs tâches ou requêtes, car cela peut entraîner un partage d’état inattendu. Au lieu de cela, il est recommandé de créer une instance de flux de travail à partir du générateur pour chaque tâche ou demande pour garantir une isolation d’état appropriée et la sécurité des threads.

Garantir l’isolation de l’état avec les méthodes d’assistance

Lorsque des instances d’exécuteur sont créées une fois et partagées sur plusieurs builds de flux de travail, leur état interne est partagé entre toutes les exécutions de flux de travail. Cela peut entraîner des problèmes si un exécuteur contient un état mutable qui doit être isolé par workflow. Pour garantir une isolation d’état appropriée et la sécurité des threads, encapsulez l’instanciation de l’exécuteur et la création de flux de travail à l’intérieur d’une méthode d’assistance afin que chaque appel produise des instances fraîches et indépendantes.

À venir...

Exemple non isolé (état partagé) :

executor_a = CustomExecutorA()
executor_b = CustomExecutorB()

# executor_a and executor_b are shared across all workflows built from this builder
workflow_builder = WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b)

workflow_a = workflow_builder.build()
workflow_b = workflow_builder.build()
# workflow_a and workflow_b share the same executor instances and their mutable state

Exemple isolé (méthode d’assistance) :

def create_workflow() -> Workflow:
    """Create a fresh workflow with isolated state.

    Each call produces independent executor instances, ensuring no state
    leaks between workflow runs.
    """
    executor_a = CustomExecutorA()
    executor_b = CustomExecutorB()

    return WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b).build()

# Each workflow has its own executor instances with independent state
workflow_a = create_workflow()
workflow_b = create_workflow()

Conseil / Astuce

Pour garantir une isolation d’état appropriée et la sécurité des threads, assurez-vous également que les instances d’exécuteur créées à l’intérieur de la méthode d’assistance ne partagent pas l’état mutable externe.

Gestion de l’état de l’agent

Le contexte de l’agent est géré via des threads d’agent. Par défaut, chaque agent d’un workflow obtient son propre thread, sauf si l’agent est géré par un exécuteur personnalisé. Pour plus d’informations, reportez-vous à Travailler avec les agents.

Les threads d’agent sont conservés entre les exécutions de flux de travail. Cela signifie que si un agent est appelé lors de la première exécution d’un flux de travail, le contenu généré par l’agent sera disponible dans les exécutions suivantes de la même instance de flux de travail. Bien que cela puisse être utile pour maintenir la continuité au sein d’une seule tâche, il peut également entraîner un partage d’état inattendu si la même instance de flux de travail est réutilisée pour différentes tâches ou demandes. Pour vous assurer que chaque tâche dispose d’un état d’agent isolé, encapsulez la création de l’agent et du flux de travail à l’intérieur d’une méthode secondaire afin que chaque appel produise de nouvelles instances d’agent avec des threads propres à chacun.

À venir...

Exemple non isolé (état de l’agent partagé) :

writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions=(
        "You are an excellent content writer. You create new content and edit contents based on the feedback."
    ),
    name="writer_agent",
)
reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions=(
        "You are an excellent content reviewer."
        "Provide actionable feedback to the writer about the provided content."
        "Provide the feedback in the most concise manner possible."
    ),
    name="reviewer_agent",
)

# writer_agent and reviewer_agent are shared across all workflows
workflow = WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()

Exemple isolé (méthode d’assistance) :

def create_workflow() -> Workflow:
    """Create a fresh workflow with isolated agent state.

    Each call produces new agent instances with their own threads,
    ensuring no conversation history leaks between workflow runs.
    """
    writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
        instructions=(
            "You are an excellent content writer. You create new content and edit contents based on the feedback."
        ),
        name="writer_agent",
    )
    reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
        instructions=(
            "You are an excellent content reviewer."
            "Provide actionable feedback to the writer about the provided content."
            "Provide the feedback in the most concise manner possible."
        ),
        name="reviewer_agent",
    )

    return WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()

# Each workflow has its own agent instances and threads
workflow_a = create_workflow()
workflow_b = create_workflow()

Résumé

L'isolation de l'état dans les flux de travail du Microsoft Agent Framework peut être gérée efficacement en encapsulant l'exécuteur et l’instanciation de l'agent ainsi que la construction du flux de travail à l'intérieur des méthodes utilitaires. En appelant la méthode d’assistance chaque fois que vous avez besoin d’un nouveau flux de travail, vous vérifiez que chaque instance a un état frais, indépendant et évitez le partage d’état involontaire entre différentes exécutions de flux de travail.

Étapes suivantes