Condividi tramite


Flussi di lavoro di Microsoft Agent Framework - Stato

Questo documento offre una panoramica dello stato nel sistema flusso di lavoro di Microsoft Agent Framework.

Informazioni generali

Lo stato consente a più executor all'interno di un flusso di lavoro di accedere e modificare i dati comuni. Questa funzionalità è essenziale per gli scenari in cui parti diverse del flusso di lavoro devono condividere informazioni in cui il passaggio diretto dei messaggi non è fattibile o efficiente.

Scrittura nello stato

using Microsoft.Agents.AI.Workflows;

internal sealed partial class FileReadExecutor(): Executor("FileReadExecutor")
{
    /// <summary>
    /// Reads a file and stores its content in a shared state.
    /// </summary>
    /// <param name="message">The path to the embedded resource file.</param>
    /// <param name="context">The workflow context for accessing shared states.</param>
    /// <returns>The ID of the shared state where the file content is stored.</returns>
    [MessageHandler]
    private async ValueTask<string> HandleAsync(string message, IWorkflowContext context)
    {
        // Read file content from embedded resource
        string fileContent = File.ReadAllText(message);
        // Store file content in a shared state for access by other executors
        string fileID = Guid.NewGuid().ToString();
        await context.QueueStateUpdateAsync<string>(fileID, fileContent, scopeName: "FileContent");

        return fileID;
    }
}
from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class FileReadExecutor(Executor):

    @handler
    async def handle(self, file_path: str, ctx: WorkflowContext[str]):
        # Read file content from embedded resource
        with open(file_path, 'r') as file:
            file_content = file.read()
        # Store file content in state for access by other executors
        file_id = str(uuid.uuid4())
        ctx.set_state(file_id, file_content)

        await ctx.send_message(file_id)

Accesso allo stato

using Microsoft.Agents.AI.Workflows;

internal sealed partial class WordCountingExecutor() : Executor("WordCountingExecutor")
{
    /// <summary>
    /// Counts the number of words in the file content stored in a shared state.
    /// </summary>
    /// <param name="message">The ID of the shared state containing the file content.</param>
    /// <param name="context">The workflow context for accessing shared states.</param>
    /// <returns>The number of words in the file content.</returns>
    [MessageHandler]
    private async ValueTask<int> HandleAsync(string message, IWorkflowContext context)
    {
        // Retrieve the file content from the shared state
        var fileContent = await context.ReadStateAsync<string>(message, scopeName: "FileContent")
            ?? throw new InvalidOperationException("File content state not found");

        return fileContent.Split([' ', '\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
    }
}
from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class WordCountingExecutor(Executor):

    @handler
    async def handle(self, file_id: str, ctx: WorkflowContext[int]):
        # Retrieve the file content from state
        file_content = ctx.get_state(file_id)
        if file_content is None:
            raise ValueError("File content state not found")

        await ctx.send_message(len(file_content.split()))

Isolamento dello stato

Nelle applicazioni reali, la gestione corretta dello stato è fondamentale quando si gestiscono più attività o richieste. Senza un adeguato isolamento, lo stato condiviso tra diverse esecuzioni del flusso di lavoro può causare comportamenti imprevisti, corruzione dei dati e condizioni di gara. Questa sezione illustra come garantire l'isolamento dello stato all'interno dei flussi di lavoro di Microsoft Agent Framework, fornendo informazioni dettagliate sulle procedure consigliate e sulle insidie comuni.

Generatori di flussi di lavoro modificabili e flussi di lavoro non modificabili

I flussi di lavoro vengono creati dai generatori di flussi di lavoro. I generatori di flussi di lavoro sono in genere considerati modificabili, in cui è possibile aggiungere, modificare l'executor di avvio o altre configurazioni dopo la creazione del generatore o anche dopo la compilazione di un flusso di lavoro. D'altra parte, i flussi di lavoro non sono modificabili in quanto una volta compilato un flusso di lavoro, non può essere modificato (nessuna API pubblica per modificare un flusso di lavoro).

Questa distinzione è importante perché influisce sul modo in cui lo stato viene gestito tra diverse esecuzioni del flusso di lavoro. Non è consigliabile riutilizzare una singola istanza del flusso di lavoro per più attività o richieste, perché ciò può causare la condivisione dello stato non intenzionale. È invece consigliabile creare una nuova istanza del flusso di lavoro utilizzando il costruttore per ogni attività o richiesta, per garantire l'isolamento dello stato e una corretta sicurezza nell'uso dei thread.

Garantire l'isolamento dello stato con i metodi helper

Quando le istanze dell'executor vengono create una sola volta e condivise tra più compilazioni del flusso di lavoro, lo stato interno viene condiviso tra tutte le esecuzioni del flusso di lavoro. Ciò può causare problemi se un executor contiene uno stato modificabile che deve essere isolato per ogni flusso di lavoro. Per garantire un appropriato isolamento dello stato e la sicurezza dei thread, incapsulare la costruzione del workflow e l'istanza dell'executor all'interno di un metodo di supporto in modo che ogni chiamata produca istanze nuove e indipendenti.

Prossimamente...

Esempio non isolato (stato condiviso):

executor_a = CustomExecutorA()
executor_b = CustomExecutorB()

# executor_a and executor_b are shared across all workflows built from this builder
workflow_builder = WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b)

workflow_a = workflow_builder.build()
workflow_b = workflow_builder.build()
# workflow_a and workflow_b share the same executor instances and their mutable state

Esempio isolato (metodo helper):

def create_workflow() -> Workflow:
    """Create a fresh workflow with isolated state.

    Each call produces independent executor instances, ensuring no state
    leaks between workflow runs.
    """
    executor_a = CustomExecutorA()
    executor_b = CustomExecutorB()

