Freigeben über


Microsoft Agent Framework-Workflows – Prüfpunkte

Diese Seite bietet eine Übersicht über Prüfpunkte im Microsoft Agent Framework-Workflowsystem.

Überblick

Prüfpunkte ermöglichen es Ihnen, den Status eines Workflows an bestimmten Punkten während der Ausführung zu speichern und von diesen Punkten später fortzusetzen. Dieses Feature eignet sich besonders für die folgenden Szenarien:

  • Lang andauernde Workflows, bei denen Sie den Fortschrittsverlust im Falle von Fehlern vermeiden möchten.
  • Lang andauernde Workflows, bei denen Sie die Ausführung zu einem späteren Zeitpunkt anhalten und fortsetzen möchten.
  • Workflows, die regelmäßige Zustandsspeicherung für Überwachungs- oder Compliancezwecke erfordern.
  • Workflows, die in verschiedenen Umgebungen oder Instanzen migriert werden müssen.

Wann werden Prüfpunkte erstellt?

Denken Sie daran, dass Workflows in Supersteps ausgeführt werden, wie in den Kernkonzepten dokumentiert. Prüfpunkte werden am Ende jedes Supersteps erstellt, nachdem alle Ausführenden in diesem Superstep ihre Ausführung abgeschlossen haben. Ein Prüfpunkt erfasst den gesamten Status des Workflows, einschließlich:

  • Der aktuelle Status aller Executoren
  • Alle ausstehenden Nachrichten im Workflow für den nächsten Superstep
  • Ausstehende Anforderungen und Antworten
  • Freigegebene Zustände

Erfassen von Prüfpunkten

Um Checkpointing zu aktivieren, muss beim Erstellen eines Workflow-Durchlaufs ein CheckpointManager bereitgestellt werden. Dann kann über einen SuperStepCompletedEventPrüfpunkt zugegriffen werden.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
var checkpointManager = new CheckpointManager();
// List to store checkpoint info for later use
var checkpoints = new List<CheckpointInfo>();

// Run the workflow with checkpointing enabled
Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
    .StreamAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint and store it
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
        if (checkpoint != null)
        {
            checkpoints.Add(checkpoint);
        }
    }
}

Um die Zeigerprüfung zu aktivieren, muss beim Erstellen eines Workflows eine CheckpointStorage Bereitstellung erfolgen. Anschließend kann über den Speicher auf einen Prüfpunkt zugegriffen werden.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.with_checkpointing(checkpoint_storage).build()

# Run the workflow
async for event in workflow.run_streaming(input):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints()

Fortsetzen ab Prüfpunkten

Sie können einen Workflow von einem bestimmten Checkpoint direkt im selben Ausführungslauf fortsetzen.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None).ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Sie können einen Workflow aus einem bestimmten Prüfpunkt direkt in derselben Workflowinstanz fortsetzen.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream(checkpoint_id=saved_checkpoint.checkpoint_id):
    ...

Rehydratieren von Prüfpunkten

Sie können einen Workflow auch aus einem Prüfpunkt in eine neue Ausführungsinstanz rehydratisieren.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
    .ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Alternativ können Sie eine neue Workflowinstanz aus einem Prüfpunkt rehydratisieren.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
):
    ...

Vollstreckungsstatus speichern

Um sicherzustellen, dass der Status eines Executors in einem Prüfpunkt erfasst wird, muss der Executor die OnCheckpointingAsync Methode überschreiben und den Status im Workflowkontext speichern.

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;

internal sealed class CustomExecutor() : Executor<string>("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    public async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Um sicherzustellen, dass der Zustand beim Fortsetzen eines Checkpoints ordnungsgemäß wiederhergestellt wird, muss der Ausführende die OnCheckpointRestoredAsync-Methode überschreiben und seinen Zustand aus dem Workflow-Kontext laden.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Um sicherzustellen, dass der Status eines Executors in einem Prüfpunkt erfasst wird, muss der Executor die on_checkpoint_save Methode überschreiben und den Status im Workflowkontext speichern.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Um sicherzustellen, dass der Zustand beim Fortsetzen eines Checkpoints ordnungsgemäß wiederhergestellt wird, muss der Ausführende die on_checkpoint_restore-Methode überschreiben und seinen Zustand aus dem Workflow-Kontext laden.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Nächste Schritte