Partilhar via


Ponto de verificação e retomada de fluxos de trabalho

O ponto de verificação permite que os fluxos de trabalho salvem seu estado em pontos específicos e retomem a execução mais tarde, mesmo após a reinicialização do processo. Isso é crucial para fluxos de trabalho de longa execução, recuperação de erros e cenários humano no circuito.

Conceitos abordados

Pré-requisitos

Componentes-chave

Instalar pacotes NuGet

Primeiro, instale os pacotes necessários para seu projeto .NET:

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

CheckpointManager

O CheckpointManager fornece armazenamento de ponto de verificação e funcionalidade de recuperação:

using Microsoft.Agents.AI.Workflows;

// Use the default in-memory checkpoint manager
var checkpointManager = CheckpointManager.Default;

// Or create a custom checkpoint manager with JSON serialization
var checkpointManager = CheckpointManager.CreateJson(store, customOptions);

Ativando o ponto de verificação

Habilite o ponto de verificação ao executar fluxos de trabalho usando InProcessExecution:

using Microsoft.Agents.AI.Workflows;

// Create workflow with checkpointing support
var workflow = await WorkflowHelper.GetWorkflowAsync();
var checkpointManager = CheckpointManager.Default;

// Execute with checkpointing enabled
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
    .StreamAsync(workflow, NumberSignal.Init, checkpointManager);

Persistência do Estado

Estado Executor

Os executores podem persistir o estado local que sobrevive aos pontos de verificação usando a Executor<T> classe base:

internal sealed class GuessNumberExecutor : Executor<NumberSignal>("Guess")
{
    private const string StateKey = "GuessNumberExecutor.State";

    public int LowerBound { get; private set; }
    public int UpperBound { get; private set; }

    public GuessNumberExecutor() : this()
    {
    }

    public override async ValueTask HandleAsync(NumberSignal message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        int guess = (LowerBound + UpperBound) / 2;
        await context.SendMessageAsync(guess, cancellationToken);
    }

    /// <summary>
    /// Checkpoint the current state of the executor.
    /// This must be overridden to save any state that is needed to resume the executor.
    /// </summary>
    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default) =>
        context.QueueStateUpdateAsync(StateKey, (LowerBound, UpperBound), cancellationToken);

    /// <summary>
    /// Restore the state of the executor from a checkpoint.
    /// This must be overridden to restore any state that was saved during checkpointing.
    /// </summary>
    protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        var state = await context.ReadStateAsync<(int, int)>(StateKey, cancellationToken);
        (LowerBound, UpperBound) = state;
    }
}

Criação automática de pontos de verificação

Os pontos de verificação são criados automaticamente no final de cada super etapa quando um gerenciador de pontos de verificação é fornecido:

var checkpoints = new List<CheckpointInfo>();

await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
    switch (evt)
    {
        case SuperStepCompletedEvent superStepCompletedEvt:
            // Checkpoints are automatically created at super step boundaries
            CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
            if (checkpoint is not null)
            {
                checkpoints.Add(checkpoint);
                Console.WriteLine($"Checkpoint created at step {checkpoints.Count}.");
            }
            break;

        case WorkflowOutputEvent workflowOutputEvt:
            Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
            break;
    }
}

Trabalhando com pontos de verificação

Acessando informações do ponto de verificação

Aceda aos metadados dos pontos de verificação das execuções concluídas:

// Get all checkpoints from a checkpointed run
var allCheckpoints = checkpointedRun.Checkpoints;

// Get the latest checkpoint
var latestCheckpoint = checkpointedRun.LatestCheckpoint;

// Access checkpoint details
foreach (var checkpoint in checkpoints)
{
    Console.WriteLine($"Checkpoint ID: {checkpoint.CheckpointId}");
    Console.WriteLine($"Step Number: {checkpoint.StepNumber}");
    Console.WriteLine($"Parent ID: {checkpoint.Parent?.CheckpointId ?? "None"}");
}

Armazenamento de ponto de verificação

Os pontos de verificação são geridos através da CheckpointManager interface:

// Commit a checkpoint (usually done automatically)
CheckpointInfo checkpointInfo = await checkpointManager.CommitCheckpointAsync(runId, checkpoint);

// Retrieve a checkpoint
Checkpoint restoredCheckpoint = await checkpointManager.LookupCheckpointAsync(runId, checkpointInfo);

Retomar a partir de pontos de verificação

Retomar Transmissão

Retome a execução a partir de um ponto de verificação e transmita eventos em tempo real:

// Resume from a specific checkpoint with streaming
CheckpointInfo savedCheckpoint = checkpoints[checkpointIndex];

await using Checkpointed<StreamingRun> resumedRun = await InProcessExecution
    .ResumeStreamAsync(workflow, savedCheckpoint, checkpointManager, runId);

