Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Esta página fornece uma visão geral dos pontos de verificação no sistema de fluxo de trabalho do Microsoft Agent Framework.
Visão geral
Os pontos de verificação permitem que você salve o estado de um fluxo de trabalho em pontos específicos durante sua execução e retome a partir desses pontos mais tarde. Esse recurso é particularmente útil para os seguintes cenários:
- Fluxos de trabalho de longa execução onde você deseja evitar a perda de progresso em caso de falhas.
- Fluxos de trabalho de longa execução em que você deseja pausar e retomar a execução posteriormente.
- Fluxos de trabalho que exigem armazenamento de estado periódico para fins de auditoria ou cumprimento.
- Fluxos de trabalho que precisam ser migrados entre diferentes ambientes ou instâncias.
Quando são criados os pontos de verificação?
Lembre-se de que os fluxos de trabalho são executados em superetapas, conforme documentado nos conceitos principais. Os pontos de verificação são criados no final de cada superetapa, depois de todos os executores nessa superetapa terem concluído a sua execução. Um ponto de verificação captura todo o estado do fluxo de trabalho, incluindo:
- O estado atual de todos os executores
- Todas as mensagens pendentes no fluxo de trabalho para a próxima superetapa
- Pedidos e respostas pendentes
- Estados partilhados
Capturando pontos de verificação
Para habilitar o ponto de verificação, é necessário fornecer um CheckpointManager ao criar um run de workflow. Um ponto de verificação pode ser acessado mediante um SuperStepCompletedEvent.
using Microsoft.Agents.AI.Workflows;
// Create a checkpoint manager to manage checkpoints
var checkpointManager = new CheckpointManager();
// List to store checkpoint info for later use
var checkpoints = new List<CheckpointInfo>();
// Run the workflow with checkpointing enabled
Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
.StreamAsync(workflow, input, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is SuperStepCompletedEvent superStepCompletedEvt)
{
// Access the checkpoint and store it
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
if (checkpoint != null)
{
checkpoints.Add(checkpoint);
}
}
}
Para habilitar o ponto de verificação, é necessário fornecer um CheckpointStorage ao criar um fluxo de trabalho. Um ponto de verificação pode ser acessado através do armazenamento.
from agent_framework import (
InMemoryCheckpointStorage,
WorkflowBuilder,
)
# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()
# Build a workflow with checkpointing enabled
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.with_checkpointing(checkpoint_storage).build()
# Run the workflow
async for event in workflow.run_streaming(input):
...
# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints()
Retomar a partir de pontos de verificação
Você pode retomar um fluxo de trabalho de um ponto de verificação específico diretamente na mesma execução.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None).ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Você pode retomar um fluxo de trabalho de um ponto de verificação específico diretamente na mesma instância de fluxo de trabalho.
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream(checkpoint_id=saved_checkpoint.checkpoint_id):
...
Reidratação a partir de pontos de verificação
Ou você pode reidratar um fluxo de trabalho de um ponto de verificação para uma nova instância de execução.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Ou pode-se reidratar uma nova instância de fluxo de trabalho a partir de um checkpoint.
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream
checkpoint_id=saved_checkpoint.checkpoint_id,
checkpoint_storage=checkpoint_storage,
):
...
Salvar Estados do Executor
Para garantir que o estado de um executor seja capturado em um ponto de verificação, o executor deve substituir o OnCheckpointingAsync método e salvar seu estado no contexto do fluxo de trabalho.
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;
internal sealed class CustomExecutor() : Executor<string>("CustomExecutor")
{
private const string StateKey = "CustomExecutorState";
private List<string> messages = new();
public async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this.messages.Add(message);
// Executor logic...
}
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
return context.QueueStateUpdateAsync(StateKey, this.messages);
}
}
Além disso, para garantir que o estado seja restaurado corretamente ao retomar a partir de um ponto de verificação, o executor deve substituir o OnCheckpointRestoredAsync método e carregar seu estado do contexto do fluxo de trabalho.
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}
Para garantir que o estado de um executor seja capturado em um ponto de verificação, o executor deve substituir o on_checkpoint_save método e salvar seu estado no contexto do fluxo de trabalho.
class CustomExecutor(Executor):
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._messages: list[str] = []
@handler
async def handle(self, message: str, ctx: WorkflowContext):
self._messages.append(message)
# Executor logic...
async def on_checkpoint_save(self) -> dict[str, Any]:
return {"messages": self._messages}
Além disso, para garantir que o estado seja restaurado corretamente ao retomar a partir de um ponto de verificação, o executor deve substituir o on_checkpoint_restore método e carregar seu estado do contexto do fluxo de trabalho.
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
self._messages = state.get("messages", [])