Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Cette page fournit une vue d’ensemble des points de contrôle dans le système de flux de travail Microsoft Agent Framework.
Aperçu
Les points de contrôle vous permettent d’enregistrer l’état d’un flux de travail à des points spécifiques pendant son exécution et de reprendre à partir de ces points ultérieurement. Cette fonctionnalité est particulièrement utile pour les scénarios suivants :
- Flux de travail longs dans lesquels vous souhaitez éviter de perdre la progression en cas d’échecs.
- Flux de travail de longue durée dans lesquels vous souhaitez suspendre et reprendre l’exécution ultérieurement.
- Flux de travail nécessitant un enregistrement d’état périodique pour des besoins d’audit ou de conformité.
- Flux de travail qui doivent être migrés dans différents environnements ou instances.
Quand les points de contrôle sont-ils créés ?
N’oubliez pas que les flux de travail sont exécutés en supersteps, comme documenté dans les concepts de base. Les points de contrôle sont créés à la fin de chaque superstep, une fois que tous les exécuteurs de ce superstep ont terminé leur exécution. Un point de contrôle capture l’état entier du flux de travail, notamment :
- État actuel de tous les exécuteurs
- Tous les messages en attente dans le flux de travail pour le superstep suivant
- Demandes et réponses en attente
- États partagés
Capture de points de contrôle
Pour activer le point de contrôle, un CheckpointManager doit être fourni lors de la création d'un flux de travail. Un point de contrôle est alors accessible via un SuperStepCompletedEvent.
using Microsoft.Agents.AI.Workflows;
// Create a checkpoint manager to manage checkpoints
var checkpointManager = new CheckpointManager();
// List to store checkpoint info for later use
var checkpoints = new List<CheckpointInfo>();
// Run the workflow with checkpointing enabled
Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
.StreamAsync(workflow, input, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is SuperStepCompletedEvent superStepCompletedEvt)
{
// Access the checkpoint and store it
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
if (checkpoint != null)
{
checkpoints.Add(checkpoint);
}
}
}
Pour activer un point de contrôle, un CheckpointStorage doit être fourni lors de la création d’un flux de travail. Un point de contrôle est alors accessible via le stockage.
from agent_framework import (
InMemoryCheckpointStorage,
WorkflowBuilder,
)
# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()
# Build a workflow with checkpointing enabled
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.with_checkpointing(checkpoint_storage).build()
# Run the workflow
async for event in workflow.run_streaming(input):
...
# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints()
Reprise à partir de points de contrôle
Vous pouvez reprendre un flux de travail à partir d’un point de contrôle spécifique directement lors de la même exécution.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None).ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Vous pouvez reprendre un flux de travail à partir d’un point de contrôle spécifique directement sur la même instance de workflow.
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream(checkpoint_id=saved_checkpoint.checkpoint_id):
...
Réhydratage à partir de points de contrôle
Vous pouvez également réalimenter un flux de travail à partir d’un point de contrôle dans une nouvelle instance d’exécution.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Vous pouvez également réalimenter une nouvelle instance de flux de travail à partir d’un point de contrôle.
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream
checkpoint_id=saved_checkpoint.checkpoint_id,
checkpoint_storage=checkpoint_storage,
):
...
Enregistrer les états de l’exécuteur
Pour que l'état d'un exécuteur soit capturé dans un point de contrôle, l'exécuteur doit redéfinir la méthode OnCheckpointingAsync et enregistrer son état dans le contexte du workflow.
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;
internal sealed class CustomExecutor() : Executor<string>("CustomExecutor")
{
private const string StateKey = "CustomExecutorState";
private List<string> messages = new();
public async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this.messages.Add(message);
// Executor logic...
}
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
return context.QueueStateUpdateAsync(StateKey, this.messages);
}
}
En outre, pour vous assurer que l’état est correctement restauré lors de la reprise à partir d’un point de contrôle, l’exécuteur doit remplacer la OnCheckpointRestoredAsync méthode et charger son état à partir du contexte de flux de travail.
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}
Pour que l'état d'un exécuteur soit capturé dans un point de contrôle, l'exécuteur doit redéfinir la méthode on_checkpoint_save et enregistrer son état dans le contexte du workflow.
class CustomExecutor(Executor):
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._messages: list[str] = []
@handler
async def handle(self, message: str, ctx: WorkflowContext):
self._messages.append(message)
# Executor logic...
async def on_checkpoint_save(self) -> dict[str, Any]:
return {"messages": self._messages}
En outre, pour vous assurer que l’état est correctement restauré lors de la reprise à partir d’un point de contrôle, l’exécuteur doit remplacer la on_checkpoint_restore méthode et charger son état à partir du contexte de flux de travail.
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
self._messages = state.get("messages", [])