await foreach (WorkflowEvent evt in resumedRun.Run.WatchStreamAsync())
{
    switch (evt)
    {
        case ExecutorCompletedEvent executorCompletedEvt:
            Console.WriteLine($"Executor {executorCompletedEvt.ExecutorId} completed.");
            break;

        case WorkflowOutputEvent workflowOutputEvt:
            Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
            return;
    }
}

Retomar sem streaming

Retomar e aguardar conclusão:

// Resume from checkpoint without streaming
Checkpointed<Run> resumedRun = await InProcessExecution
    .ResumeAsync(workflow, savedCheckpoint, checkpointManager, runId);

// Wait for completion and get final result
var result = await resumedRun.Run.WaitForCompletionAsync();

Restauro In-Loco

Restaure um ponto de verificação diretamente para uma instância de execução existente:

// Restore checkpoint to the same run instance
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint);

// Continue execution from the restored state
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
    // Handle events as normal
    if (evt is WorkflowOutputEvent outputEvt)
    {
        Console.WriteLine($"Resumed workflow result: {outputEvt.Data}");
        break;
    }
}

Nova instância de fluxo de trabalho (reidratação)

Crie uma nova instância de fluxo de trabalho a partir de um ponto de verificação:

// Create a completely new workflow instance
var newWorkflow = await WorkflowHelper.GetWorkflowAsync();

// Resume with the new instance from a saved checkpoint
await using Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
    .ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager, originalRunId);

await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync())
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Rehydrated workflow result: {workflowOutputEvt.Data}");
        break;
    }
}

Human-in-the-Loop com Ponto de Verificação

Combine a criação de checkpoints com fluxos de trabalho com intervenção humana:

var checkpoints = new List<CheckpointInfo>();

await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
    switch (evt)
    {
        case RequestInfoEvent requestInputEvt:
            // Handle external requests
            ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
            await checkpointedRun.Run.SendResponseAsync(response);
            break;

        case SuperStepCompletedEvent superStepCompletedEvt:
            // Save checkpoint after each interaction
            CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
            if (checkpoint is not null)
            {
                checkpoints.Add(checkpoint);
                Console.WriteLine($"Checkpoint created after human interaction.");
            }
            break;

        case WorkflowOutputEvent workflowOutputEvt:
            Console.WriteLine($"Workflow completed: {workflowOutputEvt.Data}");
            return;
    }
}

// Later, resume from any checkpoint
if (checkpoints.Count > 0)
{
    var selectedCheckpoint = checkpoints[1]; // Select specific checkpoint
    await checkpointedRun.RestoreCheckpointAsync(selectedCheckpoint);

    // Continue from that point
    await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
    {
        // Handle remaining workflow execution
    }
}

Padrão de exemplo completo

Aqui está um padrão abrangente de fluxo de trabalho de ponto de verificação:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;

public static class CheckpointingExample
{
    public static async Task RunAsync()
    {
        // Create workflow and checkpoint manager
        var workflow = await WorkflowHelper.GetWorkflowAsync();
        var checkpointManager = CheckpointManager.Default;
        var checkpoints = new List<CheckpointInfo>();

        Console.WriteLine("Starting workflow with checkpointing...");

        // Execute workflow with checkpointing
        await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
            .StreamAsync(workflow, NumberSignal.Init, checkpointManager);

        // Monitor execution and collect checkpoints
        await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
        {
            switch (evt)
            {
                case ExecutorCompletedEvent executorEvt:
                    Console.WriteLine($"Executor {executorEvt.ExecutorId} completed.");
                    break;

                case SuperStepCompletedEvent superStepEvt:
                    var checkpoint = superStepEvt.CompletionInfo!.Checkpoint;
                    if (checkpoint is not null)
                    {
                        checkpoints.Add(checkpoint);
                        Console.WriteLine($"Checkpoint {checkpoints.Count} created.");
                    }
                    break;

                case WorkflowOutputEvent outputEvt:
                    Console.WriteLine($"Workflow completed: {outputEvt.Data}");
                    goto FinishExecution;
            }
        }

        FinishExecution:
        Console.WriteLine($"Total checkpoints created: {checkpoints.Count}");

        // Demonstrate resuming from a checkpoint
        if (checkpoints.Count > 5)
        {
            var selectedCheckpoint = checkpoints[5];
            Console.WriteLine($"Resuming from checkpoint 6...");

            // Restore to same instance
            await checkpointedRun.RestoreCheckpointAsync(selectedCheckpoint);

            await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
            {
                if (evt is WorkflowOutputEvent resumedOutputEvt)
                {
                    Console.WriteLine($"Resumed workflow result: {resumedOutputEvt.Data}");
                    break;
                }
            }
        }

        // Demonstrate rehydration with new workflow instance
        if (checkpoints.Count > 3)
        {
            var newWorkflow = await WorkflowHelper.GetWorkflowAsync();
            var rehydrationCheckpoint = checkpoints[3];

            Console.WriteLine("Rehydrating from checkpoint 4 with new workflow instance...");

            await using Checkpointed<StreamingRun> newRun = await InProcessExecution
                .ResumeStreamAsync(newWorkflow, rehydrationCheckpoint, checkpointManager, checkpointedRun.Run.RunId);

            await foreach (WorkflowEvent evt in newRun.Run.WatchStreamAsync())
            {
                if (evt is WorkflowOutputEvent rehydratedOutputEvt)
                {
                    Console.WriteLine($"Rehydrated workflow result: {rehydratedOutputEvt.Data}");
                    break;
                }
            }
        }
    }
}

