Delen via


Microsoft Agent Framework-werkstromen - Controlepunten

Op deze pagina vindt u een overzicht van controlepunten in het Werkstroomsysteem van Microsoft Agent Framework.

Overzicht

Met controlepunten kunt u de status van een werkstroom opslaan op specifieke punten tijdens de uitvoering en later hervatten vanaf die punten. Deze functie is met name handig voor de volgende scenario's:

  • Langlopende werkstromen waarbij u wilt voorkomen dat de voortgang verloren gaat in het geval van fouten.
  • Langlopende werkstromen waarin u de uitvoering op een later tijdstip wilt onderbreken en hervatten.
  • Werkstromen waarvoor periodieke statusbesparing is vereist voor controle- of nalevingsdoeleinden.
  • Werkstromen die moeten worden gemigreerd naar verschillende omgevingen of instanties.

Wanneer worden controlepunten gemaakt?

Houd er rekening mee dat werkstromen worden uitgevoerd in supersteps, zoals beschreven in de kernconcepten. Controlepunten worden gemaakt aan het einde van elke superstep, nadat alle uitvoerders in die superstep hun uitvoering hebben voltooid. Een controlepunt legt de volledige status van de werkstroom vast, waaronder:

  • De huidige status van alle uitvoerders
  • Alle berichten in behandeling in de werkstroom voor de volgende superstep
  • Aanvragen en antwoorden in behandeling
  • Gedeelde toestanden

Controlepunten vastleggen

Als u checkpointing wilt inschakelen, moet er een CheckpointManager worden opgegeven bij het maken van een werkstroomproces. Een controlepunt kan vervolgens worden benaderd via een SuperStepCompletedEvent.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
var checkpointManager = new CheckpointManager();
// List to store checkpoint info for later use
var checkpoints = new List<CheckpointInfo>();

// Run the workflow with checkpointing enabled
Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
    .StreamAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint and store it
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
        if (checkpoint != null)
        {
            checkpoints.Add(checkpoint);
        }
    }
}

Als u checkpoints wilt inschakelen, moet er een CheckpointStorage worden opgegeven bij het aanmaken van een workflow. Een controlepunt kan vervolgens worden geopend via de opslag.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.with_checkpointing(checkpoint_storage).build()

# Run the workflow
async for event in workflow.run_streaming(input):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints()

Hervatten vanaf controlepunten

U kunt een werkstroom rechtstreeks vanuit een specifiek controlepunt hervatten tijdens dezelfde uitvoering.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None).ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

U kunt een werkstroom hervatten vanaf een specifiek controlepunt op dezelfde workflowinstantie.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream(checkpoint_id=saved_checkpoint.checkpoint_id):
    ...

Reactiveren vanuit controlepunten

Of u kunt een werkstroom vanuit een controlepunt reactiveren in een nieuw run-exemplaar.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
    .ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

U kunt ook een nieuwe werkstroomexemplaar herstellen vanaf een controlepunt.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
):
    ...

Uitvoerdersstatussen opslaan

Om ervoor te zorgen dat de status van een uitvoerder wordt vastgelegd in een controlepunt, moet de uitvoerder de methode overschrijven en de OnCheckpointingAsync status ervan opslaan in de werkstroomcontext.

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;

internal sealed class CustomExecutor() : Executor<string>("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    public async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Om ervoor te zorgen dat de status correct wordt hersteld bij het hervatten van een controlepunt, moet de uitvoerder de OnCheckpointRestoredAsync methode overschrijven en de status ervan laden vanuit de werkstroomcontext.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Om ervoor te zorgen dat de status van een uitvoerder wordt vastgelegd in een controlepunt, moet de uitvoerder de methode overschrijven en de on_checkpoint_save status ervan opslaan in de werkstroomcontext.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Om ervoor te zorgen dat de status correct wordt hersteld bij het hervatten van een controlepunt, moet de uitvoerder de on_checkpoint_restore methode overschrijven en de status ervan laden vanuit de werkstroomcontext.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Volgende stappen