Condividi tramite


Flussi di lavoro di Microsoft Agent Framework - Checkpoint

Questa pagina offre una panoramica dei checkpoint nel sistema flusso di lavoro di Microsoft Agent Framework.

Informazioni generali

I checkpoint consentono di salvare lo stato di un flusso di lavoro in punti specifici durante l'esecuzione e riprendere da tali punti in un secondo momento. Questa funzionalità è particolarmente utile per gli scenari seguenti:

  • Flussi di lavoro con esecuzione prolungata in cui si desidera evitare di perdere lo stato di avanzamento in caso di errori.
  • Flussi di lavoro a esecuzione prolungata in cui si vuole sospendere e riprendere l'esecuzione in un secondo momento.
  • Flussi di lavoro che richiedono il salvataggio periodico dello stato per scopi di controllo o conformità.
  • Flussi di lavoro di cui è necessario eseguire la migrazione in ambienti o istanze diversi.

Quando vengono creati i checkpoint?

Tenere presente che i flussi di lavoro vengono eseguiti in superstep, come documentato nei concetti di base. I checkpoint vengono creati alla fine di ogni passaggio superiore, dopo che tutti gli executor in tale passaggio hanno completato l'esecuzione. Un checkpoint acquisisce l'intero stato del flusso di lavoro, tra cui:

  • Stato corrente di tutti gli esecutori
  • Tutti i messaggi in sospeso nel flusso di lavoro per il passaggio successivo
  • Richieste e risposte in sospeso
  • Stati condivisi

Cattura dei checkpoint

Per abilitare il checkpointing, è necessario specificare un CheckpointManager quando si crea un'esecuzione di flusso di lavoro. È quindi possibile accedere a un checkpoint tramite un oggetto SuperStepCompletedEvent.

using Microsoft.Agents.Workflows;

// Create a checkpoint manager to manage checkpoints
var checkpointManager = new CheckpointManager();
// List to store checkpoint info for later use
var checkpoints = new List<CheckpointInfo>();

// Run the workflow with checkpointing enabled
Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
    .StreamAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint and store it
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
        if (checkpoint != null)
        {
            checkpoints.Add(checkpoint);
        }
    }
}

Per abilitare il checkpointing, è necessario specificare un oggetto CheckpointStorage durante la creazione di un flusso di lavoro. È quindi possibile accedere a un punto di controllo attraverso lo storage.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.with_checkpointing(checkpoint_storage).build()

# Run the workflow
async for event in workflow.run_streaming(input):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints()

Riprendere dai checkpoint

È possibile riprendere un flusso di lavoro da un checkpoint specifico direttamente nella stessa esecuzione.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None).ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

È possibile riprendere un flusso di lavoro da un checkpoint specifico direttamente nella stessa istanza del flusso di lavoro.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream(checkpoint_id=saved_checkpoint.checkpoint_id):
    ...

Riattivazione da checkpoint

In alternativa, è possibile riattivare un flusso di lavoro da un checkpoint in una nuova istanza di esecuzione.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
    .ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

In alternativa, è possibile ri-idratare una nuova istanza del flusso di lavoro da un checkpoint.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
):
    ...

Salvare gli stati dell'esecutore

Per assicurarsi che lo stato di un executor venga acquisito in un checkpoint, l'executor deve sovrascrivere il metodo OnCheckpointingAsync e salvare il suo stato nel contesto del flusso di lavoro.

using Microsoft.Agents.Workflows;
using Microsoft.Agents.Workflows.Reflection;

internal sealed class CustomExecutor() : Executor<string>("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    public async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Inoltre, per assicurarsi che lo stato venga ripristinato correttamente durante la ripresa da un checkpoint, l'executor deve eseguire l'override del metodo e caricarne lo OnCheckpointRestoredAsync stato dal contesto del flusso di lavoro.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Per assicurarsi che lo stato di un executor venga acquisito in un checkpoint, l'executor deve sovrascrivere il metodo on_checkpoint_save e salvare il suo stato nel contesto del flusso di lavoro.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Inoltre, per assicurarsi che lo stato venga ripristinato correttamente durante la ripresa da un checkpoint, l'executor deve eseguire l'override del metodo e caricarne lo on_checkpoint_restore stato dal contesto del flusso di lavoro.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Passaggi successivi