Principais Benefícios

  • Tolerância a falhas: os fluxos de trabalho podem se recuperar de falhas retomando a partir do último ponto de verificação
  • Processos de Longa Duração: divida fluxos de trabalho longos em segmentos geríveis com limites automáticos de pontos de verificação
  • Human-in-the-Loop: Pause para entrada externa e continue posteriormente a partir do estado guardado.
  • Depuração: inspecione o estado do fluxo de trabalho em pontos específicos e retome a execução para teste
  • Portabilidade: os pontos de verificação podem ser restaurados para novas instâncias de fluxo de trabalho (reidratação)
  • Gestão Automática: Os pontos de verificação são criados automaticamente nos limites do super passo

Executando o exemplo

Para obter a implementação de trabalho completa, consulte o exemplo CheckpointAndResume.

Componentes-chave

FileCheckpointStorage

A FileCheckpointStorage classe fornece armazenamento de ponto de verificação persistente usando arquivos JSON:

from agent_framework import FileCheckpointStorage
from pathlib import Path

# Initialize checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")

Ativando o ponto de verificação

Habilite o ponto de verificação ao criar seu fluxo de trabalho:

from agent_framework import WorkflowBuilder

workflow = (
    WorkflowBuilder(max_iterations=5)
    .add_edge(executor1, executor2)
    .set_start_executor(executor1)
    .with_checkpointing(checkpoint_storage=checkpoint_storage)  # Enable checkpointing
    .build()
)

Persistência do Estado

Estado Executor

Os executores podem persistir o estado local que sobrevive aos pontos de verificação:

from agent_framework import Executor, WorkflowContext, handler

