Microsoft Agent Framework İş Akışları - Denetim Noktaları

Bu sayfa, Microsoft Agent Framework İş Akışı sistemindeki Checkpoints hakkında genel bir bakış sunulmaktadır.

Genel Bakış

Denetim noktaları, yürütme sırasında belirli noktalarda iş akışının durumunu kaydetmenize ve daha sonra bu noktalardan devam etmenizi sağlar. Bu özellik özellikle aşağıdaki senaryolar için kullanışlıdır:

  • Uzun süre devam eden ve hata durumunda ilerlemeyi kaybetmek istemediğiniz iş akışları.
  • Yürütmeyi daha sonra duraklatmak ve devam ettirmek istediğiniz uzun süreli iş akışları.
  • Denetim veya uyumluluk amacıyla düzenli olarak durum kaydetme gerektiren iş akışları.
  • Farklı ortamlar veya örnekler arasında taşınması gereken iş akışları.

Denetim Noktaları Ne Zaman Oluşturulur?

İş akışlarının süper adımlar olarak yürütüldüğünü, temel kavramlarda belgelenmiş olduğunu unutmayın. Denetim noktaları, bu üst adımdaki tüm yürütücüler yürütmelerini tamamladıktan sonra her üst adımın sonunda oluşturulur. Denetim noktası iş akışının durumunun tamamını yakalar, örneğin:

  • Tüm yürütücülerin geçerli durumu
  • Sonraki üst adım için iş akışında bekleyen tüm iletiler
  • Bekleyen istekler ve yanıtlar
  • Paylaşılan durumlar

Denetim Noktalarını Yakalama

Denetim noktası oluşturmayı etkinleştirmek için iş akışı çalıştırılırken bir CheckpointManager sağlanması gerekir. Denetim noktasına SuperStepCompletedEvent aracılığıyla veya çalıştırmada Checkpoints özelliği aracılığıyla erişilebilir.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the workflow with checkpointing enabled
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo?.Checkpoint;
    }
}

// Checkpoints can also be accessed from the run directly
IReadOnlyList<CheckpointInfo> checkpoints = run.Checkpoints;

İş akışı oluşturulurken denetim noktası oluşturmayı etkinleştirmek için bir CheckpointStorage sağlanması gerekir. Daha sonra bir denetim noktasına depolama alanı üzerinden erişilebilir. Agent Framework üç yerleşik uygulama sunar; dayanıklılık ve dağıtım gereksinimlerinize uyan uygulamayı seçin:

Provider Package Durability En iyi kullanım alanı:
InMemoryCheckpointStorage agent-framework Yalnızca işlem içi Testler, tanıtımlar, kısa süreli iş akışları
FileCheckpointStorage agent-framework Yerel disk Tek makineli iş akışları, yerel geliştirme
CosmosCheckpointStorage agent-framework-azure-cosmos Azure Cosmos DB Üretim, dağıtılmış, işlemler arası iş akışları

Her üçü de aynı CheckpointStorage protokolü uygular, böylece iş akışını veya yürütücü kodunu değiştirmeden sağlayıcıları değiştirebilirsiniz.

InMemoryCheckpointStorage denetim noktalarını işlem belleğinde tutar. Testler, tanıtımlar ve yeniden başlatmalar arasında dayanıklılığa ihtiyaç duymadığınız kısa süreli iş akışları için idealdir.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder(start_executor=start_executor, checkpoint_storage=checkpoint_storage)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.build()

# Run the workflow
async for event in workflow.run(input, stream=True):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints(workflow_name=workflow.name)

Kontrol Noktalarından Devam Etme

Belirli bir denetim noktasından bir iş akışını aynı çalışmada doğrudan sürdürebilirsiniz.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
// Restore the state directly on the same run instance.
await run.RestoreCheckpointAsync(savedCheckpoint).ConfigureAwait(false);
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Belirli bir denetim noktasından bir iş akışını doğrudan aynı iş akışı örneğinde sürdürebilirsiniz.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(checkpoint_id=saved_checkpoint.checkpoint_id, stream=True):
    ...

Denetim Noktalarından Yeniden Doldurma