    return WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b).build()

# Each workflow has its own executor instances with independent state
workflow_a = create_workflow()
workflow_b = create_workflow()

Suggerimento

Per garantire il corretto isolamento dello stato e la sicurezza dei thread, assicurarsi anche che le istanze dell'executor create all'interno del metodo di aiuto non condividano uno stato esterno che possa essere modificato.

Gestione stato agente

Il contesto dell'agente viene gestito tramite thread dell'agente. Per impostazione predefinita, ogni agente in un flusso di lavoro otterrà il proprio thread, a meno che l'agente non sia gestito da un executor personalizzato. Per altre informazioni, vedere Uso degli agenti.

I thread dell'agente vengono mantenuti tra le esecuzioni del flusso di lavoro. Ciò significa che se un agente viene richiamato nella prima esecuzione di un flusso di lavoro, il contenuto generato dall'agente sarà disponibile nelle esecuzioni successive della stessa istanza del flusso di lavoro. Sebbene ciò possa essere utile per mantenere la continuità all'interno di una singola attività, può anche causare la condivisione dello stato imprevisto se la stessa istanza del flusso di lavoro viene riutilizzata per diverse attività o richieste. Per mantenere uno stato dell'agente isolato per ogni attività, incapsulare l'agente e la creazione del workflow all'interno di un metodo di supporto in modo che ogni chiamata produca nuove istanze dell'agente con i propri thread.

Prossimamente...

Esempio non isolato (stato agente condiviso):

writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions=(
        "You are an excellent content writer. You create new content and edit contents based on the feedback."
    ),
    name="writer_agent",
)
reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions=(
        "You are an excellent content reviewer."
        "Provide actionable feedback to the writer about the provided content."
        "Provide the feedback in the most concise manner possible."
    ),
    name="reviewer_agent",
)

# writer_agent and reviewer_agent are shared across all workflows
workflow = WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()

Esempio isolato (metodo helper):

def create_workflow() -> Workflow:
    """Create a fresh workflow with isolated agent state.

    Each call produces new agent instances with their own threads,
    ensuring no conversation history leaks between workflow runs.
    """
    writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
        instructions=(
            "You are an excellent content writer. You create new content and edit contents based on the feedback."
        ),
        name="writer_agent",
    )
    reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
        instructions=(
            "You are an excellent content reviewer."
            "Provide actionable feedback to the writer about the provided content."
            "Provide the feedback in the most concise manner possible."
        ),
        name="reviewer_agent",
    )

    return WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()

# Each workflow has its own agent instances and threads
workflow_a = create_workflow()
workflow_b = create_workflow()

Riassunto

L'isolamento dello stato nei flussi di lavoro di Microsoft Agent Framework può essere gestito in modo efficace incapsulando l'istanza dell'executor e dell'agente insieme alla costruzione del flusso di lavoro all'interno di metodi helper. Chiamando il metodo helper ogni volta che è necessario un nuovo flusso di lavoro, assicuri che ogni istanza abbia uno stato nuovo, indipendente ed eviti una condivisione dello stato non intenzionale tra diverse esecuzioni del flusso di lavoro.

Passaggi successivi