Jegyzet
Az oldalhoz való hozzáférés engedélyezést igényel. Próbálhatod be jelentkezni vagy könyvtárat váltani.
Az oldalhoz való hozzáférés engedélyezést igényel. Megpróbálhatod a könyvtár váltását.
Ez a lap áttekintést nyújt a Microsoft Agent Framework munkafolyamat-rendszer ellenőrzőpontjairól .
Áttekintés
Az ellenőrzőpontok lehetővé teszik a munkafolyamat állapotának mentését a végrehajtás bizonyos pontjain, és később folytathatja azokat. Ez a funkció különösen hasznos a következő helyzetekben:
- Hosszú ideig futó munkafolyamatok, ahol hibák esetén el szeretné kerülni az előrehaladás elvesztését.
- Hosszú ideig futó munkafolyamatok, ahol később szüneteltetni és folytatni szeretné a végrehajtást.
- Olyan munkafolyamatok, amelyek naplózási vagy megfelelőségi célokra rendszeres állapotmentést igényelnek.
- Különböző környezetekben vagy példányokban áttelepítendő munkafolyamatok.
Mikor jönnek létre ellenőrzőpontok?
Ne feledje, hogy a munkafolyamatok szupersztepsekben vannak végrehajtva, az alapfogalmakban leírtak szerint. Az ellenőrzőpontok az egyes szupersztepek végén jönnek létre, miután az adott szuperstep összes végrehajtója befejezte a végrehajtást. Egy ellenőrzőpont rögzíti a munkafolyamat teljes állapotát, beleértve a következőket:
- Az összes végrehajtó aktuális állapota
- A munkafolyamat összes függőben lévő üzenete a következő szuperlépéshez
- Függőben lévő kérelmek és válaszok
- Megosztott állapotok
Ellenőrzőpontok rögzítése
Az ellenőrzőpont-készítés engedélyezéséhez egy CheckpointManager-t kell megadni a munkafolyamat futtatásakor. Ezután az ellenőrzőpont elérhető egy SuperStepCompletedEvent-ön keresztül.
using Microsoft.Agents.AI.Workflows;
// Create a checkpoint manager to manage checkpoints
var checkpointManager = new CheckpointManager();
// List to store checkpoint info for later use
var checkpoints = new List<CheckpointInfo>();
// Run the workflow with checkpointing enabled
Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
.StreamAsync(workflow, input, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is SuperStepCompletedEvent superStepCompletedEvt)
{
// Access the checkpoint and store it
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
if (checkpoint != null)
{
checkpoints.Add(checkpoint);
}
}
}
Az ellenőrzőpont-készítés engedélyezéséhez meg kell adni egy CheckpointStorage-t, amikor munkafolyamatot hoz létre. Ezután egy ellenőrzőpont a tárterületen keresztül érhető el.
from agent_framework import (
InMemoryCheckpointStorage,
WorkflowBuilder,
)
# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()
# Build a workflow with checkpointing enabled
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.with_checkpointing(checkpoint_storage).build()
# Run the workflow
async for event in workflow.run_streaming(input):
...
# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints()
Folytatás ellenőrzőpontokból
A munkafolyamatot közvetlenül ugyanazon a futtatáson folytathatja egy adott ellenőrzőpontról.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None).ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
A munkafolyamatot közvetlenül ugyanazon a munkafolyamat-példányon folytathatja egy adott ellenőrzőpontról.
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream(checkpoint_id=saved_checkpoint.checkpoint_id):
...
Rehidratálás ellenőrzőpontokból
Vagy újrahidratálhat egy munkafolyamatot egy ellenőrzőpontból egy új futtatási példányba.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Egy új munkafolyamat-példányt egy ellenőrzőpontról is újrahidratálhat.
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream
checkpoint_id=saved_checkpoint.checkpoint_id,
checkpoint_storage=checkpoint_storage,
):
...
Végrehajtói állapotok mentése
Annak érdekében, hogy a végrehajtó állapota egy ellenőrzőponton legyen rögzítve, a végrehajtónak felül kell bírálnia a OnCheckpointingAsync metódust, és mentenie kell az állapotát a munkafolyamat-környezetbe.
using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;
internal sealed class CustomExecutor() : Executor<string>("CustomExecutor")
{
private const string StateKey = "CustomExecutorState";
private List<string> messages = new();
public async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this.messages.Add(message);
// Executor logic...
}
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
return context.QueueStateUpdateAsync(StateKey, this.messages);
}
}
Emellett annak érdekében, hogy az állapot helyesen legyen visszaállítva egy ellenőrzőpontról való folytatáskor, a végrehajtónak felül kell bírálnia a OnCheckpointRestoredAsync metódust, és be kell töltenie az állapotát a munkafolyamat-környezetből.
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}
Annak érdekében, hogy a végrehajtó állapota egy ellenőrzőponton legyen rögzítve, a végrehajtónak felül kell bírálnia a on_checkpoint_save metódust, és mentenie kell az állapotát a munkafolyamat-környezetbe.
class CustomExecutor(Executor):
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._messages: list[str] = []
@handler
async def handle(self, message: str, ctx: WorkflowContext):
self._messages.append(message)
# Executor logic...
async def on_checkpoint_save(self) -> dict[str, Any]:
return {"messages": self._messages}
Emellett annak érdekében, hogy az állapot helyesen legyen visszaállítva egy ellenőrzőpontról való folytatáskor, a végrehajtónak felül kell bírálnia a on_checkpoint_restore metódust, és be kell töltenie az állapotát a munkafolyamat-környezetből.
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
self._messages = state.get("messages", [])