Bagikan melalui


Alur Kerja Kerangka Kerja Agen Microsoft - Titik Pemeriksaan

Halaman ini menyediakan gambaran umum Titik Pemeriksaan di sistem Microsoft Agent Framework Workflow.

Gambaran Umum

Titik pemeriksaan memungkinkan Anda menyimpan status alur kerja pada titik tertentu selama eksekusinya, dan melanjutkan dari titik-titik tersebut nanti. Fitur ini sangat berguna untuk skenario berikut:

  • Alur kerja yang berjalan lama di mana Anda ingin menghindari kehilangan kemajuan jika terjadi kegagalan.
  • Alur kerja yang berjalan lama di mana Anda ingin menjeda dan melanjutkan eksekusi di lain waktu.
  • Alur kerja yang memerlukan penyimpanan status berkala untuk tujuan audit atau kepatuhan.
  • Alur kerja yang perlu dimigrasikan di berbagai lingkungan atau instans.

Kapan Titik Pemeriksaan Dibuat?

Ingatlah bahwa alur kerja dijalankan dalam supersteps, sebagaimana didokumentasikan dalam konsep inti. Titik pemeriksaan dibuat di akhir setiap superstep, setelah semua pelaksana dalam superstep tersebut telah menyelesaikan eksekusi mereka. Titik pemeriksaan menangkap seluruh status alur kerja, termasuk:

  • Status saat ini dari semua pelaksana
  • Semua pesan yang tertunda dalam alur kerja untuk superstep berikutnya
  • Permintaan dan respons tertunda
  • Status bersama

Menangkap Titik Pemeriksaan

Untuk mengaktifkan pencatatan titik pemeriksaan, CheckpointManager perlu disediakan saat menjalankan alur kerja. Titik pemeriksaan kemudian dapat diakses melalui SuperStepCompletedEvent.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
var checkpointManager = new CheckpointManager();
// List to store checkpoint info for later use
var checkpoints = new List<CheckpointInfo>();

// Run the workflow with checkpointing enabled
Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
    .StreamAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint and store it
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
        if (checkpoint != null)
        {
            checkpoints.Add(checkpoint);
        }
    }
}

Untuk mengaktifkan check pointing, CheckpointStorage perlu disediakan saat membuat alur kerja. Pos pemeriksaan kemudian dapat diakses melalui media penyimpanan.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.with_checkpointing(checkpoint_storage).build()

# Run the workflow
async for event in workflow.run_streaming(input):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints()

Lanjutkan dari Titik Pemeriksaan

Anda dapat melanjutkan alur kerja dari titik pemeriksaan tertentu langsung dalam proses yang sama.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None).ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Anda dapat melanjutkan alur kerja dari checkpoint tertentu langsung pada instans alur kerja yang sama.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream(checkpoint_id=saved_checkpoint.checkpoint_id):
    ...

Rehidrasi dari Titik Pemeriksaan

Atau Anda dapat merehidrasi alur kerja dari titik pemeriksaan ke dalam instans eksekusi baru.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
    .ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Atau Anda dapat merehidrasi instans alur kerja baru dari titik pemeriksaan.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
):
    ...

Simpan Status Pelaksana

Untuk memastikan bahwa status pelaksana diambil dalam titik pemeriksaan, pelaksana harus mengambil alih OnCheckpointingAsync metode dan menyimpan statusnya ke konteks alur kerja.

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;

internal sealed class CustomExecutor() : Executor<string>("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    public async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Selain itu, untuk memastikan status dipulihkan dengan benar saat melanjutkan dari titik pemeriksaan, pelaksana harus mengambil alih OnCheckpointRestoredAsync metode dan memuat statusnya dari konteks alur kerja.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Untuk memastikan bahwa status pelaksana diambil dalam titik pemeriksaan, pelaksana harus mengambil alih on_checkpoint_save metode dan menyimpan statusnya ke konteks alur kerja.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Selain itu, untuk memastikan status dipulihkan dengan benar saat melanjutkan dari titik pemeriksaan, pelaksana harus mengambil alih on_checkpoint_restore metode dan memuat statusnya dari konteks alur kerja.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Langkah Selanjutnya