Aracılığıyla paylaş


Microsoft Agent Framework İş Akışları - Denetim Noktaları

Bu sayfa, Microsoft Agent Framework İş Akışı sistemindeki Denetim Noktalarına genel bir bakış sağlar.

Genel Bakış

Denetim noktaları, yürütme sırasında belirli noktalarda iş akışının durumunu kaydetmenize ve daha sonra bu noktalardan devam etmenizi sağlar. Bu özellik özellikle aşağıdaki senaryolar için kullanışlıdır:

  • Uzun süre devam eden ve hata durumunda ilerlemeyi kaybetmek istemediğiniz iş akışları.
  • Yürütmeyi daha sonra duraklatmak ve devam ettirmek istediğiniz uzun süreli iş akışları.
  • Denetim veya uyumluluk amacıyla düzenli olarak durum kaydetme gerektiren iş akışları.
  • Farklı ortamlar veya örnekler arasında taşınması gereken iş akışları.

Denetim Noktaları Ne Zaman Oluşturulur?

İş akışlarının süper adımlar olarak yürütüldüğünü, temel kavramlarda belgelenmiş olduğunu unutmayın. Denetim noktaları, bu üst adımdaki tüm yürütücüler yürütmelerini tamamladıktan sonra her üst adımın sonunda oluşturulur. Denetim noktası iş akışının durumunun tamamını yakalar, örneğin:

  • Tüm yürütücülerin geçerli durumu
  • Sonraki üst adım için iş akışında bekleyen tüm iletiler
  • Bekleyen istekler ve yanıtlar
  • Paylaşılan durumlar

Denetim Noktalarını Yakalama

Kontrol noktalarını etkinleştirmek için bir iş akışı çalıştırması oluşturulurken bir CheckpointManager sağlanması gerekir. SuperStepCompletedEvent aracılığıyla bir kontrol noktasına erişilebilir.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
var checkpointManager = new CheckpointManager();
// List to store checkpoint info for later use
var checkpoints = new List<CheckpointInfo>();

// Run the workflow with checkpointing enabled
Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
    .StreamAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint and store it
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
        if (checkpoint != null)
        {
            checkpoints.Add(checkpoint);
        }
    }
}

Denetim işaretlerini etkinleştirmek için iş akışı oluştururken bir CheckpointStorage sağlanmalıdır. Ardından bir denetim noktasına depolama üzerinden erişilebilir.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run_streaming(input):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints()

Kontrol Noktalarından Devam Etme

Belirli bir denetim noktasından bir iş akışını aynı çalışmada doğrudan sürdürebilirsiniz.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None).ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Belirli bir denetim noktasından bir iş akışını doğrudan aynı iş akışı örneğinde sürdürebilirsiniz.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream(checkpoint_id=saved_checkpoint.checkpoint_id):
    ...

Denetim Noktalarından Yeniden Doldurma

Veya bir iş akışını bir denetim noktasından yeni bir çalıştırma örneğine yeniden yükleyebilirsiniz.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
    .ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Veya bir kontrol noktasından yeni bir iş akışı örneğini yeniden başlatabilirsiniz.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
):
    ...

Yürütücü Durumlarını Kaydet

Bir yürütücü durumunun bir denetim noktasında yakalanmasını sağlamak için, yürütücü OnCheckpointingAsync yöntemini geçersiz kılmalı ve durumunu iş akışı bağlamına kaydetmelidir.

using Microsoft.Agents.AI.Workflows;

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Ayrıca, denetim noktasından devam ederken durumun doğru şekilde geri yüklendiğinden emin olmak için yürütücü yöntemi geçersiz kılmalı OnCheckpointRestoredAsync ve iş akışı bağlamından durumunu yüklemelidir.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Bir yürütücü durumunun bir denetim noktasında yakalanmasını sağlamak için, yürütücü on_checkpoint_save yöntemini geçersiz kılmalı ve durumunu iş akışı bağlamına kaydetmelidir.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Ayrıca, denetim noktasından devam ederken durumun doğru şekilde geri yüklendiğinden emin olmak için yürütücü yöntemi geçersiz kılmalı on_checkpoint_restore ve iş akışı bağlamından durumunu yüklemelidir.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Sonraki Adımlar