class WorkerExecutor(Executor):
    """Processes numbers to compute their factor pairs and manages executor state for checkpointing."""

    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._composite_number_pairs: dict[int, list[tuple[int, int]]] = {}

    @handler
    async def compute(
        self,
        task: ComputeTask,
        ctx: WorkflowContext[ComputeTask, dict[int, list[tuple[int, int]]]],
    ) -> None:
        """Process the next number in the task, computing its factor pairs."""
        next_number = task.remaining_numbers.pop(0)

        print(f"WorkerExecutor: Computing factor pairs for {next_number}")
        pairs: list[tuple[int, int]] = []
        for i in range(1, next_number):
            if next_number % i == 0:
                pairs.append((i, next_number // i))
        self._composite_number_pairs[next_number] = pairs

        if not task.remaining_numbers:
            # All numbers processed - output the results
            await ctx.yield_output(self._composite_number_pairs)
        else:
            # More numbers to process - continue with remaining task
            await ctx.send_message(task)

    @override
    async def on_checkpoint_save(self) -> dict[str, Any]:
        """Save the executor's internal state for checkpointing."""
        return {"composite_number_pairs": self._composite_number_pairs}

    @override
    async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
        """Restore the executor's internal state from a checkpoint."""
        self._composite_number_pairs = state.get("composite_number_pairs", {})

Trabalhando com pontos de verificação

Listando pontos de verificação

Recupere e inspecione os pontos de verificação disponíveis:

# List all checkpoints
all_checkpoints = await checkpoint_storage.list_checkpoints()

# List checkpoints for a specific workflow
workflow_checkpoints = await checkpoint_storage.list_checkpoints(workflow_id="my-workflow")

# Sort by creation time
sorted_checkpoints = sorted(all_checkpoints, key=lambda cp: cp.timestamp)

Retomar a partir de pontos de verificação

Retomar Transmissão

Retome a execução e transmita eventos em tempo real:

# Resume from a specific checkpoint
async for event in workflow.run_stream(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage
):
    print(f"Resumed Event: {event}")

    if isinstance(event, WorkflowOutputEvent):
        print(f"Final Result: {event.data}")
        break

Retomar sem streaming

Retome e obtenha todos os resultados de uma só vez:

# Resume and wait for completion
result = await workflow.run(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage
)

# Access final outputs
outputs = result.get_outputs()
print(f"Final outputs: {outputs}")

Continuar com pedidos pendentes

Ao retomar a partir de um ponto de verificação que contém solicitações pendentes, o fluxo de trabalho emitirá novamente esses eventos de solicitação, permitindo que você capture e responda a eles:

request_info_events = []
# Resume from checkpoint - pending requests will be re-emitted
async for event in workflow.run_stream(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage
):
    if isinstance(event, RequestInfoEvent):
        # Capture re-emitted pending requests
        print(f"Pending request re-emitted: {event.request_id}")
        request_info_events.append(event)

# Handle the request and provide response
# If responses are already provided, no need to handle them again
responses = {}
for event in request_info_events:
    response = handle_request(event.data)
    responses[event.request_id] = response

# Send response back to workflow
async for event in workflow.send_responses_streaming(responses):
    if isinstance(event, WorkflowOutputEvent):
        print(f"Workflow completed: {event.data}")

Se estiver a retomar de um ponto de verificação com pedidos pendentes que já foram respondidos, ainda precisará chamar run_stream() para continuar o fluxo de trabalho, seguido de send_responses_streaming() com as respostas pré-fornecidas.

Seleção interativa de pontos de verificação

Crie uma seleção de pontos de verificação fácil de usar:

async def select_and_resume_checkpoint(workflow, storage):
    # Get available checkpoints
    checkpoints = await storage.list_checkpoints()
    if not checkpoints:
        print("No checkpoints available")
        return

    # Sort and display options
    sorted_cps = sorted(checkpoints, key=lambda cp: cp.timestamp)
    print("Available checkpoints:")
    for i, cp in enumerate(sorted_cps):
        summary = get_checkpoint_summary(cp)
        print(f"[{i}] {summary.checkpoint_id[:8]}... iter={summary.iteration_count}")

    # Get user selection
    try:
        idx = int(input("Enter checkpoint index: "))
        selected = sorted_cps[idx]

        # Resume from selected checkpoint
        print(f"Resuming from checkpoint: {selected.checkpoint_id}")
        async for event in workflow.run_stream(
            selected.checkpoint_id,
            checkpoint_storage=storage
        ):
            print(f"Event: {event}")

    except (ValueError, IndexError):
        print("Invalid selection")

Padrão de exemplo completo

Aqui está um padrão típico de fluxo de trabalho de criação de pontos de verificação:

import asyncio
from pathlib import Path

from agent_framework import (
    FileCheckpointStorage,
    WorkflowBuilder,
    WorkflowOutputEvent,
    get_checkpoint_summary
)

async def main():
    # Setup checkpoint storage
    checkpoint_dir = Path("./checkpoints")
    checkpoint_dir.mkdir(exist_ok=True)
    storage = FileCheckpointStorage(checkpoint_dir)

    # Build workflow with checkpointing
    workflow = (
        WorkflowBuilder()
        .add_edge(executor1, executor2)
        .set_start_executor(executor1)
        .with_checkpointing(storage)
        .build()
    )

    # Initial run
    print("Running workflow...")
    async for event in workflow.run_stream("input data"):
        print(f"Event: {event}")

    # List and inspect checkpoints
    checkpoints = await storage.list_checkpoints()
    for cp in sorted(checkpoints, key=lambda c: c.timestamp):
        summary = get_checkpoint_summary(cp)
        print(f"Checkpoint: {summary.checkpoint_id[:8]}... iter={summary.iteration_count}")

    # Resume from a checkpoint
    if checkpoints:
        latest = max(checkpoints, key=lambda cp: cp.timestamp)
        print(f"Resuming from: {latest.checkpoint_id}")

        async for event in workflow.run_stream(latest.checkpoint_id):
            print(f"Resumed: {event}")

if __name__ == "__main__":
    asyncio.run(main())

Principais Benefícios

  • Tolerância a falhas: os fluxos de trabalho podem se recuperar de falhas retomando a partir do último ponto de verificação
  • Processos de Longa Duração: Divida fluxos de trabalho longos em segmentos geríveis com limites de verificação
  • Human-in-the-Loop: Pause para entrada humana e retome mais tarde - as solicitações pendentes são reemitidas após a retomada
  • Depuração: inspecione o estado do fluxo de trabalho em pontos específicos e retome a execução para teste
  • Gerenciamento de recursos: interrompa e reinicie fluxos de trabalho com base na disponibilidade de recursos

Executando o exemplo

Para obter a implementação de trabalho completa, consulte o Exemplo de ponto de verificação com currículo.

Próximas Etapas