Compartir a través de


Flujos de trabajo de Microsoft Agent Framework: estado

En este documento se proporciona información general sobre el estado en el sistema de flujo de trabajo de Microsoft Agent Framework.

Información general

El estado permite que varios ejecutores dentro de un flujo de trabajo accedan y modifiquen datos comunes. Esta característica es esencial para escenarios en los que diferentes partes del flujo de trabajo necesitan compartir información en la que el paso directo de mensajes no es factible o eficaz.

Escribir en el estado

using Microsoft.Agents.AI.Workflows;

internal sealed partial class FileReadExecutor(): Executor("FileReadExecutor")
{
    /// <summary>
    /// Reads a file and stores its content in a shared state.
    /// </summary>
    /// <param name="message">The path to the embedded resource file.</param>
    /// <param name="context">The workflow context for accessing shared states.</param>
    /// <returns>The ID of the shared state where the file content is stored.</returns>
    [MessageHandler]
    private async ValueTask<string> HandleAsync(string message, IWorkflowContext context)
    {
        // Read file content from embedded resource
        string fileContent = File.ReadAllText(message);
        // Store file content in a shared state for access by other executors
        string fileID = Guid.NewGuid().ToString();
        await context.QueueStateUpdateAsync<string>(fileID, fileContent, scopeName: "FileContent");

        return fileID;
    }
}
from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class FileReadExecutor(Executor):

    @handler
    async def handle(self, file_path: str, ctx: WorkflowContext[str]):
        # Read file content from embedded resource
        with open(file_path, 'r') as file:
            file_content = file.read()
        # Store file content in state for access by other executors
        file_id = str(uuid.uuid4())
        ctx.set_state(file_id, file_content)

        await ctx.send_message(file_id)

Estado de acceso

using Microsoft.Agents.AI.Workflows;

internal sealed partial class WordCountingExecutor() : Executor("WordCountingExecutor")
{
    /// <summary>
    /// Counts the number of words in the file content stored in a shared state.
    /// </summary>
    /// <param name="message">The ID of the shared state containing the file content.</param>
    /// <param name="context">The workflow context for accessing shared states.</param>
    /// <returns>The number of words in the file content.</returns>
    [MessageHandler]
    private async ValueTask<int> HandleAsync(string message, IWorkflowContext context)
    {
        // Retrieve the file content from the shared state
        var fileContent = await context.ReadStateAsync<string>(message, scopeName: "FileContent")
            ?? throw new InvalidOperationException("File content state not found");

        return fileContent.Split([' ', '\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
    }
}
from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class WordCountingExecutor(Executor):

    @handler
    async def handle(self, file_id: str, ctx: WorkflowContext[int]):
        # Retrieve the file content from state
        file_content = ctx.get_state(file_id)
        if file_content is None:
            raise ValueError("File content state not found")

        await ctx.send_message(len(file_content.split()))

Aislamiento de estado

En las aplicaciones del mundo real, administrar correctamente el estado es fundamental al controlar varias tareas o solicitudes. Sin el aislamiento adecuado, el estado compartido entre distintas ejecuciones de flujo de trabajo puede provocar un comportamiento inesperado, daños en los datos y condiciones de carrera. En esta sección se explica cómo garantizar el aislamiento de estado en los flujos de trabajo de Microsoft Agent Framework, lo que proporciona información sobre los procedimientos recomendados y los problemas comunes.

Generadores de flujos de trabajo mutables frente a flujos de trabajo inmutables

Los generadores de flujos de trabajo crean flujos de trabajo. Por lo general, los generadores de flujos de trabajo se consideran mutables, donde se puede agregar, modificar el ejecutor de inicio u otras configuraciones después de crear el generador o incluso después de compilar un flujo de trabajo. Por otro lado, los flujos de trabajo son inmutables en que una vez compilado un flujo de trabajo, no se puede modificar (ninguna API pública para modificar un flujo de trabajo).

Esta distinción es importante porque afecta a cómo se administra el estado en distintas ejecuciones de flujo de trabajo. No se recomienda reutilizar una sola instancia de flujo de trabajo para varias tareas o solicitudes, ya que esto puede provocar un uso compartido de estado no deseado. En su lugar, se recomienda crear una nueva instancia de flujo de trabajo desde el generador para cada tarea o solicitud para garantizar el aislamiento de estado adecuado y la seguridad de subprocesos.

Garantizar el aislamiento de estado con métodos auxiliares

Cuando las instancias del ejecutor se crean una vez y se comparten entre varias compilaciones de flujo de trabajo, su estado interno se comparte en todas las ejecuciones de flujo de trabajo. Esto puede provocar problemas si un ejecutor contiene el estado mutable que se debe aislar por flujo de trabajo. Para garantizar el aislamiento de estado adecuado y la seguridad de los subprocesos, encapsule la instanciación del ejecutor y la definición de flujos de trabajo dentro de un método auxiliar para que cada llamada genere instancias nuevas e independientes.

Próximamente...

Ejemplo no aislado (estado compartido):

executor_a = CustomExecutorA()
executor_b = CustomExecutorB()

# executor_a and executor_b are shared across all workflows built from this builder
workflow_builder = WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b)

workflow_a = workflow_builder.build()
workflow_b = workflow_builder.build()
# workflow_a and workflow_b share the same executor instances and their mutable state

Ejemplo aislado (método auxiliar):

def create_workflow() -> Workflow:
    """Create a fresh workflow with isolated state.

    Each call produces independent executor instances, ensuring no state
    leaks between workflow runs.
    """
    executor_a = CustomExecutorA()
    executor_b = CustomExecutorB()

