Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Vytváření kontrolních bodů umožňuje pracovním postupům uložit jejich stav v konkrétních bodech a pokračovat v provádění později i po restartování procesu. To je zásadní pro dlouhotrvající workflowy, obnovení po chybách a scénáře zahrnující člověka.
Pokryté koncepty
Požadavky
- Sada .NET 8.0 SDK nebo novější
- Nová konzolová aplikace
Klíčové komponenty
Instalace balíčků NuGet
Nejprve nainstalujte požadované balíčky pro váš projekt .NET:
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
CheckpointManager
CheckpointManager poskytuje funkcionalitu úložiště kontrolních bodů a načítání:
using Microsoft.Agents.AI.Workflows;
// Use the default in-memory checkpoint manager
var checkpointManager = CheckpointManager.Default;
// Or create a custom checkpoint manager with JSON serialization
var checkpointManager = CheckpointManager.CreateJson(store, customOptions);
Povolení vytváření kontrolních bodů
Povolení vytváření kontrolních bodů při provádění pracovních postupů pomocí InProcessExecution:
using Microsoft.Agents.AI.Workflows;
// Create workflow with checkpointing support
var workflow = await WorkflowHelper.GetWorkflowAsync();
var checkpointManager = CheckpointManager.Default;
// Execute with checkpointing enabled
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
.StreamAsync(workflow, NumberSignal.Init, checkpointManager);
Trvalost stavu
Stav exekutoru
Exekutory mohou pomocí základní třídy Executor<T> zachovat místní stav, který přežije kontrolní body.
internal sealed class GuessNumberExecutor : Executor<NumberSignal>("Guess")
{
private const string StateKey = "GuessNumberExecutor.State";
public int LowerBound { get; private set; }
public int UpperBound { get; private set; }
public GuessNumberExecutor() : this()
{
}
public override async ValueTask HandleAsync(NumberSignal message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
int guess = (LowerBound + UpperBound) / 2;
await context.SendMessageAsync(guess, cancellationToken);
}
/// <summary>
/// Checkpoint the current state of the executor.
/// This must be overridden to save any state that is needed to resume the executor.
/// </summary>
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default) =>
context.QueueStateUpdateAsync(StateKey, (LowerBound, UpperBound), cancellationToken);
/// <summary>
/// Restore the state of the executor from a checkpoint.
/// This must be overridden to restore any state that was saved during checkpointing.
/// </summary>
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
{
var state = await context.ReadStateAsync<(int, int)>(StateKey, cancellationToken);
(LowerBound, UpperBound) = state;
}
}
Automatické vytváření kontrolních bodů
Kontrolní body se automaticky vytvoří na konci každého superkroku, když je k dispozici správce kontrolních bodů:
var checkpoints = new List<CheckpointInfo>();
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case SuperStepCompletedEvent superStepCompletedEvt:
// Checkpoints are automatically created at super step boundaries
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
if (checkpoint is not null)
{
checkpoints.Add(checkpoint);
Console.WriteLine($"Checkpoint created at step {checkpoints.Count}.");
}
break;
case WorkflowOutputEvent workflowOutputEvt:
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
break;
}
}
Práce s kontrolními body
Přístup k informacím o kontrolním bodu
Přístup k metadatům kontrolního bodu z dokončených spuštění:
// Get all checkpoints from a checkpointed run
var allCheckpoints = checkpointedRun.Checkpoints;
// Get the latest checkpoint
var latestCheckpoint = checkpointedRun.LatestCheckpoint;
// Access checkpoint details
foreach (var checkpoint in checkpoints)
{
Console.WriteLine($"Checkpoint ID: {checkpoint.CheckpointId}");
Console.WriteLine($"Step Number: {checkpoint.StepNumber}");
Console.WriteLine($"Parent ID: {checkpoint.Parent?.CheckpointId ?? "None"}");
}
Úložiště kontrolních bodů
Kontrolní body se spravují prostřednictvím CheckpointManager rozhraní:
// Commit a checkpoint (usually done automatically)
CheckpointInfo checkpointInfo = await checkpointManager.CommitCheckpointAsync(runId, checkpoint);
// Retrieve a checkpoint
Checkpoint restoredCheckpoint = await checkpointManager.LookupCheckpointAsync(runId, checkpointInfo);
Obnovení z kontrolních bodů
Obnovení streamování
Obnovení vykonávání z kontrolního bodu a streamování událostí v reálném čase:
// Resume from a specific checkpoint with streaming
CheckpointInfo savedCheckpoint = checkpoints[checkpointIndex];
await using Checkpointed<StreamingRun> resumedRun = await InProcessExecution
.ResumeStreamAsync(workflow, savedCheckpoint, checkpointManager, runId);
await foreach (WorkflowEvent evt in resumedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case ExecutorCompletedEvent executorCompletedEvt:
Console.WriteLine($"Executor {executorCompletedEvt.ExecutorId} completed.");
break;
case WorkflowOutputEvent workflowOutputEvt:
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
return;
}
}
Pokračování bez streamování
Pokračovat a počkat na dokončení:
// Resume from checkpoint without streaming
Checkpointed<Run> resumedRun = await InProcessExecution
.ResumeAsync(workflow, savedCheckpoint, checkpointManager, runId);
// Wait for completion and get final result
var result = await resumedRun.Run.WaitForCompletionAsync();
Obnovení na místě
Obnovení kontrolního bodu přímo do existující běhové instance
// Restore checkpoint to the same run instance
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint);
// Continue execution from the restored state
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
// Handle events as normal
if (evt is WorkflowOutputEvent outputEvt)
{
Console.WriteLine($"Resumed workflow result: {outputEvt.Data}");
break;
}
}
Nová instance pracovního postupu (rehydratace)
Vytvořte novou instanci pracovního postupu z kontrolního bodu:
// Create a completely new workflow instance
var newWorkflow = await WorkflowHelper.GetWorkflowAsync();
// Resume with the new instance from a saved checkpoint
await using Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager, originalRunId);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Rehydrated workflow result: {workflowOutputEvt.Data}");
break;
}
}
Člověk ve smyčce s kontrolními body
Kombinování kontrolních bodů s pracovními postupy human-in-the-loop:
var checkpoints = new List<CheckpointInfo>();
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle external requests
ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
await checkpointedRun.Run.SendResponseAsync(response);
break;
case SuperStepCompletedEvent superStepCompletedEvt:
// Save checkpoint after each interaction
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
if (checkpoint is not null)
{
checkpoints.Add(checkpoint);
Console.WriteLine($"Checkpoint created after human interaction.");
}
break;
case WorkflowOutputEvent workflowOutputEvt:
Console.WriteLine($"Workflow completed: {workflowOutputEvt.Data}");
return;
}
}
// Later, resume from any checkpoint
if (checkpoints.Count > 0)
{
var selectedCheckpoint = checkpoints[1]; // Select specific checkpoint
await checkpointedRun.RestoreCheckpointAsync(selectedCheckpoint);
// Continue from that point
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
// Handle remaining workflow execution
}
}
Kompletní ukázkový vzor
Tady je komplexní vzor pracovního postupu kontrolních bodů:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;
public static class CheckpointingExample
{
public static async Task RunAsync()
{
// Create workflow and checkpoint manager
var workflow = await WorkflowHelper.GetWorkflowAsync();
var checkpointManager = CheckpointManager.Default;
var checkpoints = new List<CheckpointInfo>();
Console.WriteLine("Starting workflow with checkpointing...");
// Execute workflow with checkpointing
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
.StreamAsync(workflow, NumberSignal.Init, checkpointManager);
// Monitor execution and collect checkpoints
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case ExecutorCompletedEvent executorEvt:
Console.WriteLine($"Executor {executorEvt.ExecutorId} completed.");
break;
case SuperStepCompletedEvent superStepEvt:
var checkpoint = superStepEvt.CompletionInfo!.Checkpoint;
if (checkpoint is not null)
{
checkpoints.Add(checkpoint);
Console.WriteLine($"Checkpoint {checkpoints.Count} created.");
}
break;
case WorkflowOutputEvent outputEvt:
Console.WriteLine($"Workflow completed: {outputEvt.Data}");
goto FinishExecution;
}
}
FinishExecution:
Console.WriteLine($"Total checkpoints created: {checkpoints.Count}");
// Demonstrate resuming from a checkpoint
if (checkpoints.Count > 5)
{
var selectedCheckpoint = checkpoints[5];
Console.WriteLine($"Resuming from checkpoint 6...");
// Restore to same instance
await checkpointedRun.RestoreCheckpointAsync(selectedCheckpoint);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent resumedOutputEvt)
{
Console.WriteLine($"Resumed workflow result: {resumedOutputEvt.Data}");
break;
}
}
}
// Demonstrate rehydration with new workflow instance
if (checkpoints.Count > 3)
{
var newWorkflow = await WorkflowHelper.GetWorkflowAsync();
var rehydrationCheckpoint = checkpoints[3];
Console.WriteLine("Rehydrating from checkpoint 4 with new workflow instance...");
await using Checkpointed<StreamingRun> newRun = await InProcessExecution
.ResumeStreamAsync(newWorkflow, rehydrationCheckpoint, checkpointManager, checkpointedRun.Run.RunId);
await foreach (WorkflowEvent evt in newRun.Run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent rehydratedOutputEvt)
{
Console.WriteLine($"Rehydrated workflow result: {rehydratedOutputEvt.Data}");
break;
}
}
}
}
}
Klíčové výhody
- Odolnost proti chybám: Pracovní postupy se můžou zotavit z selhání obnovením z posledního kontrolního bodu.
- Dlouhotrvající procesy: Rozdělení dlouhých pracovních postupů do zvládnutelných segmentů s automatickými hranicemi kontrolních bodů
- Human-in-the-Loop: Pozastavení externího vstupu a pozdější obnovení z uloženého stavu
- Ladění: Kontrola stavu pracovního postupu v konkrétních bodech a obnovení spuštění pro testování
- Přenositelnost: Kontrolní body lze obnovit do nových instancí pracovního postupu (znovuoživení).
- Automatická správa: Kontrolní body se vytvářejí automaticky na hranicích superkroků.
Spuštění příkladu
Kompletní pracovní implementaci najdete v ukázce CheckpointAndResume.
Klíčové komponenty
FileCheckpointStorage
Třída FileCheckpointStorage poskytuje trvalé úložiště kontrolních bodů pomocí souborů JSON:
from agent_framework import FileCheckpointStorage
from pathlib import Path
# Initialize checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
Povolení vytváření kontrolních bodů
Povolení vytváření kontrolních bodů při vytváření pracovního postupu:
from agent_framework import WorkflowBuilder
workflow = (
WorkflowBuilder(max_iterations=5)
.add_edge(executor1, executor2)
.set_start_executor(executor1)
.with_checkpointing(checkpoint_storage=checkpoint_storage) # Enable checkpointing
.build()
)
Trvalost stavu
Stav exekutoru
Spouštěče mohou zachovat místní stav, který přežije kontrolní body.
from agent_framework import Executor, WorkflowContext, handler
class WorkerExecutor(Executor):
"""Processes numbers to compute their factor pairs and manages executor state for checkpointing."""
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._composite_number_pairs: dict[int, list[tuple[int, int]]] = {}
@handler
async def compute(
self,
task: ComputeTask,
ctx: WorkflowContext[ComputeTask, dict[int, list[tuple[int, int]]]],
) -> None:
"""Process the next number in the task, computing its factor pairs."""
next_number = task.remaining_numbers.pop(0)
print(f"WorkerExecutor: Computing factor pairs for {next_number}")
pairs: list[tuple[int, int]] = []
for i in range(1, next_number):
if next_number % i == 0:
pairs.append((i, next_number // i))
self._composite_number_pairs[next_number] = pairs
if not task.remaining_numbers:
# All numbers processed - output the results
await ctx.yield_output(self._composite_number_pairs)
else:
# More numbers to process - continue with remaining task
await ctx.send_message(task)
@override
async def on_checkpoint_save(self) -> dict[str, Any]:
"""Save the executor's internal state for checkpointing."""
return {"composite_number_pairs": self._composite_number_pairs}
@override
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
"""Restore the executor's internal state from a checkpoint."""
self._composite_number_pairs = state.get("composite_number_pairs", {})
Práce s kontrolními body
Výpis kontrolních bodů
Načtení a kontrola dostupných kontrolních bodů:
# List all checkpoints
all_checkpoints = await checkpoint_storage.list_checkpoints()
# List checkpoints for a specific workflow
workflow_checkpoints = await checkpoint_storage.list_checkpoints(workflow_id="my-workflow")
# Sort by creation time
sorted_checkpoints = sorted(all_checkpoints, key=lambda cp: cp.timestamp)
Obnovení z kontrolních bodů
Obnovení streamování
Obnovení spouštění a streamování událostí v reálném čase:
# Resume from a specific checkpoint
async for event in workflow.run_stream(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
):
print(f"Resumed Event: {event}")
if isinstance(event, WorkflowOutputEvent):
print(f"Final Result: {event.data}")
break
Pokračování bez streamování
Pokračovat a získat všechny výsledky najednou:
# Resume and wait for completion
result = await workflow.run(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
)
# Access final outputs
outputs = result.get_outputs()
print(f"Final outputs: {outputs}")
Pokračovat s čekajícími žádostmi
Při pokračování z kontrolního bodu, který obsahuje čekající požadavky, pracovní postup znovu vygeneruje události těchto požadavků a umožní vám je zachytit a reagovat na ně.
request_info_events = []
# Resume from checkpoint - pending requests will be re-emitted
async for event in workflow.run_stream(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
):
if isinstance(event, RequestInfoEvent):
# Capture re-emitted pending requests
print(f"Pending request re-emitted: {event.request_id}")
request_info_events.append(event)
# Handle the request and provide response
# If responses are already provided, no need to handle them again
responses = {}
for event in request_info_events:
response = handle_request(event.data)
responses[event.request_id] = response
# Send response back to workflow
async for event in workflow.send_responses_streaming(responses):
if isinstance(event, WorkflowOutputEvent):
print(f"Workflow completed: {event.data}")
Pokud probíhá obnovení z kontrolního bodu s čekajícími požadavky, na které již byla odpověď poskytnuta, musíte zavolat run_stream(), abyste mohli pokračovat ve workflow následovaném send_responses_streaming() s předem stanovenými odpověďmi.
Interaktivní výběr kontrolního bodu
Vytvořit uživatelsky přívětivý výběr kontrolního bodu:
async def select_and_resume_checkpoint(workflow, storage):
# Get available checkpoints
checkpoints = await storage.list_checkpoints()
if not checkpoints:
print("No checkpoints available")
return
# Sort and display options
sorted_cps = sorted(checkpoints, key=lambda cp: cp.timestamp)
print("Available checkpoints:")
for i, cp in enumerate(sorted_cps):
summary = get_checkpoint_summary(cp)
print(f"[{i}] {summary.checkpoint_id[:8]}... iter={summary.iteration_count}")
# Get user selection
try:
idx = int(input("Enter checkpoint index: "))
selected = sorted_cps[idx]
# Resume from selected checkpoint
print(f"Resuming from checkpoint: {selected.checkpoint_id}")
async for event in workflow.run_stream(
selected.checkpoint_id,
checkpoint_storage=storage
):
print(f"Event: {event}")
except (ValueError, IndexError):
print("Invalid selection")
Kompletní ukázkový vzor
Tady je typický vzor pracovního postupu vytváření kontrolních bodů:
import asyncio
from pathlib import Path
from agent_framework import (
FileCheckpointStorage,
WorkflowBuilder,
WorkflowOutputEvent,
get_checkpoint_summary
)
async def main():
# Setup checkpoint storage
checkpoint_dir = Path("./checkpoints")
checkpoint_dir.mkdir(exist_ok=True)
storage = FileCheckpointStorage(checkpoint_dir)
# Build workflow with checkpointing
workflow = (
WorkflowBuilder()
.add_edge(executor1, executor2)
.set_start_executor(executor1)
.with_checkpointing(storage)
.build()
)
# Initial run
print("Running workflow...")
async for event in workflow.run_stream("input data"):
print(f"Event: {event}")
# List and inspect checkpoints
checkpoints = await storage.list_checkpoints()
for cp in sorted(checkpoints, key=lambda c: c.timestamp):
summary = get_checkpoint_summary(cp)
print(f"Checkpoint: {summary.checkpoint_id[:8]}... iter={summary.iteration_count}")
# Resume from a checkpoint
if checkpoints:
latest = max(checkpoints, key=lambda cp: cp.timestamp)
print(f"Resuming from: {latest.checkpoint_id}")
async for event in workflow.run_stream(latest.checkpoint_id):
print(f"Resumed: {event}")
if __name__ == "__main__":
asyncio.run(main())
Klíčové výhody
- Odolnost proti chybám: Pracovní postupy se můžou zotavit z selhání obnovením z posledního kontrolního bodu.
- Dlouhé procesy: Rozdělte dlouhé pracovní postupy do spravovatelných segmentů s bodovou kontrolou.
- Human-in-the-Loop: Pozastavení pro vstup člověka a obnovení později – čekající žádosti se po obnovení znovu vygenerují.
- Ladění: Kontrola stavu pracovního postupu v konkrétních bodech a obnovení spuštění pro testování
- Správa prostředků: Zastavení a restartování pracovních postupů na základě dostupnosti prostředků
Spuštění příkladu
Kompletní pracovní implementaci najdete v ukázce Checkpoint s životopisem.