Бөлісу құралы:


Рабочие процессы фреймворка агента Майкрософт — контрольные точки

На этой странице представлен обзор Контрольных точек в системе рабочих процессов Майкрософт Agent Framework.

Обзор

Контрольные точки позволяют сохранять состояние рабочего процесса в определенных точках во время его выполнения и возобновлять работу с этих точек позже. Эта функция особенно полезна для следующих сценариев:

  • Длительные рабочие процессы, в которых требуется избежать потери хода выполнения в случае сбоев.
  • Длительные рабочие процессы, в которых вы хотите приостановить и возобновить выполнение позже.
  • Рабочие процессы, требующие периодического сохранения состояния для аудита или соответствия требованиям.
  • Рабочие процессы, которые необходимо перенести между разными средами или экземплярами.

Когда создаются контрольные точки?

Помните, что рабочие процессы выполняются в суперэтапах, как описано в основных понятиях. Контрольные точки создаются в конце каждого этапа, после завершения выполнения всех исполнителей задач на этом этапе. Контрольная точка фиксирует все состояние рабочего процесса, в том числе:

  • Текущее состояние всех исполнителей
  • Все ожидающие сообщения в рабочем процессе для следующего суперстепа
  • Ожидающие запросы и ответы
  • Общие состояния

Захват контрольных точек

Чтобы включить создание контрольных точек, необходимо указать CheckpointManager при запуске рабочего процесса. Затем можно получить доступ к контрольной точке с помощью свойства SuperStepCompletedEvent или через свойство Checkpoints в ходе выполнения.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
    }
}

// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;

Чтобы включить контрольную точку, CheckpointStorage необходимо указать при создании рабочего процесса. Затем через хранилище можно получить доступ к контрольной точке. Agent Framework поставляет три встроенных реализации. Выберите один из них, соответствующий вашим требованиям к устойчивости и развертыванию:

Поставщик Package Durability лучше всего подходит для
InMemoryCheckpointStorage agent-framework Только для текущего процесса Тесты, демонстрации, короткие рабочие процессы
FileCheckpointStorage agent-framework Локальный диск Рабочие процессы с одним компьютером, локальная разработка
CosmosCheckpointStorage agent-framework-azure-cosmos Azure Cosmos DB Рабочие, распределенные, межпроцессные рабочие процессы

Все три реализуют один и тот же CheckpointStorage протокол, поэтому можно переключать поставщиков без изменения рабочего процесса или кода исполнителя.

InMemoryCheckpointStorage сохраняет контрольные точки в памяти процесса. Лучше всего подходит для тестов, демонстраций и кратковременных рабочих процессов, где не требуется устойчивость во время перезапуска.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run(input, stream=True):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)

Возобновление с контрольных точек

Вы можете возобновить рабочий процесс из определенной контрольной точки непосредственно в том же запуске.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Вы можете возобновить рабочий процесс из определенной контрольной точки непосредственно в одном экземпляре рабочего процесса.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
    ...

Восстановление из контрольных точек

Вы также можете восстановить рабочий процесс из контрольной точки в новый экземпляр запуска.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
    .ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Кроме того, можно восстановить новый экземпляр рабочего процесса из контрольной точки.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    ...

Сохранение состояний исполнителя

Чтобы убедиться, что состояние исполнителя фиксируется в чекпоинте, исполнитель должен переопределить метод OnCheckpointingAsync и сохранить его состояние в контексте рабочего процесса.

using Microsoft.Agents.AI.Workflows;

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Кроме того, чтобы состояние было правильно восстановлено при возобновлении из контрольной точки, исполнитель должен переопределить OnCheckpointRestoredAsync метод и загрузить его состояние из контекста рабочего процесса.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Чтобы убедиться, что состояние исполнителя фиксируется в чекпоинте, исполнитель должен переопределить метод on_checkpoint_save и вернуть его состояние в виде словаря.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Кроме того, чтобы состояние было правильно восстановлено при возобновлении из контрольной точки, исполнитель должен переопределить on_checkpoint_restore метод и восстановить его состояние из предоставленного словаря состояний.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Соображения безопасности

Это важно

Хранилище контрольных точек — это граница доверия. Независимо от того, используется ли встроенная реализация хранилища или пользовательская, серверная часть хранилища должна рассматриваться как надежная частная инфраструктура. Никогда не загружайте контрольные точки из ненадежных или потенциально измененных источников.

Убедитесь, что расположение хранилища, используемое для контрольных точек, безопасно. Только авторизованные службы и пользователи должны иметь доступ на чтение или запись к данным контрольных точек.

Сериализация с использованием Pickle

Оба FileCheckpointStorage и CosmosCheckpointStorage используют модуль Python pickle для сериализации состояний, не являющихся родными для JSON, таких как классы данных, даты и времена, а также пользовательские объекты. Чтобы снизить риски произвольного выполнения кода во время десериализации, оба провайдера по умолчанию используют ограниченный распаковщик (unpickler). Во время десериализации разрешен только встроенный набор безопасных типов Python (примитивы, datetime, uuid, Decimal, общие коллекции и т. д.) и все внутренние типы agent_framework. Любой другой тип, встречающийся в контрольной точке, вызывает сбой десериализации с кодом ошибки WorkflowCheckpointException.

Чтобы разрешить дополнительные типы, специфичные для приложения, передайте эти типы через параметр allowed_checkpoint_types с помощью формата "module:qualname".

from agent_framework import FileCheckpointStorage

storage = FileCheckpointStorage(
    "/tmp/checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)

CosmosCheckpointStorage принимает тот же параметр:

from azure.identity.aio import DefaultAzureCredential
from agent_framework_azure_cosmos import CosmosCheckpointStorage

storage = CosmosCheckpointStorage(
    endpoint="https://my-account.documents.azure.com:443/",
    credential=DefaultAzureCredential(),
    database_name="agent-db",
    container_name="checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)

Если ваша модель угроз вообще не разрешает сериализацию на основе pickle, используйте InMemoryCheckpointStorage или реализуйте свою собственную CheckpointStorage с альтернативной стратегией сериализации.

Ответственность за расположение хранилища

FileCheckpointStorage требует явного storage_path параметра — каталог по умолчанию отсутствует. Хотя платформа проверяется на соответствие атакам путевого обхода, защита самого каталога хранилища (разрешения файлов, шифрование неактивных данных, управление доступом) является ответственностью разработчика. Только авторизованные процессы должны иметь доступ на чтение или запись в каталог контрольных точек.

CosmosCheckpointStorage использует хранилище Azure Cosmos DB. Используйте управляемое удостоверение или RBAC, если это возможно, определите область базы данных и контейнера в службе рабочих процессов и меняйте ключи учетной записи, если используете аутентификацию по ключу. Аналогично файловому хранилищу, только авторизованные субъекты должны иметь доступ на чтение или запись к контейнеру Cosmos DB, в котором хранятся документы контрольной точки.

Дальнейшие шаги