Microsoft Agent Framework Workflows - Checkpoints

This page provides an overview of checkpoints in the Microsoft Agent Framework workflow system.

Overview

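Checkpoints let you save the state of a workflow at specific points during its execution and resume from those points later. This capability is particularly useful in the following scenarios:

  • Long-running workflows where you want to avoid losing progress when a failure occurs.
  • Long-running workflows that you want to pause and resume at a later point in time.
  • Workflows that require periodic state saving for auditing or compliance purposes.
  • Workflows that need to be migrated across different environments or instances.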

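When are checkpoints created?

Recall that workflows execute in supersteps, as described in Core Concepts. A checkpoint is created at the end of each superstep, after all executors in that superstep have completed their execution. A checkpoint captures the entire state of the workflow, including:

  • The current state of all executors
  • All pending messages for the next superstep in the workflow
  • Pending requests and responses
  • Shared states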
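
The timing rule above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not the framework's implementation: ToyCheckpoint, ToyWorkflow, and run_superstep are hypothetical names, the single "executor" just counts messages, and pending requests and responses are left out for brevity. It shows a snapshot being taken only after all work in a superstep has finished, and that each snapshot includes the messages queued for the next superstep.

```python
import copy
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToyCheckpoint:
    """Hypothetical snapshot taken at the end of one superstep."""
    superstep: int
    executor_states: dict[str, Any]
    pending_messages: list[str]
    shared_state: dict[str, Any]


@dataclass
class ToyWorkflow:
    """Toy model: one executor that counts the messages it has handled."""
    executor_states: dict[str, Any] = field(default_factory=lambda: {"counter": 0})
    pending_messages: list[str] = field(default_factory=list)
    shared_state: dict[str, Any] = field(default_factory=dict)
    checkpoints: list[ToyCheckpoint] = field(default_factory=list)
    superstep: int = 0

    def run_superstep(self) -> None:
        # Deliver every message that was pending at the start of this superstep.
        inbox, self.pending_messages = self.pending_messages, []
        for message in inbox:
            self.executor_states["counter"] += 1
            self.shared_state["last_message"] = message
            # Messages longer than one character are shortened and queued
            # for the next superstep.
            if len(message) > 1:
                self.pending_messages.append(message[1:])
        self.superstep += 1
        # Snapshot only after all work in the superstep has finished.
        self.checkpoints.append(
            ToyCheckpoint(
                superstep=self.superstep,
                executor_states=copy.deepcopy(self.executor_states),
                pending_messages=list(self.pending_messages),
                shared_state=copy.deepcopy(self.shared_state),
            )
        )


workflow = ToyWorkflow(pending_messages=["abc"])
while workflow.pending_messages:
    workflow.run_superstep()

for checkpoint in workflow.checkpoints:
    print(checkpoint.superstep, checkpoint.executor_states, checkpoint.pending_messages)
```

The deep copies matter in this sketch: without them, earlier checkpoints would share the live state dictionaries and change as the workflow continues.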

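Capturing checkpoints

In C#, to enable checkpointing, a CheckpointManager must be provided when running the workflow. A checkpoint can then be accessed through a SuperStepCompletedEvent or through the Checkpoints property on the run.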

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
    }
}

// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;

In Python, to enable checkpointing, a CheckpointStorage must be provided when creating the workflow. Checkpoints can then be accessed through the storage.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run(input, stream=True):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)

Resuming from checkpoints

In C#, you can resume a workflow from a specific checkpoint directly on the same run.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

In Python, you can resume a workflow from a specific checkpoint directly on the same workflow instance.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
    ...

Rehydrating from checkpoints

Alternatively, in C#, you can rehydrate a workflow from a checkpoint into a new run instance.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
    .ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Alternatively, in Python, you can rehydrate a new workflow instance from a checkpoint.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    ...
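
To make the idea of rehydration concrete, here is a minimal, framework-independent sketch. Everything in it is an assumption made for illustration: ToyCheckpointStorage, ToyCounterWorkflow, and their save, load, run, and restore methods are hypothetical and do not correspond to the agent_framework API. It shows the same pattern as the example above: the new instance is created without checkpointing, and the storage is supplied only when resuming.

```python
import copy
import uuid
from typing import Any, Optional


class ToyCheckpointStorage:
    """Hypothetical in-memory store keyed by checkpoint ID."""

    def __init__(self) -> None:
        self._snapshots: dict[str, dict[str, Any]] = {}

    def save(self, snapshot: dict[str, Any]) -> str:
        checkpoint_id = str(uuid.uuid4())
        # Deep-copy so later mutations of live state cannot alter the snapshot.
        self._snapshots[checkpoint_id] = copy.deepcopy(snapshot)
        return checkpoint_id

    def load(self, checkpoint_id: str) -> dict[str, Any]:
        return copy.deepcopy(self._snapshots[checkpoint_id])


class ToyCounterWorkflow:
    """Toy workflow whose whole state is a running total and a step count."""

    def __init__(self) -> None:
        self.total = 0
        self.step = 0

    def run(self, values: list[int], storage: Optional[ToyCheckpointStorage] = None) -> list[str]:
        checkpoint_ids: list[str] = []
        for value in values:
            self.total += value
            self.step += 1
            # One snapshot per step, but only when a storage was supplied.
            if storage is not None:
                checkpoint_ids.append(storage.save({"total": self.total, "step": self.step}))
        return checkpoint_ids

    def restore(self, checkpoint_id: str, storage: ToyCheckpointStorage) -> None:
        snapshot = storage.load(checkpoint_id)
        self.total = snapshot["total"]
        self.step = snapshot["step"]


storage = ToyCheckpointStorage()
original = ToyCounterWorkflow()
checkpoint_ids = original.run([1, 2, 3, 4], storage)

# A brand-new instance picks up from the second checkpoint and finishes
# the remaining work without saving checkpoints of its own.
rehydrated = ToyCounterWorkflow()
rehydrated.restore(checkpoint_ids[1], storage)
rehydrated.run([3, 4])
print(original.total, rehydrated.total)
```

Because the rehydrated instance replays only the work that followed the chosen checkpoint, it ends in the same state as the original run.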

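Saving executor state

In C#, to ensure that an executor's state is captured in a checkpoint, the executor must override the OnCheckpointingAsync method and save its state to the workflow context.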

using Microsoft.Agents.AI.Workflows;

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

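Also, to ensure that the state is correctly restored when resuming from a checkpoint, the executor must override the OnCheckpointRestoredAsync method and load its state from the workflow context.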

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

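In Python, to ensure that an executor's state is captured in a checkpoint, the executor must override the on_checkpoint_save method and return its state as a dictionary.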

from typing import Any

from agent_framework import Executor, WorkflowContext, handler

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

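Also, to ensure that the state is correctly restored when resuming from a checkpoint, the executor must override the on_checkpoint_restore method and restore its state from the provided state dictionary.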

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])
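
The two hooks can be exercised together in a round trip. The sketch below is a stand-in, not framework code: ToyExecutor is a hypothetical class that reuses the hook names but does not subclass the framework's Executor, its handle method is a plain synchronous method with no workflow context, and JSON is an assumed persistence format chosen only to simulate the state leaving the process.

```python
import asyncio
import json
from typing import Any


class ToyExecutor:
    """Stand-in with the same hook names; it does not subclass the framework's Executor."""

    def __init__(self) -> None:
        self._messages: list[str] = []

    def handle(self, message: str) -> None:
        self._messages.append(message)

    async def on_checkpoint_save(self) -> dict[str, Any]:
        # Return a copy so the snapshot is independent of later appends.
        return {"messages": list(self._messages)}

    async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
        self._messages = list(state.get("messages", []))


async def round_trip() -> tuple[list[str], list[str]]:
    source = ToyExecutor()
    source.handle("first")
    source.handle("second")
    # JSON is an assumed storage format, used only to simulate persistence.
    persisted = json.dumps(await source.on_checkpoint_save())
    source.handle("third")  # Happens after the snapshot, so it is not restored.

    restored = ToyExecutor()
    await restored.on_checkpoint_restore(json.loads(persisted))
    return source._messages, restored._messages


live_messages, restored_messages = asyncio.run(round_trip())
print(live_messages)
print(restored_messages)
```

In this sketch the restored executor sees only what was handled before the snapshot. Keeping the state dictionary limited to plain, serializable values is a conservative choice if the storage backend writes checkpoints to disk.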

Next steps