Поделиться через


Рабочие процессы Microsoft Agent Framework — контрольные точки

На этой странице представлен обзор контрольных точек в системе рабочих процессов Microsoft Agent Framework.

Обзор

Контрольные точки позволяют сохранять состояние рабочего процесса в определенных точках во время его выполнения и возобновлять работу с этих точек позже. Эта функция особенно полезна для следующих сценариев:

  • Длительные рабочие процессы, в которых требуется избежать потери хода выполнения в случае сбоев.
  • Длительные рабочие процессы, в которых вы хотите приостановить и возобновить выполнение позже.
  • Рабочие процессы, требующие периодического сохранения состояния для аудита или соответствия требованиям.
  • Рабочие процессы, которые необходимо перенести между разными средами или экземплярами.

Когда создаются контрольные точки?

Помните, что рабочие процессы выполняются в суперэтапах, как описано в основных понятиях. Контрольные точки создаются в конце каждого этапа, после завершения выполнения всех исполнителей задач на этом этапе. Контрольная точка фиксирует все состояние рабочего процесса, в том числе:

  • Текущее состояние всех исполнителей
  • Все ожидающие сообщения в рабочем процессе для следующего суперстепа
  • Ожидающие запросы и ответы
  • Общие состояния

Захват контрольных точек

Чтобы включить создание контрольных точек, необходимо указать CheckpointManager при запуске рабочего процесса. Затем можно получить доступ к контрольной точке с помощью свойства SuperStepCompletedEvent или через свойство Checkpoints в ходе выполнения.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
    }
}

// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;

Чтобы включить контрольную точку, CheckpointStorage необходимо указать при создании рабочего процесса. Затем через хранилище можно получить доступ к контрольной точке.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run(input, stream=True):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)

Возобновление с контрольных точек

Вы можете возобновить рабочий процесс из определенной контрольной точки непосредственно в том же запуске.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Вы можете возобновить рабочий процесс из определенной контрольной точки непосредственно в одном экземпляре рабочего процесса.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
    ...

Восстановление из контрольных точек

Вы также можете восстановить рабочий процесс из контрольной точки в новый экземпляр запуска.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
    .ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Кроме того, можно восстановить новый экземпляр рабочего процесса из контрольной точки.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    ...

Сохранение состояний исполнителя

Чтобы убедиться, что состояние исполнителя фиксируется в чекпоинте, исполнитель должен переопределить метод OnCheckpointingAsync и сохранить его состояние в контексте рабочего процесса.

using Microsoft.Agents.AI.Workflows;

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Кроме того, чтобы состояние было правильно восстановлено при возобновлении из контрольной точки, исполнитель должен переопределить OnCheckpointRestoredAsync метод и загрузить его состояние из контекста рабочего процесса.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Чтобы убедиться, что состояние исполнителя фиксируется в чекпоинте, исполнитель должен переопределить метод on_checkpoint_save и вернуть его состояние в виде словаря.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Кроме того, чтобы состояние было правильно восстановлено при возобновлении из контрольной точки, исполнитель должен переопределить on_checkpoint_restore метод и восстановить его состояние из предоставленного словаря состояний.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Дальнейшие шаги