다음을 통해 공유


Microsoft 에이전트 프레임워크 워크플로 - 검사점

이 페이지에서는 Microsoft 에이전트 프레임워크 워크플로 시스템의 검사점 개요를 제공합니다.

개요

검사점을 사용하면 실행 중에 특정 지점에서 워크플로의 상태를 저장하고 나중에 해당 지점에서 다시 시작할 수 있습니다. 이 기능은 다음 시나리오에 특히 유용합니다.

  • 오류 발생 시 진행률 손실을 방지하려는 장기 실행 워크플로입니다.
  • 나중에 실행을 일시 중지하고 다시 시작하려는 장기 실행 워크플로입니다.
  • 감사 또는 규정 준수를 위해 주기적인 상태 저장이 필요한 워크플로입니다.
  • 여러 환경 또는 인스턴스에서 마이그레이션해야 하는 워크플로입니다.

검사점은 언제 생성되나요?

워크플로는 핵심 개념에 설명된 대로 슈퍼스텝으로 실행됩니다. 검사점은 해당 슈퍼스텝의 모든 실행기가 실행을 완료한 후 각 슈퍼스텝의 끝에 만들어집니다. 검사점은 다음을 포함하여 워크플로의 전체 상태를 캡처합니다.

  • 모든 실행기의 현재 상태입니다.
  • 다음 슈퍼스텝에 대한 워크플로의 보류 중인 모든 메시지
  • 보류 중인 요청 및 응답
  • 공유 상태

검사점 캡처

워크플로 실행을 생성할 때 체크포인트를 사용하도록 설정하려면 CheckpointManager를 제공해야 합니다. 검사점은 다음을 통해 SuperStepCompletedEvent액세스할 수 있습니다.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
var checkpointManager = new CheckpointManager();
// List to store checkpoint info for later use
var checkpoints = new List<CheckpointInfo>();

// Run the workflow with checkpointing enabled
Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
    .StreamAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint and store it
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
        if (checkpoint != null)
        {
            checkpoints.Add(checkpoint);
        }
    }
}

체크포인트를 활성화하려면 워크플로를 만들 때 CheckpointStorage을(를) 제공해야 합니다. 그런 다음 스토리지를 통해 검사점에 액세스할 수 있습니다.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.with_checkpointing(checkpoint_storage).build()

# Run the workflow
async for event in workflow.run_streaming(input):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints()

검사점에서 다시 시작하기

동일한 실행에서 특정 검사점에서 직접 워크플로를 다시 시작할 수 있습니다.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None).ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

동일한 워크플로 인스턴스의 특정 검사점에서 직접 워크플로를 다시 시작할 수 있습니다.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream(checkpoint_id=saved_checkpoint.checkpoint_id):
    ...

검사점에서 리하이드라이팅

또는 검사점에서 새 실행 인스턴스로 워크플로를 리하일 수 있습니다.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
    .ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

새 워크플로 인스턴스를 검사점을 통해 리하이드레이트할 수 있습니다.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
):
    ...

실행기 상태 저장

실행기의 상태가 검사점에서 캡처되도록 하려면 실행기가 OnCheckpointingAsync 메서드를 재정의하고 워크플로 컨텍스트에 해당 상태를 저장해야 합니다.

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;

internal sealed class CustomExecutor() : Executor<string>("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    public async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

또한 검사점에서 복귀할 때 상태가 올바르게 복원되도록 하기 위해, 실행기가 OnCheckpointRestoredAsync 메서드를 재정의하고 워크플로 컨텍스트에서 해당 상태를 로드해야 합니다.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

실행기의 상태가 검사점에서 캡처되도록 하려면 실행기가 on_checkpoint_save 메서드를 재정의하고 워크플로 컨텍스트에 해당 상태를 저장해야 합니다.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

또한 검사점에서 복귀할 때 상태가 올바르게 복원되도록 하기 위해, 실행기가 on_checkpoint_restore 메서드를 재정의하고 워크플로 컨텍스트에서 해당 상태를 로드해야 합니다.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

다음 단계