Jegyzet
Az oldalhoz való hozzáférés engedélyezést igényel. Próbálhatod be jelentkezni vagy könyvtárat váltani.
Az oldalhoz való hozzáférés engedélyezést igényel. Megpróbálhatod a könyvtár váltását.
Az ellenőrzőpontokkal a munkafolyamatok bizonyos pontokon menthetik az állapotukat, és később folytathatják a végrehajtást, még a folyamat újraindítása után is. Ez elengedhetetlen a hosszú ideig futó munkafolyamatok, a hibajavítás és az emberi közreműködést igénylő forgatókönyvek esetében.
A tárgyalt fogalmak
Előfeltételek
- .NET 8.0 SDK vagy újabb
- Új konzolalkalmazás
Kulcsösszetevők
NuGet-csomagok telepítése
Először telepítse a szükséges csomagokat a .NET-projekthez:
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
CheckpointManager
Az CheckpointManager ellenőrzőpont tárolási és lekérési funkciói:
using Microsoft.Agents.AI.Workflows;
// Use the default in-memory checkpoint manager
var checkpointManager = CheckpointManager.Default;
// Or create a custom checkpoint manager with JSON serialization
var checkpointManager = CheckpointManager.CreateJson(store, customOptions);
Ellenőrzőpont-készítés engedélyezése
Ellenőrzőpontok engedélyezése munkafolyamatok végrehajtásakor a(z) InProcessExecution használatával:
using Microsoft.Agents.AI.Workflows;
// Create workflow with checkpointing support
var workflow = await WorkflowHelper.GetWorkflowAsync();
var checkpointManager = CheckpointManager.Default;
// Execute with checkpointing enabled
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
.StreamAsync(workflow, NumberSignal.Init, checkpointManager);
Állapottartósítás
Végrehajtó állapot
A végrehajtók az alaposztály használatával megőrizhetik az Executor<T> ellenőrzőpontokat túlélő helyi állapotot:
internal sealed class GuessNumberExecutor : Executor<NumberSignal>("Guess")
{
private const string StateKey = "GuessNumberExecutor.State";
public int LowerBound { get; private set; }
public int UpperBound { get; private set; }
public GuessNumberExecutor() : this()
{
}
public override async ValueTask HandleAsync(NumberSignal message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
int guess = (LowerBound + UpperBound) / 2;
await context.SendMessageAsync(guess, cancellationToken);
}
/// <summary>
/// Checkpoint the current state of the executor.
/// This must be overridden to save any state that is needed to resume the executor.
/// </summary>
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default) =>
context.QueueStateUpdateAsync(StateKey, (LowerBound, UpperBound), cancellationToken);
/// <summary>
/// Restore the state of the executor from a checkpoint.
/// This must be overridden to restore any state that was saved during checkpointing.
/// </summary>
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
{
var state = await context.ReadStateAsync<(int, int)>(StateKey, cancellationToken);
(LowerBound, UpperBound) = state;
}
}
Automatikus ellenőrzőpont-létrehozás
Az ellenőrzőpontok automatikusan létrejönnek az egyes szuperlépések végén, amikor az ellenőrzőpont-kezelő meg van adva:
var checkpoints = new List<CheckpointInfo>();
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case SuperStepCompletedEvent superStepCompletedEvt:
// Checkpoints are automatically created at super step boundaries
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
if (checkpoint is not null)
{
checkpoints.Add(checkpoint);
Console.WriteLine($"Checkpoint created at step {checkpoints.Count}.");
}
break;
case WorkflowOutputEvent workflowOutputEvt:
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
break;
}
}
Ellenőrzőpontok használata
Ellenőrzőpont adatainak elérése
Az ellenőrzőpont metaadatainak elérése a befejezett futtatásokból:
// Get all checkpoints from a checkpointed run
var allCheckpoints = checkpointedRun.Checkpoints;
// Get the latest checkpoint
var latestCheckpoint = checkpointedRun.LatestCheckpoint;
// Access checkpoint details
foreach (var checkpoint in checkpoints)
{
Console.WriteLine($"Checkpoint ID: {checkpoint.CheckpointId}");
Console.WriteLine($"Step Number: {checkpoint.StepNumber}");
Console.WriteLine($"Parent ID: {checkpoint.Parent?.CheckpointId ?? "None"}");
}
Ellenőrzőpont tárhely
Az ellenőrzőpontok kezelése a CheckpointManager felületen keresztül történik:
// Commit a checkpoint (usually done automatically)
CheckpointInfo checkpointInfo = await checkpointManager.CommitCheckpointAsync(runId, checkpoint);
// Retrieve a checkpoint
Checkpoint restoredCheckpoint = await checkpointManager.LookupCheckpointAsync(runId, checkpointInfo);
Folytatás ellenőrzőpontokból
Streamelés folytatása
Folytassa a végrehajtást egy ellenőrzőpontról, és valós időben streamelje az eseményeket:
// Resume from a specific checkpoint with streaming
CheckpointInfo savedCheckpoint = checkpoints[checkpointIndex];
await using Checkpointed<StreamingRun> resumedRun = await InProcessExecution
.ResumeStreamAsync(workflow, savedCheckpoint, checkpointManager, runId);
await foreach (WorkflowEvent evt in resumedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case ExecutorCompletedEvent executorCompletedEvt:
Console.WriteLine($"Executor {executorCompletedEvt.ExecutorId} completed.");
break;
case WorkflowOutputEvent workflowOutputEvt:
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
return;
}
}
Folyamat folytatása streaming nélkül
Folytatás és várakozás a befejezésre:
// Resume from checkpoint without streaming
Checkpointed<Run> resumedRun = await InProcessExecution
.ResumeAsync(workflow, savedCheckpoint, checkpointManager, runId);
// Wait for completion and get final result
var result = await resumedRun.Run.WaitForCompletionAsync();
Helyben helyreállítás
Állítson vissza egy ellenőrzőpontot közvetlenül egy meglévő futó példányra:
// Restore checkpoint to the same run instance
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint);
// Continue execution from the restored state
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
// Handle events as normal
if (evt is WorkflowOutputEvent outputEvt)
{
Console.WriteLine($"Resumed workflow result: {outputEvt.Data}");
break;
}
}
Új munkafolyamat-példány (rehidratálás)
Hozzon létre egy új munkafolyamat-példányt egy ellenőrzőpontból:
// Create a completely new workflow instance
var newWorkflow = await WorkflowHelper.GetWorkflowAsync();
// Resume with the new instance from a saved checkpoint
await using Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager, originalRunId);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Rehydrated workflow result: {workflowOutputEvt.Data}");
break;
}
}
Ember a folyamatban ellenőrzési pontokkal
Az ellenőrzőpont-készítés kombinálása a cikluson belüli emberi munkafolyamatokkal:
var checkpoints = new List<CheckpointInfo>();
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle external requests
ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
await checkpointedRun.Run.SendResponseAsync(response);
break;
case SuperStepCompletedEvent superStepCompletedEvt:
// Save checkpoint after each interaction
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
if (checkpoint is not null)
{
checkpoints.Add(checkpoint);
Console.WriteLine($"Checkpoint created after human interaction.");
}
break;
case WorkflowOutputEvent workflowOutputEvt:
Console.WriteLine($"Workflow completed: {workflowOutputEvt.Data}");
return;
}
}
// Later, resume from any checkpoint
if (checkpoints.Count > 0)
{
var selectedCheckpoint = checkpoints[1]; // Select specific checkpoint
await checkpointedRun.RestoreCheckpointAsync(selectedCheckpoint);
// Continue from that point
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
// Handle remaining workflow execution
}
}
Teljes példaminta
Az alábbiakban egy átfogó ellenőrzési munkafolyamat-mintát mutatunk be:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;
public static class CheckpointingExample
{
public static async Task RunAsync()
{
// Create workflow and checkpoint manager
var workflow = await WorkflowHelper.GetWorkflowAsync();
var checkpointManager = CheckpointManager.Default;
var checkpoints = new List<CheckpointInfo>();
Console.WriteLine("Starting workflow with checkpointing...");
// Execute workflow with checkpointing
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
.StreamAsync(workflow, NumberSignal.Init, checkpointManager);
// Monitor execution and collect checkpoints
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case ExecutorCompletedEvent executorEvt:
Console.WriteLine($"Executor {executorEvt.ExecutorId} completed.");
break;
case SuperStepCompletedEvent superStepEvt:
var checkpoint = superStepEvt.CompletionInfo!.Checkpoint;
if (checkpoint is not null)
{
checkpoints.Add(checkpoint);
Console.WriteLine($"Checkpoint {checkpoints.Count} created.");
}
break;
case WorkflowOutputEvent outputEvt:
Console.WriteLine($"Workflow completed: {outputEvt.Data}");
goto FinishExecution;
}
}
FinishExecution:
Console.WriteLine($"Total checkpoints created: {checkpoints.Count}");
// Demonstrate resuming from a checkpoint
if (checkpoints.Count > 5)
{
var selectedCheckpoint = checkpoints[5];
Console.WriteLine($"Resuming from checkpoint 6...");
// Restore to same instance
await checkpointedRun.RestoreCheckpointAsync(selectedCheckpoint);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent resumedOutputEvt)
{
Console.WriteLine($"Resumed workflow result: {resumedOutputEvt.Data}");
break;
}
}
}
// Demonstrate rehydration with new workflow instance
if (checkpoints.Count > 3)
{
var newWorkflow = await WorkflowHelper.GetWorkflowAsync();
var rehydrationCheckpoint = checkpoints[3];
Console.WriteLine("Rehydrating from checkpoint 4 with new workflow instance...");
await using Checkpointed<StreamingRun> newRun = await InProcessExecution
.ResumeStreamAsync(newWorkflow, rehydrationCheckpoint, checkpointManager, checkpointedRun.Run.RunId);
await foreach (WorkflowEvent evt in newRun.Run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent rehydratedOutputEvt)
{
Console.WriteLine($"Rehydrated workflow result: {rehydratedOutputEvt.Data}");
break;
}
}
}
}
}
Főbb előnyök
- Hibatűrés: A munkafolyamatok az utolsó ellenőrzőpontról való visszaállással helyreállhatnak a hibákból
- Hosszú futású folyamatok: A hosszú munkafolyamatokat kezelhető szegmensekre bonthatja automatikus ellenőrzési pontok meghatározásával.
- Human-in-the-Loop: Szünet a külső bemenethez, és később folytatódik a mentett állapotból
- Hibakeresés: A munkafolyamat állapotának vizsgálata adott pontokon, és a végrehajtás folytatása tesztelés céljából
- Hordozhatóság: Az ellenőrzőpontok visszaállíthatók új munkafolyamat-példányokra (rehidratálás)
- Automatikus felügyelet: Az ellenőrzőpontok automatikusan jönnek létre a szuperlépések határán
A példa futtatása
A teljes munka implementációja a CheckpointAndResume mintában található.
Kulcsösszetevők
FájlEllenőrzőpontTárolás
Az FileCheckpointStorage osztály állandó ellenőrzőpont-tárolót biztosít JSON-fájlok használatával:
from agent_framework import FileCheckpointStorage
from pathlib import Path
# Initialize checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
Ellenőrzőpont-készítés engedélyezése
A munkafolyamat létrehozásakor engedélyezze az ellenőrzőpont-ellenőrzést:
from agent_framework import WorkflowBuilder
workflow = (
WorkflowBuilder(max_iterations=5)
.add_edge(executor1, executor2)
.set_start_executor(executor1)
.with_checkpointing(checkpoint_storage=checkpoint_storage) # Enable checkpointing
.build()
)
Állapottartósítás
Végrehajtó állapot
A végrehajtók megőrizhetik az ellenőrzőpontokat túlélő helyi állapotot:
from agent_framework import Executor, WorkflowContext, handler
class WorkerExecutor(Executor):
"""Processes numbers to compute their factor pairs and manages executor state for checkpointing."""
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._composite_number_pairs: dict[int, list[tuple[int, int]]] = {}
@handler
async def compute(
self,
task: ComputeTask,
ctx: WorkflowContext[ComputeTask, dict[int, list[tuple[int, int]]]],
) -> None:
"""Process the next number in the task, computing its factor pairs."""
next_number = task.remaining_numbers.pop(0)
print(f"WorkerExecutor: Computing factor pairs for {next_number}")
pairs: list[tuple[int, int]] = []
for i in range(1, next_number):
if next_number % i == 0:
pairs.append((i, next_number // i))
self._composite_number_pairs[next_number] = pairs
if not task.remaining_numbers:
# All numbers processed - output the results
await ctx.yield_output(self._composite_number_pairs)
else:
# More numbers to process - continue with remaining task
await ctx.send_message(task)
@override
async def on_checkpoint_save(self) -> dict[str, Any]:
"""Save the executor's internal state for checkpointing."""
return {"composite_number_pairs": self._composite_number_pairs}
@override
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
"""Restore the executor's internal state from a checkpoint."""
self._composite_number_pairs = state.get("composite_number_pairs", {})
Ellenőrzőpontok használata
Ellenőrzőpontok listázása
Az elérhető ellenőrzőpontok lekérése és vizsgálata:
# List all checkpoints
all_checkpoints = await checkpoint_storage.list_checkpoints()
# List checkpoints for a specific workflow
workflow_checkpoints = await checkpoint_storage.list_checkpoints(workflow_id="my-workflow")
# Sort by creation time
sorted_checkpoints = sorted(all_checkpoints, key=lambda cp: cp.timestamp)
Folytatás ellenőrzőpontokból
Streamelés folytatása
Valós időben folytathatja a végrehajtást és streameli az eseményeket:
# Resume from a specific checkpoint
async for event in workflow.run_stream(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
):
print(f"Resumed Event: {event}")
if isinstance(event, WorkflowOutputEvent):
print(f"Final Result: {event.data}")
break
Folyamat folytatása streaming nélkül
Folytassa, és az összes eredményt egyszerre kapja meg:
# Resume and wait for completion
result = await workflow.run(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
)
# Access final outputs
outputs = result.get_outputs()
print(f"Final outputs: {outputs}")
Függőben lévő kérelmek folytatása
Ha egy függőben lévő kéréseket tartalmazó ellenőrzőpontról indul, a munkafolyamat újra kibocsátja ezeket a kéréseseményeket, így rögzítheti és megválaszolhatja őket:
request_info_events = []
# Resume from checkpoint - pending requests will be re-emitted
async for event in workflow.run_stream(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
):
if isinstance(event, RequestInfoEvent):
# Capture re-emitted pending requests
print(f"Pending request re-emitted: {event.request_id}")
request_info_events.append(event)
# Handle the request and provide response
# If responses are already provided, no need to handle them again
responses = {}
for event in request_info_events:
response = handle_request(event.data)
responses[event.request_id] = response
# Send response back to workflow
async for event in workflow.send_responses_streaming(responses):
if isinstance(event, WorkflowOutputEvent):
print(f"Workflow completed: {event.data}")
Ha egy ellenőrzőpontot folytat a már megválaszolt, de még függő kérésekkel, akkor először meg kell hívnia run_stream() a munkafolyamat folytatásához, majd send_responses_streaming() az előre megadott válaszokkal.
Interaktív ellenőrzőpont kiválasztása
Felhasználóbarát ellenőrzőpont-kijelölés létrehozása:
async def select_and_resume_checkpoint(workflow, storage):
# Get available checkpoints
checkpoints = await storage.list_checkpoints()
if not checkpoints:
print("No checkpoints available")
return
# Sort and display options
sorted_cps = sorted(checkpoints, key=lambda cp: cp.timestamp)
print("Available checkpoints:")
for i, cp in enumerate(sorted_cps):
summary = get_checkpoint_summary(cp)
print(f"[{i}] {summary.checkpoint_id[:8]}... iter={summary.iteration_count}")
# Get user selection
try:
idx = int(input("Enter checkpoint index: "))
selected = sorted_cps[idx]
# Resume from selected checkpoint
print(f"Resuming from checkpoint: {selected.checkpoint_id}")
async for event in workflow.run_stream(
selected.checkpoint_id,
checkpoint_storage=storage
):
print(f"Event: {event}")
except (ValueError, IndexError):
print("Invalid selection")
Teljes példaminta
Íme egy tipikus ellenőrzési munkafolyamat-minta:
import asyncio
from pathlib import Path
from agent_framework import (
FileCheckpointStorage,
WorkflowBuilder,
WorkflowOutputEvent,
get_checkpoint_summary
)
async def main():
# Setup checkpoint storage
checkpoint_dir = Path("./checkpoints")
checkpoint_dir.mkdir(exist_ok=True)
storage = FileCheckpointStorage(checkpoint_dir)
# Build workflow with checkpointing
workflow = (
WorkflowBuilder()
.add_edge(executor1, executor2)
.set_start_executor(executor1)
.with_checkpointing(storage)
.build()
)
# Initial run
print("Running workflow...")
async for event in workflow.run_stream("input data"):
print(f"Event: {event}")
# List and inspect checkpoints
checkpoints = await storage.list_checkpoints()
for cp in sorted(checkpoints, key=lambda c: c.timestamp):
summary = get_checkpoint_summary(cp)
print(f"Checkpoint: {summary.checkpoint_id[:8]}... iter={summary.iteration_count}")
# Resume from a checkpoint
if checkpoints:
latest = max(checkpoints, key=lambda cp: cp.timestamp)
print(f"Resuming from: {latest.checkpoint_id}")
async for event in workflow.run_stream(latest.checkpoint_id):
print(f"Resumed: {event}")
if __name__ == "__main__":
asyncio.run(main())
Főbb előnyök
- Hibatűrés: A munkafolyamatok az utolsó ellenőrzőpontról való visszaállással helyreállhatnak a hibákból
- Hosszú futásidejű folyamatok: A hosszú munkafolyamatokat ellenőrzőpont határokkal rendelkező szegmensekre kezelhetően bonthatja
- Human-in-the-Loop: Az emberi bemenet megszakítása és későbbi folytatása – a függőben lévő kérések újra kibocsátásra kerülnek a folytatáskor
- Hibakeresés: A munkafolyamat állapotának vizsgálata adott pontokon, és a végrehajtás folytatása tesztelés céljából
- Erőforrás-kezelés: Munkafolyamatok leállítása és újraindítása az erőforrások rendelkezésre állása alapján
A példa futtatása
A teljes működő megoldás érdekében tekintse meg a Checkpoint with Resume mintát.