Partekatu honen bidez:


Flujos de trabajo del marco de trabajo del agente de Microsoft: puntos de control

En esta página se proporciona información general sobre Checkpoints en el sistema de flujo de trabajo del marco del agente de Microsoft.

Información general

Los puntos de control permiten guardar el estado de un flujo de trabajo en puntos específicos durante su ejecución y reanudarse desde esos puntos más adelante. Esta característica es especialmente útil para los siguientes escenarios:

  • Flujos de trabajo de larga duración en los que desea evitar perder el progreso en caso de errores.
  • Flujos de trabajo de larga duración en los que desea pausar y reanudar la ejecución más adelante.
  • Flujos de trabajo que requieren el guardado de estado periódico para fines de auditoría o cumplimiento.
  • Flujos de trabajo que deben migrarse en diferentes entornos o instancias.

¿Cuándo se crean los puntos de control?

Recuerde que los flujos de trabajo se ejecutan en superpasos, como se documenta en los conceptos básicos. Los puntos de control se crean al final de cada superpaso, después de que todos los ejecutores de ese superpaso hayan completado su ejecución. Un punto de control captura todo el estado del flujo de trabajo, entre los que se incluyen:

  • Estado actual de todos los ejecutores
  • Todos los mensajes pendientes del flujo de trabajo para el siguiente superpaso
  • Solicitudes y respuestas pendientes
  • Estados compartidos

Capturar puntos de control

Para habilitar los puntos de control, es necesario proporcionar un elemento CheckpointManager al ejecutar el flujo de trabajo. A continuación, se puede acceder a un punto de control a través de un SuperStepCompletedEvent, o a través de la propiedad Checkpoints en la ejecución.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
    }
}

// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;

Para habilitar los puntos de control, es necesario proporcionar un CheckpointStorage elemento al crear un flujo de trabajo. A continuación, se puede acceder a un punto de control a través del almacenamiento. Agent Framework incluye tres implementaciones integradas: elija la que coincida con sus necesidades de durabilidad e implementación:

Provider Package Durabilidad. Más adecuado para
InMemoryCheckpointStorage agent-framework Solo en proceso Pruebas, demostraciones, flujos de trabajo de corta duración
FileCheckpointStorage agent-framework Disco local Flujos de trabajo de una sola máquina, desarrollo local
CosmosCheckpointStorage agent-framework-azure-cosmos Azure Cosmos DB Flujos de trabajo de producción, distribuidos y entre procesos

Los tres implementan el mismo CheckpointStorage protocolo, por lo que puede intercambiar proveedores sin cambiar el código de flujo de trabajo o ejecutor.

InMemoryCheckpointStorage mantiene los puntos de control en la memoria del proceso. Ideal para pruebas, demostraciones y flujos de trabajo de corta duración en los que no necesita durabilidad en los reinicios.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run(input, stream=True):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)

Reanudación desde puntos de control

Puede reanudar un flujo de trabajo desde un punto de control específico directamente en la misma ejecución.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Puede reanudar un flujo de trabajo desde un punto de control específico directamente en la misma instancia de flujo de trabajo.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
    ...

Rehidratación desde puntos de control

O bien, puede rehidratar un flujo de trabajo desde un punto de control a una nueva instancia de ejecución.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
    .ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

O bien, puede rehidratar una nueva instancia de flujo de trabajo desde un punto de control.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    ...

Guardar estados de ejecución

Para asegurarse de que el estado de un ejecutor se captura en un punto de control, el ejecutor debe invalidar el OnCheckpointingAsync método y guardar su estado en el contexto de flujo de trabajo.

using Microsoft.Agents.AI.Workflows;

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Además, para asegurarse de que el estado se restaura correctamente al reanudar desde un punto de control, el ejecutor debe invalidar el OnCheckpointRestoredAsync método y cargar su estado desde el contexto de flujo de trabajo.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Para asegurarse de que el estado de un ejecutor se captura en un punto de control, el ejecutor debe invalidar el on_checkpoint_save método y devolver su estado como diccionario.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Además, para asegurarse de que el estado se restaura correctamente al reanudarse desde un punto de control, el ejecutor debe invalidar el on_checkpoint_restore método y restaurar su estado desde el diccionario de estado proporcionado.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Consideraciones de seguridad

Importante

El almacenamiento de los puntos de control es un límite de confianza. Independientemente de si usa las implementaciones de almacenamiento integradas o una personalizada, el back-end de almacenamiento debe tratarse como infraestructura privada de confianza. Nunca cargue puntos de control de orígenes que no sean de confianza o potencialmente alterados.

Asegúrese de que la ubicación de almacenamiento usada para los puntos de control está protegida correctamente. Solo los servicios y usuarios autorizados deben tener acceso de lectura o escritura a los datos de punto de control.

Serialización pickle de Python

Tanto FileCheckpointStorage como CosmosCheckpointStorage usan el módulo pickle de Python para serializar el estado no nativo de JSON, como dataclasses, datetimes y objetos personalizados. Para mitigar los riesgos de ejecución arbitraria de código durante la deserialización, ambos proveedores usan un unpickler restringido de forma predeterminada. Solo se permite un conjunto incorporado de tipos de Python seguros (primitivos, datetime, uuid, Decimal, colecciones comunes, etc.) y todos los tipos internos agent_framework durante la deserialización. Cualquier otro tipo encontrado en un punto de control hace que la deserialización falle con un WorkflowCheckpointException.

Para permitir tipos adicionales específicos de la aplicación, páselos mediante el parámetro allowed_checkpoint_types usando el formato "module:qualname".

from agent_framework import FileCheckpointStorage

storage = FileCheckpointStorage(
    "/tmp/checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)

CosmosCheckpointStorage acepta el mismo parámetro:

from azure.identity.aio import DefaultAzureCredential
from agent_framework_azure_cosmos import CosmosCheckpointStorage

storage = CosmosCheckpointStorage(
    endpoint="https://my-account.documents.azure.com:443/",
    credential=DefaultAzureCredential(),
    database_name="agent-db",
    container_name="checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)

Si su modelo de amenazas no permite en absoluto la serialización basada en pickle, use InMemoryCheckpointStorage o implemente una versión personalizada de CheckpointStorage con una estrategia de serialización alternativa.

Responsabilidad de ubicación de almacenamiento

FileCheckpointStorage requiere un parámetro explícito storage_path : no hay ningún directorio predeterminado. Aunque el marco se valida contra ataques de recorrido de rutas, proteger el propio directorio de almacenamiento (permisos de archivo, cifrado de datos en reposo, controles de acceso) es responsabilidad del desarrollador. Solo los procesos autorizados deben tener acceso de lectura o escritura al directorio de punto de control.

CosmosCheckpointStorage se basa en Azure Cosmos DB para el almacenamiento. Use la identidad administrada o RBAC siempre que sea posible, limite la base de datos y el contenedor al servicio de workflows y rote las claves de cuenta si usa la autenticación basada en claves. Como con el almacenamiento de archivos, solo los roles autorizados deben tener acceso de lectura o escritura al contenedor de Cosmos DB que contiene documentos de punto de control.

Pasos siguientes