Partager via


Flux de travail Microsoft Agent Framework - Points de contrôle

Cette page fournit une vue d’ensemble des points de contrôle dans le système de flux de travail Microsoft Agent Framework.

Vue d’ensemble

Les points de contrôle vous permettent d’enregistrer l’état d’un flux de travail à des points spécifiques pendant son exécution et de reprendre à partir de ces points ultérieurement. Cette fonctionnalité est particulièrement utile pour les scénarios suivants :

  • Flux de travail longs dans lesquels vous souhaitez éviter de perdre la progression en cas d’échecs.
  • Flux de travail de longue durée dans lesquels vous souhaitez suspendre et reprendre l’exécution ultérieurement.
  • Flux de travail nécessitant un enregistrement d’état périodique pour des besoins d’audit ou de conformité.
  • Flux de travail qui doivent être migrés dans différents environnements ou instances.

Quand les points de contrôle sont-ils créés ?

N’oubliez pas que les flux de travail sont exécutés en supersteps, comme documenté dans les concepts de base. Les points de contrôle sont créés à la fin de chaque superstep, une fois que tous les exécuteurs de ce superstep ont terminé leur exécution. Un point de contrôle capture l’état entier du flux de travail, notamment :

  • État actuel de tous les exécuteurs
  • Tous les messages en attente dans le flux de travail pour le superstep suivant
  • Demandes et réponses en attente
  • États partagés

Capture de points de contrôle

Pour activer le checkpointing, vous devez disposer d’un CheckpointManager lors de l’exécution du flux de travail. Un point de contrôle est ensuite accessible via un SuperStepCompletedEvent, ou via la propriété Checkpoints pendant l’exécution.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
    }
}

// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;

Pour activer le point de contrôle, un CheckpointStorage doit être fourni lors de la création d’un flux de travail. Un point de contrôle est ensuite accessible via le stockage.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run(input, stream=True):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)

Reprise à partir de points de contrôle

Vous pouvez reprendre un flux de travail à partir d’un point de contrôle spécifique directement lors de la même exécution.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Vous pouvez reprendre un flux de travail à partir d’un point de contrôle spécifique directement sur la même instance de workflow.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
    ...

Réhydratage à partir de points de contrôle

Vous pouvez également réalimenter un flux de travail à partir d’un point de contrôle dans une nouvelle instance d’exécution.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
    .ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Vous pouvez également réalimenter une nouvelle instance de flux de travail à partir d’un point de contrôle.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    ...

Enregistrer les états de l’exécuteur

Pour que l'état d'un exécuteur soit capturé dans un point de contrôle, l'exécuteur doit redéfinir la méthode OnCheckpointingAsync et enregistrer son état dans le contexte du workflow.

using Microsoft.Agents.AI.Workflows;

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

En outre, pour vous assurer que l’état est correctement restauré lors de la reprise à partir d’un point de contrôle, l’exécuteur doit remplacer la OnCheckpointRestoredAsync méthode et charger son état à partir du contexte de flux de travail.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Pour vous assurer que l’état d’un exécuteur est capturé dans un point de contrôle, l’exécuteur doit remplacer la méthode on_checkpoint_save et retourner son état sous forme de dictionnaire.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

En outre, pour vous assurer que l’état est correctement restauré lors de la reprise à partir d’un point de contrôle, l’exécuteur doit remplacer la on_checkpoint_restore méthode et restaurer son état à partir du dictionnaire d’état fourni.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Considérations relatives à la sécurité

Important

Le stockage de points de contrôle est une frontière de confiance. Que vous utilisiez les implémentations de stockage intégrées ou une implémentation personnalisée, le serveur principal de stockage doit être traité comme une infrastructure privée approuvée. Ne chargez jamais les points de contrôle à partir de sources non approuvées ou potentiellement falsifiées. Le chargement d’un point de contrôle malveillant peut exécuter du code arbitraire.

Vérifiez que l’emplacement de stockage utilisé pour les points de contrôle est sécurisé de manière appropriée. Seuls les services autorisés et les utilisateurs doivent avoir un accès en lecture ou en écriture aux données de point de contrôle.

Sérialisation avec Pickle

FileCheckpointStorage utilise le module python pickle pour sérialiser un état natif non JSON, tel que des classes de données, des datetimes et des objets personnalisés. Étant donné que pickle.loads() peut exécuter du code arbitraire pendant la désérialisation, un fichier de point de contrôle compromis peut exécuter du code malveillant lors du chargement. La vérification du type post-désérialisation effectuée par le framework ne peut pas empêcher cela.

Si votre modèle de menace n’autorise pas la sérialisation basée sur pickle, utilisez InMemoryCheckpointStorage ou implémentez un CheckpointStorage personnalisé avec une stratégie de sérialisation alternative.

Responsabilité de l’emplacement de stockage

FileCheckpointStorage nécessite un paramètre explicite storage_path : il n’existe aucun répertoire par défaut. Bien que l’infrastructure valide contre les attaques de traversée de chemin d’accès, la sécurisation du répertoire de stockage lui-même (autorisations de fichier, chiffrement au repos, contrôles d’accès) est la responsabilité du développeur. Seuls les processus autorisés doivent avoir un accès en lecture ou en écriture au répertoire de point de contrôle.

Étapes suivantes