    return WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b).build()

# Each workflow has its own executor instances with independent state
workflow_a = create_workflow()
workflow_b = create_workflow()

Sugerencia

Para garantizar el aislamiento de estado adecuado y la seguridad de subprocesos, asegúrese también de que las instancias del ejecutor creadas dentro del método auxiliar no compartan el estado mutable externo.

Administración del estado del agente

El contexto del agente se administra a través de subprocesos del agente. De forma predeterminada, cada agente de un flujo de trabajo obtendrá su propio subproceso a menos que un ejecutor personalizado administre el agente. Para obtener más información, consulte Trabajar con agentes.

Los subprocesos del agente se conservan en las ejecuciones de flujo de trabajo. Esto significa que si se invoca un agente en la primera ejecución de un flujo de trabajo, el contenido generado por el agente estará disponible en ejecuciones posteriores de la misma instancia de flujo de trabajo. Aunque esto puede ser útil para mantener la continuidad dentro de una sola tarea, también puede dar lugar a un uso compartido de estado no deseado si se reutiliza la misma instancia de flujo de trabajo para diferentes tareas o solicitudes. Para asegurarse de que cada tarea tiene el estado del agente aislado, envuelve el agente y la creación del flujo de trabajo dentro de un método auxiliar para que cada llamada genere nuevas instancias de agente con sus propios hilos.

Próximamente...

Ejemplo no aislado (estado del agente compartido):

writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions=(
        "You are an excellent content writer. You create new content and edit contents based on the feedback."
    ),
    name="writer_agent",
)
reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions=(
        "You are an excellent content reviewer."
        "Provide actionable feedback to the writer about the provided content."
        "Provide the feedback in the most concise manner possible."
    ),
    name="reviewer_agent",
)

# writer_agent and reviewer_agent are shared across all workflows
workflow = WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()

Ejemplo aislado (método auxiliar):

def create_workflow() -> Workflow:
    """Create a fresh workflow with isolated agent state.

    Each call produces new agent instances with their own threads,
    ensuring no conversation history leaks between workflow runs.
    """
    writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
        instructions=(
            "You are an excellent content writer. You create new content and edit contents based on the feedback."
        ),
        name="writer_agent",
    )
    reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
        instructions=(
            "You are an excellent content reviewer."
            "Provide actionable feedback to the writer about the provided content."
            "Provide the feedback in the most concise manner possible."
        ),
        name="reviewer_agent",
    )

    return WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()

# Each workflow has its own agent instances and threads
workflow_a = create_workflow()
workflow_b = create_workflow()

Resumen

El aislamiento de estado en los flujos de trabajo de Microsoft Agent Framework se puede gestionar eficazmente al envolver la instanciación del ejecutor y del agente junto con la construcción del flujo de trabajo utilizando métodos auxiliares. Al llamar al método auxiliar cada vez que necesita un nuevo flujo de trabajo, asegúrese de que cada instancia tenga un estado nuevo e independiente y evite el uso compartido de estado no deseado entre distintas ejecuciones de flujo de trabajo.

Pasos siguientes