Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ta strona zawiera omówienie punktów kontrolnych w systemie przepływu pracy programu Microsoft Agent Framework.
Przegląd
Punkty kontrolne umożliwiają zapisywanie stanu procesu w określonych punktach podczas jego wykonywania i kontynuowanie od tych punktów później. Ta funkcja jest szczególnie przydatna w następujących scenariuszach:
- Długotrwałe przepływy pracy, w których chcesz uniknąć utraty postępu w przypadku awarii.
- Długotrwałe przepływy pracy, w których chcesz wstrzymać i wznowić wykonywanie w późniejszym czasie.
- Przepływy pracy, które wymagają okresowego zapisywania stanu na potrzeby inspekcji lub zgodności.
- Przepływy pracy, które należy migrować w różnych środowiskach lub instancjach.
Kiedy są tworzone punkty kontrolne?
Pamiętaj, że przepływy pracy są wykonywane w superkrokach, jak opisano w podstawowych pojęciach. Punkty kontrolne są tworzone na końcu każdego superkroku, po zakończeniu wykonywania wszystkich funkcji wykonawczych w tym superkroku. Punkt kontrolny przechwytuje cały stan przepływu pracy, w tym:
- Bieżący stan wszystkich funkcji wykonawczych
- Wszystkie oczekujące komunikaty w przepływie pracy dla następnego superkroku
- Oczekujące żądania i odpowiedzi
- Stany udostępnione
Przechwytywanie punktów kontrolnych
Aby włączyć tworzenie punktów kontrolnych, należy podać element CheckpointManager podczas uruchamiania przepływu pracy. Następnie można uzyskać dostęp do punktu kontrolnego za pośrednictwem SuperStepCompletedEvent, lub poprzez właściwość Checkpoints w ramach uruchomienia.
using Microsoft.Agents.AI.Workflows;
// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
.RunStreamingAsync(workflow, input, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is SuperStepCompletedEvent superStepCompletedEvt)
{
// Access the checkpoint
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
}
}
// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;
Aby włączyć tworzenie punktów kontrolnych, należy podać element CheckpointStorage podczas tworzenia przepływu pracy. Następnie można uzyskać dostęp do punktu kontrolnego za pośrednictwem magazynu.
from agent_framework import (
InMemoryCheckpointStorage,
WorkflowBuilder,
)
# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()
# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()
# Run the workflow
async for event in workflow.run(input, stream=True):
...
# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)
Wznawianie z punktów kontrolnych
Przepływ pracy można wznowić bezpośrednio z konkretnego punktu kontrolnego w tym samym przebiegu.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Możesz wznowić przepływ pracy z określonego punktu kontrolnego bezpośrednio w tym samym wystąpieniu.
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
...
Przywracanie z punktów kontrolnych
Możesz też przywrócić przepływ pracy z punktu kontrolnego do nowego wystąpienia uruchomienia.
// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
.ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
.ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
}
}
Możesz też przywrócić nowe wystąpienie przepływu pracy z punktu kontrolnego.
from agent_framework import WorkflowBuilder
builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()
# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
checkpoint_id=saved_checkpoint.checkpoint_id,
checkpoint_storage=checkpoint_storage,
stream=True,
):
...
Zapisz stany wykonawcze
Aby upewnić się, że stan funkcji wykonawczej jest przechwytywany w punkcie OnCheckpointingAsync kontrolnym, funkcja wykonawcza musi zastąpić metodę i zapisać jej stan w kontekście przepływu pracy.
using Microsoft.Agents.AI.Workflows;
internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
private const string StateKey = "CustomExecutorState";
private List<string> messages = new();
[MessageHandler]
private async ValueTask HandleAsync(string message, IWorkflowContext context)
{
this.messages.Add(message);
// Executor logic...
}
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
return context.QueueStateUpdateAsync(StateKey, this.messages);
}
}
Ponadto, aby zapewnić prawidłowe przywrócenie stanu podczas wznawiania z punktu kontrolnego, egzekutor musi zastąpić OnCheckpointRestoredAsync metodę i załadować jego stan z kontekstu przepływu pracy.
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}
Aby upewnić się, że stan funkcji wykonawczej jest przechwytywany w punkcie kontrolnym, funkcja wykonawcza musi zastąpić on_checkpoint_save metodę i zwrócić jej stan jako słownik.
class CustomExecutor(Executor):
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._messages: list[str] = []
@handler
async def handle(self, message: str, ctx: WorkflowContext):
self._messages.append(message)
# Executor logic...
async def on_checkpoint_save(self) -> dict[str, Any]:
return {"messages": self._messages}
Ponadto, aby upewnić się, że stan jest poprawnie przywracany podczas wznawiania z punktu kontrolnego, funkcja wykonawcza musi zastąpić metodę on_checkpoint_restore i przywrócić jej stan z dostarczonego słownika stanu.
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
self._messages = state.get("messages", [])