Microsoft Agent Framework-arbetsflöden – kontrollpunkter

Den här sidan innehåller en översikt över kontrollpunkter i Microsoft Agent Framework-arbetsflödessystemet.

Översikt

Med kontrollpunkter kan du spara tillståndet för ett arbetsflöde vid specifika tidpunkter under körningen och återuppta från dessa punkter senare. Den här funktionen är särskilt användbar för följande scenarier:

  • Långvariga arbetsflöden där du vill undvika att förlora förloppet vid fel.
  • Långvariga arbetsflöden där du vill pausa och återuppta körningen vid ett senare tillfälle.
  • Arbetsflöden som kräver periodisk lagring av tillstånd för gransknings- eller efterlevnadssyften.
  • Arbetsflöden som måste migreras mellan olika miljöer eller instanser.

När skapas kontrollpunkter?

Kom ihåg att arbetsflöden körs i supersteg, vilket beskrivs i huvudbegreppen. Kontrollpunkter skapas i slutet av varje supersteg, efter att alla utförare i det supersteget har slutfört sin körning. En kontrollpunkt samlar in hela arbetsflödets tillstånd, inklusive:

  • Det aktuella tillståndet för alla utförare
  • Alla väntande meddelanden i arbetsflödet för nästa supersteg
  • Väntande begäranden och svar
  • Delade tillstånd

Fånga kontrollpunkter

Om du vill aktivera kontrollpunktsfunktionen måste du ange en CheckpointManager när du skapar en arbetsflödeskörning. En kontrollpunkt kan sedan nås via en SuperStepCompletedEvent.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
var checkpointManager = new CheckpointManager();
// List to store checkpoint info for later use
var checkpoints = new List<CheckpointInfo>();

// Run the workflow with checkpointing enabled
Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
    .StreamAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint and store it
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
        if (checkpoint != null)
        {
            checkpoints.Add(checkpoint);
        }
    }
}

Om du vill aktivera kontrollpekande måste du ange ett CheckpointStorage när du skapar ett arbetsflöde. En kontrollpunkt kan sedan nås via lagringen.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.with_checkpointing(checkpoint_storage).build()

# Run the workflow
async for event in workflow.run_streaming(input):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints()

Återuppta från kontrollpunkter

Du kan återuppta ett arbetsflöde från en specifik kontrollpunkt direkt på samma körning.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None).ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Du kan återuppta ett arbetsflöde från en specifik kontrollpunkt direkt på samma arbetsflödesinstans.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream(checkpoint_id=saved_checkpoint.checkpoint_id):
    ...

Rehydrering från kontrollpunkter

Eller så kan du återställa ett arbetsflöde från en kontrollpunkt i en ny körningsinstans.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
    .ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Du kan också återställa en ny arbetsflödesinstans från en kontrollpunkt.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
):
    ...

Spara exekutortillstånd

För att säkerställa att tillståndet för en exekverare registreras i en kontrollpunkt måste exekveraren åsidosätta OnCheckpointingAsync metoden och spara dess tillstånd i arbetsflödeskontexten.

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;

internal sealed class CustomExecutor() : Executor<string>("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    public async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

För att säkerställa att tillståndet återställs korrekt när du återupptar från en kontrollpunkt måste kören åsidosätta OnCheckpointRestoredAsync metoden och läsa in dess tillstånd från arbetsflödeskontexten.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

För att säkerställa att tillståndet för en exekverare registreras i en kontrollpunkt måste exekveraren åsidosätta on_checkpoint_save metoden och spara dess tillstånd i arbetsflödeskontexten.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

För att säkerställa att tillståndet återställs korrekt när du återupptar från en kontrollpunkt måste kören åsidosätta on_checkpoint_restore metoden och läsa in dess tillstånd från arbetsflödeskontexten.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Nästa steg