Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
En esta página se proporciona información general sobre los puntos de control en el sistema de flujo de trabajo de Microsoft Agent Framework.
Información general
Los puntos de control permiten guardar el estado de un flujo de trabajo en puntos específicos durante su ejecución y reanudarse desde esos puntos más adelante. Esta característica es especialmente útil para los siguientes escenarios:
- Flujos de trabajo de larga duración en los que desea evitar perder el progreso en caso de errores.
- Flujos de trabajo de larga duración en los que desea pausar y reanudar la ejecución más adelante.
- Flujos de trabajo que requieren el guardado de estado periódico para fines de auditoría o cumplimiento.
- Flujos de trabajo que deben migrarse en diferentes entornos o instancias.
¿Cuándo se crean los puntos de control?
Recuerde que los flujos de trabajo se ejecutan en superpasos, como se documenta en los conceptos básicos. Los puntos de control se crean al final de cada superpaso, después de que todos los ejecutores de ese superpaso hayan completado su ejecución. Un punto de control captura todo el estado del flujo de trabajo, entre los que se incluyen:
- Estado actual de todos los ejecutores
- Todos los mensajes pendientes del flujo de trabajo para el siguiente superpaso
- Solicitudes y respuestas pendientes
- Estados compartidos
Capturar puntos de control
Para habilitar los puntos de control, es necesario proporcionar un elemento CheckpointManager al ejecutar el flujo de trabajo. A continuación, se puede acceder a un punto de control a través de un SuperStepCompletedEvent, o a través de la propiedad Checkpoints en la ejecución.
using Microsoft.Agents.AI.Workflows;
// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
.RunStreamingAsync(workflow, input, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is SuperStepCompletedEvent superStepCompletedEvt)
{
// Access the checkpoint
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
}
}
// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;
Para habilitar los puntos de control, es necesario proporcionar un CheckpointStorage elemento al crear un flujo de trabajo. A continuación, se puede acceder a un punto de control a través del almacenamiento.
from agent_framework import (
InMemoryCheckpointStorage,
WorkflowBuilder,
)
# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()
# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()
# Run the workflow
async for event in workflow.run(input, stream=True):
...
# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
Reanudación desde puntos de control
Puede reanudar un flujo de trabajo desde un punto de control específico directamente en la misma ejecución.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Puede reanudar un flujo de trabajo desde un punto de control específico directamente en la misma instancia de flujo de trabajo.
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
...
Rehidratación desde puntos de control
O bien, puede rehidratar un flujo de trabajo desde un punto de control a una nueva instancia de ejecución.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
.ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
O bien, puede rehidratar una nueva instancia de flujo de trabajo desde un punto de control.
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
checkpoint_id=saved_checkpoint.checkpoint_id,
checkpoint_storage=checkpoint_storage,
stream=True,
):
...
Guardar estados de ejecución
Para asegurarse de que el estado de un ejecutor se captura en un punto de control, el ejecutor debe invalidar el OnCheckpointingAsync método y guardar su estado en el contexto de flujo de trabajo.
using Microsoft.Agents.AI.Workflows;
internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
private const string StateKey = "CustomExecutorState";
private List<string> messages = new();
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this.messages.Add(message);
// Executor logic...
}
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
return context.QueueStateUpdateAsync(StateKey, this.messages);
}
}
Además, para asegurarse de que el estado se restaura correctamente al reanudar desde un punto de control, el ejecutor debe invalidar el OnCheckpointRestoredAsync método y cargar su estado desde el contexto de flujo de trabajo.
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}
Para asegurarse de que el estado de un ejecutor se captura en un punto de control, el ejecutor debe invalidar el on_checkpoint_save método y devolver su estado como diccionario.
class CustomExecutor(Executor):
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._messages: list[str] = []
@handler
async def handle(self, message: str, ctx: WorkflowContext):
self._messages.append(message)
# Executor logic...
async def on_checkpoint_save(self) -> dict[str, Any]:
return {"messages": self._messages}
Además, para asegurarse de que el estado se restaura correctamente al reanudarse desde un punto de control, el ejecutor debe invalidar el on_checkpoint_restore método y restaurar su estado desde el diccionario de estado proporcionado.
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
self._messages = state.get("messages", [])