Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Tato stránka obsahuje přehled kontrolních bodů v systému pracovních postupů rozhraní Microsoft Agent Framework.
Přehled
Kontrolní body umožňují uložit stav pracovního postupu v konkrétních bodech během jeho provádění a pokračovat z těchto bodů později. Tato funkce je užitečná zejména pro následující scénáře:
- Dlouhotrvající pracovní postupy, ve kterých chcete zabránit ztrátě průběhu v případě selhání.
- Dlouhotrvající pracovní postupy, ve kterých chcete pozastavit a obnovit spouštění později.
- Pracovní postupy, které vyžadují pravidelné ukládání stavu pro účely auditování nebo dodržování předpisů
- Pracovní postupy, které je potřeba migrovat napříč různými prostředími nebo instancemi
Kdy jsou vytvořeny kontrolní body?
Mějte na paměti, že pracovní postupy se provádějí v superkrocích, jak je uvedeno v základních konceptech. Kontrolní body se vytvoří na konci každého superkroku poté, co všechny exekutory v daném superkroku dokončily provádění. Kontrolní bod zachycuje celý stav pracovního postupu, včetně:
- Aktuální stav všech exekutorů
- Všechny čekající zprávy ve workflow pro další superstep
- Čekající žádosti a odpovědi
- Sdílené stavy
Zachytávání kontrolních bodů
Pokud chcete povolit vytváření kontrolních bodů, je potřeba poskytnout CheckpointManager při vytváření spuštění pracovního postupu. Kontrolní bod pak lze získat přístup přes SuperStepCompletedEvent.
using Microsoft.Agents.Workflows;
// Create a checkpoint manager to manage checkpoints
var checkpointManager = new CheckpointManager();
// List to store checkpoint info for later use
var checkpoints = new List<CheckpointInfo>();
// Run the workflow with checkpointing enabled
Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
.StreamAsync(workflow, input, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is SuperStepCompletedEvent superStepCompletedEvt)
{
// Access the checkpoint and store it
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
if (checkpoint != null)
{
checkpoints.Add(checkpoint);
}
}
}
Pokud chcete povolit vytváření kontrolních bodů, CheckpointStorage musí být při vytváření pracovního postupu k dispozici. K kontrolnímu bodu se pak dostanete přes úložiště.
from agent_framework import (
InMemoryCheckpointStorage,
WorkflowBuilder,
)
# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()
# Build a workflow with checkpointing enabled
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.with_checkpointing(checkpoint_storage).build()
# Run the workflow
async for event in workflow.run_streaming(input):
...
# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints()
Obnovení z kontrolních bodů
Pracovní postup můžete obnovit z konkrétního kontrolního bodu přímo v tom samém běhu.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None).ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowCompletedEvent workflowCompletedEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowCompletedEvt.Data}");
}
}
Pracovní postup můžete obnovit z konkrétního kontrolního bodu přímo ve stejné instanci pracovního postupu.
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream(checkpoint_id=saved_checkpoint.checkpoint_id):
...
Obnovení z kontrolních bodů
Nebo můžete pracovní postup obnovit z kontrolního bodu do nové spustitelné instance.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowCompletedEvent workflowCompletedEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowCompletedEvt.Data}");
}
}
Nebo můžete obnovit novou instanci pracovního postupu z kontrolního bodu.
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream
checkpoint_id=saved_checkpoint.checkpoint_id,
checkpoint_storage=checkpoint_storage,
):
...
Stavy exekutoru
Aby se zajistilo, že se stav exekutoru zachytí do kontrolního bodu, musí exekutor přepsat OnCheckpointingAsync metodu a uložit svůj stav do kontextu pracovního postupu.
using Microsoft.Agents.Workflows;
using Microsoft.Agents.Workflows.Reflection;
internal sealed class CustomExecutor() : ReflectingExecutor<CustomExecutor>("CustomExecutor"), IMessageHandler<string>
{
private const string StateKey = "CustomExecutorState";
private List<string> messages = new();
public async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this.messages.Add(message);
// Executor logic...
}
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
return context.QueueStateUpdateAsync(StateKey, this.messages);
}
}
Aby se zajistilo, že se stav při obnovení z kontrolního bodu správně obnoví, musí exekutor přepsat OnCheckpointRestoredAsync metodu a načíst její stav z kontextu pracovního postupu.
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}
Další kroky
- Naučte se používat agenty v pracovních postupech k vytváření inteligentních pracovních postupů.
- Naučte se používat pracovní postupy jako agenty.
- Naučte se zpracovávat požadavky a odpovědi v pracovních postupech.
- Zjistěte, jak spravovat stav v pracovních postupech.