Bagikan melalui


Alur Kerja Kerangka Kerja Agen Microsoft - Titik Pemeriksaan

Halaman ini menyediakan gambaran umum Titik Pemeriksaan di sistem Microsoft Agent Framework Workflow.

Gambaran Umum

Titik pemeriksaan memungkinkan Anda menyimpan status alur kerja pada titik tertentu selama eksekusinya, dan melanjutkan dari titik-titik tersebut nanti. Fitur ini sangat berguna untuk skenario berikut:

  • Alur kerja yang berjalan lama di mana Anda ingin menghindari kehilangan kemajuan jika terjadi kegagalan.
  • Alur kerja yang berjalan lama di mana Anda ingin menjeda dan melanjutkan eksekusi di lain waktu.
  • Alur kerja yang memerlukan penyimpanan status berkala untuk tujuan audit atau kepatuhan.
  • Alur kerja yang perlu dimigrasikan di berbagai lingkungan atau instans.

Kapan Titik Pemeriksaan Dibuat?

Ingatlah bahwa alur kerja dijalankan dalam supersteps, sebagaimana didokumentasikan dalam konsep inti. Titik pemeriksaan dibuat di akhir setiap superstep, setelah semua pelaksana dalam superstep tersebut telah menyelesaikan eksekusi mereka. Titik pemeriksaan menangkap seluruh status alur kerja, termasuk:

  • Status saat ini dari semua pelaksana
  • Semua pesan yang tertunda dalam alur kerja untuk superstep berikutnya
  • Permintaan dan respons tertunda
  • Status bersama

Menangkap Titik Pemeriksaan

Untuk mengaktifkan titik pemeriksaan, CheckpointManager perlu disediakan saat menjalankan alur kerja. Titik pemeriksaan kemudian dapat diakses melalui SuperStepCompletedEvent, atau melalui properti Checkpoints pada run.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
    }
}

// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;

Untuk mengaktifkan titik pemeriksaan, CheckpointStorage perlu disediakan saat membuat alur kerja. Titik pemeriksaan kemudian dapat diakses melalui penyimpanan.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run(input, stream=True):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)

Lanjutkan dari Titik Pemeriksaan

Anda dapat melanjutkan alur kerja dari titik pemeriksaan tertentu langsung dalam proses yang sama.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Anda dapat melanjutkan alur kerja dari checkpoint tertentu langsung pada instans alur kerja yang sama.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
    ...

Rehidrasi dari Titik Pemeriksaan

Atau Anda dapat merehidrasi alur kerja dari titik pemeriksaan ke dalam instans eksekusi baru.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
    .ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Atau Anda dapat merehidrasi instans alur kerja baru dari titik pemeriksaan.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    ...

Simpan Status Pelaksana

Untuk memastikan bahwa status pelaksana diambil dalam titik pemeriksaan, pelaksana harus mengambil alih OnCheckpointingAsync metode dan menyimpan statusnya ke konteks alur kerja.

using Microsoft.Agents.AI.Workflows;

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Selain itu, untuk memastikan status dipulihkan dengan benar saat melanjutkan dari titik pemeriksaan, pelaksana harus mengambil alih OnCheckpointRestoredAsync metode dan memuat statusnya dari konteks alur kerja.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Untuk memastikan bahwa status pelaksana ditangkap dalam titik pemeriksaan, pelaksana harus mengambil on_checkpoint_save alih metode dan mengembalikan statusnya sebagai kamus.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Selain itu, untuk memastikan status dipulihkan dengan benar saat melanjutkan dari titik pemeriksaan, pelaksana harus mengambil alih on_checkpoint_restore metode dan memulihkan statusnya dari kamus status yang disediakan.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Langkah Selanjutnya