Compartir a través de


Flujos de trabajo de Microsoft Agent Framework: puntos de control

En esta página se proporciona información general sobre los puntos de control en el sistema de flujo de trabajo de Microsoft Agent Framework.

Información general

Los puntos de control permiten guardar el estado de un flujo de trabajo en puntos específicos durante su ejecución y reanudarse desde esos puntos más adelante. Esta característica es especialmente útil para los siguientes escenarios:

  • Flujos de trabajo de larga duración en los que desea evitar perder el progreso en caso de errores.
  • Flujos de trabajo de larga duración en los que desea pausar y reanudar la ejecución más adelante.
  • Flujos de trabajo que requieren el guardado de estado periódico para fines de auditoría o cumplimiento.
  • Flujos de trabajo que deben migrarse en diferentes entornos o instancias.

¿Cuándo se crean los puntos de control?

Recuerde que los flujos de trabajo se ejecutan en superpasos, como se documenta en los conceptos básicos. Los puntos de control se crean al final de cada superpaso, después de que todos los ejecutores de ese superpaso hayan completado su ejecución. Un punto de control captura todo el estado del flujo de trabajo, entre los que se incluyen:

  • Estado actual de todos los ejecutores
  • Todos los mensajes pendientes del flujo de trabajo para el siguiente superpaso
  • Solicitudes y respuestas pendientes
  • Estados compartidos

Capturar puntos de control

Para habilitar el punto de control, es necesario proporcionar un CheckpointManager al crear una ejecución de un flujo de trabajo. Se puede acceder a un punto de control a través de un SuperStepCompletedEvent.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
var checkpointManager = new CheckpointManager();
// List to store checkpoint info for later use
var checkpoints = new List<CheckpointInfo>();

// Run the workflow with checkpointing enabled
Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
    .StreamAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint and store it
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
        if (checkpoint != null)
        {
            checkpoints.Add(checkpoint);
        }
    }
}

Para habilitar el punto de control, se necesita proporcionar un componente CheckpointStorage al configurar un flujo de trabajo. A continuación, se puede acceder a un punto de control a través del almacenamiento.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.with_checkpointing(checkpoint_storage).build()

# Run the workflow
async for event in workflow.run_streaming(input):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints()

Reanudación desde puntos de control

Puede reanudar un flujo de trabajo desde un punto de control específico directamente en la misma ejecución.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None).ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Puede reanudar un flujo de trabajo desde un punto de control específico directamente en la misma instancia de flujo de trabajo.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream(checkpoint_id=saved_checkpoint.checkpoint_id):
    ...

Rehidratación desde puntos de control

O bien, puede rehidratar un flujo de trabajo desde un punto de control a una nueva instancia de ejecución.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
    .ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

O bien, puede rehidratar una nueva instancia de flujo de trabajo desde un punto de control.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
):
    ...

Guardar estados de ejecución

Para asegurarse de que el estado de un ejecutor se captura en un punto de control, el ejecutor debe invalidar el OnCheckpointingAsync método y guardar su estado en el contexto de flujo de trabajo.

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;

internal sealed class CustomExecutor() : Executor<string>("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    public async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Además, para asegurarse de que el estado se restaura correctamente al reanudar desde un punto de control, el ejecutor debe invalidar el OnCheckpointRestoredAsync método y cargar su estado desde el contexto de flujo de trabajo.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Para asegurarse de que el estado de un ejecutor se captura en un punto de control, el ejecutor debe invalidar el on_checkpoint_save método y guardar su estado en el contexto de flujo de trabajo.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Además, para asegurarse de que el estado se restaura correctamente al reanudar desde un punto de control, el ejecutor debe invalidar el on_checkpoint_restore método y cargar su estado desde el contexto de flujo de trabajo.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Pasos siguientes