Veya bir iş akışını bir denetim noktasından yeni bir çalıştırma örneğine yeniden yükleyebilirsiniz.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = run.Checkpoints[5];
StreamingRun newRun = await InProcessExecution
    .ResumeStreamingAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newRun.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Veya bir kontrol noktasından yeni bir iş akışı örneğini yeniden başlatabilirsiniz.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder(start_executor=start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run(
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    ...

Yürütücü Durumlarını Kaydet

Bir yürütücü durumunun bir denetim noktasında yakalanmasını sağlamak için, yürütücü OnCheckpointingAsync yöntemini geçersiz kılmalı ve durumunu iş akışı bağlamına kaydetmelidir.

using Microsoft.Agents.AI.Workflows;

internal sealed partial class CustomExecutor() : Executor("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Ayrıca, denetim noktasından devam ederken durumun doğru şekilde geri yüklendiğinden emin olmak için yürütücü yöntemi geçersiz kılmalı OnCheckpointRestoredAsync ve iş akışı bağlamından durumunu yüklemelidir.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Bir yürütücünün durumunun bir kontrol noktasında yakalandığından emin olmak için, yürütücü on_checkpoint_save yöntemini geçersiz kılmalı ve durumunu bir sözlük olarak döndürmelidir.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Ayrıca, denetim noktasından devam ederken durumun doğru şekilde geri yüklendiğinden emin olmak için yürütücü yöntemi on_checkpoint_restore geçersiz kılmalı ve sağlanan durum sözlüğünden durumunu geri yüklemelidir.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Güvenlikle İlgili Dikkat Edilmesi Gerekenler

Önemli

Denetim noktası depolaması bir güven sınırıdır. İster yerleşik depolama uygulamalarını ister özel bir depolama uygulamasını kullanın, depolama arka ucu güvenilir, özel altyapı olarak ele alınmalıdır. Güvenilmeyen veya üzerinde oynanma olasılığı olan kaynaklardan hiçbir zaman denetim noktaları yüklemeyin.

Denetim noktaları için kullanılan depolama konumunun uygun şekilde güvenli olduğundan emin olun. Denetim noktası verilerine yalnızca yetkili hizmetlerin ve kullanıcıların okuma veya yazma erişimi olmalıdır.

Pickle modülünde serileştirme

Hem FileCheckpointStorage hem de CosmosCheckpointStorage, veri sınıfları, tarih zamanları ve özel nesneler gibi JSON yerel olmayan durumu seri hale getirmek için Python pickle modülünü kullanır. Seri durumdan çıkarma sırasında rastgele kod yürütme risklerini azaltmak için her iki sağlayıcı da varsayılan olarak kısıtlanmış bir unpickler kullanır. Seri durumdan çıkarma sırasında yalnızca yerleşik bir güvenli Python türü kümesine (temel öğeler, datetime, uuid, Decimal, ortak koleksiyonlar vb.) ve tüm agent_framework dahili türlerine izin verilir. Denetim noktasında karşılaşılan diğer herhangi bir tür, seri durumdan çıkarmanın WorkflowCheckpointException ile başarısız olmasına neden olur.

Uygulamaya özgü ek türlere izin vermek için, allowed_checkpoint_types bunları "module:qualname" biçimini kullanarak parametresi aracılığıyla iletin:

from agent_framework import FileCheckpointStorage

storage = FileCheckpointStorage(
    "/tmp/checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)

CosmosCheckpointStorage aynı parametreyi kabul eder:

from azure.identity.aio import DefaultAzureCredential
from agent_framework_azure_cosmos import CosmosCheckpointStorage

storage = CosmosCheckpointStorage(
    endpoint="https://my-account.documents.azure.com:443/",
    credential=DefaultAzureCredential(),
    database_name="agent-db",
    container_name="checkpoints",
    allowed_checkpoint_types=[
        "my_app.models:SafeState",
        "my_app.models:UserProfile",
    ],
)

Tehdit modeliniz pickle tabanlı serileştirmeye hiç izin vermiyorsa, alternatif bir serileştirme stratejisiyle özel bir InMemoryCheckpointStorage kullanın veya CheckpointStorage uygulayın.

Depolama konumu sorumluluğu

FileCheckpointStorage açık storage_path bir parametre gerektirir; varsayılan dizin yoktur. Çerçeve yol geçişi saldırılarına karşı doğrulanırken, depolama dizininin güvenliğini sağlamak (dosya izinleri, bekleyen şifreleme, erişim denetimleri) geliştiricinin sorumluluğundadır. Yalnızca yetkili işlemlerin denetim noktası dizinine okuma veya yazma erişimi olmalıdır.

CosmosCheckpointStorage depolama için Azure Cosmos DB dayanır. Mümkün olduğunda yönetilen kimlik / RBAC kullanın, veritabanı ve kapsayıcının kapsamını iş akışı hizmetine alın ve anahtar tabanlı kimlik doğrulaması kullanıyorsanız hesap anahtarlarını döndürün. Dosya depolamada olduğu gibi, denetim noktası belgelerini barındıran Cosmos DB kapsayıcısına yalnızca yetkili sorumluların okuma veya yazma erişimi olmalıdır.

Sonraki Adımlar